电子商务网站建设读书报告,做网站前的准备,有域名自己做网站吗,中国建设银行大沥网站[阅读指南] 这是该系列第四篇 基于kubernetes 1.27 stage版本 为了方便阅读#xff0c;后续所有代码均省略了错误处理及与关注逻辑无关的部分。 文章目录 client-go中的存储结构DeltaFIFOdelta索引 keyqueue push操作delta push 去重 queue pop操作 总结 client-go中的存储结构… [阅读指南] 这是该系列第四篇 基于kubernetes 1.27 stage版本 为了方便阅读后续所有代码均省略了错误处理及与关注逻辑无关的部分。 文章目录 client-go中的存储结构DeltaFIFOdelta索引 keyqueue push操作delta push 去重 queue pop操作 总结 client-go中的存储结构
如下图clinet-go中定义了存储类型接口store用来提供存储对象的基本能力。
queue继承了store接口并提供了队列的能力队列中可以保存需要增删改的存储对象的key它会取出队头元素调用PopProcessFunc处理。
queue的实现有两个FIFO和deltaFIFO。
deltaFIFO的不同点在于deltaFIFO队列中key对应的不是对象本身而是对象的delta。
另外deltaFIFO除了通过add、update、delete添加元素还有两种特殊的方式replaced和sync。replaced一般发生在资源版本更新时而sync由resync定时发起。
DeltaFIFO
下面是deltaFIFO数据结构的定义
type DeltaFIFO struct {// 并发读写锁lock sync.RWMutexcond sync.Cond// items maps a key to a Deltas.// 资源对象的key与对应的delta数组每个数组至少都会有一个deltaitems map[string]Deltas// 按照FIFO队列顺序存储key用来给pop()消费。// 该数组不会有重复值并且所有元素都一定在items中queue []string// 生成key值的函数默认是 MetaNamespaceKeyFunckeyFunc KeyFunc// 本地缓存中已知的所有资源对象的keyknownObjects KeyListerGetter......
}delta
如前面所说deltaFIFO中key映射的不是对象本身是delta数组。
根据Delta数据结构的定义delta包含了一个资源对象的变更类型及变更的内容。这里的Object不一定是完整的资源数据大部分场景下只会有变更的部分信息。
type Delta struct {Type DeltaTypeObject interface{}
}type DeltaType string
const (Added DeltaType AddedUpdated DeltaType UpdatedDeleted DeltaType DeletedReplaced DeltaType ReplacedSync DeltaType Sync
)举个栗子本地已经有了一个pod对象
Pod{Name: mypod,Namespace: default,Labels: map[string]string{app: web, version: 0.0.1},
}此时mypod的 lable从web变成了app-serverreflector就会创建一个这样的delta对象放入FIFO队列中。
Delta{Type: Updated,Object: Pod{Name: mypod,Namespace: default,Labels: map[string]string{app: app-server},},
}索引 key
deltaFIFO队列中存储的是delta的key值通过key值可以在items map中获取到对应的delta对象。
这个key值在初始化FIFO时通过KeyFunction进行定义使用者没有指定时都会使用自带的命名函数 MetaNamespaceKeyFunc 进行命名命名规则是
namespace不为空key为/namespace为空key为
这里的name是在yaml资源配置中的matadata.name比如上面的mypod。在同一个资源下name在所有api version都一定是唯一的。
func MetaNamespaceKeyFunc(obj interface{}) (string, error) {if key, ok : obj.(ExplicitKey); ok {return string(key), nil}meta, err : meta.Accessor(obj)if len(meta.GetNamespace()) 0 {return meta.GetNamespace() / meta.GetName(), nil}return meta.GetName(), nil
}queue push操作
watcher监控的资源变更时会调用deltaFIFO中Added、Updated、Deleted、Replaced、Sync方法最终它们都会通过queueActionLocked 方法往deltaFIFO队列中加入对应类型的delta对象。
queueActionLocked 也就是deltaFIFO的入队操作。
和一般的入队不同的是新加入的delta不是直接加入到队尾队列queue数组中保存的是delta的key。所以入队的操作是这样的
获取delta对应的key值还记得keyfunc吗又是它如果delta所属的资源key已经在队列中直接将delta添加到key对应到deltas数组末尾。更新已存在的资源delta并不会影响他的key在队列中的位置。如果delta所属的资源key不在队列中就将key添加到队列末尾并在items中关联key和delta
func (f *DeltaFIFO) queueActionLocked(actionType DeltaType, obj interface{}) error {id, err : f.KeyOf(obj)// 自定义的转换函数。可以在delta事件被处理之前完成一些预处理// 常见的用法是用来过滤一些处理程序不关注的资源对象、以及处理数据格式等if f.transformer ! nil {obj, err f.transformer(obj)}// 将新的delta放入资源key对应的delta数组末尾// 如果原本的key不存在就是创建了一个新的数组并将新的delta放入其中oldDeltas : f.items[id]newDeltas : append(oldDeltas, Delta{actionType, obj})// 对delta数组中的delta去重newDeltas dedupDeltas(newDeltas)// 判断key是否已经在队列中并且更新key对应的delta数组if len(newDeltas) 0 {if _, exists : f.items[id]; !exists {f.queue append(f.queue, id)}f.items[id] newDeltasf.cond.Broadcast()}return nil
}delta push 去重
上一节提到delta进行push操作时会对加入的delta进行去重。去重逻辑目前只针对两个delete类型的delta有效当delta数组中倒数第一个和第二个delta都是delete类型时将会去掉其中一个。
func dedupDeltas(deltas Deltas) Deltas {n : len(deltas)if n 2 {return deltas}a : deltas[n-1]b : deltas[n-2]if out : isDup(a, b); out ! nil {deltas[n-2] *outreturn deltas[:n-1]}return deltas
}// 判断a、b两个delta是否重复
// 目前暂时只有两个delete类型的delta会被判定为重复。
func isDup(a, b *Delta) *Delta {if out : isDeletionDup(a, b); out ! nil {return out}return nil
}// 判定两个delta是否都是deleted类型
func isDeletionDup(a, b *Delta) *Delta {if b.Type ! Deleted || a.Type ! Deleted {return nil}if _, ok : b.Object.(DeletedFinalStateUnknown); ok {return a}return b
}举个小小的例子来回顾一下delta push操作。假设queue中有3个pod对象对应了不同的变更事件如下所示。
此时watcher监听到资源发生变化
pod2收到了updated事件pod1收到了deleted事件pod3收到了deleted事件
于是三个delta入队成功后的队列图如下
pod1已有一个deleted事件再次收到deleted后经过dedupDeltas去重最终只保留一个deleted。
pod3虽然有两个deleted事件但是他们并不是连续的事件不会被去重
queue pop操作
deltaFIFO出队的操作和普通的队列出队类似从队头取出一个资源对象key并删除items中key对应的deltas数组。
pop出队时会调用传参PopProcessFunc对出队元素进行处理。
func (f *DeltaFIFO) Pop(process PopProcessFunc) (interface{}, error) {f.lock.Lock()defer f.lock.Unlock()for {for len(f.queue) 0 {// 队列为空时阻塞if f.closed {return nil, ErrFIFOClosed}f.cond.Wait()}// 取出队首的资源对象keyid : f.queue[0]f.queue f.queue[1:]// 获取key对应的deltas数组item, ok : f.items[id]// 执行pop处理函数处理delta事件如果处理失败了资源对象会被重新加入到队列中。// 但是如果队列中存在相同的对象资源对象会被丢弃。err : process(item, isInInitialList)if e, ok : err.(ErrRequeue); ok {f.addIfNotPresent(id, item)err e.Err}return item, err}
}这里一开始有个小疑问如果资源的delta处理失败了并且队列中又出现了同样的资源key这部分delta数据不就丢失了吗 但是仔细看出队入队公用一个锁pop处理对象时不会有新的对象入队所以理论上不会出现在addIfNotPresent时key是persent的情况。而deltaFIFO入队的逻辑也不会存在一个队列有两个相同的key的情况所以不会有丢失的问题addIfNotPresent应该只是加多一层保障。如果理解有问题欢迎大佬们指正。 回顾一下pop的调用方processLoop调用pop时传入PopProcessFunc(c.config.Process))。
系列第一篇介绍informer时提到过c.config.Process最终调用的是processDeltas函数它包含了数据同步到存储以及调用注册的用户函数两个操作。
func (c *controller) processLoop() {for {obj, err : c.config.Queue.Pop(PopProcessFunc(c.config.Process))if err ! nil {...}}
}// 数据处理函数
func processDeltas(handler ResourceEventHandler,clientState Store,deltas Deltas,isInInitialList bool,
) error {// from oldest to newestfor _, d : range deltas {obj : d.Object// 区分事件类型进行处理switch d.Type {case Sync, Replaced, Added, Updated:// 同步存储数据if old, exists, err : clientState.Get(obj); err nil exists {if err : clientState.Update(obj); err ! nil {return err}// 回调用户函数handler.OnUpdate(old, obj)} else {// 同步存储数据if err : clientState.Add(obj); err ! nil {return err}// 回调用户函数handler.OnAdd(obj, isInInitialList)}case Deleted:// 同步存储数据if err : clientState.Delete(obj); err ! nil {return err}// 回调用户函数handler.OnDelete(obj)}}return nil
}
总结
还是用上一节的例子小结回顾一下整体的流程