网站怎么做登录界面,友汇网站建设,网站建设上海,炫酷html5网站模板1 背景
A服务作为生产者#xff0c;每天发送上千万的mq消息#xff0c;每一个消息包含500个用户ids数据。B服务作为消费者#xff0c;接受MQ消息并通过http调用第三方请求进行业务处理#xff0c;消费组启用了rabbitmq的多线程消费组#xff0c;一个实例并发40个mq消费者…1 背景
A服务作为生产者每天发送上千万的mq消息每一个消息包含500个用户ids数据。B服务作为消费者接受MQ消息并通过http调用第三方请求进行业务处理消费组启用了rabbitmq的多线程消费组一个实例并发40个mq消费者线程每个线程一次获取10个消息进行消费。
Mq消费者配置如下
# mq配置rabbitmq:connection-timeout: 15000cache:channel:size: 200# 消息发送到rabbitmq broker cluster需要回调publisher-confirms: true# 交换机将消息投递至队列失败时需要回调publisher-returns: truelistener:# 手动确认消息已被消费simple:acknowledge-mode: manual# consumer的并发数concurrency: 40max-concurrency: 50# 每个消息者每次取10条prefetch: 10
Mq挤压消息如下 2 排查
2.1 复制rabbitmq挤压消息数据进行模拟复现
找出rabbitmq挤压的消息在本地模拟消费找出没有进行消息确认的原因通过rabbitmq控制台的Get messages功能 复制payload的消息进行base64转码转出来的消息是乱码不完整的怀疑 是rabbitmq还结合了其他加密处理放弃这种排查思路
2.2 检查报错日志
rabbitmq的unack消息挤压那就是消费者没有进行ack确认怀疑消费者代码有异常导致没能执行到ack的代码。 查询服务器日志没发现有报错的日志梳理业务代码消费者使用了spring aop around机制进行消息确认所以不管代码有没有报错按理说都会手动进行mq消息ack确认
2.3 检查服务是否宕机
消费组实例数量符合服务器大小配置因此服务器应用没有宕机 2.4 检查java线程
使用IBM的TMDA工具进行分析线程堆栈工具下载地址TMDA工具下载地址
TMDA工具简介
TMDA分析线程堆栈结果如下 通过分析图看到大量park线程确实是符合现状应用的线程挂起了
3 分析和解决
通过stack深度高到底排序业务代码存在线程等待情况具体代码CountDownLatch.await 3.1 结合业务代码分析
通过上图stack提示找到关联的业务代码
伪代码如下
// new一个CompletableFuture
public CompletableFutureInteger httpCall(String tokenData){CompletableFutureInteger completableFuture CompletableFuture.supplyAsync(() - {long time 3000L;try {Thread.sleep(time);} catch (InterruptedException e) {e.printStackTrace();}return Integer.parseInt(tokenData);});return completableFuture;}httpCall(tokenData).whenCompleteAsync((returnValue, ex)-{// do business// ex.getMessage()// 其中ex对象为空使用ex.getMessage()报了空指针导致没能执行如下的countDowncountDownLatch.countDown();})
消费者服务通过http调用第三方服务为了提高并发使用了多线程每一组(数十个为一组)http请求批量调用完成后再把请求响应结果异步存入数据库, 主线程使用了countDownLatch.await进行等待, 其中whenCompleteAsync方法存在空指针问题导致没能执行如下的countDown方法。
这里有人会问, 上面错误日志检查步骤不是说日志没有空指针异常吗 对子线程报了空指针因为CompletableFuture执行每次都是new 一个新的CompletableFuture对象并把结果作为下一个CompletableFuture执行的入参 通过伪代码可以发现执行whenCompleteAsync后没有新的CompletableFuture方法执行所以异常没有抛出来使得排查变得困难
3.2 解决
因为存在whenCompleteAsync报错的情况添加多一个新的异常捕获处理方法捕获异常也进行countDown的操作。
代码如下 httpCall(tokenData).whenCompleteAsync((returnValue, ex)-{// do business// ex.getMessage()// 其中ex对象为空使用ex.getMessage()报了空指针导致没能执行如下的countDowncountDownLatch.countDown();}).exceptionally(e -{log.info(exceptionally捕获到异常tokenData{}, e{}, tokenData, e.getMessage());countDownLatch.countDown();return null;});
4 结论 熟练CompletableFuture的使用要看源码的实现实现原理cas 多个future采用入stack每次把前一个future的结果作为参数传入下一个future去执行 使用多线程需要考虑异常、超时等情况 熟练使用jvm stack分析工具
5 文章参考
CompletableFuture流程图 CompletableFuture参考文章如下
CompletableFuture 原理浅析