浅析Kubernetes中client-go informer

2022/6/24 23:23:30

本文主要是介绍浅析Kubernetes中client-go informer,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

Controller

在client-go informer架构中存在一个 controller ,这个不是 Kubernetes 中的Controller组件;而是在 tools/cache 中的一个概念,controller 位于 informer 之下,Reflector 之上。

Config

从严格意义上来讲,controller 是作为一个 sharedInformer 使用,通过接受一个 Config ,而 Reflector 则作为 controller 的 slot。Config 则包含了这个 controller 里所有的设置。

type Config struct {
  Queue // DeltaFIFO
  ListerWatcher // 用于list watch的
  Process ProcessFunc // 定义如何从DeltaFIFO中弹出数据后处理的操作
  ObjectType runtime.Object // Controller处理的对象数据,实际上就是kubernetes中的资源
  FullResyncPeriod time.Duration // 全量同步的周期
  ShouldResync ShouldResyncFunc // Reflector通过该标记来确定是否应该重新同步
  RetryOnError bool
}

controller

然后 controller 又为 reflertor 的上层。

type controller struct {
  config         Config
  reflector      *Reflector 
  reflectorMutex sync.RWMutex
  clock          clock.Clock
}

type Controller interface {
  // controller 主要做两件事,
    // 1. 构建并运行 Reflector,将listerwacther中的泵压到queue(Delta fifo)中
    // 2. Queue用Pop()弹出数据,具体的操作是Process
    // 直到 stopCh 不阻塞,这两个协程将退出
  Run(stopCh <-chan struct{})
  HasSynced() bool // 这个实际上是从store中继承的,标记这个controller已经
  LastSyncResourceVersion() string
}

controller 中的方法,仅有一个 Run() 和 New( );这意味着,controller 只是一个抽象的概念,作为 ReflectorDelta FIFO 整合的工作流。

 

而 controller 则是 SharedInformer 了。

Queue

这里的 queue 可以理解为是一个具有 Pop() 功能的 Indexer ;而 Pop() 的功能则是 controller 中的一部分;也就是说 queue 是一个扩展的 Store , Store 是不具备弹出功能的。

type Queue interface {
  Store
// Pop会阻塞等待,直到有内容弹出,删除对应的值并处理计数器
  Pop(PopProcessFunc) (interface{}, error)

// AddIfNotPresent puts the given accumulator into the Queue (in
// association with the accumulator's key) if and only if that key
// is not already associated with a non-empty accumulator.
  AddIfNotPresent(interface{}) error

// HasSynced returns true if the first batch of keys have all been
// popped.  The first batch of keys are those of the first Replace
// operation if that happened before any Add, Update, or Delete;
// otherwise the first batch is empty.
  HasSynced() bool
Close() // 关闭queue
}

而弹出的操作是通过 controller 中的 processLoop( ) 进行的,最终走到Delta FIFO中进行处理。

通过忙等待去读取要弹出的数据,然后在弹出前 通过PopProcessFunc 进行处理。

func (c *controller) processLoop() {
  for {
    obj, err := c.config.Queue.Pop(PopProcessFunc(c.config.Process))
    if err != nil {
      if err == ErrFIFOClosed {
        return
      }
      if c.config.RetryOnError {
        // This is the safe way to re-enqueue.
        c.config.Queue.AddIfNotPresent(obj)
      }
    }
  }
}

DeltaFIFO.Pop()

func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {
  f.lock.Lock()
  defer f.lock.Unlock()
  for {
    for len(f.queue) == 0 {
      // When the queue is empty, invocation of Pop() is blocked until new item is enqueued.
      // When Close() is called, the f.closed is set and the condition is broadcasted.
      // Which causes this loop to continue and return from the Pop().
      if f.IsClosed() {
        return nil, ErrFIFOClosed
      }

      f.cond.Wait()
    }
    id := f.queue[0]
    f.queue = f.queue[1:]
    if f.initialPopulationCount > 0 {
      f.initialPopulationCount--
    }
    item, ok := f.items[id]
    if !ok {
      // Item may have been deleted subsequently.
      continue
    }
    delete(f.items, id)
    err := process(item) // 进行处理
    if e, ok := err.(ErrRequeue); ok {
      f.addIfNotPresent(id, item) // 如果失败,再重新加入到队列中
      err = e.Err 
    }
    // Don't need to copyDeltas here, because we're transferring
    // ownership to the caller.
    return item, err
  }
}

Informer

通过对 ReflectorStoreQueueListerWatcherProcessFunc, 等的概念,发现由 controller 所包装的起的功能并不能完成通过对API的动作监听,并通过动作来处理本地缓存的一个能力;这个情况下诞生了 informer 严格意义上来讲是 sharedInformer。

func newInformer(
  lw ListerWatcher,
  objType runtime.Object,
  resyncPeriod time.Duration,
  h ResourceEventHandler,
  clientState Store,
) Controller {
  // This will hold incoming changes. Note how we pass clientState in as a
  // KeyLister, that way resync operations will result in the correct set
  // of update/delete deltas.
  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    KnownObjects:          clientState,
    EmitDeltaTypeReplaced: true,
  })

  cfg := &Config{
    Queue:            fifo,
    ListerWatcher:    lw,
    ObjectType:       objType,
    FullResyncPeriod: resyncPeriod,
    RetryOnError:     false,

    Process: func(obj interface{}) error {
      // from oldest to newest
      for _, d := range obj.(Deltas) {
        switch d.Type {
        case Sync, Replaced, Added, Updated:
          if old, exists, err := clientState.Get(d.Object); err == nil && exists {
            if err := clientState.Update(d.Object); err != nil {
              return err
            }
            h.OnUpdate(old, d.Object)
          } else {
            if err := clientState.Add(d.Object); err != nil {
              return err
            }
            h.OnAdd(d.Object)
          }
        case Deleted:
          if err := clientState.Delete(d.Object); err != nil {
            return err
          }
          h.OnDelete(d.Object)
        }
      }
      return nil
    },
  }
  return New(cfg)
}

newInformer是位于 tools/cache/controller.go 下,可以看出,这里面并没有informer的概念,这里通过注释可以看到,newInformer实际上是一个提供了存储和事件通知的informer。他关联的 queue 则是 Delta FIFO,并包含了 ProcessFuncStore 等 controller的概念。最终对外的方法为 NewInformer()。

func NewInformer(
  lw ListerWatcher,
  objType runtime.Object,
  resyncPeriod time.Duration,
  h ResourceEventHandler,
) (Store, Controller) {
  // This will hold the client state, as we know it.
  clientState := NewStore(DeletionHandlingMetaNamespaceKeyFunc)

  return clientState, newInformer(lw, objType, resyncPeriod, h, clientState)
}

type ResourceEventHandler interface {
  OnAdd(obj interface{})
  OnUpdate(oldObj, newObj interface{})
  OnDelete(obj interface{})
}

SharedIndexInformer

shareInformer 为客户端提供了与apiserver一致的数据对象本地缓存,并支持多事件处理程序的informer,而 shareIndexInformer 则是对shareInformer 

的扩展。

type SharedInformer interface {
    // 添加一个事件处理函数,使用informer默认的resync period
	AddEventHandler(handler ResourceEventHandler)
    // 将事件处理函数注册到 share informer,将resyncPeriod作为参数传入
	AddEventHandlerWithResyncPeriod(handler ResourceEventHandler, resyncPeriod time.Duration)
	// 从本地缓存获取的信息作为infomer的返回
	GetStore() Store
	// 已弃用
	GetController() Controller
	// 运行一个informer,当stopCh停止时,informer也被关闭
	Run(stopCh <-chan struct{})
	// HasSynced returns true if the shared informer's store has been
	// informed by at least one full LIST of the authoritative state
	// of the informer's object collection.  This is unrelated to "resync".
	HasSynced() bool
	// LastSyncResourceVersion is the resource version observed when last synced with the underlying store. The value returned is not synchronized with access to the underlying store and is not thread-safe.
	LastSyncResourceVersion() string
}

SharedIndexInformer 是对SharedInformer的实现,可以从结构中看出,SharedIndexInformer 大致具有如下功能:

·  索引本地缓存·  controller,通过list watch拉取API并推入 Deltal FIFO

·  事件的处理

type sharedIndexInformer struct {
  indexer    Indexer // 具有索引的本地缓存
  controller Controller // controller

  processor             *sharedProcessor // 事件处理函数集合
  cacheMutationDetector MutationDetector

  listerWatcher ListerWatcher
  objectType runtime.Object
  resyncCheckPeriod time.Duration
  defaultEventHandlerResyncPeriod time.Duration
  clock clock.Clock
  started, stopped bool
  startedLock      sync.Mutex
  blockDeltas sync.Mutex
}

而在 tools/cache/share_informer.go 可以看到 shareIndexInformer 的运行过程。

func (s *sharedIndexInformer) Run(stopCh <-chan struct{}) {
  defer utilruntime.HandleCrash()

  fifo := NewDeltaFIFOWithOptions(DeltaFIFOOptions{
    KnownObjects:          s.indexer,
    EmitDeltaTypeReplaced: true,
  })

  cfg := &Config{
    Queue:            fifo,
    ListerWatcher:    s.listerWatcher,
    ObjectType:       s.objectType,
    FullResyncPeriod: s.resyncCheckPeriod,
    RetryOnError:     false,
    ShouldResync:     s.processor.shouldResync,

    Process: s.HandleDeltas, // process 弹出时操作的流程
  }

func() {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()

    s.controller = New(cfg)
    s.controller.(*controller).clock = s.clock
    s.started = true
  }()

  // Separate stop channel because Processor should be stopped strictly after controller
  processorStopCh := make(chan struct{})
  var wg wait.Group
  defer wg.Wait()              // Wait for Processor to stop
  defer close(processorStopCh) // Tell Processor to stop
  wg.StartWithChannel(processorStopCh, s.cacheMutationDetector.Run)
  wg.StartWithChannel(processorStopCh, s.processor.run) // 启动事件处理函数

defer func() {
    s.startedLock.Lock()
    defer s.startedLock.Unlock()
    s.stopped = true // Don't want any new listeners
  }()
    s.controller.Run(stopCh) // 启动controller,controller会启动Reflector和fifo的Pop()
}

而在操作Delta FIFO中可以看到,做具体操作时,会将动作分发至对应的事件处理函数中,这个是informer初始化时对事件操作的函数。

func (s *sharedIndexInformer) HandleDeltas(obj interface{}) error {
  s.blockDeltas.Lock()
  defer s.blockDeltas.Unlock()


  for _, d := range obj.(Deltas) {
    switch d.Type {
    case Sync, Replaced, Added, Updated:
      s.cacheMutationDetector.AddObject(d.Object)
      if old, exists, err := s.indexer.Get(d.Object); err == nil && exists {
        if err := s.indexer.Update(d.Object); err != nil {
          return err
        }

        isSync := false
        switch {
        case d.Type == Sync:
          isSync = true
        case d.Type == Replaced:
          if accessor, err := meta.Accessor(d.Object); err == nil {
            if oldAccessor, err := meta.Accessor(old); err == nil {
              isSync = accessor.GetResourceVersion() == oldAccessor.GetResourceVersion()
            }
          }
        }
                // 事件的分发
        s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)
      } else {
        if err := s.indexer.Add(d.Object); err != nil {
          return err
        }
                // 事件的分发
        s.processor.distribute(addNotification{newObj: d.Object}, false)
      }
    case Deleted:
      if err := s.indexer.Delete(d.Object); err != nil {
        return err
      }
      s.processor.distribute(deleteNotification{oldObj: d.Object}, false)
    }
  }
  return nil
}

事件处理函数 processor

启动informer时也会启动注册进来的事件处理函数;processor 就是这个事件处理函数run( ) 函数会启动两个 listener,监听事件处理

业务函数 listener.run 和 事件的处理。

wg.StartWithChannel(processorStopCh, s.processor.run)

func (p *sharedProcessor) run(stopCh <-chan struct{}) {
  func() {
    p.listenersLock.RLock()
    defer p.listenersLock.RUnlock()
    for _, listener := range p.listeners {
      p.wg.Start(listener.run) 
      p.wg.Start(listener.pop)
    }
    p.listenersStarted = true
  }()
  <-stopCh
  p.listenersLock.RLock()
  defer p.listenersLock.RUnlock()
  for _, listener := range p.listeners {
    close(listener.addCh) // Tell .pop() to stop. .pop() will tell .run() to stop
  }
  p.wg.Wait() // Wait for all .pop() and .run() to stop
}

可以看出,就是拿到的事件,根据注册的到informer的事件函数进行处理。

func (p *processorListener) run() {
  stopCh := make(chan struct{})
  wait.Until(func() {
    for next := range p.nextCh { // 消费事件
      switch notification := next.(type) {
      case updateNotification:
        p.handler.OnUpdate(notification.oldObj, notification.newObj)
      case addNotification:
        p.handler.OnAdd(notification.newObj)
      case deleteNotification:
        p.handler.OnDelete(notification.oldObj)
      default:
        utilruntime.HandleError(fmt.Errorf("unrecognized notification: %T", next))
      }
    }
    // the only way to get here is if the p.nextCh is empty and closed
    close(stopCh)
  }, 1*time.Second, stopCh)
}

informer中的事件的设计

了解了informer如何处理事件,就需要学习下,informer的事件系统设计 prossorListener

事件的添加

当在handleDelta时,会分发具体的事件

// 事件的分发
s.processor.distribute(updateNotification{oldObj: old, newObj: d.Object}, isSync)

此时,事件泵 Pop( ) 会根据接收到的事件进行处理。

// run() 时会启动一个事件泵
p.wg.Start(listener.pop)

func (p *processorListener) pop() {
  defer utilruntime.HandleCrash()
  defer close(p.nextCh) 

  var nextCh chan<- interface{}
  var notification interface{}
  for {
    select {
        case nextCh <- notification: // 这里实际上是一个阻塞的等待
            // 单向channel 可能不会走到这步骤
      var ok bool
            // deltahandle 中 distribute 会将事件添加到addCh待处理事件中
            // 处理完事件会再次拿到一个事件
      notification, ok = p.pendingNotifications.ReadOne()
      if !ok { // Nothing to pop
        nextCh = nil // Disable this select case
      }
        // 处理 分发过来的事件 addCh
    case notificationToAdd, ok := <-p.addCh: // distribute分发的事件
      if !ok {
        return
      }
            // 这里代表第一次,没有任何事件时,或者上面步骤完成读取
      if notification == nil { // 就会走这里
        notification = notificationToAdd 
        nextCh = p.nextCh 
      } else { 
                // notification否则代表没有处理完,将数据再次添加到待处理中
        p.pendingNotifications.WriteOne(notificationToAdd)
      }
    }
  }
}

该事件的流程图:

 

 

通过一个简单实例来学习client-go中的消息通知机制:

package main

import (
  "fmt"
  "time"

  "k8s.io/utils/buffer"
)

var nextCh1 = make(chan interface{})
var addCh = make(chan interface{})
var stopper = make(chan struct{})
var notification interface{}
var pendding = *buffer.NewRingGrowing(2)

func main() {
  // pop
  go func() {
    var nextCh chan<- interface{}
    var notification interface{}
    //var n int
    for {
      fmt.Println("busy wait")
      fmt.Println("entry select", notification)
      select {
      // 初始时,一个未初始化的channel,nil,形成一个阻塞(单channel下是死锁)
      case nextCh <- notification:
        fmt.Println("entry nextCh", notification)
        var ok bool
        // 读不到数据代表已处理完,置空锁
        notification, ok = pendding.ReadOne()
        if !ok {
          fmt.Println("unactive nextch")
          nextCh = nil
        }
      // 事件的分发,监听,初始时也是一个阻塞
      case notificationToAdd, ok := <-addCh:
        fmt.Println(notificationToAdd, notification)
        if !ok {
          return
        }
        // 线程安全
        // 当消息为空时,没有被处理
        // 锁为空,就分发数据
        if notification == nil {
          fmt.Println("frist notification nil")
          notification = notificationToAdd
          nextCh = nextCh1 // 这步骤等于初始化了局部的nextCh,会触发上面的流程
        } else {
          // 在第三次时,会走到这里,数据进入环
          fmt.Println("into ring", notificationToAdd)
          pendding.WriteOne(notificationToAdd)
        }
      }
    }
  }()
  // producer
  go func() {
    i := 0
    for {
      i++
      if i%5 == 0 {
        addCh <- fmt.Sprintf("thread 2 inner -- %d", i)
        time.Sleep(time.Millisecond * 9000)
      } else {
        addCh <- fmt.Sprintf("thread 2 outer -- %d", i)
        time.Sleep(time.Millisecond * 500)
      }
    }
  }()
  // subsriber
  go func() {
    for {
      for next := range nextCh1 {
        time.Sleep(time.Millisecond * 300)
        fmt.Println("consumer", next)
      }
    }
  }()
  <-stopper
}

总结:

这里的机制类似于线程安全,进入临界区的一些算法,临界区就是 nextChnotification 就是保证了至少有一个进程可以进入临界区(要么分发事件,要么生产事件);nextCh 和 nextCh1 一个是局部管道一个是全局的,管道未初始化代表了死锁(阻塞);当有消息要处理时,会将局部管道 nextCh 赋值给 全局 nextCh1 此时相当于解除了分发的步骤(对管道赋值,触发分发操作);ringbuffer 实际上是提供了一个对 notification 加锁的操作,在没有处理的消息时,需要保障 notification 为空,同时也关闭了流程 nextCh 的写入。这里主要是考虑对golang中channel的用法。

参考连接:

https://blog.csdn.net/sinat_24092079/article/details/124918055

https://www.cnblogs.com/Cylon/p/16311233.html



这篇关于浅析Kubernetes中client-go informer的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


扫一扫关注最新编程教程