首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,082 阅读
2
类的加载
741 阅读
3
Spring Cloud OAuth2.0
726 阅读
4
SpringBoot自动装配原理
691 阅读
5
集合不安全问题
584 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
388
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
页面
统计
关于
搜索到
2
篇与
的结果
2023-06-23
RabbitMQ - 发布确认补充
1. 发布确认在RabbitMQ不可用的情况下,如何处理无法投递的消息?在application.yml配置文件中添加# 消息确认类型 执行回调 publisher-confirm-type: correlated补充:NONE 禁用发布确认模式,是默认值CORRELATED 发布消息成功到交换器后会触发回调方法SIMPLE 包含两种效果:和 CORRELATED 值一样会触发回调方法在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker(1)队列配置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/confirm") public class SendConfirmMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}/{routingKey}") @ApiOperation("发送消息(发布确认)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) { log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey); //正常发送 rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString())); } }(3)回调接口@Slf4j @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 注入 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * 交换机确认回调方法 * * @param correlationData correlation data for the callback. 回调消息的ID及相关的信息 * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData != null) { String id = correlationData.getId() != null ? correlationData.getId() : ""; if (ack) { log.info("交换机收到ID:{} 的消息", id); } else { log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause); } } else { log.error("消息接收发生了异常!"); } } }(4)消费者@Slf4j @Component public class ConfirmConsumer { @RabbitListener(queues = ConfirmQueueConfig.CONFIRM_QUEUE) public void receiveMsg(Message message) { String msg = new String(message.getBody()); log.info("接收到 {} 队列的消息:{}", ConfirmQueueConfig.CONFIRM_QUEUE, msg); } }(5)测试可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了2.回退消息(1)Mandatory 参数在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,并且生产者是不知道消息被丢弃的。此时可通过设置 mandatory 参数,当消息在传递过程中不可达目的地时将消息返回给生产者(2)包含回退的队列设置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } }(3)消息生产者@Slf4j @RestController @RequestMapping("/confirm") public class SendConfirmMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}/{routingKey}") @ApiOperation("发送消息(发布确认)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) { log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey); //正常发送 rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString())); } }(4)回调接口@Slf4j @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 注入 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * 交换机确认回调方法 * * @param correlationData correlation data for the callback. 回调消息的ID及相关的信息 * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData != null) { String id = correlationData.getId() != null ? correlationData.getId() : ""; if (ack) { log.info("交换机收到ID:{} 的消息", id); } else { log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause); } } else { log.error("消息接收发生了异常!"); } } /** * 消息在传递过程中不可达目的地时,进行消息回退 * @param message the returned message. * @param replyCode the reply code. * @param replyText the reply text. * @param exchange the exchange. * @param routingKey the routing key. */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息:{},被交换机:{}退回,路由key:{},原因:{}", new String(message.getBody()), exchange, routingKey, replyText); } }(5)测试参考未设置消息回退时的测试结果,设置回退后,未匹配routingkey的消息被返回到了队列中3. 备份交换机备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列(1)队列配置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; public static final String BACKUP_EXCHANGE = "backup.exchange"; public static final String BACKUP_QUEUE = "backup.queue"; public static final String WARRING_QUEUE = "warring.queue"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } @Bean("backupExchange") public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE); } @Bean("backupQueue") public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE).build(); } @Bean("warringQueue") public Queue warringQueue() { return QueueBuilder.durable(WARRING_QUEUE).build(); } @Bean public Binding queueBindingBackupExchange(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public Binding queueBindingWarringExchange(@Qualifier("warringQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }(5)报警消费者@Slf4j @Component public class WarringConsumer { @RabbitListener(queues = ConfirmQueueConfig.WARRING_QUEUE) public void receiveWarringMsg(Message message) { String msg = new String(message.getBody()); log.warn("发现不可路由消息:{}", msg); } }(5)生产者、回调接口配置同理(6)测试注:mandatory 参数可与备份交换机同时使用,但备份交换机优先级最高
2023年06月23日
114 阅读
0 评论
0 点赞
2023-06-20
RabbitMQ - 发布确认
1. 发布确认原理生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的 消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。注:confirm 模式是异步的2. 发布确认策略(1)开启发布确认发布确认默认未开启,手动开启//开启发布确认 channel.confirmSelect();(2)单个确认发布是一种同步确认发布的方式,即发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量,但对于某些应用程序来说这可能已经足够了。public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //单个确认 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("第" + i + "条消息发送成功"); } } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(单个确认模式)耗时:" + (endTime - startTime)); }(3)批量发布确认先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息,该方案也是同步的,也一样阻塞消息的发布。public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); int batchSize = 100; //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //批量确认 if (i % batchSize == 0) { boolean flag = channel.waitForConfirms(); if (flag) { System.out.println(i + "条消息发送成功"); } } } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(批量确认模式)耗时:" + (endTime - startTime)); }(4)异步确认发布利用回调函数来达到消息可靠性的传递public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //使用线程安全的哈希表记录消息 ConcurrentSkipListMap<Long, Object> outstandingConfirms = new ConcurrentSkipListMap<>(); //确认成功 回调 ConfirmCallback ackCallback = (deliveryTag, multiple) -> { //2.删除已确认的消息,即剩下的就是确认失败的消息 if (multiple) { outstandingConfirms.headMap(deliveryTag).clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("确认的消息:" + deliveryTag); }; //确认失败 回调 ConfirmCallback nackCallback = (deliveryTag, multiple) -> { //3.打印未确认的消息 Object msg = outstandingConfirms.get(deliveryTag); System.out.println("未确认的消息tag:" + deliveryTag + ",未确认的消息内容:" + msg.toString()); }; //消息监听器 channel.addConfirmListener(ackCallback, nackCallback); //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //1.记录已发送的消息 outstandingConfirms.put(channel.getNextPublishSeqNo(), msg); } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(异步确认模式)耗时:" + (endTime - startTime)); }(5)处理异步未确认消息把未确认的消息放到一个基于内存的且能被发布线程访问的队列,如:用 ConcurrentLinkedQueue 队列在 confirm callbacks 与发布线程之间进行消息的传递。(6)三种方式对比单独发布消息:同步等待确认,简单,但吞吐量非常有限批量发布消息:批量同步等待确认,简单,合理的吞吐量,但出现问题后很难判断是那条消息出现了问题异步处理: 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来较为复杂
2023年06月20日
78 阅读
0 评论
0 点赞