首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,085 阅读
2
类的加载
741 阅读
3
Spring Cloud OAuth2.0
726 阅读
4
SpringBoot自动装配原理
691 阅读
5
集合不安全问题
586 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
FastApi
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
389
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
FastApi
页面
统计
关于
搜索到
1
篇与
的结果
2021-04-11
Work模式
Work模式1、轮询模式(Round-Robin)类型:无特点:当有多个消费者时,一个消费者分配一条消息,直至消费完成Producer:package com.sw.docker.rabbitmq.work.lunxun; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @Author suaxi * @Date 2021/4/8 23:26 * 轮询模式 */ public class Producer { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory conn = new ConnectionFactory(); conn.setHost("x.x.x.x"); conn.setPort(5672); conn.setUsername("admin"); conn.setPassword("123456"); conn.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //2.创建连接 connection = conn.newConnection("生产者"); //3.通过连接获取通道 channel = connection.createChannel(); //4.准备消息内容 for (int i = 0; i <= 20; i++) { String msg = "你好,谢谢" + i; //5.发送消息给队列queue channel.basicPublish("", "queue01", null, msg.getBytes()); } System.out.println("消息发送成功"); }catch (Exception e){ //7.关闭通道 if (channel != null && channel.isOpen()){ try{ channel.close(); }catch (Exception e1){ e.printStackTrace(); } } //8.关闭连接 if (connection != null && connection.isOpen()){ try{ connection.close(); }catch (Exception e1){ e.printStackTrace(); } } } } } Work01:package com.sw.docker.rabbitmq.work.lunxun; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author suaxi * @Date 2021/4/8 23:26 */ public class Work01 { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory conn = new ConnectionFactory(); conn.setHost("x.x.x.x"); conn.setPort(5672); conn.setUsername("admin"); conn.setPassword("123456"); conn.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = conn.newConnection("消费者-work01"); channel = connection.createChannel(); Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue01", true, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try { System.out.println("work01收到的消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(2000); } catch (Exception e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println("work01 开始接收消息"); System.in.read(); } catch (Exception e) { //7.关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e1) { e.printStackTrace(); } } //8.关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e1) { e.printStackTrace(); } } } } } Work02://Work02的代码同理012、公平分发根据消费者的消费能力进行公平分发(按处理效率进行分配)Producer:package com.sw.docker.rabbitmq.work.lunxun; import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; /** * @Author suaxi * @Date 2021/4/8 23:26 * Work模式 */ public class Producer { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory conn = new ConnectionFactory(); conn.setHost("x.x.x.x"); conn.setPort(5672); conn.setUsername("admin"); conn.setPassword("123456"); conn.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { //2.创建连接 connection = conn.newConnection("生产者"); //3.通过连接获取通道 channel = connection.createChannel(); //4.准备消息内容 for (int i = 0; i <= 20; i++) { String msg = "你好,谢谢" + i; //5.发送消息给队列queue channel.basicPublish("", "queue01", null, msg.getBytes()); } System.out.println("消息发送成功"); }catch (Exception e){ //7.关闭通道 if (channel != null && channel.isOpen()){ try{ channel.close(); }catch (Exception e1){ e.printStackTrace(); } } //8.关闭连接 if (connection != null && connection.isOpen()){ try{ connection.close(); }catch (Exception e1){ e.printStackTrace(); } } } } }Work01:将消息接收模式改为手动应答,且需设置 qos,注意:在轮询模式中,qos设置与不设置没有影响,此处需做区分package com.sw.docker.rabbitmq.work.fair; import com.rabbitmq.client.*; import java.io.IOException; /** * @Author suaxi * @Date 2021/4/8 23:26 */ public class Work01 { public static void main(String[] args) { //1.创建连接工厂 ConnectionFactory conn = new ConnectionFactory(); conn.setHost("x.x.x.x"); conn.setPort(5672); conn.setUsername("admin"); conn.setPassword("123456"); conn.setVirtualHost("/"); Connection connection = null; Channel channel = null; try { connection = conn.newConnection("消费者-work01"); channel = connection.createChannel(); Channel finalChannel = channel; finalChannel.basicQos(1); finalChannel.basicConsume("queue01", false, new DeliverCallback() { @Override public void handle(String s, Delivery delivery) throws IOException { try { System.out.println("work01收到的消息是:" + new String(delivery.getBody(), "UTF-8")); Thread.sleep(2000); //接收消息改为手动应答 finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } catch (Exception e) { e.printStackTrace(); } } }, new CancelCallback() { @Override public void handle(String s) throws IOException { } }); System.out.println("work01 开始接收消息"); System.in.read(); } catch (Exception e) { //7.关闭通道 if (channel != null && channel.isOpen()) { try { channel.close(); } catch (Exception e1) { e.printStackTrace(); } } //8.关闭连接 if (connection != null && connection.isOpen()) { try { connection.close(); } catch (Exception e1) { e.printStackTrace(); } } } } }
2021年04月11日
170 阅读
0 评论
0 点赞