首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,082 阅读
2
类的加载
741 阅读
3
Spring Cloud OAuth2.0
726 阅读
4
SpringBoot自动装配原理
691 阅读
5
集合不安全问题
584 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
388
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
页面
统计
关于
搜索到
1
篇与
的结果
2023-06-20
RabbitMQ - 发布确认
1. 发布确认原理生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的 消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。注:confirm 模式是异步的2. 发布确认策略(1)开启发布确认发布确认默认未开启,手动开启//开启发布确认 channel.confirmSelect();(2)单个确认发布是一种同步确认发布的方式,即发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量,但对于某些应用程序来说这可能已经足够了。public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //单个确认 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("第" + i + "条消息发送成功"); } } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(单个确认模式)耗时:" + (endTime - startTime)); }(3)批量发布确认先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息,该方案也是同步的,也一样阻塞消息的发布。public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); int batchSize = 100; //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //批量确认 if (i % batchSize == 0) { boolean flag = channel.waitForConfirms(); if (flag) { System.out.println(i + "条消息发送成功"); } } } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(批量确认模式)耗时:" + (endTime - startTime)); }(4)异步确认发布利用回调函数来达到消息可靠性的传递public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //使用线程安全的哈希表记录消息 ConcurrentSkipListMap<Long, Object> outstandingConfirms = new ConcurrentSkipListMap<>(); //确认成功 回调 ConfirmCallback ackCallback = (deliveryTag, multiple) -> { //2.删除已确认的消息,即剩下的就是确认失败的消息 if (multiple) { outstandingConfirms.headMap(deliveryTag).clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("确认的消息:" + deliveryTag); }; //确认失败 回调 ConfirmCallback nackCallback = (deliveryTag, multiple) -> { //3.打印未确认的消息 Object msg = outstandingConfirms.get(deliveryTag); System.out.println("未确认的消息tag:" + deliveryTag + ",未确认的消息内容:" + msg.toString()); }; //消息监听器 channel.addConfirmListener(ackCallback, nackCallback); //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //1.记录已发送的消息 outstandingConfirms.put(channel.getNextPublishSeqNo(), msg); } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(异步确认模式)耗时:" + (endTime - startTime)); }(5)处理异步未确认消息把未确认的消息放到一个基于内存的且能被发布线程访问的队列,如:用 ConcurrentLinkedQueue 队列在 confirm callbacks 与发布线程之间进行消息的传递。(6)三种方式对比单独发布消息:同步等待确认,简单,但吞吐量非常有限批量发布消息:批量同步等待确认,简单,合理的吞吐量,但出现问题后很难判断是那条消息出现了问题异步处理: 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来较为复杂
2023年06月20日
78 阅读
0 评论
0 点赞