消息队列测试实例
Producer:
通过代码创建交换机、队列、绑定队列和交换机的关系
package com.sw.docker.rabbitmq.all;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author suaxi
* @Date 2021/4/8 23:26
* direct模式
*/
public class Producer {
public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory conn = new ConnectionFactory();
conn.setHost("x.x.x.x");
conn.setPort(5672);
conn.setUsername("admin");
conn.setPassword("123456");
conn.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2.创建连接
connection = conn.newConnection("生产者");
//3.通过连接获取通道
channel = connection.createChannel();
//5.准备消息内容
String msg = "你好,谢谢";
//准备交换机
String exchangeName = "direct_message_exchange";
//交换机类型:direct/topic/fanout/headers
String exchangeType = "direct";
//声明交换机,并开启持久化,如果不开启,服务器重启,存盘内容会随之丢失
channel.exchangeDeclare(exchangeName, exchangeType, true);
//声明队列
channel.queueDeclare("queue03", true, false, false, null);
channel.queueDeclare("queue04", true, false, false, null);
channel.queueDeclare("queue05", true, false, false, null);
//绑定队列和交换机的关系
channel.queueBind("queue03", exchangeName, "order");
channel.queueBind("queue04", exchangeName, "order");
channel.queueBind("queue05", exchangeName, "class");
//6.发送消息给队列queue
channel.basicPublish(exchangeName, "order", null, msg.getBytes());
System.out.println("消息发送成功");
}catch (Exception e){
//7.关闭通道
if (channel != null && channel.isOpen()){
try{
channel.close();
}catch (Exception e1){
e.printStackTrace();
}
}
//8.关闭连接
if (connection != null && connection.isOpen()){
try{
connection.close();
}catch (Exception e1){
e.printStackTrace();
}
}
}
}
}
Consumer:
package com.sw.docker.rabbitmq.all;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Author suaxi
* @Date 2021/4/8 23:26
*/
public class Comsumer {
private static Runnable runnable = new Runnable() {
public void run() {
//1.创建连接工厂
ConnectionFactory conn = new ConnectionFactory();
conn.setHost("x.x.x.x");
conn.setPort(5672);
conn.setUsername("admin");
conn.setPassword("123456");
conn.setVirtualHost("/");
//获取队列名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
//2.创建连接
connection = conn.newConnection("生产者");
//3.通过连接获取通道
channel = connection.createChannel();
//4.创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + "收到消息:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println(queueName + "接收消息失败!");
}
});
System.out.println(queueName + "开始接收消息");
System.in.read();
} catch (Exception e) {
//7.关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e1) {
e.printStackTrace();
}
}
//8.关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e1) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
//启动三个线程接收消息
new Thread(runnable, "queue03").start();
new Thread(runnable, "queue04").start();
new Thread(runnable, "queue05").start();
}
}
评论 (0)