kubelet源码分析——关闭Pod
2021/10/1 9:10:53
本文主要是介绍kubelet源码分析——关闭Pod,对大家解决编程问题具有一定的参考价值,需要的程序猿们随着小编来一起学习吧!
上一篇说到kublet如何启动一个pod,本篇讲述如何关闭一个Pod,引用一段来自官方文档介绍pod的生命周期的话
- 你使用 kubectl 工具手动删除某个特定的 Pod,而该 Pod 的体面终止限期是默认值(30 秒)。
- API 服务器中的 Pod 对象被更新,记录涵盖体面终止限期在内 Pod 的最终死期,超出所计算时间点则认为 Pod 已死(dead)。 如果你使用 kubectl describe 来查验你正在删除的 Pod,该 Pod 会显示为 "Terminating" (正在终止)。 在 Pod 运行所在的节点上:kubelet 一旦看到 Pod 被标记为正在终止(已经设置了体面终止限期),kubelet 即开始本地的 Pod 关闭过程。
- 如果 Pod 中的容器之一定义了 preStop 回调, kubelet 开始在容器内运行该回调逻辑。如果超出体面终止限期时,preStop 回调逻辑 仍在运行,kubelet 会请求给予该 Pod 的宽限期一次性增加 2 秒钟。
说明: 如果 preStop 回调所需要的时间长于默认的体面终止限期,你必须修改 terminationGracePeriodSeconds 属性值来使其正常工作。
- kubelet 接下来触发容器运行时发送 TERM 信号给每个容器中的进程 1。
说明: Pod 中的容器会在不同时刻收到 TERM 信号,接收顺序也是不确定的。 如果关闭的顺序很重要,可以考虑使用 preStop 回调逻辑来协调。
- 与此同时,kubelet 启动体面关闭逻辑,控制面会将 Pod 从对应的端点列表(以及端点切片列表, 如果启用了的话)中移除,过滤条件是 Pod 被对应的 服务以某 选择算符选定。 ReplicaSets和其他工作负载资源 不再将关闭进程中的 Pod 视为合法的、能够提供服务的副本。关闭动作很慢的 Pod 也无法继续处理请求数据,因为负载均衡器(例如服务代理)已经在终止宽限期开始的时候 将其从端点列表中移除。
- 超出终止宽限期限时,kubelet 会触发强制关闭过程。容器运行时会向 Pod 中所有容器内 仍在运行的进程发送 SIGKILL 信号。 kubelet 也会清理隐藏的 pause 容器,如果容器运行时使用了这种容器的话。
- kubelet 触发强制从 API 服务器上删除 Pod 对象的逻辑,并将体面终止限期设置为 0 (这意味着马上删除)。
- API 服务器删除 Pod 的 API 对象,从任何客户端都无法再看到该对象。
简单概括为
- 删除一个Pod后系统默认给30s的宽限期,并将它的状态设置成Terminating
- kublectl发现Pod状态为Terminating则尝试执行preStop生命周期勾子,并可多给2s的宽限期
- 同时控制面将Pod中svc的endpoint中去除
- 宽限期到则发送TERM信号
- Pod还不关闭再发送SIGKILL强制关闭,并清理sandbox
- kubelet删除Pod资源对象。
下面则从kublet源码中查看这个过程
kubelet.syncLoop
前文讲到kubelet.syncLoop这个循环包含了kublet主要的核心的操作,Pod的启动从这里开始,Pod的关闭也从这里开始,与之前Pod启动的极为相似,最终还是到达了kublet的sync方法
kubelet.syncLoop /pkg/kubelet/kubelet.go |--kl.syncLoopIteration(updates, handler, syncTicker.C, housekeepingTicker.C, plegCh) |--u, open := <-configCh |--handler.HandlePodUpdates(u.Pods)即Kubelet.HandlePodUpdates |--kl.handleMirrorPod(pod, start) |--kl.dispatchWork |--kl.dispatchWork(pod, kubetypes.SyncPodCreate, mirrorPod, start) |--kl.podWorkers.UpdatePod即podWorkers.UpdatePod /pkg/kubelet/pod_worker.go |--p.managePodLoop |--p.syncPodFn
Kubelet.dispatchWork
但是需要穿插提前说一下这个方法,当pod的container是Termial(status.State.Terminated不为空)且DeletionTimestamp不为空(资源被调用删除后这个字段会填值),就会调用statusManager.TerminatePod,这个方法的作用后续会说,按着顺序走调用podWorkers.UpdatePod方法,传入的UpdateType是SyncPodUpdate。
func (kl *Kubelet) dispatchWork(pod *v1.Pod, syncType kubetypes.SyncPodType, mirrorPod *v1.Pod, start time.Time) { containersTerminal, podWorkerTerminal := kl.podAndContainersAreTerminal(pod) if pod.DeletionTimestamp != nil && containersTerminal { kl.statusManager.TerminatePod(pod) return } // Run the sync in an async worker. kl.podWorkers.UpdatePod(&UpdatePodOptions{ Pod: pod, MirrorPod: mirrorPod, UpdateType: syncType, OnCompleteFunc: func(err error) { if err != nil { metrics.PodWorkerDuration.WithLabelValues(syncType.String()).Observe(metrics.SinceInSeconds(start)) } }, }) }
Pod sync(Kubelet.syncPod)
还是走到kubelet.syncPod方法,在这个方法里面一开始也有一个killPod方法的的调用,但是本次进入传参updateType是SyncPodUpdate,因此会往下走,走到runnable.Admit的判断才是进入调用killPod方法
func (kl *Kubelet) syncPod(o syncPodOptions) error { // if we want to kill a pod, do it now! if updateType == kubetypes.SyncPodKill { kl.statusManager.SetPodStatus(pod, apiPodStatus) // we kill the pod with the specified grace period since this is a termination if err := kl.killPod(pod, nil, podStatus, killPodOptions.PodTerminationGracePeriodSecondsOverride); err != nil { } return nil } // Kill pod if it should not be running if !runnable.Admit || pod.DeletionTimestamp != nil || apiPodStatus.Phase == v1.PodFailed { var syncErr error if err := kl.killPod(pod, nil, podStatus, nil); err != nil { kl.recorder.Eventf(pod, v1.EventTypeWarning, events.FailedToKillPod, "error killing pod: %v", err) syncErr = fmt.Errorf("error killing pod: %v", err) utilruntime.HandleError(syncErr) } else { if !runnable.Admit { // There was no error killing the pod, but the pod cannot be run. // Return an error to signal that the sync loop should back off. syncErr = fmt.Errorf("pod cannot be run: %s", runnable.Message) } } return syncErr } }
killPod
Kubelet.syncPod /pkg/kubelet/kubelet.go |--Kubelet.killPod /pkg/kubelet/kubelet_pods.go |--kl.containerRuntime.KillPod |==kubeGenericRuntimeManager.KillPod /pkg/kubelet/kuberuntime/kuberuntime_manager.go | |- m.killPodWithSyncResult |--kl.containerManager.UpdateQOSCgroups()
经过多层的调用,来到kubeGenericRuntimeManager.killPodWithSyncResult方法,代码中关键操作有两个
1 先停止属于该pod的所有containers
2 然后再停止pod sandbox容器
func (m *kubeGenericRuntimeManager) killPodWithSyncResult(pod *v1.Pod, runningPod kubecontainer.Pod, gracePeriodOverride *int64) (result kubecontainer.PodSyncResult) { killContainerResults := m.killContainersWithSyncResult(pod, runningPod, gracePeriodOverride) // Stop all sandboxes belongs to same pod for _, podSandbox := range runningPod.Sandboxes { if err := m.runtimeService.StopPodSandbox(podSandbox.ID.ID); err != nil { } } return }
killContainer
kubeGenericRuntimeManager.killPodWithSyncResult /pkg/kubelet/kuberuntime/kuberuntime_manager.go |--m.killContainersWithSyncResult /pkg/kubelet/kuberuntime/kuberuntime_container.go |--m.killContainer
killContainersWithSyncResult经过两层调用来到kubeGenericRuntimeManager.killContainer,从代码看到
1 关闭pod的宽限时间设置
2 执行pod的preStop生命周期钩子
3 宽限时间不够可以再多给2s
4 停止容器
func (m *kubeGenericRuntimeManager) killContainer(pod *v1.Pod, containerID kubecontainer.ContainerID, containerName string, message string, gracePeriodOverride *int64) error { //1 关闭pod的宽限时间设置 gracePeriod := int64(minimumGracePeriodInSeconds) switch { case pod.DeletionGracePeriodSeconds != nil: gracePeriod = *pod.DeletionGracePeriodSeconds case pod.Spec.TerminationGracePeriodSeconds != nil: gracePeriod = *pod.Spec.TerminationGracePeriodSeconds } //2 执行pod的preStop生命周期钩子 if containerSpec.Lifecycle != nil && containerSpec.Lifecycle.PreStop != nil && gracePeriod > 0 { //这里执行完会返回剩余的宽限时间 gracePeriod = gracePeriod - m.executePreStopHook(pod, containerID, containerSpec, gracePeriod) } //3 宽限时间不够可以再多给2s // always give containers a minimal shutdown window to avoid unnecessary SIGKILLs if gracePeriod < minimumGracePeriodInSeconds { gracePeriod = minimumGracePeriodInSeconds } //4 停止容器 err := m.runtimeService.StopContainer(containerID.ID, gracePeriod) }
若要往下追源码,可在下面这方法看到往dockerDeamon发送stop容器的请求
func (cli *Client) ContainerStop(ctx context.Context, containerID string, timeout *time.Duration) error { query := url.Values{} if timeout != nil { query.Set("t", timetypes.DurationToSecondsString(*timeout)) } resp, err := cli.post(ctx, "/containers/"+containerID+"/stop", query, nil, nil) ensureReaderClosed(resp) return err }
调用链如下
m.runtimeService.StopContainer /pkg/kubelet/kuberuntime/kuberuntime_container.go |==remoteRuntimeService.StopContainer /pkg/kubelet/cri/remote/remote_runtime.go |--r.runtimeClient.StopContainer |==dockerService.StopContainer /pkg/kubelet/dockershim/docker_container.go |--ds.client.StopContainer |==kubeDockerClient.StopContainer /pkg/kubelet/dockershim/libdocker/kube_docker_client.go |--d.client.ContainerStop //就是上面的Client.ContainerStop
注:当使用GOALND看代码时追到r.runtimeClient.StopContainer时会发现调到cri-api包里面的RuntimeServiceClient,这个包处于vendor中,又找不到实现,实际上这里已经是kubelet开始调CRI了,目前的例子是使用docker作为CRI,那相关代码在/pkg/kubelet/dockershim里面找,这里是涉及到container的则看docker_container.go,像上一篇跟sandbox相关的在docker_sandbox.go里面找
StopPodSandbox
killPodWithSyncResult的另外一个关键调用就是调用StopPodSandbox方法,为了停止SandBox,主要步骤有
1 调用ds.network.TearDownPod:删除pod网络;
2 调用ds.client.StopContainer:停止pod sandbox容器。
代码位于/pkg/kubelet/dockershim/docker_sandbox.go
func (ds *dockerService) StopPodSandbox(ctx context.Context, r *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) { ready, ok := ds.getNetworkReady(podSandboxID) if !hostNetwork && (ready || !ok) { err := ds.network.TearDownPod(namespace, name, cID) } if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil { } }
TearDownPod是CRI的方法,用于清除容器网络,StopContainer则与上面停止业务容器时调用ds.client.StopContainer一样,实际上调用kubeDockerClient.StopContainer最终往dockerDaemon发stop容器的post请求。
后续清理Pod资源
至此Pod就停下来了,从状态Terminating转成Terminated,Pod这个资源将要etcd中删除,通过api-server查也查不到,这个调用api-server删pod资源的动作由kublet的statusManager执行
在执行kubelet的Run方法跑起kubelet的核心循环syncLoop之前,启动了各种manager,其中有一个便是statusManager,statusManager的Run方法是开了一个协程不断去循环同步Pod状态,触发方式有两种,其一是从通道里传入,单个执行同步;另一是通过定时器触发批量执行同步
代码位于/pkg/kubelet/status/status_manager.go
func (m *manager) Start() { go wait.Forever(func() { for { select { case syncRequest := <-m.podStatusChannel: m.syncPod(syncRequest.podUID, syncRequest.status) case <-syncTicker: for i := len(m.podStatusChannel); i > 0; i-- { <-m.podStatusChannel } m.syncBatch() } } }, 0) }
syncPod的简略如下
func (m *manager) syncPod(uid types.UID, status versionedPodStatus) { if m.canBeDeleted(pod, status.status) { deleteOptions := metav1.DeleteOptions{ GracePeriodSeconds: new(int64), // Use the pod UID as the precondition for deletion to prevent deleting a // newly created pod with the same name and namespace. Preconditions: metav1.NewUIDPreconditions(string(pod.UID)), } err = m.kubeClient.CoreV1().Pods(pod.Namespace).Delete(context.TODO(), pod.Name, deleteOptions) } }
执行canBeDeleted方法作用如函数名一致用于判定当前pod的状况能否去执行pod资源的删除,最终会调用到Kubelet.PodResourcesAreReclaimed方法,大致是判断pod的业务容器和sandbox是否有清理干净,volume有否卸载完毕,cgroup是否有清理完毕,代码位于/pkg/kubelet/kubelet_pods.go
func (kl *Kubelet) PodResourcesAreReclaimed(pod *v1.Pod, status v1.PodStatus) bool { if !notRunning(status.ContainerStatuses) { // We shouldn't delete pods that still have running containers klog.V(3).Infof("Pod %q is terminated, but some containers are still running", format.Pod(pod)) return false } // pod's containers should be deleted runtimeStatus, err := kl.podCache.Get(pod.UID) if err != nil { klog.V(3).Infof("Pod %q is terminated, Error getting runtimeStatus from the podCache: %s", format.Pod(pod), err) return false } if len(runtimeStatus.ContainerStatuses) > 0 { var statusStr string for _, status := range runtimeStatus.ContainerStatuses { statusStr += fmt.Sprintf("%+v ", *status) } klog.V(3).Infof("Pod %q is terminated, but some containers have not been cleaned up: %s", format.Pod(pod), statusStr) return false } // pod's sandboxes should be deleted if len(runtimeStatus.SandboxStatuses) > 0 { var sandboxStr string for _, sandbox := range runtimeStatus.SandboxStatuses { sandboxStr += fmt.Sprintf("%+v ", *sandbox) } klog.V(3).Infof("Pod %q is terminated, but some pod sandboxes have not been cleaned up: %s", format.Pod(pod), sandboxStr) return false } if kl.podVolumesExist(pod.UID) && !kl.keepTerminatedPodVolumes { // We shouldn't delete pods whose volumes have not been cleaned up if we are not keeping terminated pod volumes klog.V(3).Infof("Pod %q is terminated, but some volumes have not been cleaned up", format.Pod(pod)) return false } if kl.kubeletConfiguration.CgroupsPerQOS { pcm := kl.containerManager.NewPodContainerManager() if pcm.Exists(pod) { klog.V(3).Infof("Pod %q is terminated, but pod cgroup sandbox has not been cleaned up", format.Pod(pod)) return false } } return true }
同步Pod的触发源头
然后要找到这串逻辑触发的源头,就在先前kubelet.dispatchWork方法开头的那个判断,经过之前清理了各种容器后Pod的状态已转换成Terminated,再次走到dispatchWork方法时就会进入statusManager.TerminatePod
Kubelet.dispatchWork /pkg/kubelet/kubelet.go |--kl.statusManager.TerminatePod(pod) |==manager.TerminatePod /pkg/kubelet/status/status_manager.go |--m.updateStatusInternal |--m.podStatusChannel <- podStatusSyncRequest{pod.UID, newStatus}
寻找statusManager,调api删除pod资源,前者在startKubelet时开了个协程去同步,在kubelet.dispatchWork处调kl.statusManager.TerminatePod往通道里塞pod触发逻辑
小结
本篇从kubelet的主循环开始,讲述了kubelet启动pod的过程,包括状态更新,分配cgroup,创建容器目录,等待volume挂载,注入imagepull secret,创建sandbox,调用cni编织网络,启动临时容器,init容器,业务容器,执行postStart生命周期钩子。
如要回顾本系列的文章可点击
kubelet源码分析——kubelet简介与启动
kubelet源码分析——启动Pod
kubelet源码分析——关闭Pod
参考文章
kubernetes/k8s CRI 分析-kubelet删除pod分析
pod删除主要流程源码解析
Pod 的终止
这篇关于kubelet源码分析——关闭Pod的文章就介绍到这儿,希望我们推荐的文章对大家有所帮助,也希望大家多多支持为之网!
- 2024-11-14后台交互资料入门指南
- 2024-11-14如何轻松创建项目环境:新手入门教程
- 2024-11-14如何抽离公共代码:初级开发者指南
- 2024-11-14Python编程入门指南
- 2024-11-14Python编程入门:如何获取参数
- 2024-11-14JWT 用户校验:简单教程与实践
- 2024-11-14Pre-commit 自动化测试入门指南
- 2024-11-14Python编程基础
- 2024-11-14Server Action入门教程:轻松掌握服务器操作
- 2024-11-14Server Component入门教程:轻松搭建服务器组件