死信队列

suaxi
2021-04-11 / 0 评论 / 63 阅读 / 正在检测是否收录...

修改已创建队列的参数,重启服务报错,原因:修改后的参数随服务的重启并不会进行覆盖

1、概述

Dead-Letter-Exchange(DLX)死信队列,也称作死信交换机,死信邮箱。当一个消息在队列中变成死信(dead message)之后,它会被重新发送到一个交换机中,这个交换机就是DLX,绑定DLX的队列就是死信队列。消息变为死信的原因有:

  • 消息被拒绝
  • 消息过期
  • 队列达到最大长度

DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,也就是设置某一个队列的属性。当这个队列中存在死信时,RabbitMQ就会可将这个消息重新发送到设置的DLX上,进而被路由到另一个队列,即死信队列。

使用:在定义队列的时候设置队列参数 x-dead-letter-exchange指定交换机即可

1.死信队列.png

图片来源:学相伴 - 飞哥 - RabbitMQ


测试:

创建队列
@Configuration
public class DeadRabbitMQConfig {

    //1、声明注册direct模式的交换机
    @Bean
    public DirectExchange DeadDirectExchange() {
        return new DirectExchange("dead_direct_exchange", true, false);
    }

    //2、声明队列过期时间
    @Bean
    public Queue deadQueue() {
        return new Queue("dead.direct.queue", true);
    }


    @Bean
    public Binding deadBind() {
        return BindingBuilder.bind(deadQueue()).to(DeadDirectExchange()).with("dead");
    }
    
}


绑定 TTL队列 和 死信队列
@Configuration
public class TTLRabbitMQConfig {

    //1、声明注册direct模式的交换机
    @Bean
    public DirectExchange ttlDirectExchange() {
        return new DirectExchange("ttl_direct_exchange", true, false);
    }

    //2、声明队列过期时间
    @Bean
    public Queue directTTLQueue() {
        Map<String, Object> args = new HashMap<>();
        //单位 ms
        args.put("x-message-ttl", 5000);
        //绑定死信交换机
        args.put("x-dead-letter-exchange", "dead_direct_exchange");
        //fanout模式不需要指定路由key
        args.put("x-dead-letter-routing-key", "dead");
        return new Queue("ttl.direct.queue", true, false, false, args);
    }

    @Bean
    public Queue directTTLMessageQueue() {
        return new Queue("ttl.message.direct.queue", true);
    }

    //过期队列
    @Bean
    public Binding ttlQueue(){
        return BindingBuilder.bind(directTTLQueue()).to(ttlDirectExchange()).with("ttl");
    }

    //消息含有过期时间,队列没有设置过期时间
    @Bean
    public Binding ttlMessageQueue(){
        return BindingBuilder.bind(directTTLMessageQueue()).to(ttlDirectExchange()).with("ttlMessage");
    }

}


生产者
@Service
public class OrderService {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    // ttl 单独设置过期时间
    public void makeOrderTtlMessage(String userId, String productId, int num) {
        //1、根据商品id查询库存
        //2、保存订单
        String orderId = UUID.randomUUID().toString().replaceAll("-", "");
        System.out.println("创建订单成功:" + orderId);
        //3、通过 MQ 来完成消息的分发
        String exchangeName = "ttl_direct_exchange";
        String routingKry = "ttlMessage";

        //设置消息过期时间
        MessagePostProcessor message = new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                message.getMessageProperties().setExpiration("5000");
                message.getMessageProperties().setContentEncoding("UTF-8");
                return message;
            }
        };

        /**
         * 过期队列:消息存入死信队列
         * 消息设置过期时间:过期后,消息直接移除
         */

        //交换机   路由key/queue队列名  消息内容
        rabbitTemplate.convertAndSend(exchangeName, routingKry, orderId, message);
    }
}


消费者 暂不设置,目的是让消息过期 路由 到死信队列中


消息未过期

2.死信队列测试-消息未过期.png


消息过期后

3.死信队列测试-消息过期后.png

0

评论 (0)

取消