12. 如何保证消费者的消费消息的幂等性?
?分析原因*
在 「消息队列有几种消费语义?」 中,我们已经看了三种消费语义。如果要达到消费者的消费消息的幂等性,就需要消息仅被消费一次,且每条消息从 Producer 保证被送达,并且被 Consumer 仅消费一次。
那么,我们就基于这个场景,来思考下,为什么会出现消息重复的问题?
对于 Producer 来说
- 可能因为网络问题,Producer 重试多次发送消息,实际第一次就发送成功,那么就会产生多条相同的消息。
对于 Consumer 来说
可能因为 Broker 的消息进度丢失,导致消息重复投递给 Consumer 。
Consumer 消费成功,但是因为 JVM 异常崩溃,导致消息的消费进度未及时同步给 Consumer 。
对于大多数消息队列,考虑到性能,消费进度是异步定时同步给 Broker 。
? 如何解决
所以,上述的种种情况,都可能导致消费者会获取到重复的消息,那么我们的思考就无法是解决不发送、投递重复的消息,而是消费者在消费时,如何保证幂等性。
消费者实现幂等性,有两种方式:
- 框架层统一封装。
- 业务层自己实现。
① 框架层统一封装
首先,需要有一个消息排重的唯一标识,该编号只能由 Producer 生成,例如说使用 uuid、或者其它唯一编号的算法 。
然后,就需要有一个排重的存储器,例如说:
- 使用关系数据库,增加一个排重表,使用消息编号作为唯一主键。
- 使用 KV 数据库,KEY 存储消息编号,VALUE 任一。此处,暂时不考虑 KV 数据库持久化的问题
那么,我们要什么时候插入这条排重记录呢?
在消息消费执行业务逻辑之前,插入这条排重记录。但是,此时会有可能 JVM 异常崩溃。那么 JVM 重启后,这条消息就无法被消费了。因为,已经存在这条排重记录。
在消息消费执行业务逻辑
之后
,插入这条排重记录。
- 如果业务逻辑执行失败,显然,我们不能插入这条排重记录,因为我们后续要消费重试。
- 如果业务逻辑执行成功,此时,我们可以插入这条排重记录。但是,万一插入这条排重记录失败呢?那么,需要让插入记录和业务逻辑在同一个事务当中,此时,我们只能使用数据库。
? 感觉好复杂,嘿嘿。
② 业务层自己实现
方式很多,这个和 HTTP 请求实现幂等是一样的逻辑:
- 先查询数据库,判断数据是否已经被更新过。如果是,则直接返回消费完成,否则执行消费。
- 更新数据库时,带上数据的状态。如果更新失败,则直接返回消费完成,否则执行消费。
- …
如果胖友的系统的并发量非常大,可以使用 Zookeeper 或者 Redis 实现分布式锁,避免并发带来的问题。当然,引入一个组件,也会带来另外的复杂性:
- 系统的并发能力下降。
- Zookeeper 和 Redis 在获取分布式锁时,发现它们已经挂掉,此时到底要不要继续执行下去呢?嘿嘿。
选择
正常情况下,出现重复消息的概率其实很小,如果由框架层统一封装来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务层自己实现处理消息重复的问题。
当然,这两种方式不是冲突的。可以提供不同类型的消息,根据配置,使用哪种方式。例如说:
- 默认情况下,开启【框架层统一封装】的功能。
- 可以通过配置,关闭【框架层统一封装】的功能。
当然,如果可能的话,尽可能业务层自己实现。/(ㄒoㄒ)/~~但是,实际上,很多时候,开发者不太会注意,哈哈哈哈。
