第68节 Reflector 原理


❤️💕💕新时代拥抱云原生,云原生具有环境统一、按需付费、即开即用、稳定性强特点。Myblog:http://nsddd.topopen in new window


[TOC]

介绍

reflectorclient-go 里面的主要作用是与apiserver进行交互,获取自定义资源数据更新到 Delta FIFO 队列里面,所以它主要分为两部分功能,一部分是list和watch的功能,另一部分是更新缓存的功能。

func NewReflector(lw ListerWatcher, expectedType interface{}, store Store, resyncPeriod time.Duration) *Reflector {
  ...
}

参数说明:

  • lw: ListerWatcher 接口的实现,用于获取 Kubernetes API Server 中资源的状态。
  • expectedType: 指定了需要被处理的 Kubernetes API 对象的类型。
  • store: 存储 Kubernetes 资源对象的数据结构,用于与 Kubernetes API 交互。
  • resyncPeriod: 周期性地重新同步资源对象的时间间隔。

注意:

Kubernetes 中对 API Server 是一个长连接而不是轮询。

List 和 Watch 可以保证 可靠性、实时性和顺序性。

  • List 方法指定类型资源对象全量更新,并且更新到缓存上。可以列出 Kubernetes API Server 中指定类型的所有资源对象。

    curl -iv https://127.0.0.1:8001/api/v1/namespaces/default/pods
    
  • Watch 方法可以监听 Kubernetes API Server 中指定类型的资源对象的变化,当资源对象发生变化时,会返回一个 watch.Event 对象,其中包含了发生变化的资源对象的信息。假如要监听pod的变化时,可以在List这个API上加一个参数watch=true 就可以监听资源对象的事件变化)。

    curl -iv https://127.0.0.1:8001/api/v1/namespaces/default/pod\?watch\=ture
    

ResourceVersion与Bookmarks

ResourceVersion

在K8S中每一个资源对象都有一个ResourceVersion,当资源对象发生变更时ResourceVersion就会发生递增的变更,使对应到 ETCD 的 reversion 的变更

  • 保证客户端数据一致性和顺序性
  • 并发控制(可以保证多个客户端并发去更改资源对象时,出现的并发问题)

Bookmarks:

  • 减少apiserver负载(高版本的apiserver才支持Bookmarks)
  • 更新客户端保存的最近一次ResourceVersion

所以说我们的客户端当接受到bookmark的事件的时候,我们可以从这个事件里面的ResourceVersion提取出来,然后更新我们本地的ResourceVersion,然后接着以这个ResourceVersion去watch我们的资源对象的变更。

Bookmarks是Kubernetes API Server的一个配置选项,它允许客户端在长轮询请求中传递一个所谓的“书签”,这个书签是一个字符串,它描述了客户端最后一次看到的资源版本。这样,API Server就可以只返回新的或更新的资源,而不是整个资源列表。这样可以减少网络流量和API Server的负载。

所以说我们的客户端当接受到bookmark的事件的时候,我们可以从这个事件里面的ResourceVersion提取出来,然后更新我们本地的ResourceVersion,然后接着以这个ResourceVersion去watch我们的资源对象的变更。

Reflector与RESTClient

我们知道 Kubernetes 中 所有的组件需要去操作集群资源的时候都需要通过调用 kube-apiserver 提供的 RESTful 接口,kube-apiserver 进一步和 ETCD 交互,完成资源的信息更新。

那么 Reflector如果要跟apiserver进行交互的话,就会通过RESTClient或者是其他的client。以下例子是informer里面的,它实现了相应的接口,并提供了list和watch方法,里面的逻辑就是调用的clientset。

        ...
        &cache.ListWatch{
            ListFunc: func(options metav1.ListOptions) (runtime.Object, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).List(context.TODO(), options)
            },
            WatchFunc: func(options metav1.ListOptions) (watch.Interface, error) {
                if tweakListOptions != nil {
                    tweakListOptions(&options)
                }
                return client.CoreV1().Pods(namespace).Watch(context.TODO(), options)
            },
        },
        ...

END 链接