首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,082 阅读
2
类的加载
741 阅读
3
Spring Cloud OAuth2.0
726 阅读
4
SpringBoot自动装配原理
691 阅读
5
集合不安全问题
584 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
388
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
页面
统计
关于
搜索到
2
篇与
的结果
2023-06-22
RabbitMQ - 死信队列
无法被消费的消息,即:producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中1. 来源消息 TTL 过期队列达到最大长度(队列满了,无法再添加数据到 mq 中)消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false2. 消息 TTL 过期Producerpublic class Producer { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //死信消息 AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build(); for (int i = 1; i < 11; i++) { String msg = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes()); System.out.println("发送的消息是:" + msg); } } }Consumer1 (启动之后关闭,模拟消费者不能正常消息的情况)public class Consumer01 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //过期时间(过期时间可由生产者或者消费者设置,二选一) arguments.put("x-message-ttl", 10000); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c1等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {}); } }Consumer2 处理死信队列的消息public class Consumer02 { //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("消费者c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接收的(死信)消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {}); } }3. 队列达到最大长度Peoducerpublic class Producer { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); for (int i = 1; i < 11; i++) { String msg = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes()); System.out.println("发送的消息是:" + msg); } } }Consumer1public class Consumer01 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); //队列长度 arguments.put("x-max-length", 6); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c3等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer03接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {}); } }注:添加 arguments.put("x-max-length", 6) 参数Consumer2 处理死信队列的消息public class Consumer02 { //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("消费者c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接收的(死信)消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {}); } }4. 消息被拒绝Producer 同理 队列达到最大长度 的生产者Consumerpublic class Consumer04 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c4等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { if ("info5".equals(new String(message.getBody(), StandardCharsets.UTF_8))) { System.out.println("Consumer04拒绝此消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("Consumer04接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {}); } }启动生产者启动消费者启动处理死信队列消息的消费者(该消费者同理前三个处理死信队列消息的消费者)
2023年06月22日
106 阅读
0 评论
0 点赞
2021-04-11
死信队列
修改已创建队列的参数,重启服务报错,原因:修改后的参数随服务的重启并不会进行覆盖1、概述Dead-Letter-Exchange(DLX)死信队列,也称作死信交换机,死信邮箱。当一个消息在队列中变成死信(dead message)之后,它会被重新发送到一个交换机中,这个交换机就是DLX,绑定DLX的队列就是死信队列。消息变为死信的原因有:消息被拒绝消息过期队列达到最大长度DLX也是一个正常的交换机,和一般的交换机没有区别,它能在任何的队列上被指定,也就是设置某一个队列的属性。当这个队列中存在死信时,RabbitMQ就会可将这个消息重新发送到设置的DLX上,进而被路由到另一个队列,即死信队列。使用:在定义队列的时候设置队列参数 x-dead-letter-exchange指定交换机即可 图片来源:学相伴 - 飞哥 - RabbitMQ测试:创建队列@Configuration public class DeadRabbitMQConfig { //1、声明注册direct模式的交换机 @Bean public DirectExchange DeadDirectExchange() { return new DirectExchange("dead_direct_exchange", true, false); } //2、声明队列过期时间 @Bean public Queue deadQueue() { return new Queue("dead.direct.queue", true); } @Bean public Binding deadBind() { return BindingBuilder.bind(deadQueue()).to(DeadDirectExchange()).with("dead"); } }绑定 TTL队列 和 死信队列@Configuration public class TTLRabbitMQConfig { //1、声明注册direct模式的交换机 @Bean public DirectExchange ttlDirectExchange() { return new DirectExchange("ttl_direct_exchange", true, false); } //2、声明队列过期时间 @Bean public Queue directTTLQueue() { Map<String, Object> args = new HashMap<>(); //单位 ms args.put("x-message-ttl", 5000); //绑定死信交换机 args.put("x-dead-letter-exchange", "dead_direct_exchange"); //fanout模式不需要指定路由key args.put("x-dead-letter-routing-key", "dead"); return new Queue("ttl.direct.queue", true, false, false, args); } @Bean public Queue directTTLMessageQueue() { return new Queue("ttl.message.direct.queue", true); } //过期队列 @Bean public Binding ttlQueue(){ return BindingBuilder.bind(directTTLQueue()).to(ttlDirectExchange()).with("ttl"); } //消息含有过期时间,队列没有设置过期时间 @Bean public Binding ttlMessageQueue(){ return BindingBuilder.bind(directTTLMessageQueue()).to(ttlDirectExchange()).with("ttlMessage"); } }生产者@Service public class OrderService { @Autowired private RabbitTemplate rabbitTemplate; // ttl 单独设置过期时间 public void makeOrderTtlMessage(String userId, String productId, int num) { //1、根据商品id查询库存 //2、保存订单 String orderId = UUID.randomUUID().toString().replaceAll("-", ""); System.out.println("创建订单成功:" + orderId); //3、通过 MQ 来完成消息的分发 String exchangeName = "ttl_direct_exchange"; String routingKry = "ttlMessage"; //设置消息过期时间 MessagePostProcessor message = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration("5000"); message.getMessageProperties().setContentEncoding("UTF-8"); return message; } }; /** * 过期队列:消息存入死信队列 * 消息设置过期时间:过期后,消息直接移除 */ //交换机 路由key/queue队列名 消息内容 rabbitTemplate.convertAndSend(exchangeName, routingKry, orderId, message); } }消费者 暂不设置,目的是让消息过期 路由 到死信队列中消息未过期消息过期后
2021年04月11日
63 阅读
0 评论
0 点赞