Developing a Custom Controller with client-go
Create utils/consts.go
- Under the pkg package, create a utils directory and add a consts.go file holding a few constants that the controller code below will use.
```go
package utils

const ControllerAgentName = "app-controller"

const WorkNum = 5

const MaxRetry = 10

const (
    // SuccessSynced is used as part of the Event 'reason' when an App is synced
    SuccessSynced = "Synced"
    // ErrResourceExists is used as part of the Event 'reason' when an App fails
    // to sync due to a Deployment of the same name already existing.
    ErrResourceExists = "ErrResourceExists"
    // MessageResourceExists is the message used for Events when a resource
    // fails to sync due to a Deployment already existing
    MessageResourceExists = "Resource %q already exists and is not managed by App"
    // MessageResourceSynced is the message used for an Event fired when an App
    // is synced successfully
    MessageResourceSynced = "App synced successfully"
)
```
Create the controller: pkg/controller/controller.go
The complete controller.go file is long, so the sections below walk through it piece by piece.
The Controller struct in detail
First, define a Controller struct. Our controller operates on App, Deployment, and Service resources, so the struct needs the following parts:
- a kubernetes clientset: fetches the latest Deployment and Service state from the apiserver
- an app clientset: fetches the latest App state from the apiserver; this is the clientset we generated earlier with code-generator
- a deployment Lister: reads Deployments from the informer's local cache, avoiding overly frequent trips to the apiserver
- a service Lister: reads Services from the informer's local cache, for the same reason
- an app Lister: reads Apps from the informer's local cache, for the same reason
- a deployment HasSynced: checks whether the Deployment cache has finished syncing
- a service HasSynced: checks whether the Service cache has finished syncing
- an app HasSynced: checks whether the App cache has finished syncing
- a workqueue: holds the keys (usually namespace/name) of resources waiting to be processed
- a recorder: an event recorder; the events it records can be viewed with kubectl get events
The Controller struct
```go
type Controller struct {
    // kubeClientset is the clientset for all built-in Kubernetes resources
    kubeClientset kubernetes.Interface
    // appClientset is the generated clientset for the App resource
    appClientset clientset.Interface
    // deploymentsLister reads Deployments from the local cache
    deploymentsLister appslisterv1.DeploymentLister
    // servicesLister reads Services from the local cache
    servicesLister corelisterv1.ServiceLister
    // appsLister reads Apps from the local cache
    appsLister listerv1.AppLister
    // deploymentsSync checks whether the Deployment cache has finished syncing
    deploymentsSync cache.InformerSynced
    // servicesSync checks whether the Service cache has finished syncing
    servicesSync cache.InformerSynced
    // appsSync checks whether the App cache has finished syncing
    appsSync cache.InformerSynced
    // workqueue holds the keys (usually namespace/name) of resources awaiting processing
    workqueue workqueue.RateLimitingInterface
    // recorder records events
    recorder record.EventRecorder
}
```
Provide a NewController function
```go
func NewController(kubeclientset kubernetes.Interface,
    appclientset clientset.Interface,
    deploymentInformer appsinformersv1.DeploymentInformer,
    serviceInformer coreinformersv1.ServiceInformer,
    appInformer informersv1.AppInformer) *Controller {
    // Add the scheme of the generated App clientset to the global scheme
    utilruntime.Must(scheme.AddToScheme(scheme.Scheme))
    klog.V(4).Info("Creating event broadcaster")
    // Create an event broadcaster that fans events out to different listeners
    eventBroadcaster := record.NewBroadcaster()
    // Emit events as structured logs
    eventBroadcaster.StartStructuredLogging(0)
    // Configure the broadcaster to record events to the given EventSink
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    // Create an event recorder that sends events to the broadcaster
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: utils.ControllerAgentName})
    // Create the Controller instance
    c := &Controller{
        kubeClientset:     kubeclientset,
        appClientset:      appclientset,
        deploymentsLister: deploymentInformer.Lister(),
        servicesLister:    serviceInformer.Lister(),
        appsLister:        appInformer.Lister(),
        deploymentsSync:   deploymentInformer.Informer().HasSynced,
        servicesSync:      serviceInformer.Informer().HasSynced,
        appsSync:          appInformer.Informer().HasSynced,
        workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Apps"),
        recorder:          recorder,
    }
    // Set up ResourceEventHandlers for the App informer
    klog.Info("Setting up event handlers")
    appInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.AddApp,
        UpdateFunc: c.UpdateApp,
    })
    // Set up a ResourceEventHandler for the Deployment informer
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: c.DeleteDeployment,
    })
    // Set up a ResourceEventHandler for the Service informer
    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: c.DeleteService,
    })
    // Return the controller instance
    return c
}
```
The ResourceEventHandler methods in detail
- There are four ResourceEventHandler methods; their main job is to put the key of the resource awaiting reconciliation into the workqueue.
- Note the asymmetry: the App informer handles Add and Update so that new or modified Apps get reconciled, while the Deployment and Service informers only handle Delete, so that removing a child resource re-enqueues its owning App and the child gets recreated. A hardening note follows the code below.
```go
func (c *Controller) enqueue(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}

func (c *Controller) AddApp(obj interface{}) {
    c.enqueue(obj)
}

func (c *Controller) UpdateApp(oldObj, newObj interface{}) {
    if reflect.DeepEqual(oldObj, newObj) {
        key, _ := cache.MetaNamespaceKeyFunc(oldObj)
        klog.V(4).Infof("UpdateApp %s: %s", key, "no change")
        return
    }
    c.enqueue(newObj)
}

func (c *Controller) DeleteDeployment(obj interface{}) {
    deploy := obj.(*appsv1.Deployment)
    ownerReference := metav1.GetControllerOf(deploy)
    if ownerReference == nil || ownerReference.Kind != "App" {
        return
    }
    c.enqueue(obj)
}

func (c *Controller) DeleteService(obj interface{}) {
    service := obj.(*corev1.Service)
    ownerReference := metav1.GetControllerOf(service)
    if ownerReference == nil || ownerReference.Kind != "App" {
        return
    }
    c.enqueue(obj)
}
```
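One caveat: when the informer misses a delete event, it hands the delete handler a cache.DeletedFinalStateUnknown tombstone rather than the object itself, so the bare type assertions above can panic. A more defensive variant of DeleteDeployment might look like this (a suggested hardening, not part of the original code; DeleteService would be handled the same way):

```go
func (c *Controller) DeleteDeployment(obj interface{}) {
    deploy, ok := obj.(*appsv1.Deployment)
    if !ok {
        // On a missed delete event the informer delivers a tombstone wrapping
        // the last known state; unwrap it before use.
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("unexpected object type: %T", obj))
            return
        }
        deploy, ok = tombstone.Obj.(*appsv1.Deployment)
        if !ok {
            utilruntime.HandleError(fmt.Errorf("tombstone contained unexpected object: %T", tombstone.Obj))
            return
        }
    }
    if owner := metav1.GetControllerOf(deploy); owner == nil || owner.Kind != "App" {
        return
    }
    c.enqueue(deploy)
}
```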
The controller's Run() method in detail
- Run() starts the controller and launches workerNum workers.
```go
func (c *Controller) Run(workerNum int, stopCh <-chan struct{}) error {
    // HandleCrash recovers from an uncaught panic, logging and reporting it
    defer utilruntime.HandleCrash()
    // Shut the queue down when the controller exits
    defer c.workqueue.ShutDown()
    klog.V(4).Info("Starting App Controller")
    klog.V(4).Info("Waiting for informer cache to sync")
    if ok := cache.WaitForCacheSync(stopCh, c.appsSync, c.deploymentsSync, c.servicesSync); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }
    klog.V(4).Info("Starting workers")
    for i := 0; i < workerNum; i++ {
        go wait.Until(c.worker, time.Minute, stopCh)
    }
    klog.V(4).Info("Started workers")
    <-stopCh
    klog.V(4).Info("Shutting down workers")
    return nil
}
```
The worker in detail
- The worker drives the core reconciliation logic. (A sketch of the referenced handleError helper follows the code.)
```go
func (c *Controller) worker() {
    for c.processNextWorkItem() {
    }
}

func (c *Controller) processNextWorkItem() bool {
    // Take one item from the workqueue
    item, shutdown := c.workqueue.Get()
    // If the queue has been shut down, return false
    if shutdown {
        return false
    }
    // Mark the item as done once processing finishes
    defer c.workqueue.Done(item)
    // Convert the item into a key
    key, ok := item.(string)
    if !ok {
        klog.Warningf("failed to convert item [%v] to string", item)
        c.workqueue.Forget(item)
        return true
    }
    // Reconcile the App identified by this key; syncApp holds the core logic
    if err := c.syncApp(key); err != nil {
        klog.Errorf("failed to syncApp [%s], error: [%s]", key, err.Error())
        c.handleError(key, err)
    }
    return true
}

// syncApp holds the core reconciliation logic for the App resource
func (c *Controller) syncApp(key string) error {
    // Split the key into namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }
    // Get the App object for this key from the informer cache
    app, err := c.appsLister.Apps(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            return fmt.Errorf("app [%s] in work queue no longer exists", key)
        }
        return err
    }
    // Take the deploymentTemplate part of the App object
    deploymentTemplate := app.Spec.DeploymentSpec
    // If the App's deploymentTemplate is not empty
    if deploymentTemplate.Name != "" {
        // Try to get the corresponding Deployment from the cache
        deploy, err := c.deploymentsLister.Deployments(namespace).Get(deploymentTemplate.Name)
        if err != nil {
            // If it was not found
            if errors.IsNotFound(err) {
                klog.V(4).Infof("starting to create deployment [%s] in namespace [%s]", deploymentTemplate.Name, namespace)
                // Build a Deployment object, then create it via the apiserver with kubeClientset
                deploy = newDeployment(deploymentTemplate, app)
                _, err := c.kubeClientset.AppsV1().Deployments(namespace).Create(context.TODO(), deploy, metav1.CreateOptions{})
                if err != nil {
                    return fmt.Errorf("failed to create deployment [%s] in namespace [%s], error: [%v]", deploymentTemplate.Name, namespace, err)
                }
                // After creating it, fetch the latest Deployment from the apiserver because its status is needed below.
                // (It cannot come from the informer cache here: the cache has not yet synced the newly created Deployment.)
                deploy, _ = c.kubeClientset.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentTemplate.Name, metav1.GetOptions{})
            } else {
                return fmt.Errorf("failed to get deployment [%s] in namespace [%s], error: [%v]", deploymentTemplate.Name, namespace, err)
            }
        }
        // If the Deployment we got is not controlled by this App, report an error
        if !metav1.IsControlledBy(deploy, app) {
            msg := fmt.Sprintf(utils.MessageResourceExists, deploy.Name)
            c.recorder.Event(app, corev1.EventTypeWarning, utils.ErrResourceExists, msg)
            return fmt.Errorf("%s", msg)
        }
        // Update the Deployment status on the App
        app.Status.DeploymentStatus = &deploy.Status
    }
    // Take the serviceTemplate part of the App object
    serviceTemplate := app.Spec.ServiceSpec
    // If the App's serviceTemplate is not empty
    if serviceTemplate.Name != "" {
        // Try to get the corresponding Service from the cache
        service, err := c.servicesLister.Services(namespace).Get(serviceTemplate.Name)
        if err != nil {
            // If it was not found
            if errors.IsNotFound(err) {
                klog.V(4).Infof("starting to create service [%s] in namespace [%s]", serviceTemplate.Name, namespace)
                // Build a Service object, then create it via the apiserver with kubeClientset
                service = newService(serviceTemplate, app)
                _, err := c.kubeClientset.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
                if err != nil {
                    return fmt.Errorf("failed to create service [%s] in namespace [%s], error: [%v]", serviceTemplate.Name, namespace, err)
                }
                // Same as above: fetch the latest Service from the apiserver, not the not-yet-synced cache
                service, _ = c.kubeClientset.CoreV1().Services(namespace).Get(context.TODO(), serviceTemplate.Name, metav1.GetOptions{})
            } else {
                return fmt.Errorf("failed to get service [%s] in namespace [%s], error: [%v]", serviceTemplate.Name, namespace, err)
            }
        }
        // If the Service we got is not controlled by this App, report an error
        if !metav1.IsControlledBy(service, app) {
            msg := fmt.Sprintf(utils.MessageResourceExists, service.Name)
            c.recorder.Event(app, corev1.EventTypeWarning, utils.ErrResourceExists, msg)
            return fmt.Errorf("%s", msg)
        }
        // Update the Service status on the App
        app.Status.ServiceStatus = &service.Status
    }
    // With deploymentTemplate and serviceTemplate handled, write the updated App status back to the cluster
    _, err = c.appClientset.AppcontrollerV1().Apps(namespace).Update(context.TODO(), app, metav1.UpdateOptions{})
    if err != nil {
        return fmt.Errorf("failed to update app [%s], error: [%v]", key, err)
    }
    // Record a success event
    c.recorder.Event(app, corev1.EventTypeNormal, utils.SuccessSynced, utils.MessageResourceSynced)
    return nil
}
```
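processNextWorkItem calls c.handleError, which is not shown in this section. A minimal sketch consistent with the MaxRetry constant from utils (an assumption about the implementation, using only standard workqueue calls):

```go
// handleError requeues a failed key with rate limiting, giving up after MaxRetry attempts.
func (c *Controller) handleError(key string, err error) {
    // Requeue while the key has failed fewer than MaxRetry times
    if c.workqueue.NumRequeues(key) < utils.MaxRetry {
        c.workqueue.AddRateLimited(key)
        return
    }
    // Give up: report the error and drop the key from the queue
    utilruntime.HandleError(err)
    c.workqueue.Forget(key)
}
```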
The newDeployment and newService functions in detail
- These build the Deployment and Service described by the user's AppSpec.
- Only a few simple fields were added to AppSpec here, so much of the configuration is hard-coded; extend AppSpec as needed so that more of it can be user-specified.
- After changing AppSpec, remember to re-run the commands from Part 3 to regenerate the deepcopy file, the generated code, and the CRD manifest. For reference, the App types this code relies on are sketched right after this list.
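The App API types are defined earlier in this series; their assumed shape below is reconstructed from how syncApp, newDeployment, newService, and the test YAML use them, so treat it as a sketch rather than the exact original file:

```go
package v1

import (
    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// App is the custom resource this controller reconciles.
type App struct {
    metav1.TypeMeta   `json:",inline"`
    metav1.ObjectMeta `json:"metadata,omitempty"`
    Spec   AppSpec   `json:"spec"`
    Status AppStatus `json:"status"`
}

type AppSpec struct {
    DeploymentSpec DeploymentTemplate `json:"deploymentTemplate"`
    ServiceSpec    ServiceTemplate    `json:"serviceTemplate"`
}

type DeploymentTemplate struct {
    Name     string `json:"name"`
    Image    string `json:"image"`
    Replicas int32  `json:"replicas"`
}

type ServiceTemplate struct {
    Name string `json:"name"`
}

type AppStatus struct {
    DeploymentStatus *appsv1.DeploymentStatus `json:"deploymentStatus,omitempty"`
    ServiceStatus    *corev1.ServiceStatus    `json:"serviceStatus,omitempty"`
}
```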
```go
// newDeployment builds a Deployment object
func newDeployment(template appcontrollerv1.DeploymentTemplate, app *appcontrollerv1.App) *appsv1.Deployment {
    d := &appsv1.Deployment{
        TypeMeta: metav1.TypeMeta{
            Kind:       "Deployment",
            APIVersion: "apps/v1",
        },
        ObjectMeta: metav1.ObjectMeta{
            Name: template.Name,
        },
        Spec: appsv1.DeploymentSpec{
            // The Selector must match the pod labels
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app-key": "app-value",
                },
            },
            Replicas: &template.Replicas,
            Template: corev1.PodTemplateSpec{
                // The pod labels are not user-specified, so a default is used here
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        "app-key": "app-value",
                    },
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  "app-deploy-container",
                            Image: template.Image,
                        },
                    },
                },
            },
        },
    }
    // Set the Deployment's OwnerReferences to the App
    d.OwnerReferences = []metav1.OwnerReference{
        *metav1.NewControllerRef(app, appcontrollerv1.SchemeGroupVersion.WithKind("App")),
    }
    return d
}

// newService builds a Service object
func newService(template appcontrollerv1.ServiceTemplate, app *appcontrollerv1.App) *corev1.Service {
    s := &corev1.Service{
        TypeMeta: metav1.TypeMeta{
            Kind:       "Service",
            APIVersion: "v1",
        },
        ObjectMeta: metav1.ObjectMeta{
            Name: template.Name,
        },
        Spec: corev1.ServiceSpec{
            // The Selector must match the pod labels
            Selector: map[string]string{
                "app-key": "app-value",
            },
            Ports: []corev1.ServicePort{
                {
                    Name: "app-service",
                    // The Service port defaults to 8080. This is just for learning CRDs;
                    // in real development it could be moved into the AppSpec.
                    Port: 8080,
                },
            },
        },
    }
    s.OwnerReferences = []metav1.OwnerReference{
        *metav1.NewControllerRef(app, appcontrollerv1.SchemeGroupVersion.WithKind("App")),
    }
    return s
}
```
Write the main function to start the controller
Write cmd/main.go
- The main function builds the Controller object and starts it, with graceful shutdown wired in.
```go
package main

import (
    "flag"
    "time"

    "crd-controller-demo/pkg/controller"
    clientset "crd-controller-demo/pkg/generated/clientset/versioned"
    appinformers "crd-controller-demo/pkg/generated/informers/externalversions"
    "crd-controller-demo/pkg/signals"
    "crd-controller-demo/pkg/utils"

    kubeinformers "k8s.io/client-go/informers"
    "k8s.io/client-go/kubernetes"
    "k8s.io/client-go/tools/clientcmd"
    "k8s.io/klog/v2"
)

var (
    masterURL  string
    kubeConfig string
)

func main() {
    klog.InitFlags(nil)
    flag.Parse()
    // set up signals so we handle the first shutdown signal gracefully
    stopCh := signals.SetupSignalHandler()
    config, err := clientcmd.BuildConfigFromFlags(masterURL, kubeConfig)
    //config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
    if err != nil {
        klog.Fatalf("Error building kubeConfig: %s", err.Error())
    }
    kubeClientSet, err := kubernetes.NewForConfig(config)
    if err != nil {
        klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
    }
    appClientSet, err := clientset.NewForConfig(config)
    if err != nil {
        klog.Fatalf("Error building app clientset: %s", err.Error())
    }
    kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClientSet, time.Second*30)
    appInformerFactory := appinformers.NewSharedInformerFactory(appClientSet, time.Second*30)
    controller := controller.NewController(kubeClientSet, appClientSet,
        kubeInformerFactory.Apps().V1().Deployments(),
        kubeInformerFactory.Core().V1().Services(),
        appInformerFactory.Appcontroller().V1().Apps())
    // notice that there is no need to run Start methods in a separate goroutine (i.e. go kubeInformerFactory.Start(stopCh));
    // Start is non-blocking and runs all registered informers in dedicated goroutines.
    kubeInformerFactory.Start(stopCh)
    appInformerFactory.Start(stopCh)
    if err = controller.Run(utils.WorkNum, stopCh); err != nil {
        klog.Fatalf("Error running controller: %s", err.Error())
    }
}

func init() {
    flag.StringVar(&kubeConfig, "kubeConfig", "", "Path to a kubeConfig. Only required if out-of-cluster.")
    flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeConfig. Only required if out-of-cluster.")
}
```
The pkg/signals package
- Create a signals package under pkg with the following three files, which implement graceful shutdown of the program.
Write signal.go
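A sketch of signal.go, assumed to match the stop-channel variant of the signals package from kubernetes/sample-controller, which is the shape the signals.SetupSignalHandler() call in main.go expects:

```go
package signals

import (
    "os"
    "os/signal"
)

var onlyOneSignalHandler = make(chan struct{})

// SetupSignalHandler registers for SIGTERM and SIGINT. A stop channel is
// returned, which is closed on the first signal. If a second signal is
// caught, the program terminates with exit code 1.
func SetupSignalHandler() (stopCh <-chan struct{}) {
    close(onlyOneSignalHandler) // panics when called twice

    stop := make(chan struct{})
    c := make(chan os.Signal, 2)
    signal.Notify(c, shutdownSignals...)
    go func() {
        <-c
        close(stop) // first signal: close the stop channel for a graceful shutdown
        <-c
        os.Exit(1) // second signal: exit directly
    }()

    return stop
}
```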
Write signal_posix.go
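A sketch of signal_posix.go, again assumed to follow sample-controller; on non-Windows systems both SIGINT and SIGTERM trigger shutdown:

```go
//go:build !windows

package signals

import (
    "os"
    "syscall"
)

// shutdownSignals lists the signals treated as shutdown requests on POSIX systems.
var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}
```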
Write signal_windows.go
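A sketch of signal_windows.go; the _windows.go filename suffix restricts it to Windows builds, where syscall.SIGTERM is unavailable:

```go
package signals

import (
    "os"
)

// shutdownSignals lists the signals treated as shutdown requests on Windows.
var shutdownSignals = []os.Signal{os.Interrupt}
```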
Write test YAML files
- Under the artifacts directory, create an example directory containing two test files.
- test_app.yaml
```yaml
apiVersion: appcontroller.k8s.io/v1
kind: App
metadata:
  name: test-app
  namespace: tcs
spec:
  deploymentTemplate:
    name: app-deploy
    image: nginx
    replicas: 2
  serviceTemplate:
    name: app-service
```
- test_app_2.yaml
```yaml
apiVersion: appcontroller.k8s.io/v1
kind: App
metadata:
  name: test-app-2
  namespace: tcs
spec:
  deploymentTemplate:
    name: app-deploy-test
    image: tomcat
    replicas: 3
  serviceTemplate:
    name: app-service-test
```
Create CRD && Test
Create the CRD resource in the Kubernetes cluster
```bash
cd crd-controller-demo
kubectl apply -f artifacts/crd/appcontroller.k8s.io_apps.yaml
```
Start the controller
- Run `go run cmd/main.go` (the main.go written above lives under cmd/).
- The apiserver address and the kubeconfig path can be passed as command-line flags, as shown below.
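For example, an out-of-cluster run might look like this (the kubeconfig path is illustrative; -v=4 enables the verbosity-4 logs used throughout the controller):

```bash
go run cmd/main.go --kubeConfig=$HOME/.kube/config -v=4
```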
Then create the two App resources and watch what happens. (The manifests use the tcs namespace, so create it first with `kubectl create namespace tcs` if it does not exist.)
```bash
cd crd-controller-demo
kubectl apply -f artifacts/example/test_app.yaml
kubectl apply -f artifacts/example/test_app_2.yaml
```
Check how the Deployments and Services were created
```
[root@master crd-controller-demo]# kubectl get deploy -n tcs
NAME              READY   UP-TO-DATE   AVAILABLE   AGE
app-deploy        2/2     2            2           145m
app-deploy-test   3/3     3            3           125m
[root@master crd-controller-demo]# kubectl get pods -n tcs
NAME                              READY   STATUS    RESTARTS   AGE
app-deploy-67677ddc7f-jpggn       1/1     Running   0          145m
app-deploy-67677ddc7f-zqgcl      1/1     Running   0          145m
app-deploy-test-8fb698bf7-84s8p   1/1     Running   0          126m
app-deploy-test-8fb698bf7-dtk4w   1/1     Running   0          126m
app-deploy-test-8fb698bf7-wzfj9   1/1     Running   0          126m
[root@master crd-controller-demo]# kubectl get svc -n tcs
NAME          TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)   AGE
app-service   ClusterIP
```
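Because the controller emits events through the recorder, the sync results can also be verified via events (output omitted here):

```bash
kubectl get events -n tcs
```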