Using RabbitMQ in Spring Boot
Code: https://gitee.com/Aes_yt/middleware-demo/tree/master/rabbitmq
Installing RabbitMQ
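1. Pull the image with Docker
docker pull rabbitmq:3.9.29-management
2. Create the RabbitMQ container
docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.9.29-management
3. Open the management UI
Browse to http://{ip-address}:15672/ to see the RabbitMQ management console. The default username and password are both guest.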
Producing and consuming messages
rabbitmq-producer
- Create a new module and add the dependency

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
- Configure the broker address in YAML

      spring:
        rabbitmq:
          host: 192.168.67.131
          port: 5672
          username: guest
          password: guest
          virtual-host: /
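- Configure the exchange

      import org.springframework.amqp.core.Exchange;
      import org.springframework.amqp.core.ExchangeBuilder;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;

      @Configuration
      public class RabbitMqConfig {
          /* Exchange name */
          public static final String USER_INFO_EXCHANGE_NAME = "user_info_exchange";

          // Durable topic exchange: it survives a broker restart.
          @Bean
          public Exchange userInfoExchange() {
              return ExchangeBuilder.topicExchange(USER_INFO_EXCHANGE_NAME).durable(true).build();
          }
      }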
- Send messages

      // Runs inside a Spring Boot test class; rabbitTemplate is an injected
      // RabbitTemplate and log is the class logger.
      @Test
      public void sendMessage() {
          String registerMsg = "user register..." + new Date();
          // 1. Send a registration message, marked persistent
          rabbitTemplate.convertAndSend(RabbitMqConfig.USER_INFO_EXCHANGE_NAME, "user.register.user1", registerMsg, msg -> {
              msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
              return msg;
          });
          log.info("消息发送完成:{}", registerMsg); // logs "message sent: ..."

          String loginMsg = "user login..." + new Date();
          // 2. Send a login message, marked persistent
          rabbitTemplate.convertAndSend(RabbitMqConfig.USER_INFO_EXCHANGE_NAME, "user.login.user1", loginMsg, msg -> {
              msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
              return msg;
          });
          log.info("消息发送完成:{}", loginMsg); // logs "message sent: ..."
      }
rabbitmq-consumer
- Create a new module and add the dependency

      <dependency>
          <groupId>org.springframework.boot</groupId>
          <artifactId>spring-boot-starter-amqp</artifactId>
      </dependency>
- Configure the broker address in YAML

      spring:
        rabbitmq:
          host: 192.168.67.131
          port: 5672
          username: guest
          password: guest
          virtual-host: /
- Configure the queues and bindings

      import org.springframework.amqp.core.Binding;
      import org.springframework.amqp.core.BindingBuilder;
      import org.springframework.amqp.core.Exchange;
      import org.springframework.amqp.core.ExchangeBuilder;
      import org.springframework.amqp.core.Queue;
      import org.springframework.amqp.core.QueueBuilder;
      import org.springframework.beans.factory.annotation.Value;
      import org.springframework.context.annotation.Bean;
      import org.springframework.context.annotation.Configuration;

      @Configuration
      public class RabbitMqConfig {
          @Value("${spring.rabbitmq.host}")
          private String host;
          @Value("${spring.rabbitmq.port}")
          private Integer port;
          @Value("${spring.rabbitmq.username}")
          private String username;
          @Value("${spring.rabbitmq.password}")
          private String password;

          /* Exchange name */
          public static final String USER_INFO_EXCHANGE_NAME = "user_info_exchange";
          /* Queue names */
          public static final String USER_REGISTER_QUEUE_NAME = "user_register_queue";
          public static final String USER_LOGIN_QUEUE_NAME = "user_login_queue";
          /* Binding keys; # is a topic wildcard */
          public static final String USER_REGISTER_ROUTING_KEY = "user.register.#";
          public static final String USER_LOGIN_ROUTING_KEY = "user.login.#";

          @Bean
          public Exchange userInfoExchange() {
              return ExchangeBuilder.topicExchange(USER_INFO_EXCHANGE_NAME).durable(true).build();
          }

          @Bean
          public Queue userRegisterQueue() {
              return QueueBuilder.durable(USER_REGISTER_QUEUE_NAME).build();
          }

          @Bean
          public Queue userLoginQueue() {
              return QueueBuilder.durable(USER_LOGIN_QUEUE_NAME).build();
          }

          // Bind each queue to the exchange with its binding key.
          @Bean
          public Binding userRegisterBinding() {
              return BindingBuilder.bind(userRegisterQueue()).to(userInfoExchange()).with(USER_REGISTER_ROUTING_KEY).noargs();
          }

          @Bean
          public Binding userLoginBinding() {
              return BindingBuilder.bind(userLoginQueue()).to(userInfoExchange()).with(USER_LOGIN_ROUTING_KEY).noargs();
          }
      }
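- Listener

      import lombok.extern.slf4j.Slf4j;
      import org.springframework.amqp.rabbit.annotation.RabbitListener;
      import org.springframework.stereotype.Component;

      @Component
      @Slf4j
      public class UserInfoListener {

          // Consumes messages routed to the registration queue
          @RabbitListener(queues = RabbitMqConfig.USER_REGISTER_QUEUE_NAME)
          public void userRegister(String msg) {
              log.info(msg);
          }

          // Consumes messages routed to the login queue
          @RabbitListener(queues = RabbitMqConfig.USER_LOGIN_QUEUE_NAME)
          public void userLogin(String msg) {
              log.info(msg);
          }
      }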
- Result
Start the producer and consumer projects. The producer sends the messages and the consumer receives them:
producer:
2023-07-08 10:26:10.660 INFO 7432 --- [main] com.yt.rabbit.RabbitProducerTest: 消息发送完成:user register...Sat Jul 08 10:26:10 CST 2023
2023-07-08 10:26:10.663 INFO 7432 --- [main] com.yt.rabbit.RabbitProducerTest: 消息发送完成:user login...Sat Jul 08 10:26:10 CST 2023
consumer:
2023-07-08 10:26:10.661 INFO 25108 --- [ntContainer#1-1] c.yt.rabbitmq.listener.UserInfoListener : user register...Sat Jul 08 10:26:10 CST 2023
2023-07-08 10:26:10.665 INFO 25108 --- [ntContainer#0-1] c.yt.rabbitmq.listener.UserInfoListener : user login...Sat Jul 08 10:26:10 CST 2023
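Exchange types
There are four exchange types: fanout, topic, direct, and headers.
Next, three of these exchange types are created in code, with the routing keys and queues bound as shown in the table. RoutingKey is the key each queue is bound with, and MessageKey is the routing key of the message that is sent. Each message is sent and the Receive column records whether the queue got it [Y/N]. The headers type is not demonstrated.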
Exchange | ExchangeType | RoutingKey | MessageKey | Queue | Receive |
---|---|---|---|---|---|
fanout_exchange | fanout | fanout.test.key1 | xxx.yyy.zzz | fanout_test_queue1 | Y |
fanout_exchange | fanout | fanout.# | xxx.yyy.zzz | fanout_test_queue2 | Y |
topic_exchange | topic | topic.test.# | topic.test.key1 | topic_test_queue1 | Y |
topic_exchange | topic | topic.# | topic.test.key1 | topic_test_queue2 | Y |
topic_exchange | topic | topic.* | topic.test.key1 | topic_test_queue3 | N |
direct_exchange | direct | direct.test.key1 | direct.test.key1 | direct_test_queue1 | Y |
direct_exchange | direct | direct.test.# | direct.test.key2 | direct_test_queue2 | N |
direct_exchange | direct | direct.test.key3 | direct.test.key3 | direct_test_queue3 && direct_test_queue4 | Y && Y |
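A direct exchange matches the routing key exactly and wildcards have no effect, so direct_test_queue2 receives nothing.
In a topic exchange, * matches exactly one word and # matches zero or more words. topic_test_queue3 therefore receives nothing: its key topic.* allows only one word after topic, while topic.test.key1 has two.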
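The matching rules can be checked without a broker. The following is a minimal in-memory sketch of how the three exchange types decide whether a binding receives a message. The names `ExchangeRouting`, `Type`, and `routes` are hypothetical and exist only for this illustration; this is not RabbitMQ's actual implementation, only a model of the documented rules.

```java
/** Minimal in-memory model of fanout, direct and topic routing (illustration only). */
final class ExchangeRouting {

    enum Type { FANOUT, DIRECT, TOPIC }

    /** Returns true if a queue bound with bindingKey receives a message sent with routingKey. */
    static boolean routes(Type type, String bindingKey, String routingKey) {
        switch (type) {
            case FANOUT:
                return true;                          // fanout ignores keys entirely
            case DIRECT:
                return bindingKey.equals(routingKey); // exact match, wildcards are literal
            default:
                return topicMatches(bindingKey.split("\\."), 0, routingKey.split("\\."), 0);
        }
    }

    /** Word-by-word topic match: '*' is exactly one word, '#' is zero or more words. */
    private static boolean topicMatches(String[] pattern, int i, String[] key, int j) {
        if (i == pattern.length) {
            return j == key.length;                   // both must be exhausted together
        }
        if (pattern[i].equals("#")) {
            // Let '#' absorb 0, 1, 2, ... of the remaining words and try each option.
            for (int next = j; next <= key.length; next++) {
                if (topicMatches(pattern, i + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (j == key.length) {
            return false;                             // pattern still needs a word
        }
        boolean wordMatches = pattern[i].equals("*") || pattern[i].equals(key[j]);
        return wordMatches && topicMatches(pattern, i + 1, key, j + 1);
    }

    public static void main(String[] args) {
        // Rows from the table above
        System.out.println(routes(Type.FANOUT, "fanout.#", "xxx.yyy.zzz"));          // true
        System.out.println(routes(Type.TOPIC, "topic.test.#", "topic.test.key1"));  // true
        System.out.println(routes(Type.TOPIC, "topic.*", "topic.test.key1"));       // false
        System.out.println(routes(Type.DIRECT, "direct.test.#", "direct.test.key2")); // false
    }
}
```

The same function explains the earlier producer and consumer run: `user.register.#` matches `user.register.user1` but not `user.login.user1`, so each message reaches only its own queue.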