References:
- k8s 污点驱逐详解-源码分析 - 掘金
- k8s驱逐篇(5)-kube-controller-manager驱逐 - 良凯尔 - 博客园
- k8s驱逐篇(6)-kube-controller-manager驱逐-NodeLifecycleController源码分析 - 良凯尔 - 博客园
- k8s驱逐篇(7)-kube-controller-manager驱逐-taintManager源码分析 - 良凯尔 - 博客园
Overview

This analysis is based on Kubernetes 1.19 and covers both the TaintManager and non-TaintManager paths.

TaintManager mode: when a Node is found unhealthy (its Ready condition turns False or Unknown), the controller updates the Ready condition of the Pods on that Node to False to mark them unhealthy, and taints the Node with a NoExecute-effect taint. The TaintManager then inspects each Pod's tolerations for that NoExecute taint: a Pod without a matching toleration is evicted immediately; a Pod with one is deleted on a timer set to the toleration's tolerationSeconds. By default a 5-minute toleration is injected into every Pod, so such Pods are deleted 5 minutes later (see the sketch below).

Non-TaintManager mode (the default): when a Node is found unhealthy, the controller likewise sets the Pods' Ready condition to False. The Node is then recorded and, after podEvictionTimeout (5 min) minus the node grace period (40 s), every Pod on the Node is evicted (node-level eviction); the Node is then marked "evicted" (a state kept only in the controller's memory, not on the API object). From then on only per-Pod eviction is considered (for example, for Pods whose eviction partially failed):
- if the Node is already marked "evicted", single Pods can be evicted directly;
- if not, the Node is marked "tobeevicted" and waits for the node-level eviction above.
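For illustration, the 5-minute default comes from the DefaultTolerationSeconds admission plugin, which injects NoExecute tolerations for the not-ready and unreachable taints into Pods that do not set their own. A minimal sketch of what those injected tolerations look like (constructed by hand here, not taken from the plugin's source):

```go
// Sketch of the default NoExecute tolerations injected into Pods by the
// DefaultTolerationSeconds admission plugin; with these present, the
// TaintManager deletes a Pod 300s (5 min) after the Node is tainted
// not-ready/unreachable instead of immediately.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

func defaultTolerations() []v1.Toleration {
	seconds := int64(300) // 5 minutes
	return []v1.Toleration{
		{
			Key:               v1.TaintNodeNotReady, // "node.kubernetes.io/not-ready"
			Operator:          v1.TolerationOpExists,
			Effect:            v1.TaintEffectNoExecute,
			TolerationSeconds: &seconds,
		},
		{
			Key:               v1.TaintNodeUnreachable, // "node.kubernetes.io/unreachable"
			Operator:          v1.TolerationOpExists,
			Effect:            v1.TaintEffectNoExecute,
			TolerationSeconds: &seconds,
		},
	}
}

func main() {
	for _, t := range defaultTolerations() {
		fmt.Printf("%s %s for %ds\n", t.Key, t.Effect, *t.TolerationSeconds)
	}
}
```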
Storage structures in the code
nodeEvictionMap *nodeEvictionMap

```go
// nodeEvictionMap stores evictionStatus data for each node.
type nodeEvictionMap struct {
	lock          sync.Mutex
	nodeEvictions map[string]evictionStatus
}
```

Records the eviction status of every node:
1. healthy: unmarked
2. waiting for eviction: tobeevicted
3. eviction finished: evicted

zoneStates map[string]ZoneState

```go
type ZoneState string
```

Records the health state of each zone:
1. new zone: Initial
2. healthy zone: Normal
3. partially healthy zone: PartialDisruption
4. completely unhealthy zone: FullDisruption

This state is what the zone's eviction rate is derived from.

zonePodEvictor map[string]*scheduler.RateLimitedTimedQueue

Lost (unhealthy) Nodes are placed in this structure to await eviction; after eviction, the node's entry in nodeEvictionMap is set to evicted.
1. The key is the zone and the value is a rate-limited queue; this is where the per-zone eviction rate mentioned above takes effect.
2. When a node becomes unhealthy, its zone is computed first,
3. then the node is pushed into this structure.

nodeHealthMap *nodeHealthMap

```go
type nodeHealthMap struct {
	lock        sync.RWMutex
	nodeHealths map[string]*nodeHealthData
}

type nodeHealthData struct {
	probeTimestamp           metav1.Time
	readyTransitionTimestamp metav1.Time
	status                   *v1.NodeStatus
	lease                    *coordv1.Lease
}
```

Records the health of every node, used mainly in the monitorNodeHealth function:
1. probeTimestamp is the crucial field: it records the last time the node was seen healthy, i.e. the time of its last Lease renewal before it went silent.
2. probeTimestamp plus the grace period (gracePeriod) then decides whether the node is really lost; if so, its conditions are set to Unknown.
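For reference, the zone of a node is derived from its well-known topology labels via utilnode.GetZoneKey. A minimal sketch of the idea, assuming the GA topology label keys (the upstream helper also handles the legacy failure-domain labels and uses a non-printable separator):

```go
// Sketch (not the upstream implementation) of how a zone key can be derived
// from a Node's topology labels: nodes sharing region+zone land in the same
// zonePodEvictor / zoneNoExecuteTainter bucket.
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

func zoneKey(node *v1.Node) string {
	region := node.Labels["topology.kubernetes.io/region"]
	zone := node.Labels["topology.kubernetes.io/zone"]
	return region + ":" + zone // upstream joins with a non-printable separator
}

func main() {
	n := &v1.Node{ObjectMeta: metav1.ObjectMeta{
		Name: "node-1",
		Labels: map[string]string{
			"topology.kubernetes.io/region": "us-east-1",
			"topology.kubernetes.io/zone":   "us-east-1a",
		},
	}}
	fmt.Println(zoneKey(n)) // us-east-1:us-east-1a
}
```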
Overall code flow
```go
// Run starts an asynchronous loop that monitors the status of cluster nodes.
func (nc *Controller) Run(stopCh <-chan struct{}) {
	defer utilruntime.HandleCrash()

	klog.Infof("Starting node controller")
	defer klog.Infof("Shutting down node controller")

	// 1. Wait for the lease, node, pod and daemonSet informer caches to sync.
	if !cache.WaitForNamedCacheSync("taint", stopCh, nc.leaseInformerSynced, nc.nodeInformerSynced, nc.podInformerSynced, nc.daemonSetInformerSynced) {
		return
	}

	// 2. If --enable-taint-manager=true, start nc.taintManager.Run.
	if nc.runTaintManager {
		go nc.taintManager.Run(stopCh)
	}

	// Close node update queue to cleanup go routine.
	defer nc.nodeUpdateQueue.ShutDown()
	defer nc.podUpdateQueue.ShutDown()

	// 3. doNodeProcessingPassWorker: consumes nodes from nodeUpdateQueue.
	// Start workers to reconcile labels and/or update NoSchedule taint for nodes.
	for i := 0; i < scheduler.UpdateWorkerSize; i++ {
		// Thanks to "workqueue", each worker just need to get item from queue, because
		// the item is flagged when got from queue: if new event come, the new item will
		// be re-queued until "Done", so no more than one worker handle the same item and
		// no event missed.
		go wait.Until(nc.doNodeProcessingPassWorker, time.Second, stopCh)
	}

	// 4. doPodProcessingWorker: consumes pods from podUpdateQueue.
	for i := 0; i < podUpdateWorkerSize; i++ {
		go wait.Until(nc.doPodProcessingWorker, time.Second, stopCh)
	}

	// 5. If the TaintBasedEvictions feature gate is enabled, run doNoExecuteTaintingPass;
	//    otherwise run doEvictionPass.
	if nc.useTaintBasedEvictions {
		// Handling taint based evictions. Because we don't want a dedicated logic in TaintManager for NC-originated
		// taints and we normally don't rate limit evictions caused by taints, we need to rate limit adding taints.
		go wait.Until(nc.doNoExecuteTaintingPass, scheduler.NodeEvictionPeriod, stopCh)
	} else {
		// Managing eviction of nodes:
		// When we delete pods off a node, if the node was not empty at the time we then
		// queue an eviction watcher. If we hit an error, retry deletion.
		go wait.Until(nc.doEvictionPass, scheduler.NodeEvictionPeriod, stopCh)
	}

	// 6. Keep monitoring whether nodes are healthy.
	// Incorporate the results of node health signal pushed from kubelet to master.
	go wait.Until(func() {
		if err := nc.monitorNodeHealth(); err != nil {
			klog.Errorf("Error monitoring node health: %v", err)
		}
	}, nc.nodeMonitorPeriod, stopCh)

	<-stopCh
}
```
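All of these workers use the same wait.Until loop pattern; a tiny standalone example of its semantics (run a function every period until the stop channel closes):

```go
// Minimal demonstration of the wait.Until worker pattern used above.
package main

import (
	"fmt"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	stopCh := make(chan struct{})
	go wait.Until(func() { fmt.Println("tick") }, time.Second, stopCh)
	time.Sleep(3 * time.Second)
	close(stopCh) // stops the loop
}
```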
MonitorNodeHealth

This part does the following:
- Reads the Node's labels to determine which zone it belongs to; if the zone is new, registers it in zonePodEvictor (or zoneNoExecuteTainter in TaintManager mode).
- zonePodEvictor is later used for the lost Nodes of that zone: node-level eviction, i.e. evicting every Pod on the Node and then marking it evicted. See the code below (pkg/controller/nodelifecycle/node_lifecycle_controller.go):
```go
// addPodEvictorForNewZone checks if new zone appeared, and if so add new evictor.
// dfy: if a new zone appears, initialize zonePodEvictor or zoneNoExecuteTainter for it
func (nc *Controller) addPodEvictorForNewZone(node *v1.Node) {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	zone := utilnode.GetZoneKey(node)
	if _, found := nc.zoneStates[zone]; !found {
		// dfy: unseen zone, set its state to Initial
		nc.zoneStates[zone] = stateInitial
		if !nc.runTaintManager {
			// dfy: without the TaintManager, create a rate-limited queue;
			// zonePodEvictor evicts pods from unresponsive nodes
			nc.zonePodEvictor[zone] =
				scheduler.NewRateLimitedTimedQueue(
					flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
		} else {
			// dfy: with the TaintManager, zoneNoExecuteTainter taints nodes instead
			nc.zoneNoExecuteTainter[zone] =
				scheduler.NewRateLimitedTimedQueue(
					flowcontrol.NewTokenBucketRateLimiter(nc.evictionLimiterQPS, scheduler.EvictionRateLimiterBurst))
		}
		// Init the metric for the new zone.
		klog.Infof("Initializing eviction metric for zone: %v", zone)
		evictionsNumber.WithLabelValues(zone).Add(0)
	}
}

func (nc *Controller) doEvictionPass() {
	nc.evictorLock.Lock()
	defer nc.evictorLock.Unlock()
	for k := range nc.zonePodEvictor {
		// Function should return false and a time after which it should be retried, or true if it shouldn't (it succeeded).
		nc.zonePodEvictor[k].Try(func(value scheduler.TimedValue) (bool, time.Duration) {
			// dfy: value.Value holds the node name
			node, err := nc.nodeLister.Get(value.Value)
			if apierrors.IsNotFound(err) {
				klog.Warningf("Node %v no longer present in nodeLister!", value.Value)
			} else if err != nil {
				klog.Warningf("Failed to get Node %v from the nodeLister: %v", value.Value, err)
			}
			nodeUID, _ := value.UID.(string)
			// dfy: list the pods assigned to this node
			pods, err := nc.getPodsAssignedToNode(value.Value)
			if err != nil {
				utilruntime.HandleError(fmt.Errorf("unable to list pods from node %q: %v", value.Value, err))
				return false, 0
			}
			// dfy: delete the pods
			remaining, err := nodeutil.DeletePods(nc.kubeClient, pods, nc.recorder, value.Value, nodeUID, nc.daemonSetStore)
			if err != nil {
				// We are not setting eviction status here.
				// New pods will be handled by zonePodEvictor retry
				// instead of immediate pod eviction.
				utilruntime.HandleError(fmt.Errorf("unable to evict node %q: %v", value.Value, err))
				return false, 0
			}
			// dfy: mark the node as evicted in nodeEvictionMap
			if !nc.nodeEvictionMap.setStatus(value.Value, evicted) {
				klog.V(2).Infof("node %v was unregistered in the meantime - skipping setting status", value.Value)
			}
			if remaining {
				klog.Infof("Pods awaiting deletion due to Controller eviction")
			}
			if node != nil {
				zone := utilnode.GetZoneKey(node)
				evictionsNumber.WithLabelValues(zone).Inc()
			}
			return true, 0
		})
	}
}
```
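The rate limiting inside zonePodEvictor comes from the token-bucket limiter the queue was created with. A small standalone sketch of the behavior, using the public flowcontrol package and a made-up QPS value (the controller's default --node-eviction-rate is 0.1, i.e. at most one node per 10 seconds per zone):

```go
// Sketch of the token-bucket gating behind the per-zone eviction queue:
// a node is only processed when a token is available; otherwise it is
// retried later, much like Try() re-queueing an item.
package main

import (
	"fmt"
	"time"

	"k8s.io/client-go/util/flowcontrol"
)

func main() {
	limiter := flowcontrol.NewTokenBucketRateLimiter(0.5 /* qps (made-up) */, 1 /* burst */)
	nodes := []string{"node-a", "node-b", "node-c"}
	for len(nodes) > 0 {
		if !limiter.TryAccept() { // no token yet: retry later
			time.Sleep(100 * time.Millisecond)
			continue
		}
		fmt.Println("evicting pods on", nodes[0])
		nodes = nodes[1:]
	}
}
```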
Monitoring Node health. Health is judged by watching the Node's Lease: if the Lease stops being renewed for longer than the grace period (gracePeriod), the Node is considered lost and its kubelet-owned conditions, including Ready, are updated to Unknown.

```go
// tryUpdateNodeHealth checks a given node's conditions and tries to update it. Returns grace period to
// which given node is entitled, state of current and last observed Ready Condition, and an error if it occurred.
func (nc *Controller) tryUpdateNodeHealth(node *v1.Node) (time.Duration, v1.NodeCondition, *v1.NodeCondition, error) {
	// ... most of the probeTimestamp update logic omitted
	// (savedLease, nodeHealth, gracePeriod and observedReadyCondition are set there) ...

	// dfy: a Lease renewal refreshes probeTimestamp
	observedLease, _ := nc.leaseLister.Leases(v1.NamespaceNodeLease).Get(node.Name)
	if observedLease != nil && (savedLease == nil || savedLease.Spec.RenewTime.Before(observedLease.Spec.RenewTime)) {
		nodeHealth.lease = observedLease
		nodeHealth.probeTimestamp = nc.now()
	}

	// dfy: if the Lease was not renewed, probeTimestamp did not move, so once the
	// grace period has passed the node is treated as lost
	if nc.now().After(nodeHealth.probeTimestamp.Add(gracePeriod)) {
		// NodeReady condition or lease was last set longer ago than gracePeriod, so
		// update it to Unknown (regardless of its current value) in the master.
		nodeConditionTypes := []v1.NodeConditionType{
			v1.NodeReady,
			v1.NodeMemoryPressure,
			v1.NodeDiskPressure,
			v1.NodePIDPressure,
			// We don't change 'NodeNetworkUnavailable' condition, as it's managed on a control plane level.
			// v1.NodeNetworkUnavailable,
		}
		nowTimestamp := nc.now()
		// dfy: check whether the node already carries each of these conditions
		for _, nodeConditionType := range nodeConditionTypes {
			// dfy: record any condition that is missing or stale
			_, currentCondition := nodeutil.GetNodeCondition(&node.Status, nodeConditionType)
			if currentCondition == nil {
				klog.V(2).Infof("Condition %v of node %v was never updated by kubelet", nodeConditionType, node.Name)
				node.Status.Conditions = append(node.Status.Conditions, v1.NodeCondition{
					Type:               nodeConditionType,
					Status:             v1.ConditionUnknown,
					Reason:             "NodeStatusNeverUpdated",
					Message:            "Kubelet never posted node status.",
					LastHeartbeatTime:  node.CreationTimestamp,
					LastTransitionTime: nowTimestamp,
				})
			} else {
				klog.V(2).Infof("node %v hasn't been updated for %+v. Last %v is: %+v",
					node.Name, nc.now().Time.Sub(nodeHealth.probeTimestamp.Time), nodeConditionType, currentCondition)
				if currentCondition.Status != v1.ConditionUnknown {
					currentCondition.Status = v1.ConditionUnknown
					currentCondition.Reason = "NodeStatusUnknown"
					currentCondition.Message = "Kubelet stopped posting node status."
					currentCondition.LastTransitionTime = nowTimestamp
				}
			}
		}
		// We need to update currentReadyCondition due to its value potentially changed.
		_, currentReadyCondition = nodeutil.GetNodeCondition(&node.Status, v1.NodeReady)

		if !apiequality.Semantic.DeepEqual(currentReadyCondition, &observedReadyCondition) {
			if _, err := nc.kubeClient.CoreV1().Nodes().UpdateStatus(context.TODO(), node, metav1.UpdateOptions{}); err != nil {
				klog.Errorf("Error updating node %s: %v", node.Name, err)
				return gracePeriod, observedReadyCondition, currentReadyCondition, err
			}
			nodeHealth = &nodeHealthData{
				status:                   &node.Status,
				probeTimestamp:           nodeHealth.probeTimestamp,
				readyTransitionTimestamp: nc.now(),
				lease:                    observedLease,
			}
			return gracePeriod, observedReadyCondition, currentReadyCondition, nil
		}
	}

	return gracePeriod, observedReadyCondition, currentReadyCondition, nil
}
```
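Reduced to its core, the check above is a single time comparison; a self-contained sketch with the default 40s --node-monitor-grace-period assumed:

```go
// A node is considered lost once now > probeTimestamp + gracePeriod.
package main

import (
	"fmt"
	"time"
)

func nodeLost(probeTimestamp time.Time, gracePeriod time.Duration, now time.Time) bool {
	return now.After(probeTimestamp.Add(gracePeriod))
}

func main() {
	lastLeaseRenew := time.Now().Add(-time.Minute)                    // last heartbeat 60s ago
	fmt.Println(nodeLost(lastLeaseRenew, 40*time.Second, time.Now())) // true: conditions -> Unknown
}
```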
Setting the eviction rate per zone. Each zone contains a different number of Nodes; the eviction rate is chosen according to the fraction of lost Nodes in the zone.

```go
// dfy: 1. compute the zone's unhealthiness; 2. set the eviction rate accordingly
func (nc *Controller) handleDisruption(zoneToNodeConditions map[string][]*v1.NodeCondition, nodes []*v1.Node) {
	newZoneStates := map[string]ZoneState{}
	allAreFullyDisrupted := true
	for k, v := range zoneToNodeConditions {
		zoneSize.WithLabelValues(k).Set(float64(len(v)))
		// dfy: compute how unhealthy this zone is, i.e. the fraction of lost nodes
		// nc.computeZoneStateFunc = nc.ComputeZoneState
		unhealthy, newState := nc.computeZoneStateFunc(v)
		zoneHealth.WithLabelValues(k).Set(float64(100*(len(v)-unhealthy)) / float64(len(v)))
		unhealthyNodes.WithLabelValues(k).Set(float64(unhealthy))
		if newState != stateFullDisruption {
			allAreFullyDisrupted = false
		}
		newZoneStates[k] = newState
		if _, had := nc.zoneStates[k]; !had {
			klog.Errorf("Setting initial state for unseen zone: %v", k)
			nc.zoneStates[k] = stateInitial
		}
	}

	allWasFullyDisrupted := true
	for k, v := range nc.zoneStates {
		if _, have := zoneToNodeConditions[k]; !have {
			zoneSize.WithLabelValues(k).Set(0)
			zoneHealth.WithLabelValues(k).Set(100)
			unhealthyNodes.WithLabelValues(k).Set(0)
			delete(nc.zoneStates, k)
			continue
		}
		if v != stateFullDisruption {
			allWasFullyDisrupted = false
			break
		}
	}

	// At least one node was responding in previous pass or in the current pass. Semantics is as follows:
	// - if the new state is "partialDisruption" we call a user defined function that returns a new limiter to use,
	// - if the new state is "normal" we resume normal operation (go back to default limiter settings),
	// - if new state is "fullDisruption" we restore normal eviction rate,
	//   - unless all zones in the cluster are in "fullDisruption" - in that case we stop all evictions.
	if !allAreFullyDisrupted || !allWasFullyDisrupted {
		// We're switching to full disruption mode
		if allAreFullyDisrupted {
			klog.V(0).Info("Controller detected that all Nodes are not-Ready. Entering master disruption mode.")
			for i := range nodes {
				if nc.runTaintManager {
					_, err := nc.markNodeAsReachable(nodes[i])
					if err != nil {
						klog.Errorf("Failed to remove taints from Node %v", nodes[i].Name)
					}
				} else {
					nc.cancelPodEviction(nodes[i])
				}
			}
			// We stop all evictions.
			for k := range nc.zoneStates {
				if nc.runTaintManager {
					nc.zoneNoExecuteTainter[k].SwapLimiter(0)
				} else {
					nc.zonePodEvictor[k].SwapLimiter(0)
				}
			}
			for k := range nc.zoneStates {
				nc.zoneStates[k] = stateFullDisruption
			}
			// All rate limiters are updated, so we can return early here.
			return
		}
		// We're exiting full disruption mode
		if allWasFullyDisrupted {
			klog.V(0).Info("Controller detected that some Nodes are Ready. Exiting master disruption mode.")
			// When exiting disruption mode update probe timestamps on all Nodes.
			now := nc.now()
			for i := range nodes {
				v := nc.nodeHealthMap.getDeepCopy(nodes[i].Name)
				v.probeTimestamp = now
				v.readyTransitionTimestamp = now
				nc.nodeHealthMap.set(nodes[i].Name, v)
			}
			// We reset all rate limiters to settings appropriate for the given state.
			for k := range nc.zoneStates {
				// dfy: set the zone's eviction rate
				nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newZoneStates[k])
				nc.zoneStates[k] = newZoneStates[k]
			}
			return
		}
		// We know that there's at least one not-fully disrupted so,
		// we can use default behavior for rate limiters
		for k, v := range nc.zoneStates {
			newState := newZoneStates[k]
			if v == newState {
				continue
			}
			klog.V(0).Infof("Controller detected that zone %v is now in state %v.", k, newState)
			// dfy: set the zone's eviction rate
			nc.setLimiterInZone(k, len(zoneToNodeConditions[k]), newState)
			nc.zoneStates[k] = newState
		}
	}
}
```
```go
// ComputeZoneState returns a slice of NodeReadyConditions for all Nodes in a given zone.
// The zone is considered:
// - fullyDisrupted if there're no Ready Nodes,
// - partiallyDisrupted if at least nc.unhealthyZoneThreshold percent of Nodes are not Ready,
// - normal otherwise
func (nc *Controller) ComputeZoneState(nodeReadyConditions []*v1.NodeCondition) (int, ZoneState) {
	readyNodes := 0
	notReadyNodes := 0
	for i := range nodeReadyConditions {
		if nodeReadyConditions[i] != nil && nodeReadyConditions[i].Status == v1.ConditionTrue {
			readyNodes++
		} else {
			notReadyNodes++
		}
	}
	switch {
	case readyNodes == 0 && notReadyNodes > 0:
		return notReadyNodes, stateFullDisruption
	case notReadyNodes > 2 && float32(notReadyNodes)/float32(notReadyNodes+readyNodes) >= nc.unhealthyZoneThreshold:
		return notReadyNodes, statePartialDisruption
	default:
		return notReadyNodes, stateNormal
	}
}
```
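A worked example of this classification, re-implemented locally with the default --unhealthy-zone-threshold of 0.55 assumed:

```go
// Local copy of the zone classification rule for illustration.
package main

import "fmt"

func zoneState(ready, notReady int, threshold float32) string {
	switch {
	case ready == 0 && notReady > 0:
		return "FullDisruption"
	case notReady > 2 && float32(notReady)/float32(notReady+ready) >= threshold:
		return "PartialDisruption"
	default:
		return "Normal"
	}
}

func main() {
	fmt.Println(zoneState(0, 5, 0.55)) // FullDisruption: no Ready nodes at all
	fmt.Println(zoneState(4, 6, 0.55)) // PartialDisruption: 60% >= 55% and more than 2 not ready
	fmt.Println(zoneState(9, 2, 0.55)) // Normal: at most 2 not-ready nodes never trips the threshold
	fmt.Println(zoneState(5, 3, 0.55)) // Normal: 37.5% < 55%
}
```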
```go
// dfy: set this zone's eviction rate according to its health state (i.e. the healthy fraction)
func (nc *Controller) setLimiterInZone(zone string, zoneSize int, state ZoneState) {
	switch state {
	case stateNormal:
		if nc.runTaintManager {
			nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.evictionLimiterQPS)
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(nc.evictionLimiterQPS)
		}
	case statePartialDisruption:
		if nc.runTaintManager {
			nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.enterPartialDisruptionFunc(zoneSize))
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(nc.enterPartialDisruptionFunc(zoneSize))
		}
	case stateFullDisruption:
		if nc.runTaintManager {
			nc.zoneNoExecuteTainter[zone].SwapLimiter(nc.enterFullDisruptionFunc(zoneSize))
		} else {
			nc.zonePodEvictor[zone].SwapLimiter(nc.enterFullDisruptionFunc(zoneSize))
		}
	}
}
```
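In 1.19, enterPartialDisruptionFunc and enterFullDisruptionFunc are wired to the controller's ReducedQPSFunc and HealthyQPSFunc. A sketch of their effect, with the default flag values (--node-eviction-rate=0.1, --secondary-node-eviction-rate=0.01, --large-cluster-size-threshold=50) taken as assumptions:

```go
// Sketch of the PartialDisruption rate reduction: big zones are evicted slowly,
// small zones not at all; FullDisruption restores the normal rate (unless all
// zones are fully disrupted, in which case handleDisruption stops evictions).
package main

import "fmt"

const (
	evictionLimiterQPS          = float32(0.1)  // assumed default --node-eviction-rate
	secondaryEvictionLimiterQPS = float32(0.01) // assumed default --secondary-node-eviction-rate
	largeClusterThreshold       = 50            // assumed default --large-cluster-size-threshold
)

// reducedQPS mirrors what ReducedQPSFunc does for PartialDisruption zones.
func reducedQPS(zoneSize int) float32 {
	if zoneSize > largeClusterThreshold {
		return secondaryEvictionLimiterQPS
	}
	return 0
}

func main() {
	fmt.Println(reducedQPS(100)) // 0.01: one node every 100s
	fmt.Println(reducedQPS(20))  // 0: evictions stop entirely in small zones
}
```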
Handling Pod eviction: processNoTaintBaseEviction
TaintManager.Run

The TaintManager's eviction logic is easy to follow in the code; roughly:
- With the TaintManager enabled, every Pod and Node change is pushed into nc.tc.podUpdateQueue and nc.tc.nodeUpdateQueue.
- When a Node is lost it is tainted with a NoExecute-effect taint (not here, but in the main Controller.Run loop).
- nc.tc.nodeUpdateQueue is processed first:
  - If the Node has no NoExecute-effect taint, any pending eviction for it is cancelled.
  - Otherwise each Pod on the Node is checked: a Pod with a matching toleration gets a timed deletion according to the toleration's duration; a Pod without one is deleted immediately.
- Then nc.tc.podUpdateQueue is processed the same way: each Pod is checked for a toleration of the taint; with one, deletion is scheduled per the toleration; without one, the Pod is deleted immediately (see the sketch below).
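A simplified, self-contained sketch of that toleration decision (my own helper, not the taintManager code; the real implementation computes the minimum toleration time in pkg/controller/nodelifecycle/scheduler/taint_manager.go):

```go
// Decide whether a Pod tolerates a NoExecute taint and for how long:
// a nil TolerationSeconds means "tolerate forever"; otherwise the smallest
// finite TolerationSeconds wins.
package main

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
)

func noExecuteTolerationDelay(pod *v1.Pod, taint v1.Taint) (tolerates, forever bool, delay time.Duration) {
	min := int64(-1)
	for _, t := range pod.Spec.Tolerations {
		if !t.ToleratesTaint(&taint) {
			continue
		}
		tolerates = true
		if t.TolerationSeconds == nil {
			continue // this toleration alone would tolerate forever
		}
		if min < 0 || *t.TolerationSeconds < min {
			min = *t.TolerationSeconds
		}
	}
	if !tolerates {
		return false, false, 0 // no toleration: evict immediately
	}
	if min < 0 {
		return true, true, 0 // only unbounded tolerations: never evict
	}
	return true, false, time.Duration(min) * time.Second
}

func main() {
	secs := int64(300)
	pod := &v1.Pod{Spec: v1.PodSpec{Tolerations: []v1.Toleration{{
		Key: "node.kubernetes.io/not-ready", Operator: v1.TolerationOpExists,
		Effect: v1.TaintEffectNoExecute, TolerationSeconds: &secs,
	}}}}
	taint := v1.Taint{Key: "node.kubernetes.io/not-ready", Effect: v1.TaintEffectNoExecute}
	fmt.Println(noExecuteTolerationDelay(pod, taint)) // true false 5m0s: delete after 5 minutes
}
```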
Node and Pod handling

This covers the eviction-related logic around nc.podUpdateQueue and nc.nodeUpdateQueue: for example, applying a NoSchedule taint to a Node, setting a Pod's Ready condition to False once its Node is detected as unhealthy, and performing Pod eviction via processNoTaintBaseEviction.
Note that in TaintManager mode this stage only applies the NoExecute-effect taint (the doNoExecuteTaintingPass function), while in non-TaintManager mode it removes all Pods on the Nodes recorded in zonePodEvictor (node-level eviction).
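For reference, the two NoExecute taints involved correspond to the controller's not-ready and unreachable taint templates; a sketch using the well-known taint keys from k8s.io/api/core/v1 (the template variable names here are mine):

```go
// Ready == False   -> node.kubernetes.io/not-ready:NoExecute
// Ready == Unknown -> node.kubernetes.io/unreachable:NoExecute
package main

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
)

var (
	notReadyTaint    = &v1.Taint{Key: v1.TaintNodeNotReady, Effect: v1.TaintEffectNoExecute}
	unreachableTaint = &v1.Taint{Key: v1.TaintNodeUnreachable, Effect: v1.TaintEffectNoExecute}
)

func main() {
	fmt.Printf("%s:%s\n", notReadyTaint.Key, notReadyTaint.Effect)
	fmt.Printf("%s:%s\n", unreachableTaint.Key, unreachableTaint.Effect)
}
```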