网站建设维护管理办法,个人怎么注册一个品牌,seo综合查询是什么意思,台州市建站公司RocketMQ 延迟消息
RocketMQ 消费者启动流程
什么是延迟消息
RocketMQ 延迟消息是指#xff0c;生产者发送消息给消费者消息#xff0c;消费者需要等待一段时间后才能消费到。
使用场景
用户下单之后#xff0c;15分钟未支付#xff0c;对支付账单进行提醒或者关单处理…RocketMQ 延迟消息
RocketMQ 消费者启动流程
什么是延迟消息
RocketMQ 延迟消息是指生产者发送消息给消费者消息消费者需要等待一段时间后才能消费到。
使用场景
用户下单之后15分钟未支付对支付账单进行提醒或者关单处理。
RocketMQ 开源版本的消息不支持任意时间精度只支持5s 10s 1m等等。
Broker 如何处理延迟消息 消息投递如下
生产者发送一个延迟消息到一个 topic 中Broker 判断是个延迟消息后将消息暂存Broker 通过延迟服务 先检查消息是否过期如果到期将消息投递到目标 topic消费者消费topic中的投递延迟消息。
开源RocketMQ 的消息不支持任意精度默认支持 18个 level
messageDelayLevel1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2hBroker 在启动的时候会创建一个内部 topic:“SCHEDULE_TOPIC_XXXX” 根据延迟 level 数量创建对应数量的 队列。 也就是说 18 level 对应了18 个队列。
具体可以在 代码TopicConfigManager.java 中 看到
private static final int SCHEDULE_TOPIC_QUEUE_NUM 18;要注意的是Broker 一般是集群模式 部署也就是说每个Broker 都会有18个队列。
TopicConfigManager#TopicConfigManager(BrokerController brokerController)
生产者消息延迟发送
代码示例如下
Message msgnew Message();
msg.setTopic(TopicA);
msg.setTags(Tag);
msg.setBody(this is a delay message.getBytes());
//设置延迟level为5对应延迟1分钟
msg.setDelayTimeLevel(5);
producer.send(msg);Broker 存储延迟消息
上一篇文章已经谈到Broker 收到消费者消息后会进行消息存储然后再转发到消费队列ConsumerQueue)然后再推给消费者。
其实一旦消息转发到
存储延迟消息的流程也类似
确定延迟消息投递到topic 哪个队列。存储生产者写入的消息时将消息转发到 ConsumeQueue 中消费者就能消费到。 延迟消息不能立即消息到于是将 topic 名称修改为 SCHEDULE_TOPIC_XXX并根据延迟消息级别确定投递到哪个队列上。同时还会将原来消息要发送到的目标 topic 和队列记录投递到哪个队列。
代码在CommitLog#asyncPutMessage 中
设置延迟消息的投递队列信息代码如下 // Delay Deliveryif (msg.getDelayTimeLevel() 0) {// 如果设置的级别超过了最大级别重置延迟级别if (msg.getDelayTimeLevel() this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) {msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());}
// 计算延迟消息应该投递到 SCHEDULE_TOPIC_XXXX 到哪个队列。topic TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;int queueId ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());// Backup real topic, queueId// 记录原始 topic ,queueid,方便后期投递到目标 topicMessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
// 更新消息投递目标为 SCHEDULE_TOPIC_XXX,queueIdmsg.setTopic(topic);msg.setQueueId(queueId);}消息转发
消息转发过程其实中会对延迟消息做一些特殊处理
CommitLog中的消息转发到CosumeQueue中是异步进行的。在转发过程中会对延迟消息进行特殊处理主要是计算这条延迟消息需要在什么时候进行投递。