From a88bb13c18dd672d50f2c12c2022ce642f17ac11 Mon Sep 17 00:00:00 2001 From: Ben Batha Date: Wed, 2 Mar 2022 11:06:53 -0500 Subject: [PATCH 1/3] fix(cache): watch errors must call done handler The type of `watchObject` was incorrect and has been updated to match the actual request body. Using this info it was clear that 'ERROR' events were not being handled correctly. When the watch receives an error it is not always an http status code, because the status code can only be sent when the stream is starting. This means that `410` resourceVersion out of date errors could only be handled if they were detected before the watch stream started leaving watches running on channels that would never receive more events and not notifying `ListWatch` consumers of the error. --- src/cache.ts | 18 +++++++--- src/cache_test.ts | 87 +++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 98 insertions(+), 7 deletions(-) diff --git a/src/cache.ts b/src/cache.ts index af5e5bc852..e0490114cd 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -140,7 +140,10 @@ export class ListWatch implements ObjectCache, In private async doneHandler(err: any): Promise { this._stop(); - if (err && err.statusCode === 410) { + if ( + err && + ((err as { statusCode?: number }).statusCode === 410 || (err as { code?: number }).code === 410) + ) { this.resourceVersion = ''; } else if (err) { this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err)); @@ -192,7 +195,11 @@ export class ListWatch implements ObjectCache, In }); } - private watchHandler(phase: string, obj: T, watchObj?: any): void { + private async watchHandler( + phase: string, + obj: T, + watchObj?: { type: string; object: KubernetesObject }, + ): Promise { switch (phase) { case 'ERROR': if ((obj as { code?: number }).code === 410) { @@ -214,10 +221,11 @@ export class ListWatch implements ObjectCache, In case 'BOOKMARK': // nothing to do, here for documentation, mostly. break; + case 'ERROR': + await this.doneHandler(obj); + return; } - if (watchObj && watchObj.metadata) { - this.resourceVersion = watchObj.metadata.resourceVersion; - } + this.resourceVersion = obj.metadata!.resourceVersion || ''; } } diff --git a/src/cache_test.ts b/src/cache_test.ts index 315a4f55c5..6a57a0fec5 100644 --- a/src/cache_test.ts +++ b/src/cache_test.ts @@ -1205,9 +1205,10 @@ describe('ListWatchCache', () => { { metadata: { name: 'name3', + resourceVersion: '23456', } as V1ObjectMeta, } as V1Namespace, - { metadata: { resourceVersion: '23456' } }, + { type: 'ADDED', metadata: { resourceVersion: '23456' } }, ); await informer.stop(); @@ -1259,9 +1260,91 @@ describe('ListWatchCache', () => { { metadata: { name: 'name3', + resourceVersion: '23456', } as V1ObjectMeta, } as V1Namespace, - { metadata: { resourceVersion: '23456' } }, + { type: 'ADDED', metadata: { resourceVersion: '23456' } }, + ); + + await informer.stop(); + + let errorEmitted = false; + informer.on('error', () => (errorEmitted = true)); + + promise = new Promise((resolve) => { + mock.when( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thenCall(() => { + resolve(new FakeRequest()); + }); + }); + + informer.start(); + await promise; + + const [, , , doneHandler] = mock.capture(fakeWatch.watch).last(); + + const object = { + kind: 'Status', + apiVersion: 'v1', + metadata: {}, + status: 'Failure', + message: 'too old resource version: 12345 (1234)', + reason: 'Expired', + code: 410, + }; + await watchHandler('ERROR', object, { type: 'ERROR', object }); + + mock.verify( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thrice(); + expect(errorEmitted).to.equal(false); + expect(listCalls).to.be.equal(2); + }); + + it('should list if the watch errors from the last version', async () => { + const fakeWatch = mock.mock(Watch); + const list: V1Pod[] = []; + const listObj = { + metadata: { + resourceVersion: '12345', + } as V1ListMeta, + items: list, + } as V1NamespaceList; + + let listCalls = 0; + const listFn: ListPromise = function(): Promise<{ + response: http.IncomingMessage; + body: V1NamespaceList; + }> { + return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => { + listCalls++; + resolve({ response: {} as http.IncomingMessage, body: listObj }); + }); + }; + let promise = new Promise((resolve) => { + mock.when( + fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), + ).thenCall(() => { + resolve(new FakeRequest()); + }); + }); + + const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false); + + informer.start(); + await promise; + + const [, , watchHandler] = mock.capture(fakeWatch.watch).last(); + watchHandler( + 'ADDED', + { + metadata: { + name: 'name3', + resourceVersion: '23456', + } as V1ObjectMeta, + } as V1Namespace, + { type: 'ADDED', metadata: { resourceVersion: '23456' } }, ); await informer.stop(); From e0d3695116add2aa534cfa3615c3eed19bb55375 Mon Sep 17 00:00:00 2001 From: Jan Hesse Date: Fri, 20 Jan 2023 20:24:51 +0100 Subject: [PATCH 2/3] Update cache.ts --- src/cache.ts | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/src/cache.ts b/src/cache.ts index e0490114cd..55923e7e2b 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -34,6 +34,7 @@ export class ListWatch implements ObjectCache, In private readonly watch: Watch; private readonly listFn: ListPromise; private readonly labelSelector?: string; + private readonly fieldSelector?: string; public constructor( path: string, @@ -41,11 +42,13 @@ export class ListWatch implements ObjectCache, In listFn: ListPromise, autoStart: boolean = true, labelSelector?: string, + fieldSelector?: string, ) { this.path = path; this.watch = watch; this.listFn = listFn; this.labelSelector = labelSelector; + this.fieldSelector = fieldSelector; this.callbackCache[ADD] = []; this.callbackCache[UPDATE] = []; @@ -172,10 +175,14 @@ export class ListWatch implements ObjectCache, In } as { resourceVersion: string | undefined; labelSelector: string | undefined; + fieldSelector: string | undefined; }; if (this.labelSelector !== undefined) { queryParams.labelSelector = ObjectSerializer.serialize(this.labelSelector, 'string'); } + if (this.fieldSelector !== undefined) { + queryParams.fieldSelector = ObjectSerializer.serialize(this.fieldSelector, 'string'); + } this.request = await this.watch.watch( this.path, queryParams, From 2a961a0aebaba79536db1b819a607c83d955fb2d Mon Sep 17 00:00:00 2001 From: Brendan Burns <5751682+brendandburns@users.noreply.github.com> Date: Fri, 9 May 2025 16:52:16 +0000 Subject: [PATCH 3/3] Updates/fixes for watch --- src/cache.ts | 16 ++++++----- src/cache_test.ts | 69 +++++++++++++++++++---------------------------- 2 files changed, 38 insertions(+), 47 deletions(-) diff --git a/src/cache.ts b/src/cache.ts index 55923e7e2b..be9bf6688f 100644 --- a/src/cache.ts +++ b/src/cache.ts @@ -168,7 +168,7 @@ export class ListWatch implements ObjectCache, In } this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice()); this.addOrUpdateItems(list.items); - this.resourceVersion = list.metadata!.resourceVersion || ''; + this.resourceVersion = list.metadata ? list.metadata!.resourceVersion || '' : ''; } const queryParams = { resourceVersion: this.resourceVersion, @@ -192,6 +192,9 @@ export class ListWatch implements ObjectCache, In } private addOrUpdateItems(items: T[]): void { + if (items === undefined || items === null) { + return; + } items.forEach((obj: T) => { addOrUpdateObject( this.objects, @@ -212,7 +215,8 @@ export class ListWatch implements ObjectCache, In if ((obj as { code?: number }).code === 410) { this.resourceVersion = ''; } - break; + // We don't restart here, because it should be handled by the watch exiting if necessary + return; case 'ADDED': case 'MODIFIED': addOrUpdateObject( @@ -228,17 +232,17 @@ export class ListWatch implements ObjectCache, In case 'BOOKMARK': // nothing to do, here for documentation, mostly. break; - case 'ERROR': - await this.doneHandler(obj); - return; } - this.resourceVersion = obj.metadata!.resourceVersion || ''; + this.resourceVersion = obj.metadata ? obj.metadata!.resourceVersion || '' : ''; } } // exported for testing export function cacheMapFromList(newObjects: T[]): CacheMap { const objects: CacheMap = new Map(); + if (newObjects === undefined || newObjects === null) { + return objects; + } // build up the new list for (const obj of newObjects) { let namespaceObjects = objects.get(obj.metadata!.namespace || ''); diff --git a/src/cache_test.ts b/src/cache_test.ts index 6a57a0fec5..9746425149 100644 --- a/src/cache_test.ts +++ b/src/cache_test.ts @@ -235,6 +235,7 @@ describe('ListWatchCache', () => { watchHandler('ADDED', { metadata: { name: 'name3', + resourceVersion: 'blah', } as V1ObjectMeta, } as V1Namespace); @@ -245,40 +246,28 @@ describe('ListWatchCache', () => { } as V1ObjectMeta, } as V1Namespace); - watchHandler( - 'DELETED', - { - metadata: { - name: 'name2', - resourceVersion: 'blah', - } as V1ObjectMeta, - } as V1Namespace, - { - metadata: { - resourceVersion: '54321', - }, - }, - ); + watchHandler('DELETED', { + metadata: { + name: 'name2', + resourceVersion: '54321', + } as V1ObjectMeta, + } as V1Namespace); const [addResult, updateResult, deleteResult] = await Promise.all([ addPromise, updatePromise, deletePromise, ]); - deepStrictEqual(addResult.metadata, { name: 'name3' }); + deepStrictEqual(addResult.metadata, { name: 'name3', resourceVersion: 'blah' }); deepStrictEqual(updateResult.metadata, { name: 'name3', resourceVersion: 'baz' }); - deepStrictEqual(deleteResult.metadata, { name: 'name2', resourceVersion: 'blah' }); + deepStrictEqual(deleteResult.metadata, { name: 'name2', resourceVersion: '54321' }); strictEqual(informer.latestResourceVersion(), '54321'); - watchHandler( - 'BOOKMARK', - {}, - { - metadata: { - resourceVersion: '5454', - }, + watchHandler('BOOKMARK', { + metadata: { + resourceVersion: '5454', }, - ); + }); strictEqual(informer.latestResourceVersion(), '5454'); }); @@ -1275,7 +1264,7 @@ describe('ListWatchCache', () => { mock.when( fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(() => { - resolve(new FakeRequest()); + resolve({}); }); }); @@ -1294,39 +1283,35 @@ describe('ListWatchCache', () => { code: 410, }; await watchHandler('ERROR', object, { type: 'ERROR', object }); + await doneHandler(null); mock.verify( fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thrice(); - expect(errorEmitted).to.equal(false); - expect(listCalls).to.be.equal(2); + strictEqual(errorEmitted, false); + strictEqual(listCalls, 2); }); it('should list if the watch errors from the last version', async () => { const fakeWatch = mock.mock(Watch); - const list: V1Pod[] = []; - const listObj = { - metadata: { - resourceVersion: '12345', - } as V1ListMeta, - items: list, - } as V1NamespaceList; let listCalls = 0; - const listFn: ListPromise = function(): Promise<{ - response: http.IncomingMessage; - body: V1NamespaceList; - }> { - return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => { + const listFn: ListPromise = function (): Promise { + return new Promise((resolve, reject) => { listCalls++; - resolve({ response: {} as http.IncomingMessage, body: listObj }); + resolve({ + metadata: { + resourceVersion: '12345', + } as V1ListMeta, + items: [], + } as V1NamespaceList); }); }; let promise = new Promise((resolve) => { mock.when( fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()), ).thenCall(() => { - resolve(new FakeRequest()); + resolve({}); }); }); @@ -1418,6 +1403,8 @@ describe('ListWatchCache', () => { ); await informer.stop(); + strictEqual(listCalls, 1); + listCalls = 0; let errorEmitted = false; informer.on('error', () => (errorEmitted = true));