Hbase源码分析(十一)MemStore的flush处理(中)2021SC@SDUSC
2021/12/3 11:06:36
本文主要是介绍Hbase源码分析(十一)MemStore的flush处理(中)2021SC@SDUSC,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
文章目录
- 前言
- cacheFlusher如何处理flush请求
- requestFlush()
- WakeupFlushThread
- FlushRegionEntry
- 总结
前言
本文继续介绍了HRegion上Memstore flush的主体流程和主要细节,cacheFlusher如何处理flush请求。
cacheFlusher如何处理flush请求
通过如何初始化cacheFlusher部分的介绍,我们已经知道,在MemStoreFlusher内部,存在两个存储flush请求及其HRegion封装类的队列和集合,即flushQueue和regionsInQueue,而MemStoreFlusher对外提供了一个requestFlush()方法,我们大体看下这个方法:
public void requestFlush(HRegion r) { synchronized (regionsInQueue) {// 使用synchronized关键字对regionsInQueue进行线程同步 if (!regionsInQueue.containsKey(r)) {// 如果regionsInQueue中不存在对应HRegion // This entry has no delay so it will be added at the top of the flush // queue. It'll come out near immediately. // 将HRegion类型的r封装成FlushRegionEntry类型的fqe // 这个fqe没有delay,即延迟执行时间,所以它被添加到flush队列的顶部。不久它将出列被处理。 FlushRegionEntry fqe = new FlushRegionEntry(r); // 将HRegion->FlushRegionEntry的对应关系添加到regionsInQueue集合 // 将flush请求FlushRegionEntry添加到flushQueue队列 // 从这里可以看出regionsInQueue、flushQueue这两个成员变量go together this.regionsInQueue.put(r, fqe); this.flushQueue.add(fqe); } }
requestFlush()
requestFlush()方法的主要作用,就是添加一个flush region的请求至MemStoreFlusher内部队列。其主要逻辑如下:
1、首先需要使用synchronized关键字对regionsInQueue进行线程同步,这么做是为了防止多线程的并发;
2、然后判断regionsInQueue中是否存在对应的HRegion,如果regionsInQueue集合中不存在对应HRegion的话继续,否则直接返回;
3、既然regionsInQueue集合中不存在对应HRegion,将HRegion类型的r封装成FlushRegionEntry类型的fqe;
4、将HRegion->FlushRegionEntry的对应关系添加到regionsInQueue集合;
5、将flush请求FlushRegionEntry添加到flushQueue队列。
从上述4、5步就可以看出regionsInQueue、flushQueue这两个成员变量go together,并且这个fqe没有delay,即延迟执行时间,所以它被添加到flush队列的顶部,不久它将出列被处理。回到flushQueue的定义,flushQueue是一个存储了Region刷新缓存请求的队列,里面存储的是实现了FlushQueueEntry接口的对象,FlushQueueEntry没有定义任何行为,但是继承了java.util.concurrent.Delayed接口,故flushQueue是java中的DelayQueue,队列里存储的对象有一个过期时间的概念。
既然flush的请求已经被添加至flushQueue队列,相当于生产者已经把产品生产出来了,那么需要一个消费者,这个消费者的角色就是由FlushHandler线程来担任的。既然是线程,那么处理的逻辑肯定在其run()方法内,先看下flushQueue中存储的都是什么?
回顾下flushQueue的定义,它是一个存储了FlushQueueEntry的队列DelayQueue。我们先看下FlushQueueEntry的定义:
interface FlushQueueEntry extends Delayed { }
是一个集成了java的Delayed接口的无任何方法的空接口,那么它的实现类就是WakeupFlushThread和FlushRegionEntry。首先来看下flushQueue对应的队列类型—Java中的DelayQueue。
DelayQueue是一个无界的BlockingQueue,其内部存储的必然是实现了Delayed接口的对象。所以,FlushQueueEntry必须实现java的Delayed接口。而这种队列中的成员有一个最大特点,就是只有在其到期后才能出列,并且该队列内的成员都是有序的,从头至尾按照延迟到期时间的长短来排序。那么如何判断成员是否到期呢?对应成员对象的getDelay()方法返回一个小于等于0的值,就说明对应对象在队列中已到期,可以被取走。
既然DelayQueue中存储的成员对象都是有序的,那么实现了Delayed接口的类,必须提供compareTo()方法,用以排序,并且需要实现上述getDelay()方法,判断队内成员是否到期可以被取走。
下面开始研究下WakeupFlushThread和FlushRegionEntry。
WakeupFlushThread
首先,WakeupFlushThread非常简单,没有任何实质内容,代码如下:
static class WakeupFlushThread implements FlushQueueEntry {
@Override public long getDelay(TimeUnit unit) { return 0; } @Override public int compareTo(Delayed o) { return -1; } @Override public boolean equals(Object obj) { return (this == obj); }
它的主要作用是做为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠。而且,其getDelay()方法返回值为0,说明其不存在延迟时间,入列后即可出列。而它的compareTo()方法返回的值是-1,说明它与其它WakeupFlushThread在队内的顺序是等价的,无前后之分,实际上WakeupFlushThread区分前后也没有意义,它本身也没有实质性的内容。
FlushRegionEntry
接下来,我们再看下FlushRegionEntry类,其定义如下:
static class FlushRegionEntry implements FlushQueueEntry { // 待flush的HRegion private final HRegion region; // 创建时间 private final long createTime; // 何时到期 private long whenToExpire; // 重入队列次数 private int requeueCount = 0; FlushRegionEntry(final HRegion r) { // 待flush的HRegion this.region = r; // 创建时间为当前时间 this.createTime = EnvironmentEdgeManager.currentTime(); // 何时到期也为当前时间,意味着首次入队列时是没有延迟时间的,入列即可出列 this.whenToExpire = this.createTime; }
/** * @param maximumWait * @return True if we have been delayed > <code>maximumWait</code> milliseconds. */ public boolean isMaximumWait(final long maximumWait) { return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait; } /** * @return Count of times {@link #requeue(long)} was called; i.e this is * number of times we've been requeued. */ public int getRequeueCount() { return this.requeueCount; } /** * 类似重新入列的处理方法,重新入列次数requeueCount加1,何时到期未当前时间加参数when * * @param when When to expire, when to come up out of the queue. * Specify in milliseconds. This method adds EnvironmentEdgeManager.currentTime() * to whatever you pass. * @return This. */ public FlushRegionEntry requeue(final long when) { this.whenToExpire = EnvironmentEdgeManager.currentTime() + when; this.requeueCount++; return this; } /** * 判断何时到期的方法 */ @Override public long getDelay(TimeUnit unit) { // 何时到期减去当前时间 return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(), TimeUnit.MILLISECONDS); } /** * 排序比较方法,根据判断何时到期的getDelay()方法来决定顺序 */ @Override public int compareTo(Delayed other) { // Delay is compared first. If there is a tie, compare region's hash code int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) - other.getDelay(TimeUnit.MILLISECONDS)).intValue(); if (ret != 0) { return ret; } // 何时到期时间一直的话,根据hashCode()来排序,其实也就是根据HRegion的hashCode()方法返回值来排序 FlushQueueEntry otherEntry = (FlushQueueEntry) other; return hashCode() - otherEntry.hashCode(); } @Override public String toString() { return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]"; } @Override public int hashCode() { int hash = (int) getDelay(TimeUnit.MILLISECONDS); return hash ^ region.hashCode(); } @Override public boolean equals(Object obj) { if (this == obj) { return true; } if (obj == null || getClass() != obj.getClass()) { return false; } Delayed other = (Delayed) obj; return compareTo(other) == 0; } }
接下来我们看下flush请求的实际处理流程,即FlushHandler的run()方法:
它的主要处理逻辑为:
1、首先HRegionServer未停止的话,run()方法一直运行;
2、将标志位AtomicBoolean类型的wakeupPending设置为false;
3、从flushQueue队列中拉取一个FlushQueueEntry,即fqe:
3.1、如果fqe为空,或者为WakeupFlushThread:
3.1.1、如果通过isAboveLowWaterMark()方法判断全局MemStore的大小高于限制值得低水平线,调用flushOneForGlobalPressure()方法,按照一定策略,flush一个HRegion的MemStore,降低MemStore的大小,预防OOM等异常情况的发生,并入列另一个令牌,以使该线程之后再次被唤醒;
3.2、fre不为空,且不为WakeupFlushThread的话,转化为FlushRegionEntry类型的fre:调用flushRegion()方法,并且如果结果为false的话,跳出循环;
4、如果循环结束,同时清空regionsInQueue和flushQueue(ps:又是在一起啊O(∩_∩)O~)
5、唤醒所有的等待着,使得它们能够看到close标志;
6、记录日志。
WakeupFlushThread的主要作用是做为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠,实际上WakeupFlushThread起到的作用不仅仅是这个,在FlushHandler线程不断的poll刷新队列flushQueue中的元素时,如果获取到的是一个WakeupFlushThread,它会发起 一个检测,即RegionServer的全局MemStore大小是否超过低水平线,如果未超过,WakeupFlushThread仅仅起到了一个占位符的作用,否则,WakeupFlushThread不仅做为占位符,保证刷新线程不休眠,还按照一定策略选择该RegionServer上的一个Region刷新memstore,以缓解RegionServer内存压力。
总结
本文介绍了HRegion上Memstore flush的主体流程和主要细节,讲述了cacheFlusher如何处理flush请求的细节,关于如何选择一个HRegion进行flush以缓解MemStore压力以及后续问题将在下文中介绍。
这篇关于Hbase源码分析(十一)MemStore的flush处理(中)2021SC@SDUSC的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-23增量更新怎么做?-icode9专业技术文章分享
- 2024-11-23压缩包加密方案有哪些?-icode9专业技术文章分享
- 2024-11-23用shell怎么写一个开机时自动同步远程仓库的代码?-icode9专业技术文章分享
- 2024-11-23webman可以同步自己的仓库吗?-icode9专业技术文章分享
- 2024-11-23在 Webman 中怎么判断是否有某命令进程正在运行?-icode9专业技术文章分享
- 2024-11-23如何重置new Swiper?-icode9专业技术文章分享
- 2024-11-23oss直传有什么好处?-icode9专业技术文章分享
- 2024-11-23如何将oss直传封装成一个组件在其他页面调用时都可以使用?-icode9专业技术文章分享
- 2024-11-23怎么使用laravel 11在代码里获取路由列表?-icode9专业技术文章分享
- 2024-11-22怎么实现ansible playbook 备份代码中命名包含时间戳功能?-icode9专业技术文章分享