当前位置: 首页 > news >正文

做创意礼品定制的网站51视频

做创意礼品定制的网站,51视频,php企业网站,深圳市网站备案需求使用 Redis Stream 实现消息队列IntroRedis 5.0 中增加了 Stream 的支持#xff0c;利用 Stream 我们可以实现可靠的消息队列#xff0c;并且支持一个消息被多个消费者所消费#xff0c;可以很好的实现消息队列Simple Usage首先我们来看一个简单版本的 Stream 使用#xff… 使用 Redis Stream 实现消息队列IntroRedis 5.0 中增加了 Stream 的支持利用 Stream 我们可以实现可靠的消息队列并且支持一个消息被多个消费者所消费可以很好的实现消息队列Simple Usage首先我们来看一个简单版本的 Stream 使用我们在代码里使用一个发布者一个消费者来模拟一个简单的消息队列的场景来看下面的测试代码private const string StreamKey  test-simple-stream;public static async Task MainTest() {await RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);// register background consumer_  Task.Factory.StartNew(Consume).ConfigureAwait(false);//await Publish(); }private static async Task Publish() {Console.WriteLine(Press Enter to publish messages, Press Q to exit);var input  Console.ReadLine();while (input is not q and not Q){var redis  RedisHelper.GetDatabase();for (var i  0; i  10; i){await redis.StreamAddAsync(StreamKey, message, $test_message_{i});}input  Console.ReadLine();} }private static async Task Consume() {var lastMsgId  0-0;while (true){await InvokeHelper.TryInvokeAsync(async () {var redis  RedisHelper.GetDatabase();var entries  await redis.StreamReadAsync(StreamKey, lastMsgId, 2);if (entries.Length  0){return;}foreach (var entry in entries){Console.WriteLine(entry.Id);entry.Values.Dump();// delete message if you want// redis.StreamDelete(StreamKey, new[] { entry.Id });}lastMsgId  entries[^1].Id;});await Task.Delay(200);} } 上面的代码会使用一个后台线程来运行一个 Consumer 来从 Stream 中读取消息有两种消费消息的模式一种是自己维护一个处理的消息 offset每次从这个 offset 之后读取新消息另外一种模式不需要维护本地的 offset可以在处理完消息之后直接删掉消息默认消息是不会删消息的所以如果不删消息的话需要维护Publisher 每次会发布 10 条消息Consumer 每次会读取两条消息处理之后会等待 200 ms之后再查询消息来看一下运行效果吧Consumer Group上面的示例会相对来说比较简单只有一个 Consumer但是在比较常用的场景下往往会有多个消费者处理比如说用户注册成功之后发布一条消息可能会有多个 Consumer 同时给用户发邮件或短信以及给用户加积分等操作这种场景下使用上面的模式就不合适了Redis Stream 中增加了 Consumer Group 的概念有的人甚至称 Redis 内置了一个 Kafka在创建了 Consumer Group 之后向 Stream 发布消息的时候会广播到各个 Consumer Group 中每个 Consumer Group 的消息消费是独立的不同的 Consumer Group 的消费速度可以不一致一个 Consumer Group 也可以有多个 Consumer 同时运行同一个 Group 内的多个 Consumer 是会共享一个 Consumer Group 的消息消费而且我们可以手动进行消息的 ACK来看下面的示例代码吧private const string StreamKey  test-stream-group; private static int _consumerCount;public static async Task MainTest() {await RedisHelper.GetDatabase().KeyDeleteAsync(StreamKey);// register background consumer_  await Task.Factory.StartNew(Consume).ConfigureAwait(false);_  await Task.Factory.StartNew(Consume).ConfigureAwait(false);//await Publish(); }private static async Task Publish() {Console.WriteLine(Press Enter to publish messages, Press Q to exit);var input  Console.ReadLine();while (input is not q and not Q){var redis  RedisHelper.GetDatabase();for (var i  0; i  10; i){await redis.StreamAddAsync(StreamKey, message, $test_message_{i});}input  Console.ReadLine();} }private static async Task Consume() {Interlocked.Increment(ref _consumerCount);var groupName  $group-{_consumerCount};var consumerName  $consumer-{_consumerCount};var redis  RedisHelper.GetDatabase();redis.StreamCreateConsumerGroup(StreamKey, groupName);while (true){await InvokeHelper.TryInvokeAsync(async () {var messages  await redis.StreamReadGroupAsync(StreamKey, groupName, consumerName, count: SecurityHelper.Random.Next(1, 4));if (messages.Length  0){return;}foreach (var message in messages){Console.WriteLine(${groupName}-{message.Id}-{message.Values.ToJson()});await redis.StreamAcknowledgeAsync(StreamKey, groupName, message.Id);}});await Task.Delay(200);} } 上面的示例代码会先注册两个 Consumer Group两个 Consumer Group 内各有一个 consumer你也可以使用多个 consumer为了体现各个 Consumer Group 是独立的每次获取消息的 Count 是会随机指定的在读取的消息之后会输出消息内容来代替处理消息的逻辑处理完成之后进行消息的 ACK消息的发布逻辑和上面的示例是类似的上述代码执行输出示例可以看到我们发布的消息每一个 consumer group 都会处理消息而且处理消息的速度是独立的互不影响通过 XINFO 命令我们可以对 Stream 做一些监控More利用 Redis 的 Stream 我们可以实现可靠的一个消息机制stream 的每一条消息都会有一个消息 Id默认是两个部分一个部分是时间戳另一个部分是一个序列号消息 Id 可以自定义但是通常情况下推荐用默认的 idRedis 中的 List、HashSet、Set、ZSet 这些数据类型中没有元素的时候会把对应的 Key 也会删掉但是 Stream 是不会的Stream 允许没有消息的时候依然存在Redis Stream 使用的时候需要注意我们是可以指定 Stream 的消息长度的如果我们指定了最大消息长度 10000超出 10000 的时候旧消息就会被挤出队列可能会出现消息的丢失需要对 Stream 做必要的监控和报警Referenceshttps://redis.io/topics/streams-introhttps://redis.io/commandshttps://github.com/WeihanLi/SamplesInPractice/tree/master/RedisSample
http://www.yutouwan.com/news/306602/

相关文章:

  • 如何建设百度网站python快速搭建网站
  • 网站整体配色方案调用wordpress数据
  • 做h5找图网站怎么创建网页快捷方式到桌面
  • 易语言怎么做点击按钮打开网站wordpress下安装论坛 伪静态
  • 广东手机网站建设多少钱网站备案照
  • 扬州网站建设坦克大战网站开发课程设计报告
  • 网站域名注册后怎么建设做响应式网站制作
  • 石油化工建设网站wordpress quizzin
  • 常州网站建设方案wordPress主题模板站
  • 网站群 建设 方案平谷头条新闻
  • 站长工具使用网站开发流程框架
  • 淘宝数据网站开发搜索引擎seo关键词优化效果
  • 焦作网站建设公司排名网站可信度验证
  • 免费公司网站模板镇江关键词优化
  • 网站虚拟建设策划安卓aso优化
  • wordpress怎么设置语言为中文什么是优化
  • 网站的优化与推广上海全屋整装哪家好
  • 网站开发是某某网站建设策划书2000字
  • 怎么样优化网站seoLight模板WordPress
  • wordpress网站换字体颜色wordpress在线点播
  • .net网站开发教程创办网站要多少钱
  • vue 做网站全椒有做网站的吗
  • 男女做那个的的视频网站国外化工网站模板
  • 河南怎么建设网站搭建网站要多久
  • 做图必备素材网站网站开发哈尔滨网站开发公司电话
  • 海南住房和城乡建设厅网站首页40个免费网站推广平台
  • 给平面设计素材网站做素材挣钱吗网站备案上传照片几寸
  • 自己建站流程电子商务网站建设课后作业
  • 临海市住房与城乡建设规划局网站wordpress wmv
  • 官方网站建设的必要迅雷网站做爰视频