消息模式
官方文档地址:https://www.rabbitmq.com/getstarted.html
没有绑定交换机时采用默认交换机(路由模式)
1、简单模式
注:消息预览时,选择Nack 模式,ack模式预览会消费当前在队列中的消息
2、工作模式
- 类型:无
- 特点:分发机制
3、发布订阅模式
- 类型:fanout
- 特点:它是一种广播模式(不包含路由key)
Prodducer:
package com.sw.docker.rabbitmq.routing;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author suaxi
* @Date 2021/4/8 23:26
* fanout模式(绑定队列的操作已通过图形化界面实现)
*/
public class Producer {
public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory conn = new ConnectionFactory();
conn.setHost("x.x.x.x");
conn.setPort(5672);
conn.setUsername("admin");
conn.setPassword("123456");
conn.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2.创建连接
connection = conn.newConnection("生产者");
//3.通过连接获取通道
channel = connection.createChannel();
//5.准备消息内容
String msg = "你好,谢谢";
//准备交换机
String exchangeName = "fanout-exchange";
//定义路由key
String routeKey = "";
//指定交换机类型
String type = "fanout";
//6.发送消息给队列queue
channel.basicPublish(exchangeName, routeKey, null, msg.getBytes());
System.out.println("消息发送成功");
}catch (Exception e){
//7.关闭通道
if (channel != null && channel.isOpen()){
try{
channel.close();
}catch (Exception e1){
e.printStackTrace();
}
}
//8.关闭连接
if (connection != null && connection.isOpen()){
try{
connection.close();
}catch (Exception e1){
e.printStackTrace();
}
}
}
}
}
Consumer:
package com.sw.docker.rabbitmq.routing;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @Author suaxi
* @Date 2021/4/8 23:26
*/
public class Comsumer {
private static Runnable runnable = new Runnable() {
public void run() {
//1.创建连接工厂
ConnectionFactory conn = new ConnectionFactory();
conn.setHost("x.x.x.x");
conn.setPort(5672);
conn.setUsername("admin");
conn.setPassword("123456");
conn.setVirtualHost("/");
//获取队列名称
final String queueName = Thread.currentThread().getName();
Connection connection = null;
Channel channel = null;
try {
//2.创建连接
connection = conn.newConnection("生产者");
//3.通过连接获取通道
channel = connection.createChannel();
//4.创建交换机,声明队列,绑定关系,路由key,发送消息,接收消息
channel.basicConsume(queueName, true, new DeliverCallback() {
public void handle(String s, Delivery delivery) throws IOException {
System.out.println(queueName + "收到消息:" + new String(delivery.getBody(), "UTF-8"));
}
}, new CancelCallback() {
public void handle(String s) throws IOException {
System.out.println(queueName + "接收消息失败!");
}
});
System.out.println(queueName + "开始接收消息");
System.in.read();
} catch (Exception e) {
//7.关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception e1) {
e.printStackTrace();
}
}
//8.关闭连接
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception e1) {
e.printStackTrace();
}
}
}
}
};
public static void main(String[] args) {
//启动三个线程接收消息
new Thread(runnable, "queue01").start();
new Thread(runnable, "queue02").start();
new Thread(runnable, "queue03").start();
}
}
4、路由模式
- 类型:direct
- 特点:支持routing - key匹配模式
Producer:
package com.sw.docker.rabbitmq.direct;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author suaxi
* @Date 2021/4/8 23:26
* direct模式
*/
public class Producer {
public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory conn = new ConnectionFactory();
conn.setHost("x.x.x.x");
conn.setPort(5672);
conn.setUsername("admin");
conn.setPassword("123456");
conn.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2.创建连接
connection = conn.newConnection("生产者");
//3.通过连接获取通道
channel = connection.createChannel();
//5.准备消息内容
String msg = "你好,谢谢";
//准备交换机
String exchangeName = "routing_exchange";
//定义路由key
String routeKey = "email";
//指定交换机类型
String type = "direct";
//6.发送消息给队列queue
channel.basicPublish(exchangeName, routeKey, null, msg.getBytes());
System.out.println("消息发送成功");
}catch (Exception e){
//7.关闭通道
if (channel != null && channel.isOpen()){
try{
channel.close();
}catch (Exception e1){
e.printStackTrace();
}
}
//8.关闭连接
if (connection != null && connection.isOpen()){
try{
connection.close();
}catch (Exception e1){
e.printStackTrace();
}
}
}
}
}
Consumer:
消费者的代码与fanout模式同理
5、主题Topic模式
- 类型:topic
- 特点:模糊的routing - key匹配模式
匹配规则
#.test.# 可以有0个,1个或多个
*.test.* 必须有一级,有且只有一级
Producer:
package com.sw.docker.rabbitmq.topics;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @Author suaxi
* @Date 2021/4/8 23:26
* topic模式
*/
public class Producer {
public static void main(String[] args) {
//1.创建连接工厂
ConnectionFactory conn = new ConnectionFactory();
conn.setHost("x.x.x.x");
conn.setPort(5672);
conn.setUsername("admin");
conn.setPassword("123456");
conn.setVirtualHost("/");
Connection connection = null;
Channel channel = null;
try {
//2.创建连接
connection = conn.newConnection("生产者");
//3.通过连接获取通道
channel = connection.createChannel();
//5.准备消息内容
String msg = "你好,谢谢";
//准备交换机
String exchangeName = "topic_exchange";
//定义路由key
String routeKey = "sw.test.com";
//指定交换机类型
String type = "topic";
//6.发送消息给队列queue
channel.basicPublish(exchangeName, routeKey, null, msg.getBytes());
System.out.println("消息发送成功");
}catch (Exception e){
//7.关闭通道
if (channel != null && channel.isOpen()){
try{
channel.close();
}catch (Exception e1){
e.printStackTrace();
}
}
//8.关闭连接
if (connection != null && connection.isOpen()){
try{
connection.close();
}catch (Exception e1){
e.printStackTrace();
}
}
}
}
}
Consumer:
消费者的代码与fanout模式同理
6、参数模式
- 类型:headers
- 特点:参数匹配
评论 (0)