Skip to content

Commit fcdeb61

Browse files
authored
Merge pull request #996 from snyk/fix/cronjob
[RUN-1518] fix: support both v1 and v1beta1 CronJobs
2 parents 40f6cab + 1acc320 commit fcdeb61

File tree

6 files changed

+164
-26
lines changed

6 files changed

+164
-26
lines changed

src/supervisor/types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ import { IncomingMessage } from 'http';
22
import {
33
AppsV1Api,
44
BatchV1Api,
5+
BatchV1beta1Api,
56
CoreV1Api,
67
CustomObjectsApi,
78
KubeConfig,
@@ -16,7 +17,10 @@ export enum WorkloadKind {
1617
StatefulSet = 'StatefulSet',
1718
DaemonSet = 'DaemonSet',
1819
Job = 'Job',
20+
/** Available since Kubernetes 1.20. */
1921
CronJob = 'CronJob',
22+
/** @deprecated Will be removed in Kubernetes 1.25. */
23+
CronJobV1Beta1 = 'CronJobV1Beta1',
2024
ReplicationController = 'ReplicationController',
2125
Pod = 'Pod',
2226
DeploymentConfig = 'DeploymentConfig',
@@ -40,20 +44,23 @@ export interface IK8sClients {
4044
readonly appsClient: AppsV1Api;
4145
readonly coreClient: CoreV1Api;
4246
readonly batchClient: BatchV1Api;
47+
readonly batchUnstableClient: BatchV1beta1Api;
4348
readonly customObjectsClient: CustomObjectsApi;
4449
}
4550

4651
export class K8sClients implements IK8sClients {
4752
public readonly appsClient: AppsV1Api;
4853
public readonly coreClient: CoreV1Api;
4954
public readonly batchClient: BatchV1Api;
55+
public readonly batchUnstableClient: BatchV1beta1Api;
5056
/** This client is used to access Custom Resources in the cluster, e.g. DeploymentConfig on OpenShift. */
5157
public readonly customObjectsClient: CustomObjectsApi;
5258

5359
constructor(config: KubeConfig) {
5460
this.appsClient = config.makeApiClient(AppsV1Api);
5561
this.coreClient = config.makeApiClient(CoreV1Api);
5662
this.batchClient = config.makeApiClient(BatchV1Api);
63+
this.batchUnstableClient = config.makeApiClient(BatchV1beta1Api);
5764
this.customObjectsClient = config.makeApiClient(CustomObjectsApi);
5865
}
5966
}

src/supervisor/watchers/handlers/cron-job.ts

Lines changed: 27 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,9 @@
1-
import { V1CronJob, V1CronJobList } from '@kubernetes/client-node';
1+
import {
2+
V1CronJob,
3+
V1CronJobList,
4+
V1beta1CronJob,
5+
V1beta1CronJobList,
6+
} from '@kubernetes/client-node';
27
import { deleteWorkload, trimWorkload } from './workload';
38
import { WorkloadKind } from '../../types';
49
import { FALSY_WORKLOAD_NAME_MARKER } from './types';
@@ -27,7 +32,27 @@ export async function paginatedCronJobList(namespace: string): Promise<{
2732
);
2833
}
2934

30-
export async function cronJobWatchHandler(cronJob: V1CronJob): Promise<void> {
35+
export async function paginatedCronJobV1Beta1List(namespace: string): Promise<{
36+
response: IncomingMessage;
37+
body: V1beta1CronJobList;
38+
}> {
39+
const cronJobList = new V1beta1CronJobList();
40+
cronJobList.apiVersion = 'batch/v1beta1';
41+
cronJobList.kind = 'CronJobList';
42+
cronJobList.items = new Array<V1beta1CronJob>();
43+
44+
return await paginatedList(
45+
namespace,
46+
cronJobList,
47+
k8sApi.batchUnstableClient.listNamespacedCronJob.bind(
48+
k8sApi.batchUnstableClient,
49+
),
50+
);
51+
}
52+
53+
export async function cronJobWatchHandler(
54+
cronJob: V1CronJob | V1beta1CronJob,
55+
): Promise<void> {
3156
cronJob = trimWorkload(cronJob);
3257

3358
if (

src/supervisor/watchers/handlers/index.ts

Lines changed: 82 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -5,12 +5,18 @@ import {
55
ERROR,
66
UPDATE,
77
KubernetesObject,
8+
BatchV1beta1Api,
9+
BatchV1Api,
810
} from '@kubernetes/client-node';
911

1012
import { logger } from '../../../common/logger';
1113
import { WorkloadKind } from '../../types';
1214
import { podWatchHandler, podDeletedHandler, paginatedPodList } from './pod';
13-
import { cronJobWatchHandler, paginatedCronJobList } from './cron-job';
15+
import {
16+
cronJobWatchHandler,
17+
paginatedCronJobList,
18+
paginatedCronJobV1Beta1List,
19+
} from './cron-job';
1420
import { daemonSetWatchHandler, paginatedDaemonSetList } from './daemon-set';
1521
import { deploymentWatchHandler, paginatedDeploymentList } from './deployment';
1622
import { jobWatchHandler, paginatedJobList } from './job';
@@ -75,6 +81,13 @@ const workloadWatchMetadata: Readonly<IWorkloadWatchMetadata> = {
7581
},
7682
listFactory: (namespace) => () => paginatedCronJobList(namespace),
7783
},
84+
[WorkloadKind.CronJobV1Beta1]: {
85+
endpoint: '/apis/batch/v1beta1/watch/namespaces/{namespace}/cronjobs',
86+
handlers: {
87+
[DELETE]: cronJobWatchHandler,
88+
},
89+
listFactory: (namespace) => () => paginatedCronJobV1Beta1List(namespace),
90+
},
7891
[WorkloadKind.Job]: {
7992
endpoint: '/apis/batch/v1/watch/namespaces/{namespace}/jobs',
8093
handlers: {
@@ -125,10 +138,75 @@ async function isSupportedWorkload(
125138
namespace: string,
126139
workloadKind: WorkloadKind,
127140
): Promise<boolean> {
128-
if (workloadKind !== WorkloadKind.DeploymentConfig) {
129-
return true;
141+
switch (workloadKind) {
142+
case WorkloadKind.DeploymentConfig:
143+
return await isDeploymentConfigSupported(namespace);
144+
case WorkloadKind.CronJobV1Beta1:
145+
return await isCronJobVersionSupported(
146+
workloadKind,
147+
namespace,
148+
k8sApi.batchUnstableClient,
149+
);
150+
case WorkloadKind.CronJob:
151+
return await isCronJobVersionSupported(
152+
workloadKind,
153+
namespace,
154+
k8sApi.batchClient,
155+
);
156+
default:
157+
return true;
130158
}
159+
}
131160

161+
async function isCronJobVersionSupported(
162+
workloadKind: WorkloadKind,
163+
namespace: string,
164+
client: BatchV1Api | BatchV1beta1Api,
165+
): Promise<boolean> {
166+
try {
167+
const pretty = undefined;
168+
const allowWatchBookmarks = undefined;
169+
const continueToken = undefined;
170+
const fieldSelector = undefined;
171+
const labelSelector = undefined;
172+
const limit = 1; // Try to grab only a single object
173+
const resourceVersion = undefined; // List anything in the cluster
174+
const resourceVersionMatch = undefined;
175+
const timeoutSeconds = 10; // Don't block the snyk-monitor indefinitely
176+
const attemptedApiCall =
177+
await kubernetesApiWrappers.retryKubernetesApiRequest(() =>
178+
client.listNamespacedCronJob(
179+
namespace,
180+
pretty,
181+
allowWatchBookmarks,
182+
continueToken,
183+
fieldSelector,
184+
labelSelector,
185+
limit,
186+
resourceVersion,
187+
resourceVersionMatch,
188+
timeoutSeconds,
189+
),
190+
);
191+
return (
192+
attemptedApiCall !== undefined &&
193+
attemptedApiCall.response !== undefined &&
194+
attemptedApiCall.response.statusCode !== undefined &&
195+
attemptedApiCall.response.statusCode >= 200 &&
196+
attemptedApiCall.response.statusCode < 300
197+
);
198+
} catch (error) {
199+
logger.debug(
200+
{ error, workloadKind: workloadKind },
201+
'Failed on Kubernetes API call to list CronJob or v1beta1 CronJob',
202+
);
203+
return false;
204+
}
205+
}
206+
207+
async function isDeploymentConfigSupported(
208+
namespace: string,
209+
): Promise<boolean> {
132210
try {
133211
const pretty = undefined;
134212
const continueToken = undefined;
@@ -162,7 +240,7 @@ async function isSupportedWorkload(
162240
);
163241
} catch (error) {
164242
logger.debug(
165-
{ error, workloadKind },
243+
{ error, workloadKind: WorkloadKind.DeploymentConfig },
166244
'Failed on Kubernetes API call to list DeploymentConfig',
167245
);
168246
return false;

src/supervisor/workload-reader.ts

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -208,9 +208,19 @@ const jobReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
208208

209209
// cronJobReader can read v1 and v1beta1 CronJobs
210210
const cronJobReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
211-
const cronJobResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
212-
() => k8sApi.batchClient.readNamespacedCronJob(workloadName, namespace),
213-
);
211+
const cronJobResult = await kubernetesApiWrappers
212+
.retryKubernetesApiRequest(() =>
213+
k8sApi.batchClient.readNamespacedCronJob(workloadName, namespace),
214+
)
215+
// In case the V1 client fails, try using the V1Beta1 client.
216+
.catch(() =>
217+
kubernetesApiWrappers.retryKubernetesApiRequest(() =>
218+
k8sApi.batchUnstableClient.readNamespacedCronJob(
219+
workloadName,
220+
namespace,
221+
),
222+
),
223+
);
214224
const cronJob = trimWorkload(cronJobResult.body);
215225

216226
if (
@@ -280,13 +290,17 @@ function logIncompleteWorkload(workloadName: string, namespace: string): void {
280290
// Here we are using the "kind" property of a k8s object as a key to map it to a reader.
281291
// This gives us a quick look up table where we can abstract away the internal implementation of reading a resource
282292
// and just grab a generic handler/reader that does that for us (based on the "kind").
283-
const workloadReader = {
293+
const workloadReader: Record<string, IWorkloadReaderFunc> = {
284294
[WorkloadKind.Deployment]: deploymentReader,
285295
[WorkloadKind.ReplicaSet]: replicaSetReader,
286296
[WorkloadKind.StatefulSet]: statefulSetReader,
287297
[WorkloadKind.DaemonSet]: daemonSetReader,
288298
[WorkloadKind.Job]: jobReader,
289299
[WorkloadKind.CronJob]: cronJobReader,
300+
// ------------
301+
// Note: WorkloadKind.CronJobV1Beta1 is intentionally not listed here.
302+
// The WorkloadKind.CronJob reader can handle both v1 and v1beta1 API versions.
303+
// ------------
290304
[WorkloadKind.ReplicationController]: replicationControllerReader,
291305
[WorkloadKind.DeploymentConfig]: deploymentConfigReader,
292306
};

test/helpers/kubernetes-upstream.ts

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ import { config } from '../../src/common/config';
1313

1414
const UPSTREAM_POLLING_CONFIGURATION = {
1515
WAIT_BETWEEN_REQUESTS_MS: 5000,
16-
MAXIMUM_REQUESTS: 120,
16+
MAXIMUM_REQUESTS: 180,
1717
};
1818

1919
export async function getUpstreamResponseBody(
@@ -73,9 +73,11 @@ export async function validateUpstreamStoredMetadata(
7373
remainingChecks: number = UPSTREAM_POLLING_CONFIGURATION.MAXIMUM_REQUESTS,
7474
): Promise<boolean> {
7575
while (remainingChecks > 0) {
76-
console.log(
77-
`Pinging upstream for existing metadata (${remainingChecks} checks remaining)...`,
78-
);
76+
if (remainingChecks % 10 === 0) {
77+
console.log(
78+
`Pinging upstream for existing metadata (${remainingChecks} checks remaining)...`,
79+
);
80+
}
7981
const responseBody = await getUpstreamResponseBody(relativeUrl);
8082
const workloadInfo: IWorkloadMetadata | undefined =
8183
responseBody.workloadInfo;

test/integration/kubernetes.spec.ts

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ import {
1414
} from '../helpers/kubernetes-upstream';
1515
import * as kubectl from '../helpers/kubectl';
1616
import { execWrapper as exec } from '../helpers/exec';
17+
import { IWorkloadLocator } from '../../src/transmitter/types';
1718

1819
let integrationId: string;
1920
let namespace: string;
@@ -37,6 +38,19 @@ test('deploy snyk-monitor', async () => {
3738
integrationId = await setup.deployMonitor();
3839
});
3940

41+
const cronJobValidator = (workloads: IWorkloadLocator[]) =>
42+
workloads.find(
43+
(workload) =>
44+
workload.name === 'cron-job' && workload.type === WorkloadKind.CronJob,
45+
) !== undefined;
46+
47+
const cronJobV1Beta1Validator = (workloads: IWorkloadLocator[]) =>
48+
workloads.find(
49+
(workload) =>
50+
workload.name === 'cron-job-v1beta1' &&
51+
workload.type === WorkloadKind.CronJob,
52+
) !== undefined;
53+
4054
// Next we apply some sample workloads
4155
test('deploy sample workloads', async () => {
4256
const servicesNamespace = 'services';
@@ -50,8 +64,14 @@ test('deploy sample workloads', async () => {
5064
kubectl.applyK8sYaml('./test/fixtures/centos-deployment.yaml'),
5165
kubectl.applyK8sYaml('./test/fixtures/scratch-deployment.yaml'),
5266
kubectl.applyK8sYaml('./test/fixtures/consul-deployment.yaml'),
53-
kubectl.applyK8sYaml('./test/fixtures/cronjob.yaml'),
54-
kubectl.applyK8sYaml('./test/fixtures/cronjob-v1beta1.yaml'),
67+
kubectl.applyK8sYaml('./test/fixtures/cronjob.yaml').catch((error) => {
68+
console.log('CronJob is possibly unsupported', error);
69+
}),
70+
kubectl
71+
.applyK8sYaml('./test/fixtures/cronjob-v1beta1.yaml')
72+
.catch((error) => {
73+
console.log('CronJobV1Beta1 is possibly unsupported', error);
74+
}),
5575
kubectl.createPodFromImage(
5676
'alpine-from-sha',
5777
someImageWithSha,
@@ -152,16 +172,8 @@ test('snyk-monitor sends data to kubernetes-upstream', async () => {
152172
workload.name === 'consul' &&
153173
workload.type === WorkloadKind.Deployment,
154174
) !== undefined &&
155-
workloads.find(
156-
(workload) =>
157-
workload.name === 'cron-job' &&
158-
workload.type === WorkloadKind.CronJob,
159-
) !== undefined &&
160-
workloads.find(
161-
(workload) =>
162-
workload.name === 'cron-job-v1beta1' &&
163-
workload.type === WorkloadKind.CronJob,
164-
) !== undefined
175+
// only one of the cronjob versions needs to be valid
176+
(cronJobValidator(workloads) || cronJobV1Beta1Validator(workloads))
165177
);
166178
};
167179

0 commit comments

Comments
 (0)