第67节 K8s 深入理解 Operator-client 详解


❤️💕💕新时代拥抱云原生,云原生具有环境统一、按需付费、即开即用、稳定性强特点。Myblog:http://nsddd.topopen in new window


[TOC]

前言

在上一个对于 client-go 学习中,通过 sample-controller 的项目了解到 Kubernetes 的 部分 client-go 的学习。继续深入学习 client-go

Client-go

kubectl 并不适合 Kubernetes 的二次开发者来和 k8s 打交道,Go语言提供了一个专门和 Kubernetes API 交互的 库 Client-go。

Kubernetes 也提供了一个编写控制器的小案例open in new window,可以作为 client-go 的入门

Client-go是一个用于与 Kubernetes API 交互的Go库。它提供了广泛的功能,用于与Kubernetes API交互,包括强类型 API、资源客户端、Watch API 和动态客户端。使用 client-go,开发人员可以轻松地在 Kubernetes 中创建、读取、更新和删除资源对象。

从这个package的名称来看,这应该是跟k8s打交道的客户端clientgo实现,这一点没错,它定义了诸多资源的客户端client

上面是 client-go 的 GitHub 仓库,不过这个库是 actions 以每天一次的频率从 Kubernetes/Kubernetes 主仓库中自动同步过来的。所以如果我们想贡献的话找好位置去 Kubernetes 贡献(kubernetes/stagin/src/k8s.io)。

Client-go目录结构

├── discovery   # DsicoveryClient客户端,用于发现k8s所支持GVR。
├── dynamic     # DynamicClient客户端, 用于访问k8s Resources,也可以访问CRD。
├── informers   # k8s中各种Resources的Informer机制的实现。
├── kubernetes  # 对RestClient进行了封装,定义多种Client的客户端集合,俗称clientset。
├── listers     # 提供对Resources的获取功能。对于Get()和List()而言,listers提供给二者的数据都是从缓存中读取的。
├── pkg
├── plugin      # 提供第三方插件。
├── scale       # 定义用于Deploy, RS, RC等资源进行的扩、缩容的客户端ScaleClient。
├── tools       # 实现client查询和缓存机制,以及定义诸如SharedInformer、Reflector、DealtFIFO和Indexer等常用工具。
├── transport
└── util        # 提供诸如WorkQueue、Certificate等常用方法。

📜 对上面的解释:

  • /discovery:该目录包含用于发现和获取Kubernetes API资源的代码。这些资源包括Pod、Service、ReplicationController等。discovery目录中的代码可以帮助开发人员发现和使用这些资源。
  • /dynamic:该目录包含动态客户端库,用于与Kubernetes API交互,而无需生成代码。这对于构建需要与任意Kubernetes资源交互的通用工具和实用程序非常有用。
  • /kubernetes:这个包中方的是用 client-gen 自动生成的用来访问 Kubernetes API 的 ClientSet,后面会经常看到 ClientSet 这个工具。
  • /informers:该目录包含用于监视Kubernetes资源变化的代码。这些变化可以包括资源的创建、更新和删除。informers目录中的代码可以帮助开发人员构建控制器和其他需要对Kubernetes环境中的变化做出反应的应用程序。
  • /listers:该目录包含用于从Kubernetes服务器获取资源列表的代码。这些资源列表包括Pod、Service、Namespace等。listers目录中的代码可以帮助开发人员更轻松地获取有关Kubernetes资源的信息。
  • /rest:该目录包含用于与Kubernetes API交互的代码。这些API包括Pod、Service、Namespace等。rest目录中的代码可以帮助开发人员执行各种操作,包括管理Pod、Deployment、Service、Namespace等。
  • /scale:该目录包含用于与Kubernetes资源的自动缩放相关的代码。这些资源包括Deployment、ReplicaSet、StatefulSet等。scale目录中的代码可以帮助开发人员自动缩放与Kubernetes资源相关的组件。
  • transport:这个包用于设置认证和建立连接
  • /tools:该目录包含用于测试和其他实用程序的代码。这些实用程序包括代码生成器、测试工具等。tools目录中的代码可以帮助开发人员更轻松地测试和使用client-go库。
  • /util:该目录包含用于客户端库的辅助功能的代码。这些功能包括对Kubernetes API对象的类型转换、对象比较等。util目录中的代码可以帮助开发人员更轻松地使用client-go库。

获取 Client-go

可以通过 go get 命令获取 client-go,不过我直接克隆最新源代码,然后构建为可执行文件:

git clone https://github.com/kubernetes/client-go.git

他们给了一些样例的文件,我找出来:

find -name main.go
./examples/workqueue/main.go
./examples/in-cluster-client-configuration/main.go
./examples/out-of-cluster-client-configuration/main.go
./examples/dynamic-create-update-delete-deployment/main.go
./examples/create-update-delete-deployment/main.go
./examples/leader-election/main.go

这些文件可以帮助我们快速上手 client-go:

  • ./examples/workqueue/main.go:演示如何使用 Kubernetes 的工作队列(Workqueue)实现资源控制器(Controller)。
  • ./examples/in-cluster-client-configuration/main.go:演示如何在 Kubernetes 集群内部使用 client-go 访问 Kubernetes API Server。
  • ./examples/out-of-cluster-client-configuration/main.go:演示如何在 Kubernetes 集群外部使用 client-go 访问 Kubernetes API Server。
  • ./examples/dynamic-create-update-delete-deployment/main.go:演示如何使用 Kubernetes 的动态客户端库(Dynamic Client)实现对 Deployment 资源对象的增删改查等操作。
  • ./examples/create-update-delete-deployment/main.go:演示如何使用 client-go 实现对 Deployment 资源对象的增删改查等操作。
  • ./examples/leader-election/main.go:演示如何使用 Kubernetes 的 Leader Election 机制,实现资源控制器的高可用性和故障转移。

/root/workspaces/client-go/examples/workqueue 目录中:

我们是不能直接编译的,看一下:

❯ ./main --help
Usage of ./main:
  -kubeconfig string
        absolute path to the kubeconfig file
  -master string
        master url

指定 kubeconfig (也可以通过设置环境变量 export KUBECONFIG

❯ ./main -kubeconfig ~/.kube/config

举例:使用 kubectl 命令行工具创建一个名为 myresource 的自定义资源,并将其保存到 YAML 文件中。然后,运行 go run ./examples/workqueue/main.go 命令启动控制器。此时,控制器会开始监听 myresource 资源,并在该资源被创建或更新时,异步地处理一些任务。

继续演示 对 Deployment CURD 操作:

cd dynamic-create-update-delete-deployment/
❯ ls
README.md  main.go
❯ go build main.go
❯ ./main 
Creating deployment...
Created deployment "demo-deployment".
-> Press Return key to continue.

Updating deployment...
Updated deployment...
-> Press Return key to continue.

Listing deployments in namespace "default":
 * demo-deployment (1 replicas)
 * my-nginx-app (3 replicas)
 * nginx-deployment (3 replicas)
-> Press Return key to continue.

Deleting deployment...
Deleted deployment.

./examples/dynamic-create-update-delete-deployment/main.go:这个示例文件演示如何使用 Kubernetes 的动态客户端库(Dynamic Client)实现对 Deployment 资源对象的增删改查等操作。使用动态客户端库,开发人员可以更加灵活地操作 Kubernetes 资源对象,而不需要手动编写复杂的代码。例如,在这个示例中,开发人员可以使用 DynamicClient 对象,动态地创建、更新和删除 Deployment 资源对象。

举例:使用 kubectl 命令行工具创建一个名为 my-deployment 的 Deployment 对象,并将其保存到 YAML 文件中。然后,运行 go run ./examples/dynamic-create-update-delete-deployment/main.go 命令,使用 DynamicClient 对象,动态地创建、更新和删除 my-deployment Deployment 对象。

测试:

❯ k get deployment
NAME               READY   UP-TO-DATE   AVAILABLE   AGE
demo-deployment    0/1     0            0           4s
my-nginx-app       0/3     3            0           26h
nginx-deployment   3/3     3            3           165m

WorkQueue 源码分析

WorkQueue 一般使用的说就是延迟队列的实现,在 Resopurce Event Handlers 中会完成将对象的 key 放入 WorkQueue 的过程,然后再自己的逻辑代码中从 WorkQueue 中消费这些 Key。

Client-go 的 utile/Workqueue 包中有三个队列:

分别对应的是 普通队列、延时队列和限速队列。对应的源文件分别是:

  • queue.go
  • delaying_queue.go
  • rate_limiting_queue.go

我们先看看 普通队列,这个对应的文件就是 queue.go:

type Interface interface {
	Add(item interface{})
	Len() int
	Get() (item interface{}, shutdown bool)
	Done(item interface{})
	ShutDown()
	ShutDownWithDrain()
	ShuttingDown() bool
}

看接口就知道了实现了哪些方法,分别是添加元素,元素个数、获取第一个元素、第二个返回值和 channel 类似,标记了队列是否关闭。

Done 标记一个元素是否已经处理完,ShutDown 关闭队列,ShowDownWithDrain 关闭队列,但是等待队列中的元素处理完。ShuttingDown 标记当前的 channel 是否正在关闭。

我们再看延迟队列,对应的文件在delaying_queue.go

// DelayingInterface is an Interface that can Add an item at a later time. This makes it easier to
// requeue items after failures without ending up in a hot-loop.
type DelayingInterface interface {
	Interface
	// AddAfter adds an item to the workqueue after the indicated duration has passed
	AddAfter(item interface{}, duration time.Duration)
}

接口就很简单,定义 DelayingInterface 接口在 delaying_queue.go 源文件,名字和 Queue 所使用的interface对称,叫做 DelayingInterface。

这里有一个 Interface,对此我感觉到有一些疑惑,Interface 是普通队列命名的接口,这个设计模式的接口设计的思路蛮有意思的,先别激动,后面有你激动的,看到 Controller 的接口设计,那简直就是 6 ~

最后我们看一下限速队列,这个有意思很多了,同样今天的项目 sample-controller 也用到了这个队列,我们看一下 controller 部分:

type Controller struct {
    //...
	workqueue workqueue.RateLimitingInterface
} 

我们看到了 限速队列包含了延迟队列的所有接口实现,也包含了 基本队列的所有接口实现。

分析这个队列的接口:

  • AddRateLimited:在速率限制器认可的情况下将项目添加到工作队列中。
  • Forget:表示项目已经完成重试。无论是永久性失败还是成功,我们都会停止速率限制器对其进行跟踪。这仅清除了rateLimiter,您仍然需要在队列上调用Done方法。
  • NumRequeues:返回项目被重新排队的次数。
// RateLimitingInterface is an interface that rate limits items being added to the queue.
type RateLimitingInterface interface {
	DelayingInterface

	// AddRateLimited adds an item to the workqueue after the rate limiter says it's ok
	AddRateLimited(item interface{})

	// Forget indicates that an item is finished being retried.  Doesn't matter whether it's for perm failing
	// or for success, we'll stop the rate limiter from tracking it.  This only clears the `rateLimiter`, you
	// still have to call `Done` on the queue.
	Forget(item interface{})

	// NumRequeues returns back how many times the item was requeued
	NumRequeues(item interface{}) int
}

在后面的学习中,我们再深入探索这些 WorkQueue 的实现。

DeltaFIFO 源码分析

DeltaFIFO 也是一个特别重要的组件,在 k8s.io/client-go/tools/cache 包中,用于存储对象的增量更新。

  • 存储对象的增量更新,包括添加、删除和更新操作。
  • 能够按顺序处理更新,以确保它们被正确地应用。
  • 支持阻塞式同步,以便在应用所有更新之前等待。

在 fifo.go 中定义了一个 Queue 的接口,我们看一下接口代码:

type Queue interface {
	Store
	Pop(PopProcessFunc) (interface{}, error)
	AddIfNotPresent(interface{}) error
	HasSynced() bool
	// Close the queue
	Close()
}

嵌入了一个 Store 接口:

Store interface {
	Add(obj interface{}) error
	Update(obj interface{}) error
	Delete(obj interface{}) error
	List() []interface{}
	ListKeys() []string
	Get(obj interface{}) (item interface{}, exists bool, err error)
	GetByKey(key string) (item interface{}, exists bool, err error)
	Replace([]interface{}, string) error
	Resync() error
}

都是表面的意思了,CURD 操作,各种 Get、List 操作。

除此之外,还有其他的,阅读源码即可,后面慢慢深入讲解。

Informer 机制

Informar 这个单词出镜率很高,我们在很多文章中都可以看到这个 Informer,包括 Kubernetes 源码讲解的 ETCD 和 API Server 中。

Informer 从 DeltaFIFO 中 Pop 相对应的对象,会通过 Indexer 将对象和索引都丢在本地的 cache 中,再触发相应的事件处理函数(Resource Event Handlers) 的运行。

源码位置: k8s.io/client-go/tools/cache

Informar 是通过一个 Controller 对象来进行定义的,我们先看看对象接口:

controller.go 文件中能看到 Controller 的定义:

type Controller interface {
	Run(stopCh <-chan struct{})
	HasSynced() bool
	LastSyncResourceVersion() string
}

看出,最核心的方法是 Run(stopCh <-chan struct{}) Run 这个函数我们点击进去会发现,主要做了两个事情:

  • 构造 Reflector 利用 ListerWatcher 的能力将对象事件更新到 DeltaFIFO
  • 从 DeltaFIFO 中 Pop 对象后调用 ProcessFunc 来处理。

初始化是什么样的?

// New makes a new Controller from the given Config.
func New(c *Config) Controller {
	ctlr := &controller{
		config: *c,
		clock:  &clock.RealClock{},
	}
	return ctlr
}

传入了一个 Config 过来,然后初始化,核心逻辑那就在 Config 中了:

// Config contains all the settings for one of these low-level controllers.
type Config struct {
	Queue
	// Something that can list and watch your objects.
	ListerWatcher
	// Something that can process a popped Deltas.
	Process ProcessFunc
	// ObjectType is an example object of the type this controller is
	// expected to handle.
	ObjectType runtime.Object
	// ObjectDescription is the description to use when logging type-specific information about this controller.
	ObjectDescription string
	// FullResyncPeriod is the period at which ShouldResync is considered.
	FullResyncPeriod time.Duration
	ShouldResync ShouldResyncFunc
	RetryOnError bool
	// Called whenever the ListAndWatch drops the connection with an error.
	WatchErrorHandler WatchErrorHandler
	// WatchListPageSize is the requested chunk size of initial and relist watch lists.
	WatchListPageSize int64
}

SharedInformer 对象源码

SharedInformer 和 Informer 之间的名字就差一个 Shared,在 Operator 开发中,如果不适用 controller-runtime 库(kubebuilder controller-runtime 子项目的Repo),也就是不用 Kubebuilder 等工具生成脚手架,那么应该用到的是 SharedInformerFactory,比如今天主要做的 sample-controller 的 main() 函数,在下面的 sample-controller 源码介绍中会讲解~

我们能看到这两行代码:

kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClient, time.Second*30)
	
exampleInformerFactory := informers.NewSharedInformerFactory(exampleClient, time.Second*30)

controller := NewController(ctx, kubeClient, exampleClient,
	kubeInformerFactory.Apps().V1().Deployments(), exampleInformerFactory.Samplecontroller().V1alpha1().Foos())

控制器的初始化后面会讲 controller.go 的时候重点讲解,先看看初始化过程和 SharedInformer 的关系。

controller 依赖于 kubeInformerFactory.Apps().V1().Deployments()exampleInformerFactory.Samplecontroller().V1alpha1().Foos()

后者是 sample-controller 自己 pkg 包提供的,前者是 client-go 提供的,那么看看前者:

这里的 Deployment 依赖的是:Deployments() DeploymentInformer

DeploymentInformer 是一个接口类型,看一看:

// DeploymentInformer provides access to a shared informer and lister for
// Deployments.
type DeploymentInformer interface {
	Informer() cache.SharedIndexInformer
	Lister() v1.DeploymentLister
}

实现了 InformerListerInformer 本质上就是一个 ShareIndexInformer


接下来的三个部分,我将详细介绍 sample-controller、kubebuilder、operator-sdk 以及它们之间那微妙的关系。

END 链接