Other related articles
HBase Compaction Source Code Analysis - CompactionChecker
HBase Compaction Source Code Analysis - The RatioBasedCompactionPolicy Policy
HBase Compaction Source Code Analysis - CompactSplitThread Thread Pool Selection
CompactionChecker
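Introduction
The RegionServer starts a background thread, CompactionChecker, that periodically checks whether each Store needs a compaction. The check period is hbase.server.thread.wakefrequency * hbase.server.compactchecker.interval.multiplier. Unlike flush, this thread first checks whether the number of files in the Store (not counting files already being compacted) has reached the compaction threshold hbase.hstore.compactionThreshold. If it has, a compaction is triggered; if not, the thread goes on to check whether the conditions for a major compaction are met.
Put simply, a major compaction is triggered when the oldest update time among the HFiles in the Store is earlier than now - mcTime. mcTime is a jittered value whose default range is [7 - 7 * 0.5, 7 + 7 * 0.5] days, where 7 is hbase.hregion.majorcompaction (in days) and 0.5 is hbase.hregion.majorcompaction.jitter, so by default a major compaction runs roughly every 7 days. To disable periodic major compaction, set hbase.hregion.majorcompaction to 0.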
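The arithmetic behind those defaults can be written out as a small sketch. The class and constant names below are made up for illustration and are not HBase identifiers; the values are the defaults quoted above (10 s wake frequency, multiplier 1000, 7-day period, 0.5 jitter).

```java
/** Back-of-the-envelope arithmetic for the defaults quoted above; not HBase code. */
final class CompactionTimingSketch {
    // Hypothetical constants holding the default values mentioned in the text.
    static final long WAKE_FREQUENCY_MS = 10_000L;                   // hbase.server.thread.wakefrequency
    static final long CHECK_MULTIPLIER = 1_000L;                     // hbase.server.compactchecker.interval.multiplier
    static final long MAJOR_PERIOD_MS = 7L * 24 * 60 * 60 * 1000;    // hbase.hregion.majorcompaction
    static final double JITTER = 0.5;                                // hbase.hregion.majorcompaction.jitter

    /** How often a store is actually inspected for compaction. */
    static long checkPeriodMs() {
        return WAKE_FREQUENCY_MS * CHECK_MULTIPLIER;
    }

    /** Lower bound of the jittered major compaction period. */
    static long earliestMajorMs() {
        return MAJOR_PERIOD_MS - (long) (MAJOR_PERIOD_MS * JITTER);
    }

    /** Upper bound of the jittered major compaction period. */
    static long latestMajorMs() {
        return MAJOR_PERIOD_MS + (long) (MAJOR_PERIOD_MS * JITTER);
    }

    public static void main(String[] args) {
        System.out.println("check period (ms): " + checkPeriodMs());
        System.out.println("major window (ms): " + earliestMajorMs() + " .. " + latestMajorMs());
    }
}
```

With these defaults the check period is 10,000 seconds (about 2.78 hours) and the major compaction window spans 3.5 to 10.5 days.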
Source code analysis
The class org.apache.hadoop.hbase.regionserver.HRegionServer has a field named compactionChecker. Its type implements the Runnable interface, and it runs as an independent background thread that monitors whether a compaction is needed:

    /** Check for compactions requests. */
    ScheduledChore compactionChecker;

CompactionChecker is an inner class of HRegionServer. Its constructor is as follows:

    CompactionChecker(final HRegionServer h, final int sleepTime,
        final Stoppable stopper) {
      super("CompactionChecker", stopper, sleepTime);
      this.instance = h;
      LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));

      /* MajorCompactPriority is configurable.
       * If not set, the compaction will use default priority.
       */
      // Set the major compaction priority from
      // hbase.regionserver.compactionChecker.majorCompactPriority;
      // the default is Integer.MAX_VALUE.
      this.majorCompactPriority = this.instance.conf.getInt(
          "hbase.regionserver.compactionChecker.majorCompactPriority",
          DEFAULT_PRIORITY);
    }
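Three arguments are passed in: the first is the HRegionServer, the second is the sleep time, and the third is a Stoppable that reports whether the server has stopped. If the RegionServer stops running, CompactionChecker notices and stops as well.
The constructor also calls the parent-class constructor:

    super("CompactionChecker", stopper, sleepTime);

The parent class, ScheduledChore, implements the Runnable interface, so we go straight to its run method, which is what actually executes.
On the first run it calls initialChore(), which only returns true and does nothing else. Every run after that calls chore(), which is implemented in the CompactionChecker class.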
    public abstract class ScheduledChore implements Runnable {
      // Other members omitted; only run() is shown.

      @Override
      public void run() {
        updateTimeTrackingBeforeRun();
        if (missedStartTime() && isScheduled()) {
          onChoreMissedStartTime();
          if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " missed its start time");
        } else if (stopper.isStopped() || !isScheduled()) {
          cancel(false);
          cleanup();
          if (LOG.isInfoEnabled()) LOG.info("Chore: " + getName() + " was stopped");
        } else {
          try {
            if (!initialChoreComplete) {
              initialChoreComplete = initialChore();
            } else {
              chore();
            }
          } catch (Throwable t) {
            if (LOG.isErrorEnabled()) LOG.error("Caught error", t);
            if (this.stopper.isStopped()) {
              cancel(false);
              cleanup();
            }
          }
        }
      }
    }
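To make the first-run behavior concrete, here is a stripped-down imitation of that dispatch. It is only a sketch: MiniChore and CountingChore are hypothetical names, and the scheduling, stop, and missed-start handling of the real class are left out.

```java
/** A stripped-down imitation of the run() dispatch described above; not HBase code. */
final class ChoreDispatchSketch {

    /** Hypothetical base class: the first run() does the initial chore, later runs do chore(). */
    abstract static class MiniChore implements Runnable {
        private boolean initialChoreComplete = false;

        /** Runs before the first regular chore; returning true marks it complete. */
        protected boolean initialChore() {
            return true;
        }

        /** The periodic work, supplied by the subclass. */
        protected abstract void chore();

        @Override
        public void run() {
            if (!initialChoreComplete) {
                initialChoreComplete = initialChore();
            } else {
                chore();
            }
        }
    }

    /** Hypothetical subclass that only counts how often chore() ran. */
    static final class CountingChore extends MiniChore {
        int choreRuns = 0;

        @Override
        protected void chore() {
            choreRuns++;
        }
    }

    public static void main(String[] args) {
        CountingChore c = new CountingChore();
        for (int i = 0; i < 3; i++) {
            c.run();
        }
        System.out.println("chore() calls after 3 runs: " + c.choreRuns);
    }
}
```

Because the first run() is spent on initialChore(), three scheduled runs produce only two chore() calls.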
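The chore method of CompactionChecker
Now look at the chore method of the CompactionChecker class.
Its main job is to call the relevant compaction policy's methods to decide whether a compaction is needed. The policy itself is covered further down.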
    private static class CompactionChecker extends ScheduledChore {
      private final HRegionServer instance;
      private final int majorCompactPriority;
      private final static int DEFAULT_PRIORITY = Integer.MAX_VALUE;
      // Iteration is 1-based rather than 0-based so we don't check for compaction
      // immediately upon region server startup
      private long iteration = 1;

      // The sleepTime passed in by the caller is hbase.server.thread.wakefrequency
      // (default 10 * 1000 ms).
      CompactionChecker(final HRegionServer h, final int sleepTime,
          final Stoppable stopper) {
        super("CompactionChecker", stopper, sleepTime);
        this.instance = h;
        LOG.info(this.getName() + " runs every " + StringUtils.formatTime(sleepTime));

        /* MajorCompactPriority is configurable.
         * If not set, the compaction will use default priority.
         */
        // Set the major compaction priority from
        // hbase.regionserver.compactionChecker.majorCompactPriority;
        // the default is Integer.MAX_VALUE.
        this.majorCompactPriority = this.instance.conf.getInt(
            "hbase.regionserver.compactionChecker.majorCompactPriority",
            DEFAULT_PRIORITY);
      }

      @Override
      protected void chore() {
        // onlineRegions.values() is the set of regions currently online on this RegionServer.
        for (Region r : this.instance.onlineRegions.values()) {
          if (r == null)
            continue;
          // r.getStores() returns every Store in the region; a region has one Store
          // per column family.
          for (Store s : r.getStores()) {
            try {
              // multiplier is the value of hbase.server.compactchecker.interval.multiplier.
              // It is initialized in the HStore constructor and defaults to 1000.
              long multiplier = s.getCompactionCheckMultiplier();
              // Sanity check: the value must be positive.
              assert multiplier > 0;
              // iteration starts at 1 and is incremented on every scheduled run. Execution
              // continues only when it is an integer multiple of multiplier. With
              // multiplier = 1000 and chore() running every hbase.server.thread.wakefrequency
              // (default 10 * 1000 ms = 10 s), that is once every 10 s * 1000 = 10000 s,
              // or about 2.78 hours.
              if (iteration % multiplier != 0) continue;
              // If a compaction is needed, issue a system compaction request.
              // What is ultimately compared is whether the number of store files minus the
              // number of files already being compacted has reached the configured minimum.
              // See needsCompaction in RatioBasedCompactionPolicy.
              if (s.needsCompaction()) {
                // Queue a compaction. Will recognize if major is needed.
                this.instance.compactSplitThread.requestSystemCompaction(r, s, getName()
                    + " requests compaction");
              } else if (s.isMajorCompaction()) {
                // Otherwise check whether a major compaction is due.
                s.triggerMajorCompaction();
                if (majorCompactPriority == DEFAULT_PRIORITY
                    || majorCompactPriority > ((HRegion)r).getCompactPriority()) {
                  this.instance.compactSplitThread.requestCompaction(r, s, getName()
                      + " requests major compaction; use default priority", null);
                } else {
                  this.instance.compactSplitThread.requestCompaction(r, s, getName()
                      + " requests major compaction; use configured priority",
                      this.majorCompactPriority, null, null);
                }
              }
            } catch (IOException e) {
              LOG.warn("Failed major compaction check on " + r, e);
            }
          }
        }
        iteration = (iteration == Long.MAX_VALUE) ? 0 : (iteration + 1);
      }
    }
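The iteration gate is easy to get wrong by one, so here is a minimal sketch of it in isolation. IterationGateSketch, tick, and countChecks are hypothetical names; the sketch assumes one tick per scheduled chore() run and a single multiplier value for all stores.

```java
/** Isolated model of the "iteration % multiplier" gate in chore(); not HBase code. */
final class IterationGateSketch {
    // 1-based, so the very first tick does not trigger a check (unless multiplier is 1).
    private long iteration = 1;

    /** Simulates one scheduled run; returns true when the store would be inspected. */
    boolean tick(long multiplier) {
        boolean check = iteration % multiplier == 0;
        iteration = (iteration == Long.MAX_VALUE) ? 0 : iteration + 1;
        return check;
    }

    /** Counts how many of the first {@code ticks} runs get past the gate. */
    static int countChecks(long ticks, long multiplier) {
        IterationGateSketch gate = new IterationGateSketch();
        int checks = 0;
        for (long i = 0; i < ticks; i++) {
            if (gate.tick(multiplier)) {
                checks++;
            }
        }
        return checks;
    }

    public static void main(String[] args) {
        System.out.println("checks in 3000 ticks: " + countChecks(3000, 1000));
    }
}
```

With the default multiplier of 1000, the first check happens on the 1000th run, so a freshly started RegionServer waits one full check period before inspecting any store.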
The needsCompaction method

    @Override
    public boolean needsCompaction(final Collection<StoreFile> storeFiles,
        final List<StoreFile> filesCompacting) {
      // Is (current store file count - files already being compacted) at least
      // minFilesToCompact?
      // minFilesToCompact = Math.max(2, conf.getInt(HBASE_HSTORE_COMPACTION_MIN_KEY,
      //     /*old name*/ conf.getInt("hbase.hstore.compactionThreshold", 3)));
      // If the number of candidate files reaches the configured minimum, return true
      // so that a compaction is requested.
      int numCandidates = storeFiles.size() - filesCompacting.size();
      return numCandidates >= comConf.getMinFilesToCompact();
    }
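As a quick illustration of that threshold check, the sketch below uses plain integers instead of StoreFile collections. The class and method names are hypothetical; only the max(2, configured) floor and the file-count comparison come from the code above.

```java
/** Integer-only model of the needsCompaction threshold check; not HBase code. */
final class NeedsCompactionSketch {

    /** The configured minimum is never allowed to drop below 2. */
    static int minFilesToCompact(int configuredThreshold) {
        return Math.max(2, configuredThreshold);
    }

    /** True when enough files that are not already being compacted have piled up. */
    static boolean needsCompaction(int storeFileCount, int filesCompactingCount, int minFiles) {
        int numCandidates = storeFileCount - filesCompactingCount;
        return numCandidates >= minFiles;
    }

    public static void main(String[] args) {
        int minFiles = minFilesToCompact(3); // 3 is the default quoted in the comment above
        System.out.println("3 files, 0 compacting: " + needsCompaction(3, 0, minFiles));
        System.out.println("4 files, 2 compacting: " + needsCompaction(4, 2, minFiles));
    }
}
```

Files that are already part of a running compaction do not count, so a store with four files of which two are being compacted does not qualify under the default threshold of 3.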
The isMajorCompaction method
As you can see, this calls the shouldPerformMajorCompaction method of storeEngine.getCompactionPolicy(). storeEngine.getCompactionPolicy() returns the policy in use, and the policy's shouldPerformMajorCompaction method is then invoked. The policy analyzed here is RatioBasedCompactionPolicy.

    @Override
    public boolean isMajorCompaction() throws IOException {
      for (StoreFile sf : this.storeEngine.getStoreFileManager().getStorefiles()) {
        // TODO: what are these reader checks all over the place?
        if (sf.getReader() == null) {
          LOG.debug("StoreFile " + sf + " has null Reader");
          return false;
        }
      }
      return storeEngine.getCompactionPolicy().shouldPerformMajorCompaction(
          this.storeEngine.getStoreFileManager().getStorefiles());
    }
The shouldPerformMajorCompaction method
This method returns whether a major compaction is needed.
The source shown here is from HBase 1.4.10, and the method has a bug in that version: the result = true; statement near the end runs after every branch, so the single-file branch that decides to skip the compaction has no effect. I later checked the source for 1.4.13 and newer, where the problem has been fixed.

    @Override
    public boolean shouldPerformMajorCompaction(final Collection<StoreFile> filesToCompact)
        throws IOException {
      boolean result = false;
      // Get the time of the next major compaction. It is a jittered value in
      // [7 - 7 * 0.5, 7 + 7 * 0.5] days, where
      //   hbase.hregion.majorcompaction        = 7 days
      //   hbase.hregion.majorcompaction.jitter = 0.5
      long mcTime = getNextMajorCompactTime(filesToCompact);
      if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
        return result;
      }
      // TODO: Use better method for determining stamp of last major (HBASE-2990)
      // The smallest modification time among the candidate files.
      long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
      // The current time.
      long now = EnvironmentEdgeManager.currentTime();
      // If the oldest file is newer than (now - mcTime), a major compaction already ran
      // within the last mcTime, so no major compaction is performed.
      if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
        // In 1.4.10, once execution gets here a major compaction always follows;
        // the checks below effectively only change the debug output.
        String regionInfo;
        if (this.storeConfigInfo != null && this.storeConfigInfo instanceof HStore) {
          regionInfo = ((HStore)this.storeConfigInfo).getRegionInfo().getRegionNameAsString();
        } else {
          regionInfo = this.toString();
        }
        // Major compaction time has elapsed.
        long cfTTL = HConstants.FOREVER;
        if (this.storeConfigInfo != null) {
          // The TTL of the store's files.
          cfTTL = this.storeConfigInfo.getStoreFileTtl();
        }
        if (filesToCompact.size() == 1) {
          // Single file
          StoreFile sf = filesToCompact.iterator().next();
          // Smallest timestamp in the file.
          Long minTimestamp = sf.getMinimumTimestamp();
          // Age of the oldest data in the file.
          long oldest = (minTimestamp == null) ? Long.MIN_VALUE : now - minTimestamp.longValue();
          if (sf.isMajorCompaction() && (cfTTL == Long.MAX_VALUE || oldest < cfTTL)) {
            // The file came from a major compaction and has not expired.
            float blockLocalityIndex = sf.getHDFSBlockDistribution().getBlockLocalityIndex(
                RSRpcServices.getHostname(comConf.conf, false));
            if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
              // The file's blocks are not local enough, so compact to restore locality.
              LOG.debug("Major compaction triggered on only store " + regionInfo
                  + "; to make hdfs blocks local, current blockLocalityIndex is "
                  + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
              result = true;
            } else {
              // Skip the compaction.
              LOG.debug("Skipping major compaction of " + regionInfo
                  + " because one (major) compacted file only, oldestTime " + oldest
                  + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
                  + " (min " + comConf.getMinLocalityToForceCompact() + ")");
            }
          } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
            // The store file has expired, which triggers a major compaction.
            LOG.debug("Major compaction triggered on store " + regionInfo
                + ", because keyvalues outdated; time since last major compaction "
                + (now - lowTimestamp) + "ms");
            result = true;
          }
        } else {
          // More than one file to compact: a major compaction is due.
          LOG.debug("Major compaction triggered on store " + regionInfo
              + "; time since last major compaction " + (now - lowTimestamp) + "ms");
        }
        result = true; // This is the bug: it overrides the "skip" decision above.
      }
      return result;
    }

The result returned here is what s.isMajorCompaction() returns.
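The intended decision logic can be reduced to a pure function, which makes the effect of the misplaced assignment easier to see. This is only a sketch of how the branches read once the unconditional result = true; is confined to the multi-file case; it is an assumption about the intended behavior, not the actual upstream patch. All names and the flattened parameter list are hypothetical.

```java
/** Pure-function model of the major compaction decision with the bug removed; not HBase code. */
final class MajorCompactionDecisionSketch {
    static final long FOREVER = Long.MAX_VALUE; // stands in for HConstants.FOREVER

    /**
     * @param fileCount         number of candidate files
     * @param lowTimestamp      smallest modification time among the files, in ms
     * @param now               current time, in ms
     * @param mcTime            jittered major compaction period in ms; 0 disables the check
     * @param singleFileIsMajor whether the only file was produced by a major compaction
     * @param oldestAgeMs       age of the oldest data in the only file, in ms
     * @param cfTtlMs           column family TTL in ms, or FOREVER
     * @param locality          block locality index of the only file
     * @param minLocality       locality below which a compaction is forced
     */
    static boolean shouldMajorCompact(int fileCount, long lowTimestamp, long now, long mcTime,
            boolean singleFileIsMajor, long oldestAgeMs, long cfTtlMs,
            float locality, float minLocality) {
        if (fileCount == 0 || mcTime == 0) {
            return false; // nothing to compact, or periodic major compaction disabled
        }
        if (lowTimestamp <= 0L || lowTimestamp >= now - mcTime) {
            return false; // a major compaction ran recently enough
        }
        if (fileCount > 1) {
            return true; // several files and the period has elapsed
        }
        if (singleFileIsMajor && (cfTtlMs == FOREVER || oldestAgeMs < cfTtlMs)) {
            return locality < minLocality; // only worth redoing to restore block locality
        }
        return cfTtlMs != FOREVER && oldestAgeMs > cfTtlMs; // expired data needs cleaning up
    }

    public static void main(String[] args) {
        // One already-major, fully local, unexpired file: the case the 1.4.10 bug compacts anyway.
        System.out.println(shouldMajorCompact(1, 1_000L, 10_000L, 5_000L,
                true, 9_000L, FOREVER, 1.0f, 0.0f));
    }
}
```

In the 1.4.10 code, the single-file case in main would still return true because of the trailing assignment; in this sketch it is skipped.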