第69节 DeltaFIFO 原理


❤️💕💕新时代拥抱云原生,云原生具有环境统一、按需付费、即开即用、稳定性强特点。Myblog:http://nsddd.topopen in new window


[TOC]

介绍

okkey 我们知道,再上一个 Reflector 从 API Server 监听(watch)特定类型的资源,拿到变更通知后,将其放入到 DeltaFIFO 队列中。

DeltaFIFO 是 Kubernetes 为我们提供了一个存储。

不仅仅是 DeltaFIFO , Kubernetes 还为我们提供了很多存储

Kubernetes 提供的存储

cache 主要实现了 Store,利用了 threadSafeMap 存放数据

type Store interface {

	// Add adds the given object to the accumulator associated with the given object's key
	Add(obj interface{}) error

	// Update updates the given object in the accumulator associated with the given object's key
	Update(obj interface{}) error

	// Delete deletes the given object from the accumulator associated with the given object's key
	Delete(obj interface{}) error

	// List returns a list of all the currently non-empty accumulators
	List() []interface{}

	// ListKeys returns a list of all the keys currently associated with non-empty accumulators
	ListKeys() []string

	// Get returns the accumulator associated with the given object's key
	Get(obj interface{}) (item interface{}, exists bool, err error)

	// GetByKey returns the accumulator associated with the given key
	GetByKey(key string) (item interface{}, exists bool, err error)

	// Replace will delete the contents of the store, using instead the
	// given list. Store takes ownership of the list, you should not reference
	// it after calling this function.
	Replace([]interface{}, string) error

	// Resync is meaningless in the terms appearing here but has
	// meaning in some implementations that have non-trivial
	// additional behavior (e.g., DeltaFIFO).
	Resync() error
}

📜 对上面的解释:

  • Add(obj interface{}) error: 将给定对象添加到与给定对象的键相关联的累加器中
  • Update(obj interface{}) error: 更新与给定对象的键相关联的累加器中的给定对象
  • Delete(obj interface{}) error: 从与给定对象的键相关联的累加器中删除给定对象
  • List() []interface{}: 返回所有当前非空累加器的列表
  • ListKeys() []string: 返回当前与非空累加器关联的所有键的列表
  • Get(obj interface{}) (item interface{}, exists bool, err error): 返回与给定对象的键相关联的累加器中的累加器
  • GetByKey(key string) (item interface{}, exists bool, err error): 返回与给定键相关联的累加器
  • Replace([]interface{}, string) error: 删除存储中的内容,使用给定的列表替换。Store 获取该列表的所有权,调用此函数后不应再引用该列表。
  • Resync() error: 在此处出现的术语中毫无意义,但在某些具有非平凡附加行为(例如 DeltaFIFO)的实现中具有意义。

分类:

  • Add(obj interface{}) error
    • 功能:将给定对象添加到与给定对象的键相关联的累加器中
    • 参数:
      • obj:要添加的对象
    • 返回值:无
  • Update(obj interface{}) error
    • 功能:更新与给定对象的键相关联的累加器中的给定对象
    • 参数:
      • obj:要更新的对象
    • 返回值:无
  • Delete(obj interface{}) error
    • 功能:从与给定对象的键相关联的累加器中删除给定对象
    • 参数:
      • obj:要删除的对象
    • 返回值:无
  • List() []interface{}
    • 功能:返回所有当前非空累加器的列表
    • 参数:无
    • 返回值:所有当前非空累加器的列表
  • ListKeys() []string
    • 功能:返回当前与非空累加器关联的所有键的列表
    • 参数:无
    • 返回值:当前与非空累加器关联的所有键的列表
  • Get(obj interface{}) (item interface{}, exists bool, err error)
    • 功能:返回与给定对象的键相关联的累加器中的累加器
    • 参数:
      • obj:要获取的对象
    • 返回值:
      • item:与给定对象的键相关联的累加器中的累加器
      • exists:是否存在该累加器
      • err:错误信息(如果有)
  • GetByKey(key string) (item interface{}, exists bool, err error)
    • 功能:返回与给定键相关联的累加器
    • 参数:
      • key:要获取的键
    • 返回值:
      • item:与给定键相关联的累加器
      • exists:是否存在该累加器
      • err:错误信息(如果有)
  • Replace([]interface{}, string) error
    • 功能:删除存储中的内容,使用给定的列表替换。Store 获取该列表的所有权,调用此函数后不应再引用该列表。
    • 参数:
      • []interface{}:要替换的列表
      • string:用于记录日志的字符串
    • 返回值:错误信息(如果有)
  • Resync() error
    • 功能:在此处出现的术语中毫无意义,但在某些具有非平凡附加行为(例如 DeltaFIFO)的实现中具有意义。
    • 参数:无
    • 返回值:错误信息(如果有)

UndeltaStore

实现了 Store ,利用了 cache 存放数据,数据变更的时候通过 PushFunc 发送当前完整的状态。

type UndeltaStore struct {
	Store
	PushFunc func([]interface{})
}

可以看到 UndeltaStore 结构体嵌套了一个接口,这样做的目的是将接口的方法作为结构体的一部分,以便更方便地访问接口的方法。

比如说 Add:

func (u *UndeltaStore) Add(obj interface{}) error {
	if err := u.Store.Add(obj); err != nil {
		return err
	}
	u.PushFunc(u.Store.List())
	return nil
}

Heap

Heap 通过实现 Store,利用 data 数据结构存放数据,实现堆数据结构,用于优先级队列。

// heapData is an internal struct that implements the standard heap interface
// and keeps the data stored in the heap.
type heapData struct {
	// items is a map from key of the objects to the objects and their index.
	// We depend on the property that items in the map are in the queue and vice versa.
	items map[string]*heapItem
	// queue implements a heap data structure and keeps the order of elements
	// according to the heap invariant. The queue keeps the keys of objects stored
	// in "items".
	queue []string

	// keyFunc is used to make the key used for queued item insertion and retrieval, and
	// should be deterministic.
	keyFunc KeyFunc
	// lessFunc is used to compare two objects in the heap.
	lessFunc LessFunc
}

FIFO

FIFO 实现了 Queue (包含 Store),利用自己内部的 Items 数据结构存放数据。

// Queue extends Store with a collection of Store keys to "process".
// Every Add, Update, or Delete may put the object's key in that collection.
// A Queue has a way to derive the corresponding key given an accumulator.
// A Queue can be accessed concurrently from multiple goroutines.
// A Queue can be "closed", after which Pop operations return an error.
type Queue interface {
	Store

	// Pop blocks until there is at least one key to process or the
	// Queue is closed.  In the latter case Pop returns with an error.
	// In the former case Pop atomically picks one key to process,
	// removes that (key, accumulator) association from the Store, and
	// processes the accumulator.  Pop returns the accumulator that
	// was processed and the result of processing.  The PopProcessFunc
	// may return an ErrRequeue{inner} and in this case Pop will (a)
	// return that (key, accumulator) association to the Queue as part
	// of the atomic processing and (b) return the inner error from
	// Pop.
	Pop(PopProcessFunc) (interface{}, error)

	// AddIfNotPresent puts the given accumulator into the Queue (in
	// association with the accumulator's key) if and only if that key
	// is not already associated with a non-empty accumulator.
	AddIfNotPresent(interface{}) error

	// HasSynced returns true if the first batch of keys have all been
	// popped.  The first batch of keys are those of the first Replace
	// operation if that happened before any Add, AddIfNotPresent,
	// Update, or Delete; otherwise the first batch is empty.
	HasSynced() bool

	// Close the queue
	Close()
}

DeltaFIFO

接下来就是重点 DeltaFIFO 的部分了

应用场景

DeltaFIFO 的主要应用场景如下:

  • 当你希望处理每一个对象的变化最多一次
  • 当你处理一个对象的时候,希望知道这个对象与你上一次处理时,发生了哪些变化。
  • 当你希望一个对象删除的时候,你仍然能够处理它
  • 能够周期性的处理所有的对象

结构体:

DeltaFIFO是K8s中用来存储处理数据的Queue,相较于传统的FIFO,它不仅仅存储了数据保证了先进先出,而且存储有K8s 资源对象的类型。其是连接Reflector(生产者)和indexer(消费者)的重要通道。

type DeltaFIFO struct {
	// lock/cond protects access to 'items' and 'queue'.
	lock sync.RWMutex
	cond sync.Cond

	// `items` maps a key to a Deltas.
	// Each such Deltas has at least one Delta.
	items map[string]Deltas

	// `queue` maintains FIFO order of keys for consumption in Pop().
	// There are no duplicates in `queue`.
	// A key is in `queue` if and only if it is in `items`.
	queue []string

	// populated is true if the first batch of items inserted by Replace() has been populated
	// or Delete/Add/Update/AddIfNotPresent was called first.
	populated bool
	// initialPopulationCount is the number of items inserted by the first call of Replace()
	initialPopulationCount int

	// keyFunc is used to make the key used for queued item
	// insertion and retrieval, and should be deterministic.
	keyFunc KeyFunc

	// knownObjects list keys that are "known" --- affecting Delete(),
	// Replace(), and Resync()
	knownObjects KeyListerGetter

	// Used to indicate a queue is closed so a control loop can exit when a queue is empty.
	// Currently, not used to gate any of CRUD operations.
	closed bool

	// emitDeltaTypeReplaced is whether to emit the Replaced or Sync
	// DeltaType when Replace() is called (to preserve backwards compat).
	emitDeltaTypeReplaced bool

	// Called with every object if non-nil.
	transformer TransformFunc
}

📜 对上面的解释:

  • items 是计算的 key,value 是一个 Deltas 的结构体,
  • queue:保证这个队列的顺序性
  • keyFunc:我们默认使用 <namespce>/<name> 不指定 namespace 时候用 <name>
  • knownObjects : 专门用来存放数据的地方,其实就是 Indexer

事件的生产和消费

作为一个中间管道的作用,对应的一边就是生产者,一遍就是消费者。

生产

  • Reflector 的 List
  • Reflector 的 Watch
  • Reflector 的 Resync

消费

  • 事件派发到 work queue
  • 刷新本地缓存

Indexer

Indexer 主要提供了一个对象根据一定检索能力,典型的实现就是通过 namespace 来构建 Key,通过 Thread Safe Store 来存储对象。

END 链接