首页
统计
关于
Search
1
Sealos3.0离线部署K8s集群
1,082 阅读
2
类的加载
741 阅读
3
Spring Cloud OAuth2.0
726 阅读
4
SpringBoot自动装配原理
691 阅读
5
集合不安全问题
584 阅读
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
登录
Search
标签搜索
Java
CSS
mysql
RabbitMQ
JavaScript
Redis
JVM
Mybatis-Plus
Camunda
多线程
CSS3
Python
Spring Cloud
注解和反射
Activiti
工作流
SpringBoot
Mybatis
Spring
html5
蘇阿細
累计撰写
388
篇文章
累计收到
4
条评论
首页
栏目
笔记
Java
多线程
注解和反射
JVM
JUC
设计模式
Mybatis
Spring
SpringMVC
SpringBoot
MyBatis-Plus
Elastic Search
微服务
Dubbo
Zookeeper
SpringCloud
Nacos
Sentinel
数据库
MySQL
Oracle
PostgreSQL
Redis
MongoDB
工作流
Activiti7
Camunda
消息队列
RabbitMQ
前端
HTML5
CSS
CSS3
JavaScript
jQuery
Vue2
Vue3
Linux
容器
Docker
Kubernetes
Python
页面
统计
关于
搜索到
1
篇与
的结果
2021-04-29
Spring Cloud Stream
一、概述 Spring Cloud Stream 是一个构建消息驱动微服务的框架,应用程序通过inputs(生产者)或outputs(消费者)来与Spring Cloud Stream中binder对象交互,binder主要负责与消息中间件交互(开发人员只需操作Stream,而不用关心底层使用的是什么MQ)。注:目前仅支持RabbitMQ和kafka二、如何屏蔽底层差异在没有绑定器这个概念的情况下,SpringBoot应用在直接与消息中间件交互的时候,由于各MQ的设计初衷有不同,细节上存在较大的差异。通过定义绑定器作为中间层,完美的实现了应用程序与消息中间件细节之间的隔离;通过向应用程序暴露统一的channel频道,使得应用程序不需要再考虑各种MQ的实现。Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(RabbitMQ<==>kafka),使得开发高度解耦,更加专注于自己的业务流程。Spring Cloud Stream处理架构三、通信模式Stream消息通信遵循发布-订阅模式Spring Cloud Stream的业务流程source:发布消息sink:接收消息图片来源:尚硅谷 - 周阳 - Spring Cloud Alibaba四、常用注解和API名称说明Middleware中间件Binder应用与消息中间件之间的封装,目前实现了RabbitMQ和Kafka的Binder,利用Binder可以很方便的连接MQ,可以动态的改变消息类型(RabbitMQ--exchange,Kafaka--topic),可以通过yml文件的配置来实现@Input输入@Output输出@StreamListener监听队列@EnableBinding绑定频道五、测试实例1、提供者pom<!-- stream rabbitmq --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-rabbit</artifactId> </dependency> <!-- Eureka-client --> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-web</artifactId> </dependency> <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-actuator</artifactId> </dependency>ymlserver: port: 8801 spring: application: name: cloud-stream-provider rabbitmq: host: x.x.x.x port: 5672 username: admin password: 123456 cloud: stream: binders: # 配置要绑定的rabbitmq的服务信息 defaultRabbit: # 定义的名称,用于binding整合 type: rabbit # 消息组件类型 bindings: # 服务整合处理 output: # 通道名称 destination: StreamExChange # 交换机的名字 content-type: application/json # 消息类型 binder: defaultRabbit # 设置要绑定的消息服务的具体设置 eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 心跳时间间隔 lease-expiration-duration-in-seconds: 5 # 默认90s instance-id: send-8801.com # 在信息列表显示主机名称 prefer-ip-address: true # 访问路径显示ip 主启动类@SpringBootApplication public class StreamMQMain8801 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8801.class, args); } }Service@EnableBinding(Source.class) //定义消息的推送通道 public class MessageProviderImpl implements MessageProvider { @Autowired private MessageChannel output; @Override public String send() { String serial = UUID.randomUUID().toString(); output.send(MessageBuilder.withPayload(serial).build()); System.out.println("======>" + serial); return null; } }controller@RestController public class SendMessageController { @Autowired private MessageProvider service; @GetMapping("/send") public String sendMessage(){ return service.send(); } }2、消费者==创建8802、8803两个消费者==pom同理消费者ymlserver: port: 8802 spring: application: name: cloud-stream-consumer rabbitmq: host: x.x.x.x port: 5672 username: admin password: 123456 cloud: stream: binders: # 配置要绑定的rabbitmq的服务信息 defaultRabbit: # 定义的名称,用于binding整合 type: rabbit # 消息组件类型 bindings: # 服务整合处理 input: # 通道名称 destination: StreamExChange # 交换机的名字 content-type: application/json # 消息类型 binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: GroupA8802 eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 心跳时间间隔 lease-expiration-duration-in-seconds: 5 # 默认90s instance-id: recive-8802.com # 在信息列表显示主机名称 prefer-ip-address: true # 访问路径显示ip 启动类@SpringBootApplication public class StreamMQMain8802 { public static void main(String[] args) { SpringApplication.run(StreamMQMain8802.class, args); } }controller@Component @EnableBinding(Sink.class) public class ReciveController { @Value("${server.port}") private String serverPort; @StreamListener(Sink.INPUT) public void input(Message<String> msg){ System.out.println("消费者8802,接收到消息:" + msg.getPayload() + "\t serverPort:" + serverPort); } }六、重复消费的问题启动以上的测试实例,会发现8801生产者发送一条消息,8802、8803都收到了消息如何解决?通过将消费者分组来解决:即将原先GroupA8802、GroupA8803两个分组改为同一个分组GroupA注:在同一个分组中,多个消费者是竞争关系,这样就能保证消息只会被分组中的一个消费者消费server: port: 8802 spring: application: name: cloud-stream-consumer rabbitmq: host: x.x.x.x port: 5672 username: admin password: 123456 cloud: stream: binders: # 配置要绑定的rabbitmq的服务信息 defaultRabbit: # 定义的名称,用于binding整合 type: rabbit # 消息组件类型 bindings: # 服务整合处理 input: # 通道名称 destination: StreamExChange # 交换机的名字 content-type: application/json # 消息类型 binder: defaultRabbit # 设置要绑定的消息服务的具体设置 group: GroupA eureka: client: service-url: defaultZone: http://localhost:7001/eureka instance: lease-renewal-interval-in-seconds: 2 # 心跳时间间隔 lease-expiration-duration-in-seconds: 5 # 默认90s instance-id: recive-8802.com # 在信息列表显示主机名称 prefer-ip-address: true # 访问路径显示ip消息持久化问题在分组解决重复消费的问题是,分组同时也实现了消息的持久化
2021年04月29日
52 阅读
0 评论
0 点赞