首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,164 阅读
2
类的加载
792 阅读
3
Spring Cloud OAuth2.0
749 阅读
4
SpringBoot自动装配原理
708 阅读
5
集合不安全问题
614 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Canvas
Linux
容器
Docker
Containerd
Kubernetes
Python
FastApi
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
403
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Canvas
Linux
容器
Docker
Containerd
Kubernetes
Python
FastApi
页面
统计
关于
搜索到
403
篇与
的结果
2023-06-22
RabbitMQ - 死信队列
无法被消费的消息,即:producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。应用场景:为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中1. 来源消息 TTL 过期队列达到最大长度(队列满了,无法再添加数据到 mq 中)消息被拒绝(basic.reject 或 basic.nack)并且 requeue = false2. 消息 TTL 过期Producerpublic class Producer { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //死信消息 AMQP.BasicProperties properties = new AMQP.BasicProperties() .builder().expiration("10000").build(); for (int i = 1; i < 11; i++) { String msg = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes()); System.out.println("发送的消息是:" + msg); } } }Consumer1 (启动之后关闭,模拟消费者不能正常消息的情况)public class Consumer01 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //过期时间(过期时间可由生产者或者消费者设置,二选一) arguments.put("x-message-ttl", 10000); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c1等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer01接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {}); } }Consumer2 处理死信队列的消息public class Consumer02 { //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("消费者c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接收的(死信)消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {}); } }3. 队列达到最大长度Peoducerpublic class Producer { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); for (int i = 1; i < 11; i++) { String msg = "info" + i; channel.basicPublish(NORMAL_EXCHANGE, "zhangsan", null, msg.getBytes()); System.out.println("发送的消息是:" + msg); } } }Consumer1public class Consumer01 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); //队列长度 arguments.put("x-max-length", 6); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c3等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer03接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(NORMAL_QUEUE, true, deliverCallback, consumerTag -> {}); } }注:添加 arguments.put("x-max-length", 6) 参数Consumer2 处理死信队列的消息public class Consumer02 { //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("消费者c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { System.out.println("Consumer02接收的(死信)消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); }; channel.basicConsume(DEAD_QUEUE, true, deliverCallback, consumerTag -> {}); } }4. 消息被拒绝Producer 同理 队列达到最大长度 的生产者Consumerpublic class Consumer04 { //普通交换机 public static final String NORMAL_EXCHANGE = "normal_exchange"; //死信交换机 private static final String DEAD_EXCHANGE = "dead_exchange"; //普通队列 public static final String NORMAL_QUEUE = "normal_queue"; //死信队列 private static final String DEAD_QUEUE = "dead_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); //声明普通/死信交换机 channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT); channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT); //声明普通队列 //普通队列绑定死信交换机 Map<String, Object> arguments = new HashMap<>(); //死信交换机 arguments.put("x-dead-letter-exchange", DEAD_EXCHANGE); //死信routingKey arguments.put("x-dead-letter-routing-key", "lisi"); channel.queueDeclare(NORMAL_QUEUE, false, false, false, arguments); //死信队列 channel.queueDeclare(DEAD_QUEUE, false, false, false, null); //绑定普通交换机 - 队列 channel.queueBind(NORMAL_QUEUE, NORMAL_EXCHANGE, "zhangsan"); //绑定死信交换机 - 队列 channel.queueBind(DEAD_QUEUE, DEAD_EXCHANGE, "lisi"); System.out.println("消费者c4等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { if ("info5".equals(new String(message.getBody(), StandardCharsets.UTF_8))) { System.out.println("Consumer04拒绝此消息:" + new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicReject(message.getEnvelope().getDeliveryTag(), false); } else { System.out.println("Consumer04接收的消息是:" + new String(message.getBody(), StandardCharsets.UTF_8)); channel.basicAck(message.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(NORMAL_QUEUE, false, deliverCallback, consumerTag -> {}); } }启动生产者启动消费者启动处理死信队列消息的消费者(该消费者同理前三个处理死信队列消息的消费者)
2023年06月22日
116 阅读
0 评论
0 点赞
2023-06-20
RabbitMQ - 交换机
1. Exchanges(1)概念RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息,是应该把这些消息放到特定队列还是说把他们放到许多队列中,亦或是应该丢弃它们,这就得由交换机的类型来决定。(2)交换机类型直接(direct)主题(topic)标题(headers)扇出(fanout)(3)无名exchangechannel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());之前的demo中生产者发送消息时未设置交换机,即第一个参数为空,表示使用的是默认交换机(又称无名交换机)2. 临时队列在之前的demo中使用的都是临时队列,一旦断开了消费者的连接,队列将被自动删除创建临时队列的方式:String queue = channel.queueDeclare().getQueue();3. 绑定binding 是 exchange 和 queue 之间的桥梁,即 exchange 和哪个队列进行了绑定4. Fanout(1)介绍将接收到的所有消息广播到它知道的所有队列中,RabbitMQ默认的交换机如下:(2)DemoProducerpublic class EmitLog { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送的消息是:" + msg); } } }Consumerpublic class ReceiveLog01 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //生成临时队列 String queueName = channel.queueDeclare().getQueue(); //绑定临时队列,其中routingKey为空字符串 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("ReceiveLog01等待接收消息,接收到消息后在控制台打印..."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } } public class ReceiveLog02 { private static final String EXCHANGE_NAME = "logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, "fanout"); //生成临时队列 String queueName = channel.queueDeclare().getQueue(); //绑定临时队列,其中routingKey为空字符串 channel.queueBind(queueName, EXCHANGE_NAME, ""); System.out.println("ReceiveLog02等待接收消息,接收到消息后在控制台打印..."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } }5. Direct(1)介绍Direct Exchange:消息只去到它绑定的 routingKey 队列中Fanout模式下每一个消费者都收到了消息,此处使用 Direct 模式,让消息根据 routing key 去到指定的地方,如下图:X绑定了Q1,Q2两个队列,绑定类型为direct,队列Q1绑定的orange,Q2绑定了两个key:black、green。在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1,绑定键为 black/green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。注:如果绑定的key都相同,则该模式与 Fanout 模式类似,变成了另一种形式的广播(2)DemoProducerpublic class DirectLog { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish(EXCHANGE_NAME, "info", null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("发送的消息是:" + msg); } } }Consumerpublic class ReceiveLogDirect01 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("console", false, false, false, null); channel.queueBind("console", EXCHANGE_NAME, "info"); channel.queueBind("console", EXCHANGE_NAME, "warning"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume("console", true, deliverCallback, consumerTag -> { }); } } public class ReceiveLogDirect02 { private static final String EXCHANGE_NAME = "direct_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT); channel.queueDeclare("disk", false, false, false, null); channel.queueBind("console", EXCHANGE_NAME, "error"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg); }; channel.basicConsume("disk", true, deliverCallback, consumerTag -> { }); } }6. Topics(1)介绍发送到 topic 类型交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 . 点号分隔开。这些单词可以是任意单词,且不能超过 255 个字节。规则列表中常用的替换符:*(星号) 可以代替一个单词#(井号) 可以替代零个或多个单词(2)举例下图绑定关系如下:Q1绑定的是中间带 orange ,单词总数为 3 个的字符串 *.orange.*Q2:绑定的是最后一个单词是 rabbit 且单词总数为 3 个的单词 *.*.rabbit第一个单词是 lazy 的多个单词 lazy.#以下是消息接收情况:quick.orange.rabbit 被队列 Q1Q2 接收到lazy.orange.elephant 被队列 Q1Q2 接收到quick.orange.fox 被队列 Q1 接收到lazy.brown.fox 被队列 Q2 接收到lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃lazy.orange.male.rabbit 是四个单词但匹配 Q2注:当一个队列绑定键是 #,那么这个队列将接收所有数据,类似于 fanout;如果队列绑定键中没有 # 和 * 出现,那么该队列绑定类型就是 direct (固定的routing key)(3)DemoProducerpublic class TopicLog { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); Map<String, String> map = new HashMap<>(); map.put(" quick.orange.rabbit", "被队列Q1Q2接收到"); map.put("lazy.orange.elephant", "被队列Q1Q2接收到"); map.put("quick.orange.fox", "被队列Q1接收到"); map.put(" lazy.brown.fox", "被队列Q2接收到"); map.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次"); map.put("quick. brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃"); map.put(" quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃"); map.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2"); for (Map.Entry<String, String> entry : map.entrySet()) { channel.basicPublish(EXCHANGE_NAME, entry.getKey(), null, entry.getValue().getBytes(StandardCharsets.UTF_8)); System.out.println("发送的消息是:" + entry.getValue()); } } }Consumerpublic class ReceiveLogTopic01 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("Q1", false, false, false, null); channel.queueBind("Q1", EXCHANGE_NAME, "*.orange.*"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg + ",routingKey:" + delivery.getEnvelope().getRoutingKey()); }; channel.basicConsume("Q1", true, deliverCallback, consumerTag -> { }); } } public class ReceiveLogTopic02 { private static final String EXCHANGE_NAME = "topic_logs"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC); channel.queueDeclare("Q2", false, false, false, null); channel.queueBind("Q2", EXCHANGE_NAME, "*.*.rabbit"); channel.queueBind("Q2", EXCHANGE_NAME, "lazy.#"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String msg = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println("接收到的消息是:" + msg + ",routingKey:" + delivery.getEnvelope().getRoutingKey()); }; channel.basicConsume("Q2", true, deliverCallback, consumerTag -> { }); } }
2023年06月20日
73 阅读
0 评论
0 点赞
2023-06-20
RabbitMQ - 发布确认
1. 发布确认原理生产者将信道设置成 confirm 模式,一旦信道进入 confirm 模式,所有在该信道上面发布的 消息都将会被指派一个唯一的 ID(从 1 开始),一旦消息被投递到所有匹配的队列之后,broker 就会发送一个确认给生产者(包含消息的唯一 ID),这就使得生产者知道消息已经正确到达目的队列了,如果消息和队列是可持久化的,那么确认消息会在将消息写入磁盘之后发出,broker 回传给生产者的确认消息中 delivery-tag 域包含了确认消息的序列号,此外 broker 也可以设置 basic.ack 的 multiple 域,表示到这个序列号之前的所有消息都已经得到了处理。注:confirm 模式是异步的2. 发布确认策略(1)开启发布确认发布确认默认未开启,手动开启//开启发布确认 channel.confirmSelect();(2)单个确认发布是一种同步确认发布的方式,即发布一个消息之后只有它被确认发布,后续的消息才能继续发布,waitForConfirmsOrDie(long) 这个方法只有在消息被确认的时候才返回,如果在指定时间范围内这个消息没有被确认那么它将抛出异常。这种确认方式有一个最大的缺点就是:发布速度特别的慢,因为如果没有确认发布的消息就会阻塞所有后续消息的发布,这种方式最多提供每秒不超过数百条发布消息的吞吐量,但对于某些应用程序来说这可能已经足够了。public static void publishMessageIndividually() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //单个确认 boolean flag = channel.waitForConfirms(); if (flag) { System.out.println("第" + i + "条消息发送成功"); } } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(单个确认模式)耗时:" + (endTime - startTime)); }(3)批量发布确认先发布一批消息然后一起确认可以极大地提高吞吐量,当然这种方式的缺点就是:当发生故障导致发布出现问题时,不知道是哪个消息出现了问题,我们必须将整个批处理保存在内存中,以记录重要的信息而后重新发布消息,该方案也是同步的,也一样阻塞消息的发布。public static void publishMessageBatch() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); int batchSize = 100; //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //批量确认 if (i % batchSize == 0) { boolean flag = channel.waitForConfirms(); if (flag) { System.out.println(i + "条消息发送成功"); } } } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(批量确认模式)耗时:" + (endTime - startTime)); }(4)异步确认发布利用回调函数来达到消息可靠性的传递public static void publishMessageAsync() throws IOException, TimeoutException, InterruptedException { Channel channel = RabbitMqUtils.getChannel(); String queueName = UUID.randomUUID().toString(); channel.queueDeclare(queueName, true, false, false, null); //开启发布确认 channel.confirmSelect(); //使用线程安全的哈希表记录消息 ConcurrentSkipListMap<Long, Object> outstandingConfirms = new ConcurrentSkipListMap<>(); //确认成功 回调 ConfirmCallback ackCallback = (deliveryTag, multiple) -> { //2.删除已确认的消息,即剩下的就是确认失败的消息 if (multiple) { outstandingConfirms.headMap(deliveryTag).clear(); } else { outstandingConfirms.remove(deliveryTag); } System.out.println("确认的消息:" + deliveryTag); }; //确认失败 回调 ConfirmCallback nackCallback = (deliveryTag, multiple) -> { //3.打印未确认的消息 Object msg = outstandingConfirms.get(deliveryTag); System.out.println("未确认的消息tag:" + deliveryTag + ",未确认的消息内容:" + msg.toString()); }; //消息监听器 channel.addConfirmListener(ackCallback, nackCallback); //开始时间 long startTime = System.currentTimeMillis(); for (int i = 0; i < MESSAGE_COUNT; i++) { String msg = i + ""; channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes()); //1.记录已发送的消息 outstandingConfirms.put(channel.getNextPublishSeqNo(), msg); } //开始时间 long endTime = System.currentTimeMillis(); System.out.println("发布" + MESSAGE_COUNT + "条数据(异步确认模式)耗时:" + (endTime - startTime)); }(5)处理异步未确认消息把未确认的消息放到一个基于内存的且能被发布线程访问的队列,如:用 ConcurrentLinkedQueue 队列在 confirm callbacks 与发布线程之间进行消息的传递。(6)三种方式对比单独发布消息:同步等待确认,简单,但吞吐量非常有限批量发布消息:批量同步等待确认,简单,合理的吞吐量,但出现问题后很难判断是那条消息出现了问题异步处理: 最佳性能和资源使用,在出现错误的情况下可以很好地控制,但是实现起来较为复杂
2023年06月20日
86 阅读
0 评论
0 点赞
2023-06-19
RabbitMQ - WorkQueue
工作队列(又称任务队列):其主要思想是避免立即执行资源密集型任务,而不得不等待它完成。消息队列将任务封装为消息并将其发送到队列。在后台运行的工作进程将弹出任务并最终执行作业。当有多个工作线程时,这些工作线程将一起处理这些任务。1. 轮询分发(1)抽取工具类public class RabbitMqUtils { /** * 获取Channel * * @return Channel * @throws IOException * @throws TimeoutException */ public static Channel getChannel() throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); return connection.createChannel(); } }(2)工作线程public class Worker01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("收到消息:" + new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); //接收消息 System.out.println("c1等待接收消息..."); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } } public class Worker02 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println("收到消息:" + new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println(consumerTag + "消费者取消消费接口回调逻辑"); //接收消息 System.out.println("c2等待接收消息..."); channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }(3)Task线程public class Task01 { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("发送了消息:" + msg); } } }(4)结果生产者一共发送了4条消息,消费者1和消费者2按次序分别接收了两条消息2. 消息应答(1)概念RabbitMQ 一旦向消费者传递了一条消息,便立即将该消息标记为删除。在这种情况下,突然有个消费者挂掉了,我们将丢失正在处理的消息,以及后续发送给该消费者的消息也将丢失,因为它无法接收。为了保证消息在发送过程中不丢失,rabbitmq 引入了消息应答机制,即:消费者在接收到消息并且处理该消息之后,告诉 rabbitmq 它已经处理了,rabbitmq 可以把该消息删除了。(2)自动应答消息发送后立即被认为已经传送成功,这种模式需要在高吞吐量和数据传输安全性方面做权 衡,因为在这种模式下,如果消息在接收到之前,消费者那边出现连接的问题或者 channel 关闭,那么消息就丢失了,当然另一方面这种模式消费者那边可以传递过载的消息,没有对传递的消息数量进行限制,这样有可能使得消费者这边由于接收太多还来不及处理的消息,导致这些消息的积压,最终使得内存耗尽,最终这些消费者线程被操作系统杀死,所以这种模式仅适用在消费者可以高效并以某种速率处理这些消息的情况下使用。(3)消息应答的方式Channel.basicAck(用于肯定确认) RabbitMQ 已知道该消息并且成功地处理了消息,可以将其丢弃了Channel.basicNack(用于否定确认)Channel.basicReject(用于否定确认) 与 Channel.basicNack 相比少一个参数,不处理该消息直接拒绝,表示可以将其丢弃了(4)Mutiplechannel.basicAck(deliveryTag, true)true 代表批量应答 channel 上未应答的消息,比如说 channel 上有传送 tag 为5,6,7,8的消息,当前的 tag 是 8,那么此时 5 - 8 的这些还未应答的消息都会被确认收到消息应答false 同上面相比 只会应答 tag = 8 的消息,5,6,7 这三个消息依然不会被确认收到消息应答(5)消息自动重新入队如果消费者由于某些原因失去连接(其通道已关闭,连接已关闭或 TCP 连接丢失),导致消息未发送 ACK 确认,RabbitMQ 将了解到消息未完全处理,并将对其进行重新排队。如果此时其他消费者(状态良好)可以处理任务,MQ将会很快将其重新分发给另一个消费者,这样,即使某个消费者遇到突发状况,也可以确保不会丢失任何消息。(6)Demo消息应答默认值为 true,即自动应答,此处以手动应答为例SleepUtilpublic class SleepUtils { public static void sleep(int seconds) { try { Thread.sleep(1000L * seconds); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } } }Producerpublic class Task02 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); Scanner scanner = new Scanner(System.in); while (scanner.hasNext()) { String msg = scanner.next(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); System.out.println("生产者发送消息:" + msg); } } }Consumerpublic class Worker03 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("c1等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { SleepUtils.sleep(1); System.out.println("接收到的消息:" + new String(message.getBody())); //手动应答 /** * 参数说明: * 1.消息的标记 * 2.是否批量应答 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> System.out.println("消费者取消消费接口回调逻辑")); } } public class Worker04 { private static final String QUEUE_NAME = "ack_queue"; public static void main(String[] args) throws IOException, TimeoutException { Channel channel = RabbitMqUtils.getChannel(); System.out.println("c2等待接收消息..."); DeliverCallback deliverCallback = (consumerTag, message) -> { SleepUtils.sleep(30); System.out.println("接收到的消息:" + new String(message.getBody())); //手动应答 /** * 参数说明: * 1.消息的标记 * 2.是否批量应答 */ channel.basicAck(message.getEnvelope().getDeliveryTag(), false); }; boolean autoAck = false; channel.basicConsume(QUEUE_NAME, autoAck, deliverCallback, (consumerTag) -> System.out.println("消费者取消消费接口回调逻辑")); } }在发送者发送消息 xxx 后把 C2 消费者停掉,按理说该 C2 来处理的消息,但是由于它处理时间较长,在还没有执行 ack 代码的时候,C2 被停掉了,此时会看到消息被 C1 接收到了,说明消息 xxx 被重新入队,然后分配给能处理消息的 C1 消费者3. 消息持久化(1)概念默认情况下 RabbitMQ 退出或由于某种原因崩溃时,它会忽视队列和消息,除非告知它不要这样做,确保消息不会丢失需要做两件事:我们需要将队列和消息都标记为持久化(2)队列持久化之前创建的队列时都是非持久化的,rabbitmq 在重启之后该队列就会被删除掉,如果要队列实现持久化需要在声明队列的时候把 durable 参数设置为(true)持久化boolean durable = true; channel.queueDeclare(QUEUE_NAME, durable, false, false, null);注:如果之前声明的队列(相同的队列名)不是持久化的,需要把原先的队列先删除,或者重新创建一个不同名的持久化队列,不然会报错 received 'true' but current is 'false'在控制面板中 持久化 与 非持久化队列 的区别(3)消息持久化消息实现持久化需要修改生产者的代码,添加 MessageProperties.PERSISTENT_TEXT_PLAIN 属性# 未持久化 channel.basicPublish("", null, msg.getBytes()); # 持久化 channel.basicPublish("", QUEUE_NAME, MessageProperties.PERSISTENT_TEXT_PLAIN, msg.getBytes());将消息标记为持久化并不能完全保证不会丢失消息。尽管它告诉 RabbitMQ 将消息保存到磁盘,但是 这里依然存在当消息刚准备存储在磁盘的时候 但是还没有存储完,消息还在缓存的一个间隔点,此时并没有真正写入磁盘。(4)不公平分发轮询分发在一些场景下并不是很好,比方说有两个消费者在处理任务,其中消费者1处理任务的速度非常快,而另外一个消费者2 处理速度却很慢,这个时候如果还是采用轮询分发的话就会让处理速度快的消费者在很大一部分时间处于空闲状态,而处理慢的那个消费者一直在干活,这种分配方式在这种情况下并不太好,但此时 RabbitMQ 并不知道这种情况,它依然很公平的在进行消息分发,为了避免这种情况,可以设置参数 channel.basicQos(1)int prefetchCount = 2; channel.basicQos(prefetchCount);(5)预取值消息的发送是异步的,所以在任何时候,channel 上肯定不止只有一个消息,且消费者的手动确认本质上也是异步的,因此这里就存在一个未确认的消息缓冲区,所以要求开发人员能限制此缓冲区的大小,以避免缓冲区里面无限制的未确认消息问题。可以通过channel.basicQos() 方法设置”预取值“来完成,该值定义通道上允许的未确认消息的最大数量。一旦数量达到配置的数量,RabbitMQ 将停止在通道上传递更多消息,除非至少有一个未处理的消息被确认,例:假设在通道上有未确认的消息 5,6,7,8,并且通道的预取计数为 4,此时 RabbitMQ 将不会在该通道上再传递任何消息,除非至少有一个未应答的消息被 ack。消息应答和 QoS 预取值对吞吐量有很大的影响。通常,增加预取将提高向消费者传递消息的速度。虽然自动应答传输消息速率是最佳的,但在这种情况下已传递但尚未处理的消息的数量也会增加,从而增加了消费者的 RAM 消耗,应该小心使用具有无限预处理的自动确认模式或手动确认模式,消费者消费了大量的消息如果没有确认的话,会导致消费者连接的节点的内存消耗变大,所以设置合适的预取值是一个反复试验的过程,不同的负载该值取值也不同,默认情况话建议100 到 300,这个范围内的值通常可提供最佳的吞吐量,并且不会给消费者带来太大的风险,预取值为 1 是最保守的,但这将使吞吐量变得很低,特别是消费者连接延迟很大的情况下//不公平分发 /** * prefetchCount 等于 0 公平分发(默认) * prefetchCount 等于 1 公平分发 * prefetchCount 大于1时 预取值 */ int prefetchCount = 2; channel.basicQos(prefetchCount);
2023年06月19日
47 阅读
0 评论
0 点赞
2023-06-19
RabbitMQ - 介绍、四大核心概念
参考于b站尚硅谷RabbitMQ课程一、介绍RabbitMQ 是一个消息中间件:它接受并转发消息,可以把它当做一个快递站点,当你要发送一个包裹时,你把包裹放到快递站,快递员最终会把你的快递送到收件人那里,此时 RabbitMQ 可以比作是一个快递站,由快递员帮你传递快件(RabbitMQ 与快递站的主要区别在于它不处理快件而仅作接收,存储和转发消息数据)二、四大核心概念生产者产生数据发送消息交换机交换机是 RabbitMQ 非常重要的一个部件,一方面它接收来自生产者的消息,另一方面它将消息 推送到队列中。交换机必须确切知道如何处理它接收到的消息,是将这些消息推送到特定队列还是推 送到多个队列,亦者是将消息丢弃,这个得由交换机类型决定 队列队列是 RabbitMQ 内部使用的一种数据结构,尽管消息流经 RabbitMQ 和应用程序,但它们只能存储在队列中,队列仅受主机的内存和磁盘限制的约束,本质上是一个大的消息缓冲区。多个生产者可以将消息发送到一个队列,同理多个消费者也可以尝试从一个队列接收数据消费者消费与接收具有相似的含义,消费者大多时候是一个等待接收消息的程序。注:生产者、消费者和消息中间件很多时候并不在同一台机器上,同一个应用程序既可以是生产者又可以是消费者名词介绍Broker:接收和分发消息的应用,RabbitMQ Server 就是 Message BrokerVirtual host:出于多租户和安全因素设计,把 AMQP 的基本组件划分到一个虚拟的分组中,类似于网络中的 namespace 概念。当多个不同的用户使用同一个 RabbitMQ server 提供的服务时,可以划分出 多个 vhost,每个用户在自己的 vhost 创建 exchange/queue 等 Connection:publisher/consumer 和 broker 之间的 TCP 连接Channel:如果每一次访问 RabbitMQ 都建立一个 Connection,在消息量大的时候建立 TCP Connection 的开销将是巨大的,效率也较低。Channel 是在 connection 内部建立的逻辑连接,如果应用程序支持多线程,通常每个 thread 创建单独的 channel 进行通讯,AMQP method 包含了 channel id 帮助客户端和 message broker 识别 channel,所以 channel 之间是完全隔离的。Channel 作为轻量级的 Connection 极大减少了操作系统建立 TCP connection 的开销Exchange:message 到达 broker 的第一站,根据分发规则,匹配查询表中的 routing key,分发消息到 queue 中去。常用的类型有:direct (point-to-point), topic (publish-subscribe), fanout (multicast)Queue:消息最终被送到这里等待 consumer 取走Binding:exchange 和 queue 之间的虚拟连接,binding 中可以包含 routing key,Binding 信息被保 存到 exchange 中的查询表中,用于 message 的分发依据安装可参考官网文档:https://www.rabbitmq.com/download.htmlDemopom.xml<?xml version="1.0" encoding="UTF-8"?> <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <modelVersion>4.0.0</modelVersion> <groupId>com.sw</groupId> <artifactId>rabbitmq</artifactId> <version>1.0-SNAPSHOT</version> <properties> <maven.compiler.source>8</maven.compiler.source> <maven.compiler.target>8</maven.compiler.target> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> </properties> <dependencies> <!-- rabbitmq client --> <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <!-- commons io --> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency> </dependencies> <build> <plugins> <plugin> <groupId>org.apache.maven.plugins</groupId> <artifactId>maven-compiler-plugin</artifactId> <configuration> <source>8</source> <target>8</target> </configuration> </plugin> </plugins> </build> </project>Producerpublic class Producer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); //创建连接 Connection connection = factory.newConnection(); //获取信道 Channel channel = connection.createChannel(); /** * 创建队列 * 参数说明: * 1.队列名称 * 2.是否持久化存储消息 * 3.是否进行消息共享(只供一个消费者进行消费) * 4.是否自动删除(最后一个消费者断开连接后,是否自动删除该队列) * 5.其他参数 */ /** * 2023-06-15偷懒 优先级队列 */ Map<String, Object> arguments = new HashMap<>(); //优先级范围 0-10(设置过大会存在性能问题) arguments.put("x-max-priority", 10); channel.queueDeclare(QUEUE_NAME, true, false, false, arguments); for (int i = 1; i < 10; i++) { String msg = "hello world" + i; channel.basicPublish("", QUEUE_NAME, null, msg.getBytes()); } /** * 发送消息 * 参数说明: * 1.交换机 * 2.路由key * 3.其他参数 * 4.消息体 */ System.out.println("消息发送完毕"); } }Consumerpublic class Consumer { private static final String QUEUE_NAME = "hello"; public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("192.168.123.88"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("123456"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); DeliverCallback deliverCallback = (consumerTag, message) -> System.out.println(new String(message.getBody())); CancelCallback cancelCallback = (consumerTag) -> System.out.println("消息消费由于某些原因中断"); /** * 接收消息 * 1.队列名称 * 2.消费成功后是否自动应答 * 3.未成功消费的回调 * 4.取消消费的回调 */ channel.basicConsume(QUEUE_NAME, true, deliverCallback, cancelCallback); } }
2023年06月19日
57 阅读
0 评论
0 点赞
2023-05-27
工作流 - 网关
1. 排他网关 ExclusiveGateway(1)概念排他网关,用来在流程中实现决策。 当流程执行到这个网关,所有分支都会判断条件是否为true,如果 为true则执行该分支,排他网关只会选择一个为true的分支执行,如果有两个分支条件都为true,排他网关会选择id值较小的一条分支去执行当从网关出去的线所有条件都不满足时则会抛出异常org.activiti.engine.ActivitiException: No outgoing sequence flow of the exclusive gateway 'exclusivegateway' could be selected for continuing the process at org.activiti.engine.impl.bpmn.behavior.ExclusiveGatewayActivityBehavior.leave(ExclusiveGatewayActivityBehavior.java:85)(2)测试public class ExclusiveTest { @Test public void deploy() { //1.获取ProcessEngine对象 ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); //2.获取RepositoryService RepositoryService service = engine.getRepositoryService(); //3.部署 Deployment deploy = service.createDeployment() .addClasspathResource("bpmn/evection-exclusive.bpmn") .addClasspathResource("bpmn/evection-exclusive.png") .name("出差申请流程-排他网关") .deploy(); System.out.println("id: " + deploy.getId()); System.out.println("name: " + deploy.getName()); //service.deleteDeployment("57501", true); } @Test public void startProcess() { ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); RuntimeService runtimeService = engine.getRuntimeService(); Evection evection = new Evection(); evection.setNum(4d); Map<String, Object> map = new HashMap<>(); map.put("evection", evection); ProcessInstance instance = runtimeService.startProcessInstanceByKey("evection-exclusive", map); System.out.println("流程定义id:" + instance.getProcessDefinitionId()); } @Test public void completeTask() { String userId = "lisi"; ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = engine.getTaskService(); Task task = taskService.createTaskQuery().processDefinitionKey("evection-exclusive").taskAssignee(userId).singleResult(); if (task != null) { Evection evection = new Evection(); evection.setNum(2d); Map<String, Object> map = new HashMap<>(); map.put("evection", evection); taskService.complete(task.getId(), map); System.out.println("任务:" + task.getId() + "完成"); } } }2. 并行网关ParallelGateway(1)概念并行网关允许将流程分成多条分支,也可以把多条分支汇聚到一起,并行网关的功能是基于进入和外出顺序流的:fork分支: 并行后的所有外出顺序流,为每个顺序流都创建一个并发分支join汇聚: 所有到达并行网关,在此等待的进入分支,直到所有进入顺序流的分支都到达以后,流程就会通过汇聚网关注意,如果同一个并行网关有多个进入和多个外出顺序流,它就同时具有分支和汇聚功能,这时,网关会先汇聚所有进入的顺序流,然后再切分成多个并行分支,并行网关与其他网关的主要区别是,并行网关不会解析条件,即使顺序流中定义了条件,在流程流转时也会被忽略(2)测试public class 8ParallelTest { @Test public void deploy() { //1.获取ProcessEngine对象 ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); //2.获取RepositoryService RepositoryService service = engine.getRepositoryService(); //3.部署 Deployment deploy = service.createDeployment() .addClasspathResource("bpmn/evection-parallel.bpmn") //.addClasspathResource("bpmn/evection-exclusive.png") .name("出差申请流程-并行网关") .deploy(); System.out.println("id: " + deploy.getId()); System.out.println("name: " + deploy.getName()); //service.deleteDeployment("57501", true); } @Test public void startProcess() { ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); RuntimeService runtimeService = engine.getRuntimeService(); Evection evection = new Evection(); evection.setNum(4d); Map<String, Object> map = new HashMap<>(); map.put("evection", evection); ProcessInstance instance = runtimeService.startProcessInstanceByKey("evection-parallel", map); System.out.println("流程定义id:" + instance.getProcessDefinitionId()); } @Test public void completeTask() { String userId = "lisi"; ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = engine.getTaskService(); Task task = taskService.createTaskQuery().processDefinitionKey("evection-parallel").taskAssignee(userId).singleResult(); if (task != null) { taskService.complete(task.getId()); System.out.println("任务:" + task.getId() + "完成"); } } }流程流转至网关处,可查看 act_ru_task 表对应的数据,有两个正在执行的任务查看 act_ru_execution 表,有多个分支正在运行并行任务执行不分先后,由任务的负责人去执行即可,处理完一个任务则 act_ru_task 减少一条记录(负责人为技术经理对应的任务)有并行网关的汇聚结点,则说明有一个分支已经到汇聚,等待其它的分支到达当所有分支任务都完成,都到达汇聚结点后,查看 act_ru_task 表,执行流程实例已经变为总经理审批(出差天数大于等于3),说明流程执行已经通过并行网关(即所有分支到达汇聚结点,并行网关执行完成)3. 包含网关InclusiveGateway(1)概念包含网关可以看做是排他网关和并行网关的结合体,和排他网关一样,你可以在外出顺序流上定义条件,包含网关会解析它们,但其主要的区别是包含网关可以选择多于的一条顺序流,这和并行网关一样,包含网关的功能是基于进入和外出顺序流的:分支: 所有外出顺序流的条件都会被解析,结果为true的顺序流会以并行方式继续执行,会为每个顺序流创建 一个分支汇聚: 所有并行分支到达包含网关会进入等待状态,直到每个进入顺序流的分支都到达,这是与并行网关最大的不同(包含网关只会等待被选中且执行了的进入顺序流,在汇聚之后, 流程会穿过包含网关继续执行)(2)测试public class InclusiveTest { @Test public void deploy() { //1.获取ProcessEngine对象 ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); //2.获取RepositoryService RepositoryService service = engine.getRepositoryService(); //3.部署 Deployment deploy = service.createDeployment() .addClasspathResource("bpmn/evection-inclusive.bpmn") //.addClasspathResource("bpmn/evection-inclusive.png") .name("出差申请流程-包含网关") .deploy(); System.out.println("id: " + deploy.getId()); System.out.println("name: " + deploy.getName()); } @Test public void startProcess() { ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); RuntimeService runtimeService = engine.getRuntimeService(); Evection evection = new Evection(); evection.setNum(4d); Map<String, Object> map = new HashMap<>(); map.put("evection", evection); ProcessInstance instance = runtimeService.startProcessInstanceByKey("evection-inclusive", map); System.out.println("流程定义id:" + instance.getProcessDefinitionId()); } @Test public void completeTask() { String userId = "zhangsan"; ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = engine.getTaskService(); Task task = taskService.createTaskQuery().processDefinitionKey("evection-inclusive").taskAssignee(userId).singleResult(); if (task != null) { taskService.complete(task.getId()); System.out.println("任务:" + task.getId() + "完成"); } } }执行流程a. 当流程执行到第一个包含网关后,会根据条件判断,当前要走哪几个分支act_ru_task 表信息技术经理审批、人事经理审批都是当前并行执行的任务act_ru_execution 表信息第一条记录代表包含网关分支,第二、三条记录代表要执行的分支如果有一个分支执行先走到汇聚结点的分支,要在此等待其它执行分支b. 先执行技术经理审批,然后查看当前任务表 act_ru_task,还剩人事经理的任务查看 act_ru_execution 表信息人事经理的分支还在,技术经理的分支已走到汇聚节点c. 人事经理进行任务处理,然后查看当前任务表 act_ru_task,任务来到了总经理这边查看 act_ru_execution 表信息包含网关执行完成,分支和汇聚就从 act_ru_execution 删除注:如果包含网关中设置的条件在流程变量中不存在,则会抛出异常分支需要判断条件,只有符合条件的分支才会执行,符合条件的分支最终才进行汇聚4. 事件网关EventGateway事件网关允许根据事件判断流向,网关的每个外出顺序流都要连接到一个中间捕获事件,当流程到达一个基于事件网关时,网关会进入等待状态(即暂停执行),与此同时,会为每个外出顺序流创建相对应的事件订阅,事件网关的外出顺序流和普通顺序流不同,这些顺序流不会真的"执行",相反它们让流程引擎去决定执行到事件网关的流程需要订阅哪些事件,同时还要考虑以下条件:事件网关必须有两条或以上外出顺序流事件网关后,只能使用 intermediateCatchEvent 类型(activiti不支持基于事件网关后连接 ReceiveTask)连接到事件网关的中间捕获事件必须只有一个入口顺序流intermediateCatchEvent 类型:Message Event 消息事件Singal Event 信号事件Timer Event 定时事件
2023年05月27日
183 阅读
0 评论
0 点赞
2023-05-26
工作流 - 组任务
1. 组任务处理流程(1)查询组任务指定候选人,查询该候选人当前的待办任务,候选人不能立即办理任务(2)拾取(claim)任务该组任务的所有候选人都能拾取将候选人的组任务,变成个人任务,原来候选人就变成了该任务的负责人如果拾取后不想办理该任务时怎么办?需要将已经拾取的个人任务归还到组里边,将个人任务变成了组任务(即设置assignee为null)(3)查询个人任务查询方式同理个人任务部分,根据assignee查询用户负责的个人任务(4)办理个人任务同理任务处理2. 查询组任务根据候选人查询组任务@Test public void queryGroupTask() { String key = "evection1"; String candidateUser = "zhangsan"; ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = engine.getTaskService(); List<Task> list = taskService.createTaskQuery() .processDefinitionKey(key) .taskCandidateUser(candidateUser) //.taskCandidateOrAssigned(candidateUser) .list(); for (Task task : list) { System.out.println("流程实例id:" + task.getProcessInstanceId()); System.out.println("任务id:" + task.getId()); System.out.println("任务名称:" + task.getName()); System.out.println("负责人:" + task.getAssignee()); } }3. 拾取组任务候选人员拾取组任务后该任务变为自己(即指定候选人)的个人任务@Test public void claimTask() { String taskId = "45002"; String userId = "zhangsan"; ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = engine.getTaskService(); Task task = taskService.createTaskQuery().taskId(taskId).taskCandidateUser(userId).singleResult(); if (task != null) { taskService.claim(taskId, userId); System.out.println("任务拾取成功"); } }4. 查询个人待办任务查询方式同理个人任务查询@Test public void queryGroupTask() { String key = "evection1"; String candidateUser = "zhangsan"; ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = engine.getTaskService(); List<Task> list = taskService.createTaskQuery() .processDefinitionKey(key) //.taskCandidateUser(candidateUser) //.taskCandidateOrAssigned(candidateUser) .taskAssignee(candidateUser) .list(); for (Task task : list) { System.out.println("流程实例id:" + task.getProcessInstanceId()); System.out.println("任务id:" + task.getId()); System.out.println("任务名称:" + task.getName()); System.out.println("负责人:" + task.getAssignee()); } }5. 办理个人任务@Test public void completeTask() { ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = engine.getTaskService(); taskService.complete("42505"); System.out.println("任务完成"); }6. 归还组任务设置 assignee 为 null 即可@Test public void taskBack() { String taskId = "45002"; String userId = "zhangsan"; ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = engine.getTaskService(); Task task = taskService.createTaskQuery().taskId(taskId).taskAssignee(userId).singleResult(); if (task != null) { //设置userId为null,则归还组任务 taskService.setAssignee(taskId, null); System.out.println("归还任务成功"); } }7. 任务交接将任务负责人设置为其他用户@Test public void setOtherUserId() { String taskId = "45002"; String userId = "zhangsan"; ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = engine.getTaskService(); Task task = taskService.createTaskQuery().taskId(taskId).taskAssignee(userId).singleResult(); if (task != null) { //设置新的任务负责人 taskService.setAssignee(taskId, "sunxiaocuan"); System.out.println("任务交接成功"); } }8. 数据库表操作查询当前任务执行表SELECT * FROM act_ru_task任务执行表记录当前执行的任务信息,由于该任务当前是组任务,所有assignee为空,当拾取任务后该字段就是拾取用户的 id查询任务参与者SELECT * FROM act_ru_identitylink该表记录当前任务用户或组,当前任务如果设置了候选人,会向该表插入候选人记录,有 几个候选人就插入几个与 act_ru_identitylink 对应的还有一张历史表 act_hi_identitylink,向 act_ru_identitylink 插入记录的同时也会向历史表插入对应的记录
2023年05月26日
48 阅读
0 评论
0 点赞
2023-05-26
工作流 - 流程变量
1. 概念流程变量在 activiti 中是一个非常重要的角色,流程运转有时需要靠流程变量,业务系统和 activiti 结合时少不了流程变量,流程变量就是 activiti 在管理工作流时根据管理需要而设置的变量。 如:在出差申请流程流转时如果出差天数大于 3 天则由总经理审核,否则由人事直接审核, 出差天数就可以设置为流程变量,在流程流转时使用。 注:虽然流程变量中可以存储业务数据,可以通过activiti的api查询流程变量从而实现查询业务数据, 但是不建议这样使用,因为业务数据查询由业务系统负责,activiti设置流程变量是为了流程执行需要而创建的。2. 流程变量类型如果将 pojo 存储到流程变量中,必须实现序列化接口 serializable,为了防止由于新增字段无法反序列化,需要生成 serialVersionUID3. 流程变量作用域流程变量的作用域可以是一个流程实例(processInstance),或一个任务(task),或一个执行实例 (execution)global变量流程变量的默认作用域是流程实例,当一个流程变量的作用域为流程实例时,可以称为 global 变量注:Global变量:userId(变量名)、xxx(变量值) global 变量中变量名不允许重复,设置相同名称的变量,后设置的值会覆盖前设置的变量值local变量任务和执行实例仅仅是针对一个任务和一个执行实例范围,范围没有流程实例大, 称为 local 变量。Local 变量由于在不同的任务或不同的执行实例中,作用域互不影响,变量名可以相同没有影响。Local 变量名也可以和 global 变量名相同,也是没有影响的4. 流程表变量的使用方法(1)在属性上使用UEL表达式可以在 assignee 处设置 UEL 表达式,表达式的值为任务的负责人,比如: ${assignee}, assignee 就 是一个流程变量名称, Activiti获取UEL表达式的值,即流程变量assignee的值 ,将assignee的值作为任务的负责人进行任务分配(2)在连线上使用UEL表达式可以在连线上设置UEL表达式,决定流程走向。 比如:${price<10000} ,price就是一个流程变量名称,uel表达式结果类型为布尔类型, 如果UEL表达式是true,则决定流程执行走向(即流程走这条连线)5. 使用流程变量例:员工创建出差申请单,由部门经理审核,部门经理申请通过后3天以下由财务直接申批,3天以上先由总经理审批,总经理审批通过后再由财务审批(1)流程定义先通过UEL-value来设置负责人然后在分支线上来设置条件注:此处还可以通过对象参数命名,比如 evection.num另一根线对应的设置设置完成之后即可部署流程(2)使用global变量创建pojo对象public class Evection implements Serializable { /** * id */ private long id; /** * 名称 */ private String evectionName; /** * 出差天数 */ private double num; /** * 开始日期 */ private Date startDate; /** * 结束日期 */ private Date endDate; /** * 目的地 */ private String destination; /** * 事由 */ private String reason; public Evection() { } public Evection(long id, String evectionName, double num, Date startDate, Date endDate, String destination, String reason) { this.id = id; this.evectionName = evectionName; this.num = num; this.startDate = startDate; this.endDate = endDate; this.destination = destination; this.reason = reason; } public long getId() { return id; } public void setId(long id) { this.id = id; } public String getEvectionName() { return evectionName; } public void setEvectionName(String evectionName) { this.evectionName = evectionName; } public double getNum() { return num; } public void setNum(double num) { this.num = num; } public Date getStartDate() { return startDate; } public void setStartDate(Date startDate) { this.startDate = startDate; } public Date getEndDate() { return endDate; } public void setEndDate(Date endDate) { this.endDate = endDate; } public String getDestination() { return destination; } public void setDestination(String destination) { this.destination = destination; } public String getReason() { return reason; } public void setReason(String reason) { this.reason = reason; } @Override public String toString() { return "Evection{" + "id=" + id + ", evectionName='" + evectionName + '\'' + ", num=" + num + ", startDate=" + startDate + ", endDate=" + endDate + ", destination='" + destination + '\'' + ", reason='" + reason + '\'' + '}'; } }部署流程@Test public void deploy() { //1.获取ProcessEngine对象 ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); //2.获取RepositoryService RepositoryService service = engine.getRepositoryService(); //3.部署 Deployment deploy = service.createDeployment() .addClasspathResource("bpmn/evection-variable.bpmn") .addClasspathResource("bpmn/evection-variable.png") .name("出差申请流程-Variable") .deploy(); System.out.println("id: " + deploy.getId()); System.out.println("name: " + deploy.getName()); }设置流程变量a. 启动流程时设置,变量的作用域是整个流程实例通过startProcessInstanceByKey方法设置流程变量的作用域是一个流程实例,流程变量使用Map存储,同一个流程实例map中的key相同时,后者会覆盖前者@Test public void startAndSet() { ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); RuntimeService runtimeService = engine.getRuntimeService(); String key = "evection-variable"; Map<String, Object> variables = new HashMap<>(); Evection evection = new Evection(); evection.setNum(2d); variables.put("evection", evection); variables.put("assignee0", "zhangsan"); variables.put("assignee1", "lisi"); variables.put("assignee2", "wangwu"); variables.put("assignee3", "xiaoming"); ProcessInstance instance = runtimeService.startProcessInstanceByKey(key, variables); System.out.println("流程实例名称:" + instance.getName()); System.out.println("流程定义id:" + instance.getProcessDefinitionId()); }处理任务@Test public void completeTask() { String key = "evection-variable"; String assignee = "lisi"; ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = engine.getTaskService(); Task task = taskService.createTaskQuery().processDefinitionKey(key).taskAssignee(assignee).singleResult(); if (task != null) { taskService.complete(task.getId()); System.out.println("任务:" + task.getId() + "," + task.getName() + ",完成"); } }b. 处理任务时设置在完成任务时设置流程变量,该流程变量只有在该任务完成后其它结点才可使用该变量,它的作用域是整个流程实例,如果设置的流程变量的key在流程实例中已存在相同的名字则后设置的变量会替换前边设置的变量@Test public void startProcess() { ProcessEngine engine = ProcessEngines.getDefaultProcessEngine(); RuntimeService runtimeService = engine.getRuntimeService(); String key = "evection-variable"; Map<String, Object> variables = new HashMap<>(); variables.put("assignee0", "zhangsan"); variables.put("assignee1", "lisi"); variables.put("assignee2", "wangwu"); variables.put("assignee3", "xiaoming"); ProcessInstance instance = runtimeService.startProcessInstanceByKey(key, variables); System.out.println("流程实例名称:" + instance.getName()); System.out.println("流程定义id:" + instance.getProcessDefinitionId()); }处理时设置@Test public void doingSet() { String taskId = "1404"; ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = processEngine.getTaskService(); Map<String, Object> variables = new HashMap<>(); Evection evection = new Evection(); evection.setNum(3d); variables.put("evection", evection); taskService.setVariablesLocal(taskId, variables); taskService.complete(taskId); }注: 通过当前任务设置流程变量,需要指定当前任务id,如果当前执行的任务id不存在则抛出异常(任务办理时也是通过map设置流程变量,一次可以设置多个变量)c. 当前流程实例设置通过流程实例id设置全局变量,该流程实例必须未执行完成(即该流程实例还没有走完/结束)@Test public void setLocalVariableByTaskId(){ String taskId="1404"; ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = processEngine.getTaskService(); Evection evection = new Evection (); evection.setNum(3d); taskService.setVariableLocal(taskId, "evection", evection); // 可以通过map一次设置多个值 //taskService.setVariablesLocal(taskId, variables) }注:任务id必须是当前待办的任务id,act_ru_task中存在,如果该任务已结束,会抛出异常,同时也可以通过 taskService.getVariable() 方法获取流程变量(3)设置local变量a. 任务办理时设置任务办理时设置local流程变量,当前运行的流程实例只能在该任务结束前使用,任务结束该变量无法在 当前流程实例使用,可以通过查询历史任务查询@Test public void completTask() { //任务id String taskId = "1404"; //获取processEngine ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = processEngine.getTaskService(); //定义流程变量 Map<String, Object> variables = new HashMap<String, Object>(); Evection evection = new Evection (); evection.setNum(3d); //定义流程变量 Map<String, Object> variables = new HashMap<>(); //变量名是evection,变量值是evection对象 variables.put("evection", evection); //设置local变量,作用域为该任务 taskService.setVariablesLocal(taskId, variables); //完成任务 taskService.complete(taskId); }注:作用域为当前任务,每个任务可以设置同名的变量,互不影响b. 通过当前任务设置@Test public void setLocalVariableByTaskId(){ String taskId="1404"; ProcessEngine processEngine = ProcessEngines.getDefaultProcessEngine(); TaskService taskService = processEngine.getTaskService(); Evection evection = new Evection (); evection.setNum(3d); taskService.setVariableLocal(taskId, "evection", evection); // 可以通过map一次设置多个值 //taskService.setVariablesLocal(taskId, variables) }注:同理全局变量设置时的前提条件,即当前任务不能执行完/结束c. 补充部门经理审核、总经理审核、财务审核时设置local变量,可通过historyService查询每个历史任务时一起将流程变量的值也查询出来@Test public void queryHistoryInfo() { //创建历史任务查询对象 HistoricTaskInstanceQuery historicTaskInstanceQuery = historyService.createHistoricTaskInstanceQuery(); //查询结果包括local变量 historicTaskInstanceQuery.includeTaskLocalVariables(); for (HistoricTaskInstance historicTaskInstance : list) { System.out.println("=============================="); System.out.println("任务id:" + historicTaskInstance.getId()); System.out.println("任务名称:" + historicTaskInstance.getName()); System.out.println("任务负责人:" + historicTaskInstance.getAssignee()); System.out.println("任务local变量:"+ historicTaskInstance.getTaskLocalVariables()); }
2023年05月26日
79 阅读
0 评论
0 点赞
1
...
11
12
13
...
51