首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,082 阅读
2
类的加载
741 阅读
3
Spring Cloud OAuth2.0
726 阅读
4
SpringBoot自动装配原理
691 阅读
5
集合不安全问题
584 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
388
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
页面
统计
关于
搜索到
1
篇与
的结果
2023-06-22
RabbitMQ - 延时队列
1. 概念队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素希望在指定时间到了之后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。2. 使用场景订单在十分钟之内未支付则自动取消新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒用户注册成功后,如果三天内没有登录则进行短信提醒用户发起退款,如果三天内没有得到处理则通知相关运营人员预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议……4. RabbitMQ中的TTLTTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,则优先取较小的值。5. 设置TTL的两种方式(1)消息设置ttlrabbitTemplate.convertAndSend("X", "XC", msg, message -> { //设置ttl message.getMessageProperties().setExpiration(ttlTime); return message; });(2)队列设置ttl//ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();(3)区别消息设置ttl,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外还需要注意的一点是,如果不设置 ttl,表示消息永远不会过期,如果将 ttl设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃队列设置 ttl 时,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队 列中)6. 整合 SpringBootpom.xml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.sw.rabbitmq</groupId> <artifactId>RabbitmqDemo01</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitmqDemo01</name> <description>RabbitmqDemo01</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <!-- springboot web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- amqp --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.15</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- swagger2 --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <!-- swagger ui --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <!-- test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- rabbit test --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>application.ymlserver: port: 8088 spring: rabbitmq: host: 192.168.123.88 port: 5672 username: admin password: 123456swagger config@Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket defaultApi() { return new Docket(DocumentationType.SWAGGER_2) .select() .apis(RequestHandlerSelectors.basePackage("com.sw")) .paths(PathSelectors.any()) .build() .apiInfo(apiInfo()); } private ApiInfo apiInfo() { return new ApiInfo( "rabbitmq", "Rabbitmq-Demo", "1.0.0-SNAPSHOT", null, new Contact("suaxi", "http://www.test.com/", "test@qq.com"), "Apache 2.0", "https://www.apache.org/licenses/LICENSE-2.0.html", Collections.emptyList()); } }7. 队列TTL创建两个队列 QA 和 QB,两者队列 ttl 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,同时创建一个死信队列 QD(1)队列配置@Configuration public class TtlQueueConfig { //普通交换机 public static final String X_EXCHANGE = "X"; //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信队列 public static final String DEAD_LETTER_QUEUE = "QD"; /** * 普通交换机 * * @return */ @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } /** * 死信交换机 * * @return */ @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 普通队列A * * @return */ @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } /** * 普通队列B * * @return */ @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 40); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } /** * 死信队列 * * @return */ @Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } /** * 绑定 * * @return */ @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XA").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XB").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("YD").noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}") @ApiOperation("发送消息") public void sendMsg(@PathVariable("msg") String msg) { log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(), msg); rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + msg); rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + msg); } }(3)消费者@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("当前时间:{},接收到死信队列的消息:{}", new Date(), msg); } }(4)测试消息分别在10s和40s之后变为死信消息,由处理死信队列消息的消费者消费8. 优化延时队列新增一个队列 QC,且不设置 ttl 时间(1)队列配置@Configuration public class TtlQueueConfig { //普通交换机 public static final String X_EXCHANGE = "X"; //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String QUEUE_C = "QC"; //死信队列 public static final String DEAD_LETTER_QUEUE = "QD"; /** * 普通交换机 * * @return */ @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } /** * 死信交换机 * * @return */ @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 普通队列A * * @return */ @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } /** * 普通队列B * * @return */ @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 40); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } /** * 死信队列 * * @return */ @Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } /** * 绑定 * * @return */ @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XA").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XB").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("YD").noargs(); } /** * 死信队列C * * @return */ @Bean("queueC") public Queue queueC() { Map<String, Object> arguments = new HashMap<>(2); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } /** * 绑定 * * @return */ @Bean public Binding queueCBindingY(@Qualifier("queueC") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XC").noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendExpirationMsg/{msg}/{ttlTime}") @ApiOperation("发送消息(自定义ttl)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("ttlTime") String ttlTime) { log.info("当前时间:{},发送一条过期时间为:{}ms 的信息给TTL队列:{}", new Date(), ttlTime, msg); rabbitTemplate.convertAndSend("X", "XC", msg, message -> { //设置ttl message.getMessageProperties().setExpiration(ttlTime); return message; }); } }(3)测试两条消息设置了不同的ttl(由生产者发送消息时决定),但 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行9. RabbitMQ插件实现延迟队列如果不能实现在消息粒度上的ttl,并使其在设置的ttl时间及时死亡,就无法设计成一个通用的延时队列。(1)安装延时队列插件通过官网 https://www.rabbitmq.com/community-plugins.html 可下载rabbitmq_delayed_message_exchange 插件,下载完成后解压到 RabbitMQ 的插件目录,进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启rabbitmq-plugins enable rabbitmq_delayed_message_exchange安装成功后开看到 The following plugins have been enabled 字眼(2)新增延时队列、交换机新增一个队列 delayed.queue,一个自定义交换机 delayed.exchange(3)队列配置@Configuration public class DelayQueueConfig { public static final String DELAYED_EXCHANGE = "delayed.exchange"; public static final String DELAYED_QUEUE = "delayed.queue"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @Bean public CustomExchange delayedExchange() { /** * 参数数目:1.名称 2.类型 3.是否持久化 4.是否自动删除 5.其他参数 */ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments); } @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE); } @Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs(); } }(4)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendDelayMsg/{msg}/{delayTime}") @ApiOperation("发送消息(基于 x-delayed-message 插件)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("delayTime") Integer delayTime) { log.info("当前时间:{},发送一条过期时间为:{}ms 的信息给delay.queue:{}", new Date(), delayTime, msg); rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE, DelayQueueConfig.DELAYED_ROUTING_KEY, msg, message -> { //设置ttl message.getMessageProperties().setDelay(delayTime); return message; }); } }(5)消费者@Slf4j @Component public class DelayQueueConsumer { @RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE) public void receiveDelayQueue(Message message) { String msg = new String(message.getBody()); log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg); } }(6)测试可看到ttl较小的消息优先被消费了(7)补充延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。延时队列还有很多其它选择,如:Java的DelayQueue,Redis的zset,Quartz任务调度或kafka的时间轮,可根据具体的业务场景进行选择
2023年06月22日
109 阅读
0 评论
0 点赞