引言 在 Kubernetes 集群中,Informer 是一种重要的机制,用于监控和处理集群中资源对象的变化。它是基于观察者模式设计的,允许开发者注册对某类资源对象的关注,并在对象发生变化时得到通知。本文将深入介绍 Kubernetes 中的 Informer 机制,包括其设计思想、工作原理、示例和最佳实践。
Informer 是 Kubernetes 中用于监控和处理资源对象变化的框架。它建立在 Kubernetes 的客户端库 client-go 之上,提供了高级别的 API,简化了开发者对资源对象状态变化的监听和处理。
Informer 的核心思想是将对资源对象的监听和处理逻辑进行模块化,以便更容易地维护和扩展。通过 Informer,开发者可以注册关注的资源类型,并在资源状态发生变化时执行自定义的业务逻辑。 Informer 的工作原理
Informer 机制的核心工作原理主要包括以下几个步骤:
List-Watch 机制: Informer 使用 Kubernetes API 的 List-Watch 机制来获取资源对象的初始列表,并通过 Watch 机制实时接收对象的变化事件。
SharedInformerFactory: SharedInformerFactory 是 Informer 的工厂,负责创建和管理多个 SharedInformer。每个 SharedInformer 监听一种资源对象的变化。
Event Handlers: 开发者可以注册事件处理器(Event Handlers),在资源对象发生变化时触发相应的处理逻辑。Event Handlers 是 Informer 的核心扩展点,用于实现自定义的业务逻辑。
Delta FIFO Queue: 通过 Delta FIFO Queue,Informer 在收到资源对象的变化事件时,将事件推送到队列中。Event Handlers 从队列中取出事件进行处理。
Resync: Informer 支持定期的全量同步(Resync)机制,以确保本地缓存与实际状态的一致性。定期地对资源对象进行全量同步,更新本地缓存。
首先,创建一个 SharedInformerFactory 对象:
1 2 3 4 5 6 clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } informerFactory := informers.NewSharedInformerFactory(clientset, time.Second*30)
注册关注的资源类型 使用 SharedInformerFactory 注册对某一种资源类型的关注:
1 podInformer := informerFactory.Core().V1().Pods()
注册 Event Handlers 注册事件处理器,定义在资源对象发生变化时的处理逻辑:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { // 处理资源对象新增事件 pod := obj.(*corev1.Pod) fmt.Printf("Pod added: %s\n", pod.Name) }, UpdateFunc: func(oldObj, newObj interface{}) { // 处理资源对象更新事件 oldPod := oldObj.(*corev1.Pod) newPod := newObj.(*corev1.Pod) fmt.Printf("Pod updated: %s\n", newPod.Name) }, DeleteFunc: func(obj interface{}) { // 处理资源对象删除事件 pod := obj.(*corev1.Pod) fmt.Printf("Pod deleted: %s\n", pod.Name) }, })
启动 SharedInformerFactory,开始监听资源对象的变化:
1 informerFactory.Start(stopCh)
处理资源对象变化事件 在 Event Handlers 中定义的逻辑将在资源对象发生变化时被触发:
通过以上步骤,就可以使用 Informer 监听和处理 Kubernetes 集群中资源对象的变化。
下面是一个简单的示例,演示如何使用 Informer 监听 Pod 对象的变化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 package main import ( "context" "flag" "fmt" "os" "os/signal" "time" "k8s.io/client-go/kubernetes" "k8s.io/client-go/tools/cache" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/util/homedir" "k8s.io/client-go/util/wait" ) func main() { var kubeconfig *string if home := homedir.HomeDir(); home != "" { kubeconfig = flag.String("kubeconfig", home+"/.kube/config", "kubeconfig file") } else { kubeconfig = flag.String("kubeconfig", "", "kubeconfig file") } config, err := clientcmd.BuildConfigFromFlags("", *kubeconfig) if err != nil { panic(err.Error()) } clientset, err := kubernetes.NewForConfig(config) if err != nil { panic(err.Error()) } stopCh := make(chan struct{}) defer close(stopCh) informerFactory := cache.NewSharedInformerFactory(clientset, time.Second*30) podInformer := informerFactory.Core().V1().Pods() podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { pod := obj.(*corev1.Pod) fmt.Printf("Pod added: %s\n", pod.Name) }, UpdateFunc: func(oldObj, newObj interface{}) { oldPod := oldObj.(*corev1.Pod) newPod := newObj.(*corev1.Pod) fmt.Printf("Pod updated: %s\n", newPod.Name) }, DeleteFunc: func(obj interface{}) { pod := obj.(*corev1.Pod) fmt.Printf("Pod deleted: %s\n", pod.Name) }, }) go informerFactory.Start(stopCh) if !cache.WaitForCacheSync(stopCh, podInformer.Informer().HasSynced) { fmt.Println("Timed out waiting for caches to sync") return } fmt.Println("Informer started. Waiting for Pod events...") c := make(chan os.Signal, 1) signal.Notify(c, os.Interrupt) <-c fmt.Println("Received interrupt signal. Stopping Informer...") }
结语 Informer 机制是 Kubernetes 中强大且灵活的一部分,为开发者提供了便捷的方式监听和处理集群中资源对象的变化。通过 SharedInformerFactory 的注册和 Event Handlers 的定义,可以轻松实现对特定资源类型的关注和处理逻辑。Informer 的应用范围广泛,涉及到许多领域,包括控制器开发、自动伸缩、日志收集等。