Hbase源码分析(十一)MemStore的flush处理(中)2021SC@SDUSC

2021/12/3 11:06:36

本文主要是介绍Hbase源码分析(十一)MemStore的flush处理(中)2021SC@SDUSC,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

文章目录

  • 前言
  • cacheFlusher如何处理flush请求
    • requestFlush()
    • WakeupFlushThread
    • FlushRegionEntry
  • 总结


前言

本文继续介绍了HRegion上Memstore flush的主体流程和主要细节,cacheFlusher如何处理flush请求。


cacheFlusher如何处理flush请求

通过如何初始化cacheFlusher部分的介绍,我们已经知道,在MemStoreFlusher内部,存在两个存储flush请求及其HRegion封装类的队列和集合,即flushQueue和regionsInQueue,而MemStoreFlusher对外提供了一个requestFlush()方法,我们大体看下这个方法:

 public void requestFlush(HRegion r) {
    synchronized (regionsInQueue) {// 使用synchronized关键字对regionsInQueue进行线程同步
      
      if (!regionsInQueue.containsKey(r)) {// 如果regionsInQueue中不存在对应HRegion
    	
        // This entry has no delay so it will be added at the top of the flush
        // queue.  It'll come out near immediately.
    	// 将HRegion类型的r封装成FlushRegionEntry类型的fqe
    	// 这个fqe没有delay,即延迟执行时间,所以它被添加到flush队列的顶部。不久它将出列被处理。
        FlushRegionEntry fqe = new FlushRegionEntry(r);
        
        // 将HRegion->FlushRegionEntry的对应关系添加到regionsInQueue集合
        // 将flush请求FlushRegionEntry添加到flushQueue队列
        // 从这里可以看出regionsInQueue、flushQueue这两个成员变量go together
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
    }

requestFlush()

requestFlush()方法的主要作用,就是添加一个flush region的请求至MemStoreFlusher内部队列。其主要逻辑如下:
1、首先需要使用synchronized关键字对regionsInQueue进行线程同步,这么做是为了防止多线程的并发;

2、然后判断regionsInQueue中是否存在对应的HRegion,如果regionsInQueue集合中不存在对应HRegion的话继续,否则直接返回;

3、既然regionsInQueue集合中不存在对应HRegion,将HRegion类型的r封装成FlushRegionEntry类型的fqe;

4、将HRegion->FlushRegionEntry的对应关系添加到regionsInQueue集合;

5、将flush请求FlushRegionEntry添加到flushQueue队列。

从上述4、5步就可以看出regionsInQueue、flushQueue这两个成员变量go together,并且这个fqe没有delay,即延迟执行时间,所以它被添加到flush队列的顶部,不久它将出列被处理。回到flushQueue的定义,flushQueue是一个存储了Region刷新缓存请求的队列,里面存储的是实现了FlushQueueEntry接口的对象,FlushQueueEntry没有定义任何行为,但是继承了java.util.concurrent.Delayed接口,故flushQueue是java中的DelayQueue,队列里存储的对象有一个过期时间的概念。

既然flush的请求已经被添加至flushQueue队列,相当于生产者已经把产品生产出来了,那么需要一个消费者,这个消费者的角色就是由FlushHandler线程来担任的。既然是线程,那么处理的逻辑肯定在其run()方法内,先看下flushQueue中存储的都是什么?

回顾下flushQueue的定义,它是一个存储了FlushQueueEntry的队列DelayQueue。我们先看下FlushQueueEntry的定义:

interface FlushQueueEntry extends Delayed {
  }

是一个集成了java的Delayed接口的无任何方法的空接口,那么它的实现类就是WakeupFlushThread和FlushRegionEntry。首先来看下flushQueue对应的队列类型—Java中的DelayQueue。
DelayQueue是一个无界的BlockingQueue,其内部存储的必然是实现了Delayed接口的对象。所以,FlushQueueEntry必须实现java的Delayed接口。而这种队列中的成员有一个最大特点,就是只有在其到期后才能出列,并且该队列内的成员都是有序的,从头至尾按照延迟到期时间的长短来排序。那么如何判断成员是否到期呢?对应成员对象的getDelay()方法返回一个小于等于0的值,就说明对应对象在队列中已到期,可以被取走。

既然DelayQueue中存储的成员对象都是有序的,那么实现了Delayed接口的类,必须提供compareTo()方法,用以排序,并且需要实现上述getDelay()方法,判断队内成员是否到期可以被取走。

下面开始研究下WakeupFlushThread和FlushRegionEntry。

WakeupFlushThread

首先,WakeupFlushThread非常简单,没有任何实质内容,代码如下:
static class WakeupFlushThread implements FlushQueueEntry {

  @Override
    public long getDelay(TimeUnit unit) {
      return 0;
    }
 
    @Override
    public int compareTo(Delayed o) {
      return -1;
    }
 
    @Override
    public boolean equals(Object obj) {
      return (this == obj);
    }

它的主要作用是做为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠。而且,其getDelay()方法返回值为0,说明其不存在延迟时间,入列后即可出列。而它的compareTo()方法返回的值是-1,说明它与其它WakeupFlushThread在队内的顺序是等价的,无前后之分,实际上WakeupFlushThread区分前后也没有意义,它本身也没有实质性的内容。

FlushRegionEntry

    接下来,我们再看下FlushRegionEntry类,其定义如下:
static class FlushRegionEntry implements FlushQueueEntry {
    
	// 待flush的HRegion
	private final HRegion region;
 
    // 创建时间 
    private final long createTime;
    
    // 何时到期
    private long whenToExpire;
    
    // 重入队列次数
    private int requeueCount = 0;
 
    FlushRegionEntry(final HRegion r) {
      
      // 待flush的HRegion
      this.region = r;
      
      // 创建时间为当前时间
      this.createTime = EnvironmentEdgeManager.currentTime();
      
      // 何时到期也为当前时间,意味着首次入队列时是没有延迟时间的,入列即可出列
      this.whenToExpire = this.createTime;
    }
 
    /**
     * @param maximumWait
     * @return True if we have been delayed > <code>maximumWait</code> milliseconds.
     */
    public boolean isMaximumWait(final long maximumWait) {
      return (EnvironmentEdgeManager.currentTime() - this.createTime) > maximumWait;
    }
 
    /**
     * @return Count of times {@link #requeue(long)} was called; i.e this is
     * number of times we've been requeued.
     */
    public int getRequeueCount() {
      return this.requeueCount;
    }
 
    /**
     * 类似重新入列的处理方法,重新入列次数requeueCount加1,何时到期未当前时间加参数when
     * 
     * @param when When to expire, when to come up out of the queue.
     * Specify in milliseconds.  This method adds EnvironmentEdgeManager.currentTime()
     * to whatever you pass.
     * @return This.
     */
    public FlushRegionEntry requeue(final long when) {
      this.whenToExpire = EnvironmentEdgeManager.currentTime() + when;
      this.requeueCount++;
      return this;
    }
 
    /**
     * 判断何时到期的方法
     */
    @Override
    public long getDelay(TimeUnit unit) {
      // 何时到期减去当前时间
      return unit.convert(this.whenToExpire - EnvironmentEdgeManager.currentTime(),
          TimeUnit.MILLISECONDS);
    }
 
    /**
     * 排序比较方法,根据判断何时到期的getDelay()方法来决定顺序
     */
    @Override
    public int compareTo(Delayed other) {
      // Delay is compared first. If there is a tie, compare region's hash code
      int ret = Long.valueOf(getDelay(TimeUnit.MILLISECONDS) -
        other.getDelay(TimeUnit.MILLISECONDS)).intValue();
      if (ret != 0) {
        return ret;
      }
      
      // 何时到期时间一直的话,根据hashCode()来排序,其实也就是根据HRegion的hashCode()方法返回值来排序
      FlushQueueEntry otherEntry = (FlushQueueEntry) other;
      return hashCode() - otherEntry.hashCode();
    }
 
    @Override
    public String toString() {
      return "[flush region " + Bytes.toStringBinary(region.getRegionName()) + "]";
    }
 
    @Override
    public int hashCode() {
      int hash = (int) getDelay(TimeUnit.MILLISECONDS);
      return hash ^ region.hashCode();
    }
 
   @Override
    public boolean equals(Object obj) {
      if (this == obj) {
        return true;
      }
      if (obj == null || getClass() != obj.getClass()) {
        return false;
      }
      Delayed other = (Delayed) obj;
      return compareTo(other) == 0;
    }
  }

接下来我们看下flush请求的实际处理流程,即FlushHandler的run()方法:
它的主要处理逻辑为:
1、首先HRegionServer未停止的话,run()方法一直运行;

2、将标志位AtomicBoolean类型的wakeupPending设置为false;

3、从flushQueue队列中拉取一个FlushQueueEntry,即fqe:

3.1、如果fqe为空,或者为WakeupFlushThread:

3.1.1、如果通过isAboveLowWaterMark()方法判断全局MemStore的大小高于限制值得低水平线,调用flushOneForGlobalPressure()方法,按照一定策略,flush一个HRegion的MemStore,降低MemStore的大小,预防OOM等异常情况的发生,并入列另一个令牌,以使该线程之后再次被唤醒;

3.2、fre不为空,且不为WakeupFlushThread的话,转化为FlushRegionEntry类型的fre:调用flushRegion()方法,并且如果结果为false的话,跳出循环;

4、如果循环结束,同时清空regionsInQueue和flushQueue(ps:又是在一起啊O(∩_∩)O~)

5、唤醒所有的等待着,使得它们能够看到close标志;

6、记录日志。
WakeupFlushThread的主要作用是做为一个占位符或令牌插入到刷新队列flushQueue,以确保FlushHandler不会休眠,实际上WakeupFlushThread起到的作用不仅仅是这个,在FlushHandler线程不断的poll刷新队列flushQueue中的元素时,如果获取到的是一个WakeupFlushThread,它会发起 一个检测,即RegionServer的全局MemStore大小是否超过低水平线,如果未超过,WakeupFlushThread仅仅起到了一个占位符的作用,否则,WakeupFlushThread不仅做为占位符,保证刷新线程不休眠,还按照一定策略选择该RegionServer上的一个Region刷新memstore,以缓解RegionServer内存压力。

总结

本文介绍了HRegion上Memstore flush的主体流程和主要细节,讲述了cacheFlusher如何处理flush请求的细节,关于如何选择一个HRegion进行flush以缓解MemStore压力以及后续问题将在下文中介绍。



这篇关于Hbase源码分析(十一)MemStore的flush处理(中)2021SC@SDUSC的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程