SpringBoot整合RabbitMQ

suaxi
2021-04-11 / 0 评论 / 75 阅读 / 正在检测是否收录...

1、Fanout模式

1.整合fanout模式.png

创建队列
/**
 * Declares the fanout exchange, the three durable queues fed by it, and the
 * bindings that connect each queue to the exchange.
 *
 * <p>The bindings are declared without a routing key; the producer publishes to
 * {@code fanout_order_exchange} with an empty routing key.
 */
@Configuration
public class FanoutRabbitMQConfig {

    /** Name of the fanout exchange that producers publish to. */
    private static final String EXCHANGE_NAME = "fanout_order_exchange";

    /**
     * 1. Declares the fanout exchange.
     *
     * @return the exchange, created with {@code durable = true} and {@code autoDelete = false}
     */
    @Bean
    public FanoutExchange fanoutExchange() {
        return new FanoutExchange(EXCHANGE_NAME, true, false);
    }

    // 2. Declare the queues: msg.fanout.queue, email.fanout.queue, sms.fanout.queue

    /** @return the durable queue {@code msg.fanout.queue} */
    @Bean
    public Queue fanoutMsgQueue() {
        return new Queue("msg.fanout.queue", true);
    }

    /** @return the durable queue {@code email.fanout.queue} */
    @Bean
    public Queue fanoutEmailQueue() {
        return new Queue("email.fanout.queue", true);
    }

    /** @return the durable queue {@code sms.fanout.queue} */
    @Bean
    public Queue fanoutSmsQueue() {
        return new Queue("sms.fanout.queue", true);
    }

    // 3. Bind every queue to the fanout exchange.
    //
    // The bean names are set explicitly: DirectRabbitMQConfig has @Bean methods
    // with the same method names (msgBinding, emailBinding, smsBinding), and the
    // default bean name is the method name. With both configurations in one
    // application context the definitions would collide, so these beans get a
    // "fanout" prefix while the Java method names stay unchanged.

    /** @return the binding of {@code msg.fanout.queue} to the fanout exchange */
    @Bean(name = "fanoutMsgBinding")
    public Binding msgBinding() {
        return BindingBuilder.bind(fanoutMsgQueue()).to(fanoutExchange());
    }

    /** @return the binding of {@code email.fanout.queue} to the fanout exchange */
    @Bean(name = "fanoutEmailBinding")
    public Binding emailBinding() {
        return BindingBuilder.bind(fanoutEmailQueue()).to(fanoutExchange());
    }

    /** @return the binding of {@code sms.fanout.queue} to the fanout exchange */
    @Bean(name = "fanoutSmsBinding")
    public Binding smsBinding() {
        return BindingBuilder.bind(fanoutSmsQueue()).to(fanoutExchange());
    }
}


生产者
/**
 * Producer side of the fanout example: creates an order id and publishes it
 * to the fanout exchange.
 */
@Service
public class OrderService {

    /** Exchange the order id is published to. */
    private static final String FANOUT_EXCHANGE = "fanout_order_exchange";

    /** Routing key used for the publish; the fanout example sends an empty key. */
    private static final String NO_ROUTING_KEY = "";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Fanout mode: simulates a user placing an order.
     *
     * <p>The stock lookup (step 1) and order persistence (step 2) are only
     * placeholders in this example; none of the parameters are read yet.
     *
     * @param userId    id of the ordering user (currently unused)
     * @param productId id of the ordered product (currently unused)
     * @param num       ordered quantity (currently unused)
     */
    public void makeOrderFanout(String userId, String productId, int num) {
        // 1. Look up stock by product id (not implemented in this example).
        // 2. Save the order: here only a dash-free random UUID is generated as the id.
        final String generatedId = UUID.randomUUID().toString().replace("-", "");
        System.out.println("创建订单成功:" + generatedId);

        // 3. Distribute the message through MQ: exchange, routing key, message body.
        rabbitTemplate.convertAndSend(FANOUT_EXCHANGE, NO_ROUTING_KEY, generatedId);
    }
}


消费者
/**
 * Listener on {@code email.fanout.queue} that prints each received order
 * message to standard output.
 */
@Service
@RabbitListener(queues = "email.fanout.queue")
public class FanoutEmailConsumer {

    /**
     * Handles a message delivered as a {@code String} payload.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        final String line = "email.fanout 接收到订单信息:" + message;
        System.out.println(line);
    }
}
/**
 * Listener on {@code msg.fanout.queue} that prints each received order
 * message to standard output.
 */
@Service
@RabbitListener(queues = "msg.fanout.queue")
public class FanoutMsgConsumer {

    /**
     * Handles a message delivered as a {@code String} payload.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        final String line = "msg.fanout 接收到订单信息:" + message;
        System.out.println(line);
    }
}
/**
 * Listener on {@code sms.fanout.queue} that prints each received order
 * message to standard output.
 */
@Service
@RabbitListener(queues = "sms.fanout.queue")
public class FanoutSmsConsumer {

    /**
     * Handles a message delivered as a {@code String} payload.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        final String line = "sms.fanout 接收到订单信息:" + message;
        System.out.println(line);
    }
}


2、Direct模式

创建队列
/**
 * Declares the direct exchange, its three durable queues, and one binding per
 * queue, each with its own binding key ({@code msg}, {@code email}, {@code sms}).
 */
@Configuration
public class DirectRabbitMQConfig {

    /** Name of the direct exchange that producers publish to. */
    private static final String EXCHANGE_NAME = "direct_order_exchange";

    /**
     * 1. Declares the direct exchange.
     *
     * @return the exchange, created with {@code durable = true} and {@code autoDelete = false}
     */
    @Bean
    public DirectExchange directExchange() {
        return new DirectExchange(EXCHANGE_NAME, true, false);
    }

    // 2. Declare the queues: msg.direct.queue, email.direct.queue, sms.direct.queue

    /** @return the durable queue {@code msg.direct.queue} */
    @Bean
    public Queue directMsgQueue() {
        return new Queue("msg.direct.queue", true);
    }

    /** @return the durable queue {@code email.direct.queue} */
    @Bean
    public Queue directEmailQueue() {
        return new Queue("email.direct.queue", true);
    }

    /** @return the durable queue {@code sms.direct.queue} */
    @Bean
    public Queue directSmsQueue() {
        return new Queue("sms.direct.queue", true);
    }

    // 3. Bind every queue to the direct exchange with its binding key.
    //
    // The bean names are set explicitly: FanoutRabbitMQConfig has @Bean methods
    // with the same method names (msgBinding, emailBinding, smsBinding), and the
    // default bean name is the method name. With both configurations in one
    // application context the definitions would collide, so these beans get a
    // "direct" prefix while the Java method names stay unchanged.

    /**
     * @return the binding of {@code msg.direct.queue} under the key {@code msg}
     *         (named after the queue, like the {@code email} and {@code sms} keys)
     */
    @Bean(name = "directMsgBinding")
    public Binding msgBinding() {
        return BindingBuilder.bind(directMsgQueue()).to(directExchange()).with("msg");
    }

    /** @return the binding of {@code email.direct.queue} under the key {@code email} */
    @Bean(name = "directEmailBinding")
    public Binding emailBinding() {
        return BindingBuilder.bind(directEmailQueue()).to(directExchange()).with("email");
    }

    /** @return the binding of {@code sms.direct.queue} under the key {@code sms} */
    @Bean(name = "directSmsBinding")
    public Binding smsBinding() {
        return BindingBuilder.bind(directSmsQueue()).to(directExchange()).with("sms");
    }
}


生产者
/**
 * Producer side of the direct example: creates an order id and publishes it
 * to the direct exchange once per routing key.
 */
@Service
public class OrderService {

    /** Exchange the order id is published to. */
    private static final String DIRECT_EXCHANGE = "direct_order_exchange";

    /** Routing keys the order id is sent with, in publish order. */
    private static final String[] ROUTING_KEYS = {"sms", "email"};

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Direct mode: simulates a user placing an order.
     *
     * <p>The stock lookup (step 1) and order persistence (step 2) are only
     * placeholders in this example; none of the parameters are read yet.
     *
     * @param userId    id of the ordering user (currently unused)
     * @param productId id of the ordered product (currently unused)
     * @param num       ordered quantity (currently unused)
     */
    public void makeOrderDirect(String userId, String productId, int num) {
        // 1. Look up stock by product id (not implemented in this example).
        // 2. Save the order: here only a dash-free random UUID is generated as the id.
        final String generatedId = UUID.randomUUID().toString().replace("-", "");
        System.out.println("创建订单成功:" + generatedId);

        // 3. Distribute the message through MQ: exchange, routing key, message body.
        for (String key : ROUTING_KEYS) {
            rabbitTemplate.convertAndSend(DIRECT_EXCHANGE, key, generatedId);
        }
    }
}


消费者
/**
 * Listener on {@code email.direct.queue} that prints each received order
 * message to standard output.
 */
@Service
@RabbitListener(queues = "email.direct.queue")
public class DirectEmailConsumer {

    /**
     * Handles a message delivered as a {@code String} payload.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        final String line = "email.direct接收到订单信息:" + message;
        System.out.println(line);
    }
}
/**
 * Listener on {@code msg.direct.queue} that prints each received order
 * message to standard output.
 */
@Service
@RabbitListener(queues = "msg.direct.queue")
public class DirectMsgConsumer {

    /**
     * Handles a message delivered as a {@code String} payload.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        final String line = "msg.direct 接收到订单信息:" + message;
        System.out.println(line);
    }
}
/**
 * Listener on {@code sms.direct.queue} that prints each received order
 * message to standard output.
 */
@Service
@RabbitListener(queues = "sms.direct.queue")
public class DirectSmsConsumer {

    /**
     * Handles a message delivered as a {@code String} payload.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        final String line = "sms.direct 接收到订单信息:" + message;
        System.out.println(line);
    }
}


3、Topic模式

生产者
/**
 * Producer side of the topic example: creates an order id and publishes it
 * to the topic exchange with the routing key {@code com.msg}.
 */
@Service
public class OrderService {

    /** Exchange the order id is published to. */
    private static final String TOPIC_EXCHANGE = "topic_order_exchange";

    /** Routing key attached to every published order id. */
    private static final String ROUTING_KEY = "com.msg";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Topic mode: simulates a user placing an order.
     *
     * <p>The stock lookup (step 1) and order persistence (step 2) are only
     * placeholders in this example; none of the parameters are read yet.
     *
     * @param userId    id of the ordering user (currently unused)
     * @param productId id of the ordered product (currently unused)
     * @param num       ordered quantity (currently unused)
     */
    public void makeOrderTopic(String userId, String productId, int num) {
        // 1. Look up stock by product id (not implemented in this example).
        // 2. Save the order: here only a dash-free random UUID is generated as the id.
        final String generatedId = UUID.randomUUID().toString().replace("-", "");
        System.out.println("创建订单成功:" + generatedId);

        // 3. Distribute the message through MQ: exchange, routing key, message body.
        // Binding patterns used by the topic queues in this example:
        //   #.msg.#      -> msg
        //   *.email.#    -> email
        //   com.#        -> sms
        rabbitTemplate.convertAndSend(TOPIC_EXCHANGE, ROUTING_KEY, generatedId);
    }
}


创建队列(通过注解)、消费者
/**
 * Listener whose annotation describes the queue {@code email.topic.queue}, the
 * topic exchange {@code topic_order_exchange}, and the binding between them
 * under the pattern {@code *.email.#}. Each received order message is printed
 * to standard output.
 */
@Service
@RabbitListener(
        bindings = @QueueBinding(
                key = "*.email.#",
                exchange = @Exchange(type = ExchangeTypes.TOPIC, value = "topic_order_exchange"),
                value = @Queue(value = "email.topic.queue", durable = "true", autoDelete = "false")))
public class TopicEmailConsumer {

    /**
     * Handles a message delivered as a {@code String} payload.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        final String line = "email.topic 接收到订单信息:" + message;
        System.out.println(line);
    }
}
/**
 * Listener whose annotation describes the queue {@code msg.topic.queue}, the
 * topic exchange {@code topic_order_exchange}, and the binding between them
 * under the pattern {@code #.msg.#}. Each received order message is printed
 * to standard output.
 */
@Service
@RabbitListener(
        bindings = @QueueBinding(
                key = "#.msg.#",
                exchange = @Exchange(type = ExchangeTypes.TOPIC, value = "topic_order_exchange"),
                value = @Queue(value = "msg.topic.queue", durable = "true", autoDelete = "false")))
public class TopicMsgConsumer {

    /**
     * Handles a message delivered as a {@code String} payload.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        final String line = "msg.topic 接收到订单信息:" + message;
        System.out.println(line);
    }
}
/**
 * Listener whose annotation describes the queue {@code sms.topic.queue}, the
 * topic exchange {@code topic_order_exchange}, and the binding between them
 * under the pattern {@code com.#}. Each received order message is printed
 * to standard output.
 */
@Service
@RabbitListener(
        bindings = @QueueBinding(
                key = "com.#",
                exchange = @Exchange(type = ExchangeTypes.TOPIC, value = "topic_order_exchange"),
                value = @Queue(value = "sms.topic.queue", durable = "true", autoDelete = "false")))
public class TopicSmsConsumer {

    /**
     * Handles a message delivered as a {@code String} payload.
     *
     * @param message the received message body
     */
    @RabbitHandler
    public void receiveMessage(String message) {
        final String line = "sms.topic 接收到订单信息:" + message;
        System.out.println(line);
    }
}
0

评论 (0)

取消