消息队列测试实例

suaxi
2021-04-11 / 0 评论 / 142 阅读 / 正在检测是否收录...

消息队列测试实例

Producer:

通过代码创建交换机、队列、绑定队列和交换机的关系

package com.sw.docker.rabbitmq.all;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

/**
 * @Author suaxi
 * @Date 2021/4/8 23:26
 * direct模式
 */
public class Producer {
    public static void main(String[] args) {
        //1.创建连接工厂
        ConnectionFactory conn = new ConnectionFactory();
        conn.setHost("x.x.x.x");
        conn.setPort(5672);
        conn.setUsername("admin");
        conn.setPassword("123456");
        conn.setVirtualHost("/");

        Connection connection = null;
        Channel channel = null;
        try {
            //2.创建连接
            connection = conn.newConnection("生产者");
            //3.通过连接获取通道
            channel = connection.createChannel();

            //5.准备消息内容
            String msg = "你好,谢谢";

            //准备交换机
            String exchangeName = "direct_message_exchange";
            //交换机类型:direct/topic/fanout/headers
            String exchangeType = "direct";

            //声明交换机,并开启持久化,如果不开启,服务器重启,存盘内容会随之丢失
            channel.exchangeDeclare(exchangeName, exchangeType, true);

            //声明队列
            channel.queueDeclare("queue03", true, false, false, null);
            channel.queueDeclare("queue04", true, false, false, null);
            channel.queueDeclare("queue05", true, false, false, null);

            //绑定队列和交换机的关系
            channel.queueBind("queue03", exchangeName, "order");
            channel.queueBind("queue04", exchangeName, "order");
            channel.queueBind("queue05", exchangeName, "class");

            //6.发送消息给队列queue
            channel.basicPublish(exchangeName, "order", null, msg.getBytes());
            System.out.println("消息发送成功");
        }catch (Exception e){
            //7.关闭通道
            if (channel != null && channel.isOpen()){
                try{
                    channel.close();
                }catch (Exception e1){
                    e.printStackTrace();
                }
            }
            //8.关闭连接
            if (connection != null && connection.isOpen()){
                try{
                    connection.close();
                }catch (Exception e1){
                    e.printStackTrace();
                }
            }
        }

    }
}


Consumer:

package com.sw.docker.rabbitmq.all;

import com.rabbitmq.client.*;

import java.io.IOException;

/**
 * @Author suaxi
 * @Date 2021/4/8 23:26
 */
public class Comsumer {

    private static Runnable runnable = new Runnable() {
        public void run() {
            //1.创建连接工厂
            ConnectionFactory conn = new ConnectionFactory();
            conn.setHost("x.x.x.x");
            conn.setPort(5672);
            conn.setUsername("admin");
            conn.setPassword("123456");
            conn.setVirtualHost("/");

            //获取队列名称
            final String queueName = Thread.currentThread().getName();
            Connection connection = null;
            Channel channel = null;
            try {
                //2.创建连接
                connection = conn.newConnection("生产者");
                //3.通过连接获取通道
                channel = connection.createChannel();
                //4.创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
                channel.basicConsume(queueName, true, new DeliverCallback() {
                    public void handle(String s, Delivery delivery) throws IOException {
                        System.out.println(queueName + "收到消息:" + new String(delivery.getBody(), "UTF-8"));
                    }
                }, new CancelCallback() {
                    public void handle(String s) throws IOException {
                        System.out.println(queueName + "接收消息失败!");
                    }
                });
                System.out.println(queueName + "开始接收消息");
                System.in.read();
            } catch (Exception e) {
                //7.关闭通道
                if (channel != null && channel.isOpen()) {
                    try {
                        channel.close();
                    } catch (Exception e1) {
                        e.printStackTrace();
                    }
                }
                //8.关闭连接
                if (connection != null && connection.isOpen()) {
                    try {
                        connection.close();
                    } catch (Exception e1) {
                        e.printStackTrace();
                    }
                }
            }
        }
    };

    public static void main(String[] args) {
        //启动三个线程接收消息
        new Thread(runnable, "queue03").start();
        new Thread(runnable, "queue04").start();
        new Thread(runnable, "queue05").start();
    }

}
0

评论 (0)

取消