长沙市住房与城乡建设厅网站,wordpress投票,注册网站发财的富豪,苏州网站建设选苏州梦易行前言最近需要使用到消息队列相关技术#xff0c;于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题#xff0c;归纳了一下#xff0c;大概有以下几种#xff1a;1. 临时异常#xff0c;如数据库网络闪断、http请求临时失效等#xff1b;2. 时序异常#xff0c;如A任… 前言 最近需要使用到消息队列相关技术于是重新接触RabbitMQ。其中遇到了不少可靠性方面的问题归纳了一下大概有以下几种 1. 临时异常如数据库网络闪断、http请求临时失效等 2. 时序异常如A任务依赖于B任务但可能由于调度或消费者分配的原因导致A任务先于B任务执行 3. 业务异常由于系统测试不充分上线后发现某几个或某几种消息无法正常处理 4. 系统异常业务中间件无法正常操作如网络中断、数据库宕机等 5. 非法异常一些伪造、攻击类型的消息。 针对这些异常我采用了一种基于消息审计、消息重试、消息检索、消息重发的方案。 方案 1. 消息均使用Exchange进行通讯方式可以是direct或topic不建议fanout。 2. 根据业务在Exchange下分配一个或多个Queue同时设置一个审计线程(Audit)监听所有Queue用于记录消息到MongoDB同时又不阻塞正常业务处理。 3. 生产者(Publisher)在发布消息时基于AMQP协议生成消息标识MessageId和时间戳Timestamp根据消息业务添加头信息Headers便于跟踪。 4. 消费者(Comsumer)消息处理失败时则把消息发送到重试交换机(Retry Exchange)并设置过期重试时间及更新重试次数如果超过重试次数则删除消息。 5. 重试交换机Exchange设置死信交换机(Dead Letter Exchange)消息过期后自动转发到业务交换机(Exchange)。 6. WebApi可以根据消息标识MessageId、时间戳Timestamp以及头信息Headers在MongoDB中对消息进行检索或重试。 注选择MongoDB作为存储介质的主要原因是其对头信息headers的动态查询支持较好同等的替代产品还可以是Elastic Search这些。 生产者(Publisher) 1. 设置断线自动恢复 var factory new ConnectionFactory{Uri new Uri(amqp://guest:guest192.168.132.137:5672),AutomaticRecoveryEnabled true }; 2. 定义Exchange模式为direct channel.ExchangeDeclare(Exchange, direct); 3. 根据业务定义QueueA和QueueB channel.QueueDeclare(QueueA, true, false, false);channel.QueueBind(QueueA, Exchange, RouteA);channel.QueueDeclare(QueueB, true, false, false);channel.QueueBind(QueueB, Exchange, RouteB); 4. 启动消息发送确认机制即需要收到RabbitMQ服务端的确认消息 channel.ConfirmSelect(); 5. 设置消息持久化 var properties channel.CreateBasicProperties();properties.Persistent true; 6. 生成消息标识MessageId、时间戳Timestamp以及头信息Headers properties.MessageId Guid.NewGuid().ToString(N);properties.Timestamp new AmqpTimestamp(DateTimeOffset.UtcNow.ToUnixTimeMilliseconds());properties.Headers new Dictionarystring, object {{ key, value i}}; 7. 发送消息偶数序列发送到QueueARouteA奇数序列发送到QueueBRouteB channel.BasicPublish(Exchange, i % 2 0 ? RouteA : RouteB, properties, body); 8. 确定收到RabbitMQ服务端的确认消息 var isOk channel.WaitForConfirms();if (!isOk){throw new Exception(The message is not reached to the server!);} 完整代码效果QueueA和QueueB各一条消息QueueAudit两条消息 注Exchange下必须先声明Queue才能接收到消息上述代码并没有QueueAudit的声明需要手动声明或者先执行下面的消费者程序进行声明。 正常消费者(ComsumerA) 1. 设置预取消息避免公平轮训问题可以根据需要设置预取消息数这里是1 _channel.BasicQos(0, 1, false); 2. 声明Exchange和Queue _channel.ExchangeDeclare(Exchange, direct);_channel.QueueDeclare(QueueA, true, false, false);_channel.QueueBind(QueueA, Exchange, RouteA); 3. 编写回调函数 注设置了RabbitMQ的断线恢复机制当RabbitMQ连接不可用时与MQ通讯的操作会抛出AlreadyClosedException的异常导致主线程退出哪怕连接恢复了程序也无法恢复因此需要捕获处理该异常。 异常消费者(ComsumerB) 1. 设置预取消息 _channel.BasicQos(0, 1, false); 2. 声明Exchange和Queue _channel.ExchangeDeclare(Exchange, direct);_channel.QueueDeclare(QueueB, true, false, false);_channel.QueueBind(QueueB, Exchange, RouteB); 3. 设置死信交换机(Dead Letter Exchange) var retryDic new Dictionarystring, object { {x-dead-letter-exchange, Exchange},{x-dead-letter-routing-key, RouteB}};_channel.ExchangeDeclare(Exchange_Retry, direct);_channel.QueueDeclare(QueueB_Retry, true, false, false, retryDic);_channel.QueueBind(QueueB_Retry, Exchange_Retry, RouteB_Retry); 4. 重试设置3次重试第一次1秒第二次10秒第三次30秒 _retryTime new Listint {1 * 1000,10 * 1000,30 * 1000 }; 5. 获取当前重试次数 var retryCount 0;if (ea.BasicProperties.Headers ! null ea.BasicProperties.Headers.ContainsKey(retryCount)){retryCount (int)ea.BasicProperties.Headers[retryCount];_logger.LogWarning($[{DateTime.Now:yyyy-MM-dd HH:mm:ss}]Message:{ea.BasicProperties.MessageId}, {retryCount} retry started...);} 6. 发生异常判断是否可以重试 private bool CanRetry(int retryCount){return retryCount _retryTime.Count - 1;} 7. 可以重试则启动重试机制审计消费者(Audit Comsumer) 1. 声明Exchange和Queue _channel.ExchangeDeclare(Exchange, direct);_channel.QueueDeclare(QueueAudit, true, false, false);_channel.QueueBind(QueueAudit, Exchange, RouteA);_channel.QueueBind(QueueAudit, Exchange, RouteB); 2. 排除死信Exchange转发过来的重复消息 if (ea.BasicProperties.Headers null || !ea.BasicProperties.Headers.ContainsKey(x-death)){...} 3. 生成消息实体 var message new Message{MessageId ea.BasicProperties.MessageId,Body ea.Body,Exchange ea.Exchange,Route ea.RoutingKey}; 4. RabbitMQ会用bytes来存储字符串因此要把头中bytes转回字符串 if (ea.BasicProperties.Headers ! null){var headers new Dictionarystring, object();foreach (var header in ea.BasicProperties.Headers){if (header.Value is byte[] bytes){headers[header.Key] Encoding.UTF8.GetString(bytes);}else{headers[header.Key] header.Value;}}message.Headers headers;} 5. 把Unix格式的Timestamp转成UTC时间 if (ea.BasicProperties.Timestamp.UnixTime 0){message.TimestampUnix ea.BasicProperties.Timestamp.UnixTime;var offset DateTimeOffset.FromUnixTimeMilliseconds(ea.BasicProperties.Timestamp.UnixTime);message.Timestamp offset.UtcDateTime;} 6. 消息存入MongoDB _mongoDbContext.CollectionMessage().InsertOne(message, cancellationToken: cancellationToken); MongoDB记录 重试记录 消息检索及重发(WebApi) 1. 通过消息Id检索消息 2. 通过头消息检索消息 3. 消息重发会重新生成MessageId AckNackReject的关系 1. 消息处理成功执行AckRabbitMQ会把消息从队列中删除。 2. 消息处理失败执行Nack或者Reject a) 当requeuetrue时消息会重新回到队列然后当前消费者会马上再取回这条消息 b) 当requeuefalse时如果Exchange有设置Dead Letter Exchange则消息会去到Dead Letter Exchange c) 当requeuefalse时如果Exchange没设置Dead Letter Exchange则消息从队列中删除效果与Ack相同。 3. Nack与Reject的区别在于Nack可以批量操作Reject只能单条操作。 RabbitMQ自动恢复连接Connection恢复 1. 重连Reconnect 2. 恢复连接监听Listeners 3. 重新打开通道Channels 4. 恢复通道监听Listeners 5. 恢复basic.qospublisher confirms以及transaction设置 拓扑Topology恢复 1. 重新声明交换机Exchanges 2. 重新声明队列Queues 3. 恢复所有绑定Bindings 4. 恢复所有消费者Consumers 异常处理机制 1. 临时异常如数据库网络闪断、http请求临时失效等 通过短时间重试如1秒后的方式处理也可以考虑Nack/Reject来实现重试时效性更高。 2. 时序异常如A任务依赖于B任务但可能由于调度或消费者分配的原因导致A任务先于B任务执行 通过长时间重试如1分钟、30分钟、1小时、1天等等待B任务先执行完的方式处理。 3. 业务异常由于系统测试不充分上线后发现某几个或某几种消息无法正常处理 等系统修正后通过消息重发的方式处理。 4. 系统异常业务中间件无法正常操作如网络中断、数据库宕机等 等系统恢复后通过消息重发的方式处理。 5. 非法异常一些伪造、攻击类型的消息 多次重试失败后消息从队列中被删除也可以针对此业务做进一步处理。 源码地址https://github.com/ErikXu/RabbitMesage相关文章CAP带你轻松玩转ASP.NETCore消息队列.NetCore Cap 结合 RabbitMQ 实现消息订阅[译]RabbitMQ教程C#版 - 发布订阅原文地址: https://www.cnblogs.com/Erik_Xu/p/9515208.html.NET社区新闻深度好文欢迎访问公众号文章汇总 http://www.csharpkit.com