Work模式
1、轮询模式(Round-Robin)
- 类型:无
- 特点:当有多个消费者时,队列会把消息按顺序逐条轮流分配给各个消费者(每个消费者分到的消息数量大致相同,与其处理速度无关),直至全部消息消费完成
Producer:
package com.sw.docker.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Round-robin demo producer: publishes 21 text messages (suffix 0..20) to
 * {@code queue01} through the default exchange, then releases the channel
 * and connection.
 *
 * @Author suaxi
 * @Date 2021/4/8 23:26
 */
public class Producer {

    /**
     * Connects to the broker, publishes the demo messages and always closes
     * the channel and connection afterwards, on success as well as on failure.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Configure the connection factory
        ConnectionFactory conn = new ConnectionFactory();
        conn.setHost("x.x.x.x");
        conn.setPort(5672);
        conn.setUsername("admin");
        conn.setPassword("123456");
        conn.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection
            connection = conn.newConnection("生产者");
            // 3. Obtain a channel from the connection
            channel = connection.createChannel();
            // 4. Build and publish the messages
            // NOTE(review): the queue is not declared here; it is assumed to exist already.
            for (int i = 0; i <= 20; i++) {
                String msg = "你好,谢谢" + i;
                // 5. Publish with an explicit charset so the bytes match the
                //    UTF-8 decoding done by the consumers
                channel.basicPublish("", "queue01", null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
            System.out.println("消息发送成功");
        } catch (Exception e) {
            // 6. Report the failure instead of silently dropping it
            e.printStackTrace();
        } finally {
            // 7. Close the channel (must also happen on the success path,
            //    otherwise the open connection keeps the program alive)
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
            // 8. Close the connection
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
}
Work01:
package com.sw.docker.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Round-robin demo consumer: subscribes to {@code queue01} with automatic
 * acknowledgement, prints each message and sleeps two seconds per message.
 * Runs until a byte is read from standard input, then closes the channel
 * and connection.
 *
 * @Author suaxi
 * @Date 2021/4/8 23:26
 */
public class Work01 {

    /**
     * Starts the consumer and blocks on {@code System.in} to keep it alive.
     * The channel and connection are always released on exit.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Configure the connection factory
        ConnectionFactory conn = new ConnectionFactory();
        conn.setHost("x.x.x.x");
        conn.setPort(5672);
        conn.setUsername("admin");
        conn.setPassword("123456");
        conn.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection and a channel
            connection = conn.newConnection("消费者-work01");
            channel = connection.createChannel();
            // 'channel' is reassigned above, so the callbacks need a final alias
            final Channel finalChannel = channel;
            finalChannel.basicQos(1);
            // 3. Subscribe with autoAck = true
            finalChannel.basicConsume("queue01", true, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    System.out.println("work01收到的消息是:"
                            + new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                    try {
                        // Simulated processing time
                        Thread.sleep(2000);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag instead of swallowing it
                        Thread.currentThread().interrupt();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    // Nothing to do when the consumer is cancelled
                }
            });
            System.out.println("work01 开始接收消息");
            // 4. Keep the process alive until input arrives on stdin
            System.in.read();
        } catch (Exception e) {
            // Report the failure instead of silently dropping it
            e.printStackTrace();
        } finally {
            // 5. Close the channel on every exit path
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
            // 6. Close the connection on every exit path
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
}
Work02:
// Work02 的代码与 Work01 相同(仅需把其中的 work01 改为 work02)
2、公平分发
根据消费者的消费能力进行公平分发(按处理效率进行分配)
Producer:
package com.sw.docker.rabbitmq.work.lunxun;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * Work-mode (fair dispatch) demo producer: publishes 21 text messages
 * (suffix 0..20) to {@code queue01} through the default exchange, then
 * releases the channel and connection.
 *
 * @Author suaxi
 * @Date 2021/4/8 23:26
 */
public class Producer {

    /**
     * Connects to the broker, publishes the demo messages and always closes
     * the channel and connection afterwards, on success as well as on failure.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Configure the connection factory
        ConnectionFactory conn = new ConnectionFactory();
        conn.setHost("x.x.x.x");
        conn.setPort(5672);
        conn.setUsername("admin");
        conn.setPassword("123456");
        conn.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection
            connection = conn.newConnection("生产者");
            // 3. Obtain a channel from the connection
            channel = connection.createChannel();
            // 4. Build and publish the messages
            // NOTE(review): the queue is not declared here; it is assumed to exist already.
            for (int i = 0; i <= 20; i++) {
                String msg = "你好,谢谢" + i;
                // 5. Publish with an explicit charset so the bytes match the
                //    UTF-8 decoding done by the consumers
                channel.basicPublish("", "queue01", null,
                        msg.getBytes(java.nio.charset.StandardCharsets.UTF_8));
            }
            System.out.println("消息发送成功");
        } catch (Exception e) {
            // 6. Report the failure instead of silently dropping it
            e.printStackTrace();
        } finally {
            // 7. Close the channel (must also happen on the success path,
            //    otherwise the open connection keeps the program alive)
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
            // 8. Close the connection
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
}
Work01:
将消息接收模式改为手动应答(autoAck = false),并且必须设置 qos(prefetch)。注意:在轮询模式(自动应答)下,qos 设置与否没有影响,因为 prefetch 只限制"已投递但尚未确认"的消息数量;只有在手动应答模式下,qos 才能让处理快的消费者分到更多消息,此处需做区分
package com.sw.docker.rabbitmq.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
 * Fair-dispatch demo consumer: subscribes to {@code queue01} with manual
 * acknowledgement and a prefetch count of 1, prints each message, sleeps two
 * seconds and then acknowledges it. Runs until a byte is read from standard
 * input, then closes the channel and connection.
 *
 * @Author suaxi
 * @Date 2021/4/8 23:26
 */
public class Work01 {

    /**
     * Starts the consumer and blocks on {@code System.in} to keep it alive.
     * The channel and connection are always released on exit.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        // 1. Configure the connection factory
        ConnectionFactory conn = new ConnectionFactory();
        conn.setHost("x.x.x.x");
        conn.setPort(5672);
        conn.setUsername("admin");
        conn.setPassword("123456");
        conn.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            // 2. Open the connection and a channel
            connection = conn.newConnection("消费者-work01");
            channel = connection.createChannel();
            // 'channel' is reassigned above, so the callbacks need a final alias
            final Channel finalChannel = channel;
            // At most one unacknowledged message may be outstanding on this channel
            finalChannel.basicQos(1);
            // 3. Subscribe with autoAck = false (manual acknowledgement)
            finalChannel.basicConsume("queue01", false, new DeliverCallback() {
                @Override
                public void handle(String consumerTag, Delivery delivery) throws IOException {
                    long deliveryTag = delivery.getEnvelope().getDeliveryTag();
                    try {
                        System.out.println("work01收到的消息是:"
                                + new String(delivery.getBody(), java.nio.charset.StandardCharsets.UTF_8));
                        // Simulated processing time
                        Thread.sleep(2000);
                        // Manual acknowledgement of this single delivery
                        finalChannel.basicAck(deliveryTag, false);
                    } catch (InterruptedException e) {
                        // Restore the interrupt flag and hand the message back
                        Thread.currentThread().interrupt();
                        requeue(deliveryTag);
                    } catch (Exception e) {
                        e.printStackTrace();
                        requeue(deliveryTag);
                    }
                }

                // Returns an unprocessed delivery to the queue so that, with
                // prefetch = 1, this consumer is not left stalled on a message
                // that was never acknowledged.
                private void requeue(long deliveryTag) throws IOException {
                    if (finalChannel.isOpen()) {
                        finalChannel.basicNack(deliveryTag, false, true);
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String consumerTag) throws IOException {
                    // Nothing to do when the consumer is cancelled
                }
            });
            System.out.println("work01 开始接收消息");
            // 4. Keep the process alive until input arrives on stdin
            System.in.read();
        } catch (Exception e) {
            // Report the failure instead of silently dropping it
            e.printStackTrace();
        } finally {
            // 5. Close the channel on every exit path
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
            // 6. Close the connection on every exit path
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e1) {
                    e1.printStackTrace();
                }
            }
        }
    }
}
评论 (0)