如何做自己的业务网站,互联网销售公司起名,小型企业网站设计教程,东莞网站推广优化建设之前文章#xff1a;
兔老大的系统设计#xff08;一#xff09;健康度系统
一、背景
延迟队列的应用场景非常广泛#xff0c;如客户主动操作#xff1a;
股票定投顾客预约场景会员定时续费/缴费CSDN定时发布或系统内部操作#xff1a;
订单成功后#xff0c;在30分…之前文章
兔老大的系统设计一健康度系统
一、背景
延迟队列的应用场景非常广泛如客户主动操作
股票定投顾客预约场景会员定时续费/缴费CSDN定时发布或系统内部操作
订单成功后在30分钟内没有支付自动取消订单外卖平台发送订餐通知下单成功后60s给用户推送短信。如果订单一直处于某一个未完结状态时及时处理关单并退还库存淘宝新建商户一个月内还没上传商品信息将冻结商铺等
二、需求分析
场景多种多样我们尽量做出一个通用的功能完备的能满足大部分场景的系统。
可以以顾客预约场景为例进行设计假设会量大、量不稳定、存储时间长比如几个月后执行这样设计出来的系统就普遍适用。
三、目标明确
3.1功能
延时队列相比于普通队列最大的区别就体现在其延迟的属性上.
普通队列的元素是先进先出按入队顺序进行处理而延时队列中的元素在入队时会指定一个延迟时间表示其希望能够在经过该指定时间后处理。
从某种意义上来讲延迟队列的结构并不像一个队列而更像是一种以时间为权重的有序堆结构。
要实现的功能如下
1可以设置定时任务
2可以修改未到执行时间的任务属性包括执行时间
3可以查询任务情况人工干预
3.2设计重点
大部分系统的重点一般在性能、可用性、安全三方面。
3.2.1性能
及时时间到了立刻执行不能延迟太久。如crontab分钟粒度就太粗了
3.2.2可用
可靠保证任务不重不漏的执行不能丢任务、不能重复执行
高可用可扩展服务尽量不挂、可抗住突发的大量请求
可恢复系统挂了或者任务失败/丢失等等可以恢复 3.2.3其它
可撤回/修改如果定时任务还没到执行时间可以修改执行时间和其他内容也可取消。
存时间长有些场景甚至要保存一年以上比如用户办理年卡后要有一些策略诱导消费。 四、一些探索
本章不局限于实现所有的目标提出一些业内常见的实现方案供大家增长知识面和最终方案可以有个对比。
请注意看每种方案下的分析
4.0 数据库
在小型项目中通过一个线程定时扫数据库通过执行时间字段来判断是否到时然后进行操作
优点:简单支持集群操作
缺点: (1)对服务器内存消耗大
(2)存在延迟执行时间粒度和mysql本身的速度都会影响
(3)假设你的订单有几千万条每隔几分钟这样扫描一次数据库损耗极大
4.1 DelayQueue 延时队列
4.1.1 介绍
JDK 中提供了一组实现延迟队列的 API位于Java.util.concurrent包下的 DelayQueue。
DelayQueue 是一个 BlockingQueue本质就是封装了一个 PriorityQueue(优先队列)内部用堆来实现队列元素排序向 DelayQueue 队列中添加元素时会给元素一个 Delay(延迟时间)作为排序条件队列中最小的元素会优先放在队首。队列中的元素只有到了 Delay 时间才允许从队列中取出。
4.1.2简单实现
1实现 Delayed 接口接口里只有一个 getDelay 方法用于设置延期时间。
2Order 类中compareTo()负责对队列中的元素进行排序。
public class Order implements Delayed {//延迟时间JsonFormat(locale zh, timezone GMT8, pattern yyyy-MM-dd HH:mm:ss)private long time;String name;public Order(String name, long time, TimeUnit unit) {this.name name;this.time System.currentTimeMillis() (time 0 ? unit.toMillis(time) : 0);}Overridepublic long getDelay(TimeUnit unit) {return time - System.currentTimeMillis();}Overridepublic int compareTo(Delayed o) {Order Order (Order) o;long diff this.time - Order.time;if (diff 0) {return -1;} else {return 1;}}
} DelayQueue 的 put 方法是线程安全的内部使用了ReentrantLock进行线程同步。
上边只是简单的实现实际开发中会有专门的线程负责消息的入队与消费。
4.1.3 分析
分析事实上如无必要我们应该尽可能使用语言自带的库而非过度设计。从这方面考虑DelayQueue无疑是一个简单优秀的实现但是在大型项目中本地存储的方案确实不太适用。
无论如何我们仍然可以对它大根堆和线程控制的方法进行学习和借鉴。 4.2 RabbitMQ
RabbitMQ 本身并不直接提供对延迟队列的支持我们依靠 RabbitMQ 的TTL以及死信队列功能来实现延迟队列的效果。那就让我们首先来了解一下RabbitMQ 的死信队列以及 TTL 功能。
4.2.1死信队列
死信队列实际上是一种 RabbitMQ 的消息处理机制当 RabbmitMQ 在生产和消费消息的时候消息遇到如下的情况就会变成“死信”
消息被拒绝并且不再重新投递消息超时未消费也就是 TTL 过期了消息队列到达最大长度
消息一旦变成一条死信便会被重新投递到死信交换机Dead-Letter-Exchange然后死信交换机根据绑定规则转发到对应的死信队列上监听该队列就可以让消息被重新消费。
4.2.2消息生存时间 TTL
TTLTime-To-Live表示了一条消息的最大生存时间单位为毫秒。如果一条消息在 TTL 设置的时间内没有被消费那么它就会变成一条死信进入我们上面所说的死信队列。
如何设置消息的 TTL 属性
一种方式是直接在创建队列的时候设置整个队列的 TTL 过期时间所有进入队列的消息都被设置成了统一的过期时间一旦消息过期马上就会被丢弃进入死信队列在延迟队列的延迟时间为固定值的时候比较适合使用这种方式
MapString, Object args new HashMapString, Object();
args.put(x-message-ttl, 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args); 另一种方式是针对单条消息设置参考代码如下该消息被设置了 6 秒的过期时间
AMQP.BasicProperties.Builder builder new AMQP.BasicProperties.Builder();
builder.expiration(6000);
AMQP.BasicProperties properties builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, msg content.getBytes());
4.2.3实现延迟队列
我们利用死信队列的这个属性把需要延迟的消息将 TTL 设置为其延迟时间投递到 RabbitMQ 的普通队列中一直不去消费它那么经过 TTL 的时间后消息就会自动被投递到死信队列这时候我们使用消费者进程实时地去消费死信队列中的消息就实现了延迟队列的效果。
从下图可以直观的看出使用 RabbitMQ 实现延迟队列的整体流程 使用 RabbitMQ 来实现延迟队列具有很明显的一些优势比如消息可靠发送、消息可靠投递、死信队列来保障消息至少被消费一次以及未被正确处理的消息不会被丢弃。另外通过 RabbitMQ 集群的特性可以很好的解决单点故障问题不会因为单个节点挂掉导致延迟队列不可用或者消息丢失。
4.2.4存在的bug
RabbitMQ 在检查消息是否过期时只会检查第一个消息是否过期并不会校验后面消息过期的情况比如第一个消息设置了 20s 的 TTL第二个消息设置了 10s 的 TTL那么 RabbitMQ 会等到第一个消息过期之后才会让第二个消息过期。
解决这个问题的方法也很简单只需要安装 RabbitMQ 的一个插件即可
Community Plugins — RabbitMQ
安装好这个插件后所有的消息就都能按照被设置的 TTL 过期了。 4.2.5插件实现原理
上面使用 DLX TTL 的模式消息首先会路由到一个正常的队列根据设置的 TTL 进入死信队列与之不同的是通过 x-delayed-message 声明的交换机它的消息在发布之后不会立即进入队列先将消息保存至 Mnesia。
这个插件将会尝试确认消息是否过期首先要确保消息的延迟范围是 Delay 0, Delay ERL_MAX_T在 Erlang 中可以被设置的范围为 (2^32)-1 毫秒如果消息过期通过 x-delayed-type 类型标记的交换机投递至目标队列整个消息的投递过程也就完成了。
4.2.6 分析
作为网上流传非常广的一种方案它似乎真的是一种不错的实现。
尤其是由于 RabbitMQ 本身的消息可靠发送、消息可靠投递、死信队列等特性可以保障消息至少被消费一次以及未被正确处理的消息不会被丢弃让消息的可靠性有了保障。
但是这方案有如下缺点
1、为了解决一个问题又引入了队列交换机mq私信队列交换机私信队列插件我们并不希望引入如此复杂不可控的架构。
2、配置麻烦额外增加死信交换机和死信队列等配置不好维护
3、不可靠实际测试环境延时插件有时收不到消息不是很稳定。配置错误、生产者消费者连接的队列错误和其他未知因素都有可能造成延迟失效。
4、真实消费原因不唯一消息被拒绝、消息过期、消息超长等等原因都会进入死信队列这种不唯一也是我们无法忍受的。我们无法知道死信队列中是否都是过期消息。 4.3 kafka-TimeWheel
TimeWheel 时间轮算法是一种实现延迟队列的巧妙且高效的算法被应用在 NettyZookeeperKafka 等各种框架中。 4.3.1时间轮 如上图所示时间轮是一个存储延迟消息的环形队列其底层采用数组实现可以高效循环遍历。这个环形队列中的每个元素对应一个延迟任务列表这个列表是一个双向环形链表链表中每一项都代表一个需要执行的延迟任务。时间轮会有表盘指针表示时间轮当前所指时间随着时间推移该指针会不断前进并处理对应位置上的延迟任务列表。
4.3.2添加延迟任务
由于时间轮的大小固定并且时间轮中每个元素都是一个双向环形链表我们可以在O(1) 的时间复杂度下向时间轮中添加延迟任务。
如下图例如我们有一个这样的时间轮在表盘指针指向当前时间为 2 时我们需要新添加一个延迟 3 秒的任务我们可以快速计算出延迟任务在时间轮中所对应的位置为 5并添加到位置 5 上任务列表尾部。 4.3.3多层时间轮
上面的时间轮的大小是固定的只有 12 秒。如果此时我们有一个需要延迟 200 秒的任务我们应该怎么处理呢直接扩充整个时间轮的大小吗这显然不可取因为这样做的话我们就需要维护一个非常非常大的时间轮内存是不可接受的而且底层数组大了之后寻址效率也会降低影响性能。
为此Kafka 引入了多层时间轮的概念。其实多层时间轮的概念和我们的机械表上时针、分针、秒针的概念非常类似当仅使用秒针无法表示当前时间时就使用分针结合秒针一起表示。同样的当任务的到期时间超过了当前时间轮所表示的时间范围时就会尝试添加到上层时间轮中如下图所示 第一层时间轮整个时间轮所表示时间范围是 0-12 秒第二层时间轮每格能表示的时间范围是整个第一层时间轮所表示的范围也就是 12 秒所以整个第二层时间轮能表示的时间范围即 12*12144 秒依次类推第三层时间轮能表示的范围是 1728 秒第四层为 20736 秒等等。
比如现在我们需要添加一个延时为 200 秒的延迟消息我们发现其已经超过了第一层时间轮能表示的时间范围我们就需要继续往上层时间轮看将其添加在第二层时间轮 200/12 17 的位置然后我们发现 17 也超过了第二次时间轮的表示范围那么我们就需要继续往上层看将其添加在第三层时间轮的 17/12 2 的位置。
4.3.4具体实现
Kafka 中时间轮算法添加延迟任务以及推动时间轮滚动的核心流程如下其中 Bucket 即时间轮中的延迟任务队列并且 Kafka 引入的 DelayQueue 解决了多数 Bucket 为空导致的时间轮滚动效率低下的问题 使用时间轮实现的延迟队列能够支持大量任务的高效触发。并且在 Kafka 的时间轮算法的实现方案中还引入了 DelayQueue使用 DelayQueue 来推送时间轮滚动而延迟任务的添加与删除操作都放在时间轮中这样的设计大幅提升了整个延迟队列的执行效率。
4.3.5 分析
从复杂度上说相比 DelayQueue 时间轮在复杂度上有优势。DelayQueue 由于涉及到调整数据的位置插入和移除复杂度是 O(lgn)而时间轮在插入和移除的复杂度都是 O(1)。
从实际上说相比其它MQkafka在我的认知里是最优秀的事实上在我的十万级压测中它是唯一性能达标的MQ有些MQ已经接近挂了它还很健康。同时kafka也有一定的持久化方案。
但是这种方案依旧有一些问题
1、正如我开头提到的需求很可能是保存一个月甚至更长时间超过了默认的log.retention.hours168的大小。
2、我们希望执行时间视可修改的但是kafka的消息一旦由生产者发送则不可变。关于这方面讨论我贴了一个链接感兴趣的可以看看。stackoverflow问题https://stackoverflow.com/questions/60046428/what-is-kafka-message-tweaking 其实探索到想用众多MQ来实现延迟队列时我越来越清晰的有一种感觉非逼着众多MQ比如kafka做不擅长的事情本身就有问题人家的定位就是消息队列而不是替你保存动辄一个月才执行的消息然后精准执行。 4.4 一些其他方案
这里我不准备继续分析所有方案的优缺点了因为这是很无聊的而且影响接下来方案叙述的节奏如Quartz、ActiveMQ、RocketMQ、nsq、pulsar等等原因无非是性能不达标、时间粒度不够、存储时间不够等等在这里放一张MQ的对比图如果满足你的要求当然也可以用 4.5 Redis ZSet 基于4.3结尾的考虑首先要有地方做持久化redis作为nosql的老大呼之欲出。 4.5.1过期回调
只是提一嘴这种歪门邪道的实现就不要想了事实上容易出大问题有兴趣可以了解。
4.5.2 正解介绍
Redis有一个有序集合的数据结构ZSetZSet中每个元素都有一个对应ScoreZSet 中所有元素是按照其 Score 进行排序的。我们利用它Score有序的属性可以对入队的成员按过期时间从小到大排列 那么我们可以通过以下这几个操作使用 Redis 的 ZSet 来实现一个延迟队列
入队操作我们将需要处理的任务按其需要延迟处理时间作为 Score 加入到 ZSet 中。Redis 的 ZAdd 的时间复杂度是O(logN)N是ZSet 中元素个数因此我们能相对比较高效的进行入队操作。起一个进程定时比如每隔一秒查询 ZSet 中 Score 最小的元素查询结果有两种情况
查询出的分数小于等于当前时间戳说明到这个任务需要执行的时间了则去异步处理该任务查询出的分数大于当前时间戳说明 ZSet 中所有的任务都还没有到需要执行的时间则休眠一秒后继续查询
4.5.3 分析
从上述的讨论中我们可以看到通过 Redis Zset 实现延迟队列是一种理解起来较为直观可以快速落地的方案。并且我们可以依赖 Redis 自身的持久化来实现持久化使用 Redis 集群来支持高并发和高可用是一种不错的延迟队列的实现方案。
但是redis同样有缺点但是被解决了
1、定位问题和上文提到的众多mq一样redis的定位并不是延迟队列。
经验告诉我们如果硬要用与需求定位不符的东西就是容易出问题 但是由于这种方式实在过于简单好用在业界确实非常太有市场我记得redis作者本人都曾经谈过这个问题告诉大家最好不要把redis用作消息队列之类的只不过貌似没人听。 2、持久化不是百分百可靠redis持久化两种方式我就不讲了最高级的持久化配置就是每次操作都记录但是由于性能问题基本不可能这样配事实上大公司有明确规定不能这样配。
3、 真实案例如果qps过高虽然redis扛得住和kafka一样真男人啊但是我们的服务扛不住。 五、方案
5.1 思考
读者紧接着4.5.3的想法不要断我们想用redis但是有三个问题如何解决呢
问题2没的说上mysql万事大吉。
问题3redis接MQ完美解决问题这不就是mq天天吹的其中一大作用吗。
问题1我们只是用来排个序消息队列和持久化都不是redis做了符合定位。
5.2 整体架构
基于这些考虑最终我们的架构是这样的 其他服务想使用定时功能调用写接口核心线路有两条
第一条
1将延迟的消息任务通过 hash 算法路由至不同的 Redis Key 上
2每个 Redis Key 都对应建立一个处理进程称为 Event 轮询查询是否有待处理的延迟消息
3 Event 进程不负责业务基本只负责分发消息具体的业务逻辑通过kafka解耦消费者实现。
4我们规定消费者一定要上报执行结果以便我们决定是否重复请求
第二条
1将消息写入DB或更改
2event进程扫过期一段时间的任务可配置
3主动请求消费者执行
5.3 细节补充
1mysql
表结构
2redis
如觉得写mysql这条链路也太麻烦并且没有存储很久的需求可以用redis自身的持久化功能同时开启RDB和AOFAOF设置everysec即每秒异步刷盘一次。极端情况下可能会丢失一秒的数据。
高可用使用的是redis的主从复制模式。服务高可用方面在实现过程中考虑了服务节点的横向扩展Timer、Cleaner等对同一个redis队列的操作都加了分布式锁。每个服务节点都是无状态的不需要进行元数据同步等操作少数服务节点宕机不影响整个服务的可用性。
3监控
对于消息堆积以及消息超过重试次数被丢弃等场景说明消费端服务异常没有正常消费及ack需要及时上报并通知给业务方及服务提供方方便快速发现并排查问题。
4mq
mq的选择上文有对比图详细分析以后补充吧。
六、总结和QA
分析架构
可能很多人会有些疑惑你抛出来看起来这么复杂的图实现起来是不是很麻烦它真的很好用吗
下面首先回忆第三章看是否实现目标。
从性能出发整条链路看
1redis key可以增加不用担心量大影响性能
2event定时任务每秒轮询基本没延迟
3event没业务逻辑校验转发个消息很快
3消息队列选用的性能最强的kafka
实际测试也符合要求
从可用出发整条链路看
1将延迟的消息任务通过 hash 算法路由至不同的 Redis Key 上这样做有两大好处
避免了当一个 KEY 在存储了较多的延时消息后入队操作以及查询操作速度变慢的问题两个操作的时间复杂度均为O(logN)。系统具有了更好的横向可扩展性当数据量激增时我们可以通过增加Redis Key 的数量来快速的扩展整个系统。但是会存在一个问题因为增加key的数量必然涉及到hash算法范围的调整那么原先集合中的元素就不能通过新的hash算法路由到所以需要采用一致性hash算法。 2所有的 Event 进程只负责分发消息具体的业务逻辑通过MQ解耦由消费者异步处理这么做的好处也是显而易见的
Event 进程只负责分发消息那么其处理消息的速度就会非常快就不太会出现因为业务逻辑复杂而导致消息堆积的情况。采用一个额外的消息队列后消息处理的可扩展性也会更好我们可以通过增加消费者进程数量来扩展整个系统的消息处理能力。Event 是多机多进程模型保证整个系统的高可用性。采用 Zookeeper 选主的方式保证同一时间只会有一个进程去处理消息一旦 Zookeeper 的 leader 主机宕机会自动选择新的 leader 来处理。
从其他功能看
要求可恢复、可撤回、可修改、保存时间超长。
我们用mysql解决了大部分问题修改时记得把redis也改了这也是比用kafka好的点可修改
到底如何设计
这是本人一点粗浅的理解
回看我们的方案第一无论是基于死信队列还是数据先存储mysql/redis后投递kafka亦或是redis超时时间本质上都是将延迟待发送的消息数据与正常订阅的队列分开存储这么做一方面降低耦合度另一方面也是为了降低数据不可控的时间。 这也是我想说的经验告诉我们大量数据存在不可操作、不可见的地方是一件很糟糕的事本文提的很多方案有这个毛病服务不是写完就完了还要维护所以我们尽量不要这么做。 第二既然选择了数据分离整条链路的存储组件和队列组件的选择按需选择十分重要。
本方案就是mysql/rediskafka
第三、无论是检查队头消息TTL还是调度存储的数据本质上都是通过定时任务来完成的定时任务的触发策略也是决定你方案优劣的决定性因素你是crontab配置还是主备选举策略、还是大家一起抢分布式锁也值得根据具体情况具体分析 还是觉得太复杂能否简化一点
可以我的建议是如果qps不高的话去掉kafka会是一个简单方案。 多线程如何处理
如果你指的是高并发场景下存在同一条消息被多次消费的情况你可以使用分布式锁如zookpeer、redis的红锁、自己做一个等等。
本方案目前不存在这类问题