1、Fanout模式

创建队列
@Configuration
public class FanoutRabbitMQConfig {
//1、声明注册fanout模式的交换机
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout_order_exchange", true, false);
}
//2、声明队列 msg.fanout.queue,email.fanout.queue,sms.fanout.queue
@Bean
public Queue fanoutMsgQueue() {
return new Queue("msg.fanout.queue", true);
}
@Bean
public Queue fanoutEmailQueue() {
return new Queue("email.fanout.queue", true);
}
@Bean
public Queue fanoutSmsQueue() {
return new Queue("sms.fanout.queue", true);
}
//3、完成绑定
@Bean
public Binding msgBinding() {
return BindingBuilder.bind(fanoutMsgQueue()).to(fanoutExchange());
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(fanoutEmailQueue()).to(fanoutExchange());
}
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(fanoutSmsQueue()).to(fanoutExchange());
}
}
生产者
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
//fanout模式 模拟用户下单
public void makeOrderFanout(String userId, String productId, int num) {
//1、根据商品id查询库存
//2、保存订单
String orderId = UUID.randomUUID().toString().replaceAll("-", "");
System.out.println("创建订单成功:" + orderId);
//3、通过 MQ 来完成消息的分发
String exchangeName = "fanout_order_exchange";
String routingKey = "";
//交换机 路由key/queue队列名 消息内容
rabbitTemplate.convertAndSend(exchangeName, routingKey, orderId);
}
}
消费者
@Service
@RabbitListener(queues = {"email.fanout.queue"})
public class FanoutEmailConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("email.fanout 接收到订单信息:" + message);
}
}
@Service
@RabbitListener(queues = {"msg.fanout.queue"})
public class FanoutMsgConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("msg.fanout 接收到订单信息:" + message);
}
}
@Service
@RabbitListener(queues = {"sms.fanout.queue"})
public class FanoutSmsConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("sms.fanout 接收到订单信息:" + message);
}
}
2、Direct模式
创建队列
@Configuration
public class DirectRabbitMQConfig {
//1、声明注册direct模式的交换机
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct_order_exchange", true, false);
}
//2、声明队列 msg.direct.queue,email.direct.queue,sms.direct.queue
@Bean
public Queue directMsgQueue() {
return new Queue("msg.direct.queue", true);
}
@Bean
public Queue directEmailQueue() {
return new Queue("email.direct.queue", true);
}
@Bean
public Queue directSmsQueue() {
return new Queue("sms.direct.queue", true);
}
//3、完成绑定
@Bean
public Binding msgBinding() {
return BindingBuilder.bind(directMsgQueue()).to(directExchange()).with("smg");
}
@Bean
public Binding emailBinding() {
return BindingBuilder.bind(directEmailQueue()).to(directExchange()).with("email");
}
@Bean
public Binding smsBinding() {
return BindingBuilder.bind(directSmsQueue()).to(directExchange()).with("sms");
}
}
生产者
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
//direct模式
public void makeOrderDirect(String userId, String productId, int num) {
//1、根据商品id查询库存
//2、保存订单
String orderId = UUID.randomUUID().toString().replaceAll("-", "");
System.out.println("创建订单成功:" + orderId);
//3、通过 MQ 来完成消息的分发
String exchangeName = "direct_order_exchange";
//交换机 路由key/queue队列名 消息内容
rabbitTemplate.convertAndSend(exchangeName, "sms", orderId);
rabbitTemplate.convertAndSend(exchangeName, "email", orderId);
}
}
消费者
@Service
@RabbitListener(queues = {"email.direct.queue"})
public class DirectEmailConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("email.direct接收到订单信息:" + message);
}
}
@Service
@RabbitListener(queues = {"msg.direct.queue"})
public class DirectMsgConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("msg.direct 接收到订单信息:" + message);
}
}
@Service
@RabbitListener(queues = {"sms.direct.queue"})
public class DirectSmsConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("sms.direct 接收到订单信息:" + message);
}
}
3、Topic模式
生产者
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
//topic模式
public void makeOrderTopic(String userId, String productId, int num) {
//1、根据商品id查询库存
//2、保存订单
String orderId = UUID.randomUUID().toString().replaceAll("-", "");
System.out.println("创建订单成功:" + orderId);
//3、通过 MQ 来完成消息的分发
String exchangeName = "topic_order_exchange";
String routingKry = "com.msg";
//交换机 路由key/queue队列名 消息内容
// #.msg.# msg
// *.email.# email
// com.# sms
rabbitTemplate.convertAndSend(exchangeName, routingKry, orderId);
}
}
创建队列(通过注解)、消费者
@Service
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "email.topic.queue", durable = "true", autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
key = "*.email.#"
))
public class TopicEmailConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("email.topic 接收到订单信息:" + message);
}
}
@Service
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "msg.topic.queue", durable = "true", autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
key = "#.msg.#"
))
public class TopicMsgConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("msg.topic 接收到订单信息:" + message);
}
}
@Service
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "sms.topic.queue", durable = "true", autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange", type = ExchangeTypes.TOPIC),
key = "com.#"
))
public class TopicSmsConsumer {
@RabbitHandler
public void receiveMessage(String message){
System.out.println("sms.topic 接收到订单信息:" + message);
}
}
评论 (0)