东岳网络工作室

利用 kubebuilder 优化 Kubernetes Operator 开发体验

gaocegege kubernetes

Kubernetes 现在已经成为了容器集群管理,乃至云计算的事实标准。相比于它曾经的竞争对手如 Mesos,Docker Swarm 等,其最大的优势在于扩展性。其扩展性的一个重要的体现,就是 Custom Resource 这一特性。Kubernetes 本身有很多资源类型,被我们熟知的有 Pod,Job,Deployment 等等。而通过 Custom Resource,用户可以定义自己的资源,并实现对应的 Operator(控制器)来处理对资源的请求。用户实现的 Operator 通过与 Kubernetes 的 API server 交互,来实现自身的业务逻辑。

在没有 kubebuilder 之前,为了实现一个 Operator,用户需要完全实现从 Kubernetes Client 的创建开始,到监听 Kubernetes API Server 的请求,再到请求的队列化,以及后面的业务逻辑一整套的逻辑。在整个过程中,有一些逻辑是所有的 Operator 在实现的时候都需要的。因此 kubebuilder 将其进行了封装和抽象成了公共的库(controller-runtime)和公共的工具(controller-tools)。随后在开发 Operator 的时候,只需要通过 kubebuilder 生成一些 Scaffolding(脚手架)代码,用户不再需要关心 Kubernetes API Server 发来的请求是怎样进入请求队列,然后被依次执行的,只需要关心要如何处理当前的请求即可。其他的事情会由 Scaffolding 代码中用到的 controller-runtime 等库来帮助开发者处理。

因此接下来,本文主要介绍利用 kubebuilder v1 scaffolding 简化 Operator 开发的过程。目前 kubebuilder 社区已经推出了正在开发中的第二版实现,这一实现将在后续文章进行介绍。

开发流程

初始化项目

首先,我们需要初始化项目:

kubebuilder init

在当前目录会生成一个最小可编译版本,但其中没有任何的 Operator 实现。目录结构如下所示:

├── cmd
│   └── manager
│       └── main.go
├── config
│   ├── default
│   │   ├── kustomization.yaml
│   │   ├── manager_auth_proxy_patch.yaml
│   │   ├── manager_image_patch.yaml
│   │   └── manager_prometheus_metrics_patch.yaml
│   ├── manager
│   │   └── manager.yaml
│   └── rbac
│       ├── auth_proxy_role_binding.yaml
│       ├── auth_proxy_role.yaml
│       └── auth_proxy_service.yaml
├── Dockerfile
├── Gopkg.toml
├── hack
│   └── boilerplate.go.txt
├── Makefile
├── pkg
│   ├── apis
│   │   └── apis.go
│   ├── controller
│   │   └── controller.go
│   └── webhook
│       └── webhook.go
└── PROJECT

其中 cmd 目录是关于可执行文件的代码。config 是在 Kubernetes 上部署 Operator 的配置文件。Dockerfile 是把 Operator 打包为 Docker 镜像的配置文件。Gopkg.toml 是自动生成的关于依赖的配置文件。hack 目录下是有关 Copyright Header 的文件。而 pkg 下则是随后的 API 和代码实现。

创建 CRD 和 Operator 的 Scaffolding 代码

随后我们运行如下命令来创建 CRD 和对应的 Operator 的实现:

kubebuilder create api --group ship --version v1beta1 --kind Frigate

可以看到在目录内生成了一些新的文件和目录:

├── pkg
│   ├── apis
+│   │   ├── addtoscheme_ship_v1beta1.go
│   │   ├── apis.go
│   │   └── ship
│   │       ├── group.go
+│   │       └── v1beta1
+│   │           ├── doc.go
+│   │           ├── frigate_types.go
+│   │           ├── frigate_types_test.go
+│   │           ├── zz_generated.deepcopy.go
+│   │           ├── register.go
+│   │           └── v1beta1_suite_test.go
│   ├── controller
+│   │   ├── add_frigate.go
│   │   ├── controller.go
+│   │   └── frigate
+│   │       ├── frigate_controller.go
+│   │       ├── frigate_controller_suite_test.go
+│   │       └── frigate_controller_test.go
│   └── webhook
│       └── webhook.go
└── PROJECT

其中,需要用户修改的文件主要有两个,分别是 frigate_types.gofrigate_controller.go。前者是 CRD 的 API 定义相关内容,后者是 CRD 对应的 Operator 实现。

修改 API

frigate_types.go 中,我们可以看到有提示用户编辑的注释:

// EDIT THIS FILE!  THIS IS SCAFFOLDING FOR YOU TO OWN!
// NOTE: json tags are required.  Any new fields you add must have json tags for the fields to be serialized.

// FrigateSpec defines the desired state of Frigate
type FrigateSpec struct {
	// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
	// Important: Run "make" to regenerate code after modifying this file
}

// FrigateStatus defines the observed state of Frigate
type FrigateStatus struct {
	// INSERT ADDITIONAL STATUS FIELD - define observed state of cluster
	// Important: Run "make" to regenerate code after modifying this file
}

关于 Kubernetes Custom Resource 相关的 API 定义相关内容,建议参考 Extend the Kubernetes API with CustomResourceDefinitions,此处不再赘述。用户只需要修改对应的 SpecStatus 的数据结构。值得一提的是,一般而言,如果不依赖 kubebuilder 实现 Operator,通常需要为定义好的 CRD 生成 Kubernetes Client,Informer 等,以便 Operator 可以利用其进行监听 API 请求等操作。而 kubebuilder 使用的 controller-runtime 利用了 dynamic client,unstructured client 和其他相关的方式完成了这些操作,因此在使用时无需生成 Kubernetes Client,Informer 等包。

修改 Controller 实现

在 Controller 中,主要有两处需要修改,一处为:

// add adds a new Controller to mgr with r as the reconcile.Reconciler
func add(mgr manager.Manager, r reconcile.Reconciler) error {
	// Create a new controller
	c, err := controller.New("frigate-controller", mgr, controller.Options{Reconciler: r})
	if err != nil {
		return err
	}

	// Watch for changes to Frigate
	err = c.Watch(&source.Kind{Type: &shipv1beta1.Frigate{}}, &handler.EnqueueRequestForObject{})
	if err != nil {
		return err
	}

	// TODO(user): Modify this to be the types you create
	// Uncomment watch a Deployment created by Frigate - change this for objects you create
	err = c.Watch(&source.Kind{Type: &appsv1.Deployment{}}, &handler.EnqueueRequestForOwner{
		IsController: true,
		OwnerType:    &shipv1beta1.Frigate{},
	})
	if err != nil {
		return err
	}

	return nil
}

在这里用户需要指定 Operator 需要监听哪些资源的变动。在此例中,监听了被定义的新资源拥有(Own) 的 Deployment 的变动。当 Owner 为 Frigate 的 Deployment 的 Spec 或者 Status 一旦发生了变动时,Operator 都会接收到来自 Kubernetes API Server 的请求。

另外一处为 ReconcileFrigate 结构的方法 Reconcile

// Reconcile reads that state of the cluster for a Frigate object and makes changes based on the state read
// and what is in the Frigate.Spec
// TODO(user): Modify this Reconcile function to implement your Controller logic.  The scaffolding writes
// a Deployment as an example
// Automatically generate RBAC rules to allow the Controller to read and write Deployments
// +kubebuilder:rbac:groups=apps,resources=deployments,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=apps,resources=deployments/status,verbs=get;update;patch
// +kubebuilder:rbac:groups=ship.example.com,resources=frigates,verbs=get;list;watch;create;update;patch;delete
// +kubebuilder:rbac:groups=ship.example.com,resources=frigates/status,verbs=get;update;patch
func (r *ReconcileFrigate) Reconcile(request reconcile.Request) (reconcile.Result, error) {
	// Fetch the Frigate instance
	instance := &shipv1beta1.Frigate{}
	err := r.Get(context.TODO(), request.NamespacedName, instance)
	if err != nil {
		if errors.IsNotFound(err) {
			// Object not found, return.  Created objects are automatically garbage collected.
			// For additional cleanup logic use finalizers.
			return reconcile.Result{}, nil
		}
		// Error reading the object - requeue the request.
		return reconcile.Result{}, err
	}

	// TODO(user): Change this to be the object type created by your controller
	// Define the desired Deployment object
	deploy := &appsv1.Deployment{
		ObjectMeta: metav1.ObjectMeta{
			Name:      instance.Name + "-deployment",
			Namespace: instance.Namespace,
		},
		Spec: appsv1.DeploymentSpec{
			Selector: &metav1.LabelSelector{
				MatchLabels: map[string]string{"deployment": instance.Name + "-deployment"},
			},
			Template: corev1.PodTemplateSpec{
				ObjectMeta: metav1.ObjectMeta{Labels: map[string]string{"deployment": instance.Name + "-deployment"}},
				Spec: corev1.PodSpec{
					Containers: []corev1.Container{
						{
							Name:  "nginx",
							Image: "nginx",
						},
					},
				},
			},
		},
	}
	if err := controllerutil.SetControllerReference(instance, deploy, r.scheme); err != nil {
		return reconcile.Result{}, err
	}

	// TODO(user): Change this for the object type created by your controller
	// Check if the Deployment already exists
	found := &appsv1.Deployment{}
	err = r.Get(context.TODO(), types.NamespacedName{Name: deploy.Name, Namespace: deploy.Namespace}, found)
	if err != nil && errors.IsNotFound(err) {
		log.Info("Creating Deployment", "namespace", deploy.Namespace, "name", deploy.Name)
		err = r.Create(context.TODO(), deploy)
		return reconcile.Result{}, err
	} else if err != nil {
		return reconcile.Result{}, err
	}

	// TODO(user): Change this for the object type created by your controller
	// Update the found object and write the result back if there are any changes
	if !reflect.DeepEqual(deploy.Spec, found.Spec) {
		found.Spec = deploy.Spec
		log.Info("Updating Deployment", "namespace", deploy.Namespace, "name", deploy.Name)
		err = r.Update(context.TODO(), found)
		if err != nil {
			return reconcile.Result{}, err
		}
	}
	return reconcile.Result{}, nil
}

这里是处理所有来自 API Server 的请求的函数入口。在此例中,每次请求都会根据 Custom Resource 的 name 在内存中维护一个期望的 Deployment 的实例,随后将其与 API Server 中已有的实例进行比对,如果其 Spec 字段有不同之处,则通过 API Server 更新这一实例。Deployment Controller 会进行接下来的处理,根据更新后的 Spec 修改在集群上的 Pod 的相关运行情况。

编译与运行 Operator

如果是在本地运行,我们需要利用 make manager 来编译 Operator 为二进制可执行文件。最后的可执行文件在 bin 目录下。 如果是需要运行在 Kubernetes 集群中,我们可以利用 make docker-push && make docker-push && make deploy 来进行部署。

Under the Hood

看完了使用上的过程,接下来我们来看看,kubebuilder 生成的代码到底是怎么去运行的。这一部分主要涉及 controller-runtime 中的实现,因此需要有一定的 Kubernetes 基础会比较好理解。

首先来看看 cmd 中的代码:

// Create a new Cmd to provide shared dependencies and start components
log.Info("setting up manager")
mgr, err := manager.New(cfg, manager.Options{MetricsBindAddress: metricsAddr})
if err != nil {
    log.Error(err, "unable to set up overall controller manager")
    os.Exit(1)
}

log.Info("Registering Components.")

// Setup Scheme for all resources
log.Info("setting up scheme")
if err := apis.AddToScheme(mgr.GetScheme()); err != nil {
    log.Error(err, "unable add APIs to scheme")
    os.Exit(1)
}

// Setup all Controllers
log.Info("Setting up controller")
if err := controller.AddToManager(mgr); err != nil {
    log.Error(err, "unable to register controllers to the manager")
    os.Exit(1)
}

log.Info("setting up webhooks")
if err := webhook.AddToManager(mgr); err != nil {
    log.Error(err, "unable to register webhooks to the manager")
    os.Exit(1)
}

// Start the Cmd
log.Info("Starting the Cmd.")
if err := mgr.Start(signals.SetupSignalHandler()); err != nil {
    log.Error(err, "unable to run the manager")
    os.Exit(1)
}

之前说到 kubebuilder 是开发 Operator 的框架,其实并不十分准确。准确来说,kubebuilder 是开发 Controller Manager 的框架,Controller Manager 会管理一个或者多个 Operator。因此 cmd 中的代码也主要是围绕 Controller Manager 展开的。manager.New 首先创建了一个 Manager 实例,在这一实例中有 client,cache 等之后需要的对象。

apis.AddToScheme 将 CRD 的结构与 Kubernetes GroupVersionKinds 的映射添加到 Manager 的 Scheme 中。这一步是为了能够让 Manager 知道 CRD 的存在。

接下来,就是通过 controller.AddToManager 创建出定义的 Operator,并且添加到 Manager 中。webhook.AddToManager 会创建对应的 Webhook,主要是做数据校验,与默认值的赋值等操作,这里就不多介绍了,因为与 Controller 主要的逻辑无关。最后的 mgr.Start 会真正运行 Manager。

controller.AddToManager 中会用 controller.New 创建出 Operator,然后 Watch 对应的资源,最后返回。下面是 controller.New 的实现:

// New returns a new Controller registered with the Manager.  The Manager will ensure that shared Caches have
// been synced before the Controller is Started.
func New(name string, mgr manager.Manager, options Options) (Controller, error) {
	if options.Reconciler == nil {
		return nil, fmt.Errorf("must specify Reconciler")
	}

	if len(name) == 0 {
		return nil, fmt.Errorf("must specify Name for Controller")
	}

	if options.MaxConcurrentReconciles <= 0 {
		options.MaxConcurrentReconciles = 1
	}

	// Inject dependencies into Reconciler
	if err := mgr.SetFields(options.Reconciler); err != nil {
		return nil, err
	}

	// Create controller with dependencies set
	c := &controller.Controller{
		Do:                      options.Reconciler,
		Cache:                   mgr.GetCache(),
		Config:                  mgr.GetConfig(),
		Scheme:                  mgr.GetScheme(),
		Client:                  mgr.GetClient(),
		Recorder:                mgr.GetRecorder(name),
		Queue:                   workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), name),
		MaxConcurrentReconciles: options.MaxConcurrentReconciles,
		Name:                    name,
	}

	// Add the controller as a Manager components
	return c, mgr.Add(c)
}

其中 options.Reconciler 就是我们定义的实现了 Reconcile 函数的结构的实例。这一结构的 Reconcile 函数的实现也就是前文中提到的 Operator 实现所需的第二处需要修改的地方。mgr.SetFields(options.Reconciler) 利用依赖注入的方式,将 Manager 的 Client 和 Scheme 注入到 options.Reconciler 中,然后将其赋值给 Controller 中指向 reconcile.Reconciler 接口的字段 Do 中。可以看到除了这一字段,Controller 还有 Queue,Recorder, Client 等其他的字段。因此 kubebuilder 是对 Controller 进行了更高层次的抽象,其有关业务逻辑的实现都通过 reconcile.Reconciler 这一接口进行,而 Queue 等底层的对象,则是由 kubebuilder 来替开发者维护。

最后一步 mgr.Add(c) 将 Operator 加入到 Manager 的一个 Operator 数组中。

type controllerManager struct {
    // ...

	// runnables is the set of Controllers that the controllerManager injects deps into and Starts.
	runnables []Runnable
    // ...
}

// Add sets dependencies on i, and adds it to the list of runnables to start.
func (cm *controllerManager) Add(r Runnable) error {
	cm.mu.Lock()
	defer cm.mu.Unlock()

	// Set dependencies on the object
	if err := cm.SetFields(r); err != nil {
		return err
	}

	// Add the runnable to the list
	cm.runnables = append(cm.runnables, r)
	if cm.started {
		// If already started, start the controller
		go func() {
			cm.errChan <- r.Start(cm.internalStop)
		}()
	}

	return nil
}

接下来,我们回过头来看下 Controller 是如何实现 Watch Kubernetes API Server 的资源的:

// Watch implements controller.Controller
func (c *Controller) Watch(src source.Source, evthdler handler.EventHandler, prct ...predicate.Predicate) error {
	c.mu.Lock()
	defer c.mu.Unlock()

	// Inject Cache into arguments
	if err := c.SetFields(src); err != nil {
		return err
	}
	if err := c.SetFields(evthdler); err != nil {
		return err
	}
	for _, pr := range prct {
		if err := c.SetFields(pr); err != nil {
			return err
		}
	}

	log.Info("Starting EventSource", "controller", c.Name, "source", src)
	return src.Start(evthdler, c.Queue, prct...)
}

这里的 SetFields 是 Manager 将其自己的 SetFields 函数注入了进来,所以等同于调用了 Manager 的 SetFields 方法,其定义如下:

func (cm *controllerManager) SetFields(i interface{}) error {
	if _, err := inject.ConfigInto(cm.config, i); err != nil {
		return err
	}
	if _, err := inject.ClientInto(cm.client, i); err != nil {
		return err
	}
	if _, err := inject.SchemeInto(cm.scheme, i); err != nil {
		return err
	}
	if _, err := inject.CacheInto(cm.cache, i); err != nil {
		return err
	}
	if _, err := inject.InjectorInto(cm.SetFields, i); err != nil {
		return err
	}
	if _, err := inject.StopChannelInto(cm.internalStop, i); err != nil {
		return err
	}
	if _, err := inject.DecoderInto(cm.admissionDecoder, i); err != nil {
		return err
	}
	return nil
}
// Cache is used by the ControllerManager to inject Cache into Sources, EventHandlers, Predicates, and
// Reconciles
type Cache interface {
	InjectCache(cache cache.Cache) error
}
// CacheInto will set informers on i and return the result if it implements Cache.  Returns
//// false if i does not implement Cache.
func CacheInto(c cache.Cache, i interface{}) (bool, error) {
	if s, ok := i.(Cache); ok {
		return true, s.InjectCache(c)
	}
	return false, nil
}

inject.CacheInto 为例介绍下实现,其检查被注入的对象有没有实现 Cache 接口,即有没有实现 InjectCache(cache cache.Cache) error 方法。如果实现了,则执行注入,否则直接返回。也就是通过这样的方式,Manager 最终把 Cache 注入到了 Source 中,同时如果需要的话,把 Scheme 注入到了 EventHandler 中。这里的 Scheme 在指定 Owner 的 EventHandler 会被用来获取 Owner 的 GroupKind

接下来,后面 src.Start(evthdler, c.Queue, prct...) 也就是顺理成章的实现了:

// Start is internal and should be called only by the Controller to register an EventHandler with the Informer
// to enqueue reconcile.Requests.
func (ks *Kind) Start(handler handler.EventHandler, queue workqueue.RateLimitingInterface,
	prct ...predicate.Predicate) error {

	// Type should have been specified by the user.
	if ks.Type == nil {
		return fmt.Errorf("must specify Kind.Type")
	}

	// cache should have been injected before Start was called
	if ks.cache == nil {
		return fmt.Errorf("must call CacheInto on Kind before calling Start")
	}

	// Lookup the Informer from the Cache and add an EventHandler which populates the Queue
	i, err := ks.cache.GetInformer(ks.Type)
	if err != nil {
		if kindMatchErr, ok := err.(*meta.NoKindMatchError); ok {
			log.Error(err, "if kind is a CRD, it should be installed before calling Start",
				"kind", kindMatchErr.GroupKind)
		}
		return err
	}
	i.AddEventHandler(internal.EventHandler{Queue: queue, EventHandler: handler, Predicates: prct})
	return nil
}

其利用被注入到 Source 中的 Cache 获取到针对 Source 的资源类型的 Informer,然后将 EventHandler 作为处理 Informer 的事件的处理器。这就是 kubebuilder 的高层 API 背后做的事情。

关于作者

高策,欢迎关注我们的 GitHub博客知乎专栏 :)

许可协议

gaocegege
MOS 组的小哥哥