消息积压了怎么办?

2021/09/15 1280点热度 0人点赞 0条评论

引入消息中间件以后,系统交互的复杂性提升,一旦出了问题以后,要想各种各样的解决方案。

如果用消息中间件是为了削峰填谷,那在高峰期积压了慢慢处理即可。

如果用消息中间件只是为了解耦、异步处理,业务对时效性有一定的要求,那么就要求必须尽快处理。

消息积压,一般分为两大类:

  • 生产者生产消息过快

  • 消费者消费过慢 这两个问题,又可能引出别的问题,如中间件的磁盘有限,无法导致

这两大类又可以细分为三种:

生产者消费过快

  • 运营活动(导致单位时间内流量激增)

  • 微信推文

  • push营销

  • 短信触达

  • 跑批处理,一下子导入过多的数据到消费者

  • 异常数据处理,一下子把所有的异常数据推入到了队列,导致正常业务延迟严重;

  • 数据跑批

  • 新需求上线未评估业务量

  • 消费端能满足正常需求的2倍处理,但是生产端上线了个需求,导致生产速度翻了n倍,导致队列快速膨胀

消费者消费不过来

  • 一种是系统处理确实慢,下游系统有瓶颈;

  • 一种是消费端处理异常或超时,导致无法提交offset,

  • 一种是失败又重新放入队列进行了重试;

消息队列混合使用

  • 有时效性的和无时效性的业务处理都放入到了一个队列里(比如风控授信决策、准入决策、交易决策)

解决方案又分为几种

  1. 消费者瓶颈,但是能通过扩容快速解决的

  2. 消费者有瓶颈,但是能满足正常的业务需求的

  3. 消费者异常导致的无法提交

  4. 消费者异常导致的重试机制

  5. 多种类型的消息在队列中堵塞

消费者瓶颈,但是能通过扩容快速解决的

  • 比如我们默认建的kafka,partition 默认是2个,能满足大部分业务

  • 分析情况是消费者处理不过来?还是消费者能处理,就是queue的数量太少?

  • 消费者处理不过来

  • 消费者是否多线程消费(有的项目只有一个线程处理,处理速度优先,评估能否改成多线程消费)

  • 评估增加消费者的可能性(别增加了消费者,数据库等又承受不了,导致崩溃)

  • 消费者能处理,就是queue的数量太少(扩容queue的数量)

  • 消费者不是纯粹的消费者,可能还承担了别的业务,消费者节点多,但是queue只有两个,只有两个消费者处理,通过增加queue让别的消费者也能处理;

  • kafka可以通过kafka管理后台,调整partition的数量(只能增加,不能减少),需评估是临时性的,还是以后会经常性这样

  • 其他的消息中间件不知道能不能动态调整queue的数量,如果不能动态调整,只能新建topic,然后将消息都导过去,但是消费者得改代码

消费者有瓶颈(消费者不能扩容)但是能满足正常的业务需求的

  • 这种情况就比较麻烦,如果是削峰填谷,那就慢慢等着处理即可;

  • 如果是异步、解耦又有时效性(这时候需要将积压的消息先转移,优先恢复正常业务)

  • 临时申请一个新的topic

  • 先暂停下所有的消费者(也可以不暂停)

  • 通过工具,将原队列中积压的消息,抽取到新的队列里(将积压的消息转发出去,可以根据消息的发送时间判定);

  • 启动消费者消费新的消息,确保正在操作的用户的流程

  • 转移出的消息如何处理?得具体业务场景具体分析

消费者异常

  • 代码bug,导致抛异常,未提交成功(重复消费),一直是那几条消息在处理,后面的消息都无法处理

  • 处理过程:改代码上线,将异常数据写入日志文件,分析如何修复数据

  • 执行过程过慢(这就有点危险了,如果开启了自动提交,呵呵,就看你对一致性的要求)

  • 消费过程中间有慢sql(优化sql),自动提交后,如果执行异常,则会丢失

  • 消费过程中调用外部接口,外部接口响应过慢(读取超时,抛异常,和代码bug处理一致,响应时间过长,自动提交,可能导致数据不一致)

  • 手动提交,或者控制消费者整体执行时间,不超过自动提交时间

  • 通过定时任务补偿异常数据

  • 代码bug,导致的数据质量不符合要求,无法处理

  • 如:我们的延迟消息,有些系统会失败后继续延迟,导致循环执行

  • 异常后,又将消息推入到了队列

多种类型的消息在队列中堵塞

  • 例如:跑批和正常业务搅合到了一起,跑批可以慢慢处理,毕竟用户没在app那操作,正常业务的用户可等不得;

  • 临时额度大批量到期,到期时间没有随机,一下子几百万数据丢入了队列里,堵塞了正常业务

  • 授信(最长10分钟)、准入(准实时)、交易审核(异步慢慢跑),混合到一个队列,交易可能压单后批量处理

  • 解决方案:

  • 正常业务和跑批任务的队列尽可能隔离开;

  • 对实时性有要求的和对实时性无要求的也分隔开;

  • 跑批任务也尽可能的散开,比如临时额度的到期时间内存int自增,在到期时间上加上对应的int的毫秒数,100万条最多+100万毫秒,也就1000秒,也就16分钟,如果还是密集,可以设置步长(最终根据系统的吞吐量来决定),比如我们的额度系统单机qps能达到5000~8000,1秒钟1000条基本上不会影响业务

如果觉得对你有帮助,请关注公众号:5ycode,后续会不断更新哦

公众号图片

yxkong

这个人很懒,什么都没留下