RabbitMQ - 交换机

suaxi
2023-06-20 / 0 评论 / 69 阅读 / 正在检测是否收录...

1. Exchanges

(1)概念

RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息,是应该把这些消息放到特定队列还是说把他们放到许多队列中,亦或是应该丢弃它们,这就得由交换机的类型来决定。


(2)交换机类型

  • 直接(direct)
  • 主题(topic)
  • 标题(headers)
  • 扇出(fanout)


(3)无名exchange

channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());

之前的demo中生产者发送消息时未设置交换机,即第一个参数为空,表示使用的是默认交换机(又称无名交换机)


2. 临时队列

在之前的demo中使用的都是临时队列,一旦断开了消费者的连接,队列将被自动删除

创建临时队列的方式:

String queue = channel.queueDeclare().getQueue();

创建临时队列.png


3. 绑定

binding 是 exchange 和 queue 之间的桥梁,即 exchange 和哪个队列进行了绑定

绑定关系.png


4. Fanout

(1)介绍

将接收到的所有消息广播到它知道的所有队列中,RabbitMQ默认的交换机如下:
系统默认的交换机.png


(2)Demo

Producer

public class EmitLog {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String msg = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送的消息是:" + msg);
        }
    }
}

Consumer

public class ReceiveLog01 {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //生成临时队列
        String queueName = channel.queueDeclare().getQueue();

        //绑定临时队列,其中routingKey为空字符串
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("ReceiveLog01等待接收消息,接收到消息后在控制台打印...");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收到的消息是:" + msg);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}

public class ReceiveLog02 {

    private static final String EXCHANGE_NAME = "logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, "fanout");

        //生成临时队列
        String queueName = channel.queueDeclare().getQueue();

        //绑定临时队列,其中routingKey为空字符串
        channel.queueBind(queueName, EXCHANGE_NAME, "");
        System.out.println("ReceiveLog02等待接收消息,接收到消息后在控制台打印...");
        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收到的消息是:" + msg);
        };
        channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
        });
    }
}


5. Direct

(1)介绍

Direct Exchange:消息只去到它绑定的 routingKey 队列中

Fanout模式下每一个消费者都收到了消息,此处使用 Direct 模式,让消息根据 routing key 去到指定的地方,如下图:X绑定了Q1,Q2两个队列,绑定类型为direct,队列Q1绑定的orange,Q2绑定了两个key:black、green。

在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1,绑定键为 black/green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。

1.DirectExchange.png

注:如果绑定的key都相同,则该模式与 Fanout 模式类似,变成了另一种形式的广播


(2)Demo

2.Demo.png

Producer

public class DirectLog {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        Scanner scanner = new Scanner(System.in);
        while (scanner.hasNext()) {
            String msg = scanner.next();
            channel.basicPublish(EXCHANGE_NAME, "info", null, msg.getBytes(StandardCharsets.UTF_8));
            System.out.println("发送的消息是:" + msg);
        }
    }
}

Consumer

public class ReceiveLogDirect01 {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        channel.queueDeclare("console", false, false, false, null);

        channel.queueBind("console", EXCHANGE_NAME, "info");
        channel.queueBind("console", EXCHANGE_NAME, "warning");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收到的消息是:" + msg);
        };
        channel.basicConsume("console", true, deliverCallback, consumerTag -> {
        });
    }
}

public class ReceiveLogDirect02 {

    private static final String EXCHANGE_NAME = "direct_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);

        channel.queueDeclare("disk", false, false, false, null);

        channel.queueBind("console", EXCHANGE_NAME, "error");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收到的消息是:" + msg);
        };
        channel.basicConsume("disk", true, deliverCallback, consumerTag -> {
        });
    }
}


6. Topics

(1)介绍

发送到 topic 类型交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 . 点号分隔开。这些单词可以是任意单词,且不能超过 255 个字节。

规则列表中常用的替换符:

  • *(星号) 可以代替一个单词
  • #(井号) 可以替代零个或多个单词


(2)举例

下图绑定关系如下:

  • Q1绑定的是中间带 orange ,单词总数为 3 个的字符串 *.orange.*
  • Q2:

    • 绑定的是最后一个单词是 rabbit 且单词总数为 3 个的单词 *.*.rabbit
    • 第一个单词是 lazy 的多个单词 lazy.#

1.举例.png

以下是消息接收情况:

  • quick.orange.rabbit 被队列 Q1Q2 接收到
  • lazy.orange.elephant 被队列 Q1Q2 接收到
  • quick.orange.fox 被队列 Q1 接收到
  • lazy.brown.fox 被队列 Q2 接收到
  • lazy.pink.rabbit 虽然满足两个绑定但只被队列 Q2 接收一次
  • quick.brown.fox 不匹配任何绑定不会被任何队列接收到会被丢弃
  • quick.orange.male.rabbit 是四个单词不匹配任何绑定会被丢弃
  • lazy.orange.male.rabbit 是四个单词但匹配 Q2

注:当一个队列绑定键是 #,那么这个队列将接收所有数据,类似于 fanout;如果队列绑定键中没有 # 和 * 出现,那么该队列绑定类型就是 direct (固定的routing key)


(3)Demo

Producer

public class TopicLog {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        Map<String, String> map = new HashMap<>();
        map.put(" quick.orange.rabbit", "被队列Q1Q2接收到");
        map.put("lazy.orange.elephant", "被队列Q1Q2接收到");
        map.put("quick.orange.fox", "被队列Q1接收到");
        map.put(" lazy.brown.fox", "被队列Q2接收到");
        map.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次");
        map.put("quick. brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
        map.put(" quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
        map.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2");

        for (Map.Entry<String, String> entry : map.entrySet()) {
            channel.basicPublish(EXCHANGE_NAME, entry.getKey(), null, entry.getValue().getBytes(StandardCharsets.UTF_8));
            System.out.println("发送的消息是:" + entry.getValue());
        }
    }
}

Consumer

public class ReceiveLogTopic01 {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        channel.queueDeclare("Q1", false, false, false, null);

        channel.queueBind("Q1", EXCHANGE_NAME, "*.orange.*");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收到的消息是:" + msg + ",routingKey:" + delivery.getEnvelope().getRoutingKey());
        };
        channel.basicConsume("Q1", true, deliverCallback, consumerTag -> {
        });
    }
}

public class ReceiveLogTopic02 {

    private static final String EXCHANGE_NAME = "topic_logs";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMqUtils.getChannel();
        channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);

        channel.queueDeclare("Q2", false, false, false, null);

        channel.queueBind("Q2", EXCHANGE_NAME, "*.*.rabbit");
        channel.queueBind("Q2", EXCHANGE_NAME, "lazy.#");

        DeliverCallback deliverCallback = (consumerTag, delivery) -> {
            String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
            System.out.println("接收到的消息是:" + msg + ",routingKey:" + delivery.getEnvelope().getRoutingKey());
        };
        channel.basicConsume("Q2", true, deliverCallback, consumerTag -> {
        });
    }
}
0

评论 (0)

取消