Skip to content

Commit 53be523

Browse files
committed
feat: remove workloads from scan queue when they are deleted in K8s
Up until now the queue of workloads to scan would keep growing indefinitely as we only appended new workloads to it. However, once workloads are deleted from Kubernetes, they no longer need to be in the queue. This didn't cause any issues with trying to process deleted workloads, because the queue handler would detect that the workload is gone and continue processing other items. However, it created alerts that the queue grows indefinitely and caused the snyk-monitor to process some work that was never going to succeed.
1 parent d33e93b commit 53be523

File tree

10 files changed

+102
-68
lines changed

10 files changed

+102
-68
lines changed

src/supervisor/watchers/handlers/cron-job.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
import { logger } from '../../../common/logger';
2121
import { retryKubernetesApiRequest } from '../../kuberenetes-api-wrappers';
2222
import { trimWorkload } from '../../workload-sanitization';
23+
import { deleteWorkloadFromScanQueue } from './queue';
2324

2425
export async function paginatedNamespacedCronJobList(
2526
namespace: string,
@@ -117,6 +118,7 @@ export async function cronJobWatchHandler(
117118
.filter((container) => container.image !== undefined)
118119
.map((container) => container.image!),
119120
}),
121+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
120122
]);
121123
}
122124

src/supervisor/watchers/handlers/daemon-set.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
kubernetesObjectToWorkloadAlreadyScanned,
1212
} from '../../../state';
1313
import { trimWorkload } from '../../workload-sanitization';
14+
import { deleteWorkloadFromScanQueue } from './queue';
1415

1516
export async function paginatedNamespacedDaemonSetList(
1617
namespace: string,
@@ -71,6 +72,7 @@ export async function daemonSetWatchHandler(
7172
.filter((container) => container.image !== undefined)
7273
.map((container) => container.image!),
7374
}),
75+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
7476
]);
7577
}
7678

src/supervisor/watchers/handlers/deployment-config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
} from '../../../state';
1616
import { retryKubernetesApiRequest } from '../../kuberenetes-api-wrappers';
1717
import { logger } from '../../../common/logger';
18+
import { deleteWorkloadFromScanQueue } from './queue';
1819

1920
export async function paginatedNamespacedDeploymentConfigList(
2021
namespace: string,
@@ -112,6 +113,7 @@ export async function deploymentConfigWatchHandler(
112113
.filter((container) => container.image !== undefined)
113114
.map((container) => container.image!),
114115
}),
116+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
115117
]);
116118
}
117119

src/supervisor/watchers/handlers/deployment.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
kubernetesObjectToWorkloadAlreadyScanned,
1212
} from '../../../state';
1313
import { trimWorkload } from '../../workload-sanitization';
14+
import { deleteWorkloadFromScanQueue } from './queue';
1415

1516
export async function paginatedNamespacedDeploymentList(
1617
namespace: string,
@@ -71,6 +72,7 @@ export async function deploymentWatchHandler(
7172
.filter((container) => container.image !== undefined)
7273
.map((container) => container.image!),
7374
}),
75+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
7476
]);
7577
}
7678

src/supervisor/watchers/handlers/job.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
kubernetesObjectToWorkloadAlreadyScanned,
1212
} from '../../../state';
1313
import { trimWorkload } from '../../workload-sanitization';
14+
import { deleteWorkloadFromScanQueue } from './queue';
1415

1516
export async function paginatedNamespacedJobList(namespace: string): Promise<{
1617
response: IncomingMessage;
@@ -65,6 +66,7 @@ export async function jobWatchHandler(job: V1Job): Promise<void> {
6566
.filter((container) => container.image !== undefined)
6667
.map((container) => container.image!),
6768
}),
69+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
6870
]);
6971
}
7072

src/supervisor/watchers/handlers/pod.ts

Lines changed: 5 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
import { V1Pod, V1PodList } from '@kubernetes/client-node';
22
import { IncomingMessage } from 'http';
3-
import * as async from 'async';
43

54
import { logger } from '../../../common/logger';
6-
import { config } from '../../../common/config';
7-
import { processWorkload } from '../../../scanner';
85
import { sendWorkloadMetadata } from '../../../transmitter';
9-
import { IWorkload, Telemetry } from '../../../transmitter/types';
6+
import { IWorkload } from '../../../transmitter/types';
107
import { constructWorkloadMetadata } from '../../../transmitter/payload';
118
import { buildMetadataForWorkload } from '../../metadata-extractor';
129
import { PodPhase } from '../types';
@@ -25,70 +22,7 @@ import { deleteWorkload } from './workload';
2522
import { k8sApi } from '../../cluster';
2623
import { paginatedClusterList, paginatedNamespacedList } from './pagination';
2724
import { trimWorkload } from '../../workload-sanitization';
28-
29-
export interface ImagesToScanQueueData {
30-
workloadMetadata: IWorkload[];
31-
/** The timestamp when this workload was added to the image scan queue. */
32-
enqueueTimestampMs: number;
33-
}
34-
35-
async function queueWorkerWorkloadScan(
36-
task: ImagesToScanQueueData,
37-
callback,
38-
): Promise<void> {
39-
const { workloadMetadata, enqueueTimestampMs } = task;
40-
/** Represents how long this workload spent waiting in the queue to be processed. */
41-
const enqueueDurationMs = Date.now() - enqueueTimestampMs;
42-
const telemetry: Partial<Telemetry> = {
43-
enqueueDurationMs,
44-
queueSize: workloadsToScanQueue.length(),
45-
};
46-
try {
47-
await processWorkload(workloadMetadata, telemetry);
48-
} catch (err) {
49-
logger.error(
50-
{ err, task },
51-
'error processing a workload in the pod handler 2',
52-
);
53-
const imageIds = workloadMetadata.map((workload) => workload.imageId);
54-
const workload = {
55-
// every workload metadata references the same workload, grab it from the first one
56-
...workloadMetadata[0],
57-
imageIds,
58-
};
59-
await deleteWorkloadImagesAlreadyScanned(workload);
60-
}
61-
}
62-
63-
const workloadsToScanQueue = async.queue<ImagesToScanQueueData>(
64-
queueWorkerWorkloadScan,
65-
config.WORKERS_COUNT,
66-
);
67-
68-
workloadsToScanQueue.error(function (err, task) {
69-
logger.error(
70-
{ err, task },
71-
'error processing a workload in the pod handler 1',
72-
);
73-
});
74-
75-
function reportQueueSize(): void {
76-
try {
77-
const queueDataToReport: { [key: string]: any } = {};
78-
queueDataToReport.workloadsToScanLength = workloadsToScanQueue.length();
79-
logger.debug(queueDataToReport, 'queue sizes report');
80-
} catch (err) {
81-
logger.debug({ err }, 'failed logging queue sizes');
82-
}
83-
}
84-
85-
// Report the queue size shortly after the snyk-monitor starts.
86-
setTimeout(reportQueueSize, 1 * 60 * 1000).unref();
87-
// Additionally, periodically report every X minutes.
88-
setInterval(
89-
reportQueueSize,
90-
config.QUEUE_LENGTH_LOG_FREQUENCY_MINUTES * 60 * 1000,
91-
).unref();
25+
import { deleteWorkloadFromScanQueue, workloadsToScanQueue } from './queue';
9226

9327
async function handleReadyPod(workloadMetadata: IWorkload[]): Promise<void> {
9428
const workloadToScan: IWorkload[] = [];
@@ -111,8 +45,10 @@ async function handleReadyPod(workloadMetadata: IWorkload[]): Promise<void> {
11145
workloadToScan.push(workload);
11246
}
11347

48+
const workload = workloadToScan[0];
11449
if (workloadToScan.length > 0) {
11550
workloadsToScanQueue.push({
51+
key: workload.uid,
11652
workloadMetadata: workloadToScan,
11753
enqueueTimestampMs: Date.now(),
11854
});
@@ -223,6 +159,7 @@ export async function podDeletedHandler(pod: V1Pod): Promise<void> {
223159
.filter((container) => container.image !== undefined)
224160
.map((container) => container.image!),
225161
}),
162+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
226163
]);
227164
}
228165

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import * as async from 'async';
2+
import { config } from '../../../common/config';
3+
4+
import { logger } from '../../../common/logger';
5+
import { processWorkload } from '../../../scanner';
6+
import { deleteWorkloadImagesAlreadyScanned } from '../../../state';
7+
import type { IWorkload, Telemetry } from '../../../transmitter/types';
8+
9+
interface ImagesToScanQueueData {
10+
/** Identifies the workload in the queue. */
11+
key: string;
12+
workloadMetadata: IWorkload[];
13+
/** The timestamp when this workload was added to the image scan queue. */
14+
enqueueTimestampMs: number;
15+
}
16+
17+
export const workloadsToScanQueue = async.queue<ImagesToScanQueueData>(
18+
queueWorkerWorkloadScan,
19+
config.WORKERS_COUNT,
20+
);
21+
22+
export async function deleteWorkloadFromScanQueue(workload: {
23+
uid: string;
24+
}): Promise<void> {
25+
workloadsToScanQueue.remove(
26+
(item) => item.data && item.data.key === workload.uid,
27+
);
28+
}
29+
30+
workloadsToScanQueue.error(function (err, task) {
31+
logger.error(
32+
{ err, task },
33+
'error processing a workload in the pod handler 1',
34+
);
35+
});
36+
37+
async function queueWorkerWorkloadScan(
38+
task: ImagesToScanQueueData,
39+
callback,
40+
): Promise<void> {
41+
const { workloadMetadata, enqueueTimestampMs } = task;
42+
/** Represents how long this workload spent waiting in the queue to be processed. */
43+
const enqueueDurationMs = Date.now() - enqueueTimestampMs;
44+
const telemetry: Partial<Telemetry> = {
45+
enqueueDurationMs,
46+
queueSize: workloadsToScanQueue.length(),
47+
};
48+
try {
49+
await processWorkload(workloadMetadata, telemetry);
50+
} catch (err) {
51+
logger.error(
52+
{ err, task },
53+
'error processing a workload in the pod handler 2',
54+
);
55+
const imageIds = workloadMetadata.map((workload) => workload.imageId);
56+
const workload = {
57+
// every workload metadata references the same workload, grab it from the first one
58+
...workloadMetadata[0],
59+
imageIds,
60+
};
61+
await deleteWorkloadImagesAlreadyScanned(workload);
62+
}
63+
}
64+
65+
function reportQueueSize(): void {
66+
try {
67+
const queueDataToReport: { [key: string]: any } = {};
68+
queueDataToReport.workloadsToScanLength = workloadsToScanQueue.length();
69+
logger.debug(queueDataToReport, 'queue sizes report');
70+
} catch (err) {
71+
logger.debug({ err }, 'failed logging queue sizes');
72+
}
73+
}
74+
75+
// Report the queue size shortly after the snyk-monitor starts.
76+
setTimeout(reportQueueSize, 1 * 60 * 1000).unref();
77+
// Additionally, periodically report every X minutes.
78+
setInterval(
79+
reportQueueSize,
80+
config.QUEUE_LENGTH_LOG_FREQUENCY_MINUTES * 60 * 1000,
81+
).unref();

src/supervisor/watchers/handlers/replica-set.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
kubernetesObjectToWorkloadAlreadyScanned,
1212
} from '../../../state';
1313
import { trimWorkload } from '../../workload-sanitization';
14+
import { deleteWorkloadFromScanQueue } from './queue';
1415

1516
export async function paginatedNamespacedReplicaSetList(
1617
namespace: string,
@@ -72,6 +73,7 @@ export async function replicaSetWatchHandler(
7273
.filter((container) => container.image !== undefined)
7374
.map((container) => container.image!),
7475
}),
76+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
7577
]);
7678
}
7779

src/supervisor/watchers/handlers/replication-controller.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
kubernetesObjectToWorkloadAlreadyScanned,
1515
} from '../../../state';
1616
import { trimWorkload } from '../../workload-sanitization';
17+
import { deleteWorkloadFromScanQueue } from './queue';
1718

1819
export async function paginatedNamespacedReplicationControllerList(
1920
namespace: string,
@@ -80,6 +81,7 @@ export async function replicationControllerWatchHandler(
8081
.filter((container) => container.image !== undefined)
8182
.map((container) => container.image!),
8283
}),
84+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
8385
]);
8486
}
8587

src/supervisor/watchers/handlers/stateful-set.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
kubernetesObjectToWorkloadAlreadyScanned,
1212
} from '../../../state';
1313
import { trimWorkload } from '../../workload-sanitization';
14+
import { deleteWorkloadFromScanQueue } from './queue';
1415

1516
export async function paginatedNamespacedStatefulSetList(
1617
namespace: string,
@@ -71,6 +72,7 @@ export async function statefulSetWatchHandler(
7172
.filter((container) => container.image !== undefined)
7273
.map((container) => container.image!),
7374
}),
75+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
7476
]);
7577
}
7678

0 commit comments

Comments
 (0)