Skip to content

Commit 25581b9

Browse files
authored
feat: add K8s().Watch() reconnect on failure logic (#10)
1 parent e789803 commit 25581b9

File tree

10 files changed

+306
-151
lines changed

10 files changed

+306
-151
lines changed

.npmignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,4 +4,6 @@ coverage
44
.prettierrc
55
jest.config.json
66
tsconfig.json
7+
__mocks__
8+
*.config.js
79

package-lock.json

Lines changed: 94 additions & 90 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
"homepage": "https://github.com/defenseunicorns/kubernetes-fluent-client#readme",
3636
"dependencies": {
3737
"@kubernetes/client-node": "1.0.0-rc3",
38+
"byline": "5.0.0",
3839
"fast-json-patch": "3.1.1",
3940
"http-status-codes": "2.3.0",
4041
"node-fetch": "2.7.0",
@@ -44,6 +45,7 @@
4445
"@commitlint/cli": "17.7.2",
4546
"@commitlint/config-conventional": "17.7.0",
4647
"@jest/globals": "29.7.0",
48+
"@types/byline": "4.2.34",
4749
"@typescript-eslint/eslint-plugin": "6.7.3",
4850
"@typescript-eslint/parser": "6.7.3",
4951
"jest": "29.7.0",

src/fluent/index.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ import { modelToGroupVersionKind } from "../kinds";
1010
import { GenericClass } from "../types";
1111
import { Filters, K8sInit, Paths, WatchAction } from "./types";
1212
import { k8sExec } from "./utils";
13-
import { ExecWatch } from "./watch";
13+
import { ExecWatch, WatchCfg } from "./watch";
1414

1515
/**
1616
* Kubernetes fluent API inspired by Kubectl. Pass in a model, then call filters and actions on it.
@@ -119,8 +119,8 @@ export function K8s<T extends GenericClass, K extends KubernetesObject = Instanc
119119
return k8sExec<T, K>(model, filters, "PATCH", payload);
120120
}
121121

122-
async function Watch(callback: WatchAction<T>): Promise<AbortController> {
123-
return ExecWatch(model, filters, callback);
122+
async function Watch(callback: WatchAction<T>, watchCfg?: WatchCfg) {
123+
return ExecWatch(model, filters, callback, watchCfg);
124124
}
125125

126126
return { InNamespace, Apply, Create, Patch, ...withFilters };

src/fluent/types.ts

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ import { Operation } from "fast-json-patch";
66
import type { PartialDeep } from "type-fest";
77

88
import { GenericClass, GroupVersionKind } from "../types";
9+
import { WatchCfg, WatchController } from "./watch";
910

1011
/**
1112
* The Phase matched when using the K8s Watch API.
@@ -51,7 +52,10 @@ export type K8sFilteredActions<K extends KubernetesObject> = {
5152
* @param callback
5253
* @returns
5354
*/
54-
Watch: (callback: (payload: K, phase: WatchPhase) => void) => Promise<AbortController>;
55+
Watch: (
56+
callback: (payload: K, phase: WatchPhase) => void,
57+
watchCfg?: WatchCfg,
58+
) => Promise<WatchController>;
5559
};
5660

5761
export type K8sUnfilteredActions<K extends KubernetesObject> = {

src/fluent/utils.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -112,6 +112,9 @@ export async function k8sCfg(method: FetchMethods) {
112112
},
113113
});
114114

115+
// Enable compression
116+
opts.compress = true;
117+
115118
return { opts, serverUrl: cluster.server };
116119
}
117120

src/fluent/watch.ts

Lines changed: 172 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,63 @@
11
// SPDX-License-Identifier: Apache-2.0
22
// SPDX-FileCopyrightText: 2023-Present The Pepr Authors
33

4-
import readline from "readline";
5-
4+
import byline from "byline";
65
import fetch from "node-fetch";
7-
import { GenericClass } from "../types";
6+
7+
import { GenericClass, LogFn } from "../types";
88
import { Filters, WatchAction, WatchPhase } from "./types";
99
import { k8sCfg, pathBuilder } from "./utils";
1010

11+
/**
12+
* Wrapper for the AbortController to allow the watch to be aborted externally.
13+
*/
14+
export type WatchController = {
15+
/**
16+
* Abort the watch.
17+
* @param reason optional reason for aborting the watch
18+
* @returns
19+
*/
20+
abort: (reason?: string) => void;
21+
/**
22+
* Get the AbortSignal for the watch.
23+
* @returns
24+
*/
25+
signal: () => AbortSignal;
26+
};
27+
28+
/**
29+
* Configuration for the watch function.
30+
*/
31+
export type WatchCfg = {
32+
/**
33+
* The maximum number of times to retry the watch, the retry count is reset on success.
34+
*/
35+
retryMax?: number;
36+
/**
37+
* The delay between retries in seconds.
38+
*/
39+
retryDelaySec?: number;
40+
/**
41+
* A function to log errors.
42+
*/
43+
logFn?: LogFn;
44+
/**
45+
* A function to call when the watch fails after the maximum number of retries.
46+
*/
47+
retryFail?: (e: Error) => void;
48+
};
49+
1150
/**
1251
* Execute a watch on the specified resource.
1352
*/
1453
export async function ExecWatch<T extends GenericClass>(
1554
model: T,
1655
filters: Filters,
1756
callback: WatchAction<T>,
57+
watchCfg: WatchCfg = {},
1858
) {
59+
watchCfg.logFn?.({ model, filters, watchCfg }, "ExecWatch");
60+
1961
// Build the path and query params for the resource, excluding the name
2062
const { opts, serverUrl } = await k8sCfg("GET");
2163
const url = pathBuilder(serverUrl, model, filters, true);
@@ -31,64 +73,137 @@ export async function ExecWatch<T extends GenericClass>(
3173
url.searchParams.set("fieldSelector", `metadata.name=${filters.name}`);
3274
}
3375

34-
// Add abort controller to the long-running request
35-
const controller = new AbortController();
36-
opts.signal = controller.signal;
76+
// Set the initial timeout to 15 seconds
77+
opts.timeout = 15 * 1000;
78+
79+
// Enable keep alive
80+
(opts.agent as unknown as { keepAlive: boolean }).keepAlive = true;
81+
82+
// Track the number of retries
83+
let retryCount = 0;
84+
85+
// Set the maximum number of retries to 5 if not specified
86+
watchCfg.retryMax ??= 5;
87+
88+
// Set the retry delay to 5 seconds if not specified
89+
watchCfg.retryDelaySec ??= 5;
90+
91+
// Create a throwaway AbortController to setup the wrapped AbortController
92+
let abortController: AbortController;
93+
94+
// Create a wrapped AbortController to allow the watch to be aborted externally
95+
const abortWrapper = {} as WatchController;
96+
97+
function bindAbortController() {
98+
// Create a new AbortController
99+
abortController = new AbortController();
100+
101+
// Update the abort wrapper
102+
abortWrapper.abort = reason => abortController.abort(reason);
103+
abortWrapper.signal = () => abortController.signal;
104+
105+
// Add the abort signal to the request options
106+
opts.signal = abortController.signal;
107+
}
108+
109+
async function runner() {
110+
let doneCalled = false;
37111

38-
// Close the connection and make the callback function no-op
39-
let close = (err?: Error) => {
40-
controller.abort();
41-
close = () => {};
42-
if (err) {
43-
throw err;
112+
bindAbortController();
113+
114+
// Create a stream to read the response body
115+
const stream = byline.createStream();
116+
117+
const onError = (err: Error) => {
118+
stream.removeAllListeners();
119+
120+
if (!doneCalled) {
121+
doneCalled = true;
122+
123+
// If the error is not an AbortError, reload the watch
124+
if (err.name !== "AbortError") {
125+
watchCfg.logFn?.(err, "stream error");
126+
void reload(err);
127+
} else {
128+
watchCfg.logFn?.("watch aborted via WatchController.abort()");
129+
}
130+
}
131+
};
132+
133+
const cleanup = () => {
134+
if (!doneCalled) {
135+
doneCalled = true;
136+
stream.removeAllListeners();
137+
}
138+
};
139+
140+
try {
141+
// Make the actual request
142+
const response = await fetch(url, { ...opts });
143+
144+
// If the request is successful, start listening for events
145+
if (response.ok) {
146+
const { body } = response;
147+
148+
// Reset the retry count
149+
retryCount = 0;
150+
151+
stream.on("error", onError);
152+
stream.on("close", cleanup);
153+
stream.on("finish", cleanup);
154+
155+
// Listen for events and call the callback function
156+
stream.on("data", line => {
157+
try {
158+
// Parse the event payload
159+
const { object: payload, type: phase } = JSON.parse(line) as {
160+
type: WatchPhase;
161+
object: InstanceType<T>;
162+
};
163+
164+
// Call the callback function with the parsed payload
165+
void callback(payload, phase as WatchPhase);
166+
} catch (err) {
167+
watchCfg.logFn?.(err, "watch callback error");
168+
}
169+
});
170+
171+
body.on("error", onError);
172+
body.on("close", cleanup);
173+
body.on("finish", cleanup);
174+
175+
// Pipe the response body to the stream
176+
body.pipe(stream);
177+
} else {
178+
throw new Error(`watch failed: ${response.status} ${response.statusText}`);
179+
}
180+
} catch (e) {
181+
onError(e);
44182
}
45-
};
46-
47-
try {
48-
// Make the actual request
49-
const response = await fetch(url, opts);
50-
51-
// If the request is successful, start listening for events
52-
if (response.ok) {
53-
const { body } = response;
54-
55-
// Bind connection events to the close function
56-
body.on("error", close);
57-
body.on("close", close);
58-
body.on("finish", close);
59-
60-
// Create a readline interface to parse the stream
61-
const rl = readline.createInterface({
62-
input: response.body!,
63-
terminal: false,
64-
});
65-
66-
// Listen for events and call the callback function
67-
rl.on("line", line => {
68-
try {
69-
// Parse the event payload
70-
const { object: payload, type: phase } = JSON.parse(line) as {
71-
type: WatchPhase;
72-
object: InstanceType<T>;
73-
};
74-
75-
// Call the callback function with the parsed payload
76-
void callback(payload, phase as WatchPhase);
77-
} catch (ignore) {
78-
// ignore parse errors
183+
184+
// On unhandled errors, retry the watch
185+
async function reload(e: Error) {
186+
// If there are more attempts, retry the watch
187+
if (watchCfg.retryMax! > retryCount) {
188+
retryCount++;
189+
190+
watchCfg.logFn?.(`retrying watch ${retryCount}/${watchCfg.retryMax}`);
191+
192+
// Sleep for the specified delay or 5 seconds
193+
await new Promise(r => setTimeout(r, watchCfg.retryDelaySec! * 1000));
194+
195+
// Retry the watch after the delay
196+
await runner();
197+
} else {
198+
// Otherwise, call the finally function if it exists
199+
if (watchCfg.retryFail) {
200+
watchCfg.retryFail(e);
79201
}
80-
});
81-
} else {
82-
// If the request fails, throw an error
83-
const error = new Error(response.statusText) as Error & {
84-
statusCode: number | undefined;
85-
};
86-
error.statusCode = response.status;
87-
throw error;
202+
}
88203
}
89-
} catch (e) {
90-
close(e);
91204
}
92205

93-
return controller;
206+
await runner();
207+
208+
return abortWrapper;
94209
}

src/kinds.test.ts

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,10 @@ import { RegisterKind } from "./kinds";
88
import { GroupVersionKind } from "./types";
99

1010
const testCases = [
11+
{
12+
name: kind.Event,
13+
expected: { group: "events.k8s.io", version: "v1", kind: "Event" },
14+
},
1115
{
1216
name: kind.ClusterRole,
1317
expected: { group: "rbac.authorization.k8s.io", version: "v1", kind: "ClusterRole" },

src/kinds.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,20 @@
44
import { GenericClass, GroupVersionKind } from "./types";
55

66
const gvkMap: Record<string, GroupVersionKind> = {
7+
/**
8+
* Represents a K8s Event resource.
9+
* Event is a report of an event somewhere in the cluster. It generally denotes some state change in the system.
10+
* Events have a limited retention time and triggers and messages may evolve with time. Event consumers should not
11+
* rely on the timing of an event with a given Reason reflecting a consistent underlying trigger, or the continued
12+
* existence of events with that Reason. Events should be treated as informative, best-effort, supplemental data.
13+
* @see {@link https://kubernetes.io/docs/reference/kubernetes-api/cluster-resources/event-v1/}
14+
*/
15+
CoreV1Event: {
16+
kind: "Event",
17+
version: "v1",
18+
group: "events.k8s.io",
19+
},
20+
721
/**
822
* Represents a K8s ClusterRole resource.
923
* ClusterRole is a set of permissions that can be bound to a user or group in a cluster-wide scope.

src/types.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,3 +33,10 @@ export interface GroupVersionKind {
3333
/** Optional, override the plural name for use in Webhook rules generation */
3434
readonly plural?: string;
3535
}
36+
37+
export interface LogFn {
38+
/* tslint:disable:no-unnecessary-generics */
39+
<T extends object>(obj: T, msg?: string, ...args: never[]): void;
40+
(obj: unknown, msg?: string, ...args: never[]): void;
41+
(msg: string, ...args: never[]): void;
42+
}

0 commit comments

Comments
 (0)