Skip to content

Commit 5470efb

Browse files
authored
Merge pull request #1035 from snyk/feat/delete-workloads-from-scan-queue
[RUN-2125] Feat/delete workloads from scan queue
2 parents d33e93b + b2ab32c commit 5470efb

File tree

12 files changed

+120
-72
lines changed

12 files changed

+120
-72
lines changed

config.default.json

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,9 @@
1111
"MAX_SIZE": 10000,
1212
"MAX_AGE_MS": 60000
1313
},
14-
"WORKERS_COUNT": 10,
14+
"WORKERS_COUNT": 5,
1515
"REQUEST_QUEUE_LENGTH": 2,
16-
"QUEUE_LENGTH_LOG_FREQUENCY_MINUTES": 15,
16+
"QUEUE_LENGTH_LOG_FREQUENCY_MINUTES": 1,
1717
"INTEGRATION_ID": "",
1818
"DEFAULT_KUBERNETES_UPSTREAM_URL": "https://kubernetes-upstream.snyk.io",
1919
"MAX_RETRY_BACKOFF_DURATION_SECONDS": 300

src/supervisor/metadata-extractor.ts

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,14 @@ import {
55
V1ContainerStatus,
66
V1PodSpec,
77
} from '@kubernetes/client-node';
8-
import { IWorkload, ILocalWorkloadLocator } from '../transmitter/types';
98
import { currentClusterName } from './cluster';
10-
import { IKubeObjectMetadata } from './types';
9+
import { WorkloadKind } from './types';
1110
import { getSupportedWorkload, getWorkloadReader } from './workload-reader';
1211
import { logger } from '../common/logger';
12+
import { config } from '../common/config';
13+
14+
import type { IKubeObjectMetadata } from './types';
15+
import type { IWorkload, ILocalWorkloadLocator } from '../transmitter/types';
1316

1417
const loopingThreshold = 20;
1518

@@ -170,6 +173,17 @@ export async function buildMetadataForWorkload(
170173
);
171174
}
172175

176+
const hasJobOwnerRef = pod.metadata?.ownerReferences?.find(
177+
(owner) => owner.kind === WorkloadKind.Job,
178+
);
179+
if (hasJobOwnerRef && config.SKIP_K8S_JOBS) {
180+
logger.info(
181+
{ podMetadata: pod.metadata },
182+
'pod associated with job but jobs are skipped from processing. not building metadata.',
183+
);
184+
return undefined;
185+
}
186+
173187
const podOwner: IKubeObjectMetadata | undefined = await findParentWorkload(
174188
pod.spec,
175189
pod.metadata.ownerReferences,

src/supervisor/watchers/handlers/cron-job.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import {
2020
import { logger } from '../../../common/logger';
2121
import { retryKubernetesApiRequest } from '../../kuberenetes-api-wrappers';
2222
import { trimWorkload } from '../../workload-sanitization';
23+
import { deleteWorkloadFromScanQueue } from './queue';
2324

2425
export async function paginatedNamespacedCronJobList(
2526
namespace: string,
@@ -117,6 +118,7 @@ export async function cronJobWatchHandler(
117118
.filter((container) => container.image !== undefined)
118119
.map((container) => container.image!),
119120
}),
121+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
120122
]);
121123
}
122124

src/supervisor/watchers/handlers/daemon-set.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
kubernetesObjectToWorkloadAlreadyScanned,
1212
} from '../../../state';
1313
import { trimWorkload } from '../../workload-sanitization';
14+
import { deleteWorkloadFromScanQueue } from './queue';
1415

1516
export async function paginatedNamespacedDaemonSetList(
1617
namespace: string,
@@ -71,6 +72,7 @@ export async function daemonSetWatchHandler(
7172
.filter((container) => container.image !== undefined)
7273
.map((container) => container.image!),
7374
}),
75+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
7476
]);
7577
}
7678

src/supervisor/watchers/handlers/deployment-config.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ import {
1515
} from '../../../state';
1616
import { retryKubernetesApiRequest } from '../../kuberenetes-api-wrappers';
1717
import { logger } from '../../../common/logger';
18+
import { deleteWorkloadFromScanQueue } from './queue';
1819

1920
export async function paginatedNamespacedDeploymentConfigList(
2021
namespace: string,
@@ -112,6 +113,7 @@ export async function deploymentConfigWatchHandler(
112113
.filter((container) => container.image !== undefined)
113114
.map((container) => container.image!),
114115
}),
116+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
115117
]);
116118
}
117119

src/supervisor/watchers/handlers/deployment.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
kubernetesObjectToWorkloadAlreadyScanned,
1212
} from '../../../state';
1313
import { trimWorkload } from '../../workload-sanitization';
14+
import { deleteWorkloadFromScanQueue } from './queue';
1415

1516
export async function paginatedNamespacedDeploymentList(
1617
namespace: string,
@@ -71,6 +72,7 @@ export async function deploymentWatchHandler(
7172
.filter((container) => container.image !== undefined)
7273
.map((container) => container.image!),
7374
}),
75+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
7476
]);
7577
}
7678

src/supervisor/watchers/handlers/job.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
kubernetesObjectToWorkloadAlreadyScanned,
1212
} from '../../../state';
1313
import { trimWorkload } from '../../workload-sanitization';
14+
import { deleteWorkloadFromScanQueue } from './queue';
1415

1516
export async function paginatedNamespacedJobList(namespace: string): Promise<{
1617
response: IncomingMessage;
@@ -65,6 +66,7 @@ export async function jobWatchHandler(job: V1Job): Promise<void> {
6566
.filter((container) => container.image !== undefined)
6667
.map((container) => container.image!),
6768
}),
69+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
6870
]);
6971
}
7072

src/supervisor/watchers/handlers/pod.ts

Lines changed: 5 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,9 @@
11
import { V1Pod, V1PodList } from '@kubernetes/client-node';
22
import { IncomingMessage } from 'http';
3-
import * as async from 'async';
43

54
import { logger } from '../../../common/logger';
6-
import { config } from '../../../common/config';
7-
import { processWorkload } from '../../../scanner';
85
import { sendWorkloadMetadata } from '../../../transmitter';
9-
import { IWorkload, Telemetry } from '../../../transmitter/types';
6+
import { IWorkload } from '../../../transmitter/types';
107
import { constructWorkloadMetadata } from '../../../transmitter/payload';
118
import { buildMetadataForWorkload } from '../../metadata-extractor';
129
import { PodPhase } from '../types';
@@ -25,70 +22,7 @@ import { deleteWorkload } from './workload';
2522
import { k8sApi } from '../../cluster';
2623
import { paginatedClusterList, paginatedNamespacedList } from './pagination';
2724
import { trimWorkload } from '../../workload-sanitization';
28-
29-
export interface ImagesToScanQueueData {
30-
workloadMetadata: IWorkload[];
31-
/** The timestamp when this workload was added to the image scan queue. */
32-
enqueueTimestampMs: number;
33-
}
34-
35-
async function queueWorkerWorkloadScan(
36-
task: ImagesToScanQueueData,
37-
callback,
38-
): Promise<void> {
39-
const { workloadMetadata, enqueueTimestampMs } = task;
40-
/** Represents how long this workload spent waiting in the queue to be processed. */
41-
const enqueueDurationMs = Date.now() - enqueueTimestampMs;
42-
const telemetry: Partial<Telemetry> = {
43-
enqueueDurationMs,
44-
queueSize: workloadsToScanQueue.length(),
45-
};
46-
try {
47-
await processWorkload(workloadMetadata, telemetry);
48-
} catch (err) {
49-
logger.error(
50-
{ err, task },
51-
'error processing a workload in the pod handler 2',
52-
);
53-
const imageIds = workloadMetadata.map((workload) => workload.imageId);
54-
const workload = {
55-
// every workload metadata references the same workload, grab it from the first one
56-
...workloadMetadata[0],
57-
imageIds,
58-
};
59-
await deleteWorkloadImagesAlreadyScanned(workload);
60-
}
61-
}
62-
63-
const workloadsToScanQueue = async.queue<ImagesToScanQueueData>(
64-
queueWorkerWorkloadScan,
65-
config.WORKERS_COUNT,
66-
);
67-
68-
workloadsToScanQueue.error(function (err, task) {
69-
logger.error(
70-
{ err, task },
71-
'error processing a workload in the pod handler 1',
72-
);
73-
});
74-
75-
function reportQueueSize(): void {
76-
try {
77-
const queueDataToReport: { [key: string]: any } = {};
78-
queueDataToReport.workloadsToScanLength = workloadsToScanQueue.length();
79-
logger.debug(queueDataToReport, 'queue sizes report');
80-
} catch (err) {
81-
logger.debug({ err }, 'failed logging queue sizes');
82-
}
83-
}
84-
85-
// Report the queue size shortly after the snyk-monitor starts.
86-
setTimeout(reportQueueSize, 1 * 60 * 1000).unref();
87-
// Additionally, periodically report every X minutes.
88-
setInterval(
89-
reportQueueSize,
90-
config.QUEUE_LENGTH_LOG_FREQUENCY_MINUTES * 60 * 1000,
91-
).unref();
25+
import { deleteWorkloadFromScanQueue, workloadsToScanQueue } from './queue';
9226

9327
async function handleReadyPod(workloadMetadata: IWorkload[]): Promise<void> {
9428
const workloadToScan: IWorkload[] = [];
@@ -111,8 +45,10 @@ async function handleReadyPod(workloadMetadata: IWorkload[]): Promise<void> {
11145
workloadToScan.push(workload);
11246
}
11347

48+
const workload = workloadToScan[0];
11449
if (workloadToScan.length > 0) {
11550
workloadsToScanQueue.push({
51+
key: workload.uid,
11652
workloadMetadata: workloadToScan,
11753
enqueueTimestampMs: Date.now(),
11854
});
@@ -223,6 +159,7 @@ export async function podDeletedHandler(pod: V1Pod): Promise<void> {
223159
.filter((container) => container.image !== undefined)
224160
.map((container) => container.image!),
225161
}),
162+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
226163
]);
227164
}
228165

Lines changed: 81 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,81 @@
1+
import * as async from 'async';
2+
import { config } from '../../../common/config';
3+
4+
import { logger } from '../../../common/logger';
5+
import { processWorkload } from '../../../scanner';
6+
import { deleteWorkloadImagesAlreadyScanned } from '../../../state';
7+
import type { IWorkload, Telemetry } from '../../../transmitter/types';
8+
9+
interface ImagesToScanQueueData {
10+
/** Identifies the workload in the queue. */
11+
key: string;
12+
workloadMetadata: IWorkload[];
13+
/** The timestamp when this workload was added to the image scan queue. */
14+
enqueueTimestampMs: number;
15+
}
16+
17+
export const workloadsToScanQueue = async.queue<ImagesToScanQueueData>(
18+
queueWorkerWorkloadScan,
19+
config.WORKERS_COUNT,
20+
);
21+
22+
export async function deleteWorkloadFromScanQueue(workload: {
23+
uid: string;
24+
}): Promise<void> {
25+
workloadsToScanQueue.remove(
26+
(item) => item.data && item.data.key === workload.uid,
27+
);
28+
}
29+
30+
workloadsToScanQueue.error(function (err, task) {
31+
logger.error(
32+
{ err, task },
33+
'error processing a workload in the pod handler 1',
34+
);
35+
});
36+
37+
async function queueWorkerWorkloadScan(
38+
task: ImagesToScanQueueData,
39+
callback,
40+
): Promise<void> {
41+
const { workloadMetadata, enqueueTimestampMs } = task;
42+
/** Represents how long this workload spent waiting in the queue to be processed. */
43+
const enqueueDurationMs = Date.now() - enqueueTimestampMs;
44+
const telemetry: Partial<Telemetry> = {
45+
enqueueDurationMs,
46+
queueSize: workloadsToScanQueue.length(),
47+
};
48+
try {
49+
await processWorkload(workloadMetadata, telemetry);
50+
} catch (err) {
51+
logger.error(
52+
{ err, task },
53+
'error processing a workload in the pod handler 2',
54+
);
55+
const imageIds = workloadMetadata.map((workload) => workload.imageId);
56+
const workload = {
57+
// every workload metadata references the same workload, grab it from the first one
58+
...workloadMetadata[0],
59+
imageIds,
60+
};
61+
await deleteWorkloadImagesAlreadyScanned(workload);
62+
}
63+
}
64+
65+
function reportQueueSize(): void {
66+
try {
67+
const queueDataToReport: { [key: string]: any } = {};
68+
queueDataToReport.workloadsToScanLength = workloadsToScanQueue.length();
69+
logger.debug(queueDataToReport, 'queue sizes report');
70+
} catch (err) {
71+
logger.debug({ err }, 'failed logging queue sizes');
72+
}
73+
}
74+
75+
// Report the queue size shortly after the snyk-monitor starts.
76+
setTimeout(reportQueueSize, 1 * 60 * 1000).unref();
77+
// Additionally, periodically report every X minutes.
78+
setInterval(
79+
reportQueueSize,
80+
config.QUEUE_LENGTH_LOG_FREQUENCY_MINUTES * 60 * 1000,
81+
).unref();

src/supervisor/watchers/handlers/replica-set.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import {
1111
kubernetesObjectToWorkloadAlreadyScanned,
1212
} from '../../../state';
1313
import { trimWorkload } from '../../workload-sanitization';
14+
import { deleteWorkloadFromScanQueue } from './queue';
1415

1516
export async function paginatedNamespacedReplicaSetList(
1617
namespace: string,
@@ -72,6 +73,7 @@ export async function replicaSetWatchHandler(
7273
.filter((container) => container.image !== undefined)
7374
.map((container) => container.image!),
7475
}),
76+
deleteWorkloadFromScanQueue(workloadAlreadyScanned),
7577
]);
7678
}
7779

0 commit comments

Comments
 (0)