Skip to content

Commit 368e18c

Browse files
feat(blog): custom k8s controller
1 parent 28ab2c6 commit 368e18c

File tree

6 files changed

+650
-0
lines changed

6 files changed

+650
-0
lines changed

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
.idea
2+
k8s-custom-controller

img.png

448 KB
Loading

img_1.png

96.5 KB
Loading

img_2.png

219 KB
Loading

informer-specific-deployment.md

Lines changed: 331 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,331 @@
1+
# Informer implementation with the NewSharedInformerFactoryWithOptions
2+
# Implementation of watch workload (in this case deployment)
3+
# Informer and websocket implementation for specific workload (deployments)
4+
# Kubernetes custom controller from scratch in go
5+
6+
```go
7+
package main
8+
9+
// websocket
10+
router.GET("/ws", func(ctx *gin.Context) {
11+
deployment.HandleDeploymentLogs(ctx.Writer, ctx.Request)
12+
})
13+
14+
```
15+
16+
```go
17+
18+
package deployment
19+
20+
import (
21+
"context"
22+
"encoding/json"
23+
"fmt"
24+
"github.com/gorilla/websocket"
25+
"github.com/katamyra/kubestellarUI/wds"
26+
v1 "k8s.io/api/apps/v1"
27+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
28+
"k8s.io/client-go/informers"
29+
"k8s.io/client-go/kubernetes"
30+
"k8s.io/client-go/tools/cache"
31+
"log"
32+
"net/http"
33+
"time"
34+
)
35+
36+
var upgrader = websocket.Upgrader{
37+
CheckOrigin: func(r *http.Request) bool {
38+
return true
39+
},
40+
}
41+
42+
type DeploymentUpdate struct {
43+
Timestamp string `json:"timestamp"`
44+
Message string `json:"message"`
45+
}
46+
47+
func HandleDeploymentLogs(w http.ResponseWriter, r *http.Request) {
48+
conn, err := upgrader.Upgrade(w, r, nil)
49+
if err != nil {
50+
log.Println("WebSocket Upgrade Error:", err)
51+
return
52+
}
53+
defer conn.Close()
54+
55+
namespace := r.URL.Query().Get("namespace")
56+
deploymentName := r.URL.Query().Get("deployment")
57+
58+
if namespace == "" || deploymentName == "" {
59+
if err := conn.WriteMessage(websocket.TextMessage, []byte("Error: Missing namespace or deployment name")); err != nil {
60+
log.Printf("Failed to write message: %v", err)
61+
}
62+
return
63+
}
64+
// nothing but just getting the kubeclient
65+
clientset, err := wds.GetClientSetKubeConfig()
66+
if err != nil {
67+
if err := conn.WriteMessage(websocket.TextMessage, []byte("Error: Failed to create Kubernetes clientset - "+err.Error())); err != nil {
68+
log.Printf("Failed to send WebSocket message: %v", err)
69+
}
70+
return
71+
}
72+
73+
sendInitialLogs(conn, clientset, namespace, deploymentName)
74+
75+
// Use an informer to watch the deployment
76+
watchDeploymentWithInformer(conn, clientset, namespace, deploymentName)
77+
78+
// WE ARE USING INFORMER
79+
//watchDeploymentChanges(conn, clientset, namespace, deploymentName)
80+
}
81+
82+
func sendInitialLogs(conn *websocket.Conn, clientset *kubernetes.Clientset, namespace, deploymentName string) {
83+
deployment, err := clientset.AppsV1().Deployments(namespace).Get(context.Background(), deploymentName, metav1.GetOptions{})
84+
if err != nil {
85+
if err := conn.WriteMessage(websocket.TextMessage, []byte("Error: Failed to fetch deployment - "+err.Error())); err != nil {
86+
log.Printf("Failed to send WebSocket message: %v", err)
87+
}
88+
return
89+
}
90+
91+
logs := getDeploymentLogs(deployment)
92+
for _, logLine := range logs {
93+
if err := conn.WriteMessage(websocket.TextMessage, []byte(logLine)); err != nil {
94+
log.Println("Error writing to WebSocket:", err)
95+
return
96+
}
97+
time.Sleep(200 * time.Millisecond)
98+
}
99+
}
100+
101+
func watchDeploymentWithInformer(conn *websocket.Conn, clientset *kubernetes.Clientset, namespace, deploymentName string) {
102+
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0,
103+
informers.WithNamespace(namespace),
104+
informers.WithTweakListOptions(func(options *metav1.ListOptions) {
105+
options.FieldSelector = fmt.Sprintf("metadata.name=%s", deploymentName)
106+
}),
107+
)
108+
informer := factory.Apps().V1().Deployments().Informer()
109+
110+
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
111+
UpdateFunc: func(oldObj, newObj interface{}) {
112+
oldDeployment, ok1 := oldObj.(*v1.Deployment)
113+
newDeployment, ok2 := newObj.(*v1.Deployment)
114+
if !ok1 || !ok2 || newDeployment.Name != deploymentName {
115+
return
116+
}
117+
updateHandler(conn, oldDeployment, newDeployment)
118+
},
119+
})
120+
// make(chan struct{}) creates an empty struct channel.
121+
//This channel is used to signal the informer to stop.
122+
// struct{} is a zero-memory type, meaning it doesn’t allocate memory.
123+
// We don’t need to send any actual data through this channel, just a signal.
124+
stopCh := make(chan struct{})
125+
defer close(stopCh)
126+
127+
go informer.Run(stopCh)
128+
129+
// Keep the connection open
130+
131+
//This is a blocking operation that:
132+
//
133+
//Prevents the function from exiting.
134+
// Keeps the WebSocket connection open.
135+
//💡 Why does this work?
136+
//
137+
//select {} waits indefinitely because there are no case statements.
138+
//The function never returns, so the informer keeps running.
139+
140+
/*
141+
1️⃣ Create a channel (stopCh) for stopping the informer.
142+
2️⃣ Run the informer in a goroutine (go informer.Run(stopCh)).
143+
3️⃣ Block forever (select {}) to keep the function running.
144+
4️⃣ If the function returns (not in this case), defer close(stopCh) stops the informer.
145+
*/
146+
select {}
147+
148+
}
149+
150+
func updateHandler(conn *websocket.Conn, oldDeployment, newDeployment *v1.Deployment) {
151+
var logs []DeploymentUpdate
152+
if *oldDeployment.Spec.Replicas != *newDeployment.Spec.Replicas {
153+
logs = append(logs, DeploymentUpdate{
154+
Timestamp: time.Now().Format(time.RFC3339),
155+
Message: fmt.Sprintf("Deployment %s updated - Replicas changed: %d", newDeployment.Name, *newDeployment.Spec.Replicas),
156+
})
157+
}
158+
oldImage := oldDeployment.Spec.Template.Spec.Containers[0].Image
159+
newImage := newDeployment.Spec.Template.Spec.Containers[0].Image
160+
if oldImage != newImage {
161+
logs = append(logs, DeploymentUpdate{
162+
Timestamp: time.Now().Format(time.RFC3339),
163+
Message: fmt.Sprintf("Deployment %s updated - Image changed: %s", newDeployment.Name, newImage),
164+
})
165+
}
166+
for _, logLine := range logs {
167+
jsonMessage, _ := json.Marshal(logLine)
168+
conn.WriteMessage(websocket.TextMessage, jsonMessage)
169+
}
170+
}
171+
172+
// Watches deployment changes and sends updates
173+
// Keeping it for reference - NOT USEFUL
174+
func watchDeploymentChanges(conn *websocket.Conn, clientset *kubernetes.Clientset, namespace, deploymentName string) {
175+
options := metav1.ListOptions{
176+
// remove this line it will become universal for all the deployment
177+
// it will listen for all deployment inside namespace
178+
FieldSelector: fmt.Sprintf("metadata.name=%s", deploymentName),
179+
}
180+
watcher, err := clientset.AppsV1().Deployments(namespace).Watch(context.Background(), options)
181+
if err != nil {
182+
if err := conn.WriteMessage(websocket.TextMessage, []byte("Error: Failed to watch deployment - "+err.Error())); err != nil {
183+
log.Printf("Failed to send WebSocket message: %v", err)
184+
}
185+
return
186+
}
187+
188+
defer watcher.Stop()
189+
190+
// preserving the replicas and image for next call
191+
var lastReplicas *int32
192+
var lastImage string
193+
194+
for event := range watcher.ResultChan() {
195+
deployment, ok := event.Object.(*v1.Deployment)
196+
if !ok {
197+
continue
198+
}
199+
200+
var logs []DeploymentUpdate
201+
message := fmt.Sprintf("Deployment %s changed: %s", deployment.Name, event.Type)
202+
log.Println(message)
203+
204+
if lastReplicas == nil || *lastReplicas != *deployment.Spec.Replicas {
205+
message = fmt.Sprintf("Deployment %s updated - Replicas changed: %d", deployment.Name, *deployment.Spec.Replicas)
206+
lastReplicas = deployment.Spec.Replicas
207+
logs = append(logs, DeploymentUpdate{
208+
Timestamp: time.Now().Format(time.RFC3339),
209+
Message: message,
210+
})
211+
}
212+
213+
if len(deployment.Spec.Template.Spec.Containers) > 0 {
214+
currentImage := deployment.Spec.Template.Spec.Containers[0].Image
215+
if lastImage == "" || lastImage != currentImage {
216+
message = fmt.Sprintf("Deployment %s updated - Image changed: %s", deployment.Name, currentImage)
217+
logs = append(logs, DeploymentUpdate{
218+
Timestamp: time.Now().Format(time.RFC3339),
219+
Message: message,
220+
})
221+
lastImage = currentImage
222+
}
223+
}
224+
225+
for _, logLine := range logs {
226+
jsonMessage, _ := json.Marshal(logLine)
227+
if err := conn.WriteMessage(websocket.TextMessage, jsonMessage); err != nil {
228+
log.Println("Error writing to WebSocket:", err)
229+
return
230+
}
231+
}
232+
}
233+
}
234+
235+
func getDeploymentLogs(deployment *v1.Deployment) []string {
236+
baseTime := time.Now().Format(time.RFC3339)
237+
238+
replicas := int32(1)
239+
if deployment.Spec.Replicas != nil {
240+
replicas = *deployment.Spec.Replicas
241+
}
242+
243+
logs := []string{
244+
fmt.Sprintf("[%v] INFO: Deployment workload %v initiated ", baseTime, deployment.Name),
245+
fmt.Sprintf("[%v] INFO: Workload created with replicas: %d, image: %v ", baseTime, replicas, deployment.Spec.Template.Spec.Containers[0].Image),
246+
fmt.Sprintf("[%v] INFO: Namespace %v successfully updated ", baseTime, deployment.Namespace),
247+
fmt.Sprintf("[%v] INFO: Available Replicas: %d ", baseTime, deployment.Status.AvailableReplicas),
248+
}
249+
250+
// Check if Conditions slice has elements before accessing it
251+
if len(deployment.Status.Conditions) > 0 {
252+
condition := deployment.Status.Conditions[0]
253+
logs = append(logs,
254+
fmt.Sprintf("[%v] INFO: Conditions: %s ", baseTime, condition.Type),
255+
fmt.Sprintf("[%v] INFO: LastUpdateTime : %s ", baseTime, condition.LastUpdateTime.Time),
256+
fmt.Sprintf("[%v] INFO: LastTransitionTime : %s ", baseTime, condition.LastTransitionTime.Time),
257+
fmt.Sprintf("[%v] INFO: Message: %s ", baseTime, condition.Message),
258+
)
259+
} else {
260+
logs = append(logs, fmt.Sprintf("[%v] INFO: No conditions available", baseTime))
261+
}
262+
263+
return logs
264+
}
265+
266+
```
267+
268+
269+
```go
270+
package wds
271+
272+
import (
273+
"fmt"
274+
"os"
275+
276+
"k8s.io/client-go/kubernetes"
277+
"k8s.io/client-go/tools/clientcmd"
278+
)
279+
280+
/*
281+
Load the KubeConfig file and return the kubernetes clientset which gives you access to play with the k8s api
282+
*/
283+
func homeDir() string {
284+
if h := os.Getenv("HOME"); h != "" {
285+
return h
286+
}
287+
return os.Getenv("USERPROFILE") // windows
288+
}
289+
func GetClientSetKubeConfig() (*kubernetes.Clientset, error) {
290+
kubeconfig := os.Getenv("KUBECONFIG")
291+
if kubeconfig == "" {
292+
if home := homeDir(); home != "" {
293+
kubeconfig = fmt.Sprintf("%s/.kube/config", home)
294+
}
295+
}
296+
297+
// Load the kubeconfig file
298+
config, err := clientcmd.LoadFromFile(kubeconfig)
299+
if err != nil {
300+
// c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to load kubeconfig"})
301+
return nil, fmt.Errorf("failed to load kubeconfig")
302+
}
303+
304+
// Use WDS1 context specifically
305+
ctxContext := config.Contexts["wds1"]
306+
if ctxContext == nil {
307+
// c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create ctxConfig"})
308+
return nil, fmt.Errorf("failed to create ctxConfig")
309+
}
310+
311+
// Create config for WDS cluster
312+
clientConfig := clientcmd.NewDefaultClientConfig(
313+
*config,
314+
&clientcmd.ConfigOverrides{
315+
CurrentContext: "wds1",
316+
},
317+
)
318+
319+
restConfig, err := clientConfig.ClientConfig()
320+
if err != nil {
321+
return nil, fmt.Errorf("failed to create restconfig")
322+
}
323+
324+
clientset, err := kubernetes.NewForConfig(restConfig)
325+
if err != nil {
326+
return nil, fmt.Errorf("failed to create Kubernetes client")
327+
}
328+
return clientset, nil
329+
}
330+
331+
```

0 commit comments

Comments
 (0)