12. 如何保证消费者的消费消息的幂等性?

?分析原因*

「消息队列有几种消费语义?」 中,我们已经看了三种消费语义。如果要达到消费者的消费消息的幂等性,就需要消息仅被消费一次,且每条消息从 Producer 保证被送达,并且被 Consumer 仅消费一次

那么,我们就基于这个场景,来思考下,为什么会出现消息重复的问题?

  • 对于 Producer 来说

    • 可能因为网络问题,Producer 重试多次发送消息,实际第一次就发送成功,那么就会产生多条相同的消息。
  • 对于 Consumer 来说

    • 可能因为 Broker 的消息进度丢失,导致消息重复投递给 Consumer 。

    • Consumer 消费成功,但是因为 JVM 异常崩溃,导致消息的消费进度未及时同步给 Consumer 。

      对于大多数消息队列,考虑到性能,消费进度是异步定时同步给 Broker 。

? 如何解决

所以,上述的种种情况,都可能导致消费者会获取到重复的消息,那么我们的思考就无法是解决不发送、投递重复的消息,而是消费者在消费时,如何保证幂等性。

消费者实现幂等性,有两种方式:

  1. 框架层统一封装。
  2. 业务层自己实现。

框架层统一封装

首先,需要有一个消息排重的唯一标识,该编号只能由 Producer 生成,例如说使用 uuid、或者其它唯一编号的算法 。

然后,就需要有一个排重的存储器,例如说:

  • 使用关系数据库,增加一个排重表,使用消息编号作为唯一主键。
  • 使用 KV 数据库,KEY 存储消息编号,VALUE 任一。此处,暂时不考虑 KV 数据库持久化的问题

那么,我们要什么时候插入这条排重记录呢?

  • 在消息消费执行业务逻辑之前,插入这条排重记录。但是,此时会有可能 JVM 异常崩溃。那么 JVM 重启后,这条消息就无法被消费了。因为,已经存在这条排重记录。

  • 在消息消费执行业务逻辑

    之后

    ,插入这条排重记录。

    • 如果业务逻辑执行失败,显然,我们不能插入这条排重记录,因为我们后续要消费重试。
    • 如果业务逻辑执行成功,此时,我们可以插入这条排重记录。但是,万一插入这条排重记录失败呢?那么,需要让插入记录和业务逻辑在同一个事务当中,此时,我们只能使用数据库

? 感觉好复杂,嘿嘿。

业务层自己实现

方式很多,这个和 HTTP 请求实现幂等是一样的逻辑:

  • 先查询数据库,判断数据是否已经被更新过。如果是,则直接返回消费完成,否则执行消费。
  • 更新数据库时,带上数据的状态。如果更新失败,则直接返回消费完成,否则执行消费。

如果胖友的系统的并发量非常大,可以使用 Zookeeper 或者 Redis 实现分布式锁,避免并发带来的问题。当然,引入一个组件,也会带来另外的复杂性:

  1. 系统的并发能力下降。
  2. Zookeeper 和 Redis 在获取分布式锁时,发现它们已经挂掉,此时到底要不要继续执行下去呢?嘿嘿。

选择

正常情况下,出现重复消息的概率其实很小,如果由框架层统一封装来实现的话,肯定会对消息系统的吞吐量和高可用有影响,所以最好还是由业务层自己实现处理消息重复的问题。

当然,这两种方式不是冲突的。可以提供不同类型的消息,根据配置,使用哪种方式。例如说:

  • 默认情况下,开启【框架层统一封装】的功能。
  • 可以通过配置,关闭【框架层统一封装】的功能。

当然,如果可能的话,尽可能业务层自己实现。/(ㄒoㄒ)/~~但是,实际上,很多时候,开发者不太会注意,哈哈哈哈。