当前位置: 首页 > news >正文

网站后台怎么打开小饭店普通装修

网站后台怎么打开,小饭店普通装修,外国大气网站,app开发定制公司如何选择作者 | 犀牛饲养员责编 | 徐威龙封图| CSDN 下载于视觉中国最近看kafka源码#xff0c;着实被它的客户端缓冲池技术优雅到了。忍不住要写篇文章赞美一下#xff08;哈哈#xff09;。注#xff1a;本文用到的源码来自kafka2.2.2版本。背景当我们应用程序调用kafka客户端 pr… 作者 | 犀牛饲养员 责编 | 徐威龙封图| CSDN 下载于视觉中国最近看kafka源码着实被它的客户端缓冲池技术优雅到了。忍不住要写篇文章赞美一下哈哈。注本文用到的源码来自kafka2.2.2版本。背景当我们应用程序调用kafka客户端 producer发送消息的时候在kafka客户端内部会把属于同一个topic分区的消息先汇总起来形成一个batch。真正发往kafka服务器的消息都是以batch为单位的。如下图所示这么做的好处显而易见。客户端和服务端通过网络通信这样批量发送可以减少网络带来的性能开销提高吞吐量。这个Batch的管理就非常值得探讨了。可能有人会说这不简单吗用的时候分配一个块内存发送完了释放不就行了吗。kafka是用java语言编写的新版本大部分都是用java实现的了用上面的方案就是使用的时候new一个空间然后赋值给一个引用释放的时候把引用置为null等JVM GC处理就可以了。看起来似乎也没啥问题。但是在并发量比较高的时候就会频繁的进行GC。我们都知道GC的时候有个stop the world尽管最新的GC技术这个时间已经非常短依然有可能成为生产环境的性能瓶颈。kafka的设计者当然能考虑到这一层。下面我们就来学习下kafka是如何对batch进行管理的。缓冲池技术原理解析kafka客户端使用了缓冲池的概念预先分配好真实的内存块放在池子里。每个batch其实都对应了缓冲池中的一个内存空间发送完消息之后batch不再使用了就把内存块归还给缓冲池。听起来是不是很耳熟啊不错数据库连接池线程池等池化技术其实差不多都是这样的原理。通过池化技术降低创建和销毁带来的开销提升执行效率。代码是最好的文档下面我们就来撸下源码。我们撸代码的步骤采用的是从上往下的原则先带你看看缓冲池在哪里使用然后再深入到缓存池内部深入分析。下面的代码做了一些删减值保留了跟本文相关的部分便于分析。public class KafkaProducerK, V implements ProducerK, V {private final Logger log;private static final AtomicInteger PRODUCER_CLIENT_ID_SEQUENCE  new AtomicInteger(1);private static final String JMX_PREFIX  kafka.producer;public static final String NETWORK_THREAD_PREFIX  kafka-producer-network-thread;public static final String PRODUCER_METRIC_GROUP_NAME  producer-metrics;Overridepublic FutureRecordMetadata send(ProducerRecordK, V record, Callback callback) {// intercept the record, which can be potentially modified; this method does not throw exceptionsProducerRecordK, V interceptedRecord  this.interceptors.onSend(record);return doSend(interceptedRecord, callback);}private FutureRecordMetadata doSend(ProducerRecordK, V record, Callback callback) {RecordAccumulator.RecordAppendResult result  accumulator.append(tp, timestamp, serializedKey,serializedValue, headers, interceptCallback, remainingWaitMs);...} 当我们调用客户端的发送消息的时候底层会调用doSend然后里面使用一个记录累计器RecordAccumulator把消息append进来。我们继续往下看看。public final class RecordAccumulator {private final Logger log;private volatile boolean closed;private final AtomicInteger flushesInProgress;private final AtomicInteger appendsInProgress;private final int batchSize;private final CompressionType compression;private final int lingerMs;private final long retryBackoffMs;private final int deliveryTimeoutMs;private final BufferPool free;private final Time time;private final ApiVersions apiVersions;private final ConcurrentMapTopicPartition, DequeProducerBatch batches;private final IncompleteBatches incomplete;// The following variables are only accessed by the sender thread, so we dont need to protect them.private final MapTopicPartition, Long muted;private int drainIndex;private final TransactionManager transactionManager;private long nextBatchExpiryTimeMs  Long.MAX_VALUE; // the earliest time (absolute) a batch will expire.public RecordAppendResult append(TopicPartition tp,long timestamp,byte[] key,byte[] value,Header[] headers,Callback callback,long maxTimeToBlock) throws InterruptedException {// We keep track of the number of appending thread to make sure we do not miss batches in// abortIncompleteBatches().appendsInProgress.incrementAndGet();ByteBuffer buffer  null;buffer  free.allocate(size, maxTimeToBlock);synchronized (dq) {// Need to check if producer is closed again after grabbing the dequeue lock.if (closed)throw new KafkaException(Producer closed while send in progress);RecordAppendResult appendResult  tryAppend(timestamp, key, value, headers, callback, dq);if (appendResult ! null) {// Somebody else found us a batch, return the one we waited for! Hopefully this doesnt happen often...return appendResult;}MemoryRecordsBuilder recordsBuilder  recordsBuilder(buffer, maxUsableMagic);ProducerBatch batch  new ProducerBatch(tp, recordsBuilder, time.milliseconds());FutureRecordMetadata future  Utils.notNull(batch.tryAppend(timestamp, key, value, headers, callback, time.milliseconds()));dq.addLast(batch);...RecordAccumulator其实就是管理一个batch队列我们看到append方法实现其实是调用BufferPool的free方法申请allocate了一块内存空间(ByteBuffer) 然后把这个内存空空间包装成batch添加到队列后面。当消息发送完成不在使用batch的时候RecordAccumulator会调用deallocate方法归还内存内部其实是调用BufferPool的deallocate方法。public void deallocate(ProducerBatch batch) {incomplete.remove(batch);// Only deallocate the batch if it is not a split batch because split batch are allocated outside the// buffer pool.if (!batch.isSplitBatch())free.deallocate(batch.buffer(), batch.initialCapacity());} 很明显BufferPool就是缓冲池管理的类也是我们今天要讨论的重点。我们先来看看分配内存块的方法。public class BufferPool {static final String WAIT_TIME_SENSOR_NAME  bufferpool-wait-time;private final long totalMemory;private final int poolableSize;private final ReentrantLock lock;private final DequeByteBuffer free;private final DequeCondition waiters;/** Total available memory is the sum of nonPooledAvailableMemory and the number of byte buffers in free * poolableSize.  */private long nonPooledAvailableMemory;private final Metrics metrics;private final Time time;private final Sensor waitTime;public ByteBuffer allocate(int size, long maxTimeToBlockMs) throws InterruptedException {if (size  this.totalMemory)throw new IllegalArgumentException(Attempt to allocate   size  bytes, but there is a hard limit of  this.totalMemory  on memory allocations.);ByteBuffer buffer  null;this.lock.lock();try {// check if we have a free buffer of the right size pooledif (size  poolableSize  !this.free.isEmpty())return this.free.pollFirst();// now check if the request is immediately satisfiable with the// memory on hand or if we need to blockint freeListSize  freeSize() * this.poolableSize;if (this.nonPooledAvailableMemory  freeListSize  size) {// we have enough unallocated or pooled memory to immediately// satisfy the request, but need to allocate the bufferfreeUp(size);this.nonPooledAvailableMemory - size;} else {// we are out of memory and will have to blockint accumulated  0;Condition moreMemory  this.lock.newCondition();try {long remainingTimeToBlockNs  TimeUnit.MILLISECONDS.toNanos(maxTimeToBlockMs);this.waiters.addLast(moreMemory);// loop over and over until we have a buffer or have reserved// enough memory to allocate onewhile (accumulated  size) {long startWaitNs  time.nanoseconds();long timeNs;boolean waitingTimeElapsed;try {waitingTimeElapsed  !moreMemory.await(remainingTimeToBlockNs, TimeUnit.NANOSECONDS);} finally {long endWaitNs  time.nanoseconds();timeNs  Math.max(0L, endWaitNs - startWaitNs);recordWaitTime(timeNs);}if (waitingTimeElapsed) {throw new TimeoutException(Failed to allocate memory within the configured max blocking time   maxTimeToBlockMs   ms.);}remainingTimeToBlockNs - timeNs;// check if we can satisfy this request from the free list,// otherwise allocate memoryif (accumulated  0  size  this.poolableSize  !this.free.isEmpty()) {// just grab a buffer from the free listbuffer  this.free.pollFirst();accumulated  size;} else {// well need to allocate memory, but we may only get// part of what we need on this iterationfreeUp(size - accumulated);int got  (int) Math.min(size - accumulated, this.nonPooledAvailableMemory);this.nonPooledAvailableMemory - got;accumulated  got;}...首先整个方法是加锁操作的所以支持并发分配内存。逻辑是这样的当申请的内存大小等于poolableSize则从缓存池中获取。这个poolableSize可以理解成是缓冲池的页大小作为缓冲池分配的基本单位。从缓存池获取其实就是从ByteBuffer队列取出一个元素返回。如果申请的内存不等于特定的数值则向非缓存池申请。同时会从缓冲池中取一些内存并入到非缓冲池中。这个nonPooledAvailableMemory指的就是非缓冲池的可用内存大小。非缓冲池分配内存其实就是调用ByteBuffer.allocat分配真实的JVM内存。缓存池的内存一般都很少回收。而非缓存池的内存是使用后丢弃然后等待GC回收。继续来看看batch释放的代码public void deallocate(ByteBuffer buffer, int size) {lock.lock();try {if (size  this.poolableSize  size  buffer.capacity()) {buffer.clear();this.free.add(buffer);} else {this.nonPooledAvailableMemory  size;}Condition moreMem  this.waiters.peekFirst();if (moreMem ! null)moreMem.signal();} finally {lock.unlock();}} 很简单也是分为两种情况。要么直接归还缓冲池要么就是更新非缓冲池部分的可以内存。然后通知等待队列里的第一个元素。推荐阅读Docker 概念很难理解一文搞定 Docker 端口绑定DevOps 转型时如何安全融入对企业产出有何影响2019年 DevOps 最新现状研究报告解读 | 原力计划 十分钟上手 ReactMirrorX从此前端大神代码不再难懂 | 原力计划 第一批复工的人都栽在了公司的厕所...... 如何用Jupyter Notebook制作新冠病毒疫情追踪器 比特币最主流以太坊大跌区块链技术“万金油”红利已结束 | 区块链开发者年度报告 真香朕在看了
http://www.yutouwan.com/news/452943/

相关文章:

  • 国外免费网站空间乔拓云网微信小程序制作
  • 广州网站优化平台用自己照片做衣服_杯子的是哪个网站
  • 番禺网站开发价格安徽百度推广怎么做
  • 四川省建设厅工地安全网站wordpress制作企业
  • 做移动网站优化排名东莞58同城网
  • wordpress cdn加速seo网站排名优化快速排
  • 做财务需要关注哪些网站网页制作的模板代码
  • 山西建设网站公司用html做家谱网站代码
  • 毕业设计网站建设题目企业管理系统大全免费
  • 哪里有专做水果的网站新手制作网页的方法
  • 网站免费建站系统阿里云网站方案建设书模板
  • 公司建的站加油违法吗南京今天重大新闻事件
  • 小公司如何做网站鞍山58同城租房网
  • php和django做网站哪个好网上做公司网站怎么做
  • 网站建设的一般流程是电子设计全国网站建设
  • 网站怎么做谷歌权重张家港网站开发培训
  • 做网站怎么做的建设网站公司有哪些
  • 苏州知名网站建设设计公司排名网站被360拦截怎么办
  • 仙桃网站设计公司wordpress不同分类模板
  • 如何创网站网站关键词快速排名软件
  • 做暧暧视频网站微信微网站模版
  • 优化自己的网站苏州建设集团有限责任公司
  • 建设企业网站可信度招聘网站开发时间
  • 北京网站建设百度排名怎样注册小程序商城
  • 昌邑市住房和建设局网站免费样机素材网站
  • 社旗微网站开发百度建设网站的目的
  • 做服装的一般去什么网站找图片客户信息管理
  • 做视频网站容易收录吗网页游戏 手机
  • 中国建设银行网站-个人客户wordpress linux安装
  • 漳州网站建设到博大科技类网站简介怎么做