首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,073 阅读
2
类的加载
737 阅读
3
Spring Cloud OAuth2.0
725 阅读
4
SpringBoot自动装配原理
689 阅读
5
集合不安全问题
582 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
388
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
页面
统计
关于
搜索到
20
篇与
的结果
2023-06-23
RabbitMQ - 其他补充
1. 幂等性(1)概念用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。(2)消息重复消费消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。(3)解决思路一般使用全局 ID、唯一标识(时间戳、UUID)或 MQ 消息的 id tag 来判断,亦或是zi定义全局唯一 id,每次消费消息时先判断 id 对应的消息是否已消费过(4)消费端的幂等性保障在海量订单生成的业务高峰期,生产端有可能就会重复发送消息,这时候消费端就要实现幂等性,即消息永远不会被消费多次(即使多次收到了一样的消息)。唯一ID + 指纹码机制,利用数据库主键去重利用 redis 的原子性实现(setnx)2. 优先级队列(1)设置方式在管理页面添加在代码中添加Map<String, Object> arguments = new HashMap<>(); //优先级范围 0-10(设置过大会存在性能问题) arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);设置优先级之后的队列(2)测试Producerpublic class Producer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); /** * 创建队列 * 参数说明: * 1.队列名称 * 2.是否持久化存储消息 * 3.是否进行消息共享(只供一个消费者进行消费) * 4.是否自动删除(最后一个消费者断开连接后,是否自动删除该队列) * 5.其他参数 */ /** * 2023-06-15偷懒 优先级队列 */ Map<String, Object> arguments = new HashMap<>(); //优先级范围 0-10(设置过大会存在性能问题) arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); for (int i = 1; i < 10; i++) { String msg = "hello world" + i; if (i == 5) { AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build(); channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes()); } else { channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } } } }Consumerpublic class Consumer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费由于某些原因中断"); /** * 接收消息 * 1.队列名称 * 2.消费成功后是否自动应答 * 3.未成功消费的回调 * 4.取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }3. 惰性队列(1)使用场景RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念,惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或是由于维护停机等)而导致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。 默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存中,这样可以更快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间(同步操作),进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。(2)两种模式default(默认模式):在 3.6.0 之前的版本无需做任何变更lazy 模式:即惰性队列模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。注:如果要通过声明的方式改变已有队列的模式,那么只能先删除队列,然后再重新声明一个新的队列,同时已产生的消息会同步删除。在管理界面设置在代码中设置Map<String, Object> arguments = new HashMap<>(); arguments.put("x-queue-mode", "lazy"); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);(3)内存开销对比
2023年06月23日
84 阅读
0 评论
0 点赞
2023-06-23
RabbitMQ - 发布确认补充
1. 发布确认在RabbitMQ不可用的情况下,如何处理无法投递的消息?在application.yml配置文件中添加# 消息确认类型 执行回调 publisher-confirm-type: correlated补充:NONE 禁用发布确认模式,是默认值CORRELATED 发布消息成功到交换器后会触发回调方法SIMPLE 包含两种效果:和 CORRELATED 值一样会触发回调方法在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker(1)队列配置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/confirm") public class SendConfirmMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}/{routingKey}") @ApiOperation("发送消息(发布确认)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) { log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey); //正常发送 rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString())); } }(3)回调接口@Slf4j @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 注入 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * 交换机确认回调方法 * * @param correlationData correlation data for the callback. 回调消息的ID及相关的信息 * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData != null) { String id = correlationData.getId() != null ? correlationData.getId() : ""; if (ack) { log.info("交换机收到ID:{} 的消息", id); } else { log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause); } } else { log.error("消息接收发生了异常!"); } } }(4)消费者@Slf4j @Component public class ConfirmConsumer { @RabbitListener(queues = ConfirmQueueConfig.CONFIRM_QUEUE) public void receiveMsg(Message message) { String msg = new String(message.getBody()); log.info("接收到 {} 队列的消息:{}", ConfirmQueueConfig.CONFIRM_QUEUE, msg); } }(5)测试可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了2.回退消息(1)Mandatory 参数在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,并且生产者是不知道消息被丢弃的。此时可通过设置 mandatory 参数,当消息在传递过程中不可达目的地时将消息返回给生产者(2)包含回退的队列设置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } }(3)消息生产者@Slf4j @RestController @RequestMapping("/confirm") public class SendConfirmMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}/{routingKey}") @ApiOperation("发送消息(发布确认)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) { log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey); //正常发送 rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString())); } }(4)回调接口@Slf4j @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 注入 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * 交换机确认回调方法 * * @param correlationData correlation data for the callback. 回调消息的ID及相关的信息 * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData != null) { String id = correlationData.getId() != null ? correlationData.getId() : ""; if (ack) { log.info("交换机收到ID:{} 的消息", id); } else { log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause); } } else { log.error("消息接收发生了异常!"); } } /** * 消息在传递过程中不可达目的地时,进行消息回退 * @param message the returned message. * @param replyCode the reply code. * @param replyText the reply text. * @param exchange the exchange. * @param routingKey the routing key. */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息:{},被交换机:{}退回,路由key:{},原因:{}", new String(message.getBody()), exchange, routingKey, replyText); } }(5)测试参考未设置消息回退时的测试结果,设置回退后,未匹配routingkey的消息被返回到了队列中3. 备份交换机备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列(1)队列配置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; public static final String BACKUP_EXCHANGE = "backup.exchange"; public static final String BACKUP_QUEUE = "backup.queue"; public static final String WARRING_QUEUE = "warring.queue"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } @Bean("backupExchange") public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE); } @Bean("backupQueue") public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE).build(); } @Bean("warringQueue") public Queue warringQueue() { return QueueBuilder.durable(WARRING_QUEUE).build(); } @Bean public Binding queueBindingBackupExchange(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public Binding queueBindingWarringExchange(@Qualifier("warringQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }(5)报警消费者@Slf4j @Component public class WarringConsumer { @RabbitListener(queues = ConfirmQueueConfig.WARRING_QUEUE) public void receiveWarringMsg(Message message) { String msg = new String(message.getBody()); log.warn("发现不可路由消息:{}", msg); } }(5)生产者、回调接口配置同理(6)测试注:mandatory 参数可与备份交换机同时使用,但备份交换机优先级最高
2023年06月23日
112 阅读
0 评论
0 点赞
2023-06-22
RabbitMQ - 延时队列
1. 概念队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素希望在指定时间到了之后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。2. 使用场景订单在十分钟之内未支付则自动取消新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒用户注册成功后,如果三天内没有登录则进行短信提醒用户发起退款,如果三天内没有得到处理则通知相关运营人员预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议……4. RabbitMQ中的TTLTTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,则优先取较小的值。5. 设置TTL的两种方式(1)消息设置ttlrabbitTemplate.convertAndSend("X", "XC", msg, message -> { //设置ttl message.getMessageProperties().setExpiration(ttlTime); return message; });(2)队列设置ttl//ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();(3)区别消息设置ttl,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外还需要注意的一点是,如果不设置 ttl,表示消息永远不会过期,如果将 ttl设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃队列设置 ttl 时,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队 列中)6. 整合 SpringBootpom.xml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.sw.rabbitmq</groupId> <artifactId>RabbitmqDemo01</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitmqDemo01</name> <description>RabbitmqDemo01</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <!-- springboot web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- amqp --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.15</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- swagger2 --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <!-- swagger ui --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <!-- test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- rabbit test --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>application.ymlserver: port: 8088 spring: rabbitmq: host: 192.168.123.88 port: 5672 username: admin password: 123456swagger config@Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket defaultApi() { return new Docket(DocumentationType.SWAGGER_2) .select() .apis(RequestHandlerSelectors.basePackage("com.sw")) .paths(PathSelectors.any()) .build() .apiInfo(apiInfo()); } private ApiInfo apiInfo() { return new ApiInfo( "rabbitmq", "Rabbitmq-Demo", "1.0.0-SNAPSHOT", null, new Contact("suaxi", "http://www.test.com/", "test@qq.com"), "Apache 2.0", "https://www.apache.org/licenses/LICENSE-2.0.html", Collections.emptyList()); } }7. 队列TTL创建两个队列 QA 和 QB,两者队列 ttl 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,同时创建一个死信队列 QD(1)队列配置@Configuration public class TtlQueueConfig { //普通交换机 public static final String X_EXCHANGE = "X"; //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信队列 public static final String DEAD_LETTER_QUEUE = "QD"; /** * 普通交换机 * * @return */ @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } /** * 死信交换机 * * @return */ @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 普通队列A * * @return */ @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } /** * 普通队列B * * @return */ @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 40); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } /** * 死信队列 * * @return */ @Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } /** * 绑定 * * @return */ @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XA").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XB").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("YD").noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}") @ApiOperation("发送消息") public void sendMsg(@PathVariable("msg") String msg) { log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(), msg); rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + msg); rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + msg); } }(3)消费者@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("当前时间:{},接收到死信队列的消息:{}", new Date(), msg); } }(4)测试消息分别在10s和40s之后变为死信消息,由处理死信队列消息的消费者消费8. 优化延时队列新增一个队列 QC,且不设置 ttl 时间(1)队列配置@Configuration public class TtlQueueConfig { //普通交换机 public static final String X_EXCHANGE = "X"; //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String QUEUE_C = "QC"; //死信队列 public static final String DEAD_LETTER_QUEUE = "QD"; /** * 普通交换机 * * @return */ @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } /** * 死信交换机 * * @return */ @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 普通队列A * * @return */ @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } /** * 普通队列B * * @return */ @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 40); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } /** * 死信队列 * * @return */ @Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } /** * 绑定 * * @return */ @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XA").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XB").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("YD").noargs(); } /** * 死信队列C * * @return */ @Bean("queueC") public Queue queueC() { Map<String, Object> arguments = new HashMap<>(2); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } /** * 绑定 * * @return */ @Bean public Binding queueCBindingY(@Qualifier("queueC") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XC").noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendExpirationMsg/{msg}/{ttlTime}") @ApiOperation("发送消息(自定义ttl)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("ttlTime") String ttlTime) { log.info("当前时间:{},发送一条过期时间为:{}ms 的信息给TTL队列:{}", new Date(), ttlTime, msg); rabbitTemplate.convertAndSend("X", "XC", msg, message -> { //设置ttl message.getMessageProperties().setExpiration(ttlTime); return message; }); } }(3)测试两条消息设置了不同的ttl(由生产者发送消息时决定),但 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行9. RabbitMQ插件实现延迟队列如果不能实现在消息粒度上的ttl,并使其在设置的ttl时间及时死亡,就无法设计成一个通用的延时队列。(1)安装延时队列插件通过官网 https://www.rabbitmq.com/community-plugins.html 可下载rabbitmq_delayed_message_exchange 插件,下载完成后解压到 RabbitMQ 的插件目录,进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启rabbitmq-plugins enable rabbitmq_delayed_message_exchange安装成功后开看到 The following plugins have been enabled 字眼(2)新增延时队列、交换机新增一个队列 delayed.queue,一个自定义交换机 delayed.exchange(3)队列配置@Configuration public class DelayQueueConfig { public static final String DELAYED_EXCHANGE = "delayed.exchange"; public static final String DELAYED_QUEUE = "delayed.queue"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @Bean public CustomExchange delayedExchange() { /** * 参数数目:1.名称 2.类型 3.是否持久化 4.是否自动删除 5.其他参数 */ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments); } @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE); } @Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs(); } }(4)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendDelayMsg/{msg}/{delayTime}") @ApiOperation("发送消息(基于 x-delayed-message 插件)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("delayTime") Integer delayTime) { log.info("当前时间:{},发送一条过期时间为:{}ms 的信息给delay.queue:{}", new Date(), delayTime, msg); rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE, DelayQueueConfig.DELAYED_ROUTING_KEY, msg, message -> { //设置ttl message.getMessageProperties().setDelay(delayTime); return message; }); } }(5)消费者@Slf4j @Component public class DelayQueueConsumer { @RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE) public void receiveDelayQueue(Message message) { String msg = new String(message.getBody()); log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg); } }(6)测试可看到ttl较小的消息优先被消费了(7)补充延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。延时队列还有很多其它选择,如:Java的DelayQueue,Redis的zset,Quartz任务调度或kafka的时间轮,可根据具体的业务场景进行选择
2023年06月22日
109 阅读
0 评论
0 点赞
2023-06-22
RabbitMQ - 死信队列
无法被消费的消息,即:producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中1. 来源消息 TTL 过期队列达到最大长度(队列满了,无法再添加数据到 mq 中)消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false2. 消息 TTL 过期Producerpublic class Producer { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //死信消息 AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build(); for (int i = 1; i < 11; i++) { String msg = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes()); System.out.println("发送的消息是:" + msg); } } }Consumer1 (启动之后关闭,模拟消费者不能正常消息的情况)public class Consumer01 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //过期时间(过期时间可由生产者或者消费者设置,二选一) arguments.put("x-message-ttl", 10000); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c1等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {}); } }Consumer2 处理死信队列的消息public class Consumer02 { //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("消费者c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接收的(死信)消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {}); } }3. 队列达到最大长度Peoducerpublic class Producer { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); for (int i = 1; i < 11; i++) { String msg = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes()); System.out.println("发送的消息是:" + msg); } } }Consumer1public class Consumer01 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); //队列长度 arguments.put("x-max-length", 6); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c3等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer03接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {}); } }注:添加 arguments.put("x-max-length", 6) 参数Consumer2 处理死信队列的消息public class Consumer02 { //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("消费者c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接收的(死信)消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {}); } }4. 消息被拒绝Producer 同理 队列达到最大长度 的生产者Consumerpublic class Consumer04 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c4等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { if ("info5".equals(new String(message.getBody(), StandardCharsets.UTF_8))) { System.out.println("Consumer04拒绝此消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("Consumer04接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {}); } }启动生产者启动消费者启动处理死信队列消息的消费者(该消费者同理前三个处理死信队列消息的消费者)
2023年06月22日
105 阅读
0 评论
0 点赞
2023-06-20
RabbitMQ - 交换机
1. Exchanges(1)概念RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息,是应该把这些消息放到特定队列还是说把他们放到许多队列中,亦或是应该丢弃它们,这就得由交换机的类型来决定。(2)交换机类型直接(direct)主题(topic)标题(headers)扇出(fanout)(3)无名exchangechannel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());之前的demo中生产者发送消息时未设置交换机,即第一个参数为空,表示使用的是默认交换机(又称无名交换机)2. 临时队列在之前的demo中使用的都是临时队列,一旦断开了消费者的连接,队列将被自动删除创建临时队列的方式:String queue = channel.queueDeclare().getQueue();3. 绑定binding 是 exchange 和 queue 之间的桥梁,即 exchange 和哪个队列进行了绑定4. Fanout(1)介绍将接收到的所有消息广播到它知道的所有队列中,RabbitMQ默认的交换机如下:(2)DemoProducerpublic class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送的消息是:" + msg); } } }Consumerpublic class ReceiveLog01 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //生成临时队列 String queueName = channel.queueDeclare().getQueue(); //绑定临时队列,其中routingKey为空字符串 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("ReceiveLog01等待接收消息,接收到消息后在控制台打印..."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } public class ReceiveLog02 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //生成临时队列 String queueName = channel.queueDeclare().getQueue(); //绑定临时队列,其中routingKey为空字符串 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("ReceiveLog02等待接收消息,接收到消息后在控制台打印..."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }5. Direct(1)介绍Direct Exchange:消息只去到它绑定的 routingKey 队列中Fanout模式下每一个消费者都收到了消息,此处使用 Direct 模式,让消息根据 routing key 去到指定的地方,如下图:X绑定了Q1,Q2两个队列,绑定类型为direct,队列Q1绑定的orange,Q2绑定了两个key:black、green。在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1,绑定键为 black/green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。注:如果绑定的key都相同,则该模式与 Fanout 模式类似,变成了另一种形式的广播(2)DemoProducerpublic class DirectLog { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "info", null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送的消息是:" + msg); } } }Consumerpublic class ReceiveLogDirect01 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("console", false, false, false, null); channel.queueBind("console", EXCHANGE_NAME, "info"); channel.queueBind("console", EXCHANGE_NAME, "warning"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume("console", true, deliverCallback, consumerTag -> { }); } } public class ReceiveLogDirect02 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("disk", false, false, false, null); channel.queueBind("console", EXCHANGE_NAME, "error"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume("disk", true, deliverCallback, consumerTag -> { }); } }6. Topics(1)介绍发送到 topic 类型交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 . 点号分隔开。这些单词可以是任意单词,且不能超过 255 个字节。规则列表中常用的替换符:*(星号) 可以代替一个单词#(井号) 可以替代零个或多个单词(2)举例下图绑定关系如下:Q1绑定的是中间带 orange ,单词总数为 3 个的字符串 *.orange.*Q2:绑定的是最后一个单词是 rabbit 且单词总数为 3 个的单词 *.*.rabbit第一个单词是 lazy 的多个单词 lazy.#以下是消息接收情况:quick.orange.rabbit 被队列 Q1Q2 接收到lazy.orange.elephant 被队列 Q1Q2 接收到quick.orange.fox 被队列 Q1 接收到lazy.brown.fox 被队列 Q2 接收到lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃lazy.orange.male.rabbit 是四个单词但匹配 Q2注:当一个队列绑定键是 #,那么这个队列将接收所有数据,类似于 fanout;如果队列绑定键中没有 # 和 * 出现,那么该队列绑定类型就是 direct (固定的routing key)(3)DemoProducerpublic class TopicLog { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map<String, String> map = new HashMap<>(); map.put(" quick.orange.rabbit", "被队列Q1Q2接收到"); map.put("lazy.orange.elephant", "被队列Q1Q2接收到"); map.put("quick.orange.fox", "被队列Q1接收到"); map.put(" lazy.brown.fox", "被队列Q2接收到"); map.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次"); map.put("quick. brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃"); map.put(" quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃"); map.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2"); for (Map.Entry<String, String> entry : map.entrySet()) { channel.basicPublish(EXCHANGE_NAME, entry.getKey(), null, entry.getValue().getBytes(StandardCharsets.UTF_8)); System.out.println("发送的消息是:" + entry.getValue()); } } }Consumerpublic class ReceiveLogTopic01 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("Q1", false, false, false, null); channel.queueBind("Q1", EXCHANGE_NAME, "*.orange.*"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg + ",routingKey:" + delivery.getEnvelope().getRoutingKey()); }; channel.basicConsume("Q1", true, deliverCallback, consumerTag -> { }); } } public class ReceiveLogTopic02 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("Q2", false, false, false, null); channel.queueBind("Q2", EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind("Q2", EXCHANGE_NAME, "lazy.#"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg + ",routingKey:" + delivery.getEnvelope().getRoutingKey()); }; channel.basicConsume("Q2", true, deliverCallback, consumerTag -> { }); } }
2023年06月20日
69 阅读
0 评论
0 点赞
2023-06-20
RabbitMQ - 发布确认
1. 发布确认原理生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的 消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。注:confirm 模式是异步的2. 发布确认策略(1)开启发布确认发布确认默认未开启,手动开启//开启发布确认 channel.confirmSelect();(2)单个确认发布是一种同步确认发布的方式,即发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量,但对于某些应用程序来说这可能已经足够了。public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //单个确认 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("第" + i + "条消息发送成功"); } } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(单个确认模式)耗时:" + (endTime - startTime)); }(3)批量发布确认先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息,该方案也是同步的,也一样阻塞消息的发布。public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); int batchSize = 100; //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //批量确认 if (i % batchSize == 0) { boolean flag = channel.waitForConfirms(); if (flag) { System.out.println(i + "条消息发送成功"); } } } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(批量确认模式)耗时:" + (endTime - startTime)); }(4)异步确认发布利用回调函数来达到消息可靠性的传递public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //使用线程安全的哈希表记录消息 ConcurrentSkipListMap<Long, Object> outstandingConfirms = new ConcurrentSkipListMap<>(); //确认成功 回调 ConfirmCallback ackCallback = (deliveryTag, multiple) -> { //2.删除已确认的消息,即剩下的就是确认失败的消息 if (multiple) { outstandingConfirms.headMap(deliveryTag).clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("确认的消息:" + deliveryTag); }; //确认失败 回调 ConfirmCallback nackCallback = (deliveryTag, multiple) -> { //3.打印未确认的消息 Object msg = outstandingConfirms.get(deliveryTag); System.out.println("未确认的消息tag:" + deliveryTag + ",未确认的消息内容:" + msg.toString()); }; //消息监听器 channel.addConfirmListener(ackCallback, nackCallback); //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //1.记录已发送的消息 outstandingConfirms.put(channel.getNextPublishSeqNo(), msg); } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(异步确认模式)耗时:" + (endTime - startTime)); }(5)处理异步未确认消息把未确认的消息放到一个基于内存的且能被发布线程访问的队列,如:用 ConcurrentLinkedQueue 队列在 confirm callbacks 与发布线程之间进行消息的传递。(6)三种方式对比单独发布消息:同步等待确认,简单,但吞吐量非常有限批量发布消息:批量同步等待确认,简单,合理的吞吐量,但出现问题后很难判断是那条消息出现了问题异步处理: 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来较为复杂
2023年06月20日
78 阅读
0 评论
0 点赞
2023-06-19
RabbitMQ - WorkQueue
工作队列(又称任务队列):其主要思想是避免立即执行资源密集型任务,而不得不等待它完成。消息队列将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。1. 轮询分发(1)抽取工具类public class RabbitMqUtils { /** * 获取Channel * * @return Channel * @throws IOException * @throws TimeoutException */ public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); return connection.createChannel(); } }(2)工作线程public class Worker01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("收到消息:" + new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); //接收消息 System.out.println("c1等待接收消息..."); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } } public class Worker02 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("收到消息:" + new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); //接收消息 System.out.println("c2等待接收消息..."); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }(3)Task线程public class Task01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("发送了消息:" + msg); } } }(4)结果生产者一共发送了4条消息,消费者1和消费者2按次序分别接收了两条消息2. 消息应答(1)概念RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息也将丢失,因为它无法接收。为了保证消息在发送过程中不丢失,rabbitmq 引入了消息应答机制,即:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。(2)自动应答消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为在这种模式下,如果消息在接收到之前,消费者那边出现连接的问题或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率处理这些消息的情况下使用。(3)消息应答的方式Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功地处理了消息,可以将其丢弃了Channel.basicNack(用于否定确认)Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数,不处理该消息直接拒绝,表示可以将其丢弃了(4)Mutiplechannel.basicAck(deliveryTag, true)true 代表批量应答 channel 上未应答的消息,比如说 channel 上有传送 tag 为5,6,7,8的消息,当前的 tag 是 8,那么此时 5 - 8 的这些还未应答的消息都会被确认收到消息应答false 同上面相比 只会应答 tag = 8 的消息,5,6,7 这三个消息依然不会被确认收到消息应答(5)消息自动重新入队如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其进行重新排队。如果此时其他消费者(状态良好)可以处理任务,MQ将会很快将其重新分发给另一个消费者,这样,即使某个消费者遇到突发状况,也可以确保不会丢失任何消息。(6)Demo消息应答默认值为 true,即自动应答,此处以手动应答为例SleepUtilpublic class SleepUtils { public static void sleep(int seconds) { try { Thread.sleep(1000L * seconds); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }Producerpublic class Task02 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("生产者发送消息:" + msg); } } }Consumerpublic class Worker03 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("c1等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { SleepUtils.sleep(1); System.out.println("接收到的消息:" + new String(message.getBody())); //手动应答 /** * 参数说明: * 1.消息的标记 * 2.是否批量应答 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> System.out.println("消费者取消消费接口回调逻辑")); } } public class Worker04 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { SleepUtils.sleep(30); System.out.println("接收到的消息:" + new String(message.getBody())); //手动应答 /** * 参数说明: * 1.消息的标记 * 2.是否批量应答 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> System.out.println("消费者取消消费接口回调逻辑")); } }在发送者发送消息 xxx 后把 C2 消费者停掉,按理说该 C2 来处理的消息,但是由于它处理时间较长,在还没有执行 ack 代码的时候,C2 被停掉了,此时会看到消息被 C1 接收到了,说明消息 xxx 被重新入队,然后分配给能处理消息的 C1 消费者3. 消息持久化(1)概念默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它会忽视队列和消息,除非告知它不要这样做,确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化(2)队列持久化之前创建的队列时都是非持久化的,rabbitmq 在重启之后该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable 参数设置为(true)持久化boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);注:如果之前声明的队列(相同的队列名)不是持久化的,需要把原先的队列先删除,或者重新创建一个不同名的持久化队列,不然会报错 received 'true' but current is 'false'在控制面板中 持久化 与 非持久化队列 的区别(3)消息持久化消息实现持久化需要修改生产者的代码,添加 MessageProperties.PERSISTENT_TEXT_PLAIN 属性# 未持久化 channel.basicPublish("", null, msg.getBytes()); # 持久化 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点,此时并没有真正写入磁盘。(4)不公平分发轮询分发在一些场景下并不是很好,比方说有两个消费者在处理任务,其中消费者1处理任务的速度非常快,而另外一个消费者2 处理速度却很慢,这个时候如果还是采用轮询分发的话就会让处理速度快的消费者在很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下并不太好,但此时 RabbitMQ 并不知道这种情况,它依然很公平的在进行消息分发,为了避免这种情况,可以设置参数 channel.basicQos(1)int prefetchCount = 2; channel.basicQos(prefetchCount);(5)预取值消息的发送是异步的,所以在任何时候,channel 上肯定不止只有一个消息,且消费者的手动确认本质上也是异步的,因此这里就存在一个未确认的消息缓冲区,所以要求开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。可以通过channel.basicQos() 方法设置”预取值“来完成,该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例:假设在通道上有未确认的消息 5,6,7,8,并且通道的预取计数为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。消息应答和 QoS 预取值对吞吐量有很大的影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗,应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接的节点的内存消耗变大,所以设置合适的预取值是一个反复试验的过程,不同的负载该值取值也不同,默认情况话建议100 到 300,这个范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险,预取值为 1 是最保守的,但这将使吞吐量变得很低,特别是消费者连接延迟很大的情况下//不公平分发 /** * prefetchCount 等于 0 公平分发(默认) * prefetchCount 等于 1 公平分发 * prefetchCount 大于1时 预取值 */ int prefetchCount = 2; channel.basicQos(prefetchCount);
2023年06月19日
40 阅读
0 评论
0 点赞
2023-06-19
RabbitMQ - 介绍、四大核心概念
参考于b站尚硅谷RabbitMQ课程一、介绍RabbitMQ 是一个消息中间件:它接受并转发消息,可以把它当做一个快递站点,当你要发送一个包裹时,你把包裹放到快递站,快递员最终会把你的快递送到收件人那里,此时 RabbitMQ 可以比作是一个快递站,由快递员帮你传递快件(RabbitMQ 与快递站的主要区别在于它不处理快件而仅作接收,存储和转发消息数据)二、四大核心概念生产者产生数据发送消息交换机交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦者是将消息丢弃,这个得由交换机类型决定 队列队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中,队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。多个生产者可以将消息发送到一个队列,同理多个消费者也可以尝试从一个队列接收数据消费者消费与接收具有相似的含义,消费者大多时候是一个等待接收消息的程序。注:生产者、消费者和消息中间件很多时候并不在同一台机器上,同一个应用程序既可以是生产者又可以是消费者名词介绍Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message BrokerVirtual host:出于多租户和安全因素设计,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等 Connection:publisher/consumer 和 broker 之间的 TCP 连接Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe), fanout (multicast)Queue:消息最终被送到这里等待 consumer 取走Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据安装可参考官网文档:https://www.rabbitmq.com/download.htmlDemopom.xml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sw</groupId> <artifactId>rabbitmq</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- rabbitmq client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!-- commons io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> </project>Producerpublic class Producer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); /** * 创建队列 * 参数说明: * 1.队列名称 * 2.是否持久化存储消息 * 3.是否进行消息共享(只供一个消费者进行消费) * 4.是否自动删除(最后一个消费者断开连接后,是否自动删除该队列) * 5.其他参数 */ /** * 2023-06-15偷懒 优先级队列 */ Map<String, Object> arguments = new HashMap<>(); //优先级范围 0-10(设置过大会存在性能问题) arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); for (int i = 1; i < 10; i++) { String msg = "hello world" + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } /** * 发送消息 * 参数说明: * 1.交换机 * 2.路由key * 3.其他参数 * 4.消息体 */ System.out.println("消息发送完毕"); } }Consumerpublic class Consumer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费由于某些原因中断"); /** * 接收消息 * 1.队列名称 * 2.消费成功后是否自动应答 * 3.未成功消费的回调 * 4.取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
2023年06月19日
43 阅读
0 评论
0 点赞
1
2
3