Skip to content

Commit c08cf69

Browse files
committed
fix: retry and throttle requests to k8s-api
When building a workload's metadata, we send requests to the Kubernetes API to get additional details (a workload's parent). Some of these requests fail with an HTTP response code of 429 ("Too Many Requests"), which is basically the Kubernetes API telling us it's busy at the moment. These responses may also contain a "Retry-After" header, which according to the HTTP standard is either a non-negative decimal integer (a number of seconds) or a date, and serves as an instruction from the server for when the client should try again. https://tools.ietf.org/html/rfc7231#section-7.1.3 This commit adds a retry mechanism to our Kubernetes API requests in case we get such a 429 response.
1 parent 0f9d342 commit c08cf69

File tree

3 files changed

+161
-14
lines changed

3 files changed

+161
-14
lines changed
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
import * as http from 'http';
2+
import * as sleep from 'sleep-promise';
3+
4+
// Maximum number of times a request is attempted (the first call included) before giving up.
export const ATTEMPTS_MAX = 3;
5+
// Seconds to wait before the next attempt when the response carries no usable Retry-After value.
export const DEFAULT_SLEEP_SEC = 1;
6+
// Upper bound, in seconds, applied to the wait computed by calculateSleepSeconds().
export const MAX_SLEEP_SEC = 5;
7+
// A deferred Kubernetes API call: invoked with no arguments, once per attempt.
type IKubernetesApiFunction = () => Promise<any>;
8+
9+
/**
 * Runs a Kubernetes API call and retries it while the API answers with
 * HTTP 429 ("Too Many Requests").
 *
 * The call is attempted at most ATTEMPTS_MAX times. Between attempts we sleep
 * for the duration returned by calculateSleepSeconds() for the 429 response.
 *
 * @param func zero-argument function that issues the request; it is invoked
 *             again for every retry, so it must create a fresh request each time.
 * @returns whatever `func` resolves with on the first successful attempt.
 * @throws the original error, untouched, when it is not a 429 response
 *         (including errors that carry no `response` at all) or when the
 *         last allowed attempt also fails with 429.
 */
export async function retryKubernetesApiRequest(func: IKubernetesApiFunction) {
  for (let attempt = 1; attempt <= ATTEMPTS_MAX; attempt++) {
    try {
      return await func();
    } catch (err) {
      // Errors such as connection failures have no HTTP response attached.
      // Guard the access so the caller sees the real error instead of a
      // TypeError raised while inspecting it.
      const response = err && err.response;
      if (!response || response.statusCode !== 429) {
        throw err;
      }

      // Out of attempts: surface the 429 error instead of sleeping again.
      if (attempt === ATTEMPTS_MAX) {
        throw err;
      }

      const sleepSeconds = calculateSleepSeconds(response);
      await sleep(sleepSeconds * 1000);
    }
  }
}
29+
30+
/**
 * Works out how many seconds to wait before retrying a throttled request,
 * based on the response's Retry-After header.
 *
 * Retry-After is either a number of seconds or an HTTP-date. Both forms are
 * understood; a missing, unparsable, zero, negative or already-elapsed value
 * falls back to DEFAULT_SLEEP_SEC. The result never exceeds MAX_SLEEP_SEC.
 *
 * @param httpResponse the HTTP response attached to the failed request; may be
 *                     missing or have no headers.
 * @returns the number of seconds to sleep, in the range (0, MAX_SLEEP_SEC].
 */
export function calculateSleepSeconds(httpResponse: http.IncomingMessage): number {
  const headers = httpResponse && httpResponse.headers;
  // Node lower-cases incoming header names, so 'retry-after' is the key that
  // real responses use. The canonical spelling is still accepted for
  // hand-built response objects.
  const retryAfter = headers && (headers['retry-after'] || headers['Retry-After']);
  if (!retryAfter) {
    return DEFAULT_SLEEP_SEC;
  }

  let sleepSeconds = Number(retryAfter);
  if (isNaN(sleepSeconds)) {
    // Not delay-seconds: try the HTTP-date form and convert it to a delay
    // relative to now. An unparsable value stays NaN and is handled below.
    sleepSeconds = (Date.parse(String(retryAfter)) - Date.now()) / 1000;
  }

  if (isNaN(sleepSeconds) || sleepSeconds <= 0) {
    return DEFAULT_SLEEP_SEC;
  }
  return Math.min(sleepSeconds, MAX_SLEEP_SEC);
}

src/supervisor/workload-reader.ts

Lines changed: 16 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import { V1OwnerReference } from '@kubernetes/client-node';
2+
3+
import * as kubernetesApiWrappers from './kuberenetes-api-wrappers';
24
import { k8sApi } from './cluster';
35
import { IKubeObjectMetadata, WorkloadKind } from './types';
46

@@ -8,8 +10,8 @@ type IWorkloadReaderFunc = (
810
) => Promise<IKubeObjectMetadata | undefined>;
911

1012
const deploymentReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
11-
const deploymentResult = await k8sApi.appsClient.readNamespacedDeployment(
12-
workloadName, namespace);
13+
const deploymentResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
14+
() => k8sApi.appsClient.readNamespacedDeployment(workloadName, namespace));
1315
const deployment = deploymentResult.body;
1416

1517
if (!deployment.metadata || !deployment.spec || !deployment.spec.template.metadata ||
@@ -29,8 +31,8 @@ const deploymentReader: IWorkloadReaderFunc = async (workloadName, namespace) =>
2931
};
3032

3133
const replicaSetReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
32-
const replicaSetResult = await k8sApi.appsClient.readNamespacedReplicaSet(
33-
workloadName, namespace);
34+
const replicaSetResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
35+
() => k8sApi.appsClient.readNamespacedReplicaSet(workloadName, namespace));
3436
const replicaSet = replicaSetResult.body;
3537

3638
if (!replicaSet.metadata || !replicaSet.spec || !replicaSet.spec.template ||
@@ -50,8 +52,8 @@ const replicaSetReader: IWorkloadReaderFunc = async (workloadName, namespace) =>
5052
};
5153

5254
const statefulSetReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
53-
const statefulSetResult = await k8sApi.appsClient.readNamespacedStatefulSet(
54-
workloadName, namespace);
55+
const statefulSetResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
56+
() => k8sApi.appsClient.readNamespacedStatefulSet(workloadName, namespace));
5557
const statefulSet = statefulSetResult.body;
5658

5759
if (!statefulSet.metadata || !statefulSet.spec || !statefulSet.spec.template.metadata ||
@@ -71,8 +73,8 @@ const statefulSetReader: IWorkloadReaderFunc = async (workloadName, namespace) =
7173
};
7274

7375
const daemonSetReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
74-
const daemonSetResult = await k8sApi.appsClient.readNamespacedDaemonSet(
75-
workloadName, namespace);
76+
const daemonSetResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
77+
() => k8sApi.appsClient.readNamespacedDaemonSet(workloadName, namespace));
7678
const daemonSet = daemonSetResult.body;
7779

7880
if (!daemonSet.metadata || !daemonSet.spec || !daemonSet.spec.template.spec ||
@@ -92,8 +94,8 @@ const daemonSetReader: IWorkloadReaderFunc = async (workloadName, namespace) =>
9294
};
9395

9496
const jobReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
95-
const jobResult = await k8sApi.batchClient.readNamespacedJob(
96-
workloadName, namespace);
97+
const jobResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
98+
() => k8sApi.batchClient.readNamespacedJob(workloadName, namespace));
9799
const job = jobResult.body;
98100

99101
if (!job.metadata || !job.spec || !job.spec.template.spec || !job.spec.template.metadata) {
@@ -114,8 +116,8 @@ const jobReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
114116
// https://kubernetes.io/docs/concepts/overview/kubernetes-api/#api-versioning
115117
// CronJobs will appear in v2 API, but for now there' only v2alpha1, so it's a bad idea to use it.
116118
const cronJobReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
117-
const cronJobResult = await k8sApi.batchUnstableClient.readNamespacedCronJob(
118-
workloadName, namespace);
119+
const cronJobResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
120+
() => k8sApi.batchUnstableClient.readNamespacedCronJob(workloadName, namespace));
119121
const cronJob = cronJobResult.body;
120122

121123
if (!cronJob.metadata || !cronJob.spec || !cronJob.spec.jobTemplate.metadata ||
@@ -134,8 +136,8 @@ const cronJobReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
134136
};
135137

136138
const replicationControllerReader: IWorkloadReaderFunc = async (workloadName, namespace) => {
137-
const replicationControllerResult = await k8sApi.coreClient.readNamespacedReplicationController(
138-
workloadName, namespace);
139+
const replicationControllerResult = await kubernetesApiWrappers.retryKubernetesApiRequest(
140+
() => k8sApi.coreClient.readNamespacedReplicationController(workloadName, namespace));
139141
const replicationController = replicationControllerResult.body;
140142

141143
if (!replicationController.metadata || !replicationController.spec || !replicationController.spec.template ||
Lines changed: 102 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,102 @@
1+
import * as tap from 'tap';
2+
import * as http from 'http';
3+
import * as kubernetesApiWrappers from '../../../src/supervisor/kuberenetes-api-wrappers';
4+
5+
// Table-driven checks of calculateSleepSeconds: each case is a mock response,
// the expected number of seconds, and the assertion message.
tap.test('calculateSleepSeconds', async (t) => {
  const cases: Array<{ response: unknown; expected: number; message: string }> = [
    {
      response: {},
      expected: kubernetesApiWrappers.DEFAULT_SLEEP_SEC,
      message: 'returns the default value for a response without headers',
    },
    {
      response: {headers: {'Retry-After': -3}},
      expected: kubernetesApiWrappers.DEFAULT_SLEEP_SEC,
      message: 'returns the default value for a response with negative retry',
    },
    {
      response: {headers: {'Retry-After': 0}},
      expected: kubernetesApiWrappers.DEFAULT_SLEEP_SEC,
      message: 'returns the default value for a response with zero retry',
    },
    {
      response: {headers: {'Retry-After': 'Fri, 31 Dec 1999 23:59:59 GMT'}},
      expected: kubernetesApiWrappers.DEFAULT_SLEEP_SEC,
      message: 'returns the default value for a response with a date',
    },
    {
      response: {headers: {'Retry-After': 55}},
      expected: kubernetesApiWrappers.MAX_SLEEP_SEC,
      message: 'returns a value limited for high retry values',
    },
    {
      response: {headers: {'Retry-After': 4}},
      expected: 4,
      message: 'returns the retry-after value if numeric, positive and not too high',
    },
  ];

  cases.forEach(({ response, expected, message }) => {
    t.equals(
      kubernetesApiWrappers.calculateSleepSeconds(response as http.IncomingMessage),
      expected,
      message,
    );
  });
});
48+
49+
// Exercises the retry loop with errors shaped like a 429 response.
// Every t.rejects is awaited so each retry chain (which sleeps between
// attempts) has settled before the next scenario starts and before the test
// function returns.
tap.test('retryKubernetesApiRequest for retryable errors', async (t) => {
  const retryableErrorResponse = {response: {statusCode: 429}};

  await t.rejects(
    () => kubernetesApiWrappers.retryKubernetesApiRequest(() => Promise.reject(retryableErrorResponse)),
    'eventually throws on repeated retryable error responses',
  );

  // Fails on every attempt except the last allowed one.
  let failures = 0;
  const functionThatFailsJustEnoughTimes = () => {
    if (failures < kubernetesApiWrappers.ATTEMPTS_MAX - 1) {
      failures += 1;
      return Promise.reject(retryableErrorResponse);
    }
    return Promise.resolve('egg');
  };

  const successfulResponse = await kubernetesApiWrappers.retryKubernetesApiRequest(functionThatFailsJustEnoughTimes);
  t.equals(
    successfulResponse,
    'egg',
    'keeps retrying on 429 as long as we don\'t cross max attempts',
  );

  // Fails on every allowed attempt; it would only succeed on one attempt too many.
  failures = 0;
  const functionThatFailsOneTooManyTimes = () => {
    if (failures < kubernetesApiWrappers.ATTEMPTS_MAX) {
      failures += 1;
      return Promise.reject(retryableErrorResponse);
    }
    return Promise.resolve('egg');
  };

  await t.rejects(
    () => kubernetesApiWrappers.retryKubernetesApiRequest(functionThatFailsOneTooManyTimes),
    'failure more than the maximum, rejects, even for a retryable error response',
  );
});
87+
88+
// An error whose status code is not 429 must be rethrown after a single
// attempt, without any retry.
tap.test('retryKubernetesApiRequest for non-retryable errors', async (t) => {
  const nonRetryableErrorResponse = {response: {statusCode: 500}};

  let failures = 0;
  const functionThatFails = () => {
    failures += 1;
    return Promise.reject(nonRetryableErrorResponse);
  };

  // Await the rejection so the attempt counter is final before it is checked.
  await t.rejects(
    () => kubernetesApiWrappers.retryKubernetesApiRequest(functionThatFails),
    'rejects on a non-retryable error response',
  );
  t.equals(failures, 1, 'did not retry even once for non-retryable error code');
});

0 commit comments

Comments
 (0)