一、概述
Spring Cloud Stream 是一个构建消息驱动微服务的框架,应用程序通过inputs(生产者)或outputs(消费者)来与Spring Cloud Stream中binder对象交互,binder主要负责与消息中间件交互(开发人员只需操作Stream,而不用关心底层使用的是什么MQ)。
注:目前仅支持RabbitMQ
和kafka
二、如何屏蔽底层差异
在没有绑定器这个概念的情况下,SpringBoot应用在直接与消息中间件交互的时候,由于各MQ的设计初衷有不同,细节上存在较大的差异。
通过定义绑定器作为中间层,完美的实现了应用程序
与消息中间件细节
之间的隔离;通过向应用程序暴露统一的channel频道
,使得应用程序不需要再考虑各种MQ的实现。
Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(RabbitMQ<==>kafka),使得开发高度解耦,更加专注于自己的业务流程。
Spring Cloud Stream处理架构
三、通信模式
Stream消息通信遵循发布-订阅
模式
Spring Cloud Stream的业务流程
source:发布消息
sink:接收消息
图片来源:尚硅谷 - 周阳 - Spring Cloud Alibaba
四、常用注解和API
名称 | 说明 |
---|---|
Middleware | 中间件 |
Binder | 应用与消息中间件之间的封装,目前实现了RabbitMQ和Kafka的Binder,利用Binder可以很方便的连接MQ,可以动态的改变消息类型(RabbitMQ--exchange,Kafaka--topic),可以通过yml文件的配置来实现 |
@Input | 输入 |
@Output | 输出 |
@StreamListener | 监听队列 |
@EnableBinding | 绑定频道 |
五、测试实例
1、提供者
pom
<!-- stream rabbitmq -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>
<!-- Eureka-client -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
yml
server:
port: 8801
spring:
application:
name: cloud-stream-provider
rabbitmq:
host: x.x.x.x
port: 5672
username: admin
password: 123456
cloud:
stream:
binders: # 配置要绑定的rabbitmq的服务信息
defaultRabbit: # 定义的名称,用于binding整合
type: rabbit # 消息组件类型
bindings: # 服务整合处理
output: # 通道名称
destination: StreamExChange # 交换机的名字
content-type: application/json # 消息类型
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 心跳时间间隔
lease-expiration-duration-in-seconds: 5 # 默认90s
instance-id: send-8801.com # 在信息列表显示主机名称
prefer-ip-address: true # 访问路径显示ip
主启动类
@SpringBootApplication
public class StreamMQMain8801 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8801.class, args);
}
}
Service
@EnableBinding(Source.class) //定义消息的推送通道
public class MessageProviderImpl implements MessageProvider {
@Autowired
private MessageChannel output;
@Override
public String send() {
String serial = UUID.randomUUID().toString();
output.send(MessageBuilder.withPayload(serial).build());
System.out.println("======>" + serial);
return null;
}
}
controller
@RestController
public class SendMessageController {
@Autowired
private MessageProvider service;
@GetMapping("/send")
public String sendMessage(){
return service.send();
}
}
2、消费者
==创建8802、8803两个消费者==
pom
同理消费者
yml
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
rabbitmq:
host: x.x.x.x
port: 5672
username: admin
password: 123456
cloud:
stream:
binders: # 配置要绑定的rabbitmq的服务信息
defaultRabbit: # 定义的名称,用于binding整合
type: rabbit # 消息组件类型
bindings: # 服务整合处理
input: # 通道名称
destination: StreamExChange # 交换机的名字
content-type: application/json # 消息类型
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: GroupA8802
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 心跳时间间隔
lease-expiration-duration-in-seconds: 5 # 默认90s
instance-id: recive-8802.com # 在信息列表显示主机名称
prefer-ip-address: true # 访问路径显示ip
启动类
@SpringBootApplication
public class StreamMQMain8802 {
public static void main(String[] args) {
SpringApplication.run(StreamMQMain8802.class, args);
}
}
controller
@Component
@EnableBinding(Sink.class)
public class ReciveController {
@Value("${server.port}")
private String serverPort;
@StreamListener(Sink.INPUT)
public void input(Message<String> msg){
System.out.println("消费者8802,接收到消息:" + msg.getPayload() + "\t serverPort:" + serverPort);
}
}
六、重复消费的问题
启动以上的测试实例,会发现8801生产者发送一条消息,8802、8803都收到了消息
如何解决?
通过将消费者分组来解决:即将原先GroupA8802、GroupA8803两个分组改为同一个分组GroupA
注:在同一个分组中,多个消费者是竞争关系,这样就能保证消息只会被分组中的一个消费者消费
server:
port: 8802
spring:
application:
name: cloud-stream-consumer
rabbitmq:
host: x.x.x.x
port: 5672
username: admin
password: 123456
cloud:
stream:
binders: # 配置要绑定的rabbitmq的服务信息
defaultRabbit: # 定义的名称,用于binding整合
type: rabbit # 消息组件类型
bindings: # 服务整合处理
input: # 通道名称
destination: StreamExChange # 交换机的名字
content-type: application/json # 消息类型
binder: defaultRabbit # 设置要绑定的消息服务的具体设置
group: GroupA
eureka:
client:
service-url:
defaultZone: http://localhost:7001/eureka
instance:
lease-renewal-interval-in-seconds: 2 # 心跳时间间隔
lease-expiration-duration-in-seconds: 5 # 默认90s
instance-id: recive-8802.com # 在信息列表显示主机名称
prefer-ip-address: true # 访问路径显示ip
消息持久化问题
在分组解决重复消费的问题是,分组同时也实现了消息的持久化
评论 (0)