金华网站制作企业,福建闽东建设网站,wordpress数据库安装教程,海口免费做网站目录
1.出现重复消费的原因
2.解决
2.1 数据库插入法
2.2 使用布隆过滤器
2.2.1 添加hutool的依赖
2.2.2 测试生产者
2.2.2 测试消费者 1.出现重复消费的原因
BROADCASTING(广播) 模式下#xff0c;所有注册的消费者都会消费#xff0c;而这些消费者通常是集群部署的…目录
1.出现重复消费的原因
2.解决
2.1 数据库插入法
2.2 使用布隆过滤器
2.2.1 添加hutool的依赖
2.2.2 测试生产者
2.2.2 测试消费者 1.出现重复消费的原因
BROADCASTING(广播) 模式下所有注册的消费者都会消费而这些消费者通常是集群部署的一个个微服务这样就会多台机器重复消费当然这个是根据需要来选择。CLUSTERING负载均衡模式下如果一个 topic 被多个 consumerGroup 消费也会重复消费。即使是在 CLUSTERING 模式下同一个 consumerGroup 下一个队列只会分配给一个消费者看起来好像是不会重复消费。但是有个特殊情况一个消费者新上线后同组的所有消费者要重新负载均衡重平衡 reBalance反之一个消费者掉线后也一样。一个队列所对应的新的消费者要获取之前消费的 offset偏移量也就是消息消费的点位此时之前的消费者可能已经消费了一条消息但是并没有把 offset 提交给 broker那么新的消费者可能会重新消费一次。虽然 orderly 模式是前一个消费者先解锁后一个消费者加锁再消费的模式比起 concurrently 要严格了但是加锁的线程和提交offset 的线程不是同一个所以还是会出现极端情况下的重复消费。还有在发送批量消息的时候会被当做一条消息进行处理那么如果批量消息中有一条业务处理成功其他失败了还是会被重新消费一次。 简单的说 Consumer 消费完消息并不是实时同步到 Broker 的而是将 offset 先保存在本地map中通过定时任务持久化上去。这就导致消息被消费了但是此时消费者宕机了导致 offset 没提交下次没提交 offset 的这部分消息会被再次消费即使 offset 被提交到了 Broker在还没来得及持久化的时候 Broker 宕机了当重启的时候 Broker 会读取consumerOffset.json 中保存的 offset 信息这就会导致没持久化 offset 的这部分消息会被再次消费 那么如果在CLUSTERING负载均衡模式下并且在同一个消费者组中不希望一条消息被重复消费改怎么办呢我们可以想到去重操作找到消息唯一的标识可以是 msgId 也可以是你自定义的唯一的 key这样就可以去重了
2.解决
我们需要给我们的消费者实现 幂等 也就是对同一个消息的处理结果执行多少次都不变。 幂等性多次操作产生的影响均和第一次操作产生的影响相同 例如判断 crud 的幂等性 a. 新增普通的新增是非幂等设置了唯一索引的新增是幂等操作 b. 修改update goods set stock 10 where id 1 幂等 update goods set stock stock - 1 where id 1 非幂等 c. 查询幂等 d. 删除幂等 那么如何给业务实现幂等呢这个还是需要结合具体的业务的。你可以使用写入 Redis 来保证因为Redis 的 key 和 value 就是天然支持幂等的。当然还有使用 数据库插入法 基于数据库的唯一键来保证重复数据不会被插入多条。
2.1 数据库插入法
发送方需要给消息带一个唯一标记根据业务标识
模拟业务 数据库的订单操作日志表结构去重表 给订单号添加唯一索引订单号存的是 key 模拟业务生产者发送了重复的消息
Test
public void repeatTest() throws Exception {
String key UUID.randomUUID().toString();
MessageString msg MessageBuilder.withPayload(扣减库存 -1).setHeader(RocketMQHeaders.KEYS, key).build();
rocketMQTemplate.syncSend(repeatTopic, msg);
rocketMQTemplate.syncSend(repeatTopic, msg);
} 消费者
Component
RocketMQMessageListener(topic repeatTopic,consumerGroup repeat-consumer-group)
public class RepeatListener implements RocketMQListenerMessageExt {Autowiredprivate LogMapper logMapper;Overridepublic void onMessage(MessageExt messageExt) {// 先拿keyString keys messageExt.getKeys();// 插入数据库 因为key做了唯一索引OrderOperLog orderOperLog new OrderOperLog();orderOperLog.setType(1l);orderOperLog.setOrderSn(keys);orderOperLog.setUserId(1003);int insert logMapper.insert(orderOperLog);System.out.println(keys);System.out.println(new String(messageExt.getBody()));}
}
在消费第二条的时候抛出唯一索引重复 SQLIntegrityConstraintViolationException 数据库只插入一条这样的记录 优化捕获到异常是 SQLIntegrityConstraintViolationException 时直接将消息签收了不再进行业务处理因为之前已经消费了一条同样的消息这样便可以解决重复消费问题 2.2 使用布隆过滤器
使用去重方案解决例如将消息的唯一标识存起来然后每次消费之前先判断是否存在这个唯一标识如果存在则不消费如果不存在则消费并且消费以后将这个标记保存。想法很好但是消息的体量是非常大的可能在生产环境中会到达上千万甚至上亿条那么我们该如何选择一个容器来保存所有消息的标识并且又可以快速的判断是否存在呢
我们可以选择布隆过滤器(BloomFilter) 介绍 布隆过滤器英语Bloom Filter是1970年由布隆提出的。它实际上是一个很长的二进制向量和一系列随机映射函数。布隆过滤器可以用于检索一个元素是否在一个集合中。它的优点是空间效率和查询时间都远远超过一般的算法缺点是有一定的误识别率和删除困难。 布隆过滤器的原理是当一个元素被加入集合时通过K个散列函数将这个元素映射成一个位数组中的K个点把它们置为1。检索时我们只要看看这些点是不是都是1就大约知道集合中有没有它了如果这些点有任何一个0则被检元素一定不在如果都是1则被检元素很可能在。这就是布隆过滤器的基本思想。 2.2.1 添加hutool的依赖
dependencygroupIdcn.hutool/groupIdartifactIdhutool-all/artifactIdversion5.7.11/version
/dependency
2.2.2 测试生产者
public void testRepeatProducer() throws Exception {
// 创建默认的生产者
DefaultMQProducer producer new DefaultMQProducer(test-group);
// 设置nameServer地址
producer.setNamesrvAddr(localhost:9876);
// 启动实例
producer.start();
// 我们可以使用自定义key当做唯一标识
String keyId UUID.randomUUID().toString();
System.out.println(keyId);
Message msg new Message(TopicTest, tagA, keyId, 我是一个测试消息.getBytes());
SendResult send producer.send(msg);
System.out.println(send);
// 关闭实例
producer.shutdown();
}
发送了两条相同的消息
55d397c9-814f-4931-b0fd-7e142c04759b
SendResult [sendStatusSEND_OK, msgId7F00000121C418B4AAC204A76B050000, offsetMsgIdC0A8588200002A9F000000000002C359, messageQueueMessageQueue [topicrepeatTestTopic, brokerNamebroker-a, queueId1], queueOffset0]
SendResult [sendStatusSEND_OK, msgId7F00000121C418B4AAC204A76B050000, offsetMsgIdC0A8588200002A9F000000000002C43F, messageQueueMessageQueue [topicrepeatTestTopic, brokerNamebroker-a, queueId2], queueOffset0]
2.2.2 测试消费者
/*** 在boot项目中可以使用Bean在整个容器中放置一个单利对象*/
public static BitMapBloomFilter bloomFilter new BitMapBloomFilter(100); // m数组长度Test
public void testRepeatConsumer() throws Exception {DefaultMQPushConsumer consumer new DefaultMQPushConsumer(repeat-consumer-group);
consumer.setMessageModel(MessageModel.BROADCASTING);
consumer.setNamesrvAddr(MyConstant.NAME_SRV_ADDR);
consumer.subscribe(repeatTestTopic, *);
// 注册一个消费监听 MessageListenerConcurrently是并发消费
consumer.registerMessageListener(new MessageListenerConcurrently() {Overridepublic ConsumeConcurrentlyStatus consumeMessage(ListMessageExt msgs,ConsumeConcurrentlyContext context) {// 拿到消息的keyMessageExt messageExt msgs.get(0);String keys messageExt.getKeys();// 判断是否存在布隆过滤器中if (bloomFilter.contains(keys)) {// 直接返回了 不往下处理业务return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}// 这个处理业务然后放入过滤器中// do sth...bloomFilter.add(keys);System.out.println(keys: keys);System.out.println(new String(messageExt.getBody()));return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;}
});
consumer.start();
System.in.read();
}
业务只处理了一条
keys:55d397c9-814f-4931-b0fd-7e142c04759b
库存-1
延迟过了后 重复消息被签收 解决重复消费问题