Spring Cloud Stream

suaxi
2021-04-29 / 0 评论 / 52 阅读 / 正在检测是否收录...

一、概述

​ Spring Cloud Stream 是一个构建消息驱动微服务的框架,应用程序通过inputs(生产者)或outputs(消费者)来与Spring Cloud Stream中binder对象交互,binder主要负责与消息中间件交互(开发人员只需操作Stream,而不用关心底层使用的是什么MQ)。

注:目前仅支持RabbitMQkafka

二、如何屏蔽底层差异

在没有绑定器这个概念的情况下,SpringBoot应用在直接与消息中间件交互的时候,由于各MQ的设计初衷有不同,细节上存在较大的差异。

通过定义绑定器作为中间层,完美的实现了应用程序消息中间件细节之间的隔离;通过向应用程序暴露统一的channel频道,使得应用程序不需要再考虑各种MQ的实现。

Stream对消息中间件的进一步封装,可以做到代码层面对中间件的无感知,甚至于动态的切换中间件(RabbitMQ<==>kafka),使得开发高度解耦,更加专注于自己的业务流程。


Spring Cloud Stream处理架构

1.Spring Cloud Stream处理架构.png


三、通信模式

Stream消息通信遵循发布-订阅模式


Spring Cloud Stream的业务流程

2.Spring Cloud Stream业务流程.png

source:发布消息

sink:接收消息

图片来源:尚硅谷 - 周阳 - Spring Cloud Alibaba


四、常用注解和API

名称说明
Middleware中间件
Binder应用与消息中间件之间的封装,目前实现了RabbitMQ和Kafka的Binder,利用Binder可以很方便的连接MQ,可以动态的改变消息类型(RabbitMQ--exchange,Kafaka--topic),可以通过yml文件的配置来实现
@Input输入
@Output输出
@StreamListener监听队列
@EnableBinding绑定频道


五、测试实例

1、提供者

pom
<!-- stream rabbitmq -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

<!-- Eureka-client -->
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
</dependency>

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>


yml
server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  rabbitmq:
    host: x.x.x.x
    port: 5672
    username: admin
    password: 123456
  cloud:
    stream:
      binders: # 配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 定义的名称,用于binding整合
          type: rabbit # 消息组件类型
      bindings: # 服务整合处理
        output: # 通道名称
          destination: StreamExChange # 交换机的名字
          content-type: application/json # 消息类型
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 心跳时间间隔
    lease-expiration-duration-in-seconds: 5 # 默认90s
    instance-id: send-8801.com # 在信息列表显示主机名称
    prefer-ip-address: true # 访问路径显示ip


主启动类
@SpringBootApplication
public class StreamMQMain8801 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8801.class, args);
    }
}


Service
@EnableBinding(Source.class)  //定义消息的推送通道
public class MessageProviderImpl implements MessageProvider {

    @Autowired
    private MessageChannel output;

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        System.out.println("======>" + serial);
        return null;
    }
}


controller
@RestController
public class SendMessageController {

    @Autowired
    private MessageProvider service;

    @GetMapping("/send")
    public String sendMessage(){
        return service.send();
    }
}


2、消费者

==创建8802、8803两个消费者==


pom

同理消费者


yml
server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: x.x.x.x
    port: 5672
    username: admin
    password: 123456
  cloud:
    stream:
      binders: # 配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 定义的名称,用于binding整合
          type: rabbit # 消息组件类型
      bindings: # 服务整合处理
        input: # 通道名称
          destination: StreamExChange # 交换机的名字
          content-type: application/json # 消息类型
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: GroupA8802

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 心跳时间间隔
    lease-expiration-duration-in-seconds: 5 # 默认90s
    instance-id: recive-8802.com # 在信息列表显示主机名称
    prefer-ip-address: true # 访问路径显示ip


启动类
@SpringBootApplication
public class StreamMQMain8802 {

    public static void main(String[] args) {
        SpringApplication.run(StreamMQMain8802.class, args);
    }
}


controller
@Component
@EnableBinding(Sink.class)
public class ReciveController {

    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> msg){
        System.out.println("消费者8802,接收到消息:" + msg.getPayload() + "\t serverPort:" + serverPort);
    }
}


六、重复消费的问题

启动以上的测试实例,会发现8801生产者发送一条消息,8802、8803都收到了消息


如何解决?

通过将消费者分组来解决:即将原先GroupA8802、GroupA8803两个分组改为同一个分组GroupA

注:在同一个分组中,多个消费者是竞争关系,这样就能保证消息只会被分组中的一个消费者消费

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  rabbitmq:
    host: x.x.x.x
    port: 5672
    username: admin
    password: 123456
  cloud:
    stream:
      binders: # 配置要绑定的rabbitmq的服务信息
        defaultRabbit: # 定义的名称,用于binding整合
          type: rabbit # 消息组件类型
      bindings: # 服务整合处理
        input: # 通道名称
          destination: StreamExChange # 交换机的名字
          content-type: application/json # 消息类型
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: GroupA

eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 心跳时间间隔
    lease-expiration-duration-in-seconds: 5 # 默认90s
    instance-id: recive-8802.com # 在信息列表显示主机名称
    prefer-ip-address: true # 访问路径显示ip


消息持久化问题

在分组解决重复消费的问题是,分组同时也实现了消息的持久化

0

评论 (0)

取消