RabbitMQ
约 9746 字大约 32 分钟
2025-01-25
1.介绍
RabbitMQ,当你们第一次看到这个单词的时候,会不会跟我兔子队列?
RabbitMQ是实现了高级消息队列协议 AMQP 的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。
RabbitMQ 是一个在 AMQP(Advanced Message Queuing Protocol ,高级消息队列)基础上实现的,可复用的企业消息系统。它可以用于大型软件系统各个模块之间的高效通信,支持高并发,支持可扩展。它支持多种客户端如:Python、Ruby、.NET、Java、JMS、C、PHP、ActionScript、XMPP、STOMP 等,支持 AJAX,持久化,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。
RabbitMQ 是使用 Erlang 编写的一个开源的消息队列,本身支持很多的协议:AMQP,XMPP, SMTP, STOMP,也正是如此,使的它变的非常重量级,更适合于企业级的开发。它同时实现了一个 Broker 构架,这意味着消息在发送给客户端时先在中心队列排队,对路由(Routing)、负载均衡(Load balance)或者数据持久化都有很好的支持。
可靠性:RabbitMQ 使用一些机制来保证可靠性, 如持久化、传输确认及发布确认等。
灵活的路由:在消息进入队列之前,通过交换器来路由消息。对于典型的路由功能, RabbitMQ 己经提供了一些内置的交换器来实现。针对更复杂的路由功能,可以将多个交换器绑定在一起, 也可以通过插件机制来实现自己的交换器
多种协议:RabbitMQ 除了原生支持 AMQP 协议,还支持 STOMP, MQTT 等多种消息 中间件协议
管理界面 : RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息、集 群中的节点等
2.架构设计
RabbitMQ 整体上是一个生产者与消费者模型,主要负责接收、存储和转发消息。可以把消息传递的过程想象成:当你将一个包裹送到邮局,邮局会暂存并最终将邮件通过邮递员送到收件人的手上,RabbitMQ 就好比由邮局、邮箱和邮递员组成的一个系统。从计算机术语层面来说,RabbitMQ 模型更像是一种交换机模型
RabbitMQ 的整体模型架构如下:
3.成员介绍
3.1 Producer和Consumer
- Producer(生产者) :生产消息的一方(邮件投递者)
- Consumer(消费者) :消费消息的一方(邮件收件人)
- 消息一般由 2 部分组成:消息头(或者说是标签 Label)和 消息体。
- 消息体 也可以称为 payLoad ,消息体是不透明的
- 消息头 则由一系列的可选属性组成,这些属性包括 routing-key(路由键)、priority(相对于其他消息的优先权)、delivery-mode(指出该消息可能需要持久性存储) 等。生产者把消息交由 RabbitMQ 后,RabbitMQ 会根据消息头把消息发送给感兴趣的 Consumer(消费者)。
3.2 Exchange
在 RabbitMQ 中,消息并不是直接被投递到 Queue(消息队列) 中的,中间还必须经过 Exchange(交换器) 这一层,Exchange(交换器) 会把我们的消息分配到对应的 Queue(消息队列) 中。
Exchange(交换器) 用来接收生产者发送的消息并将这些消息路由给服务器中的队列中,如果路由不到,或许会返回给 Producer(生产者) ,或许会被直接丢弃掉 。这里可以将 RabbitMQ 中的交换器看作一个简单的实体。
RabbitMQ 的 Exchange(交换器) 有 4 种类型,不同的类型对应着不同的路由策略:direct(默认),fanout, topic, 和 headers,不同类型的 Exchange 转发消息的策略有所区别。这个会在介绍 Exchange Types(交换器类型) 的时候介绍到。
Exchange(交换器) 示意图如下:
- RabbitMQ 中通过 Binding(绑定) 将 Exchange(交换器) 与 Queue(消息队列) 关联起来,在绑定的时候一般会指定一个 BindingKey(绑定建) ,这样 RabbitMQ 就知道如何正确将消息路由到队列了,如下图所示。一个绑定就是基于路由键将交换器和消息队列连接起来的路由规则,所以可以将交换器理解成一个由绑定构成的路由表。Exchange 和 Queue 的绑定可以是多对多的关系。
- 生产者将消息发给交换器的时候,一般会指定一个 ==RoutingKey(路由键),==用来指定这个消息的路由规则,而这个 RoutingKey 需要与交换器类型和绑定键(BindingKey)联合使用才能最终生效。
3.3 Queue
- Queue(消息队列) 用来保存消息直到发送给消费者。它是消息的容器,也是消息的终点。一个消息可投入一个或多个队列。消息一直在队列里面,等待消费者连接到这个队列将其取走。
- RabbitMQ 中消息只能存储在 队列 中,这一点和 Kafka 这种消息中间件相反。Kafka 将消息存储在 topic(主题) 这个逻辑层面,而相对应的队列逻辑只是 topic 实际存储文件中的位移标识。 RabbitMQ 的生产者生产消息并最终投递到队列中,消费者可以从队列中获取消息并消费。
- 多个消费者可以订阅同一个队列,这时队列中的消息会被平均分摊(Round-Robin,即轮询)给多个消费者进行处理,而不是每个消费者都收到所有的消息并处理,这样避免消息被重复消费。
3.4 Broker
对于 RabbitMQ 来说,一个 RabbitMQ Broker 可以简单地看作一个 RabbitMQ 服务节点,或者 RabbitMQ 服务实例。大多数情况下也可以将一个 RabbitMQ Broker 看作一台 RabbitMQ 服务器。
下图展示了生产者将消息存入 RabbitMQ Broker,以及消费者从 Broker 中消费数据的整个流程。
3.5 Exchange Types
RabbitMQ 常用的 Exchange Type 有 fanout、direct、topic、headers 这四种(AMQP 规范里还提到两种 Exchange Type,分别为 system 与 自定义,这里不予以描述)。
3.5.1 fanout
fanout 类型的 Exchange 路由规则非常简单,它会把所有发送到该 Exchange 的消息路由到所有与它绑定的 Queue 中,不需要做任何判断操作,所以 fanout 类型是所有的交换机类型里面速度最快的。fanout 类型常用来广播消息。
3.5.2 direct
direct 类型的 Exchange 路由规则也很简单,它会把消息路由到那些 Bindingkey 与 RoutingKey 完全匹配的 Queue 中。
以上图为例,如果发送消息的时候设置路由键为“warning”,那么消息会路由到 Queue1 和 Queue2。如果在发送消息的时候设置路由键为"Info”或者"debug”,消息只会路由到 Queue2。如果以其他的路由键发送消息,则消息不会路由到这两个队列中。
3.5.3 topic
topic 类型的交换器在direct的匹配规则基础上进行了扩展,它与 direct 类型的交换器相似,也是将消息路由到 BindingKey 和 RoutingKey 相匹配的队列中,但这里的匹配规则有些不同,它约定:
- RoutingKey 为一个点号“.”分隔的字符串(被点号“.”分隔开的每一段独立的字符串称为一个单词),如 “com.rabbitmq.client”、“java.util.concurrent”、“com.hidden.client”;
- BindingKey 和 RoutingKey 一样也是点号“.”分隔的字符串;
- BindingKey 中可以存在两种特殊字符串“”和“#”,用于做模糊匹配,其中“”用于匹配一个单词,“#”用于匹配多个单词(可以是零个)。
- 路由键为 “com.rabbitmq.client” 的消息会同时路由到 Queue1 和 Queue2;
- 路由键为 “com.hidden.client” 的消息只会路由到 Queue2 中;
- 路由键为 “com.hidden.demo” 的消息只会路由到 Queue2 中;
- 路由键为 “java.rabbitmq.demo” 的消息只会路由到 Queue1 中;
- 路由键为 “java.util.concurrent” 的消息将会被丢弃或者返回给生产者(需要设置 mandatory 参数),因为它没有匹配任何路由键。
3.5.4 headers
headers 类型的交换器不依赖于路由键的匹配规则来路由消息,而是根据发送的消息内容中的 headers 属性进行匹配。在绑定队列和交换器时指定一组键值对,当发送消息到交换器时,RabbitMQ 会获取到该消息的 headers(也是一个键值对的形式),对比其中的键值对是否完全匹配队列和交换器绑定时指定的键值对,如果完全匹配则消息会路由到该队列,否则不会路由到该队列。headers 类型的交换器性能会很差,而且也不实用,基本上不会看到它的存在。
4.Springboot整合RabbitMQ
引入amqp依赖场景;RabbitAutoCon>figuration就会自动生
给容器中自动配置了RabbitTemplate、AmqpAdmin、CachingConnectionFactory、RabbitMessagingTemplate
@EnableRabbit:在创建交换机,队列时可以不需要,发送消息可以不需要这个注解,监听消息必须使用这个注解
4.1 导入依赖
<!--rabbitmq-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
4.2 xml配置
spring:
rabbitmq:
host: 192.168.234.133
port: 5672
virtual-host: /
4.3 开启rabbitmq相关功能
@EnableRabbit
4.4 整合测试
4.4.1 认识RabbitmqAutoConfiguration
里面自动注入的组件
RabbitTeamplate:用于发送消息 AmqpAdmin:需要一个连接对象 CachingConnectionFactory:连接创建工厂 RabbitMessagingTemplate:操作消息的类 首先就是创建AmqpAdmin,并且这个时候需要配置一下连接工厂的配置信息,主要就是host、port和virtual-host虚拟主机
4.4.2 创建exchange、queue、binding
参数
- exchange:name、durable(持久化)、autodelete(自动删除)
- queue:name、durable、excusive(排他,其它连接是否能够传输信息进来)、autodelete
@Test
void exchange() {
//注意,创建交换机就指定了交换机的类型为direct
DirectExchange directExchange = new DirectExchange("hello.java.exchange",true,false);
amqpAdmin.declareExchange(directExchange);
}
@Test
void queue() {
Queue queue = new Queue("hello.java.queue",true,false,true);
amqpAdmin.declareQueue(queue);
}
@Test
void binding() {
Binding binding = new Binding("hello.java.queue", Binding.DestinationType.QUEUE,"hello.java.exchange","hello",new HashMap<>());
amqpAdmin.declareBinding(binding);
}
4.4.3 发送消息
思路
①第一个就是要知道在传送消息的时候是需要依靠模板rabbitTemplate,它里面有一个messageConverter,这个转换器主要就是能够把消息变成对应的形式。通常用的是simple,这个转换器就是序列化转换成byte数组来传输,但是我们可以增加自己配置一个Json的Config转换器。发送消息用的是json状态
②接着就是通过模板来发送
@Configuration
public class MyRabbitConfig {
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
}
发送消息用例:
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void sendMessage(){
MemberResVo memberResVo = new MemberResVo();
memberResVo.setCity("sss");
memberResVo.setId(123L);
/**
* 参数如下:交换机名称、消息的路由键、消息内容、(消息的唯一id:new CorrelationData(UUID.randomUUID().toString))
*/
rabbitTemplate.convertAndSend("hello.java.exchange","hello",memberResVo);
}
4.4.4 接收消息
思路
①开启注解EnableRabbit才能够使用@RabbitListener来监听,并且指定监听队列可以是多个。
②然后就发送消息进行测试
③RabbitHandler主要只能对方法起作用,@RabbitListener类和方法都可以,但是handler有利于方法的重载:用于接收不同类型的消息
5.RabbitMQ实现消息的可靠抵达
5.1 引入背景
为保证消息不丢失,可靠抵达,可以使用事务消息,但是性能下降250倍,为此引入确认机制,来实现消息的可靠抵达
5.2 确认机制分类
RabbitMQ 消息确认机制分为两大类:发送方确认、接收方确认。
其中发送方确认又分为:生产者到交换器到确认、交换器到队列的确认。如下图
- confirmCallback 确认模式:确认消息是否到达消息代理
- returnCallback退回模式:若消息没有传递给指定队列,就触发这个失败回调
- ack机制:消费者确认模式
- CK机制是消费者从RabbitMQ收到消息并处理完成后,反馈给RabbitMQ,RabbitMQ收到反馈后才将此消息从队列中删除。
- 如果一个消费者在处理消息出现了网络不稳定、服务器异常等现象,那么就不会有ACK反馈,RabbitMQ会认为这个消息没有正常消费,会将消息重新放入队列中
5.2.1 ConfirmCallback
(确认模式:消息生产者确认)
(1)开启确认配置
# 老版本
spring:
rabbitmq:
publisher-confirms: true
# 新版本
spring:
rabbitmq:
publisher-confirms-type: correlated
(2)实现ConfirmCallback回调接口
- ConfirmCallback 是一个回调接口,消息发送到 Broker 后触发回调,确认消息是否到达 Broker 服务器,也就是只确认是否正确到达 Exchange 中。
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
//消息只要被 broker 接收到就会执行 confirmCallback,如果是 cluster 模式,需要所有broker 接收到才会调用 confirmCallback。
//确认消息送到交换机(Exchange)回调
@PostConstruct //创建MyBabbiConfig对象后,执行该方法
public void initRabbitTemplate(){
//确认消息送到队列(Queue)回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
{
@Override
public void returnedMessage(ReturnedMessage returnedMessage)
{
System.out.println("\n确认消息送到队列(Queue)结果:");
System.out.println("发生消息:" + returnedMessage.getMessage());
System.out.println("回应码:" + returnedMessage.getReplyCode());
System.out.println("回应信息:" + returnedMessage.getReplyText());
System.out.println("交换机:" + returnedMessage.getExchange());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
}
});
}
}
被 broker 接收到只能表示 message 已经到达交换机,并不能保证消息一定会被投递到目标 queue 里。所以需要用到接下来的 returnCallback
5.2.2 ReturnCallback
(回退模式:交换机确认)
- 通过实现 ReturnCallback 接口,启动消息失败返回,此接口是在交换器路由不到队列时触发回调
- 该方法可以不使用,因为交换器和队列是在代码里绑定的,如果消息成功投递到 Broker 后几乎不存在绑定队列失败,除非你代码写错了。
(1)开启回退配置
spring:
rabbit:
#开启发送端消息抵达Queue确认
publisher-returns: true
#只要消息不能抵达queue时,该消息不会被丢弃,而是会被返回给生产者:可以记录下详细到投递数据,定期的巡检或者自动纠错都需要这些数据
template:
mandatory: true
(2)实现ReturnCallback回调接口
@Configuration
public class MyRabbitConfig {
@Autowired
RabbitTemplate rabbitTemplate;
@Bean
public MessageConverter messageConverter(){
return new Jackson2JsonMessageConverter();
}
@PostConstruct //创建MyBabbiConfig对象后,执行该方法
public void initRabbitTemplate(){
//确认消息送到队列(Queue)失败回调:注意是失败
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
{
@Override
public void returnedMessage(ReturnedMessage returnedMessage)
{
System.out.println("\n确认消息送到队列(Queue)结果:");
System.out.println("发生消息:" + returnedMessage.getMessage());
System.out.println("回应码:" + returnedMessage.getReplyCode());
System.out.println("回应信息:" + returnedMessage.getReplyText());
System.out.println("交换机:" + returnedMessage.getExchange());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
}
});
//确认消息送到队列(Queue)回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
{
@Override
public void returnedMessage(ReturnedMessage returnedMessage)
{
System.out.println("\n确认消息送到队列(Queue)结果:");
System.out.println("发生消息:" + returnedMessage.getMessage());
System.out.println("回应码:" + returnedMessage.getReplyCode());
System.out.println("回应信息:" + returnedMessage.getReplyText());
System.out.println("交换机:" + returnedMessage.getExchange());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
}
});
}
}
5.2.3 ACK机制
确认模式:消费者确认
- 消费者确认发生在监听队列的消费者处理业务失败,如:发生了异常,不符合要求的数据等,这些场景我们就需要手动处理消息,比如重新发送或者丢弃。
- RabbitMQ 消息确认机制(ACK)默认是自动确认的,自动确认会在消息发送给消费者后立即确认,但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了,也就是消费端没有处理成功这条消息,那么就相当于丢失了消息。
(1)消息接收确认模式类型
AcknowledgeMode.NONE:自动确认。
- 默认自动ack,消息被消费者收到(注意:只是收到),就会从broker的queue中移除
- 但存在丢失消息的可能,如果消费端消费逻辑抛出异常,假如你用回滚了也只是保证了数据的一致性,但是消息还是丢了
AcknowledgeMode.AUTO:根据情况确认。
AcknowledgeMode.MANUAL:手动确认。
确认过程:就算消费者已经拿到了消息,但是没有确认,队列中的消息仍然不能移除,只不过状态由ready变为unacked,消息处理分为以下三种情况:
- 消息处理成功,ack(),接受下一个消息,此消息broker就会移除
- 消息处理失败,nack()/reject(),重新发送给其他人进行处理,或者容错处理后ack
- 消息一直没有调用ack/nack方法,broker认为此消息正在被处理,不会投递给别人,此时客户端断开,消息不会被broker移除,会投递给别人
(2)手动确认回复方法
- 消费者获取到消息,成功处理,可以回复Ack给Broker
- basic.ack:用于肯定确认;broker将移除此消息
- basic.nack:用于否定确认;可以指定broker是否丢弃此消息,可以批量
- basic.reject:用于否定确认当前消息;同上,但不能批量
(3)basicAck方法
basicAck 方法用于确认当前消息,Channel 类中的 basicAck 方法定义如下:
void basicAck(long deliveryTag, boolean multiple) throws IOException;
参数说明:
- long deliveryTag:唯一标识 ID,当一个消费者向 RabbitMQ 注册后,会建立起一个 Channel ,RabbitMQ 会用 basic.deliver 方法向消费者推送消息,这个方法携带了一个 delivery tag, 它代表了 RabbitMQ 向该 Channel 投递的这条消息的唯一标识 ID,是一个单调递增的正整数,delivery tag 的范围仅限于 Channel
- **boolean multiple:**是否批处理,当该参数为 true 时,则可以一次性确认 delivery_tag 小于等于传入值的所有消息。
(4)basicNack方法
- basicNack 方法用于否定当前消息。
- basicReject 方法一次只能拒绝一条消息
- 如果想批量拒绝消息,则可以使用 basicNack 方法。消费者客户端可以使用 channel.basicNack 方法来实现,方法定义如下:
void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
参数说明:
- **long deliveryTag:**唯一标识 ID。
- **boolean multiple:**上面已经解释。
- **boolean requeue:**如果 requeue 参数设置为 true,则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者; 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,而不会把它发送给新的消费者。
(5)basicReject方法
basicReject 方法用于明确拒绝当前的消息而不是确认。
Channel 类中的basicReject 方法定义如下:
void basicReject(long deliveryTag, boolean requeue) throws IOException;
参数说明:
- **long deliveryTag:**唯一标识 ID。
- **boolean requeue:**上面已经解释。
测试场景:
- 发送五个消息测试,
- 此时关闭服务服务,消息的状态由unacked变为ready,下次客户端服务启动又会接收到消息ready变为unacked
- 除非手动确认
(6)开启手动ack机制
spring:
listener:
simple:
acknowledge-mode: manual
(7)消费者消费消息并手动确认
package com.pjb.receiver;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.stereotype.Component;
/**
* 接收者
* @author pan_junbiao
**/
@Component
public class Receiver implements ChannelAwareMessageListener
{
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
if ("queue_name".equals(message.getMessageProperties().getConsumerQueue()))
{
System.out.println("消费的消息来自的队列名为:"+message.getMessageProperties().getConsumerQueue());
System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
System.out.println("执行queue_name中的消息的业务处理流程......");
}
if ("fanout.A".equals(message.getMessageProperties().getConsumerQueue()))
{
System.out.println("消费的消息来自的队列名为:" + message.getMessageProperties().getConsumerQueue());
System.out.println("接收消息: " + new String(message.getBody(), "UTF-8"));
System.out.println("执行fanout.A中的消息的业务处理流程......");
}
/**
* 确认消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean multiple:是否批处理,当该参数为 true 时,
* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
*/
channel.basicAck(deliveryTag, true);
/**
* 否定消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean multiple:是否批处理,当该参数为 true 时,
* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*/
//channel.basicNack(deliveryTag, true, false);
}
catch (Exception e)
{
e.printStackTrace();
/**
* 拒绝消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*/
channel.basicReject(deliveryTag, true);
}
}
}
5.3 总结
RabbitMQ系列第五篇介绍了实现消息的可靠抵达的两大模式:发送者确认、消费者确认;其中发送确认又可以分为消息生产者到交换机的确认(confirmcallback接口:消息到达交换机回调)、交换机到队列的确认(returncallback接口:消息到达不了队列回调);而消费者回调ACK机制可分为自动确认、手动确认、根据情况确认三种类型;自动确认可能会出现消息丢失问题(消息到达消费者后,队列立刻删除该消息,但是此时消费者次此时出现异常或者宕机),手动确认的三个方法(basicAck、basicNack、basicReject)
6.消息重复消费问题
6.1 问题介绍
什么是消息重复消费?首先我们来看一下消息的传输流程。消息生产者-->MQ-->消息消费者;消息生产者发送消息到MQ服务器,MQ服务器存储消息,消息消费者监听MQ的消息,发现有消息就消费消息。
所以消息重复也就出现在**两个阶段**
1**:生产者多发送了消息给MQ;**
2**:MQ的一条消息被消费者消费了多次**。
具体场景如下:
- 生产者发送消息给MQ,在MQ确认的时候出现了网络波动,生产者没有收到确认,这时候生产者就会重新发送这条消息,导致MQ会接收到重复消息。
- 消费者消费成功后,给MQ确认的时候出现了网络波动,MQ没有接收到确认,为了保证消息不丢失,MQ就会继续给消费者投递之前的消息。这时候消费者就接收到了两条一样的消息。由于重复消息是由于网络原因造成的,无法避免。
6.2 解决思路
- 发送消息时让每个消息携带一个全局的唯一ID
- 在消费消息时先判断消息是否已经被消费过,保证消息消费逻辑的幂等性。具体消费过程为:
- 消费者获取到消息后先根据id去查询redis/db是否存在该消息
- 如果不存在,则正常消费,消费完毕后写入redis/db
- 如果存在,则证明消息被消费过,直接丢弃
6.3 将该消息存储到Redis
6.3.1 将id存入string(单消费者场景)
(1)实现思路
- 将id号存入value中,并且value类型为string
- 即以队列名称为key,以消息id为值
- 每次消息过来都覆盖之前的消息
@RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage1(Message message) throws UnsupportedEncodingException {
//获取唯一id
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");
//获取redis中该队列名称对应的value值
String messageRedisValue = redisUtil.get("queueName4","");
//检验唯一id是否存在
if (messageRedisValue.equals(messageId)) {
//存在
return;
}
System.out.println("消息:"+msg+", id:"+messageId);
//以队列为key,id为value
redisUtil.set("queueName4",messageId);
}
(2)问题
- 并发冲突:如果多个消费者同时操作 Redis 中的已消费消息列表,由于 Redis 是单线程处理命令,可能会出现并发冲突导致数据不一致或丢失问题。特别是在高并发情况下,使用字符串类型的 ID 可能会增加并发冲突的风险
- 内存占用:字符串类型的 ID 在内存中占用空间相对较大,尤其是对于大量消息的情况下,会增加 Redis 的内存占用。
- 比较效率:字符串类型的 ID 比较起来相对复杂,需要进行字符串比较操作。
6.3.2 将id存入list中(多消费场景)
(1)实现思路
- 以该队列名称为key,id为value
- 适合多消费场景的原因:
- 顺序性:List 是一个有序集合,可以按照消息的顺序存储消息 ID。在多消费者场景下,保持消息的顺序通常是很重要的,以确保消息按照正确的顺序被消费。
- 原子性操作:Redis 的 List 提供了多个原子性操作,比如从列表两端推入/弹出元素,这些操作可以确保多个消费者同时访问列表时不会出现数据竞争和并发问题。
- 支持阻塞操作:List 提供了阻塞式的弹出操作(如 BLPOP、BRPOP),可以在没有消息时阻塞等待新消息的到来,这对于实现消费者轮询机制非常有用。
@RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");
//获取
List<String> messageRedisValue = redisUtil.lrange("queueName4");
if (messageRedisValue.contains(messageId)) {
return;
}
System.out.println("消息:"+msg+", id:"+messageId);
redisUtil.lpush("queueName4",messageId);//存入list
}
6.3.3 将id以key增量存入string中并设置过期时间
(1)实现思路
以消息id为key,消息内容为value存入string中,==设置过期时间(==可承受的redis服务器异常时间,比如设置过期时间为10分钟,如果redis服务器断了20分钟,那么未消费的数据都会丢了)
@RabbitListener(queues = "queueName4")//发送的队列名称 @RabbitListener注解到类和方法都可以
@RabbitHandler
public void receiveMessage2(Message message) throws UnsupportedEncodingException {
String messageId = message.getMessageProperties().getMessageId();
String msg = new String(message.getBody(),"utf-8");
String messageRedisValue = redisUtil.get(messageId,"");
if (msg.equals(messageRedisValue)) {
return;
}
System.out.println("消息:"+msg+", id:"+messageId);
//以id为key,消息内容为value,过期时间10分钟
redisUtil.set(messageId,msg,10L);
}
6.4总结
该篇文章介绍了消息重复消费问题及解决方案,问题可能产生的两个阶段(生产消息多发、消费者重复消息);解决方案:将消息发送时携带一个唯一id,消费方拿到消息时先去reids/db中有没有该数据,若没有则可以消费,否则不可以消费;并介绍了基于Redsi解决消息重复消费问题,①以队列名称为key,消息id为value,且value为string类型(适合只有一个消费方)②以队列名称为key,消息id为value,且value为list类型(适合有多个消费方场景)③以消息id为key,内容为value,并设置过期时间
7.RabbitMQ实现JSON、Map格式数据的发送与接收
在实现的项目开发中,经常使用Json、Map格式数据。下面将介绍RabbitMQ实现Json、Map格式数据的发送与接收。
7.1 消息发送端
在消息发送端服务引入依赖、yml配置、RabbitMQ配置类、消息发送类
7.1.1 引入依赖
<!-- AMQP客户端 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
7.1.2 yml配置
spring:
application:
name: rabbitmq-provider
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
publisher-confirm-type: correlated
publisher-returns: true
7.1.3 RabbitMQConfig配置类——(非常重要)
在项目中,创建配置类,配置==消息确认,Json转换器,队列名称==等,并将队列交由 IoC 管理。代码如下:
package com.pjb.config;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
/**
* RabbitMQ配置类
**/
@Configuration
public class RabbitMqConfig
{
public static final String DIRECT_QUEUE = "direct_queue"; //Direct队列名称
public static final String DIRECT_EXCHANGE = "direct_exchange"; //交换器名称
public static final String DIRECT_ROUTING_KEY = "direct_routing_key"; //路由键
public static final String DELAY_QUEUE = "delay_queue"; //延时队列名称
public static final String DELAY_EXCHANGE = "delay_exchange"; //交换器名称
public static final String DELAY_ROUTING_KEY = "delay_routing_key"; //路由键
@Autowired
private CachingConnectionFactory connectionFactory;
@Bean
public RabbitTemplate createRabbitTemplate(ConnectionFactory connectionFactory)
{
RabbitTemplate rabbitTemplate = new RabbitTemplate();
rabbitTemplate.setConnectionFactory(connectionFactory);
//设置Json转换器
rabbitTemplate.setMessageConverter(jsonMessageConverter());
//设置开启Mandatory,才能触发回调函数,无论消息推送结果怎么样都强制调用回调函数
rabbitTemplate.setMandatory(true);
//确认消息送到交换机(Exchange)回调
rabbitTemplate.setConfirmCallback(new RabbitTemplate.ConfirmCallback()
{
@Override
public void confirm(CorrelationData correlationData, boolean ack, String cause)
{
System.out.println("\n确认消息送到交换机(Exchange)结果:");
System.out.println("相关数据:" + correlationData);
System.out.println("是否成功:" + ack);
System.out.println("错误原因:" + cause);
}
});
//确认消息送到队列(Queue)回调
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback()
{
@Override
public void returnedMessage(ReturnedMessage returnedMessage)
{
System.out.println("\n确认消息送到队列(Queue)结果:");
System.out.println("发生消息:" + returnedMessage.getMessage());
System.out.println("回应码:" + returnedMessage.getReplyCode());
System.out.println("回应信息:" + returnedMessage.getReplyText());
System.out.println("交换机:" + returnedMessage.getExchange());
System.out.println("路由键:" + returnedMessage.getRoutingKey());
}
});
return rabbitTemplate;
}
/**
* Json转换器
*/
@Bean
public Jackson2JsonMessageConverter jsonMessageConverter()
{
return new Jackson2JsonMessageConverter();
}
/**
* Direct交换器
*/
@Bean
public DirectExchange directExchange()
{
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}
/**
* 队列
*/
@Bean
public Queue directQueue()
{
return new Queue(DIRECT_QUEUE, true, false, false, null);
}
/**
* 绑定
*/
@Bean
Binding directBinding(DirectExchange directExchange, Queue directQueue)
{
//将队列和交换机绑定, 并设置用于匹配键:routingKey路由键
return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
}
/******************************延时队列******************************/
@Bean
public CustomExchange delayExchange()
{
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Queue delayQueue()
{
Queue queue = new Queue(DELAY_QUEUE, true);
return queue;
}
@Bean
public Binding delaybinding(Queue delayQueue, CustomExchange delayExchange)
{
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
}
}
下面将讲述创建交换器、创建队列、将交换器和队列进行绑定各种方法
(1)创建交换器方法
/**
* Direct交换器
*/
@Bean
public DirectExchange directExchange()
{
/**
* 创建交换器,参数说明:
* String name:交换器名称
* boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
* 持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
* boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
*/
return new DirectExchange(DIRECT_EXCHANGE, true, false);
}
(2)创建队列方法
/**
* 队列
*/
@Bean
public Queue directQueue()
{
/**
* 创建队列,参数说明:
* String name:队列名称。
* boolean durable:设置是否持久化,默认是 false。durable 设置为 true 表示持久化,反之是非持久化。
* 持久化的队列会存盘,在服务器重启的时候不会丢失相关信息。
* boolean exclusive:设置是否排他,默认也是 false。为 true 则设置队列为排他。
* boolean autoDelete:设置是否自动删除,为 true 则设置队列为自动删除,
* 当没有生产者或者消费者使用此队列,该队列会自动删除。
* Map<String, Object> arguments:设置队列的其他一些参数。
*/
return new Queue(DIRECT_QUEUE, true, false, false, null);
}
(3)绑定方法
/**
* 绑定
*/
@Bean
Binding directBinding(DirectExchange directExchange, Queue directQueue)
{
//将队列和交换机绑定, 并设置用于匹配键:routingKey路由键
return BindingBuilder.bind(directQueue).to(directExchange).with(DIRECT_ROUTING_KEY);
}
(4)创建死信交换器、死信队列、绑定关系
@Bean
public CustomExchange delayExchange()
{
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message", true, false, args);
}
@Bean
public Queue delayQueue()
{
Queue queue = new Queue(DELAY_QUEUE, true);
return queue;
}
@Bean
public Binding delaybinding(Queue delayQueue, CustomExchange delayExchange)
{
return BindingBuilder.bind(delayQueue).to(delayExchange).with(DELAY_ROUTING_KEY).noargs();
}
(5)发送消息方(立即发送JSON格式数据/延迟发送Map格式数据)
①实体类
package com.pjb.entity;
/**
* 用户信息实体类
**/
public class UserInfo
{
private int userId; //用户编号
private String userName; //用户姓名
private String blogUrl; //博客地址
private String blogRemark; //博客信息
//省略getter与setter方法...
}
②service层
package com.pjb.sender;
import com.pjb.entity.UserInfo;
import java.util.Map;
/**
* 用户消息发送服务接口
**/
public interface UserSender
{
/**
* 发送用户信息Json格式数据
* @param userInfo 用户信息实体类
*/
public void sendUserJson(UserInfo userInfo);
/**
* 延时发送用户信息Map格式数据
* @param userMap 用户信息Map
*/
public void sendDelayUserMap(Map userMap);
}
③serviceImpl类
package com.pjb.sender.impl;
import com.pjb.config.RabbitMqConfig;
import com.pjb.entity.UserInfo;
import com.pjb.sender.UserSender;
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Map;
/**
* 用户消息发送服务类
**/
@Service
public class UserSenderImpl implements UserSender
{
@Autowired
RabbitTemplate rabbitTemplate;
//时间格式
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
/**
* 发送用户信息Json格式数据
* @param userInfo 用户信息实体类
*/
@Override
public void sendUserJson(UserInfo userInfo)
{
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE, RabbitMqConfig.DIRECT_ROUTING_KEY, userInfo);
System.out.println("Json格式数据消息发送成功,发送时间:" + dateFormat.format(new Date()));
}
/**
* 延时发送用户信息Map格式数据
* @param userMap 用户信息Map
*/
@Override
public void sendDelayUserMap(Map userMap)
{
rabbitTemplate.convertAndSend(RabbitMqConfig.DELAY_EXCHANGE, RabbitMqConfig.DELAY_ROUTING_KEY, userMap, new MessagePostProcessor()
{
@Override
public Message postProcessMessage(Message message) throws AmqpException
{
//消息延迟5秒
message.getMessageProperties().setHeader("x-delay", 5000);
return message;
}
});
System.out.println("Map格式数据消息发送成功,发送时间:" + dateFormat.format(new Date()));
}
}
- 发送消息核心方法:
/**
* 发送消息,参数说明:
* String exchange:交换器名称。
* String routingKey:路由键。
* Object object:发送内容。
*/
rabbitTemplate.convertAndSend(RabbitMqConfig.DIRECT_EXCHANGE, RabbitMqConfig.DIRECT_ROUTING_KEY, userInfo);
7.2 消息接收端
7.2.1 引入依赖
<!-- AMQP客户端 -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
7.2.2 yml配置
spring:
# 项目名称
application:
name: rabbitmq-consumer
# RabbitMQ服务配置
rabbitmq:
host: 127.0.0.1
port: 5672
username: guest
password: guest
listener:
simple:
# 重试机制
retry:
enabled: true #是否开启消费者重试
max-attempts: 5 #最大重试次数
initial-interval: 5000ms #重试间隔时间(单位毫秒)
max-interval: 1200000ms #重试最大时间间隔(单位毫秒)
multiplier: 2 #间隔时间乘子,间隔时间*乘子=下一次的间隔时间,最大不能超过设置的最大间隔时间
7.2.3 RabbitMQConfig配置类
package com.pjb.config;
import com.pjb.receiver.impl.AckReceiver;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
* RabbitMQ配置类
**/
@Configuration
public class RabbitMqConfig
{
public static final String DIRECT_QUEUE = "direct_queue"; //Direct队列名称
public static final String DELAY_QUEUE = "delay_queue"; //延时队列名称
/**
* 消息接收确认处理类
*/
@Autowired
private AckReceiver ackReceiver;
@Autowired
private CachingConnectionFactory connectionFactory;
/**
* 客户端配置
* 配置手动确认消息、消息接收确认
*/
@Bean
public SimpleMessageListenerContainer simpleMessageListenerContainer()
{
//消费者数量,默认10
int DEFAULT_CONCURRENT = 10;
//每个消费者获取最大投递数量 默认50
int DEFAULT_PREFETCH_COUNT = 50;
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setConcurrentConsumers(DEFAULT_CONCURRENT);
container.setMaxConcurrentConsumers(DEFAULT_PREFETCH_COUNT);
// RabbitMQ默认是自动确认,这里改为手动确认消息
container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
//添加队列,可添加多个队列
container.addQueues(new Queue(DIRECT_QUEUE,true));
container.addQueues(new Queue(DELAY_QUEUE,true));
//设置消息处理类
container.setMessageListener(ackReceiver);
return container;
}
}
7.2.4 统一消息处理类——AckReceiver
package com.pjb.receiver.impl;
import com.pjb.config.RabbitMqConfig;
import com.pjb.receiver.UserReceiver;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.listener.api.ChannelAwareMessageListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 消息接收确认处理类
* 所有的消息,都由该类接收
* @author pan_junbiao
**/
@Service
public class AckReceiver implements ChannelAwareMessageListener
{
/**
* 用户消息接收类
*/
@Autowired
private UserReceiver userReceiver;
@Override
public void onMessage(Message message, Channel channel) throws Exception
{
//时间格式
SimpleDateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
System.out.println("消息接收成功,接收时间:" + dateFormat.format(new Date()) + "\n");
//获取队列名称
String queueName = message.getMessageProperties().getConsumerQueue();
//接收用户信息Json格式数据
if (queueName.equals(RabbitMqConfig.DIRECT_QUEUE))
{
userReceiver.receiverUserJson(message, channel);
}
//延时接收用户信息Map格式数据
if (queueName.equals(RabbitMqConfig.DELAY_QUEUE))
{
userReceiver.receiverDelayUserMap(message, channel);
}
//多个队列的处理,则如上述代码,继续添加方法....
}
}
7.2.5 接收消息
package com.pjb.receiver;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
/**
* 用户消息接收接口
**/
public interface UserReceiver
{
/**
* 接收用户信息Json格式数据
*/
public void receiverUserJson(Message message, Channel channel) throws Exception;
/**
* 延时接收用户信息Map格式数据
*/
public void receiverDelayUserMap(Message message, Channel channel) throws Exception;
}
(1)接收JSON数据
/**
* 接收用户信息Json格式数据
*/
@Override
public void receiverUserJson(Message message, Channel channel) throws Exception
{
//获取消息唯一id
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
//将JSON格式数据转换为实体对象——非常重要、重要、重要
ObjectMapper mapper = new ObjectMapper();
UserInfo userInfo = mapper.readValue(message.getBody(), UserInfo.class);
System.out.println("接收者收到JSON格式消息:");
System.out.println("用户编号:" + userInfo.getUserId());
System.out.println("用户名称:" + userInfo.getUserName());
System.out.println("博客地址:" + userInfo.getBlogUrl());
System.out.println("博客信息:" + userInfo.getBlogRemark());
//确认消息
channel.basicAck(deliveryTag, true);
/**
* 否定消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean multiple:是否批处理,当该参数为 true 时,
* 则可以一次性确认 deliveryTag 小于等于传入值的所有消息。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*/
//channel.basicNack(deliveryTag, true, false);
}
catch (Exception e)
{
/**
* 拒绝消息,参数说明:
* long deliveryTag:唯一标识 ID。
* boolean requeue:如果 requeue 参数设置为 true,
* 则 RabbitMQ 会重新将这条消息存入队列,以便发送给下一个订阅的消费者;
* 如果 requeue 参数设置为 false,则 RabbitMQ 立即会还把消息从队列中移除,
* 而不会把它发送给新的消费者。
*/
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
(2)接收map数据
/**
* 延时接收用户信息Map格式数据
*/
@Override
public void receiverDelayUserMap(Message message, Channel channel) throws Exception
{
long deliveryTag = message.getMessageProperties().getDeliveryTag();
try
{
//将JSON格式数据转换为Map对象
ObjectMapper mapper = new ObjectMapper();
JavaType javaType = mapper.getTypeFactory().constructMapType(Map.class, String.class, Object.class);
Map<String, Object> resultMap = mapper.readValue(message.getBody(),javaType);
System.out.println("接收者收到Map格式消息:");
System.out.println("用户编号:" + resultMap.get("userId"));
System.out.println("用户名称:" + resultMap.get("userName"));
System.out.println("博客地址:" + resultMap.get("blogUrl"));
System.out.println("博客信息:" + resultMap.get("userRemark"));
//确认消息
channel.basicAck(deliveryTag, true);
//否定消息
//channel.basicNack(deliveryTag, true, false);
}
catch (Exception e)
{
//拒绝消息
channel.basicReject(deliveryTag, false);
e.printStackTrace();
}
}
7.3 总结
- RabbitMQ系列第七篇文章主要介绍了如何实现JSON、Map格式数据的发送与接收;
- 在发送服务端中,核心为RabbitMQ的配置文件(配置json转换器、设置交换机、队列、绑定交换机与队列)发送消息的核心方法为RabbitTemplate.convertAndSend方法,传入指定交换机、路由键、以及要发送的消息内容;
- 在接收服务端中,核心依然为配置文件中的客户端配置:①手动确认信息;②添加监听队列;③设置统一消息处理类,接着在消息处理类中获取消息的队列名称,根据不同的队列名称调用不同的消息处理类;
- 在接收json格式的处理类中,通过ObjectMapper对象的readValue方法,将message中的body内容,转成想要的对象;
- 在接收map格式的处理中,先通过ObjectMapper对象的getTypeFactroy()的constructMapType(),构造出JavaType对象,接着使用ObjectMapper对象的readValue方法和JavaType对象,将message中的body内容转成map格式