Skip to content

Commit f27f172

Browse files
author
Amir Moualem
authored
Merge pull request #285 from snyk/fix/k8s-api-requests
fix: retry and throttle requests to k8s-api
2 parents 65b1c32 + 344f6fb commit f27f172

File tree

8 files changed

+175
-28
lines changed

8 files changed

+175
-28
lines changed

package.json

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5,15 +5,15 @@
55
"scripts": {
66
"pretest": "./scripts/build-image.sh",
77
"test": "npm run lint && npm run build && npm run test:unit && npm run test:integration",
8-
"test:unit": "NODE_ENV=test tap test/unit -R spec",
9-
"test:integration": "TEST_PLATFORM=kind CREATE_CLUSTER=true tap test/integration/kubernetes.test.ts -R spec --timeout=1200",
10-
"test:integration:kind": "TEST_PLATFORM=kind CREATE_CLUSTER=true tap test/integration/kubernetes.test.ts -R spec --timeout=1200",
11-
"test:integration:eks": "TEST_PLATFORM=eks CREATE_CLUSTER=false tap test/integration/kubernetes.test.ts -R spec --timeout=1200",
8+
"test:unit": "NODE_ENV=test tap test/unit",
9+
"test:integration": "TEST_PLATFORM=kind CREATE_CLUSTER=true tap test/integration/kubernetes.test.ts --timeout=1200",
10+
"test:integration:kind": "TEST_PLATFORM=kind CREATE_CLUSTER=true tap test/integration/kubernetes.test.ts --timeout=1200",
11+
"test:integration:eks": "TEST_PLATFORM=eks CREATE_CLUSTER=false tap test/integration/kubernetes.test.ts --timeout=1200",
1212
"test:coverage": "npm run test:unit -- --coverage",
1313
"test:watch": "tsc-watch --onSuccess 'npm run test:unit'",
14-
"test:apk": "TEST_PLATFORM=kind CREATE_CLUSTER=true PACKAGE_MANAGER=apk tap test/integration/package-manager.test.ts -R spec --timeout=7200",
15-
"test:apt": "TEST_PLATFORM=kind CREATE_CLUSTER=true PACKAGE_MANAGER=apt tap test/integration/package-manager.test.ts -R spec --timeout=7200",
16-
"test:rpm": "TEST_PLATFORM=kind CREATE_CLUSTER=true PACKAGE_MANAGER=rpm tap test/integration/package-manager.test.ts -R spec --timeout=7200",
14+
"test:apk": "TEST_PLATFORM=kind CREATE_CLUSTER=true PACKAGE_MANAGER=apk tap test/integration/package-manager.test.ts --timeout=7200",
15+
"test:apt": "TEST_PLATFORM=kind CREATE_CLUSTER=true PACKAGE_MANAGER=apt tap test/integration/package-manager.test.ts --timeout=7200",
16+
"test:rpm": "TEST_PLATFORM=kind CREATE_CLUSTER=true PACKAGE_MANAGER=rpm tap test/integration/package-manager.test.ts --timeout=7200",
1717
"start": "bin/start",
1818
"prepare": "npm run build",
1919
"build": "tsc",
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import * as http from 'http';
2+
import * as sleep from 'sleep-promise';
3+
4+
export const ATTEMPTS_MAX = 3;
5+
export const DEFAULT_SLEEP_SEC = 1;
6+
export const MAX_SLEEP_SEC = 5;
7+
type IKubernetesApiFunction = () => Promise<any>;
8+
9+
export async function retryKubernetesApiRequest(func: IKubernetesApiFunction) {
10+
11+
for (let attempt = 1; attempt <= ATTEMPTS_MAX; attempt++) {
12+
try {
13+
return await func();
14+
} catch (err) {
15+
const response = err.response;
16+
if (response.statusCode !== 429) {
17+
throw err;
18+
}
19+
20+
if (attempt === ATTEMPTS_MAX) {
21+
throw err;
22+
}
23+
24+
const sleepSeconds = calculateSleepSeconds(response);
25+
await sleep(sleepSeconds * 1000);
26+
}
27+
}
28+
}
29+
30+
export function calculateSleepSeconds(httpResponse: http.IncomingMessage): number {
31+
let sleepSeconds = DEFAULT_SLEEP_SEC;
32+
if (httpResponse && httpResponse.headers && httpResponse.headers['Retry-After']) {
33+
try {
34+
sleepSeconds = Number(httpResponse.headers['Retry-After']);
35+
if (isNaN(sleepSeconds) || sleepSeconds <= 0) {
36+
sleepSeconds = DEFAULT_SLEEP_SEC;
37+
}
38+
} catch (err) {
39+
sleepSeconds = DEFAULT_SLEEP_SEC;
40+
}
41+
}
42+
return Math.min(sleepSeconds, MAX_SLEEP_SEC);
43+
}

src/supervisor/workload-reader.ts

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import { V1OwnerReference } from '@kubernetes/client-node';
2+
3+
import * as kubernetesApiWrappers from './kuberenetes-api-wrappers';
24
import { k8sApi } from './cluster';
35
import { IKubeObjectMetadata, WorkloadKind } from './types';
46

@@ -8,8 +10,8 @@ type IWorkloadReaderFunc = (
810
) => Promise<IKubeObjectMetadata | undefined>;
911

1012
const deploymentReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
11-
const deploymentResult = await k8sApi.appsClient.readNamespacedDeployment(
12-
workloadName, namespace);
13+
const deploymentResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
14+
() => k8sApi.appsClient.readNamespacedDeployment(workloadName, namespace));
1315
const deployment = deploymentResult.body;
1416

1517
if (!deployment.metadata || !deployment.spec || !deployment.spec.template.metadata ||
@@ -29,8 +31,8 @@ const deploymentReader: IWorkloadReaderFunc = async (workloadName, namespace) =>
2931
};
3032

3133
const replicaSetReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
32-
const replicaSetResult = await k8sApi.appsClient.readNamespacedReplicaSet(
33-
workloadName, namespace);
34+
const replicaSetResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
35+
() => k8sApi.appsClient.readNamespacedReplicaSet(workloadName, namespace));
3436
const replicaSet = replicaSetResult.body;
3537

3638
if (!replicaSet.metadata || !replicaSet.spec || !replicaSet.spec.template ||
@@ -50,8 +52,8 @@ const replicaSetReader: IWorkloadReaderFunc = async (workloadName, namespace) =>
5052
};
5153

5254
const statefulSetReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
53-
const statefulSetResult = await k8sApi.appsClient.readNamespacedStatefulSet(
54-
workloadName, namespace);
55+
const statefulSetResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
56+
() => k8sApi.appsClient.readNamespacedStatefulSet(workloadName, namespace));
5557
const statefulSet = statefulSetResult.body;
5658

5759
if (!statefulSet.metadata || !statefulSet.spec || !statefulSet.spec.template.metadata ||
@@ -71,8 +73,8 @@ const statefulSetReader: IWorkloadReaderFunc = async (workloadName, namespace) =
7173
};
7274

7375
const daemonSetReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
74-
const daemonSetResult = await k8sApi.appsClient.readNamespacedDaemonSet(
75-
workloadName, namespace);
76+
const daemonSetResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
77+
() => k8sApi.appsClient.readNamespacedDaemonSet(workloadName, namespace));
7678
const daemonSet = daemonSetResult.body;
7779

7880
if (!daemonSet.metadata || !daemonSet.spec || !daemonSet.spec.template.spec ||
@@ -92,8 +94,8 @@ const daemonSetReader: IWorkloadReaderFunc = async (workloadName, namespace) =>
9294
};
9395

9496
const jobReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
95-
const jobResult = await k8sApi.batchClient.readNamespacedJob(
96-
workloadName, namespace);
97+
const jobResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
98+
() => k8sApi.batchClient.readNamespacedJob(workloadName, namespace));
9799
const job = jobResult.body;
98100

99101
if (!job.metadata || !job.spec || !job.spec.template.spec || !job.spec.template.metadata) {
@@ -114,8 +116,8 @@ const jobReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
114116
// https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-versioning
115117
// CronJobs will appear in v2 API, but for now there' only v2alpha1, so it's a bad idea to use it.
116118
const cronJobReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
117-
const cronJobResult = await k8sApi.batchUnstableClient.readNamespacedCronJob(
118-
workloadName, namespace);
119+
const cronJobResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
120+
() => k8sApi.batchUnstableClient.readNamespacedCronJob(workloadName, namespace));
119121
const cronJob = cronJobResult.body;
120122

121123
if (!cronJob.metadata || !cronJob.spec || !cronJob.spec.jobTemplate.metadata ||
@@ -134,8 +136,8 @@ const cronJobReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
134136
};
135137

136138
const replicationControllerReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
137-
const replicationControllerResult = await k8sApi.coreClient.readNamespacedReplicationController(
138-
workloadName, namespace);
139+
const replicationControllerResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
140+
() => k8sApi.coreClient.readNamespacedReplicationController(workloadName, namespace));
139141
const replicationController = replicationControllerResult.body;
140142

141143
if (!replicationController.metadata || !replicationController.spec || !replicationController.spec.template ||
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import * as tap from 'tap';
2+
import * as http from 'http';
3+
import * as kubernetesApiWrappers from '../../../src/supervisor/kuberenetes-api-wrappers';
4+
5+
tap.test('calculateSleepSeconds', async (t) => {
6+
const responseWithoutHeaders = {};
7+
t.equals(
8+
kubernetesApiWrappers.calculateSleepSeconds(responseWithoutHeaders as http.IncomingMessage),
9+
kubernetesApiWrappers.DEFAULT_SLEEP_SEC,
10+
'returns the default value for a response without headers',
11+
);
12+
13+
const responseWithNegativeSeconds = {headers: {'Retry-After': -3}};
14+
t.equals(
15+
kubernetesApiWrappers.calculateSleepSeconds(responseWithNegativeSeconds as unknown as http.IncomingMessage),
16+
kubernetesApiWrappers.DEFAULT_SLEEP_SEC,
17+
'returns the default value for a response with negative retry',
18+
);
19+
20+
const responseWithZeroSeconds = {headers: {'Retry-After': 0}};
21+
t.equals(
22+
kubernetesApiWrappers.calculateSleepSeconds(responseWithZeroSeconds as unknown as http.IncomingMessage),
23+
kubernetesApiWrappers.DEFAULT_SLEEP_SEC,
24+
'returns the default value for a response with zero retry',
25+
);
26+
27+
const responseWithDate = {headers: {'Retry-After': 'Fri, 31 Dec 1999 23:59:59 GMT'}};
28+
t.equals(
29+
kubernetesApiWrappers.calculateSleepSeconds(responseWithDate as unknown as http.IncomingMessage),
30+
kubernetesApiWrappers.DEFAULT_SLEEP_SEC,
31+
'returns the default value for a response with a date',
32+
);
33+
34+
const responseWithHighSecondsMock = {headers: {'Retry-After': 55}};
35+
t.equals(
36+
kubernetesApiWrappers.calculateSleepSeconds(responseWithHighSecondsMock as unknown as http.IncomingMessage),
37+
kubernetesApiWrappers.MAX_SLEEP_SEC,
38+
'returns a value limited for high retry values',
39+
);
40+
41+
const responseWithSecondsMock = {headers: {'Retry-After': 4}};
42+
t.equals(
43+
kubernetesApiWrappers.calculateSleepSeconds(responseWithSecondsMock as unknown as http.IncomingMessage),
44+
4,
45+
'returns the retry-after value if numeric, positive and not too high',
46+
);
47+
});
48+
49+
tap.test('retryKubernetesApiRequest for retryable errors', async (t) => {
50+
const retryableErrorResponse = {response: {statusCode: 429}};
51+
52+
t.rejects(
53+
() => kubernetesApiWrappers.retryKubernetesApiRequest(() => Promise.reject(retryableErrorResponse)),
54+
'eventually throws on repeated retryable error responses',
55+
);
56+
57+
let failures = 0;
58+
const functionThatFailsJustEnoughTimes = () => {
59+
if (failures < kubernetesApiWrappers.ATTEMPTS_MAX - 1) {
60+
failures +=1;
61+
return Promise.reject(retryableErrorResponse);
62+
}
63+
return Promise.resolve('egg');
64+
};
65+
66+
const successfulResponse = await kubernetesApiWrappers.retryKubernetesApiRequest(functionThatFailsJustEnoughTimes);
67+
t.equals(
68+
successfulResponse,
69+
'egg',
70+
'keeps retrying on 429 as long as we don\'t cross max attempts',
71+
);
72+
73+
failures = 0;
74+
const functionThatFailsOneTooManyTimes = () => {
75+
if (failures < kubernetesApiWrappers.ATTEMPTS_MAX) {
76+
failures +=1;
77+
return Promise.reject(retryableErrorResponse);
78+
}
79+
return Promise.resolve('egg');
80+
};
81+
82+
t.rejects(
83+
() => kubernetesApiWrappers.retryKubernetesApiRequest(functionThatFailsOneTooManyTimes),
84+
'failure more than the maximum, rejects, even for a retryable error response',
85+
);
86+
});
87+
88+
tap.test('retryKubernetesApiRequest for non-retryable errors', async (t) => {
89+
const nonRetryableErrorResponse = {response: {statusCode: 500}};
90+
91+
let failures = 0;
92+
const functionThatFails = () => {
93+
failures +=1;
94+
return Promise.reject(nonRetryableErrorResponse);
95+
};
96+
97+
t.rejects(
98+
() => kubernetesApiWrappers.retryKubernetesApiRequest(functionThatFails),
99+
'failure more than the maximum, rejects, even for a retryable error response',
100+
);
101+
t.equals(failures, 1, 'did not retry even once for non-retryable error code');
102+
});

test/unit/metadata-extractor.test.ts renamed to test/unit/supervisor/metadata-extractor.test.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,9 @@ import * as fs from 'fs';
33
import * as YAML from 'yaml';
44

55
import { V1OwnerReference, V1Pod, V1Deployment } from '@kubernetes/client-node';
6-
import * as supervisorTypes from '../../src/supervisor/types';
6+
import * as supervisorTypes from '../../../src/supervisor/types';
77

8-
import * as metadataExtractor from '../../src/supervisor/metadata-extractor';
8+
import * as metadataExtractor from '../../../src/supervisor/metadata-extractor';
99

1010
tap.test('isPodAssociatedWithParent', async (t) => {
1111
const mockPodWithoutMetadata = {};

test/unit/pod-watch-handler-caches.test.ts renamed to test/unit/supervisor/pod-watch-handler-caches.test.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,13 @@ import * as YAML from 'yaml';
66
import async = require('async');
77

88
import { V1PodSpec, V1Pod } from '@kubernetes/client-node';
9-
import transmitterTypes = require('../../src/transmitter/types');
10-
import * as metadataExtractor from '../../src/supervisor/metadata-extractor';
9+
import transmitterTypes = require('../../../src/transmitter/types');
10+
import * as metadataExtractor from '../../../src/supervisor/metadata-extractor';
1111

1212
let pushCallCount = 0;
1313
sinon.stub(async, 'queue').returns({ error: () => { }, push: () => pushCallCount++ } as any);
1414

15-
import * as pod from '../../src/supervisor/watchers/handlers/pod';
15+
import * as pod from '../../../src/supervisor/watchers/handlers/pod';
1616

1717
tap.test('image and workload image cache', async (t) => {
1818
const podSpecFixture = fs.readFileSync('./test/fixtures/pod-spec.json', 'utf8');

test/unit/watchers.test.ts renamed to test/unit/supervisor/watchers.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
import * as tap from 'tap';
22
import { V1Namespace } from '@kubernetes/client-node';
33

4-
import watchers = require('../../src/supervisor/watchers');
4+
import watchers = require('../../../src/supervisor/watchers');
55

66
tap.test('extractNamespaceName', async (t) => {
77
const namespaceEmpty = {} as V1Namespace;

test/unit/workload-reader.test.ts renamed to test/unit/supervisor/workload-reader.test.ts

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import * as tap from 'tap';
22

3-
import { SupportedWorkloadTypes, getSupportedWorkload } from '../../src/supervisor/workload-reader';
3+
import { SupportedWorkloadTypes, getSupportedWorkload } from '../../../src/supervisor/workload-reader';
44
import { V1OwnerReference } from '@kubernetes/client-node';
55

66
tap.test('SupportedWorkloadTypes', async (t) => {

0 commit comments

Comments
 (0)