Work模式

suaxi
2021-04-11 / 0 评论 / 170 阅读 / 正在检测是否收录...

Work模式

1、轮询模式(Round-Robin)

  • 类型:无
  • 特点:当有多个消费者时,一个消费者分配一条消息,直至消费完成


Producer:

package com.sw.docker.rabbitmq.work.lunxun;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author suaxi
 * @Date 2021/4/8 23:26
 * 轮询模式
 */
public class Producer {
    public static void main(String[] args) {
        //1.创建连接工厂
        ConnectionFactory conn = new ConnectionFactory();
        conn.setHost("x.x.x.x");
        conn.setPort(5672);
        conn.setUsername("admin");
        conn.setPassword("123456");
        conn.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //2.创建连接
            connection = conn.newConnection("生产者");
            //3.通过连接获取通道
            channel = connection.createChannel();

            //4.准备消息内容
            for (int i = 0; i <= 20; i++) {
                String msg = "你好,谢谢" + i;
                //5.发送消息给队列queue
                channel.basicPublish("", "queue01", null, msg.getBytes());

            }

            System.out.println("消息发送成功");
        }catch (Exception e){
            //7.关闭通道
            if (channel != null && channel.isOpen()){
                try{
                    channel.close();
                }catch (Exception e1){
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection != null && connection.isOpen()){
                try{
                    connection.close();
                }catch (Exception e1){
                    e.printStackTrace();
                }
            }
        }

    }
}


Work01:

package com.sw.docker.rabbitmq.work.lunxun;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author suaxi
 * @Date 2021/4/8 23:26
 */
public class Work01 {
    public static void main(String[] args) {

        //1.创建连接工厂
        ConnectionFactory conn = new ConnectionFactory();
        conn.setHost("x.x.x.x");
        conn.setPort(5672);
        conn.setUsername("admin");
        conn.setPassword("123456");
        conn.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = conn.newConnection("消费者-work01");
            channel = connection.createChannel();
            Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue01", true, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try {
                        System.out.println("work01收到的消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {

                }
            });
            System.out.println("work01 开始接收消息");
            System.in.read();
        } catch (Exception e) {
            //7.关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e1) {
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e1) {
                    e.printStackTrace();
                }
            }
        }
    }

}


Work02:

//Work02的代码同理01


2、公平分发

根据消费者的消费能力进行公平分发(按处理效率进行分配)

Producer:

package com.sw.docker.rabbitmq.work.lunxun;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author suaxi
 * @Date 2021/4/8 23:26
 * Work模式
 */
public class Producer {
    public static void main(String[] args) {
        //1.创建连接工厂
        ConnectionFactory conn = new ConnectionFactory();
        conn.setHost("x.x.x.x");
        conn.setPort(5672);
        conn.setUsername("admin");
        conn.setPassword("123456");
        conn.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //2.创建连接
            connection = conn.newConnection("生产者");
            //3.通过连接获取通道
            channel = connection.createChannel();

            //4.准备消息内容
            for (int i = 0; i <= 20; i++) {
                String msg = "你好,谢谢" + i;
                //5.发送消息给队列queue
                channel.basicPublish("", "queue01", null, msg.getBytes());

            }

            System.out.println("消息发送成功");
        }catch (Exception e){
            //7.关闭通道
            if (channel != null && channel.isOpen()){
                try{
                    channel.close();
                }catch (Exception e1){
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection != null && connection.isOpen()){
                try{
                    connection.close();
                }catch (Exception e1){
                    e.printStackTrace();
                }
            }
        }

    }
}


Work01:

将消息接收模式改为手动应答,且需设置 qos,注意:在轮询模式中,qos设置与不设置没有影响,此处需做区分

package com.sw.docker.rabbitmq.work.fair;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author suaxi
 * @Date 2021/4/8 23:26
 */
public class Work01 {
    public static void main(String[] args) {

        //1.创建连接工厂
        ConnectionFactory conn = new ConnectionFactory();
        conn.setHost("x.x.x.x");
        conn.setPort(5672);
        conn.setUsername("admin");
        conn.setPassword("123456");
        conn.setVirtualHost("/");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = conn.newConnection("消费者-work01");
            channel = connection.createChannel();
            Channel finalChannel = channel;
            finalChannel.basicQos(1);
            finalChannel.basicConsume("queue01", false, new DeliverCallback() {
                @Override
                public void handle(String s, Delivery delivery) throws IOException {
                    try {
                        System.out.println("work01收到的消息是:" + new String(delivery.getBody(), "UTF-8"));
                        Thread.sleep(2000);
                        //接收消息改为手动应答
                        finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, new CancelCallback() {
                @Override
                public void handle(String s) throws IOException {

                }
            });
            System.out.println("work01 开始接收消息");
            System.in.read();
        } catch (Exception e) {
            //7.关闭通道
            if (channel != null && channel.isOpen()) {
                try {
                    channel.close();
                } catch (Exception e1) {
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection != null && connection.isOpen()) {
                try {
                    connection.close();
                } catch (Exception e1) {
                    e.printStackTrace();
                }
            }
        }
    }

}
0

评论 (0)

取消