k8s驱逐篇(6)-kube-controller-manager驱逐-NodeLifecycleController源码分析

2023/6/24 14:52:20

本文主要是介绍k8s驱逐篇(6)-kube-controller-manager驱逐-NodeLifecycleController源码分析,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!

概述

k8s v1.16版本中NodeController已经分为了NodeIpamControllerNodeLifecycleController,本文主要介绍NodeLifecycleController

NodeLifecycleController主要功能有:
(1)定期检查node的心跳上报,某个node间隔一定时间都没有心跳上报时,更新node的ready condition值为false或unknown,开启了污点驱逐的情况下,给该node添加NoExecute的污点;
(2)当污点驱逐未开启时,当node的ready Condition值为false或unknown且已经持续了一段时间(该时间可配置)时,对该node上的pod做驱逐(删除)操作;
(3)当污点驱逐开启时,node上有NoExecute污点后,立马驱逐(删除)不能容忍污点的pod,对于能容忍该污点的pod,则等待所有污点的容忍时间里最小值后,pod才被驱逐(删除);

源码分析

源码分析分成3部分:
(1)启动参数分析;
(2)初始化与相关结构体分析;
(3)处理逻辑分析;

1.相关启动参数分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
// cmd/kube-controller-manager/app/core.go
func startNodeLifecycleController(ctx ControllerContext) (http.Handler, bool, error) {
    lifecycleController, err := lifecyclecontroller.NewNodeLifecycleController(
        ctx.InformerFactory.Coordination().V1().Leases(),
        ctx.InformerFactory.Core().V1().Pods(),
        ctx.InformerFactory.Core().V1().Nodes(),
        ctx.InformerFactory.Apps().V1().DaemonSets(),
        // node lifecycle controller uses existing cluster role from node-controller
        ctx.ClientBuilder.ClientOrDie("node-controller"),
        ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration,
        ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration,
        ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration,
        ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration,
        ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate,
        ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate,
        ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold,
        ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold,
        ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager,
        utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions),
    )
    if err != nil {
        return nil, true, err
    }
    go lifecycleController.Run(ctx.Stop)
    return nil, true, nil
}

看到上面的startNodeLifecycleController函数中lifecyclecontroller.NewNodeLifecycleController方法的入参,其中传入了多个kube-controller-manager的启动参数;

(1)ctx.ComponentConfig.KubeCloudShared.NodeMonitorPeriod.Duration

即kcm启动参数--node-monitor-period,默认值5秒,代表NodeLifecycleController中更新同步node对象的status值(node的污点、node的condition值)的周期;

1
2
fs.DurationVar(&o.NodeMonitorPeriod.Duration, "node-monitor-period", o.NodeMonitorPeriod.Duration,
        "The period for syncing NodeStatus in NodeController.")

(2)ctx.ComponentConfig.NodeLifecycleController.NodeStartupGracePeriod.Duration

即kcm启动参数--node-startup-grace-period,默认值60秒,代表node启动后多久才会更新node对象的conditions值;

1
2
fs.DurationVar(&o.NodeStartupGracePeriod.Duration, "node-startup-grace-period", o.NodeStartupGracePeriod.Duration,
        "Amount of time which we allow starting Node to be unresponsive before marking it unhealthy.")

(3)ctx.ComponentConfig.NodeLifecycleController.NodeMonitorGracePeriod.Duration

即kcm启动参数--node-monitor-grace-period,默认值40秒,代表在距离上一次上报心跳时间超过40s后,将该node的conditions值更新为unknown(kubelet通过更新node lease来上报心跳);

1
2
3
4
fs.DurationVar(&o.NodeMonitorGracePeriod.Duration, "node-monitor-grace-period", o.NodeMonitorGracePeriod.Duration,
        "Amount of time which we allow running Node to be unresponsive before marking it unhealthy. "+
            "Must be N times more than kubelet's nodeStatusUpdateFrequency, "+
            "where N means number of retries allowed for kubelet to post node status.")

(4)ctx.ComponentConfig.NodeLifecycleController.PodEvictionTimeout.Duration

即kcm启动参数--pod-eviction-timeout,默认值5分钟,当不开启污点驱逐时该参数起效,当node的ready condition值变为false或unknown并持续了5分钟后,将驱逐(删除)该node上的pod;

1
fs.DurationVar(&o.PodEvictionTimeout.Duration, "pod-eviction-timeout", o.PodEvictionTimeout.Duration, "The grace period for deleting pods on failed nodes.")

(5)ctx.ComponentConfig.NodeLifecycleController.EnableTaintManager

即kcm启动参数--enable-taint-manager,默认值true,代表启动taintManager,当已经调度到该node上的pod不能容忍node的NoExecute 污点时,由TaintManager负责驱逐此类pod,若为false即不启动taintManager,则根据--pod-eviction-timeout来做驱逐操作;

1
fs.BoolVar(&o.EnableTaintManager, "enable-taint-manager", o.EnableTaintManager, "WARNING: Beta feature. If set to true enables NoExecute Taints and will evict all not-tolerating Pod running on Nodes tainted with this kind of Taints.")

(6)utilfeature.DefaultFeatureGate.Enabled(features.TaintBasedEvictions)

即kcm启动参数--feature-gates=TaintBasedEvictions=xxx,默认值true,配合--enable-taint-manager共同作用,两者均为true,才会开启污点驱逐;

(7)ctx.ComponentConfig.NodeLifecycleController.NodeEvictionRate

即kcm启动参数--node-eviction-rate,默认值0.1,代表当集群下某个zone(zone的概念后面详细介绍)为healthy时,每秒应该触发pod驱逐操作的node数量,默认0.1,即每10s触发1个node上的pod驱逐操作;

1
fs.Float32Var(&o.NodeEvictionRate, "node-eviction-rate", 0.1, "Number of nodes per second on which pods are deleted in case of node failure when a zone is healthy (see --unhealthy-zone-threshold for definition of healthy/unhealthy). Zone refers to entire cluster in non-multizone clusters.")

(8)ctx.ComponentConfig.NodeLifecycleController.SecondaryNodeEvictionRate

即kcm启动参数--secondary-node-eviction-rate,代表如果某个zone下的unhealthy节点的百分比超过--unhealthy-zone-threshold (默认为 0.55)时,驱逐速率将会减小,如果不是LargeCluster(zone节点数量小于等于--large-cluster-size-threshold个,默认为 50),驱逐操作将会停止,如果是LargeCluster,驱逐速率将降为每秒--secondary-node-eviction-rate 个,默认为0.01;

1
fs.Float32Var(&o.SecondaryNodeEvictionRate, "secondary-node-eviction-rate", 0.01, "Number of nodes per second on which pods are deleted in case of node failure when a zone is unhealthy (see --unhealthy-zone-threshold for definition of healthy/unhealthy). Zone refers to entire cluster in non-multizone clusters. This value is implicitly overridden to 0 if the cluster size is smaller than --large-cluster-size-threshold.")

(9)ctx.ComponentConfig.NodeLifecycleController.LargeClusterSizeThreshold

即kcm启动参数--large-cluster-size-threshold,默认值50,当某zone的节点数超过该值时,认为该zone是一个LargeCluster,不是LargeCluster时,对应的SecondaryNodeEvictionRate配置会被设置为0;

1
fs.Int32Var(&o.LargeClusterSizeThreshold, "large-cluster-size-threshold", 50, "Number of nodes from which NodeController treats the cluster as large for the eviction logic purposes. --secondary-node-eviction-rate is implicitly overridden to 0 for clusters this size or smaller.")

(10)ctx.ComponentConfig.NodeLifecycleController.UnhealthyZoneThreshold

即kcm启动参数--unhealthy-zone-threshold,代表认定某zone为unhealthy的阈值,即会影响什么时候开启二级驱逐速率;默认值0.55,当该zone中not ready节点(ready condition值不为true)数目超过55%,认定该zone为unhealthy;

1
fs.Float32Var(&o.UnhealthyZoneThreshold, "unhealthy-zone-threshold", 0.55, "Fraction of Nodes in a zone which needs to be not Ready (minimum 3) for zone to be treated as unhealthy. ")

(11)--feature-gates=NodeLease=xxx:默认值true,使用lease对象上报node心跳信息,替换老的更新node的status的方式,能大大减轻apiserver的负担;

zone概念介绍

根据每个node对象的region和zone的label值,将node划分到不同的zone中;

region、zone值都相同的node,划分为同一个zone;

zone状态介绍

zone状态有四种,分别是:
(1)Initial:初始化状态;
(2)FullDisruption:ready的node数量为0,not ready的node数量大于0;
(3)PartialDisruption:not ready的node数量大于2且其占比大于等于unhealthyZoneThreshold
(4)Normal:上述三种状态以外的情形,都属于该状态;

需要注意二级驱逐速率对驱逐的影响,即kcm启动参数--secondary-node-eviction-rate,代表如果某个zone下的unhealthy节点的百分比超过--unhealthy-zone-threshold (默认为 0.55)时,驱逐速率将会减小,如果不是LargeCluster(zone节点数量小于等于--large-cluster-size-threshold,默认为 50),驱逐操作将会停止,如果是LargeCluster,驱逐速率将降为每秒--secondary-node-eviction-rate 个,默认为0.01;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
    readyNodes := 0
    notReadyNodes := 0
    for i := range nodeReadyConditions {
        if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
            readyNodes++
        } else {
            notReadyNodes++
        }
    }
    switch {
    case readyNodes == 0 && notReadyNodes > 0:
        return notReadyNodes, stateFullDisruption
    case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
        return notReadyNodes, statePartialDisruption
    default:
        return notReadyNodes, stateNormal
    }
}

2.初始化与相关结构体分析

2.1 Controller结构体分析

Controller结构体关键属性:
(1)taintManager:负责污点驱逐的manager;
(2)enterPartialDisruptionFunc:返回当zone状态为PartialDisruption时的驱逐速率(node节点数量超过largeClusterThreshold时,返回secondaryEvictionLimiterQPS,即kcm启动参数--secondary-node-eviction-rate,否则返回0);
(3)enterFullDisruptionFunc:返回当zone状态为FullDisruption时的驱逐速率(直接返回NodeEvictionRate值,kcm启动参数--node-eviction-rate);
(4)computeZoneStateFunc:计算zone状态的方法,即上面zone状态介绍中的ComputeZoneState方法;
(5)nodeHealthMap:用于记录所有node的最近一次的状态信息;
(6)zoneStates:用于记录所有zone的状态;
(7)nodeMonitorPeriodnodeStartupGracePeriodnodeMonitorGracePeriodpodEvictionTimeoutevictionLimiterQPSsecondaryEvictionLimiterQPSlargeClusterThresholdunhealthyZoneThreshold,上面介绍启动参数时已经做了分析;
(8)runTaintManager:kcm启动参数--enable-taint-manager赋值,代表是否启动taintManager;
(9)useTaintBasedEvictions:代表是否开启污点驱逐,kcm启动参数--feature-gates=TaintBasedEvictions=xxx赋值,默认值true,配合--enable-taint-manager共同作用,两者均为true,才会开启污点驱逐;

Controller结构体中的两个关键队列:
(1)zonePodEvictor:pod需要被驱逐的node节点队列(只有在未开启污点驱逐时,才用到该队列),当node的ready condition变为false或unknown且持续了podEvictionTimeout的时间,会将该node放入该队列,然后有worker负责从该队列中读取node,去执行node上的pod驱逐操作;
(2)zoneNoExecuteTainter:需要更新taint的node节点队列,当node的ready condition变为false或unknown时,会将该node放入该队列,然后有worker负责从该队列中读取node,去执行taint更新操作(增加notReadyunreachable的taint);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
type Controller struct {
    ...
    taintManager *scheduler.NoExecuteTaintManager
 
    // This timestamp is to be used instead of LastProbeTime stored in Condition. We do this
    // to avoid the problem with time skew across the cluster.
    now func() metav1.Time
 
    enterPartialDisruptionFunc func(nodeNum int) float32
    enterFullDisruptionFunc    func(nodeNum int) float32
    computeZoneStateFunc       func(nodeConditions []*v1.NodeCondition) (int, ZoneState)
 
    knownNodeSet map[string]*v1.Node
    // per Node map storing last observed health together with a local time when it was observed.
    nodeHealthMap *nodeHealthMap
 
    // evictorLock protects zonePodEvictor and zoneNoExecuteTainter.
    // TODO(#83954): API calls shouldn't be executed under the lock.
    evictorLock     sync.Mutex
    nodeEvictionMap *nodeEvictionMap
    // workers that evicts pods from unresponsive nodes.
    zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue
    // workers that are responsible for tainting nodes.
    zoneNoExecuteTainter map[string]*scheduler.RateLimitedTimedQueue
 
    nodesToRetry sync.Map
 
    zoneStates map[string]ZoneState
 
    daemonSetStore          appsv1listers.DaemonSetLister
    daemonSetInformerSynced cache.InformerSynced
 
    leaseLister         coordlisters.LeaseLister
    leaseInformerSynced cache.InformerSynced
    nodeLister          corelisters.NodeLister
    nodeInformerSynced  cache.InformerSynced
 
    getPodsAssignedToNode func(nodeName string) ([]*v1.Pod, error)
 
    recorder record.EventRecorder
 
    // Value controlling Controller monitoring period, i.e. how often does Controller
    // check node health signal posted from kubelet. This value should be lower than
    // nodeMonitorGracePeriod.
    // TODO: Change node health monitor to watch based.
    nodeMonitorPeriod time.Duration
 
    // When node is just created, e.g. cluster bootstrap or node creation, we give
    // a longer grace period.
    nodeStartupGracePeriod time.Duration
 
    // Controller will not proactively sync node health, but will monitor node
    // health signal updated from kubelet. There are 2 kinds of node healthiness
    // signals: NodeStatus and NodeLease. NodeLease signal is generated only when
    // NodeLease feature is enabled. If it doesn't receive update for this amount
    // of time, it will start posting "NodeReady==ConditionUnknown". The amount of
    // time before which Controller start evicting pods is controlled via flag
    // 'pod-eviction-timeout'.
    // Note: be cautious when changing the constant, it must work with
    // nodeStatusUpdateFrequency in kubelet and renewInterval in NodeLease
    // controller. The node health signal update frequency is the minimal of the
    // two.
    // There are several constraints:
    // 1. nodeMonitorGracePeriod must be N times more than  the node health signal
    //    update frequency, where N means number of retries allowed for kubelet to
    //    post node status/lease. It is pointless to make nodeMonitorGracePeriod
    //    be less than the node health signal update frequency, since there will
    //    only be fresh values from Kubelet at an interval of node health signal
    //    update frequency. The constant must be less than podEvictionTimeout.
    // 2. nodeMonitorGracePeriod can't be too large for user experience - larger
    //    value takes longer for user to see up-to-date node health.
    nodeMonitorGracePeriod time.Duration
 
    podEvictionTimeout          time.Duration
    evictionLimiterQPS          float32
    secondaryEvictionLimiterQPS float32
    largeClusterThreshold       int32
    unhealthyZoneThreshold      float32
 
    // if set to true Controller will start TaintManager that will evict Pods from
    // tainted nodes, if they're not tolerated.
    runTaintManager bool
 
    // if set to true Controller will taint Nodes with 'TaintNodeNotReady' and 'TaintNodeUnreachable'
    // taints instead of evicting Pods itself.
    useTaintBasedEvictions bool
 
    nodeUpdateQueue workqueue.Interface
    podUpdateQueue  workqueue.RateLimitingInterface
}

2.2 初始化逻辑分析

NewNodeLifecycleController函数的主要逻辑为:
(1)初始化Controller结构体,代表NodeLifecycleController
(2)给podInformer注册EventHandler(部分逻辑与TaintManager相关);
(3)判断是否开启污点驱逐,即--enable-taint-manager启动参数值是否配置为true,是则初始化TaintManager并赋值给ControllertaintManager属性,随后给nodeInformer注册TaintManager相关的EventHandler
(4)给nodeInformer注册EventHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func NewNodeLifecycleController(
    leaseInformer coordinformers.LeaseInformer,
    podInformer coreinformers.PodInformer,
    nodeInformer coreinformers.NodeInformer,
    daemonSetInformer appsv1informers.DaemonSetInformer,
    kubeClient clientset.Interface,
    nodeMonitorPeriod time.Duration,
    nodeStartupGracePeriod time.Duration,
    nodeMonitorGracePeriod time.Duration,
    podEvictionTimeout time.Duration,
    evictionLimiterQPS float32,
    secondaryEvictionLimiterQPS float32,
    largeClusterThreshold int32,
    unhealthyZoneThreshold float32,
    runTaintManager bool,
    useTaintBasedEvictions bool,
) (*Controller, error) {
 
    ...
 
    // (1)初始化`Controller`结构体; 
    nc := &Controller{
        kubeClient:                  kubeClient,
        now:                         metav1.Now,
        knownNodeSet:                make(map[string]*v1.Node),
        nodeHealthMap:               newNodeHealthMap(),
        nodeEvictionMap:             newNodeEvictionMap(),
        recorder:                    recorder,
        nodeMonitorPeriod:           nodeMonitorPeriod,
        nodeStartupGracePeriod:      nodeStartupGracePeriod,
        nodeMonitorGracePeriod:      nodeMonitorGracePeriod,
        zonePodEvictor:              make(map[string]*scheduler.RateLimitedTimedQueue),
        zoneNoExecuteTainter:        make(map[string]*scheduler.RateLimitedTimedQueue),
        nodesToRetry:                sync.Map{},
        zoneStates:                  make(map[string]ZoneState),
        podEvictionTimeout:          podEvictionTimeout,
        evictionLimiterQPS:          evictionLimiterQPS,
        secondaryEvictionLimiterQPS: secondaryEvictionLimiterQPS,
        largeClusterThreshold:       largeClusterThreshold,
        unhealthyZoneThreshold:      unhealthyZoneThreshold,
        runTaintManager:             runTaintManager,
        useTaintBasedEvictions:      useTaintBasedEvictions && runTaintManager,
        nodeUpdateQueue:             workqueue.NewNamed("node_lifecycle_controller"),
        podUpdateQueue:              workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "node_lifecycle_controller_pods"),
    }
    if useTaintBasedEvictions {
        klog.Infof("Controller is using taint based evictions.")
    }
 
    nc.enterPartialDisruptionFunc = nc.ReducedQPSFunc
    nc.enterFullDisruptionFunc = nc.HealthyQPSFunc
    nc.computeZoneStateFunc = nc.ComputeZoneState
 
    // (2)给`podInformer`注册`EventHandler`(部分逻辑与`TaintManager`相关);
    podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: func(obj interface{}) {
            pod := obj.(*v1.Pod)
            nc.podUpdated(nil, pod)
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(nil, pod)
            }
        },
        UpdateFunc: func(prev, obj interface{}) {
            prevPod := prev.(*v1.Pod)
            newPod := obj.(*v1.Pod)
            nc.podUpdated(prevPod, newPod)
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(prevPod, newPod)
            }
        },
        DeleteFunc: func(obj interface{}) {
            pod, isPod := obj.(*v1.Pod)
            // We can get DeletedFinalStateUnknown instead of *v1.Pod here and we need to handle that correctly.
            if !isPod {
                deletedState, ok := obj.(cache.DeletedFinalStateUnknown)
                if !ok {
                    klog.Errorf("Received unexpected object: %v", obj)
                    return
                }
                pod, ok = deletedState.Obj.(*v1.Pod)
                if !ok {
                    klog.Errorf("DeletedFinalStateUnknown contained non-Pod object: %v", deletedState.Obj)
                    return
                }
            }
            nc.podUpdated(pod, nil)
            if nc.taintManager != nil {
                nc.taintManager.PodUpdated(pod, nil)
            }
        },
    })
    nc.podInformerSynced = podInformer.Informer().HasSynced
    podInformer.Informer().AddIndexers(cache.Indexers{
        nodeNameKeyIndex: func(obj interface{}) ([]string, error) {
            pod, ok := obj.(*v1.Pod)
            if !ok {
                return []string{}, nil
            }
            if len(pod.Spec.NodeName) == 0 {
                return []string{}, nil
            }
            return []string{pod.Spec.NodeName}, nil
        },
    })
 
    podIndexer := podInformer.Informer().GetIndexer()
    nc.getPodsAssignedToNode = func(nodeName string) ([]*v1.Pod, error) {
        objs, err := podIndexer.ByIndex(nodeNameKeyIndex, nodeName)
        if err != nil {
            return nil, err
        }
        pods := make([]*v1.Pod, 0, len(objs))
        for _, obj := range objs {
            pod, ok := obj.(*v1.Pod)
            if !ok {
                continue
            }
            pods = append(pods, pod)
        }
        return pods, nil
    }
    nc.podLister = podInformer.Lister()
 
    // (3)判断是否开启污点驱逐,即`--enable-taint-manager`启动参数值是否配置为true,是则初始化`TaintManager`并赋值给`Controller`的`taintManager`属性,随后给nodeInformer注册`TaintManager`相关的`EventHandler`; 
    if nc.runTaintManager {
        podGetter := func(name, namespace string) (*v1.Pod, error) { return nc.podLister.Pods(namespace).Get(name) }
        nodeLister := nodeInformer.Lister()
        nodeGetter := func(name string) (*v1.Node, error) { return nodeLister.Get(name) }
        nc.taintManager = scheduler.NewNoExecuteTaintManager(kubeClient, podGetter, nodeGetter, nc.getPodsAssignedToNode)
        nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
                nc.taintManager.NodeUpdated(nil, node)
                return nil
            }),
            UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(oldNode, newNode *v1.Node) error {
                nc.taintManager.NodeUpdated(oldNode, newNode)
                return nil
            }),
            DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
                nc.taintManager.NodeUpdated(node, nil)
                return nil
            }),
        })
    }
     
    // (4) 给`nodeInformer`注册`EventHandler`; 
    klog.Infof("Controller will reconcile labels.")
    nodeInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc: nodeutil.CreateAddNodeHandler(func(node *v1.Node) error {
            nc.nodeUpdateQueue.Add(node.Name)
            nc.nodeEvictionMap.registerNode(node.Name)
            return nil
        }),
        UpdateFunc: nodeutil.CreateUpdateNodeHandler(func(_, newNode *v1.Node) error {
            nc.nodeUpdateQueue.Add(newNode.Name)
            return nil
        }),
        DeleteFunc: nodeutil.CreateDeleteNodeHandler(func(node *v1.Node) error {
            nc.nodesToRetry.Delete(node.Name)
            nc.nodeEvictionMap.unregisterNode(node.Name)
            return nil
        }),
    })
 
    nc.leaseLister = leaseInformer.Lister()
    nc.leaseInformerSynced = leaseInformer.Informer().HasSynced
 
    nc.nodeLister = nodeInformer.Lister()
    nc.nodeInformerSynced = nodeInformer.Informer().HasSynced
 
    nc.daemonSetStore = daemonSetInformer.Lister()
    nc.daemonSetInformerSynced = daemonSetInformer.Informer().HasSynced
 
    return nc, nil
}

3.处理逻辑分析

Run方法作为NodeLifecycleController的处理逻辑分析入口,其主要逻辑为:
(1)等待leaseInformernodeInformerpodInformerdaemonSetInformer中的cache同步完成;

(2)判断是否开启污点驱逐,是则拉起一个goroutine,调用nc.taintManager.Run方法,启动taintManager

(3)启动8个goroutine,即8个worker,循环调用nc.doNodeProcessingPassWorker方法,用于处理nc.nodeUpdateQueue队列;
nc.doNodeProcessingPassWorker方法有两个作用:
(3-1)调用nc.doNoScheduleTaintingPass方法,根据node.Status.Conditionsnode.Spec.Unschedulable的值来更新node.Spec.Taints,主要是设置Effectnoschedule的taint;
(3-2)调用nc.reconcileNodeLabels方法,处理node对象中os和arch相关的label;

(4)启动4个goroutine,即4个worker,循环调用nc.doPodProcessingWorker方法,用于处理nc.podUpdateQueue队列;
nc.doPodProcessingWorker方法做以下两个操作:
(4-1)当污点驱逐未开启时,判断node对象的status,当node的ready Condition为false或unknown且已经持续了至少nc.podEvictionTimeout的时间时,对该node上的pod做驱逐(删除)操作;
(4-2)如果node的ready condition值不为true,则将pod的ready condition更新为false;

(5)判断nc.useTaintBasedEvictions是否为true,即是否开启污点驱逐,是则启动goroutine并循环调用nc.doNoExecuteTaintingPass
nc.doNoExecuteTaintingPass方法主要作用是根据node.Status.Conditions的值来更新node.Spec.Taints,主要是设置EffectnoExecute的taint;

(6)未开启污点驱逐时,启动goroutine并循环调用nc.doEvictionPass
nc.doEvictionPass方法主要作用是从nc.zonePodEvictor中获取node,然后驱逐(删除)该node上除daemonset pod外的所有pod;

(7)启动goroutine,间隔nc.nodeMonitorPeriod时间(即kcm启动参数--node-monitor-period,默认值5秒)循环调用nc.monitorNodeHealth方法;
nc.monitorNodeHealth方法的主要作用是持续监控node的状态,根据集群中不同zone下unhealthy数量的node,以及kcm启动参数中驱逐速率的相关配置,给不同的zone设置不同的驱逐速率(该驱逐速率对是否开启污点驱逐均生效),且当node心跳上报(node lease的更新时间)距离上一次上报时间已经超过nodeMonitorGracePeriod(刚启动则为nodeStartupGracePeriod),更新node对象的ready condition值,并做相应的驱逐处理:
(7-1)当开启了污点驱逐,且node的ready condition不为true时,添加NoExcute污点,并将该node放入zoneNoExecuteTainter中,由taintManager来做驱逐操作;
(7-2)当没开启污点驱逐,且node的ready condition不为true持续了podEvictionTimeout时间,则开始驱逐pod的操作;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) Run(stopCh <-chan struct{}) {
    defer utilruntime.HandleCrash()
 
    klog.Infof("Starting node controller")
    defer klog.Infof("Shutting down node controller")
 
    if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
        return
    }
 
    if nc.runTaintManager {
        go nc.taintManager.Run(stopCh)
    }
 
    // Close node update queue to cleanup go routine.
    defer nc.nodeUpdateQueue.ShutDown()
    defer nc.podUpdateQueue.ShutDown()
 
    // Start workers to reconcile labels and/or update NoSchedule taint for nodes.
    for i := 0; i < scheduler.UpdateWorkerSize; i++ {
        // Thanks to "workqueue", each worker just need to get item from queue, because
        // the item is flagged when got from queue: if new event come, the new item will
        // be re-queued until "Done", so no more than one worker handle the same item and
        // no event missed.
        go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
    }
 
    for i := 0; i < podUpdateWorkerSize; i++ {
        go wait.Until(nc.doPodProcessingWorker, time.Second, stopCh)
    }
 
    if nc.useTaintBasedEvictions {
        // Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
        // taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
        go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
    } else {
        // Managing eviction of nodes:
        // When we delete pods off a node, if the node was not empty at the time we then
        // queue an eviction watcher. If we hit an error, retry deletion.
        go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
    }
 
    // Incorporate the results of node health signal pushed from kubelet to master.
    go wait.Until(func() {
        if err := nc.monitorNodeHealth(); err != nil {
            klog.Errorf("Error monitoring node health: %v", err)
        }
    }, nc.nodeMonitorPeriod, stopCh)
 
    <-stopCh
}

3.1 nc.taintManager.Run

taintManager的主要功能为:当某个node被打上NoExecute污点后,其上面的pod如果不能容忍该污点,则taintManager将会驱逐这些pod,而新建的pod也需要容忍该污点才能调度到该node上;

通过kcm启动参数--enable-taint-manager来确定是否启动taintManagertrue时启动(启动参数默认值为true);

kcm启动参数--feature-gates=TaintBasedEvictions=xxx,默认值true,配合--enable-taint-manager共同作用,两者均为true,才会开启污点驱逐;

taintManager部分的内容比较多,将在后面单独一遍文章进行分析;

3.2 nc.doNodeProcessingPassWorker

nc.doNodeProcessingPassWorker方法有两个作用:
(1)调用nc.doNoScheduleTaintingPass方法,根据node.Status.Conditionsnode.Spec.Unschedulable的值来更新node.Spec.Taints,主要是设置Effectnoschedule的taint;
(2)调用nc.reconcileNodeLabels方法,处理node对象中os和arch相关的label;

主要逻辑:
(1)循环消费nodeUpdateQueue队列,从队列中获取一个nodeName;
(2)调用nc.doNoScheduleTaintingPass,对node的taint进行处理;
(3)调用nc.reconcileNodeLabels,对node对象中os和arch相关的label进行处理;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) doNodeProcessingPassWorker() {
    for {
        obj, shutdown := nc.nodeUpdateQueue.Get()
        // "nodeUpdateQueue" will be shutdown when "stopCh" closed;
        // we do not need to re-check "stopCh" again.
        if shutdown {
            return
        }
        nodeName := obj.(string)
        if err := nc.doNoScheduleTaintingPass(nodeName); err != nil {
            klog.Errorf("Failed to taint NoSchedule on node <%s>, requeue it: %v", nodeName, err)
            // TODO(k82cn): Add nodeName back to the queue
        }
        // TODO: re-evaluate whether there are any labels that need to be
        // reconcile in 1.19. Remove this function if it's no longer necessary.
        if err := nc.reconcileNodeLabels(nodeName); err != nil {
            klog.Errorf("Failed to reconcile labels for node <%s>, requeue it: %v", nodeName, err)
            // TODO(yujuhong): Add nodeName back to the queue
        }
        nc.nodeUpdateQueue.Done(nodeName)
    }
}

3.2.1 nc.doNoScheduleTaintingPass

nc.doNoScheduleTaintingPass方法主要作用是根据node.Status.Conditionsnode.Spec.Unschedulable的值来更新node.Spec.Taints,主要是设置Effectnoschedule的taint;

主要逻辑:
(1)调用nc.nodeLister.Get,从informer本地缓存中获取node对象,不存在则直接return;

(2)根据node.Status.Conditions的值,获得相应的taints;
(2-1)node.status.Conditions中有type为ready的condition。如果这个condition.status为fasle,设置key为node.kubernetes.io/not-ready,Effect为noschedule的taint;如果这个condition.status值为unknown,设置key为node.kubernetes.io/unreachable,Effect为noschedule的taint;
(2-2)node.status.Conditions中有type为MemoryPressure的condition。如果这个condition.status为true,设置key为node.kubernetes.io/memory-pressure,Effect为noschedule的taint;
(2-3)node.status.Conditions中有type为DiskPressure的condition。如果这个condition.status为true,设置key为node.kubernetes.io/disk-pressure,Effect为noschedule的taint;
(2-4)node.status.Conditions中有type为NetworkUnavailable的condition。如果这个condition.status为true,设置key为node.kubernetes.io/network-unavailable,Effect为noschedule的taint;
(2-5)node.status.Conditions中有type为PIDPressure的condition。如果这个condition.status为true,设置key为node.kubernetes.io/pid-pressure,Effect为noschedule的taint;

(3)如果node.Spec.Unschedulable值为true,则再追加key为node.kubernetes.io/unschedulable,Effect为noschedule的taint到taints中;

(4)调用nodeutil.SwapNodeControllerTaint,更新node的taints;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
 
var(
    ...
    nodeConditionToTaintKeyStatusMap = map[v1.NodeConditionType]map[v1.ConditionStatus]string{
        v1.NodeReady: {
            v1.ConditionFalse:   v1.TaintNodeNotReady,
            v1.ConditionUnknown: v1.TaintNodeUnreachable,
        },
        v1.NodeMemoryPressure: {
            v1.ConditionTrue: v1.TaintNodeMemoryPressure,
        },
        v1.NodeDiskPressure: {
            v1.ConditionTrue: v1.TaintNodeDiskPressure,
        },
        v1.NodeNetworkUnavailable: {
            v1.ConditionTrue: v1.TaintNodeNetworkUnavailable,
        },
        v1.NodePIDPressure: {
            v1.ConditionTrue: v1.TaintNodePIDPressure,
        },
    }
    ...
)
 
func (nc *Controller) doNoScheduleTaintingPass(nodeName string) error {
    node, err := nc.nodeLister.Get(nodeName)
    if err != nil {
        // If node not found, just ignore it.
        if apierrors.IsNotFound(err) {
            return nil
        }
        return err
    }
 
    // Map node's condition to Taints.
    var taints []v1.Taint
    for _, condition := range node.Status.Conditions {
        if taintMap, found := nodeConditionToTaintKeyStatusMap[condition.Type]; found {
            if taintKey, found := taintMap[condition.Status]; found {
                taints = append(taints, v1.Taint{
                    Key:    taintKey,
                    Effect: v1.TaintEffectNoSchedule,
                })
            }
        }
    }
    if node.Spec.Unschedulable {
        // If unschedulable, append related taint.
        taints = append(taints, v1.Taint{
            Key:    v1.TaintNodeUnschedulable,
            Effect: v1.TaintEffectNoSchedule,
        })
    }
 
    // Get exist taints of node.
    nodeTaints := taintutils.TaintSetFilter(node.Spec.Taints, func(t *v1.Taint) bool {
        // only NoSchedule taints are candidates to be compared with "taints" later
        if t.Effect != v1.TaintEffectNoSchedule {
            return false
        }
        // Find unschedulable taint of node.
        if t.Key == v1.TaintNodeUnschedulable {
            return true
        }
        // Find node condition taints of node.
        _, found := taintKeyToNodeConditionMap[t.Key]
        return found
    })
    taintsToAdd, taintsToDel := taintutils.TaintSetDiff(taints, nodeTaints)
    // If nothing to add not delete, return true directly.
    if len(taintsToAdd) == 0 && len(taintsToDel) == 0 {
        return nil
    }
    if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, taintsToAdd, taintsToDel, node) {
        return fmt.Errorf("failed to swap taints of node %+v", node)
    }
    return nil
}

3.2.2 nc.reconcileNodeLabels

nc.reconcileNodeLabels方法主要是处理node对象中,os和arch相关的label,os和arch相关的label在kubelet向apiserver注册node的时候会带上;

主要逻辑:
(1)从informer缓存中获取node对象,不存在则直接return;
(2)如果node的label为空,则直接return;
(3)如果node对象存在key为“beta.kubernetes.io/os”的label ,则设置key为“kubernetes.io/os"、value值一样的label;
(4)如果node对象存在key为“beta.kubernetes.io/arch”的label,则设置key为“kubernetes.io/arch"、value值一样的label;

3.3 nc.doPodProcessingWorker

nc.doPodProcessingWorker方法判断node对象的status,当node的ready Condition为false或unknown且已经持续了至少nc.podEvictionTimeout的时间时,对该node上的pod做驱逐(删除)操作,并且如果node的ready condition值不为true,则将pod的ready condition更新为false;

需要注意的是,当启用了taint manager时,pod的驱逐由taint manager进行处理,这里就不再进行pod的驱逐处理。

主要逻辑:
(1)循环消费podUpdateQueue队列,从队列中获取一个nodeName;
(2)调用nc.processPod,做进一步处理;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) doPodProcessingWorker() {
    for {
        obj, shutdown := nc.podUpdateQueue.Get()
        // "podUpdateQueue" will be shutdown when "stopCh" closed;
        // we do not need to re-check "stopCh" again.
        if shutdown {
            return
        }
 
        podItem := obj.(podUpdateItem)
        nc.processPod(podItem)
    }
}

3.3.1 nc.processPod

nc.processPod方法主要逻辑:
(1)从informer本地缓存中获取pod对象;
(2)获取pod所在nodeName,并根据nodeName调用nc.nodeHealthMap.getDeepCopy获取nodeHealth,如nodeHealth为空则直接return;
(3)调用nodeutil.GetNodeCondition,获取nodeHealth.status中node的ready condition,如果获取不到则直接return;
(4)判断taint manager是否启用,没启用则调用nc.processNoTaintBaseEviction对pod做进一步处理(驱逐逻辑);
(5)如果node的ready condition值不为true,则调用nodeutil.MarkPodsNotReady将pod的ready condition更新为false;

注意:当启用taint manager时,pod的驱逐由taint manager进行处理,所以不在这里处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) processPod(podItem podUpdateItem) {
    defer nc.podUpdateQueue.Done(podItem)
    pod, err := nc.podLister.Pods(podItem.namespace).Get(podItem.name)
    if err != nil {
        if apierrors.IsNotFound(err) {
            // If the pod was deleted, there is no need to requeue.
            return
        }
        klog.Warningf("Failed to read pod %v/%v: %v.", podItem.namespace, podItem.name, err)
        nc.podUpdateQueue.AddRateLimited(podItem)
        return
    }
 
    nodeName := pod.Spec.NodeName
 
    nodeHealth := nc.nodeHealthMap.getDeepCopy(nodeName)
    if nodeHealth == nil {
        // Node data is not gathered yet or node has beed removed in the meantime.
        // Pod will be handled by doEvictionPass method.
        return
    }
 
    node, err := nc.nodeLister.Get(nodeName)
    if err != nil {
        klog.Warningf("Failed to read node %v: %v.", nodeName, err)
        nc.podUpdateQueue.AddRateLimited(podItem)
        return
    }
 
    _, currentReadyCondition := nodeutil.GetNodeCondition(nodeHealth.status, v1.NodeReady)
    if currentReadyCondition == nil {
        // Lack of NodeReady condition may only happen after node addition (or if it will be maliciously deleted).
        // In both cases, the pod will be handled correctly (evicted if needed) during processing
        // of the next node update event.
        return
    }
 
    pods := []*v1.Pod{pod}
    // In taint-based eviction mode, only node updates are processed by NodeLifecycleController.
    // Pods are processed by TaintManager.
    if !nc.useTaintBasedEvictions {
        if err := nc.processNoTaintBaseEviction(node, currentReadyCondition, nc.nodeMonitorGracePeriod, pods); err != nil {
            klog.Warningf("Unable to process pod %+v eviction from node %v: %v.", podItem, nodeName, err)
            nc.podUpdateQueue.AddRateLimited(podItem)
            return
        }
    }
 
    if currentReadyCondition.Status != v1.ConditionTrue {
        if err := nodeutil.MarkPodsNotReady(nc.kubeClient, pods, nodeName); err != nil {
            klog.Warningf("Unable to mark pod %+v NotReady on node %v: %v.", podItem, nodeName, err)
            nc.podUpdateQueue.AddRateLimited(podItem)
        }
    }
}

3.3.2 nc.processNoTaintBaseEviction

nc.processNoTaintBaseEviction方法主要逻辑:
(1)当node的ready Condition为false或unknown且已经持续了至少nc.podEvictionTimeout的时间时,调用nc.evictPods方法,将node加入到nc.zonePodEvictor队列中,由其他worker消费该队列,对该node上的pod做驱逐(删除)操作;
(2)当node的ready Condition为true时,调用nc.cancelPodEviction方法,将该node从nc.zonePodEvictor队列中移除,代表取消驱逐该node上的pod;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition, gracePeriod time.Duration, pods []*v1.Pod) error {
    decisionTimestamp := nc.now()
    nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name)
    if nodeHealthData == nil {
        return fmt.Errorf("health data doesn't exist for node %q", node.Name)
    }
    // Check eviction timeout against decisionTimestamp
    switch observedReadyCondition.Status {
    case v1.ConditionFalse:
        if decisionTimestamp.After(nodeHealthData.readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
            enqueued, err := nc.evictPods(node, pods)
            if err != nil {
                return err
            }
            if enqueued {
                klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v",
                    node.Name,
                    decisionTimestamp,
                    nodeHealthData.readyTransitionTimestamp,
                    nc.podEvictionTimeout,
                )
            }
        }
    case v1.ConditionUnknown:
        if decisionTimestamp.After(nodeHealthData.probeTimestamp.Add(nc.podEvictionTimeout)) {
            enqueued, err := nc.evictPods(node, pods)
            if err != nil {
                return err
            }
            if enqueued {
                klog.V(2).Infof("Node is unresponsive. Adding Pods on Node %s to eviction queues: %v is later than %v + %v",
                    node.Name,
                    decisionTimestamp,
                    nodeHealthData.readyTransitionTimestamp,
                    nc.podEvictionTimeout-gracePeriod,
                )
            }
        }
    case v1.ConditionTrue:
        if nc.cancelPodEviction(node) {
            klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
        }
    }
    return nil
}

3.3.3 nc.evictPods

nc.evictPods方法不列出源码,只需要知道:该方法主要是将node加入到nc.zonePodEvictor队列中,由其他worker消费该队列,对该node上的pod做驱逐(删除)操作;

3.3.4 消费nc.zonePodEvictor队列-nc.doEvictionPass

nc.doEvictionPass方法负责消费nc.zonePodEvictor队列,调用nodeutil.DeletePods来将node上的pod驱逐(删除)掉;

1
2
3
4
5
6
7
8
9
10
11
12
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) doEvictionPass() {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    for k := range nc.zonePodEvictor {
        nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
            ...
            remaining, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
            ...
        })
    }
}
1
2
3
4
5
6
7
8
9
// pkg/controller/util/node/controller_utils.go
func DeletePods(...) ... {
    ...
    for i := range pods {
        if err := kubeClient.CoreV1().Pods(pod.Namespace).Delete(pod.Name, nil); err != nil {
        ...
    }
    ...
}

3.4 nc.doNoExecuteTaintingPass

nc.doNoExecuteTaintingPass方法主要作用是根据node.Status.Conditions的值来更新node.Spec.Taints,主要是设置EffectnoExecute的taint;

主要逻辑为循环遍历nc.zoneNoExecuteTainter,然后做相关处理:
(1)从nc.zoneNoExecuteTainter中取出一个node,然后从informer本地缓存中获取该node对象;
(2)获取node对象ready condition的值并做判断,如为false则构建key为node.kubernetes.io/not-ready,Effect为NoExecute的taint;如为unknown则构建key为node.kubernetes.io/unreachable,Effect为NoExecute的taint;
(3)最后调用nodeutil.SwapNodeControllerTaint,将构造好的taint更新到node对象中(这里注意,上述两个NoExecute的taint在node对象中,同一时间只会存在一个,一个添加到node对象中时,会把另一个移除);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) doNoExecuteTaintingPass() {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    for k := range nc.zoneNoExecuteTainter {
        // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
        nc.zoneNoExecuteTainter[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
            node, err := nc.nodeLister.Get(value.Value)
            if apierrors.IsNotFound(err) {
                klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
                return true, 0
            } else if err != nil {
                klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
                // retry in 50 millisecond
                return false, 50 * time.Millisecond
            }
            _, condition := nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)
            // Because we want to mimic NodeStatus.Condition["Ready"] we make "unreachable" and "not ready" taints mutually exclusive.
            taintToAdd := v1.Taint{}
            oppositeTaint := v1.Taint{}
            switch condition.Status {
            case v1.ConditionFalse:
                taintToAdd = *NotReadyTaintTemplate
                oppositeTaint = *UnreachableTaintTemplate
            case v1.ConditionUnknown:
                taintToAdd = *UnreachableTaintTemplate
                oppositeTaint = *NotReadyTaintTemplate
            default:
                // It seems that the Node is ready again, so there's no need to taint it.
                klog.V(4).Infof("Node %v was in a taint queue, but it's ready now. Ignoring taint request.", value.Value)
                return true, 0
            }
 
            result := nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{&oppositeTaint}, node)
            if result {
                //count the evictionsNumber
                zone := utilnode.GetZoneKey(node)
                evictionsNumber.WithLabelValues(zone).Inc()
            }
 
            return result, 0
        })
    }
}

3.5 nc.doEvictionPass

nc.doEvictionPass方法主要作用是从nc.zonePodEvictor中获取node,然后驱逐(删除)该node上除daemonset pod外的所有pod;

主要逻辑为循环遍历nc.zonePodEvictor,然后做相关处理:
(1)从nc.zonePodEvictor中取出一个node,然后从informer本地缓存中获取该node对象;
(2)调用nc.getPodsAssignedToNode,获取该node上的所有pod;
(3)调用nodeutil.DeletePods,删除该node上除daemonset pod外的所有的pod;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) doEvictionPass() {
    nc.evictorLock.Lock()
    defer nc.evictorLock.Unlock()
    for k := range nc.zonePodEvictor {
        // Function should return 'false' and a time after which it should be retried, or 'true' if it shouldn't (it succeeded).
        nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
            node, err := nc.nodeLister.Get(value.Value)
            if apierrors.IsNotFound(err) {
                klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
            } else if err != nil {
                klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
            }
            nodeUID, _ := value.UID.(string)
            pods, err := nc.getPodsAssignedToNode(value.Value)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", value.Value, err))
                return false, 0
            }
            remaining, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
            if err != nil {
                // We are not setting eviction status here.
                // New pods will be handled by zonePodEvictor retry
                // instead of immediate pod eviction.
                utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
                return false, 0
            }
            if !nc.nodeEvictionMap.setStatus(value.Value, evicted) {
                klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", value.Value)
            }
            if remaining {
                klog.Infof("Pods awaiting deletion due to Controller eviction")
            }
 
            if node != nil {
                zone := utilnode.GetZoneKey(node)
                evictionsNumber.WithLabelValues(zone).Inc()
            }
 
            return true, 0
        })
    }
}

3.6 nc.monitorNodeHealth

nc.monitorNodeHealth方法的主要作用是持续监控node的状态,根据集群中不同zone下unhealthy数量的node,以及kcm启动参数中驱逐速率的相关配置,给不同的zone设置不同的驱逐速率(该驱逐速率对是否开启污点驱逐均生效),且当node心跳上报(node lease的更新时间)距离上一次上报时间已经超过nodeMonitorGracePeriod(刚启动则为nodeStartupGracePeriod),更新node对象的ready condition值,并做相应的驱逐处理:
(1)当开启了污点驱逐,且node的ready condition不为true时,添加NoExcute污点,并将该node放入zoneNoExecuteTainter中,由taintManager来做驱逐操作;
(2)当没开启污点驱逐,且node的ready condition不为true持续了podEvictionTimeout时间,则开始驱逐pod的操作;

主要逻辑:
(1)从informer本地缓存中获取所有node对象;

(2)调用nc.classifyNodes,将这些node对象分为added、deleted、newZoneRepresentatives三类,即新增的、被删除的、属于新的zone三类,并对每一类做不同的逻辑处理(根据node对象的region与zone label,为每一个node划分一个zoneStates),zoneStates有Initial、Normal、FullDisruption、PartialDisruption四种,新增加的 node默认zoneStates为Initial,其余的几个zoneStates分别对应着不同的驱逐速率;

(3)遍历node对象列表,对每个node进行处理;
(3-1)调用nc.tryUpdateNodeHealth,根据当前获取的node对象的ready condition值、node lease的更新时间等来更新nc.nodeHealthMap中的数据、更新node的condition值,并获取该node的gracePeriodobservedReadyConditioncurrentReadyCondition值,observedReadyCondition可以理解为该node上一次探测时的状态,currentReadyCondition为本次计算出来的状态;
(3-2)如果currentReadyCondition不为空,则调用nc.getPodsAssignedToNode,获取该node上的所有pod列表;
(3-3)如果nc.useTaintBasedEvictions为true,即开启了污点驱逐,则调用nc.processTaintBaseEviction,当node的ready condition属性值为false时去除Unrearchable的污点而添加Notready的污点,并将该node加入zoneNoExecuteTainter队列中;为unknown时去除Notready的污点而添加Unrearchable的污点,并将该node加入zoneNoExecuteTainter队列中;为true时去除NotreadyUnrearchable的污点,然后将该node从zoneNoExecuteTainter队列中移除;
(3-4)如果nc.useTaintBasedEvictions为false,则调用nc.processNoTaintBaseEviction,做进一步的驱逐逻辑处理:当node的ready condition属性值为false时,判断距该node上一次的readyTransitionTimestamp时间是否超过了 podEvictionTimeout,是则将该node加入到zonePodEvictor队列中,最终该node上的pod会被驱逐;当node的ready condition属性值为unknow时,判断距该node上一次的probeTimestamp时间是否超过了 podEvictionTimeout,是则将该node加入到zonePodEvictor队列中,最终该node上的pod会被驱逐;当node的ready condition属性值为true时,调用nc.cancelPodEviction,将该node从zonePodEvictor队列中移除,代表不再对该node上的pod执行驱逐操作;
(3-5)当node对象的Ready Condition值由true变为false时,则调用nodeutil.MarkAllPodsNotReady,将该node上的所有pod标记为notReady;

(4)调用nc.handleDisruption,根据集群中不同zone下unhealthy数量的node,以及kcm启动参数中驱逐速率的相关配置,给不同的zone设置不同的驱逐速率(该驱逐速率对是否开启污点驱逐均生效);
nc.handleDisruption方法暂不展开分析,可自行查看;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) monitorNodeHealth() error {
    // We are listing nodes from local cache as we can tolerate some small delays
    // comparing to state from etcd and there is eventual consistency anyway.
    nodes, err := nc.nodeLister.List(labels.Everything())
    if err != nil {
        return err
    }
    added, deleted, newZoneRepresentatives := nc.classifyNodes(nodes)
 
    for i := range newZoneRepresentatives {
        nc.addPodEvictorForNewZone(newZoneRepresentatives[i])
    }
 
    for i := range added {
        klog.V(1).Infof("Controller observed a new Node: %#v", added[i].Name)
        nodeutil.RecordNodeEvent(nc.recorder, added[i].Name, string(added[i].UID), v1.EventTypeNormal, "RegisteredNode", fmt.Sprintf("Registered Node %v in Controller", added[i].Name))
        nc.knownNodeSet[added[i].Name] = added[i]
        nc.addPodEvictorForNewZone(added[i])
        if nc.useTaintBasedEvictions {
            nc.markNodeAsReachable(added[i])
        } else {
            nc.cancelPodEviction(added[i])
        }
    }
 
    for i := range deleted {
        klog.V(1).Infof("Controller observed a Node deletion: %v", deleted[i].Name)
        nodeutil.RecordNodeEvent(nc.recorder, deleted[i].Name, string(deleted[i].UID), v1.EventTypeNormal, "RemovingNode", fmt.Sprintf("Removing Node %v from Controller", deleted[i].Name))
        delete(nc.knownNodeSet, deleted[i].Name)
    }
 
    zoneToNodeConditions := map[string][]*v1.NodeCondition{}
    for i := range nodes {
        var gracePeriod time.Duration
        var observedReadyCondition v1.NodeCondition
        var currentReadyCondition *v1.NodeCondition
        node := nodes[i].DeepCopy()
        if err := wait.PollImmediate(retrySleepTime, retrySleepTime*scheduler.NodeHealthUpdateRetry, func() (bool, error) {
            gracePeriod, observedReadyCondition, currentReadyCondition, err = nc.tryUpdateNodeHealth(node)
            if err == nil {
                return true, nil
            }
            name := node.Name
            node, err = nc.kubeClient.CoreV1().Nodes().Get(name, metav1.GetOptions{})
            if err != nil {
                klog.Errorf("Failed while getting a Node to retry updating node health. Probably Node %s was deleted.", name)
                return false, err
            }
            return false, nil
        }); err != nil {
            klog.Errorf("Update health of Node '%v' from Controller error: %v. "+
                "Skipping - no pods will be evicted.", node.Name, err)
            continue
        }
 
        // Some nodes may be excluded from disruption checking
        if !isNodeExcludedFromDisruptionChecks(node) {
            zoneToNodeConditions[utilnode.GetZoneKey(node)] = append(zoneToNodeConditions[utilnode.GetZoneKey(node)], currentReadyCondition)
        }
 
        if currentReadyCondition != nil {
            pods, err := nc.getPodsAssignedToNode(node.Name)
            if err != nil {
                utilruntime.HandleError(fmt.Errorf("unable to list pods of node %v: %v", node.Name, err))
                if currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue {
                    // If error happened during node status transition (Ready -> NotReady)
                    // we need to mark node for retry to force MarkPodsNotReady execution
                    // in the next iteration.
                    nc.nodesToRetry.Store(node.Name, struct{}{})
                }
                continue
            }
            if nc.useTaintBasedEvictions {
                nc.processTaintBaseEviction(node, &observedReadyCondition)
            } else {
                if err := nc.processNoTaintBaseEviction(node, &observedReadyCondition, gracePeriod, pods); err != nil {
                    utilruntime.HandleError(fmt.Errorf("unable to evict all pods from node %v: %v; queuing for retry", node.Name, err))
                }
            }
 
            _, needsRetry := nc.nodesToRetry.Load(node.Name)
            switch {
            case currentReadyCondition.Status != v1.ConditionTrue && observedReadyCondition.Status == v1.ConditionTrue:
                // Report node event only once when status changed.
                nodeutil.RecordNodeStatusChange(nc.recorder, node, "NodeNotReady")
                fallthrough
            case needsRetry && observedReadyCondition.Status != v1.ConditionTrue:
                if err = nodeutil.MarkPodsNotReady(nc.kubeClient, pods, node.Name); err != nil {
                    utilruntime.HandleError(fmt.Errorf("unable to mark all pods NotReady on node %v: %v; queuing for retry", node.Name, err))
                    nc.nodesToRetry.Store(node.Name, struct{}{})
                    continue
                }
            }
        }
        nc.nodesToRetry.Delete(node.Name)
    }
    nc.handleDisruption(zoneToNodeConditions, nodes)
 
    return nil
}

3.6.1 nc.tryUpdateNodeHealth

nc.tryUpdateNodeHealth方法主要是根据当前获取的node对象的ready condition值、node lease的更新时间等来更新nc.nodeHealthMap中的数据、更新node的condition值,并返回gracePeriod、上次记录的node的ready condition值observedReadyCondition、本次计算出来的node的ready condition值currentReadyCondition

nc.tryUpdateNodeHealth方法主要逻辑为:
(1)从node对象中获取ready condition的值,如果其为空,则设置observedReadyCondition为unknown并设置gracePeriodnc.nodeStartupGracePeriod;否则设置gracePeriod值为nc.nodeMonitorGracePeriod,设置observedReadyCondition值为从node对象中获取到的ready condition的值;
(2)nodeHealthstatusprobeTimestampreadyTransitionTimestamp属性值赋值相关逻辑处理,status赋值为node.status,probeTimestamp赋值为现在的时间,当ready condition的LastTransitionTime值有所变化,设置readyTransitionTimestamp值为现在的时间;
(3)获取node对应的lease对象,判断其spec.RenewTime属性值是否比上次记录的时间(nodeHealth.lease.spec.RenewTime)要新,是则更新nodeHealth的lease为新lease对象、更新probeTimestamp为此时此刻的时间;
(4)判断现在距离上次探测时间probeTimestamp是否已经超过了nc.nodeMonitorGracePeriod时间,是则将该node的ready conditionmemoryPressure conditiondiskPressure conditionpidPressure condition的值都更新为unknown,最后判断上一次记录的node的ready condition的值与本次的是否一致,不一致则更新nodeHealthreadyTransitionTimestamp的时间为现在;
(5)最后返回gracePeriod、上次记录的node的ready condition值observedReadyCondition、本次计算出来的node的ready condition值currentReadyCondition

3.6.2 nc.processTaintBaseEviction

nc.processTaintBaseEviction方法为污点驱逐的处理逻辑:
(1)当node的ready condition属性值为false时去除Unrearchable的污点而添加Notready的污点,并将该node加入zoneNoExecuteTainter队列中;
(2)为unknown时去除Notready的污点而添加Unrearchable的污点,并将该node加入zoneNoExecuteTainter队列中;
(3)为true时去除NotreadyUnrearchable的污点,然后将该node从zoneNoExecuteTainter队列中移除;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) processTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition) {
    decisionTimestamp := nc.now()
    // Check eviction timeout against decisionTimestamp
    switch observedReadyCondition.Status {
    case v1.ConditionFalse:
        // We want to update the taint straight away if Node is already tainted with the UnreachableTaint
        if taintutils.TaintExists(node.Spec.Taints, UnreachableTaintTemplate) {
            taintToAdd := *NotReadyTaintTemplate
            if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{UnreachableTaintTemplate}, node) {
                klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
            }
        } else if nc.markNodeForTainting(node) {
            klog.V(2).Infof("Node %v is NotReady as of %v. Adding it to the Taint queue.",
                node.Name,
                decisionTimestamp,
            )
        }
    case v1.ConditionUnknown:
        // We want to update the taint straight away if Node is already tainted with the UnreachableTaint
        if taintutils.TaintExists(node.Spec.Taints, NotReadyTaintTemplate) {
            taintToAdd := *UnreachableTaintTemplate
            if !nodeutil.SwapNodeControllerTaint(nc.kubeClient, []*v1.Taint{&taintToAdd}, []*v1.Taint{NotReadyTaintTemplate}, node) {
                klog.Errorf("Failed to instantly swap UnreachableTaint to NotReadyTaint. Will try again in the next cycle.")
            }
        } else if nc.markNodeForTainting(node) {
            klog.V(2).Infof("Node %v is unresponsive as of %v. Adding it to the Taint queue.",
                node.Name,
                decisionTimestamp,
            )
        }
    case v1.ConditionTrue:
        removed, err := nc.markNodeAsReachable(node)
        if err != nil {
            klog.Errorf("Failed to remove taints from node %v. Will retry in next iteration.", node.Name)
        }
        if removed {
            klog.V(2).Infof("Node %s is healthy again, removing all taints", node.Name)
        }
    }
}

3.6.3 nc.processNoTaintBaseEviction

nc.processNoTaintBaseEviction方法为未开启污点驱逐时的驱逐处理逻辑:
(1)当node的ready condition属性值为false时,判断距该node上一次的readyTransitionTimestamp时间是否超过了 podEvictionTimeout,是则将该node加入到zonePodEvictor队列中,最终该node上的pod会被驱逐;
(2)当node的ready condition属性值为unknow时,判断距该node上一次的probeTimestamp时间是否超过了 podEvictionTimeout,是则将该node加入到zonePodEvictor队列中,最终该node上的pod会被驱逐;
(3)当node的ready condition属性值为true时,调用nc.cancelPodEviction,将该node从zonePodEvictor队列中移除,代表不再对该node上的pod执行驱逐操作;

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// pkg/controller/nodelifecycle/node_lifecycle_controller.go
func (nc *Controller) processNoTaintBaseEviction(node *v1.Node, observedReadyCondition *v1.NodeCondition, gracePeriod time.Duration, pods []*v1.Pod) error {
    decisionTimestamp := nc.now()
    nodeHealthData := nc.nodeHealthMap.getDeepCopy(node.Name)
    if nodeHealthData == nil {
        return fmt.Errorf("health data doesn't exist for node %q", node.Name)
    }
    // Check eviction timeout against decisionTimestamp
    switch observedReadyCondition.Status {
    case v1.ConditionFalse:
        if decisionTimestamp.After(nodeHealthData.readyTransitionTimestamp.Add(nc.podEvictionTimeout)) {
            enqueued, err := nc.evictPods(node, pods)
            if err != nil {
                return err
            }
            if enqueued {
                klog.V(2).Infof("Node is NotReady. Adding Pods on Node %s to eviction queue: %v is later than %v + %v",
                    node.Name,
                    decisionTimestamp,
                    nodeHealthData.readyTransitionTimestamp,
                    nc.podEvictionTimeout,
                )
            }
        }
    case v1.ConditionUnknown:
        if decisionTimestamp.After(nodeHealthData.probeTimestamp.Add(nc.podEvictionTimeout)) {
            enqueued, err := nc.evictPods(node, pods)
            if err != nil {
                return err
            }
            if enqueued {
                klog.V(2).Infof("Node is unresponsive. Adding Pods on Node %s to eviction queues: %v is later than %v + %v",
                    node.Name,
                    decisionTimestamp,
                    nodeHealthData.readyTransitionTimestamp,
                    nc.podEvictionTimeout-gracePeriod,
                )
            }
        }
    case v1.ConditionTrue:
        if nc.cancelPodEviction(node) {
            klog.V(2).Infof("Node %s is ready again, cancelled pod eviction", node.Name)
        }
    }
    return nil
}

总结

NodeLifecycleController主要功能有:
(1)定期检查node的心跳上报,某个node间隔一定时间都没有心跳上报时,更新node的ready condition值为false或unknown,开启了污点驱逐的情况下,给该node添加NoExecute的污点;
(2)当污点驱逐未开启时,当node的ready Condition值为false或unknown且已经持续了一段时间(该时间可配置)时,对该node上的pod做驱逐(删除)操作;
(3)当污点驱逐开启时,node上有NoExecute污点后,立马驱逐(删除)不能容忍污点的pod,对于能容忍该污点的pod,则等待所有污点的容忍时间里最小值后,pod才被驱逐(删除);

驱逐中的zone概念

根据每个node对象的region和zone的label值,将node划分到不同的zone中;

region、zone值都相同的node,划分为同一个zone;

zone状态介绍

zone状态有四种,分别是:
(1)Initial:初始化状态;
(2)FullDisruption:ready的node数量为0,not ready的node数量大于0;
(3)PartialDisruption:not ready的node数量大于2且其占比大于等于unhealthyZoneThreshold
(4)Normal:上述三种状态以外的情形,都属于该状态;

需要注意二级驱逐速率对驱逐的影响,即kcm启动参数--secondary-node-eviction-rate,代表如果某个zone下的unhealthy节点的百分比超过--unhealthy-zone-threshold (默认为 0.55)时,驱逐速率将会减小,如果不是LargeCluster(zone节点数量小于等于--large-cluster-size-threshold,默认为 50),驱逐操作将会停止,如果是LargeCluster,驱逐速率将降为每秒--secondary-node-eviction-rate 个,默认为0.01;



这篇关于k8s驱逐篇(6)-kube-controller-manager驱逐-NodeLifecycleController源码分析的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!


原文链接: https://www.likecs.com/show-308668875.html
扫一扫关注最新编程教程