Create utils/consts.go

  • Under the pkg package, create a utils directory and, inside it, a consts.go file that records a few constants used later when writing the controller; a tip on inspecting the Events these constants end up in follows the code
    package utils

    const ControllerAgentName = "app-controller"
    const WorkNum = 5
    const MaxRetry = 10

    const (
        // SuccessSynced is used as part of the Event 'reason' when an App is synced
        SuccessSynced = "Synced"
        // ErrResourceExists is used as part of the Event 'reason' when an App fails
        // to sync due to a Deployment of the same name already existing.
        ErrResourceExists = "ErrResourceExists"

        // MessageResourceExists is the message used for Events when a resource
        // fails to sync due to a Deployment already existing
        MessageResourceExists = "Resource %q already exists and is not managed by App"
        // MessageResourceSynced is the message used for an Event fired when an App
        // is synced successfully
        MessageResourceSynced = "App synced successfully"
    )
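
  • These reason and message constants become the Events the controller emits through its recorder. Once everything is running, one way to inspect them is with a field selector on the reason (the tcs namespace comes from the test manifests later in this article):

    kubectl get events -n tcs --field-selector reason=Synced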

Create the controller: pkg/controller/controller.go

The complete controller.go file is shown first; each part is explained in the sections that follow

package controller

import (
    "context"
    "fmt"
    "reflect"
    "time"

    appcontrollerv1 "crd-controller-demo/pkg/apis/appcontroller/v1"
    clientset "crd-controller-demo/pkg/generated/clientset/versioned"
    "crd-controller-demo/pkg/generated/clientset/versioned/scheme"
    informersv1 "crd-controller-demo/pkg/generated/informers/externalversions/appcontroller/v1"
    listerv1 "crd-controller-demo/pkg/generated/listers/appcontroller/v1"
    "crd-controller-demo/pkg/utils"

    appsv1 "k8s.io/api/apps/v1"
    corev1 "k8s.io/api/core/v1"
    "k8s.io/apimachinery/pkg/api/errors"
    metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
    utilruntime "k8s.io/apimachinery/pkg/util/runtime"
    "k8s.io/apimachinery/pkg/util/wait"
    appsinformersv1 "k8s.io/client-go/informers/apps/v1"
    coreinformersv1 "k8s.io/client-go/informers/core/v1"
    "k8s.io/client-go/kubernetes"
    typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
    appslisterv1 "k8s.io/client-go/listers/apps/v1"
    corelisterv1 "k8s.io/client-go/listers/core/v1"
    "k8s.io/client-go/tools/cache"
    "k8s.io/client-go/tools/record"
    "k8s.io/client-go/util/workqueue"
    "k8s.io/klog/v2"
)

type Controller struct {
    // kubeClientset is the clientset for Kubernetes built-in resources, used to operate on all built-in resources
    kubeClientset kubernetes.Interface
    // appClientset is the generated clientset for the App resource, used to operate on App resources
    appClientset clientset.Interface

    // deploymentsLister reads deployment resources from the local informer cache
    deploymentsLister appslisterv1.DeploymentLister
    // servicesLister reads service resources from the local informer cache
    servicesLister corelisterv1.ServiceLister
    // appsLister reads app resources from the local informer cache
    appsLister listerv1.AppLister

    // deploymentsSync reports whether the deployments cache has finished syncing
    deploymentsSync cache.InformerSynced
    // servicesSync reports whether the services cache has finished syncing
    servicesSync cache.InformerSynced
    // appsSync reports whether the apps cache has finished syncing
    appsSync cache.InformerSynced

    // workqueue stores the keys (usually namespace/name) of resources waiting to be processed
    workqueue workqueue.RateLimitingInterface
    // recorder is the event recorder
    recorder record.EventRecorder
}

func NewController(kubeclientset kubernetes.Interface,
    appclientset clientset.Interface,
    deploymentInformer appsinformersv1.DeploymentInformer,
    serviceInformer coreinformersv1.ServiceInformer,
    appInformer informersv1.AppInformer) *Controller {

    // add the generated App clientset's scheme to the global Scheme
    utilruntime.Must(scheme.AddToScheme(scheme.Scheme))

    klog.V(4).Info("Creating event broadcaster")
    // create an event broadcaster that fans events out to its listeners
    eventBroadcaster := record.NewBroadcaster()
    // emit events as structured logs
    eventBroadcaster.StartStructuredLogging(0)
    // configure the broadcaster to record events to the given EventSink
    eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
    // create an event recorder that sends events to the broadcaster configured above
    recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: utils.ControllerAgentName})

    // build the Controller object
    c := &Controller{
        kubeClientset:     kubeclientset,
        appClientset:      appclientset,
        deploymentsLister: deploymentInformer.Lister(),
        servicesLister:    serviceInformer.Lister(),
        appsLister:        appInformer.Lister(),
        deploymentsSync:   deploymentInformer.Informer().HasSynced,
        servicesSync:      serviceInformer.Informer().HasSynced,
        appsSync:          appInformer.Informer().HasSynced,
        workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Apps"),
        recorder:          recorder,
    }

    // register the ResourceEventHandler for the AppInformer
    klog.Info("Setting up event handlers")
    appInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        AddFunc:    c.AddApp,
        UpdateFunc: c.UpdateApp,
    })

    // register the ResourceEventHandler for the DeploymentInformer
    deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: c.DeleteDeployment,
    })

    // register the ResourceEventHandler for the ServiceInformer
    serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
        DeleteFunc: c.DeleteService,
    })

    // return the controller instance
    return c
}

func (c *Controller) enqueue(obj interface{}) {
    key, err := cache.MetaNamespaceKeyFunc(obj)
    if err != nil {
        utilruntime.HandleError(err)
        return
    }
    c.workqueue.Add(key)
}

func (c *Controller) AddApp(obj interface{}) {
    c.enqueue(obj)
}

func (c *Controller) UpdateApp(oldObj, newObj interface{}) {
    if reflect.DeepEqual(oldObj, newObj) {
        key, _ := cache.MetaNamespaceKeyFunc(oldObj)
        klog.V(4).Infof("UpdateApp %s: %s", key, "no change")
        return
    }
    c.enqueue(newObj)
}

func (c *Controller) DeleteDeployment(obj interface{}) {
    deploy, ok := obj.(*appsv1.Deployment)
    if !ok {
        return
    }
    ownerReference := metav1.GetControllerOf(deploy)
    if ownerReference == nil || ownerReference.Kind != "App" {
        return
    }
    // enqueue the owning App's key (namespace/name) rather than the deployment's
    // own key, so that syncApp can find the App and recreate the deleted deployment
    c.workqueue.Add(deploy.Namespace + "/" + ownerReference.Name)
}

func (c *Controller) DeleteService(obj interface{}) {
    service, ok := obj.(*corev1.Service)
    if !ok {
        return
    }
    ownerReference := metav1.GetControllerOf(service)
    if ownerReference == nil || ownerReference.Kind != "App" {
        return
    }
    // as above: enqueue the owning App's key
    c.workqueue.Add(service.Namespace + "/" + ownerReference.Name)
}

func (c *Controller) Run(workerNum int, stopCh <-chan struct{}) error {
    // HandleCrash logs and reports any uncaught panic in this goroutine
    defer utilruntime.HandleCrash()
    // shut the queue down when the controller exits
    defer c.workqueue.ShutDown()

    klog.V(4).Info("Starting App Controller")

    klog.V(4).Info("Waiting for informer cache to sync")
    if ok := cache.WaitForCacheSync(stopCh, c.appsSync, c.deploymentsSync, c.servicesSync); !ok {
        return fmt.Errorf("failed to wait for caches to sync")
    }

    klog.V(4).Info("Starting workers")
    for i := 0; i < workerNum; i++ {
        go wait.Until(c.worker, time.Minute, stopCh)
    }

    klog.V(4).Info("Started workers")
    <-stopCh
    klog.V(4).Info("Shutting down workers")

    return nil
}

func (c *Controller) worker() {
    for c.processNextWorkItem() {
    }
}

func (c *Controller) processNextWorkItem() bool {
    // take one item from the workqueue
    item, shutdown := c.workqueue.Get()
    // if the queue has been shut down, return false to stop the worker
    if shutdown {
        return false
    }
    // always mark the item as done once processing finishes
    defer c.workqueue.Done(item)
    // the item should be a string key
    key, ok := item.(string)
    if !ok {
        klog.Warningf("failed to convert item [%v] to string", item)
        c.workqueue.Forget(item)
        return true
    }
    // reconcile the App identified by this key; this is the core reconciliation logic
    if err := c.syncApp(key); err != nil {
        klog.Errorf("failed to syncApp [%s], error: [%s]", key, err.Error())
        c.handleError(key, err)
    }
    return true
}

// syncApp contains the core reconciliation logic for the App resource
func (c *Controller) syncApp(key string) error {
    // split the key into namespace and name
    namespace, name, err := cache.SplitMetaNamespaceKey(key)
    if err != nil {
        return err
    }

    // fetch the App object for this key from the informer cache
    app, err := c.appsLister.Apps(namespace).Get(name)
    if err != nil {
        if errors.IsNotFound(err) {
            return fmt.Errorf("app [%s] in work queue no longer exists", key)
        }
        return err
    }
    // never mutate an object obtained from the lister cache; work on a deep copy
    app = app.DeepCopy()

    // take the deploymentSpec part out of the App object
    deploymentTemplate := app.Spec.DeploymentSpec
    // if the App carries a deployment template
    if deploymentTemplate.Name != "" {
        // try to fetch the corresponding deployment from the cache
        deploy, err := c.deploymentsLister.Deployments(namespace).Get(deploymentTemplate.Name)
        if err != nil {
            // not found: create it
            if errors.IsNotFound(err) {
                klog.V(4).Infof("starting to create deployment [%s] in namespace [%s]", deploymentTemplate.Name, namespace)
                // build a deployment object, then create it via the apiserver using kubeClientset
                deploy = newDeployment(deploymentTemplate, app)
                _, err := c.kubeClientset.AppsV1().Deployments(namespace).Create(context.TODO(), deploy, metav1.CreateOptions{})
                if err != nil {
                    return fmt.Errorf("failed to create deployment [%s] in namespace [%s], error: [%v]", deploymentTemplate.Name, namespace, err)
                }
                // re-read the deployment from the apiserver because its status is used below
                // (the informer cache has not caught up with the newly created object yet)
                deploy, err = c.kubeClientset.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentTemplate.Name, metav1.GetOptions{})
                if err != nil {
                    return fmt.Errorf("failed to get deployment [%s] in namespace [%s], error: [%v]", deploymentTemplate.Name, namespace, err)
                }
            } else {
                return fmt.Errorf("failed to get deployment [%s] in namespace [%s], error: [%v]", deploymentTemplate.Name, namespace, err)
            }
        }
        // if the deployment we found is not controlled by this App, report the conflict
        if !metav1.IsControlledBy(deploy, app) {
            msg := fmt.Sprintf(utils.MessageResourceExists, deploy.Name)
            c.recorder.Event(app, corev1.EventTypeWarning, utils.ErrResourceExists, msg)
            return fmt.Errorf("%s", msg)
        }
        // record the deployment status on the App
        app.Status.DeploymentStatus = &deploy.Status
    }

    // take the serviceSpec part out of the App object
    serviceTemplate := app.Spec.ServiceSpec
    // if the App carries a service template
    if serviceTemplate.Name != "" {
        // try to fetch the corresponding service from the cache
        service, err := c.servicesLister.Services(namespace).Get(serviceTemplate.Name)
        if err != nil {
            // not found: create it
            if errors.IsNotFound(err) {
                klog.V(4).Infof("starting to create service [%s] in namespace [%s]", serviceTemplate.Name, namespace)
                // build a service object, then create it via the apiserver using kubeClientset
                service = newService(serviceTemplate, app)
                _, err := c.kubeClientset.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
                if err != nil {
                    return fmt.Errorf("failed to create service [%s] in namespace [%s], error: [%v]", serviceTemplate.Name, namespace, err)
                }
                // re-read the service from the apiserver because its status is used below
                // (the informer cache has not caught up with the newly created object yet)
                service, err = c.kubeClientset.CoreV1().Services(namespace).Get(context.TODO(), serviceTemplate.Name, metav1.GetOptions{})
                if err != nil {
                    return fmt.Errorf("failed to get service [%s] in namespace [%s], error: [%v]", serviceTemplate.Name, namespace, err)
                }
            } else {
                return fmt.Errorf("failed to get service [%s] in namespace [%s], error: [%v]", serviceTemplate.Name, namespace, err)
            }
        }
        // if the service we found is not controlled by this App, report the conflict
        if !metav1.IsControlledBy(service, app) {
            msg := fmt.Sprintf(utils.MessageResourceExists, service.Name)
            c.recorder.Event(app, corev1.EventTypeWarning, utils.ErrResourceExists, msg)
            return fmt.Errorf("%s", msg)
        }
        // record the service status on the App
        app.Status.ServiceStatus = &service.Status
    }

    // after handling deploymentSpec and serviceSpec, push the updated App status to the cluster
    _, err = c.appClientset.AppcontrollerV1().Apps(namespace).Update(context.TODO(), app, metav1.UpdateOptions{})
    if err != nil {
        return fmt.Errorf("failed to update app [%s], error: [%v]", key, err)
    }

    // record a success event
    c.recorder.Event(app, corev1.EventTypeNormal, utils.SuccessSynced, utils.MessageResourceSynced)

    return nil
}

// newDeployment builds a deployment object from the given template
func newDeployment(template appcontrollerv1.DeploymentTemplate, app *appcontrollerv1.App) *appsv1.Deployment {
    d := &appsv1.Deployment{
        TypeMeta: metav1.TypeMeta{
            Kind:       "Deployment",
            APIVersion: "apps/v1",
        },
        ObjectMeta: metav1.ObjectMeta{
            Name: template.Name,
        },
        Spec: appsv1.DeploymentSpec{
            // the Selector must match the pod Labels
            Selector: &metav1.LabelSelector{
                MatchLabels: map[string]string{
                    "app-key": "app-value",
                },
            },
            Replicas: &template.Replicas,
            Template: corev1.PodTemplateSpec{
                // the pod labels are not user-configurable here, so a default is used
                ObjectMeta: metav1.ObjectMeta{
                    Labels: map[string]string{
                        "app-key": "app-value",
                    },
                },
                Spec: corev1.PodSpec{
                    Containers: []corev1.Container{
                        {
                            Name:  "app-deploy-container",
                            Image: template.Image,
                        },
                    },
                },
            },
        },
    }
    // set the deployment's OwnerReferences to the App
    d.OwnerReferences = []metav1.OwnerReference{
        *metav1.NewControllerRef(app, appcontrollerv1.SchemeGroupVersion.WithKind("App")),
    }
    return d
}

// newService builds a service object from the given template
func newService(template appcontrollerv1.ServiceTemplate, app *appcontrollerv1.App) *corev1.Service {
    s := &corev1.Service{
        TypeMeta: metav1.TypeMeta{
            Kind:       "Service",
            APIVersion: "v1",
        },
        ObjectMeta: metav1.ObjectMeta{
            Name: template.Name,
        },
        Spec: corev1.ServiceSpec{
            // the Selector must match the pod Labels
            Selector: map[string]string{
                "app-key": "app-value",
            },
            Ports: []corev1.ServicePort{
                {
                    Name: "app-service",
                    // the service port defaults to 8080; this is only for learning CRDs,
                    // in real code it could be moved into AppSpec
                    Port: 8080,
                },
            },
        },
    }

    s.OwnerReferences = []metav1.OwnerReference{
        *metav1.NewControllerRef(app, appcontrollerv1.SchemeGroupVersion.WithKind("App")),
    }
    return s
}

func (c *Controller) handleError(key string, err error) {
    // if this key has not yet hit the retry limit, put it back on the queue with rate limiting
    if c.workqueue.NumRequeues(key) < utils.MaxRetry {
        c.workqueue.AddRateLimited(key)
        return
    }

    // report the error through the shared runtime error handler
    utilruntime.HandleError(err)
    // stop tracking this key
    c.workqueue.Forget(key)
}

The Controller struct in detail

  • Create a Controller struct. Our controller needs to operate on App, Deployment, and Service resources, so the Controller must contain the following parts:

    • kubernetes clientset: fetches the latest deployment and service state from the apiserver
    • app clientset: fetches the latest App state from the apiserver; this clientset was generated with code-generator
    • deployment Lister: reads deployment state from the informer cache, avoiding overly frequent apiserver traffic
    • service Lister: reads service state from the informer cache, avoiding overly frequent apiserver traffic
    • app Lister: reads App state from the informer cache, avoiding overly frequent apiserver traffic
    • deployment HasSynced: checks whether the deployments cache has finished syncing
    • service HasSynced: checks whether the services cache has finished syncing
    • app HasSynced: checks whether the apps cache has finished syncing
    • workqueue: a queue that stores the keys (usually namespace/name) of resources waiting to be processed
    • recorder: an event recorder; the events it records can be seen with kubectl get events
  • The Controller struct

    type Controller struct {
        // kubeClientset is the clientset for Kubernetes built-in resources, used to operate on all built-in resources
        kubeClientset kubernetes.Interface
        // appClientset is the generated clientset for the App resource, used to operate on App resources
        appClientset clientset.Interface

        // deploymentsLister reads deployment resources from the local informer cache
        deploymentsLister appslisterv1.DeploymentLister
        // servicesLister reads service resources from the local informer cache
        servicesLister corelisterv1.ServiceLister
        // appsLister reads app resources from the local informer cache
        appsLister listerv1.AppLister

        // deploymentsSync reports whether the deployments cache has finished syncing
        deploymentsSync cache.InformerSynced
        // servicesSync reports whether the services cache has finished syncing
        servicesSync cache.InformerSynced
        // appsSync reports whether the apps cache has finished syncing
        appsSync cache.InformerSynced

        // workqueue stores the keys (usually namespace/name) of resources waiting to be processed
        workqueue workqueue.RateLimitingInterface
        // recorder is the event recorder
        recorder record.EventRecorder
    }
  • Provide a NewController constructor

    func NewController(kubeclientset kubernetes.Interface,
        appclientset clientset.Interface,
        deploymentInformer appsinformersv1.DeploymentInformer,
        serviceInformer coreinformersv1.ServiceInformer,
        appInformer informersv1.AppInformer) *Controller {

        // add the generated App clientset's scheme to the global Scheme
        utilruntime.Must(scheme.AddToScheme(scheme.Scheme))

        klog.V(4).Info("Creating event broadcaster")
        // create an event broadcaster that fans events out to its listeners
        eventBroadcaster := record.NewBroadcaster()
        // emit events as structured logs
        eventBroadcaster.StartStructuredLogging(0)
        // configure the broadcaster to record events to the given EventSink
        eventBroadcaster.StartRecordingToSink(&typedcorev1.EventSinkImpl{Interface: kubeclientset.CoreV1().Events("")})
        // create an event recorder that sends events to the broadcaster configured above
        recorder := eventBroadcaster.NewRecorder(scheme.Scheme, corev1.EventSource{Component: utils.ControllerAgentName})

        // build the Controller object
        c := &Controller{
            kubeClientset:     kubeclientset,
            appClientset:      appclientset,
            deploymentsLister: deploymentInformer.Lister(),
            servicesLister:    serviceInformer.Lister(),
            appsLister:        appInformer.Lister(),
            deploymentsSync:   deploymentInformer.Informer().HasSynced,
            servicesSync:      serviceInformer.Informer().HasSynced,
            appsSync:          appInformer.Informer().HasSynced,
            workqueue:         workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "Apps"),
            recorder:          recorder,
        }

        // register the ResourceEventHandler for the AppInformer
        klog.Info("Setting up event handlers")
        appInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            AddFunc:    c.AddApp,
            UpdateFunc: c.UpdateApp,
        })

        // register the ResourceEventHandler for the DeploymentInformer
        deploymentInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            DeleteFunc: c.DeleteDeployment,
        })

        // register the ResourceEventHandler for the ServiceInformer
        serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
            DeleteFunc: c.DeleteService,
        })

        // return the controller instance
        return c
    }

The Controller's ResourceEventHandler methods in detail

  • There are 4 ResourceEventHandler methods; their contents are shown below (a note on delete-event tombstones follows the code):
    • The main job of these 4 methods is to put the key of the App that needs reconciling into the workqueue (for deleted Deployments and Services, that is the key of the owning App)
    func (c *Controller) enqueue(obj interface{}) {
        key, err := cache.MetaNamespaceKeyFunc(obj)
        if err != nil {
            utilruntime.HandleError(err)
            return
        }
        c.workqueue.Add(key)
    }

    func (c *Controller) AddApp(obj interface{}) {
        c.enqueue(obj)
    }

    func (c *Controller) UpdateApp(oldObj, newObj interface{}) {
        if reflect.DeepEqual(oldObj, newObj) {
            key, _ := cache.MetaNamespaceKeyFunc(oldObj)
            klog.V(4).Infof("UpdateApp %s: %s", key, "no change")
            return
        }
        c.enqueue(newObj)
    }

    func (c *Controller) DeleteDeployment(obj interface{}) {
        deploy, ok := obj.(*appsv1.Deployment)
        if !ok {
            return
        }
        ownerReference := metav1.GetControllerOf(deploy)
        if ownerReference == nil || ownerReference.Kind != "App" {
            return
        }
        // enqueue the owning App's key (namespace/name) rather than the deployment's
        // own key, so that syncApp can find the App and recreate the deleted deployment
        c.workqueue.Add(deploy.Namespace + "/" + ownerReference.Name)
    }

    func (c *Controller) DeleteService(obj interface{}) {
        service, ok := obj.(*corev1.Service)
        if !ok {
            return
        }
        ownerReference := metav1.GetControllerOf(service)
        if ownerReference == nil || ownerReference.Kind != "App" {
            return
        }
        // as above: enqueue the owning App's key
        c.workqueue.Add(service.Namespace + "/" + ownerReference.Name)
    }
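
  • One hardening worth knowing about, borrowed from kubernetes/sample-controller: if the informer misses a delete event (for example across a relist), DeleteFunc receives a cache.DeletedFinalStateUnknown tombstone instead of the object, which is why the handlers above use checked type assertions. A minimal sketch of unwrapping the tombstone, as a helper that could live in this controller package (the name resolveDeletedDeployment is illustrative, not part of the original code):

    func resolveDeletedDeployment(obj interface{}) *appsv1.Deployment {
        // common case: the informer hands over the deleted object directly
        if d, ok := obj.(*appsv1.Deployment); ok {
            return d
        }
        // miss case: unwrap the tombstone and try the assertion again
        tombstone, ok := obj.(cache.DeletedFinalStateUnknown)
        if !ok {
            return nil
        }
        d, _ := tombstone.Obj.(*appsv1.Deployment)
        return d
    }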

The controller's Run() method in detail

  • Starts the Controller and launches workerNum workers
    func (c *Controller) Run(workerNum int, stopCh <-chan struct{}) error {
        // HandleCrash logs and reports any uncaught panic in this goroutine
        defer utilruntime.HandleCrash()
        // shut the queue down when the controller exits
        defer c.workqueue.ShutDown()

        klog.V(4).Info("Starting App Controller")

        klog.V(4).Info("Waiting for informer cache to sync")
        if ok := cache.WaitForCacheSync(stopCh, c.appsSync, c.deploymentsSync, c.servicesSync); !ok {
            return fmt.Errorf("failed to wait for caches to sync")
        }

        klog.V(4).Info("Starting workers")
        for i := 0; i < workerNum; i++ {
            go wait.Until(c.worker, time.Minute, stopCh)
        }

        klog.V(4).Info("Started workers")
        <-stopCh
        klog.V(4).Info("Shutting down workers")

        return nil
    }

The worker in detail

  • The worker drives the core reconciliation logic; how failed keys are retried is covered right after the code
    func (c *Controller) worker() {
        for c.processNextWorkItem() {
        }
    }

    func (c *Controller) processNextWorkItem() bool {
        // take one item from the workqueue
        item, shutdown := c.workqueue.Get()
        // if the queue has been shut down, return false to stop the worker
        if shutdown {
            return false
        }

        // always mark the item as done once processing finishes
        defer c.workqueue.Done(item)

        // the item should be a string key
        key, ok := item.(string)
        if !ok {
            klog.Warningf("failed to convert item [%v] to string", item)
            c.workqueue.Forget(item)
            return true
        }

        // reconcile the App identified by this key; this is the core reconciliation logic
        if err := c.syncApp(key); err != nil {
            klog.Errorf("failed to syncApp [%s], error: [%s]", key, err.Error())
            c.handleError(key, err)
        }

        return true
    }

    // syncApp contains the core reconciliation logic for the App resource
    func (c *Controller) syncApp(key string) error {
        // split the key into namespace and name
        namespace, name, err := cache.SplitMetaNamespaceKey(key)
        if err != nil {
            return err
        }

        // fetch the App object for this key from the informer cache
        app, err := c.appsLister.Apps(namespace).Get(name)
        if err != nil {
            if errors.IsNotFound(err) {
                return fmt.Errorf("app [%s] in work queue no longer exists", key)
            }
            return err
        }
        // never mutate an object obtained from the lister cache; work on a deep copy
        app = app.DeepCopy()

        // take the deploymentSpec part out of the App object
        deploymentTemplate := app.Spec.DeploymentSpec
        // if the App carries a deployment template
        if deploymentTemplate.Name != "" {
            // try to fetch the corresponding deployment from the cache
            deploy, err := c.deploymentsLister.Deployments(namespace).Get(deploymentTemplate.Name)
            if err != nil {
                // not found: create it
                if errors.IsNotFound(err) {
                    klog.V(4).Infof("starting to create deployment [%s] in namespace [%s]", deploymentTemplate.Name, namespace)
                    // build a deployment object, then create it via the apiserver using kubeClientset
                    deploy = newDeployment(deploymentTemplate, app)
                    _, err := c.kubeClientset.AppsV1().Deployments(namespace).Create(context.TODO(), deploy, metav1.CreateOptions{})
                    if err != nil {
                        return fmt.Errorf("failed to create deployment [%s] in namespace [%s], error: [%v]", deploymentTemplate.Name, namespace, err)
                    }
                    // re-read the deployment from the apiserver because its status is used below
                    // (the informer cache has not caught up with the newly created object yet)
                    deploy, err = c.kubeClientset.AppsV1().Deployments(namespace).Get(context.TODO(), deploymentTemplate.Name, metav1.GetOptions{})
                    if err != nil {
                        return fmt.Errorf("failed to get deployment [%s] in namespace [%s], error: [%v]", deploymentTemplate.Name, namespace, err)
                    }
                } else {
                    return fmt.Errorf("failed to get deployment [%s] in namespace [%s], error: [%v]", deploymentTemplate.Name, namespace, err)
                }
            }
            // if the deployment we found is not controlled by this App, report the conflict
            if !metav1.IsControlledBy(deploy, app) {
                msg := fmt.Sprintf(utils.MessageResourceExists, deploy.Name)
                c.recorder.Event(app, corev1.EventTypeWarning, utils.ErrResourceExists, msg)
                return fmt.Errorf("%s", msg)
            }
            // record the deployment status on the App
            app.Status.DeploymentStatus = &deploy.Status
        }

        // take the serviceSpec part out of the App object
        serviceTemplate := app.Spec.ServiceSpec
        // if the App carries a service template
        if serviceTemplate.Name != "" {
            // try to fetch the corresponding service from the cache
            service, err := c.servicesLister.Services(namespace).Get(serviceTemplate.Name)
            if err != nil {
                // not found: create it
                if errors.IsNotFound(err) {
                    klog.V(4).Infof("starting to create service [%s] in namespace [%s]", serviceTemplate.Name, namespace)
                    // build a service object, then create it via the apiserver using kubeClientset
                    service = newService(serviceTemplate, app)
                    _, err := c.kubeClientset.CoreV1().Services(namespace).Create(context.TODO(), service, metav1.CreateOptions{})
                    if err != nil {
                        return fmt.Errorf("failed to create service [%s] in namespace [%s], error: [%v]", serviceTemplate.Name, namespace, err)
                    }
                    // re-read the service from the apiserver because its status is used below
                    // (the informer cache has not caught up with the newly created object yet)
                    service, err = c.kubeClientset.CoreV1().Services(namespace).Get(context.TODO(), serviceTemplate.Name, metav1.GetOptions{})
                    if err != nil {
                        return fmt.Errorf("failed to get service [%s] in namespace [%s], error: [%v]", serviceTemplate.Name, namespace, err)
                    }
                } else {
                    return fmt.Errorf("failed to get service [%s] in namespace [%s], error: [%v]", serviceTemplate.Name, namespace, err)
                }
            }
            // if the service we found is not controlled by this App, report the conflict
            if !metav1.IsControlledBy(service, app) {
                msg := fmt.Sprintf(utils.MessageResourceExists, service.Name)
                c.recorder.Event(app, corev1.EventTypeWarning, utils.ErrResourceExists, msg)
                return fmt.Errorf("%s", msg)
            }
            // record the service status on the App
            app.Status.ServiceStatus = &service.Status
        }

        // after handling deploymentSpec and serviceSpec, push the updated App status to the cluster
        _, err = c.appClientset.AppcontrollerV1().Apps(namespace).Update(context.TODO(), app, metav1.UpdateOptions{})
        if err != nil {
            return fmt.Errorf("failed to update app [%s], error: [%v]", key, err)
        }

        // record a success event
        c.recorder.Event(app, corev1.EventTypeNormal, utils.SuccessSynced, utils.MessageResourceSynced)

        return nil
    }
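
  • When syncApp returns an error, processNextWorkItem hands the key to handleError (shown in the full listing above), which calls AddRateLimited until MaxRetry is reached and then gives up via Forget. The delay between retries comes from the queue's rate limiter. A small runnable sketch of that backoff behaviour, assuming the DefaultControllerRateLimiter configured in NewController (per-item exponential backoff combined with an overall token bucket):

    package main

    import (
        "fmt"

        "k8s.io/client-go/util/workqueue"
    )

    func main() {
        // each successive failure of the same key roughly doubles its delay,
        // starting at 5ms and capped at 1000s
        rl := workqueue.DefaultControllerRateLimiter()
        for i := 0; i < 4; i++ {
            fmt.Println(rl.When("tcs/test-app")) // 5ms, 10ms, 20ms, 40ms
        }
        rl.Forget("tcs/test-app") // resets the per-item backoff counter
    }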

The deployment and service builder functions in detail

  • Build the corresponding deployment and service from the information the user wrote in the AppSpec
  • Only a few simple fields were added to AppSpec here, so many values are hard-coded; extend AppSpec as needed so that more of them can be specified by the user (see the note on labels after the code)
  • After updating AppSpec, remember to re-run the commands from part 3 to regenerate the deepcopy file, the generated client code, and the CRD manifest
    // newDeployment builds a deployment object from the given template
    func newDeployment(template appcontrollerv1.DeploymentTemplate, app *appcontrollerv1.App) *appsv1.Deployment {
        d := &appsv1.Deployment{
            TypeMeta: metav1.TypeMeta{
                Kind:       "Deployment",
                APIVersion: "apps/v1",
            },
            ObjectMeta: metav1.ObjectMeta{
                Name: template.Name,
            },
            Spec: appsv1.DeploymentSpec{
                // the Selector must match the pod Labels
                Selector: &metav1.LabelSelector{
                    MatchLabels: map[string]string{
                        "app-key": "app-value",
                    },
                },
                Replicas: &template.Replicas,
                Template: corev1.PodTemplateSpec{
                    // the pod labels are not user-configurable here, so a default is used
                    ObjectMeta: metav1.ObjectMeta{
                        Labels: map[string]string{
                            "app-key": "app-value",
                        },
                    },
                    Spec: corev1.PodSpec{
                        Containers: []corev1.Container{
                            {
                                Name:  "app-deploy-container",
                                Image: template.Image,
                            },
                        },
                    },
                },
            },
        }
        // set the deployment's OwnerReferences to the App
        d.OwnerReferences = []metav1.OwnerReference{
            *metav1.NewControllerRef(app, appcontrollerv1.SchemeGroupVersion.WithKind("App")),
        }
        return d
    }

    // newService builds a service object from the given template
    func newService(template appcontrollerv1.ServiceTemplate, app *appcontrollerv1.App) *corev1.Service {
        s := &corev1.Service{
            TypeMeta: metav1.TypeMeta{
                Kind:       "Service",
                APIVersion: "v1",
            },
            ObjectMeta: metav1.ObjectMeta{
                Name: template.Name,
            },
            Spec: corev1.ServiceSpec{
                // the Selector must match the pod Labels
                Selector: map[string]string{
                    "app-key": "app-value",
                },
                Ports: []corev1.ServicePort{
                    {
                        Name: "app-service",
                        // the service port defaults to 8080; this is only for learning CRDs,
                        // in real code it could be moved into AppSpec
                        Port: 8080,
                    },
                },
            },
        }

        s.OwnerReferences = []metav1.OwnerReference{
            *metav1.NewControllerRef(app, appcontrollerv1.SchemeGroupVersion.WithKind("App")),
        }
        return s
    }
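
  • Note that every App built this way shares the same hard-coded selector (app-key: app-value), so the Services of two different Apps will select each other's Pods. A possible refinement, not part of the original code, is to derive the labels from the App itself and use appLabels(app) in newDeployment and newService wherever the fixed map appears:

    // appLabels is an illustrative helper: give each App its own label value so
    // the Deployments/Services of different Apps do not select each other's Pods
    func appLabels(app *appcontrollerv1.App) map[string]string {
        return map[string]string{"app-key": app.Name}
    }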

Write the main function and start the controller

Write cmd/main.go

  • Write the main function: create the Controller object, start it, and wire up graceful shutdown
    package main

    import (
        "flag"
        "time"

        "crd-controller-demo/pkg/controller"
        clientset "crd-controller-demo/pkg/generated/clientset/versioned"
        appinformers "crd-controller-demo/pkg/generated/informers/externalversions"
        "crd-controller-demo/pkg/signals"
        "crd-controller-demo/pkg/utils"

        kubeinformers "k8s.io/client-go/informers"
        "k8s.io/client-go/kubernetes"
        "k8s.io/client-go/tools/clientcmd"
        "k8s.io/klog/v2"
    )

    var (
        masterURL  string
        kubeConfig string
    )

    func main() {
        klog.InitFlags(nil)
        flag.Parse()

        // set up signals so we handle the first shutdown signal gracefully
        stopCh := signals.SetupSignalHandler()

        config, err := clientcmd.BuildConfigFromFlags(masterURL, kubeConfig)
        //config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
        if err != nil {
            klog.Fatalf("Error building kubeConfig: %s", err.Error())
        }

        kubeClientSet, err := kubernetes.NewForConfig(config)
        if err != nil {
            klog.Fatalf("Error building kubernetes clientset: %s", err.Error())
        }

        appClientSet, err := clientset.NewForConfig(config)
        if err != nil {
            klog.Fatalf("Error building app clientset: %s", err.Error())
        }

        kubeInformerFactory := kubeinformers.NewSharedInformerFactory(kubeClientSet, time.Second*30)
        appInformerFactory := appinformers.NewSharedInformerFactory(appClientSet, time.Second*30)

        controller := controller.NewController(kubeClientSet, appClientSet,
            kubeInformerFactory.Apps().V1().Deployments(),
            kubeInformerFactory.Core().V1().Services(),
            appInformerFactory.Appcontroller().V1().Apps())

        // notice that there is no need to run Start methods in a separate goroutine. (i.e. go kubeInformerFactory.Start(stopCh)
        // Start method is non-blocking and runs all registered informers in a dedicated goroutine.
        kubeInformerFactory.Start(stopCh)
        appInformerFactory.Start(stopCh)

        if err = controller.Run(utils.WorkNum, stopCh); err != nil {
            klog.Fatalf("Error running controller: %s", err.Error())
        }
    }

    func init() {
        flag.StringVar(&kubeConfig, "kubeConfig", "", "Path to a kubeConfig. Only required if out-of-cluster.")
        flag.StringVar(&masterURL, "master", "", "The address of the Kubernetes API server. Overrides any value in kubeConfig. Only required if out-of-cluster.")
    }

The pkg/signals package

  • Under pkg, create the signals package with 3 files that implement graceful shutdown of the program

Write signal.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package signals

import (
    "os"
    "os/signal"
)

var onlyOneSignalHandler = make(chan struct{})

// SetupSignalHandler registers handlers for SIGTERM and SIGINT. A stop channel is
// returned which is closed on one of these signals. If a second signal is caught,
// the program is terminated with exit code 1.
func SetupSignalHandler() (stopCh <-chan struct{}) {
    close(onlyOneSignalHandler) // panics when called twice

    stop := make(chan struct{})
    c := make(chan os.Signal, 2)
    signal.Notify(c, shutdownSignals...)
    go func() {
        <-c
        close(stop)
        <-c
        os.Exit(1) // second signal. Exit directly.
    }()

    return stop
}

Write signal_posix.go

//go:build !windows
// +build !windows

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package signals

import (
    "os"
    "syscall"
)

var shutdownSignals = []os.Signal{os.Interrupt, syscall.SIGTERM}

Write signal_windows.go

/*
Copyright 2017 The Kubernetes Authors.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package signals

import (
    "os"
)

var shutdownSignals = []os.Signal{os.Interrupt}

Write the test YAML files

  • Under the artifacts directory, create an example directory and put two test files in it
  • test_app.yaml
    apiVersion: appcontroller.k8s.io/v1
    kind: App
    metadata:
      name: test-app
      namespace: tcs
    spec:
      deploymentTemplate:
        name: app-deploy
        image: nginx
        replicas: 2
      serviceTemplate:
        name: app-service
  • test_app_2.yaml
    apiVersion: appcontroller.k8s.io/v1
    kind: App
    metadata:
      name: test-app-2
      namespace: tcs
    spec:
      deploymentTemplate:
        name: app-deploy-test
        image: tomcat
        replicas: 3
      serviceTemplate:
        name: app-service-test

Create CRD && Test

  • Create the CRD in the Kubernetes cluster

    cd crd-controller-demo
    kubectl apply -f artifacts/crd/appcontroller.k8s.io_apps.yaml
  • Start the controller

    • Run go run main.go (the file lives under cmd/)
    • Command-line flags can be used to point the controller at the apiserver and a kubeconfig, for example:
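
      A minimal invocation (flag names are the ones registered in cmd/main.go; -v is klog's verbosity level; -master is only needed to override the server address in the kubeconfig):

      cd crd-controller-demo
      go run cmd/main.go -kubeConfig=$HOME/.kube/config -v=4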
  • Then create the two App resources and see what happens

    cd crd-controller-demo
    # the example manifests target the tcs namespace; create it first if it does not exist
    kubectl create namespace tcs
    kubectl apply -f artifacts/example/test_app.yaml
    kubectl apply -f artifacts/example/test_app_2.yaml
  • Check that the deployments and services were created

    [root@master crd-controller-demo]# kubectl get deploy -n tcs
    NAME              READY   UP-TO-DATE   AVAILABLE   AGE
    app-deploy        2/2     2            2           145m
    app-deploy-test   3/3     3            3           125m

    [root@master crd-controller-demo]# kubectl get pods -n tcs
    NAME                              READY   STATUS    RESTARTS   AGE
    app-deploy-67677ddc7f-jpggn       1/1     Running   0          145m
    app-deploy-67677ddc7f-zqgcl       1/1     Running   0          145m
    app-deploy-test-8fb698bf7-84s8p   1/1     Running   0          126m
    app-deploy-test-8fb698bf7-dtk4w   1/1     Running   0          126m
    app-deploy-test-8fb698bf7-wzfj9   1/1     Running   0          126m

    [root@master crd-controller-demo]# kubectl get svc -n tcs
    NAME          TYPE        CLUSTER-IP   EXTERNAL-IP   PORT(S)   AGE
    app-service   ClusterIP