rabbitmq

Posted by zangxin on April 12, 2025

RabbitMq

官网:https://www.rabbitmq.com/

1.概述

1RabbitMQ 是一个流行的开源消息中间件,它实现了高级消息队列协议(AMQP),为 分布式应用程序提供了可靠的、异步的消息传递机制。

2RabbitMQ 可以在多个进程、多个主机之间传递消息,因此它经常用于解耦分布式应 用程序中的各个组件,或者实现任务队列和日志收集等应用场景

3、RabbitMQ 的核心概念是生产者、消费者和队列。 ** **(1) 生产者将消息发布到队列中 ** **(2) 消费者从队列中获取消息并进行处理 ** **(3) RabbitMQ 的优点包括可靠性、灵活性和可扩展性。它使用消息确认机制确保消息能够成功传递,同时提供多种交换机类型和绑定方式,以支持不同的消息路由场景

(4) RabbitMQ 可以满足高负载、高可用性和可扩展性的要求

3RabbitMQ 提供了丰富的客户端库,包括 JavaPython、RubyC# 等,这些库可以 方便地集成到各种编程语言和框架中,以实现高效的消息传递。因此,RabbitMQ 在大规 模分布式系统中得到了广泛的应用

2.安装

环境:centos7

rabbitmq需要erlang环境

1
2
3
4
rpm -ivh erlang-21.3-1.el7.x86_64.rpm
# rabbitmq 需要的依赖包, 需要联网
yum install socat -y
rpm -ivh rabbitmq-server-3.8.8-1.el7.noarch.rpm

启动

1
2
3
4
5
6
7
8
# 添加开机启动 RabbitMQ 服务
chkconfig rabbitmq-server on
# 启动服务
/sbin/service rabbitmq-server start
# 查看服务状态
/sbin/service rabbitmq-server status
# 停止服务指令
/sbin/service rabbitmq-server stop

web管理插件

1
2
# 开启 web 管理插件
rabbitmq-plugins enable rabbitmq_management

开放防火墙端口(15672,web 管控台访问端口)

1
2
3
4
firewall-cmd --zone=public --add-port=15672/tcp --permanent
firewall-cmd --reload
# 查看端口
firewall-cmd --list-ports

访问 **http://192.168.2.85:15672 **

账号/密码: guest/guest

截屏2025-04-12 12.47.29

提示不能远程登录, 修改配置,让guest可以远程登录

- /etc/rabbitmq 目录下创建 rabbitmq.config, 内容如下

1
[{rabbit,[{loopback_users,[]}]}].

重启服务

1
systemctl restart rabbitmq-server.service

guest

截屏2025-04-12 12.59.12

截屏2025-04-12 13.08.32

截屏2025-04-12 13.09.16

- 添加用户

截屏2025-04-12 13.10.00

截屏2025-04-12 13.10.17

截屏2025-04-12 13.13.15

截屏2025-04-12 13.13.26

添加虚拟主机

截屏2025-04-12 13.15.03

截屏2025-04-12 13.16.15

设置test用户可以访问虚拟主机/test, 在admin菜单栏选择test用户点击进入即可

截屏2025-04-12 13.18.29

3.springboot集成RabbitMQ

截屏2025-04-12 13.20.49

P : 消息的发送者生产者

C : 消息的接受者消费者

中间表示队列

引入mq的依赖

1
2
3
4
5
<!--rabbitmq-需要的 AMQP 依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

springboot配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
#rabbitmq
spring.rabbitmq.host=192.168.2.85
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
#启动时是否默认启动容器,默认为 true
spring.rabbitmq.listener.simple.auto-startup=true
#消费者最小数量
spring.rabbitmq.listener.simple.concurrency=10
#消费者最大数量
spring.rabbitmq.listener.simple.max-concurrency=10
#限制消费者每次只能处理一条消息, 处理完毕才能继续下一条数据
spring.rabbitmq.listener.simple.prefetch=1
#消息被拒绝后, 重新进入消息队列
spring.rabbitmq.listener.simple.default-requeue-rejected=true
#启用重试机制,默认false, 如果消失处理失败, 是否重试
spring.rabbitmq.template.retry.enabled=true
#设置初始化的重试的时间间隔
spring.rabbitmq.template.retry.initial-interval=1000ms
##重试最大次数,默认为 3 次
spring.rabbitmq.template.retry.max-attempts=3
##重试最大时间间隔,默认 10s
spring.rabbitmq.template.retry.max-interval=10s
# 重试的间隔乘数,配置2的话, 一二三次重试时间间隔[1 2 4] * 重试时间间隔
spring.rabbitmq.template.retry.multiplier=1

参考 https://blog.csdn.net/begefefsef/article/details/123790849

rabbitmq配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
@Configuration
public class RabbitMQConfig {
    private static final String QUEUE_NAME = "queue";
    /**
     * 1. 配置队列
     * 2. 队列名为 queue
     * 3. durable: 队列是否持久化。 队列默认是存放到内存中的,rabbitmq 重启则丢失,
     * 若想重启之后还存在则队列要持久化 则设置为true
     * 保存到 Erlang 自带的 Mnesia 数据库中,当 rabbitmq 重启之后会读取该数据库
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME,true);
    }
}

发送者

1
2
3
4
5
6
7
8
9
10
11
12
13
@Component
@Slf4j
public class MQSender {
    // 操作rabbitmq的模板,类似redisTemplate
    @Autowired
    private RabbitTemplate rabbitTemplate;
    // 发消息
    public void send(Object msg) {
        log.info("发送消息: {}", msg);
        // 向 queue 队列发送消息
        rabbitTemplate.convertAndSend("queue",msg);
    }
}

接收者

1
2
3
4
5
6
7
8
9
@Component
@Slf4j
public class MQReceiver {
    // 接收消息
    @RabbitListener(queues = {"queue"})
    public void receive(Object msg) {
        log.info("接收到消息: {}", msg);
    }
}

测试

1
2
3
4
5
6
7
8
9
10
@Controller
public class RabbitMQHandler {
    @Autowired
    private MQSender mqSender;
    @RequestMapping("/mq")
    @ResponseBody
    public void send() {
        mqSender.send("hello, rabbit mq");
    }
}

日志

1
2
2025-04-12 15:00:48.672  INFO 10732 --- [p-nio-80-exec-4] org.xxx.seckill.rabbitmq.MQSender        : 发送消息: hello, rabbit mq
2025-04-12 15:00:48.692  INFO 10732 --- [ntContainer#0-9] org.xxx.seckill.rabbitmq.MQReceiver      : 接收到消息: (Body:'hello, rabbit mq' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=queue, deliveryTag=1, consumerTag=amq.ctag-UT1NL-DrQ-GsinuOItQoGQ, consumerQueue=queue])

当前案例走的是默认交换机 (AMQP Default)

4.rabbitMQ的工作模式

fanout

fanout就是广播模式, 就是把交换机(Exchange)里的消息发送给所有绑定该交换机的 队列,忽略 routingKey(也就是路由)

截屏2025-04-12 15.09.49

生产者把消息发送给指定的交换机

再把交换机的消息发送给所有绑定该交换机的队列,忽略 routingKey/路由

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
//-- fanout
private static final String QUEUE_FANOUT_01 = "queue_fanout_01";
private static final String QUEUE_FANOUT_02 = "queue_fanout_02";
private static final String FANOUT_EXCHANGE = "fanoutExchange";
@Bean
public Queue fanoutQueue01() {
    return new Queue(QUEUE_FANOUT_01);
}

@Bean
public Queue fanoutQueue02() {
    return new Queue(QUEUE_FANOUT_02);
}

// 创建fanout交换机
@Bean
public FanoutExchange fanoutExchange() {
    return new FanoutExchange(FANOUT_EXCHANGE);
}

// 将QUEUE_FANOUT_01绑定到fanout交换机
@Bean
public Binding binding01() {
    return  BindingBuilder.bind(fanoutQueue01()).to(fanoutExchange());
}

// 将QUEUE_FANOUT_02绑定到fanout交换机
@Bean
public Binding binding02() {
    return  BindingBuilder.bind(fanoutQueue02()).to(fanoutExchange());
}

发送者

1
2
3
4
5
// fanout:发送消息到交换机
public void sendFanout(Object msg) {
    log.info("fanout:发送消息: {}", msg);
    rabbitTemplate.convertAndSend("fanoutExchange","", msg);
}

接收者

1
2
3
4
5
6
7
8
9
10
11
// 接收fanout消息, fanout交换机绑了两个队列--01
@RabbitListener(queues = {"queue_fanout_01"})
public void receiveFanout01(Object msg) {
    log.info("[queue_fanout_01]接收到消息: {}", msg);
}

// 接收fanout消息, fanout交换机绑了两个队列--02
@RabbitListener(queues = {"queue_fanout_02"})
public void receiveFanout02(Object msg) {
    log.info("[queue_fanout_02]接收到消息: {}", msg);
}

测试代码: http://localhost/mqFanout

1
2
3
4
5
@RequestMapping("/mqFanout")
@ResponseBody
public void sendFanout() {
    mqSender.sendFanout("Fanout, I am rabbit mq");
}
1
2
3
2025-04-12 15:30:08.449  INFO 10961 --- [p-nio-80-exec-1] org.xxx.seckill.rabbitmq.MQSender        : fanout:发送消息: Fanout, I am rabbit mq
2025-04-12 15:30:08.481  INFO 10961 --- [ntContainer#0-1] org.xxx.seckill.rabbitmq.MQReceiver      : [queue_fanout_01]接收到消息: (Body:'Fanout, I am rabbit mq' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanoutExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-R7DhgB01hO3L5LgIyCecXg, consumerQueue=queue_fanout_01])
2025-04-12 15:30:08.481  INFO 10961 --- [ntContainer#1-1] org.xxx.seckill.rabbitmq.MQReceiver      : [queue_fanout_02]接收到消息: (Body:'Fanout, I am rabbit mq' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=fanoutExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-1_JI44EPk3doCKoCu1U9xw, consumerQueue=queue_fanout_02])

direct

1direct 就是路由模式, 路由模式是在使用交换机的同时,生产者指定路由发送数据,消费者绑定路由接受数据。

2 、与广播模式不同的是,广播模式只要是绑定了交换机的队列都会收到生产者向交换 机推送过来的数据。而路由模式下加了一个路由设置,生产者向交换机发送数据时,会 声明发送给交换机下的哪个路由,并且只有当消费者的队列绑定了交换机并且声明了路 由,才会收到数据

截屏2025-04-12 15.35.19

- P :消息的生产者

- X :交换机

- 红色:队列

- C1 C2 :消息消费者

- error info warning :路由

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
//- direct
private static final String QUEUE_DIRECT_01 = "queue_direct_01";
private static final String QUEUE_DIRECT_02 = "queue_direct_02";
private static final String DIRECT_EXCHANGE = "directExchange";
// 路由
private static final String ROUTING_KEY_01 = "queue.red";
private static final String ROUTING_KEY_02 = "queue.blue";

@Bean
public Queue directQueue01() {
  return new Queue(QUEUE_DIRECT_01);
}

@Bean
public Queue directQueue02() {
  return new Queue(QUEUE_DIRECT_02);
}

@Bean
public DirectExchange directExchange() {
  return new DirectExchange(DIRECT_EXCHANGE);
}

// 把队列绑定到交换机上, 同时配置了路由
// queue01绑定路由red
@Bean
public Binding binding03() {
  return BindingBuilder.bind(directQueue01()).to(directExchange()).with(ROUTING_KEY_01);
}

// queue02绑定路由blue
@Bean
public Binding binding04() {
  return BindingBuilder.bind(directQueue02()).to(directExchange()).with(ROUTING_KEY_02);
}
1
2
3
4
5
6
7
8
9
10
11
// direct:发送消息到交换机-routingKey=queue.red
public void sendDirect01(Object msg) {
  log.info("direct:发送消息:queue.red: {}", msg);
  rabbitTemplate.convertAndSend("directExchange","queue.red", msg);
}

// direct:发送消息到交换机-routingKey=queue.blue
public void sendDirect02(Object msg) {
  log.info("direct:发送消息:queue.blue: {}", msg);
  rabbitTemplate.convertAndSend("directExchange","queue.blue", msg);
}
1
2
3
4
5
6
7
8
9
10
11
// 接收direct消息, direct交换机绑了两个队列--01
@RabbitListener(queues = {"queue_direct_01"})
public void receiveDirect01(Object msg) {
  log.info("[queue_direct_01]接收到消息: {}", msg);
}

// 接收direct消息, direct交换机绑了两个队列--02
@RabbitListener(queues = {"queue_direct_02"})
public void receiveDirect02(Object msg) {
  log.info("[queue_direct_02]接收到消息: {}", msg);
}

test

1
2
3
4
5
6
7
8
9
10
11
@RequestMapping("/mqDirect01")
@ResponseBody
public void sendDirect01() {
  mqSender.sendDirect01("Direct01, I am rabbit mq");
}

@RequestMapping("/mqDirect02")
@ResponseBody
public void sendDirect02() {
  mqSender.sendDirect02("Direct02, I am rabbit mq");
}
1
2
3
4
2025-04-12 15:57:46.901  INFO 11137 --- [p-nio-80-exec-1] org.xxx.seckill.rabbitmq.MQSender        : direct:发送消息:queue.red: Direct01, I am rabbit mq
2025-04-12 15:57:46.927  INFO 11137 --- [ntContainer#3-1] org.xxx.seckill.rabbitmq.MQReceiver      : [queue_direct_01]接收到消息: (Body:'Direct01, I am rabbit mq' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=directExchange, receivedRoutingKey=queue.red, deliveryTag=1, consumerTag=amq.ctag-3qVpS34COfrt0__WvgAj4Q, consumerQueue=queue_direct_01])
2025-04-12 15:58:39.316  INFO 11137 --- [p-nio-80-exec-2] org.xxx.seckill.rabbitmq.MQSender        : direct:发送消息:queue.blue: Direct02, I am rabbit mq
2025-04-12 15:58:39.336  INFO 11137 --- [ntContainer#4-1] org.xxx.seckill.rabbitmq.MQReceiver      : [queue_direct_02]接收到消息: (Body:'Direct02, I am rabbit mq' MessageProperties [headers={}, contentType=text/plain, contentEncoding=UTF-8, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=directExchange, receivedRoutingKey=queue.blue, deliveryTag=1, consumerTag=amq.ctag-bFU_cmiuCdq_F1g_o0BKFQ, consumerQueue=queue_direct_02])

topic

1direct 模式会造成路由 RoutingKey 太多, 而实际开发中往往是按照某个规则来进行路

由匹配的, RabbitMQ 提供了 Topic 模式/主题模式来适应这种需求. 2 Topic 模式是 direct 模式上的一种扩展 / 叠加 , 扩展 / 叠加了模糊路由 RoutingKey 的模

, **可以理解为是模糊的路由匹配模式 ** **1) ** *(星号):可以(只能)匹配一个单词

2) # (井号):可以匹配多个单词(或者零个)

截屏2025-04-12 16.13.34

quick.orange.rabbit 可以匹配到 Q1 Q2

lazy.orange.elephant 可以匹配到 Q1 Q2

lazy.brown.fox 可以匹配到Q2

quick.orange.fox 可以匹配到 Q1

quick.brown.fox 没有匹配-消息将会lost

quick.orange.new.rabbit 没有匹配-消息将会lost

lazy.orange.new.rabbit 可以匹配到 Q2, 消息发送到Q2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
//- topic
private static final String QUEUE_TOPIC_01 = "queue_topic_01";
private static final String QUEUE_TOPIC_02 = "queue_topic_02";
private static final String TOPIC_EXCHANGE = "topicExchange";
// 路由
private static final String ROUTING_KEY_03 = "#.queue.#";
private static final String ROUTING_KEY_04 = "*.queue.*";

// 创建队列
@Bean
public Queue queue_topic01() {
  return new Queue(QUEUE_TOPIC_01);
}
@Bean
public Queue queue_topic02() {
  return new Queue(QUEUE_TOPIC_02);
}

@Bean
public TopicExchange topicExchange() {
  return new TopicExchange(TOPIC_EXCHANGE);
}

// 绑定交换机和路由
@Bean
public Binding binding_topic01() {
  return BindingBuilder.bind(queue_topic01()).to(topicExchange()).with(ROUTING_KEY_03);
}

@Bean
public Binding binding_topic02() {
  return BindingBuilder.bind(queue_topic02()).to(topicExchange()).with(ROUTING_KEY_04);
}
1
2
3
4
5
6
7
8
9
10
11
// topic:发送消息到topic交换机
public void sendTopic01(Object msg) {
  log.info("direct:发送消息:#.queue.#: {}", msg);
  rabbitTemplate.convertAndSend("topicExchange", "#.queue.#", msg);
}

// topic:发送消息到topic交换机
public void sendTopic02(Object msg) {
  log.info("direct:发送消息:*.queue.*: {}", msg);
  rabbitTemplate.convertAndSend("topicExchange", "*.queue.*", msg);
}
1
2
3
4
5
6
7
8
9
10
11
// 接收topic消息, topic交换机绑了两个队列--01
@RabbitListener(queues = {"queue_topic_01"})
public void receiveTopic01(Object msg) {
    log.info("[queue_topic_01]接收到消息: {}", msg);
}

// 接收topic消息, topic交换机绑了两个队列--02
@RabbitListener(queues = {"queue_topic_02"})
public void receiveTopic02(Object msg) {
    log.info("[queue_topic_02]接收到消息: {}", msg);
}
1
2
3
4
5
6
7
8
9
10
11
@RequestMapping("/mqTopic01")
@ResponseBody
public void sendTopic01() {
    mqSender.sendTopic01("Topic01, I am rabbit mq");
}

@RequestMapping("/mqTopic02")
@ResponseBody
public void sendTopic02() {
    mqSender.sendTopic02("Topic02, I am rabbit mq");
}

headers

1headers 交换机是一种比较复杂且少见的交换机,不同于 directtopic,它不关心 路由 key 是否匹配,而只关心 header 中的 key-value 对是否匹配(这里的匹配为精确匹配, 包含键和值都必须匹配), 有点类似于 http 中的请求头。

2headers 头路由模型中,消息是根据 prop 即请求头中 key-value 来匹配的。

3、绑定的队列(也可以理解成消费方) 指定的 headers 中必须包含一个“x-match”的键

4、键“x-match”的值有 2 个:allanyall:表示绑定的队列/消费方 指定的所有 key-value 都必须在消息 header 中出现并匹配

any:表示绑定的队列/消费方 指定的 key-value 至少有一个在消息 header 中出现并匹 配即可

headersExchange特点: 可以携带多个header[k-v], 队列绑定交换机时要指定any/all匹配

案例:

需求:

1) headers 交换机发送消息 hello everyone, QUEUE01 QUEUE02 两个队列都接收

2) headers 交换机发送消息 hello queue01, QUEUE01 队列接收

配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
//- headers
private static final String QUEUE_HEADERS_01 = "queue_header_01";
private static final String QUEUE_HEADERS_02 = "queue_header_02";
private static final String HEADERS_EXCHANGE = "headersExchange";


@Bean
public Queue queue_header01() {
  return new Queue(QUEUE_HEADERS_01);
}

@Bean
public Queue queue_header02() {
  return new Queue(QUEUE_HEADERS_02);
}

@Bean
public HeadersExchange headersExchange() {
  return new HeadersExchange(HEADERS_EXCHANGE);
}

// 完成队列和交换机的绑定, 同时声明要匹配的k-v, 和以什么方式绑定(all/any)
@Bean
public Binding binding_header01() {
  // 先定义k-v, 因为有多个可以放入map
  Map<String, Object> map = new HashMap<>();
  map.put("color", "red");
  map.put("gender","f");
  // 只要有任何一个(any)k-v满足就把消息发送到这个队列中
  return BindingBuilder.bind(queue_header01())
    .to(headersExchange())
    .whereAny(map)
    .match();
}

@Bean
public Binding binding_header02() {
  // 先定义k-v, 因为有多个可以放入map
  Map<String, Object> map = new HashMap<>();
  map.put("color", "red");
  map.put("gender","m");
  // 全部满足map中的k-v(all),才把消息发送到这个队列中
  return BindingBuilder.bind(queue_header02())
    .to(headersExchange())
    .whereAll(map)
    .match();
}

sender

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// headers:发送消息到headers交换机
public void sendHeaders01(String msg) {
  log.info("headers:发送消息:color=red,gender=m: {}", msg);
  // 创建消息属性
  MessageProperties messageProperties = new MessageProperties();
  messageProperties.setHeader("color", "red");
  messageProperties.setHeader("gender", "m");
  Message message = new Message(msg.getBytes(),messageProperties);
  // headers交换机不用设置routingKey, 他靠header来过滤消息
  rabbitTemplate.convertAndSend("headersExchange", "", message);
}

// headers:发送消息到headers交换机
public void sendHeaders02(String msg) {
  log.info("headers:发送消息:color=red,gender=f: {}", msg);
  // 创建消息属性
  MessageProperties messageProperties = new MessageProperties();
  messageProperties.setHeader("color", "red");
  messageProperties.setHeader("gender", "f");
  Message message = new Message(msg.getBytes(),messageProperties);
  // headers交换机不用设置routingKey
  rabbitTemplate.convertAndSend("headersExchange", "", message);
}

接收者

1
2
3
4
5
6
7
8
9
10
11
// 接收headers消息, headers交换机绑了两个队列--01
@RabbitListener(queues = {"queue_header_01"})
public void receiveHeaders01(Object msg) {
  log.info("[queue_header_01]接收到消息: {}", msg);
}

// 接收headers消息, headers交换机绑了两个队列--02
@RabbitListener(queues = {"queue_header_02"})
public void receiveHeaders02(Object msg) {
  log.info("[queue_header_02]接收到消息: {}", msg);
}

测试

1
2
3
4
5
6
7
8
9
10
11
@RequestMapping("/mqHeaders01")
@ResponseBody
public void sendHeaders01() {
    mqSender.sendHeaders01("Headers01, I am rabbit mq");
}

@RequestMapping("/mqHeaders02")
@ResponseBody
public void sendHeaders02() {
    mqSender.sendHeaders02("Headers02, I am rabbit mq");
}
1
2
3
4
5
2025-04-12 19:33:56.437  INFO 12189 --- [p-nio-80-exec-1] org.xxx.seckill.rabbitmq.MQSender        : headers:发送消息:color=red,gender=m: Headers01, I am rabbit mq
2025-04-12 19:33:56.472  INFO 12189 --- [ntContainer#7-1] org.xxx.seckill.rabbitmq.MQReceiver      : [queue_header_01]接收到消息: (Body:'[B@6cb9549c(byte[25])' MessageProperties [headers={color=red, gender=m}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=headersExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-oo_zQ5p9B6NWSpmfae94_Q, consumerQueue=queue_header_01])
2025-04-12 19:33:56.472  INFO 12189 --- [ntContainer#8-1] org.xxx.seckill.rabbitmq.MQReceiver      : [queue_header_02]接收到消息: (Body:'[B@711ff813(byte[25])' MessageProperties [headers={color=red, gender=m}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=headersExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-jdmLQ6IZ4K-zhHxFIIKWhQ, consumerQueue=queue_header_02])
2025-04-12 19:34:08.787  INFO 12189 --- [p-nio-80-exec-2] org.xxx.seckill.rabbitmq.MQSender        : headers:发送消息:color=red,gender=f: Headers02, I am rabbit mq
2025-04-12 19:34:08.806  INFO 12189 --- [tContainer#7-10] org.xxx.seckill.rabbitmq.MQReceiver      : [queue_header_01]接收到消息: (Body:'[B@677ac83c(byte[25])' MessageProperties [headers={color=red, gender=f}, contentType=application/octet-stream, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=headersExchange, receivedRoutingKey=, deliveryTag=1, consumerTag=amq.ctag-T04lxIVVzp7eYNEdlsXS7w, consumerQueue=queue_header_01])

可见: any匹配只要满足任意一个header就可以收到消息

all匹配需要满足所有的header(key=value)才能接收到消息

截屏2025-04-12 19.42.15