安装rabbit mq 可以参考
docker安装RabbitMq | 小云
从Hello World开始 导入相关依赖 1 2 3 4 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
创建生产者
RabbitMq中的channel指的是什么? 信道是生产消费者与rabbit通信的渠道,生产者publish或者消费者消费一个队列都是需要通过信道来通信的。信道是建立在TCP上面的虚拟链接,也就是RabbitMq在一个TCP上面建立成百上千的信道来达到多个线程处理。为什么RabbitMQ 需要信道,如果直接进行TCP通信呢?
TCP的创建开销很大,创建需要三次握手,销毁需要四次握手。如果不使用信道,那么引用程序就会使用TCP方式进行连接到RabbitMQ,因为MQ可能每秒会进行成千上万的链接。 总之就是TCP消耗资源,TCP链接可以容纳无限的信道,不会有并发上面的性能瓶颈。
为了发送,我们必须声明一个队列供我们发送;然后我们可以将消息发布到队列,所有这些都在 try-with-resources 语句中进行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 @RestController @RequestMapping("/index") @Slf4j public class IndexController { @RequestMapping("/send") public String index() { log.info("开始执行"); // 队列的名称 String QUEUE_NAME = "simpleQueue"; ConnectionFactory factory = new ConnectionFactory(); // 设置连接的目标主机 factory.setHost("182.254.225.246"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } return "success"; } }
创建消费者 消费者设置与发布者相同;我们打开连接和通道,并声明我们要从中消费的队列。请注意,这与 send 发布到的队列相匹配。
由于异步向消费者推送消息,因此我们提供了一个回调,它是一个对象,它将缓冲消息,直到我们准备好使用它们。这就是 DeliverCallback 子类所做的事情。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 @RestController @RequestMapping("/index") public class IndexController { @GetMapping("/receive") public String index() { // 队列名称 String QUEUE_NAME = "simpleQueue"; // mq连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.254.225.246"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } return "success"; } }
测试 启动消费者和生产者服务,调用接口查看打印的结果
生产者
消费者
可以看到消费者成功收到了生产者发送出来的消息,并将其输出到了控制台。
工作队列
工作队列 (又称:任务队列 ) 背后的主要思想是避免立即执行资源密集型任务,并等待其完成。相反,我们将任务安排在稍后完成。我们将任务 封装为消息,并将其发送到队列。后台运行的工作进程将弹出任务并最终执行作业。当您运行多个工作进程时,任务将在它们之间共享。 此概念在 Web 应用程序中特别有用,在 Web 应用程序中,在短 HTTP 请求窗口期间处理复杂任务是不可能的。
我们调整一下消费者的代码,模拟消费者需要耗时处理任务。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 @RestController @RequestMapping("/index") public class IndexController { @GetMapping("/receive") public String index() { // 队列名称 String QUEUE_NAME = "simpleQueue"; // mq连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.254.225.246"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { dowork(message); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { System.out.println("[x] Done"); } }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } return "success"; } private void dowork(String message) throws InterruptedException { Thread.sleep(5000); } }
循环调度
默认情况下,RabbitMQ 会将每条消息依次发送到下一个消费者。平均而言,每个消费者将收到相同数量的消息。这种消息分发方式称为循环调度。
我们再开启一个消费者服务
我们调用生产者接口发送四条消息,可以看到两个消费者服务收到的消息数量是一致的,每个服务收到两条消息。
消息确认
执行任务可能需要几秒钟,您可能想知道如果消费者开始一个长时间的任务并在完成之前终止会发生什么。在我们当前的代码中,一旦 RabbitMQ 将消息传递给消费者,它会立即将其标记为已删除。在这种情况下,如果您终止一个工作进程,它正在处理的消息就会丢失。已调度到该特定工作进程但尚未处理的消息也会丢失。
我们调整一下代码,在消费者服务中添加停止服务的代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 @RestController @RequestMapping("/index") public class IndexController { private String stop; @GetMapping("/receive") public String index(@RequestParam String stop) { // 队列名称 String QUEUE_NAME = "simpleQueue"; this.stop = stop; // mq连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.254.225.246"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { dowork(message); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { System.out.println("[x] Done"); } }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } return "success"; } private void dowork(String message) throws InterruptedException { Thread.sleep(5000); System.out.println(message); if ("true".equals(stop)) { System.exit(-1); } } }
我们给第一个消费者传入false,第二个传入true,连续使用生产者发送四条消息
此时可以看到消费者2已经停止了
查看rabbit mq后台可以看到四条消息全部消费,说明消费者收到的第二条消息就这样丢失了。
但是我们不想丢失任何任务。如果工作进程死亡,我们希望该任务被传递到另一个工作进程。 为了确保消息永远不会丢失,RabbitMQ 支持消息 确认 。确认由消费者发送回,以告诉 RabbitMQ 特定消息已收到、处理,并且 RabbitMQ 可以将其删除。 如果消费者死亡(其通道关闭、连接关闭或 TCP 连接丢失)但没有发送确认,RabbitMQ 会理解消息没有完全处理,并将将其重新排队。如果有其他消费者同时在线,它会立即将其重新传递给另一个消费者。这样,您可以确保没有消息丢失,即使工作进程偶尔会死亡。
调整消费者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 @RestController @RequestMapping("/index") public class IndexController { private String stop; @GetMapping("/receive") public String index(@RequestParam String stop) { // 队列名称 String QUEUE_NAME = "simpleQueue"; this.stop = stop; // mq连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.254.225.246"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); // 参数 1 表示消费者每次最多只能接收一条未确认的消息。也就是说,在当前消息被确认之前,RabbitMQ 不会向该消费者发送新的消息。 channel.basicQos(1); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { dowork(message); } catch (InterruptedException e) { throw new RuntimeException(e); } finally { System.out.println("[x] Done"); // 执行确认操作 channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); } }; channel.basicConsume(QUEUE_NAME, false, deliverCallback, consumerTag -> { }); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } return "success"; } private void dowork(String message) throws InterruptedException { Thread.sleep(5000); System.out.println(message); if ("true".equals(stop)) { System.exit(-1); } } }
重启消费者,并重新发送四条消息
可以看到消费者2收到消息后停止服务,但是其收到的消息并没有丢失,而是由消费者1继续消费了。
消息持久化
当 RabbitMQ 退出或崩溃时,它会忘记队列和消息,除非您告诉它不要这样做。为了确保消息不会丢失,需要两件事:我们需要将队列和消息都标记为持久性。 首先,我们需要确保队列能够在 RabbitMQ 节点重启后继续存在。为此,我们需要将其声明为持久性
1 2 boolean durable = true; channel.queueDeclare("hello", durable, false, false, null);
虽然此命令本身是正确的,但它在当前设置中无法正常工作。这是因为我们已经定义了一个名为hello 的队列,它不是持久性的。RabbitMQ 不允许您使用不同的参数重新定义现有的队列,并且会向尝试执行此操作的任何程序返回错误。但有一个快速解决方法 - 让我们声明一个名称不同的队列,例如task_queue
此时,我们确定task_queue 队列即使在 RabbitMQ 重启后也不会丢失。
重启后队列还在,但是消息没有了
现在我们需要将我们的消息标记为持久性
通过将MessageProperties(实现BasicProperties)设置为PERSISTENT_TEXT_PLAIN 值。
1 2 3 4 5 import com.rabbitmq.client.MessageProperties; channel.basicPublish("", "task_queue", MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());
发送几条消息 重启mq
发现消息重启后仍然没有丢失
公平调度
您可能已经注意到,消息分发仍然没有完全按我们预期的方式工作。 例如,在有两个工作进程的情况下,如果所有奇数消息都很重,而偶数消息很轻,那么一个工作进程会一直很忙,而另一个工作进程几乎不会做任何工作。然而,RabbitMQ 并不知道这一点,仍然会平均分发消息。 这是因为 RabbitMQ 只是在消息进入队列时分发消息。它不会查看消费者未确认消息的数量。它只是盲目地将每第 n 个消息分发给第 n 个消费者。
为了解决这个问题,我们可以使用 basicQos 方法,并设置 prefetchCount = 1。这告诉 RabbitMQ 每次只向一个工作进程发送一条消息。换句话说,在工作进程处理并确认前一条消息之前,不要向其分发新的消息。相反,它会将消息分发给下一个未处于繁忙状态的工作进程。
1 2 int prefetchCount = 1; channel.basicQos(prefetchCount);
*如果所有工作进程都处于繁忙状态,您的队列可能会填满。您需要关注这一点,并可能添加更多工作进程,或采取其他策略。*
发布/订阅 在之前学习的内容中,工作队列背后的假设是每个任务都传递给一个且仅一个工作进程。在本部分中,我们将做一些完全不同的事情——我们将把消息传递给多个消费者。这种模式称为“发布/订阅”。
交换机
RabbitMQ 中消息传递模型的核心思想是生产者永远不会直接向队列发送任何消息。实际上,生产者通常甚至不知道消息是否会被传递到任何队列。 相反,生产者只能将消息发送到交换机 。交换机是一个非常简单的东西。它一侧接收来自生产者的消息,另一侧将消息推送到队列。交换机必须准确地知道如何处理它接收到的消息。应该将其追加到特定队列?应该将其追加到多个队列?还是应该将其丢弃?这些规则由交换机类型 定义。
有几种可用的交换机类型:direct、topic、headers 和 fanout。我们将重点关注最后一个——fanout。让我们创建一个这种类型的交换机,并将其命名为 logs
1 2 int prefetchCount = 1; channel.basicQos(prefetchCount);
临时队列 您可能还记得,之前我们使用的是具有特定名称的队列(记得 hello 和 task_queue 吗?)。能够命名队列对我们来说至关重要——我们需要将工作进程指向同一个队列。当您想要在生产者和消费者之间共享队列时,为队列命名非常重要。
但对于我们的日志记录程序来说并非如此。我们希望听到所有日志消息,而不仅仅是其中的一部分。我们也只对当前流动的消息感兴趣,而不是旧的消息。为了解决这个问题,我们需要两件事。
首先,每当我们连接到 Rabbit 时,都需要一个新的、空的队列。为此,我们可以创建一个具有随机名称的队列,或者,更好的是——让服务器为我们选择一个随机的队列名称。
其次,一旦我们断开消费者的连接,队列应该自动删除。
在 Java 客户端中,当我们不向 queueDeclare() 提供任何参数时,我们将创建一个非持久化、独占、自动删除且具有生成名称的队列
1 String queueName = channel.queueDeclare().getQueue();
您可以在队列指南 中了解有关 exclusive 标志和其他队列属性的更多信息。
此时,queueName 包含一个随机的队列名称。例如,它可能看起来像 amq.gen-JzTY20BRgKO-HjmUJj0wLg。
绑定 我们已经创建了一个 fanout 交换机和一个队列。现在我们需要告诉交换机将消息发送到我们的队列。交换机和队列之间的这种关系称为绑定 。
1 channel.queueBind(queueName, "logs", "");
从现在开始,logs 交换机将把消息追加到我们的队列。
代码示例 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @RestController @RequestMapping("/index") @Slf4j public class IndexController { @RequestMapping("/send") public String index() { log.info("开始执行"); ConnectionFactory factory = new ConnectionFactory(); // 设置连接的目标主机 factory.setHost("182.254.225.246"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 交换机名称, 交换机类型 channel.exchangeDeclare("logs", BuiltinExchangeType.FANOUT); String message = "Hello World!"; channel.basicPublish("logs", "", null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } return "success"; } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @RestController @RequestMapping("/index") public class IndexController { private String stop; @GetMapping("/receive") public String index() { // mq连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.254.225.246"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", "fanout"); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "logs", ""); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } return "success"; } }
路由 在上一节中,我们构建了一个简单的日志系统,能够将消息广播到许多接收方。
在本节中,我们将向其中添加一项功能 - 我们将使其能够仅订阅消息的一部分。例如,我们可以将仅关键错误消息定向到日志文件(以节省磁盘空间),同时仍然能够在控制台上打印所有日志消息。
绑定 在之前的示例中,我们已经创建了绑定。您可能还记得类似的代码
1 channel.queueBind(queueName, EXCHANGE_NAME, "");
绑定是交换机和队列之间的关系。这可以简单地理解为:队列对来自此交换机的消息感兴趣。
绑定可以采用额外的 routingKey 参数。为了避免与 basic_publish 参数混淆,我们将将其称为 绑定键。以下是如何使用键创建绑定
1 channel.queueBind(queueName, EXCHANGE_NAME, "black");
绑定键的含义取决于交换机类型。我们之前使用的 fanout 交换机只是忽略了它的值。
direct交换机 我们之前的教程中的日志系统将所有消息广播到所有消费者。我们希望扩展它以允许基于消息严重性过滤消息。例如,我们可能希望一个将日志消息写入磁盘的程序仅接收严重错误,而不是浪费磁盘空间来存储警告或信息日志消息。
我们使用的是 fanout 交换机,它没有给我们太多灵活性 - 它只能进行盲目广播。
我们将改用 direct 交换机。direct 交换机背后的路由算法很简单 - 消息将发送到其 绑定键 完全匹配消息 路由键 的队列。
为了说明这一点,请考虑以下设置
在此设置中,我们可以看到 direct 交换机 X,有两个队列绑定到它。第一个队列使用绑定键 orange 绑定,第二个队列有两个绑定,一个使用绑定键 black,另一个使用 green。
在这种设置下,发布到交换机且路由键为 orange 的消息将被路由到队列 Q1。路由键为 black 或 green 的消息将发送到 Q2。所有其他消息都将被丢弃。
多个绑定 使用相同的绑定键绑定多个队列是完全合法的。在我们的示例中,我们可以添加 X 和 Q1 之间的绑定,绑定键为 black。在这种情况下,direct 交换机将表现得像 fanout 并将消息广播到所有匹配的队列。路由键为 black 的消息将传递到 Q1 和 Q2。
发出日志 我们将对我们的日志系统使用此模型。我们将消息发送到 direct 交换机,而不是 fanout。我们将日志严重性作为 路由键 提供。这样,接收程序将能够选择它想要接收的严重性。让我们首先关注发出日志。
与往常一样,我们需要首先创建一个交换机
1 channel.exchangeDeclare(EXCHANGE_NAME, "direct");
我们准备发送消息了
1 channel.basicPublish(EXCHANGE_NAME, severity, null, message.getBytes());
为了简化操作,我们将假设“严重性”可以是 info、warning 或 error 之一。
订阅 接收消息的工作方式与上一教程相同,只有一个例外 - 我们将为我们感兴趣的每个严重性创建一个新的绑定。
1 2 3 4 5 String queueName = channel.queueDeclare().getQueue(); for(String severity : argv){ channel.queueBind(queueName, EXCHANGE_NAME, severity); }
代码示例 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @RestController @RequestMapping("/index") @Slf4j public class IndexController { @RequestMapping("/send") public String index(String type) { log.info("开始执行"); ConnectionFactory factory = new ConnectionFactory(); // 设置连接的目标主机 factory.setHost("182.254.225.246"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 交换机名称, 交换机类型 channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT); String message = "Hello World!"; channel.basicPublish("logs", type, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } return "success"; } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 @GetMapping("/receive") public String index() { // mq连接信息 ConnectionFactory factory = new ConnectionFactory(); factory.setHost("182.254.225.246"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); Connection connection = null; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs", BuiltinExchangeType.DIRECT); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "logs", "info"); System.out.println(" [*] Waiting for messages. To exit press CTRL+C"); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(queueName, true, deliverCallback, consumerTag -> { }); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } return "success"; }
主题 在上一节中,我们改进了我们的日志系统。我们没有只使用只能进行虚拟广播的 fanout 交换机,而是使用了 direct 交换机,从而获得了选择性接收日志的可能性。
尽管使用 direct 交换机改进了我们的系统,但它仍然存在局限性 - 它无法根据多个条件进行路由。
在我们的日志系统中,我们可能不仅希望订阅基于严重性的日志,还希望订阅基于发出日志的源的日志。您可能从syslog Unix 工具中了解过这个概念,该工具根据严重性(info/warn/crit…)和设备(auth/cron/kern…)对日志进行路由。
这将为我们提供很大的灵活性 - 我们可能希望监听来自 ‘cron’ 的所有严重错误,以及来自 ‘kern’ 的所有日志。
为了在我们的日志系统中实现这一点,我们需要了解更复杂的 topic 交换机。
主题交换机
发送到 topic 交换机的消息不能具有任意的 routing_key 。它必须是一个由点分隔的单词列表。这些单词可以是任何内容,但通常它们指定与消息相关的某些特征。 一些有效的路由键示例:stock.usd.nyse、nyse.vmw、quick.orange.rabbit。路由键中的单词数量可以任意多,最多可达 255 个字节。
绑定键也必须采用相同的格式。topic 交换机背后的逻辑类似于 direct 交换机 - 使用特定路由键发送的消息将传递到所有使用匹配绑定键绑定的队列。但是,绑定键有两个重要的特殊情况
*(星号)可以替换恰好一个单词。
#(井号)可以替换零个或多个单词。
最简单的解释方法是举个例子:
在这个例子中,我们将发送描述动物的所有消息。消息将使用由三个单词(两个点)组成的路由键发送。路由键中的第一个单词将描述速度,第二个单词描述颜色,第三个单词描述物种: . . 。
我们创建了三个绑定:Q1 使用绑定键 .orange. 绑定,Q2 使用 . .rabbit 和 lazy.# 绑定。
这些绑定可以概括为
Q1 对所有橙色动物感兴趣。
Q2 想要了解关于兔子的所有信息,以及关于懒惰动物的所有信息。
路由键设置为 quick.orange.rabbit 的消息将传递到这两个队列。 消息 lazy.orange.elephant 也将传递到这两个队列。 另一方面,quick.orange.fox 仅传递到第一个队列, 而 lazy.brown.fox 仅传递到第二个队列。l azy.pink.rabbit 将仅传递到第二个队列一次,即使它匹配两个绑定。 quick.brown.fox 与任何绑定都不匹配,因此将被丢弃。
如果我们违反约定并发送一个或四个单词的消息,例如 orange 或 quick.orange.new.rabbit 会发生什么?嗯,这些消息将与任何绑定都不匹配,并将丢失。
另一方面,lazy.orange.new.rabbit 即使有四个单词,也会匹配最后一个绑定,并将传递到第二个队列。
代码示例 生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 @RestController @RequestMapping("/index") @Slf4j public class IndexController { @RequestMapping("/send") public String index(String type) { log.info("开始执行"); ConnectionFactory factory = new ConnectionFactory(); // 设置连接的目标主机 factory.setHost("182.254.225.246"); factory.setPort(5672); factory.setUsername("admin"); factory.setPassword("admin"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { // 交换机名称, 交换机类型 channel.exchangeDeclare("logs", BuiltinExchangeType.TOPIC); String message = "Hello World!"; channel.basicPublish("logs", type, null, message.getBytes()); System.out.println(" [x] Sent '" + message + "'"); } catch (IOException | TimeoutException e) { throw new RuntimeException(e); } return "success"; } }
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 @RestController @RequestMapping("/index") public class IndexController { private String stop; @GetMapping("/receive") public String index () { ConnectionFactory factory = new ConnectionFactory (); factory.setHost("182.254.225.246" ); factory.setPort(5672 ); factory.setUsername("admin" ); factory.setPassword("admin" ); Connection connection = null ; try { connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.exchangeDeclare("logs" , BuiltinExchangeType.TOPIC); String queueName = channel.queueDeclare().getQueue(); channel.queueBind(queueName, "logs" , "*.bug" ); System.out.println(" [*] Waiting for messages. To exit press CTRL+C" ); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String (delivery.getBody(), StandardCharsets.UTF_8); System.out.println(" [x] Received '" + message + "'" ); }; channel.basicConsume(queueName, true , deliverCallback, consumerTag -> { }); } catch (IOException | TimeoutException e) { throw new RuntimeException (e); } return "success" ; } }