网页制作与网站建设答案,数字营销网,移动端原型,股权融资介绍具有队列的特性#xff0c;再给它附加一个延迟消费队列消息的功能#xff0c;也就是说可以指定队列中的消息在哪个时间点被消费。使用场景延迟队列在项目中的应用还是比较多的#xff0c;尤其像电商类平台#xff1a;订单成功后#xff0c;在30分钟内没有支付#xf… 介绍具有队列的特性再给它附加一个延迟消费队列消息的功能也就是说可以指定队列中的消息在哪个时间点被消费。使用场景延迟队列在项目中的应用还是比较多的尤其像电商类平台订单成功后在30分钟内没有支付自动取消订单外卖平台发送订餐通知下单成功后60s给用户推送短信。如果订单一直处于某一个未完结状态时及时处理关单并退还库存淘宝新建商户一个月内还没上传商品信息将冻结商铺等该介绍来自其他文章方案下面的例子没有进行封装所以代码仅供参考Redis过期事件注意不保证在设定的过期时间立即删除并发送通知数据量大的时候会延迟比较大不保证一定送达发送即忘策略不包含持久化但是比如有些场景对这个时间不是那么看重并且有其他措施双层保障该实现方案是比较简单。redis自2.8.0之后版本提供Keyspace Notifications功能允许客户订阅Pub / Sub频道以便以某种方式接收影响Redis数据集事件。配置需要修改配置启用过期事件比如在windows客户端中需要修改redis.windows.conf文件,在linux中需要修改redis.conf修改内容是img-- 取消注释
notify-keyspace-events Ex-- 注释
#notify-keyspace-events 然后重新启动服务器比如windows.\redis-server.exe .\redis.windows.conf或者linux中使用docker-compose重新部署redisredis:container_name: redisimage: redishostname: redisrestart: alwaysports: - 6379:6379volumes: - $PWD/redis/redis.conf:/etc/redis.conf- /root/common-docker-compose/redis/data:/datacommand: /bin/bash -c redis-server /etc/redis.conf #启动执行指定的redis.conf文件然后使用客户端订阅事件-- windows
.\redis-cli-- linux
docker exec -it 容器标识 redis-clipsubscribe __keyevent0__:expired控制台订阅使用StackExchange.Redis组件订阅过期事件var connectionMultiplexer ConnectionMultiplexer.Connect(_redisConnection);
var db connectionMultiplexer.GetDatabase(0);db.StringSet(orderno:123456, 订单创建, TimeSpan.FromSeconds(10));
Console.WriteLine(开始订阅);var subscriber connectionMultiplexer.GetSubscriber();//订阅库0的过期通知事件
subscriber.Subscribe(__keyevent0__:expired, (channel, key)
{Console.WriteLine($key过期 channel:{channel} key:{key});
});Console.ReadLine();输出结果key过期 channel:keyevent0:expired key:orderno:123456如果启动多个客户端监听那么多个客户端都可以收到过期事件。WebApi中订阅创建RedisListenService继承自BackgroundServicepublic class RedisListenService : BackgroundService
{private readonly ISubscriber _subscriber;public RedisListenService(IServiceScopeFactory serviceScopeFactory){using var scope serviceScopeFactory.CreateScope();var configuration scope.ServiceProvider.GetRequiredServiceIConfiguration();var connectionMultiplexer ConnectionMultiplexer.Connect(configuration[redis]);var db connectionMultiplexer.GetDatabase(0);_subscriber connectionMultiplexer.GetSubscriber();}protected override Task ExecuteAsync(CancellationToken stoppingToken){//订阅库0的过期通知事件_subscriber.Subscribe(__keyevent0__:expired, (channel, key) {Console.WriteLine($key过期 channel:{channel} key:{key});});return Task.CompletedTask;}
}注册该后台服务services.AddHostedServiceRedisListenService();启用项目给redis指定库设置值等过期后会接收到过期通知事件。RabbitMq延迟队列版本信息 Rabbitmq版本3.10.5 Erlang版本24.3.4.2要使用rabbitmq做延迟是需要安装插件(rabbitmq_delayed_message_exchange)的插件介绍https://blog.rabbitmq.com/posts/2015/04/scheduling-messages-with-rabbitmq下载地址https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases将下载好的插件(d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez)映射到容器的plugins目录下docker run -d --name myrabbit -p 9005:15672 -p 5672:5672 -e RABBITMQ_DEFAULT_VHOSTcustomer -e RABBITMQ_DEFAULT_USERadmin -e RABBITMQ_DEFAULT_PASS123456 -v d:/Download/rabbitmq_delayed_message_exchange-3.10.2.ez:/plugins/rabbitmq_delayed_message_exchange-3.10.2.ez rabbitmq:3-management-alpine进入容器docker exec -it 容器名称/标识 bash启用插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange查看是否启用rabbitmq-plugins list[E*]和[e*]表示启用然后重启服务rabbitmq-server restart然后在管理界面添加交换机都可以看到img生产消息发送的消息类型是x-delayed-message[HttpGet(send/delay)]
public string SendDelayedMessage()
{var factory new ConnectionFactory(){HostName localhost,//IP地址Port 5672,//端口号UserName admin,//用户账号Password 123456,//用户密码VirtualHost customer};using var connection factory.CreateConnection();using var channel connection.CreateModel();var exchangeName delay-exchange;var routingkey delay.delay;var queueName delay_queueName;//设置Exchange队列类型var argMaps new Dictionarystring, object(){{x-delayed-type, topic}};//设置当前消息为延时队列channel.ExchangeDeclare(exchange: exchangeName, type: x-delayed-message, true, false, argMaps);channel.QueueDeclare(queueName, true, false, false, argMaps);channel.QueueBind(queueName, exchangeName, routingkey);var time 1000 * 5;var message $发送时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss} 延时时间为:{time};var body Encoding.UTF8.GetBytes(message);var props channel.CreateBasicProperties();//设置消息的过期时间props.Headers new Dictionarystring, object(){{ x-delay, time }};channel.BasicPublish(exchange: exchangeName, routingKey: routingkey, basicProperties: props, body: body);Console.WriteLine(成功发送消息: message);return success;
}消费消息消费消息我是弄了一个后台任务(RabbitmqDelayedHostService)在处理public class RabbitmqDelayedHostService : BackgroundService
{private readonly IModel _channel;private readonly IConnection _connection;public RabbitmqDelayedHostService(){var connFactory new ConnectionFactory//创建连接工厂对象{HostName localhost,//IP地址Port 5672,//端口号UserName admin,//用户账号Password 123456,//用户密码VirtualHost customer};_connection connFactory.CreateConnection();_channel _connection.CreateModel();//交换机名称var exchangeName exchangeDelayed;var queueName delay_queueName;var routingkey delay.delay;var argMaps new Dictionarystring, object(){{x-delayed-type, topic}};_channel.ExchangeDeclare(exchange: exchangeName, type: x-delayed-message, true, false, argMaps);_channel.QueueDeclare(queueName, true, false, false, argMaps);_channel.QueueBind(queue: queueName, exchange: exchangeName, routingKey: routingkey);//声明为手动确认_channel.BasicQos(0, 1, false);}protected override Task ExecuteAsync(CancellationToken stoppingToken){var queueName delay_queueName;var consumer new EventingBasicConsumer(_channel);consumer.Received (model, ea) {var message Encoding.UTF8.GetString(ea.Body.ToArray());var routingKey ea.RoutingKey;Console.WriteLine($接受到消息的时间为 {DateTime.Now:yyyy-MM-dd HH:mm:ss},routingKey:{routingKey} message:{message} );//手动确认_channel.BasicAck(ea.DeliveryTag, true);};_channel.BasicConsume(queue: queueName, autoAck: false, consumer: consumer);return Task.CompletedTask;}public override void Dispose(){_connection.Dispose();_channel.Dispose();base.Dispose();}
}注册该后台任务services.AddHostedServiceRabbitmqDelayedHostService();输出结果成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000成功发送消息:发送时间为 2022-07-02 18:54:22 延时时间为:5000成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000成功发送消息:发送时间为 2022-07-02 18:54:23 延时时间为:5000接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000接受到消息的时间为 2022-07-02 18:54:27,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:22 延时时间为:5000接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000接受到消息的时间为 2022-07-02 18:54:28,routingKey:delay.delay message:发送时间为 2022-07-02 18:54:23 延时时间为:5000其他方案Hangfire延迟队列BackgroundJob.Schedule(() Console.WriteLine(Delayed!),TimeSpan.FromDays(7));时间轮Redisson DelayQueue计时管理器