Skip to content

Commit 5e76af4

Browse files
committed
feat: watch workloads using the Informer API
Create a map that abstracts away watched resources' details (e.g. endpoint, type, K8s listing function). For every workload, look into the map and setup Informers. The Informer is a wrapper around the K8s Watch API that helps restart killed watches and efficiently keeps track of changes to a workload. Using this API fixes the problem of Watches dying and eventually ending the kubernetes-monitor process while also notifying only on real workload changes (not just any change).
1 parent 189a512 commit 5e76af4

File tree

12 files changed

+181
-151
lines changed

12 files changed

+181
-151
lines changed

src/kube-scanner/watchers/handlers/cron-job.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
import { V1beta1CronJob } from '@kubernetes/client-node';
2-
import { WatchEventType } from '../types';
32
import { deleteWorkload } from './index';
43
import { WorkloadKind } from '../../types';
54
import { FALSY_WORKLOAD_NAME_MARKER } from './types';
65

7-
export async function cronJobWatchHandler(eventType: string, cronJob: V1beta1CronJob) {
8-
if (eventType !== WatchEventType.Deleted) {
9-
return;
10-
}
11-
6+
export async function cronJobWatchHandler(cronJob: V1beta1CronJob) {
127
if (!cronJob.metadata || !cronJob.spec || !cronJob.spec.jobTemplate.spec ||
138
!cronJob.spec.jobTemplate.metadata || !cronJob.spec.jobTemplate.spec.template.spec) {
149
// TODO(ivanstanev): possibly log this. It shouldn't happen but we should track it!

src/kube-scanner/watchers/handlers/daemon-set.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
import { V1DaemonSet } from '@kubernetes/client-node';
2-
import { WatchEventType } from '../types';
32
import { deleteWorkload } from './index';
43
import { WorkloadKind } from '../../types';
54
import { FALSY_WORKLOAD_NAME_MARKER } from './types';
65

7-
export async function daemonSetWatchHandler(eventType: string, daemonSet: V1DaemonSet) {
8-
if (eventType !== WatchEventType.Deleted) {
9-
return;
10-
}
11-
6+
export async function daemonSetWatchHandler(daemonSet: V1DaemonSet) {
127
if (!daemonSet.metadata || !daemonSet.spec || !daemonSet.spec.template.metadata ||
138
!daemonSet.spec.template.spec) {
149
// TODO(ivanstanev): possibly log this. It shouldn't happen but we should track it!

src/kube-scanner/watchers/handlers/deployment.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
import { V1Deployment } from '@kubernetes/client-node';
2-
import { WatchEventType } from '../types';
32
import { deleteWorkload } from './index';
43
import { WorkloadKind } from '../../types';
54
import { FALSY_WORKLOAD_NAME_MARKER } from './types';
65

7-
export async function deploymentWatchHandler(eventType: string, deployment: V1Deployment) {
8-
if (eventType !== WatchEventType.Deleted) {
9-
return;
10-
}
11-
6+
export async function deploymentWatchHandler(deployment: V1Deployment) {
127
if (!deployment.metadata || !deployment.spec || !deployment.spec.template.metadata ||
138
!deployment.spec.template.spec) {
149
// TODO(ivanstanev): possibly log this. It shouldn't happen but we should track it!

src/kube-scanner/watchers/handlers/index.ts

Lines changed: 111 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,117 @@
1+
import { makeInformer, ADD, DELETE, UPDATE, KubernetesObject } from '@kubernetes/client-node';
12
import logger = require('../../../common/logger');
23
import WorkloadWorker = require('../../../kube-scanner');
34
import { buildWorkloadMetadata } from '../../metadata-extractor';
4-
import { KubeObjectMetadata } from '../../types';
5+
import { KubeObjectMetadata, WorkloadKind } from '../../types';
6+
import { podWatchHandler } from './pod';
7+
import { cronJobWatchHandler } from './cron-job';
8+
import { daemonSetWatchHandler } from './daemon-set';
9+
import { deploymentWatchHandler } from './deployment';
10+
import { jobWatchHandler } from './job';
11+
import { replicaSetWatchHandler } from './replica-set';
12+
import { replicationControllerWatchHandler } from './replication-controller';
13+
import { statefulSetWatchHandler } from './stateful-set';
14+
import { k8sApi, kubeConfig } from '../../cluster';
15+
import { IWorkloadWatchMetadata, FALSY_WORKLOAD_NAME_MARKER } from './types';
16+
17+
/**
18+
* This map is used in combination with the kubernetes-client Informer API
19+
* to abstract which resources to watch, what their endpoint is, how to grab
20+
* a list of the resources, and which watch actions to handle (e.g. a newly added resource).
21+
*
22+
* The Informer API is just a wrapper around Kubernetes watches that makes sure the watch
23+
* gets restarted if it dies and it also efficiently tracks changes to the watched workloads
24+
* by comparing their resourceVersion.
25+
*
26+
* The map is keyed by the "WorkloadKind" -- the type of resource we want to watch.
27+
* Legal verbs for the "handlers" are pulled from '@kubernetes/client-node'. You can
28+
* set a different handler for every verb.
29+
* (e.g. ADD-ed workloads are processed differently than DELETE-d ones)
30+
*
31+
* The "listFunc" is a callback used by the kubernetes-client to grab the watched resource
32+
* whenever Kubernetes fires a "workload changed" event and it uses the result to figure out
33+
* if the workload actually changed (by inspecting the resourceVersion).
34+
*/
35+
const workloadWatchMetadata: Readonly<IWorkloadWatchMetadata> = {
36+
[WorkloadKind.Pod]: {
37+
endpoint: '/api/v1/namespaces/{namespace}/pods',
38+
handlers: {
39+
[ADD]: podWatchHandler,
40+
[UPDATE]: podWatchHandler,
41+
},
42+
listFactory: (namespace) => () => k8sApi.coreClient.listNamespacedPod(namespace),
43+
},
44+
[WorkloadKind.ReplicationController]: {
45+
endpoint: '/api/v1/watch/namespaces/{namespace}/replicationcontrollers',
46+
handlers: {
47+
[DELETE]: replicationControllerWatchHandler,
48+
},
49+
listFactory: (namespace) => () => k8sApi.coreClient.listNamespacedReplicationController(namespace),
50+
},
51+
[WorkloadKind.CronJob]: {
52+
endpoint: '/apis/batch/v1beta1/watch/namespaces/{namespace}/cronjobs',
53+
handlers: {
54+
[DELETE]: cronJobWatchHandler,
55+
},
56+
listFactory: (namespace) => () => k8sApi.batchUnstableClient.listNamespacedCronJob(namespace),
57+
},
58+
[WorkloadKind.Job]: {
59+
endpoint: '/apis/batch/v1/watch/namespaces/{namespace}/jobs',
60+
handlers: {
61+
[DELETE]: jobWatchHandler,
62+
},
63+
listFactory: (namespace) => () => k8sApi.batchClient.listNamespacedJob(namespace),
64+
},
65+
[WorkloadKind.DaemonSet]: {
66+
endpoint: '/apis/apps/v1/watch/namespaces/{namespace}/daemonsets',
67+
handlers: {
68+
[DELETE]: daemonSetWatchHandler,
69+
},
70+
listFactory: (namespace) => () => k8sApi.appsClient.listNamespacedDaemonSet(namespace),
71+
},
72+
[WorkloadKind.Deployment]: {
73+
endpoint: '/apis/apps/v1/watch/namespaces/{namespace}/deployments',
74+
handlers: {
75+
[DELETE]: deploymentWatchHandler,
76+
},
77+
listFactory: (namespace) => () => k8sApi.appsClient.listNamespacedDeployment(namespace),
78+
},
79+
[WorkloadKind.ReplicaSet]: {
80+
endpoint: '/apis/apps/v1/watch/namespaces/{namespace}/replicasets',
81+
handlers: {
82+
[DELETE]: replicaSetWatchHandler,
83+
},
84+
listFactory: (namespace) => () => k8sApi.appsClient.listNamespacedReplicaSet(namespace),
85+
},
86+
[WorkloadKind.StatefulSet]: {
87+
endpoint: '/apis/apps/v1/watch/namespaces/{namespace}/statefulsets',
88+
handlers: {
89+
[DELETE]: statefulSetWatchHandler,
90+
},
91+
listFactory: (namespace) => () => k8sApi.appsClient.listNamespacedStatefulSet(namespace),
92+
},
93+
};
94+
95+
export function setupInformer(namespace: string, workloadKind: WorkloadKind) {
96+
const workloadMetadata = workloadWatchMetadata[workloadKind];
97+
const namespacedEndpoint = workloadMetadata.endpoint.replace('{namespace}', namespace);
98+
99+
const informer = makeInformer<KubernetesObject>(kubeConfig, namespacedEndpoint,
100+
workloadMetadata.listFactory(namespace));
101+
102+
for (const informerVerb of Object.keys(workloadMetadata.handlers)) {
103+
informer.on(informerVerb, async (watchedWorkload) => {
104+
try {
105+
await workloadMetadata.handlers[informerVerb](watchedWorkload);
106+
} catch (error) {
107+
const name = watchedWorkload.metadata && watchedWorkload.metadata.name || FALSY_WORKLOAD_NAME_MARKER;
108+
logger.warn({error, namespace, name, workloadKind}, 'could not execute the informer handler for a workload');
109+
}
110+
});
111+
}
112+
113+
informer.start();
114+
}
5115

6116
export async function deleteWorkload(kubernetesMetadata: KubeObjectMetadata, workloadName: string) {
7117
try {

src/kube-scanner/watchers/handlers/job.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
import { V1Job } from '@kubernetes/client-node';
2-
import { WatchEventType } from '../types';
32
import { deleteWorkload } from './index';
43
import { WorkloadKind } from '../../types';
54
import { FALSY_WORKLOAD_NAME_MARKER } from './types';
65

7-
export async function jobWatchHandler(eventType: string, job: V1Job) {
8-
if (eventType !== WatchEventType.Deleted) {
9-
return;
10-
}
11-
6+
export async function jobWatchHandler(job: V1Job) {
127
if (!job.metadata || !job.spec || !job.spec.template.metadata || !job.spec.template.spec) {
138
// TODO(ivanstanev): possibly log this. It shouldn't happen but we should track it!
149
return;

src/kube-scanner/watchers/handlers/pod.ts

Lines changed: 4 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ import logger = require('../../../common/logger');
55
import WorkloadWorker = require('../../../kube-scanner');
66
import { IKubeImage } from '../../../transmitter/types';
77
import { buildMetadataForWorkload } from '../../metadata-extractor';
8-
import { PodPhase, WatchEventType } from '../types';
8+
import { PodPhase } from '../types';
99
import state = require('../../../state');
1010
import { FALSY_WORKLOAD_NAME_MARKER } from './types';
1111

@@ -43,9 +43,9 @@ async function handleReadyPod(workloadWorker: WorkloadWorker, workloadMetadata:
4343
}
4444
}
4545

46-
export async function podWatchHandler(eventType: string, pod: V1Pod) {
46+
export async function podWatchHandler(pod: V1Pod) {
4747
// This tones down the number of scans whenever a Pod is about to be scheduled by K8s
48-
if (eventType !== WatchEventType.Deleted && !isPodReady(pod)) {
48+
if (!isPodReady(pod)) {
4949
return;
5050
}
5151

@@ -61,21 +61,7 @@ export async function podWatchHandler(eventType: string, pod: V1Pod) {
6161

6262
const workloadName = workloadMetadata[0].name;
6363
const workloadWorker = new WorkloadWorker(workloadName);
64-
65-
switch (eventType) {
66-
case WatchEventType.Added:
67-
case WatchEventType.Modified:
68-
await handleReadyPod(workloadWorker, workloadMetadata);
69-
break;
70-
case WatchEventType.Error:
71-
break;
72-
case WatchEventType.Bookmark:
73-
break;
74-
case WatchEventType.Deleted:
75-
break;
76-
default:
77-
break;
78-
}
64+
await handleReadyPod(workloadWorker, workloadMetadata);
7965
} catch (error) {
8066
logger.error({error, podName}, 'Could not build image metadata for pod');
8167
}

src/kube-scanner/watchers/handlers/replica-set.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
import { V1ReplicaSet } from '@kubernetes/client-node';
2-
import { WatchEventType } from '../types';
32
import { deleteWorkload } from './index';
43
import { WorkloadKind } from '../../types';
54
import { FALSY_WORKLOAD_NAME_MARKER } from './types';
65

7-
export async function replicaSetWatchHandler(eventType: string, replicaSet: V1ReplicaSet) {
8-
if (eventType !== WatchEventType.Deleted) {
9-
return;
10-
}
11-
6+
export async function replicaSetWatchHandler(replicaSet: V1ReplicaSet) {
127
if (!replicaSet.metadata || !replicaSet.spec || !replicaSet.spec.template ||
138
!replicaSet.spec.template.metadata || !replicaSet.spec.template.spec) {
149
// TODO(ivanstanev): possibly log this. It shouldn't happen but we should track it!

src/kube-scanner/watchers/handlers/replication-controller.ts

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,9 @@
11
import { V1ReplicationController } from '@kubernetes/client-node';
2-
import { WatchEventType } from '../types';
32
import { deleteWorkload } from './index';
43
import { WorkloadKind } from '../../types';
54
import { FALSY_WORKLOAD_NAME_MARKER } from './types';
65

7-
export async function replicationControllerWatchHandler(
8-
eventType: string, replicationController: V1ReplicationController) {
9-
if (eventType !== WatchEventType.Deleted) {
10-
return;
11-
}
12-
6+
export async function replicationControllerWatchHandler(replicationController: V1ReplicationController) {
137
if (!replicationController.metadata || !replicationController.spec || !replicationController.spec.template ||
148
!replicationController.spec.template.metadata || !replicationController.spec.template.spec) {
159
// TODO(ivanstanev): possibly log this. It shouldn't happen but we should track it!

src/kube-scanner/watchers/handlers/stateful-set.ts

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,9 @@
11
import { V1StatefulSet } from '@kubernetes/client-node';
2-
import { WatchEventType } from '../types';
32
import { deleteWorkload } from './index';
43
import { WorkloadKind } from '../../types';
54
import { FALSY_WORKLOAD_NAME_MARKER } from './types';
65

7-
export async function statefulSetWatchHandler(eventType: string, statefulSet: V1StatefulSet) {
8-
if (eventType !== WatchEventType.Deleted) {
9-
return;
10-
}
11-
6+
export async function statefulSetWatchHandler(statefulSet: V1StatefulSet) {
127
if (!statefulSet.metadata || !statefulSet.spec || !statefulSet.spec.template.metadata ||
138
!statefulSet.spec.template.spec) {
149
// TODO(ivanstanev): possibly log this. It shouldn't happen but we should track it!
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1 +1,18 @@
11
export const FALSY_WORKLOAD_NAME_MARKER = 'falsy workload name';
2+
3+
type WorkloadHandlerFunc = (workload: any) => Promise<void>;
4+
5+
type ListWorkloadFunctionFactory = (namespace: string) => () => Promise<{
6+
response: any;
7+
body: any;
8+
}>;
9+
10+
export interface IWorkloadWatchMetadata {
11+
[workloadKind: string]: {
12+
endpoint: string,
13+
handlers: {
14+
[kubernetesInformerVerb: string]: WorkloadHandlerFunc,
15+
},
16+
listFactory: ListWorkloadFunctionFactory;
17+
};
18+
}

0 commit comments

Comments
 (0)