查找公司注册信息的网站,商务公司网站建设,企业公示信息查询系统江西,网站开发市场情况摘要#xff1a; 当资源成为瓶颈时#xff0c;服务框架需要对消费者做限流#xff0c;启动流控保护机制。流量控制有多种策略#xff0c;比较常用的有#xff1a;针对访问速率的静态流控、针对资源占用的动态流控、针对消费者并发连接数的连接控制和针对并行访问数的并发控…摘要 当资源成为瓶颈时服务框架需要对消费者做限流启动流控保护机制。流量控制有多种策略比较常用的有针对访问速率的静态流控、针对资源占用的动态流控、针对消费者并发连接数的连接控制和针对并行访问数的并发控制。在分布式架构中应用和应用之间的调用类型分为以下两种流控方式也略有不同。
点此查看原文https://yq.aliyun.com/articles/380180?spma2c41.11181499.0.0
当资源成为瓶颈时服务框架需要对消费者做限流启动流控保护机制。流量控制有多种策略比较常用的有针对访问速率的静态流控、针对资源占用的动态流控、针对消费者并发连接数的连接控制和针对并行访问数的并发控制。在实践中各种流量控制策略需要综合使用才能起到较好的效果。
在分布式架构中应用和应用之间的调用类型分为以下两种流控方式也略有不同。
同步RPC类调用比如RESTfulDubboHSF等都属于该类。对于该类同步调用通常限流方式为两种针对服务提供者的并发全局流控或针对服务消费者的并发局部流控。两种的控制手段类似都是通过限制服务端或客服端并发调用数来进行限制。
异步MQ类调用典型如RocketMQ, Kafka等。对于该类异步调用通常限流方式是在订阅端限流。限流方式为两种针对消息订阅者的并发流控或针对消息订阅者的消费延时流控。
针对消息订阅者的消费延时流控基本原理是在每次客户端消费时可以增加一个延时来控制消费速度这样理论消费并发最快速度为
MaxRate 1 / ConsumInterval * ConcurrentThreadNumber
比如如果消息并发消费线程为20延时为100ms则理论上可以将并发消费控制在200以下。具体公式如下
200 1 / 0.1 * 20
相比并发线程数流控消费延时流控优点在于实现相对简单对MQ类客户端包依赖较少不需要客户端提供控制并发线程数的动态调整接口。
以上各种流量控制方法在分布式架构下如果要做到全局动态控制一个简单的技术方法是依赖配置中心即通过配置中心来进行流控参数的下发。
下面章节详细介绍如何基于配置中心来实现异步消息消费的全局动态流控。使用的例子为阿里云上的 MQ (消息队列)和 ACM (应用配置管理)两款产品。
注之所以用MQ为示例是因为在本文撰写之时正好MQ Consumer Client SDK并不支持动态调整现成并发数因此通过基于ACM来动态调整消费延迟的方法正好可以解决MQ消费流控动态的问题。
基于消费延时流控的基本原理
基本原理如下。其中管理员或应用程序通过ACM控制台发布消费延时配置(RCV_INTERVAL_TIME)所有MQ消费程序订阅该配置。理论上该配置从发布到下发所有客户端可以在1秒内完成(取决于网络延时)。代码示例
该章节基于配置中心来实现异步消息消费的全局动态流控的代码示例。使用的例子为阿里云上的MQ(消息队列)和ACM(应用配置管理)两款产品基于Java语言。关于SDK的详细介绍可参见两款产品的官方文档。
在ACM上创建消费延时的参数截屏如下。设置全局消费延时变量
首先设置消费接收延时的全局变量 如下。// 初始化消息接收延时参数单位为millisecondstatic int RCV_INTERVAL_TIME 10000;// 初始化配置服务控制台通过示例代码自动获取下面参数ConfigService.init(acm.aliyun.com, /*租户ID*/xxx, /*AK*/xxx, /*SK*/yyy); // 主动获取配置String content ConfigService.getConfig(app.mq.qos, DEFAULT_GROUP, 6000);Properties p new Properties();try {p.load(new StringReader(content));RCV_INTERVAL_TIME Integer.valueOf(p.getProperty(RCV_INTERVAL_TIME));} catch (IOException e) {e.printStackTrace();}
其次设置ACM listener确保当配置被修改时即使更新 RCV_INTERVAL_TIME 参数 如下。// 初始化的时候给配置添加监听,配置变更会回调通知ConfigService.addListener(app.mq.qos, DEFAULT_GROUP, new ConfigChangeListener() {public void receiveConfigInfo(String configInfo) {Properties p new Properties();try {p.load(new StringReader(configInfo));RCV_INTERVAL_TIME Integer.valueOf(p.getProperty(RCV_INTERVAL_TIME));} catch (IOException e) {e.printStackTrace();}}});
设置 MQ 消费延时逻辑
完整实例如下。
注这里 RCV_INTERVAL_TIME 参数的访问是故意没有加锁的读者可以自行思考原因。Aliyun ONS Client不提供动态线程并发数默认并发为20。因此这里正好使用消费延时参数来动态调节QoS。//以下代码可直接贴在Main()函数里Properties properties new Properties();properties.put(PropertyKeyConst.ConsumerId, CID_consumer_group);properties.put(PropertyKeyConst.AccessKey,xxx);properties.put(PropertyKeyConst.SecretKey, yyy);properties.setProperty(PropertyKeyConst.SendMsgTimeoutMillis, 3000);// 设置 TCP 接入域名此处以公共云生产环境为例properties.put(PropertyKeyConst.ONSAddr,http://onsaddr-internet.aliyun.com/rocketmq/nsaddr4client-internet);Consumer consumer ONSFactory.createConsumer(properties);consumer.subscribe(/*Topic*/topic-name, /*Tag*/null, new MessageListener() {public Action consume(Message message, ConsumeContext context) {// MQ Subscribe QoS logical start, // Each consuming process will sleep for RCV_INTERVAL_TIME seconds with 100 ms sleeping cycle.// Within each cycle, the thread will check RCV_INTERVAL_TIME in case its set to a smaller value. // RCV_INTERVAL_TIME 0 means no sleeping.int rcvIntervalTimeLeft RCV_INTERVAL_TIME;while (rcvIntervalTimeLeft 0) {if (rcvIntervalTimeLeft RCV_INTERVAL_TIME) {rcvIntervalTimeLeft RCV_INTERVAL_TIME;}try {if (rcvIntervalTimeLeft 100) {rcvIntervalTimeLeft - 100;Thread.sleep(100);} else {Thread.sleep(rcvIntervalTimeLeft);rcvIntervalTimeLeft 0;}} catch (InterruptedException e) {e.printStackTrace();}}// MQ Subscribe interval logical endsSystem.out.println(Receive: message);/** Put your business logic here.*/doSomething();return Action.CommitMessage;}});consumer.start();
运行结果
单机运行consumer进行消费假设queue内的消息无限多不存在消费万的情况分三段测试,分别运行约5分钟通过ACM配置推送来达到以下效果。
RCV_INTERVAL_TIME 100 ms
RCV_INTERVAL_TIME 5000 ms
RCV_INTERVAL_TIME 1000 ms
结果如下在单MQ消费业务处理耗时约100ms情况下的单机并发20线程的测试结果。
RCV_INTERVAL_TIME 100 ms平均消费性能约为 9000 tpm 左右
RCV_INTERVAL_TIME 5000 ms平均消费性能被限制到了 200 tpm 左右
RCV_INTERVAL_TIME 1000 ms平均消费性能回升到到了 1100 tpm 左右
以上结果基本达到消费和 tpm 成反比的预期最关键的是整个过程中应用不中断流控推送结果秒级生效到分布式集群。单机性能结果如下所示。