RabbitMQ交换机

8/5/2022 RabbitMQ

# Exchanges

RabbitMQ 消息传递模型的核心思想是: 生产者生产的消息从不会直接发送到队列。实际上,通常生产者甚至都不知道这些消息传递传递到了哪些队列中。

相反,生产者只能将消息发送到交换机(exchange),交换机工作的内容非常简单,一方面它接收来自生产者的消息,另一方面将它们推入队列。交换机必须确切知道如何处理收到的消息。是应该把这些消息放到特定队列还是说把他们到许多队列中还是说应该丢弃它们。这就的由交换机的类型来决定。

# Exchanges的类型

# 直接(direct)

处理路由键,需要将一个队列绑定到交换机上,要求该消息与一个特定的路由键完全匹配。这是一个完整的匹配。如果一个队列绑定到该交换机上要求路由键 abc ,则只有被标记为 abc 的消息才被转发,不会转发 abc.def,也不会转发 dog.ghi,只会转发 abc

# 主题(topic)

将路由键和某模式进行匹配。此时队列需要绑定要一个模式上

符号“#”匹配一个或多个词,符号 * 匹配不多不少一个词。因此 abc.# 能够匹配到 abc.xsa.sdw,但是 abc.* 只会匹配到 abc.xsa

# 标题(headers)

不处理路由键,而是根据发送的消息内容中的headers属性进行匹配。

在绑定 Queue 与 Exchange 时指定一组键值对;当消息发送到RabbitMQ 时会取到该消息的 headers 与 Exchange 绑定时指定的键值对进行匹配;如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 属性是一个键值对,可以是 Hashtable,键值对的值可以是任何类型。而 fanout,direct,topic 的路由键都需要要字符串形式的。

匹配规则 x-match 有下列两种类型:

x-match = all :表示所有的键值对都匹配才能接受到消息

x-match = any :表示只要有键值对匹配就能接受到消息

headers交换机与direct交换机完全一致,且性能很差,目前几乎用不到

# 扇出(fanout)

不处理路由键,你只需要简单的将队列绑定到交换机上。一个发送到交换机的消息都会被转发到与该交换机绑定的所有队列上。很像子网广播,每台子网内的主机都获得了一份复制的消息。Fanout 交换机转发消息是最快的。

# direct

登录我们的RabbitMQ控制台,点击 Exchanges菜单,我们可以看到,新增exchange时默认的就是 direct

rabbitmq

# Direct示例

# 新建队列

我们在RabbitMQ控制台中新建一个名为ajia的队列,并在Exchanges菜单与exchange.direct交换机进行绑定

rabbitmq

rabbitmq

# 新建SpringBoot项目

1、新建SpringBoot项目,并引入rabbitMQ依赖

<!--RabbitMQ-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
1
2
3
4
5

2、配置RabbitMQ

spring:
  application:
    name: rabbitmq-basis
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
    virtual-host: /ems
1
2
3
4
5
6
7
8
9

3、配置消息序列化

/**
 * @version 1.0.0
 * @className: MyAMQPConfig
 * @description: 将amqp消息转成json形式发送
 * @author: LiJunYi
 * @create: 2022/8/4 10:29
 */
@Configuration
public class MyAMQPConfig
{
    @Bean
    public MessageConverter messageConverter(){
        return new Jackson2JsonMessageConverter();
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15

4、测试 direct

@SpringBootTest
class BasisApplicationTests {

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Autowired
    AmqpAdmin amqpAdmin;

    @Test
    void directExchange()
    {
        // 只需要传入要发送的对象,自动序列化发送给rabbitMQ
        // 对象被默认序列化发送
        rabbitTemplate.convertAndSend("exchange.direct","ajia",new User("A佳技术",28));
    }

    //接收消息
    @Test
    public void getMsg(){
        Object o = rabbitTemplate.receiveAndConvert("ajia");
        System.out.println(o.getClass());
        System.out.println(o);
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25

# 发送消息

我们运行 directExchange进行发送消息,运行完成后观察RabbitMQ控制台变化

rabbitmq

rabbitmq

我们可以看见队列中已经有一条消息,说明我们发送成功。

# 接收消息

  • 接收方式1:运行getMsg方法

    rabbitmq

  • 接收方式2:配置监听

/**
 * @version 1.0.0
 * @className: UserService
 * @description: 监听MQ消息
 * 接收消息1 与 接收消息2 只能选择其中一个
 * @author: LiJunYi
 * @create: 2022/8/4 11:15
 */
@Service
@Slf4j
public class UserService
{
    /**
     * 接收消息1
     *
     * @param user 用户
     */
    @RabbitListener(queues = "ajia")
    public void receive(User user){
        log.info("接收到【ajia】队列消息{}",user);
    }

    /**
     * 接收消息2
     *
     * @param message 消息
     */
    @RabbitListener(queues = "ajia")
    public void receive02(Message message){
        log.info("接收到【ajia】队列消息方式2,message.getBody:{}",message.getBody());
        log.info("接收到【ajia】队列消息方式2,message.getMessageProperties:{}",message.getMessageProperties());
    }
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 其他交换器示例

# 其他交换机的配置如下:

rabbitmq

rabbitmq

rabbitmq