1. Exchanges
(1)概念
RabbitMQ 消息传递模型的核心思想是:生产者生产的消息从不会直接发送到队列。实际上,生产者甚至都不知道这些消息传递传递到了哪些队列中。 相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息,是应该把这些消息放到特定队列还是说把他们放到许多队列中,亦或是应该丢弃它们,这就得由交换机的类型来决定。
(2)交换机类型
- 直接(direct)
- 主题(topic)
- 标题(headers)
- 扇出(fanout)
(3)无名exchange
channel.basicPublish("", QUEUE_NAME, properties, msg.getBytes());
之前的demo中生产者发送消息时未设置交换机,即第一个参数为空,表示使用的是默认交换机(又称无名交换机)
2. 临时队列
在之前的demo中使用的都是临时队列,一旦断开了消费者的连接,队列将被自动删除
创建临时队列的方式:
String queue = channel.queueDeclare().getQueue();
3. 绑定
binding 是 exchange 和 queue 之间的桥梁,即 exchange 和哪个队列进行了绑定
4. Fanout
(1)介绍
将接收到的所有消息广播到它知道的所有队列中,RabbitMQ默认的交换机如下:
(2)Demo
Producer
public class EmitLog {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送的消息是:" + msg);
}
}
}
Consumer
public class ReceiveLog01 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//生成临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定临时队列,其中routingKey为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("ReceiveLog01等待接收消息,接收到消息后在控制台打印...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到的消息是:" + msg);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
public class ReceiveLog02 {
private static final String EXCHANGE_NAME = "logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, "fanout");
//生成临时队列
String queueName = channel.queueDeclare().getQueue();
//绑定临时队列,其中routingKey为空字符串
channel.queueBind(queueName, EXCHANGE_NAME, "");
System.out.println("ReceiveLog02等待接收消息,接收到消息后在控制台打印...");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到的消息是:" + msg);
};
channel.basicConsume(queueName, true, deliverCallback, consumerTag -> {
});
}
}
5. Direct
(1)介绍
Direct Exchange:消息只去到它绑定的 routingKey 队列中
Fanout模式下每一个消费者都收到了消息,此处使用 Direct 模式,让消息根据 routing key 去到指定的地方,如下图:X绑定了Q1,Q2两个队列,绑定类型为direct,队列Q1绑定的orange,Q2绑定了两个key:black、green。
在这种绑定情况下,生产者发布消息到 exchange 上,绑定键为 orange 的消息会被发布到队列 Q1,绑定键为 black/green 的消息会被发布到队列 Q2,其他消息类型的消息将被丢弃。
注:如果绑定的key都相同,则该模式与 Fanout 模式类似,变成了另一种形式的广播
(2)Demo
Producer
public class DirectLog {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
Scanner scanner = new Scanner(System.in);
while (scanner.hasNext()) {
String msg = scanner.next();
channel.basicPublish(EXCHANGE_NAME, "info", null, msg.getBytes(StandardCharsets.UTF_8));
System.out.println("发送的消息是:" + msg);
}
}
}
Consumer
public class ReceiveLogDirect01 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("console", false, false, false, null);
channel.queueBind("console", EXCHANGE_NAME, "info");
channel.queueBind("console", EXCHANGE_NAME, "warning");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到的消息是:" + msg);
};
channel.basicConsume("console", true, deliverCallback, consumerTag -> {
});
}
}
public class ReceiveLogDirect02 {
private static final String EXCHANGE_NAME = "direct_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT);
channel.queueDeclare("disk", false, false, false, null);
channel.queueBind("console", EXCHANGE_NAME, "error");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到的消息是:" + msg);
};
channel.basicConsume("disk", true, deliverCallback, consumerTag -> {
});
}
}
6. Topics
(1)介绍
发送到 topic 类型交换机的消息的 routing_key 不能随意写,必须满足一定的要求,它必须是一个单词列表,以 .
点号分隔开。这些单词可以是任意单词,且不能超过 255 个字节。
规则列表中常用的替换符:
- *(星号) 可以代替一个单词
- #(井号) 可以替代零个或多个单词
(2)举例
下图绑定关系如下:
- Q1绑定的是中间带 orange ,单词总数为 3 个的字符串
*.orange.*
Q2:
- 绑定的是最后一个单词是 rabbit 且单词总数为 3 个的单词
*.*.rabbit
- 第一个单词是 lazy 的多个单词
lazy.#
- 绑定的是最后一个单词是 rabbit 且单词总数为 3 个的单词
以下是消息接收情况:
quick.orange.rabbit
被队列 Q1Q2 接收到lazy.orange.elephant
被队列 Q1Q2 接收到quick.orange.fox
被队列 Q1 接收到lazy.brown.fox
被队列 Q2 接收到lazy.pink.rabbit
虽然满足两个绑定但只被队列 Q2 接收一次quick.brown.fox
不匹配任何绑定不会被任何队列接收到会被丢弃quick.orange.male.rabbit
是四个单词不匹配任何绑定会被丢弃lazy.orange.male.rabbit
是四个单词但匹配 Q2
注:当一个队列绑定键是 #,那么这个队列将接收所有数据,类似于 fanout;如果队列绑定键中没有 # 和 * 出现,那么该队列绑定类型就是 direct (固定的routing key)
(3)Demo
Producer
public class TopicLog {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
Map<String, String> map = new HashMap<>();
map.put(" quick.orange.rabbit", "被队列Q1Q2接收到");
map.put("lazy.orange.elephant", "被队列Q1Q2接收到");
map.put("quick.orange.fox", "被队列Q1接收到");
map.put(" lazy.brown.fox", "被队列Q2接收到");
map.put("lazy.pink.rabbit", "虽然满足两个绑定但只被队列Q2接收一次");
map.put("quick. brown.fox", "不匹配任何绑定不会被任何队列接收到会被丢弃");
map.put(" quick.orange.male.rabbit", "是四个单词不匹配任何绑定会被丢弃");
map.put("lazy.orange.male.rabbit", "是四个单词但匹配Q2");
for (Map.Entry<String, String> entry : map.entrySet()) {
channel.basicPublish(EXCHANGE_NAME, entry.getKey(), null, entry.getValue().getBytes(StandardCharsets.UTF_8));
System.out.println("发送的消息是:" + entry.getValue());
}
}
}
Consumer
public class ReceiveLogTopic01 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare("Q1", false, false, false, null);
channel.queueBind("Q1", EXCHANGE_NAME, "*.orange.*");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到的消息是:" + msg + ",routingKey:" + delivery.getEnvelope().getRoutingKey());
};
channel.basicConsume("Q1", true, deliverCallback, consumerTag -> {
});
}
}
public class ReceiveLogTopic02 {
private static final String EXCHANGE_NAME = "topic_logs";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.TOPIC);
channel.queueDeclare("Q2", false, false, false, null);
channel.queueBind("Q2", EXCHANGE_NAME, "*.*.rabbit");
channel.queueBind("Q2", EXCHANGE_NAME, "lazy.#");
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String msg = new String(delivery.getBody(), StandardCharsets.UTF_8);
System.out.println("接收到的消息是:" + msg + ",routingKey:" + delivery.getEnvelope().getRoutingKey());
};
channel.basicConsume("Q2", true, deliverCallback, consumerTag -> {
});
}
}
评论 (0)