首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,164 阅读
2
类的加载
792 阅读
3
Spring Cloud OAuth2.0
749 阅读
4
SpringBoot自动装配原理
708 阅读
5
集合不安全问题
614 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Canvas
Linux
容器
Docker
Containerd
Kubernetes
Python
FastApi
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
403
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Canvas
Linux
容器
Docker
Containerd
Kubernetes
Python
FastApi
页面
统计
关于
搜索到
403
篇与
的结果
2024-03-03
新增的全局属性
<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增的全局属性</title> <style> div { width: 600px; height: 200px; border: 1px solid black; font-size: 20px; } .box1 { background-color: skyblue; } .box2 { margin-top: 10px; background-color: green; } </style> </head> <body> <div class="box1" contenteditable="true" spellcheck="true">Lor1em ipsum, dolor sit amet consectetur adipisicing elit. Nostrum, repudiandae assumenda sed quod voluptates delectus similique dignissimos enim! Officiis, repellat.</div> <div class="box2" draggable="true" ondragend="go(event)" data-a="1" data-b="2">Lorem ipsum, dolor sit amet consectetur adipisicing elit. Nostrum, repudiandae assumenda sed quod voluptates delectus similique dignissimos enim! Officiis, repellat.</div> <script> function go(e) { alert(e.x) } </script> </body> </html>
2024年03月03日
35 阅读
0 评论
0 点赞
2024-03-03
新增的多媒体标签
1. 视频标签<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增的视频标签</title> <style> video { width: 600px; } </style> </head> <body> <video src="./小电影.mp4" controls muted loop poster="./封面.png" preload="auto"></video> </body> </html>2. 音频标签<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增的音频标签</title> </head> <body> <audio src="./小曲.mp3" controls autoplay loop preload="auto"></audio> </body> </html>
2024年03月03日
47 阅读
0 评论
0 点赞
2024-03-03
表单相关的新增
1. 表单控件属性<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增的表单控件属性</title> </head> <body> <form action=""> 账号:<input type="text" name="account" placeholder="请输入账号" required autofocus autocomplete="on" pattern="\w{6}" > <br> 密码:<input type="password" name="pwd" placeholder="请输入密码" required> <br> 性别: <input type="radio" value="male" name="gender" required>男 <input type="radio" value="female" name="gender">女 <br> 爱好: <input type="checkbox" value="smoke" name="hobby">抽烟 <input type="checkbox" value="drink" name="hobby" required>喝酒 <input type="checkbox" value="perm" name="hobby">烫头 <br> 其他:<textarea name="other" placeholder="请输入" required></textarea> <br> <button>提交</button> </form> </body> </html>2. input新增的type属性值<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>input新增的type属性值</title> </head> <body> <form action="" novalidate> 邮箱:<input type="email" name="email" required> <br> 网址:<input type="url" name="url"> <br> 数值:<input type="number" name="number" step="2" max="100" min="10" required> <br> 搜索:<input type="search" name="keyword"> <br> 电话:<input type="tel" name="tel"> <br> 范围:<input type="range" name="range" max="100" min="10"> <br> 颜色:<input type="color" name="color"> <br> 日期:<input type="date" name="date"> <br> 月份:<input type="month" name="month"> <br> 周:<input type="week" name="week"> <br> 时间:<input type="time" name="time1"> <br> 日期+时间:<input type="datetime-local" name="time2"> <br> <button type="submit">提交</button> </form> </body> </html>
2024年03月03日
18 阅读
0 评论
0 点赞
2024-03-03
新增的语义化标签
1. 布局标签<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增布局标签</title> </head> <body> <!-- 头部 --> <header> <h1>xxx商城</h1> </header> <hr> <!-- 主导航 --> <nav> <a href="#">首页</a> <a href="#">订单</a> <a href="#">购物车</a> <a href="#">我的</a> </nav> <!-- 主要内容 --> <div class="page-content"> <article> <h2>感动中国人物</h2> <section> <h3>第一名:孙笑川</h3> <p>男,籍贯四川,33岁</p> </section> <section> <h3>第二名:药水哥</h3> <p>真名刘波,男,籍贯湖北,30岁</p> </section> <section> <h3>第三名:Giao哥</h3> <p>男,籍贯河南,29岁</p> </section> </article> <!-- 侧边栏导航 --> <aside style="float: right;"> <nav> <ul> <li><a href="#">秒杀专区</a></li> <li><a href="#">会员专区</a></li> <li><a href="#">优惠券专区</a></li> <li><a href="#">品牌专区</a></li> </ul> </nav> </aside> </div> <hr> <footer> <nav> <a href="#">友情链接1</a> <a href="#">友情链接2</a> <a href="#">友情链接3</a> <a href="#">友情链接4</a> </nav> </footer> </body> </html>2. 状态标签<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增的状态标签</title> </head> <body> <span>手机电量:</span> <meter min="0" max="100" value="5" low="10" high="20" optimum="90"></meter> <span>当前进度:</span> <progress max="100" value="80"></progress> </body> </html>3. 列表标签<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增的列表标签</title> </head> <body> <form action="#"> <input type="text" list="myData"> <button>搜索</button> </form> <datalist id="myData"> <option value="孙笑川">孙笑川</option> <option value="药水哥">药水哥</option> <option value="Giao哥">Giao哥</option> </datalist> <hr> <details> <summary>如何一夜暴富</summary> <p>白日做梦</p> </details> </body> </html>4. 文本标签<!DOCTYPE html> <html lang="zh-CN"> <head> <meta charset="UTF-8"> <title>新增的文本标签</title> </head> <body> <ruby> <span>魑魅魍魉</span> <rt>chī mèi wǎng liǎng</rt> </ruby> <hr> <p>Lorem ipsum <mark>dolor</mark> sit amet consectetur adipisicing elit. Ipsa in quae molestias id nesciunt consequatur ex deserunt enim reiciendis obcaecati!</p> </body> </html>
2024年03月03日
61 阅读
0 评论
0 点赞
2023-06-25
MongoDB 笔记
//进入test数据库 use test; //向user集合插入一个文档 db.user.insert({ username: '孙笑川' }); //查询user集合中的文档 db.user.find(); //再插入一个 db.user.insert({ username: '药水哥' }); //统计user集合中文档的数量 db.user.find().count(); //按条件查询 db.user.find({username: '孙笑川'}); //向user集合中username为孙笑川的文档添加一个age属性,值为33 db.user.update({username: '孙笑川'}, {$set: {age: 33}}); //使用{username: 'Giao哥'}替换username为药水哥的文档 db.user.replaceOne({username: '药水哥'}, {username: 'Giao哥'}); //删除username为孙笑川的文档的age属性 db.user.update({username: '孙笑川'}, {$unset: {age: 1}}) //向username为孙笑川的文档添加一个hobby:{cities: ['北京', '上海', '深圳']} //MongoDB的属性值也可以是一个文档,当一个文档的属性也是一个文档时,称这个文档为内嵌文档 db.user.update({username: '孙笑川'}, {$set: {hobby:{cities: ['北京', '上海', '深圳']}}}); //向username为Giao哥的文档添加一个hobby:{movies: ['007', '碟中谍', '加勒比海盗']} db.user.update({username: 'Giao哥'}, {$set: {hobby:{movies: ['007', '碟中谍', '加勒比海盗']}}}); //查询喜欢007电影的文档 //MongoDB支持内嵌文档的属性直接查询,此时属性名必须带引号 db.user.find({'hobby.movies':'007'}); //向Giao哥添加一个新的电影 //$push 向集合中添加一个新的元素 //$addToSet 向集合中添加一个新的元素(如果数组中已存在了该元素,则不执行添加) db.user.update({username: 'Giao哥'}, {$push: {'hobby.movies': '大话西游'}}); db.user.update({username: 'Giao哥'}, {$addToSet: {'hobby.movies': '大话西游'}}); //删除喜欢北京的用户文档 db.user.remove({'hobby.cities': '北京'}); //删除user集合 db.user.remove({}); db.user.drop(); show collections; //向number集合中插入20000条数据 for(var i = 1; i <= 20000; i++) { db.number.insert({num: i}); } //11s db.number.find().count(); db.number.drop(); var arr = []; for(var i = 1; i <= 20000; i++) { arr.push({num: i}); } db.number.insert(arr); //0.857s db.number.find().count(); //查找num为500的文档 db.number.find({num: 500}); //查找num大于500的文档 大于 $gt 大于等于 $gte 小于 $lt 小于等于 $lte 不等于 $ne db.number.find({num: {$gt: 500}}); //查找num大于40小于50的文档 db.number.find({num: {$gt: 40, $lt: 50}}); //查找number的前10条数据 db.number.find().limit(10); //查找number的11条到20条数据 skip 跳过指定数量的数据 //skip((pageNum - 1) * 10).limit(pageSize) db.number.find().skip(10).limit(10); //MongoDB会自定调整limit和skip的位置,无顺序影响 db.number.find().limit(10).skip(10); //文档间的关系 //一对一 db.couple.insert([ { name: '黄蓉', husband: { name: '郭靖' } }, { name: '潘金莲', husband: { name: '武大郎' } } ]); db.couple.find(); //一对多 用户 user 订单order(使用内嵌文档可能会出现数据越来越多难以维护的情况) db.user.insert([ {username: '孙笑川'}, {username: '药水哥'}, {username: 'Giao哥'} ]); db.user.find(); db.order.insert([ { list: ['苹果', '西瓜', '梨'], user_id: ObjectId("649847f13d750000000045e4") }, { list: ['铅笔', '钢笔', '圆珠笔'], user_id: ObjectId("649847f13d750000000045e5") }, { list: ['羽毛球', '篮球'], user_id: ObjectId("649847f13d750000000045e6") }, ]); db.order.find(); //查找孙笑川的订单 var user_id = db.user.findOne({username: '孙笑川'})._id; db.order.find({user_id: user_id}); //多对多 老师 teacher 学生 student db.teacher.insert([ {name: '孙笑川'}, {name: '药水哥'}, {name: 'Giao哥'} ]); db.teacher.find(); db.student.insert([ { name: '小明', teacher_ids: [ ObjectId("64984a183d750000000045ea"), ObjectId("64984a183d750000000045eb") ] }, { name: '小华', teacher_ids: [ ObjectId("64984a183d750000000045ea"), ObjectId("64984a183d750000000045ec") ] }, ]); db.student.find(); //查询工资小于1000或大于2500的员工 db.emp.find({$or: [{sal: {$lt: 1000}}, {sal: {$gt: 2500}}]}); //查询财务部的所有员工 var dept_no = db.dept.findOne({dname: '财务部'}).deptno; db.emp.find({depno: dept_no}) //为工资低于1000的员工加400工资 $inc 在原基础上增加 db.emp.updateMany({sal: {$lte: 1000}}, {$inc: {sal: 400}}); //查询emp并按照_id升序排列 //sort() 按照指定属性排序 1 升序 -1 降序 //先按sal升序,再按empno降序 db.emp.find({}).sort({sal: 1, empno: -1}); //在查询时,可以在第二个参数设置要查询的列(投影) db.emp.find({}, {ename: 1, _id: 0, sal: 1});
2023年06月25日
196 阅读
0 评论
0 点赞
2023-06-23
RabbitMQ - 其他补充
1. 幂等性(1)概念用户对于同一操作发起的一次请求或者多次请求的结果是一致的,不会因为多次点击而产生了副作用。(2)消息重复消费消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息。(3)解决思路一般使用全局 ID、唯一标识(时间戳、UUID)或 MQ 消息的 id tag 来判断,亦或是zi定义全局唯一 id,每次消费消息时先判断 id 对应的消息是否已消费过(4)消费端的幂等性保障在海量订单生成的业务高峰期,生产端有可能就会重复发送消息,这时候消费端就要实现幂等性,即消息永远不会被消费多次(即使多次收到了一样的消息)。唯一ID + 指纹码机制,利用数据库主键去重利用 redis 的原子性实现(setnx)2. 优先级队列(1)设置方式在管理页面添加在代码中添加Map<String, Object> arguments = new HashMap<>(); //优先级范围 0-10(设置过大会存在性能问题) arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);设置优先级之后的队列(2)测试Producerpublic class Producer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); /** * 创建队列 * 参数说明: * 1.队列名称 * 2.是否持久化存储消息 * 3.是否进行消息共享(只供一个消费者进行消费) * 4.是否自动删除(最后一个消费者断开连接后,是否自动删除该队列) * 5.其他参数 */ /** * 2023-06-15偷懒 优先级队列 */ Map<String, Object> arguments = new HashMap<>(); //优先级范围 0-10(设置过大会存在性能问题) arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); for (int i = 1; i < 10; i++) { String msg = "hello world" + i; if (i == 5) { AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().priority(5).build(); channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes()); } else { channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } } } }Consumerpublic class Consumer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费由于某些原因中断"); /** * 接收消息 * 1.队列名称 * 2.消费成功后是否自动应答 * 3.未成功消费的回调 * 4.取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }3. 惰性队列(1)使用场景RabbitMQ 从 3.6.0 版本开始引入了惰性队列的概念,惰性队列会尽可能的将消息存入磁盘中,而在消费者消费到相应的消息时才会被加载到内存中,它的一个重要的设计目标是能够支持更长的队列,即支持更多的消息存储。当消费者由于各种各样的原因(比如消费者下线、宕机亦或是由于维护停机等)而导致使长时间内不能消费消息造成堆积时,惰性队列就很有必要了。 默认情况下,当生产者将消息发送到 RabbitMQ 的时候,队列中的消息会尽可能的存储在内存中,这样可以更快速的将消息发送给消费者。即使是持久化的消息,在被写入磁盘的同时也会在内存中驻留 一份备份。当 RabbitMQ 需要释放内存的时候,会将内存中的消息换页至磁盘中,这个操作会耗费较长的时间(同步操作),进而无法接收新的消息。虽然 RabbitMQ 的开发者们一直在升级相关的算法, 但是效果始终不太理想,尤其是在消息量特别大的时候。(2)两种模式default(默认模式):在 3.6.0 之前的版本无需做任何变更lazy 模式:即惰性队列模式,可以通过调用 channel.queueDeclare 方法的时候在参数中设置,也可以通过 Policy 的方式设置,如果一个队列同时使用这两种方式设置的话,那么 Policy 的方式具备更高的优先级。注:如果要通过声明的方式改变已有队列的模式,那么只能先删除队列,然后再重新声明一个新的队列,同时已产生的消息会同步删除。在管理界面设置在代码中设置Map<String, Object> arguments = new HashMap<>(); arguments.put("x-queue-mode", "lazy"); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments);(3)内存开销对比
2023年06月23日
108 阅读
0 评论
0 点赞
2023-06-23
RabbitMQ - 发布确认补充
1. 发布确认在RabbitMQ不可用的情况下,如何处理无法投递的消息?在application.yml配置文件中添加# 消息确认类型 执行回调 publisher-confirm-type: correlated补充:NONE 禁用发布确认模式,是默认值CORRELATED 发布消息成功到交换器后会触发回调方法SIMPLE 包含两种效果:和 CORRELATED 值一样会触发回调方法在发布消息成功后使用 rabbitTemplate 调用 waitForConfirms 或 waitForConfirmsOrDie 方法等待 broker 节点返回发送结果,根据返回结果来判定下一步的逻辑,要注意的点是 waitForConfirmsOrDie 方法如果返回 false 则会关闭 channel,则接下来无法发送消息到 broker(1)队列配置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/confirm") public class SendConfirmMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}/{routingKey}") @ApiOperation("发送消息(发布确认)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) { log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey); //正常发送 rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString())); } }(3)回调接口@Slf4j @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 注入 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * 交换机确认回调方法 * * @param correlationData correlation data for the callback. 回调消息的ID及相关的信息 * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData != null) { String id = correlationData.getId() != null ? correlationData.getId() : ""; if (ack) { log.info("交换机收到ID:{} 的消息", id); } else { log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause); } } else { log.error("消息接收发生了异常!"); } } }(4)消费者@Slf4j @Component public class ConfirmConsumer { @RabbitListener(queues = ConfirmQueueConfig.CONFIRM_QUEUE) public void receiveMsg(Message message) { String msg = new String(message.getBody()); log.info("接收到 {} 队列的消息:{}", ConfirmQueueConfig.CONFIRM_QUEUE, msg); } }(5)测试可以看到,发送了两条消息,第一条消息的 RoutingKey 为 "key1",第二条消息的 RoutingKey 为 "key2",两条消息都成功被交换机接收,也收到了交换机的确认回调,但消费者只收到了一条消息,因为第二条消息的 RoutingKey 与队列的 BindingKey 不一致,也没有其它队列能接收这个消息,所有第二条消息被直接丢弃了2.回退消息(1)Mandatory 参数在仅开启了生产者确认机制的情况下,交换机接收到消息后,会直接给消息生产者发送确认消息,如果发现该消息不可路由,那么消息会被直接丢弃,并且生产者是不知道消息被丢弃的。此时可通过设置 mandatory 参数,当消息在传递过程中不可达目的地时将消息返回给生产者(2)包含回退的队列设置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return QueueBuilder.durable(CONFIRM_QUEUE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } }(3)消息生产者@Slf4j @RestController @RequestMapping("/confirm") public class SendConfirmMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}/{routingKey}") @ApiOperation("发送消息(发布确认)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("routingKey") String routingKey) { log.info("当前时间:{},发送消息:{},routingKey:{}", new Date(), msg, routingKey); //正常发送 rabbitTemplate.convertAndSend(ConfirmQueueConfig.CONFIRM_EXCHANGE, routingKey, msg, new CorrelationData(UUID.randomUUID().toString())); } }(4)回调接口@Slf4j @Component public class ConfirmCallback implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback { @Autowired private RabbitTemplate rabbitTemplate; /** * 注入 */ @PostConstruct public void init() { rabbitTemplate.setConfirmCallback(this); rabbitTemplate.setReturnCallback(this); } /** * 交换机确认回调方法 * * @param correlationData correlation data for the callback. 回调消息的ID及相关的信息 * @param ack true for ack, false for nack * @param cause An optional cause, for nack, when available, otherwise null. */ @Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if (correlationData != null) { String id = correlationData.getId() != null ? correlationData.getId() : ""; if (ack) { log.info("交换机收到ID:{} 的消息", id); } else { log.error("交换机未收到ID:{} 的消息,原因:{}", id, cause); } } else { log.error("消息接收发生了异常!"); } } /** * 消息在传递过程中不可达目的地时,进行消息回退 * @param message the returned message. * @param replyCode the reply code. * @param replyText the reply text. * @param exchange the exchange. * @param routingKey the routing key. */ @Override public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) { log.error("消息:{},被交换机:{}退回,路由key:{},原因:{}", new String(message.getBody()), exchange, routingKey, replyText); } }(5)测试参考未设置消息回退时的测试结果,设置回退后,未匹配routingkey的消息被返回到了队列中3. 备份交换机备份交换机可以理解为 RabbitMQ 中交换机的“备胎”,当为某一个交换机声明一个对应的备份交换机时,就是为它创建一个备胎,当交换机接收到一条不可路由消息时,会把这条消息转发到备份交换机中,由备份交换机来进行转发和处理,通常备份交换机的类型为 Fanout ,这样就能把所有消息都投递到与其绑定的队列中,然后我们在备份交换机下绑定一个队列,这样所有那些原交换机无法被路由的消息,就会都进入这个队列(1)队列配置@Configuration public class ConfirmQueueConfig { public static final String CONFIRM_EXCHANGE = "confirm.exchange"; public static final String CONFIRM_QUEUE = "confirm.queue"; public static final String CONFIRM_ROUTING_KEY = "key1"; public static final String BACKUP_EXCHANGE = "backup.exchange"; public static final String BACKUP_QUEUE = "backup.queue"; public static final String WARRING_QUEUE = "warring.queue"; @Bean("confirmExchange") public DirectExchange confirmExchange() { return new DirectExchange(CONFIRM_EXCHANGE); } @Bean("confirmQueue") public Queue confirmQueue() { return ExchangeBuilder.directExchange(CONFIRM_EXCHANGE).durable(true).withArgument("alternate-exchange", BACKUP_EXCHANGE).build(); } @Bean public Binding queueBindingExchange(@Qualifier("confirmQueue") Queue queue, @Qualifier("confirmExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(CONFIRM_ROUTING_KEY).noargs(); } @Bean("backupExchange") public FanoutExchange backupExchange() { return new FanoutExchange(BACKUP_EXCHANGE); } @Bean("backupQueue") public Queue backupQueue() { return QueueBuilder.durable(BACKUP_QUEUE).build(); } @Bean("warringQueue") public Queue warringQueue() { return QueueBuilder.durable(WARRING_QUEUE).build(); } @Bean public Binding queueBindingBackupExchange(@Qualifier("backupQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } @Bean public Binding queueBindingWarringExchange(@Qualifier("warringQueue") Queue queue, @Qualifier("backupExchange") FanoutExchange exchange) { return BindingBuilder.bind(queue).to(exchange); } }(5)报警消费者@Slf4j @Component public class WarringConsumer { @RabbitListener(queues = ConfirmQueueConfig.WARRING_QUEUE) public void receiveWarringMsg(Message message) { String msg = new String(message.getBody()); log.warn("发现不可路由消息:{}", msg); } }(5)生产者、回调接口配置同理(6)测试注:mandatory 参数可与备份交换机同时使用,但备份交换机优先级最高
2023年06月23日
126 阅读
0 评论
0 点赞
2023-06-22
RabbitMQ - 延时队列
1. 概念队列内部是有序的,最重要的特性就体现在它的延时属性上,延时队列中的元素希望在指定时间到了之后或之前取出和处理,简单来说,延时队列就是用来存放需要在指定时间被处理的元素的队列。2. 使用场景订单在十分钟之内未支付则自动取消新创建的店铺,如果在十天内都没有上传过商品,则自动发送消息提醒用户注册成功后,如果三天内没有登录则进行短信提醒用户发起退款,如果三天内没有得到处理则通知相关运营人员预定会议后,需要在预定的时间点前十分钟通知各个与会人员参加会议……4. RabbitMQ中的TTLTTL 是 RabbitMQ 中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位毫秒。换句话说,如果一条消息设置了 TTL 属性或者进入了设置 TTL 属性的队列,那么这 条消息如果在 TTL 设置的时间内没有被消费,则会成为"死信"。如果同时配置了队列的 TTL 和消息的 TTL,则优先取较小的值。5. 设置TTL的两种方式(1)消息设置ttlrabbitTemplate.convertAndSend("X", "XC", msg, message -> { //设置ttl message.getMessageProperties().setExpiration(ttlTime); return message; });(2)队列设置ttl//ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build();(3)区别消息设置ttl,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间;另外还需要注意的一点是,如果不设置 ttl,表示消息永远不会过期,如果将 ttl设置为 0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃队列设置 ttl 时,那么一旦消息过期,就会被队列丢弃(如果配置了死信队列被丢到死信队 列中)6. 整合 SpringBootpom.xml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.2.5.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <groupId>com.sw.rabbitmq</groupId> <artifactId>RabbitmqDemo01</artifactId> <version>0.0.1-SNAPSHOT</version> <name>RabbitmqDemo01</name> <description>RabbitmqDemo01</description> <properties> <java.version>1.8</java.version> </properties> <dependencies> <!-- springboot web --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <!-- amqp --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> <!-- fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>2.0.15</version> </dependency> <!-- lombok --> <dependency> <groupId>org.projectlombok</groupId> <artifactId>lombok</artifactId> </dependency> <!-- swagger2 --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger2</artifactId> <version>2.9.2</version> </dependency> <!-- swagger ui --> <dependency> <groupId>io.springfox</groupId> <artifactId>springfox-swagger-ui</artifactId> <version>2.9.2</version> </dependency> <!-- test --> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-test</artifactId> <scope>test</scope> </dependency> <!-- rabbit test --> <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-maven-plugin</artifactId> </plugin> </plugins> </build> </project>application.ymlserver: port: 8088 spring: rabbitmq: host: 192.168.123.88 port: 5672 username: admin password: 123456swagger config@Configuration @EnableSwagger2 public class SwaggerConfig { @Bean public Docket defaultApi() { return new Docket(DocumentationType.SWAGGER_2) .select() .apis(RequestHandlerSelectors.basePackage("com.sw")) .paths(PathSelectors.any()) .build() .apiInfo(apiInfo()); } private ApiInfo apiInfo() { return new ApiInfo( "rabbitmq", "Rabbitmq-Demo", "1.0.0-SNAPSHOT", null, new Contact("suaxi", "http://www.test.com/", "test@qq.com"), "Apache 2.0", "https://www.apache.org/licenses/LICENSE-2.0.html", Collections.emptyList()); } }7. 队列TTL创建两个队列 QA 和 QB,两者队列 ttl 分别设置为 10S 和 40S,然后在创建一个交换机 X 和死信交 换机 Y,它们的类型都是 direct,同时创建一个死信队列 QD(1)队列配置@Configuration public class TtlQueueConfig { //普通交换机 public static final String X_EXCHANGE = "X"; //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; //死信队列 public static final String DEAD_LETTER_QUEUE = "QD"; /** * 普通交换机 * * @return */ @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } /** * 死信交换机 * * @return */ @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 普通队列A * * @return */ @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } /** * 普通队列B * * @return */ @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 40); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } /** * 死信队列 * * @return */ @Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } /** * 绑定 * * @return */ @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XA").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XB").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("YD").noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendMsg/{msg}") @ApiOperation("发送消息") public void sendMsg(@PathVariable("msg") String msg) { log.info("当前时间:{},发送一条信息给两个TTL队列:{}", new Date(), msg); rabbitTemplate.convertAndSend("X", "XA", "消息来自ttl为10s的队列" + msg); rabbitTemplate.convertAndSend("X", "XB", "消息来自ttl为40s的队列" + msg); } }(3)消费者@Slf4j @Component public class DeadLetterQueueConsumer { @RabbitListener(queues = "QD") public void receiveD(Message message, Channel channel) { String msg = new String(message.getBody(), StandardCharsets.UTF_8); log.info("当前时间:{},接收到死信队列的消息:{}", new Date(), msg); } }(4)测试消息分别在10s和40s之后变为死信消息,由处理死信队列消息的消费者消费8. 优化延时队列新增一个队列 QC,且不设置 ttl 时间(1)队列配置@Configuration public class TtlQueueConfig { //普通交换机 public static final String X_EXCHANGE = "X"; //死信交换机 public static final String Y_DEAD_LETTER_EXCHANGE = "Y"; //普通队列 public static final String QUEUE_A = "QA"; public static final String QUEUE_B = "QB"; public static final String QUEUE_C = "QC"; //死信队列 public static final String DEAD_LETTER_QUEUE = "QD"; /** * 普通交换机 * * @return */ @Bean("xExchange") public DirectExchange xExchange() { return new DirectExchange(X_EXCHANGE); } /** * 死信交换机 * * @return */ @Bean("yExchange") public DirectExchange yExchange() { return new DirectExchange(Y_DEAD_LETTER_EXCHANGE); } /** * 普通队列A * * @return */ @Bean("queueA") public Queue queueA() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 10); return QueueBuilder.durable(QUEUE_A).withArguments(arguments).build(); } /** * 普通队列B * * @return */ @Bean("queueB") public Queue queueB() { Map<String, Object> arguments = new HashMap<>(3); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); //ttl arguments.put("x-message-ttl", 1000 * 40); return QueueBuilder.durable(QUEUE_B).withArguments(arguments).build(); } /** * 死信队列 * * @return */ @Bean("queueD") public Queue queueD() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); } /** * 绑定 * * @return */ @Bean public Binding queueABindingX(@Qualifier("queueA") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XA").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueBBindingX(@Qualifier("queueB") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XB").noargs(); } /** * 绑定 * * @return */ @Bean public Binding queueDBindingY(@Qualifier("queueD") Queue queue, @Qualifier("yExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("YD").noargs(); } /** * 死信队列C * * @return */ @Bean("queueC") public Queue queueC() { Map<String, Object> arguments = new HashMap<>(2); //设置死信交换机 arguments.put("x-dead-letter-exchange", Y_DEAD_LETTER_EXCHANGE); //设置死信routingKey arguments.put("x-dead-letter-routing-key", "YD"); return QueueBuilder.durable(QUEUE_C).withArguments(arguments).build(); } /** * 绑定 * * @return */ @Bean public Binding queueCBindingY(@Qualifier("queueC") Queue queue, @Qualifier("xExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with("XC").noargs(); } }(2)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendExpirationMsg/{msg}/{ttlTime}") @ApiOperation("发送消息(自定义ttl)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("ttlTime") String ttlTime) { log.info("当前时间:{},发送一条过期时间为:{}ms 的信息给TTL队列:{}", new Date(), ttlTime, msg); rabbitTemplate.convertAndSend("X", "XC", msg, message -> { //设置ttl message.getMessageProperties().setExpiration(ttlTime); return message; }); } }(3)测试两条消息设置了不同的ttl(由生产者发送消息时决定),但 RabbitMQ 只会检查第一个消息是否过期,如果过期则丢到死信队列,如果第一个消息的延时很长,而第二个消息的延时时长很短,第二个消息并不会优先得到执行9. RabbitMQ插件实现延迟队列如果不能实现在消息粒度上的ttl,并使其在设置的ttl时间及时死亡,就无法设计成一个通用的延时队列。(1)安装延时队列插件通过官网 https://www.rabbitmq.com/community-plugins.html 可下载rabbitmq_delayed_message_exchange 插件,下载完成后解压到 RabbitMQ 的插件目录,进入 RabbitMQ 的安装目录下的 plgins 目录,执行下面命令让该插件生效,然后重启rabbitmq-plugins enable rabbitmq_delayed_message_exchange安装成功后开看到 The following plugins have been enabled 字眼(2)新增延时队列、交换机新增一个队列 delayed.queue,一个自定义交换机 delayed.exchange(3)队列配置@Configuration public class DelayQueueConfig { public static final String DELAYED_EXCHANGE = "delayed.exchange"; public static final String DELAYED_QUEUE = "delayed.queue"; public static final String DELAYED_ROUTING_KEY = "delayed.routingkey"; @Bean public CustomExchange delayedExchange() { /** * 参数数目:1.名称 2.类型 3.是否持久化 4.是否自动删除 5.其他参数 */ Map<String, Object> arguments = new HashMap<>(); arguments.put("x-delayed-type", "direct"); return new CustomExchange(DELAYED_EXCHANGE, "x-delayed-message", false, false, arguments); } @Bean public Queue delayedQueue() { return new Queue(DELAYED_QUEUE); } @Bean public Binding delayedQueueBindingDelayedExchange(@Qualifier("delayedQueue") Queue queue, @Qualifier("delayedExchange") Exchange exchange) { return BindingBuilder.bind(queue).to(exchange).with(DELAYED_ROUTING_KEY).noargs(); } }(4)消息生产者@Slf4j @RestController @RequestMapping("/ttl") public class SendMsgController { @Autowired private RabbitTemplate rabbitTemplate; @GetMapping("/sendDelayMsg/{msg}/{delayTime}") @ApiOperation("发送消息(基于 x-delayed-message 插件)") public void sendMsg(@PathVariable("msg") String msg, @PathVariable("delayTime") Integer delayTime) { log.info("当前时间:{},发送一条过期时间为:{}ms 的信息给delay.queue:{}", new Date(), delayTime, msg); rabbitTemplate.convertAndSend(DelayQueueConfig.DELAYED_EXCHANGE, DelayQueueConfig.DELAYED_ROUTING_KEY, msg, message -> { //设置ttl message.getMessageProperties().setDelay(delayTime); return message; }); } }(5)消费者@Slf4j @Component public class DelayQueueConsumer { @RabbitListener(queues = DelayQueueConfig.DELAYED_QUEUE) public void receiveDelayQueue(Message message) { String msg = new String(message.getBody()); log.info("当前时间:{},收到延迟队列的消息:{}", new Date(), msg); } }(6)测试可看到ttl较小的消息优先被消费了(7)补充延时队列在需要延时处理的场景下非常有用,使用 RabbitMQ 来实现延时队列可以很好的利用 RabbitMQ 的特性,如:消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。延时队列还有很多其它选择,如:Java的DelayQueue,Redis的zset,Quartz任务调度或kafka的时间轮,可根据具体的业务场景进行选择
2023年06月22日
116 阅读
0 评论
0 点赞
1
...
10
11
12
...
51