需求说明

  • 我们希望实现这个效果:
    • 创建或更新Service的时候,如果这个Service的Annotations中,包含了”ingress/http:true”,那么在创建或更新这个Service的时候,会自动为它创建一个Ingress。
    • 删除Service的时候,如果这个Service的Annotations中,包含了”ingress/http:true”,那么同时也要删除它的 Ingress。

需求分析:

这个效果需要编写三个事件处理方法,addService、updateService、deleteIngress

  • addService/updateService:用户创建或更新service的时候,kubernetes的ServiceController已经完成service的创建或更新了。我们要做的是拿到已存在的service对象,看是否包含 “ingress/http:true”。如果包含,则保证有一个对应的ingress;如果不包含,则保证不能有对应的ingress
  • deleteIngress:用于删除ingress的时候,也触发addService/updateService一样的逻辑,保证service和ingress的对应关系是正确的。
  • 可能会疑问,为什么没有 deleteService?
    • 因为我们会使用 OwnerReferences 将service+ingress关联起来。因此删除service,会由kubernetes的ControllerManager中的特殊Controller,自动完成ingress的gc,所以删除service时我们无需特殊处理。

代码编写

main.go 代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
package main

import (
"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
"log"
"share-code-operator-study/addingress/pkg"
)

func main() {
// 创建一个 集群客户端配置
config, err := clientcmd.BuildConfigFromFlags("", clientcmd.RecommendedHomeFile)
if err != nil {
inClusterConfig, err := rest.InClusterConfig()
if err != nil {
log.Fatalln("can't get config")
}
config = inClusterConfig
}

// 创建一个 clientset 客户端,用于创建 informerFactory
clientset, err := kubernetes.NewForConfig(config)
if err != nil {
panic(err.Error())
}

// 创建一个 informerFactory
factory := informers.NewSharedInformerFactory(clientset, 0)
// 使用 informerFactory 创建Services资源的 informer对象
serviceInformer := factory.Core().V1().Services()
// 使用 informerFactory 创建Ingresses资源的 informer对象
ingressInformer := factory.Networking().V1().Ingresses()

// 创建一个自定义控制器
controller := pkg.NewController(clientset, serviceInformer, ingressInformer)

// 创建 停止channel信号
stopCh := make(chan struct{})
// 启动 informerFactory,会启动已经创建的 serviceInformer、ingressInformer
factory.Start(stopCh)
// 等待 所有informer 从 etcd 实现全量同步
factory.WaitForCacheSync(stopCh)

// 启动自定义控制器
controller.Run(stopCh)
}

controller.go文件如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package pkg

import (
"context"
corev1 "k8s.io/api/core/v1"
netv1 "k8s.io/api/networking/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/apimachinery/pkg/util/wait"
informercorev1 "k8s.io/client-go/informers/core/v1"
informernetv1 "k8s.io/client-go/informers/networking/v1"
"k8s.io/client-go/kubernetes"
listercorev1 "k8s.io/client-go/listers/core/v1"
listernetv1 "k8s.io/client-go/listers/networking/v1"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/util/workqueue"
"reflect"
"time"
)

const (
// worker 数量
workNum = 5
// service 指定 ingress 的 annotation key
annoKey = "ingress/http"
// 调谐失败的最大重试次数
maxRetry = 10
)

// 自定义控制器
type controller struct {
client kubernetes.Interface
serviceLister listercorev1.ServiceLister
ingressLister listernetv1.IngressLister
queue workqueue.RateLimitingInterface
}

// NewController 创建一个自定义控制器
func NewController(clientset *kubernetes.Clientset, serviceInformer informercorev1.ServiceInformer, ingressInformer informernetv1.IngressInformer) *controller {
// 控制器中,包含一个clientset、service和ingress的缓存监听器、一个workqueue
c := controller{
client: clientset,
serviceLister: serviceInformer.Lister(),
ingressLister: ingressInformer.Lister(),
queue: workqueue.NewNamedRateLimitingQueue(workqueue.DefaultControllerRateLimiter(), "ingressManager"),
}

// 为 serviceInformer 添加 ResourceEventHandler
serviceInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// 添加service时触发
AddFunc: c.addService,
// 修改service时触发
UpdateFunc: c.updateService,
// 这里没有删除service的逻辑,因为我们会使用 OwnerReferences 将service+ingress关联起来。
// 因此删除service,会由kubernetes的ControllerManager中的特殊Controller,自动完成ingress的gc
})

// 为 ingressInformer 添加 ResourceEventHandler
ingressInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
// 删除ingress时触发
DeleteFunc: c.deleteIngress,
})

return &c
}

// 添加service时触发
func (c *controller) addService(obj interface{}) {
// 将 添加service 的 key 加入 workqueue
c.enqueue(obj)
}

// 修改service时触发
func (c *controller) updateService(oldObj interface{}, newObj interface{}) {
// 如果两个对象一致,就无需触发修改逻辑
if reflect.DeepEqual(oldObj, newObj) {
return
}
// todo 比较annotation
// 将 修改service 的 key 加入 workqueue
c.enqueue(newObj)
}

// 删除ingress时触发
func (c *controller) deleteIngress(obj interface{}) {
// 将对象转成ingress,并获取到它的 ownerReference
ingress := obj.(*netv1.Ingress)
ownerReference := metav1.GetControllerOf(ingress)
// 如果ingress的 ownerReference 没有绑定到service,则无需处理
if ownerReference == nil || ownerReference.Kind != "Service" {
return
}
// 如果ingress的 ownerReference 已经绑定到service,则需要处理
c.enqueue(obj)
}

// enqueue 将 待添加service 的 key 加入 workqueue
func (c *controller) enqueue(obj interface{}) {
// 调用工具方法,获取 kubernetes资源对象的 key(默认是 ns/name,或 name)
key, err := cache.MetaNamespaceKeyFunc(obj)
// 获取失败,不加入队列,即本次事件不予处理
if err != nil {
runtime.HandleError(err)
return
}
// 将 key 加入 workqueue
c.queue.Add(key)
}

// dequeue 将处理完成的 key 出队
func (c *controller) dequeue(item interface{}) {
c.queue.Done(item)
}

// Run 启动controller
func (c *controller) Run(stopCh chan struct{}) {
// 启动多个worker,同时对workqueue中的事件进行处理,效率提升5倍
for i := 0; i < workNum; i++ {
// 每个worker都是一个协程,使用同一个停止信号
go wait.Until(c.worker, time.Minute, stopCh)
}
// 启动完成后,Run函数就停止在这里,等待停止信号
<-stopCh
}

// worker方法
func (c *controller) worker() {
// 死循环,worker处理完一个,再去处理下一个
for c.processNextItem() {

}
}

// processNextItem 处理下一个
func (c *controller) processNextItem() bool {
// 从 workerqueue 取出一个key
item, shutdown := c.queue.Get()
// 如果已经收到停止信号了,则返回false,worker就会停止处理
if shutdown {
return false
}
// 处理完成后,将这个key出队
defer c.dequeue(item)

// 转成string类型的key
key := item.(string)

// 处理service逻辑的核心方法
err := c.syncService(key)
// 处理过程出错,进入错误统一处理逻辑
if err != nil {
c.handleError(key, err)
}
// 处理结束,返回true
return true
}

// handleError 错误统一处理逻辑
func (c *controller) handleError(key string, err error) {
// 如果当前key的处理次数,还不到最大重试次数,则再次加入队列
if c.queue.NumRequeues(key) < maxRetry {
c.queue.AddRateLimited(key)
return
}

// 运行时统一处理错误
runtime.HandleError(err)
// 不再处理这个key
c.queue.Forget(key)
}

// syncService 处理service逻辑的核心方法
func (c *controller) syncService(key string) error {
// 将 key 切割为 ns 和 name
namespace, name, err := cache.SplitMetaNamespaceKey(key)
if err != nil {
return err
}

// 从indexer中,获取service
service, err := c.serviceLister.Services(namespace).Get(name)
// 没有service,直接返回
if errors.IsNotFound(err) {
return nil
}
if err != nil {
return err
}

// 检查service的annotation,是否包含 key: "ingress/http"
_, ok := service.Annotations[annoKey]
// 从indexer缓存中,获取ingress
ingress, err := c.ingressLister.Ingresses(namespace).Get(name)

if ok && errors.IsNotFound(err) {
// ingress不存在,但是service有"ingress/http",需要创建ingress
// 创建ingress
ig := c.createIngress(service)
// 调用controller中的client,完成ingress的创建
_, err := c.client.NetworkingV1().Ingresses(namespace).Create(context.TODO(), ig, metav1.CreateOptions{})
if err != nil {
return err
}
} else if !ok && ingress != nil {
// ingress存在,但是service没有"ingress/http",需要删除ingress
// 调用controller中的client,完成ingress的删除
err := c.client.NetworkingV1().Ingresses(namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
if err != nil {
return err
}
}

return nil
}

// createIngress 创建ingress
func (c *controller) createIngress(service *corev1.Service) *netv1.Ingress {
icn := "ingress"
pathType := netv1.PathTypePrefix
return &netv1.Ingress{
ObjectMeta: metav1.ObjectMeta{
Name: service.Name,
Namespace: service.Namespace,
OwnerReferences: []metav1.OwnerReference{
*metav1.NewControllerRef(service, corev1.SchemeGroupVersion.WithKind("Service")),
},
},
Spec: netv1.IngressSpec{
IngressClassName: &icn,
Rules: []netv1.IngressRule{
{
Host: "example.com",
IngressRuleValue: netv1.IngressRuleValue{
HTTP: &netv1.HTTPIngressRuleValue{
Paths: []netv1.HTTPIngressPath{
{
Path: "/",
PathType: &pathType,
Backend: netv1.IngressBackend{
Service: &netv1.IngressServiceBackend{
Name: service.Name,
Port: netv1.ServiceBackendPort{
Number: 80,
},
},
},
},
},
},
},
},
},
},
}
}