diff --git a/src/watch.ts b/src/watch.ts index 702f0f3da3..98db927d45 100644 --- a/src/watch.ts +++ b/src/watch.ts @@ -46,8 +46,8 @@ export class Watch { let doneCalled: boolean = false; const doneCallOnce = (err: any) => { if (!doneCalled) { - controller.abort(); doneCalled = true; + controller.abort(); done(err); } }; diff --git a/src/watch_test.ts b/src/watch_test.ts index fd2fe60dd8..58252b1290 100644 --- a/src/watch_test.ts +++ b/src/watch_test.ts @@ -5,7 +5,8 @@ import { PassThrough } from 'node:stream'; import { KubeConfig } from './config.js'; import { Cluster, Context, User } from './config_types.js'; import { Watch } from './watch.js'; -import { IncomingMessage } from 'node:http'; +import { IncomingMessage, createServer } from 'node:http'; +import { AddressInfo } from 'node:net'; const server = 'https://foo.company.com'; @@ -167,6 +168,69 @@ describe('Watch', () => { stream.destroy(); }); + it('should not call the done callback more than once on unexpected connection loss', async () => { + // Create a server that accepts the connection and flushes headers, then + // immediately destroys the connection (causing a "Premature close" + // error). + // + // This reproduces a bug where AbortController.abort() inside + // doneCallOnce could cause done() to be invoked twice. + + const mockServer = createServer((req, res) => { + res.writeHead(200, { + 'Content-Type': 'application/json', + 'Transfer-Encoding': 'chunked', + }); + + res.flushHeaders(); + res.destroy(); // Prematurely close the connection + }); + + const mockServerPort = await new Promise((resolve) => { + mockServer.listen(0, () => { + resolve((mockServer.address() as AddressInfo).port); + }); + }); + + const kc = new KubeConfig(); + + kc.loadFromClusterAndUser( + { + name: 'cluster', + server: `http://localhost:${mockServerPort}`, + skipTLSVerify: true, + }, + { + name: 'user', + }, + ); + + const watch = new Watch(kc); + + let doneCalled = 0; + let doneResolve: () => void; + + const donePromise = new Promise((resolve) => { + doneResolve = resolve; + }); + + await watch.watch( + '/some/path/to/object', + {}, + () => {}, + () => { + doneCalled += 1; + doneResolve(); + }, + ); + + await donePromise; + + mockServer.close(); + + strictEqual(doneCalled, 1); + }); + it('should call setKeepAlive on the socket to extend the default of 5 mins', async () => { const kc = new KubeConfig();