RabbitMQ - 死信队列

suaxi
2023-06-22 / 0 评论 / 106 阅读 / 正在检测是否收录...

无法被消费的消息,即:producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中


1. 来源

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false

    1.来源.png


2. 消息 TTL 过期

Producer

public class Producer {

    //普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();

        //死信消息
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();

        for (int i = 1; i < 11; i++) {
            String msg = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes());
            System.out.println("发送的消息是:" + msg);
        }
    }
}

Consumer1 (启动之后关闭,模拟消费者不能正常消息的情况)

public class Consumer01 {

    //普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    //死信交换机
    private static final String DEAD_EXCHANGE = "dead_exchange";

    //普通队列
    public static final String NORMAL_QUEUE = "normal_queue";

    //死信队列
    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();

        //声明普通/死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明普通队列
        //普通队列绑定死信交换机
        Map<String, Object> arguments = new HashMap<>();
        //过期时间(过期时间可由生产者或者消费者设置,二选一)
        arguments.put("x-message-ttl", 10000);
        //死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //死信routingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        //死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //绑定普通交换机 - 队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //绑定死信交换机 - 队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("消费者c1等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

2.1ttl过期-消费者未正确消费消息.png

Consumer2 处理死信队列的消息

public class Consumer02 {

    //死信队列
    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("消费者c2等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接收的(死信)消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

2.2ttl过期-处理死信队列的消息.png


3. 队列达到最大长度

Peoducer

public class Producer {

    //普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        for (int i = 1; i < 11; i++) {
            String msg = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes());
            System.out.println("发送的消息是:" + msg);
        }
    }
}

Consumer1

public class Consumer01 {

    //普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    //死信交换机
    private static final String DEAD_EXCHANGE = "dead_exchange";

    //普通队列
    public static final String NORMAL_QUEUE = "normal_queue";

    //死信队列
    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();

        //声明普通/死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明普通队列
        //普通队列绑定死信交换机
        Map<String, Object> arguments = new HashMap<>();
        //死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //死信routingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        //队列长度
        arguments.put("x-max-length", 6);
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        //死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //绑定普通交换机 - 队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //绑定死信交换机 - 队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("消费者c3等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer03接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

注:添加 arguments.put("x-max-length", 6) 参数

3.1队列达到最大长度.png

Consumer2 处理死信队列的消息

public class Consumer02 {

    //死信队列
    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        System.out.println("消费者c2等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接收的(死信)消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
        };
        channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {});
    }
}

3.2队列达到最大长度-处理死信队列的消息.png


4. 消息被拒绝

Producer 同理 队列达到最大长度 的生产者

Consumer

public class Consumer04 {

    //普通交换机
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    //死信交换机
    private static final String DEAD_EXCHANGE = "dead_exchange";

    //普通队列
    public static final String NORMAL_QUEUE = "normal_queue";

    //死信队列
    private static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();

        //声明普通/死信交换机
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);

        //声明普通队列
        //普通队列绑定死信交换机
        Map<String, Object> arguments = new HashMap<>();
        //死信交换机
        arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE);
        //死信routingKey
        arguments.put("x-dead-letter-routing-key", "lisi");
        channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments);

        //死信队列
        channel.queueDeclare(DEAD_QUEUE, false, false, false, null);

        //绑定普通交换机 - 队列
        channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan");
        //绑定死信交换机 - 队列
        channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi");
        System.out.println("消费者c4等待接收消息...");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            if ("info5".equals(new String(message.getBody(), StandardCharsets.UTF_8))) {
                System.out.println("Consumer04拒绝此消息:" + new String(message.getBody(), StandardCharsets.UTF_8));
                channel.basicReject(message.getEnvelope().getDeliveryTag(), false);
            } else {
                System.out.println("Consumer04接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8));
                channel.basicAck(message.getEnvelope().getDeliveryTag(), false);
            }
        };
        channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {});
    }
}

启动生产者

4.1消息被拒绝.png

启动消费者

4.2消息被拒绝-消费者.png

4.3消息被拒绝-消费者.png

启动处理死信队列消息的消费者(该消费者同理前三个处理死信队列消息的消费者)

4.4消息被拒绝-处理死信队列的消息.png

0

评论 (0)

取消