Skip to content

Commit 89a62a2

Browse files
committed
feat: expose a public Informer.stop()
1 parent 4cc02ce commit 89a62a2

File tree

3 files changed

+81
-6
lines changed

3 files changed

+81
-6
lines changed

src/cache.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
1313
private readonly indexCache: { [key: string]: T[] } = {};
1414
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};
1515
private request: RequestResult | undefined;
16+
private stopped: boolean = false;
1617

1718
public constructor(
1819
private readonly path: string,
@@ -31,14 +32,13 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
3132
}
3233

3334
public async start(): Promise<void> {
35+
this.stopped = false;
3436
await this.doneHandler(null);
3537
}
3638

37-
public stop(): void {
38-
if (this.request) {
39-
this.request.abort();
40-
this.request = undefined;
41-
}
39+
public async stop(): Promise<void> {
40+
this.stopped = true;
41+
this._stop();
4242
}
4343

4444
public on(verb: string, cb: ObjectCallback<T>): void {
@@ -80,12 +80,23 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
8080
return this.resourceVersion;
8181
}
8282

83+
private _stop(): void {
84+
if (this.request) {
85+
this.request.abort();
86+
this.request = undefined;
87+
}
88+
}
89+
8390
private async doneHandler(err: any): Promise<any> {
84-
this.stop();
91+
this._stop();
8592
if (err) {
8693
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
8794
return;
8895
}
96+
if (this.stopped) {
97+
// do not auto-restart
98+
return;
99+
}
89100
// TODO: Don't always list here for efficiency
90101
// try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case.
91102
// Or if resourceVersion is empty.

src/cache_test.ts

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -577,6 +577,7 @@ describe('ListWatchCache', () => {
577577
expect(cache.list('ns2').length).to.equal(0);
578578
expect(cache.get('name2', 'ns2')).to.equal(undefined);
579579
});
580+
580581
it('should delete an object correctly', () => {
581582
const list: V1Pod[] = [
582583
{
@@ -614,6 +615,7 @@ describe('ListWatchCache', () => {
614615
} as V1Pod);
615616
expect(list.length).to.equal(1);
616617
});
618+
617619
it('should not call handlers which have been unregistered', async () => {
618620
const fakeWatch = mock.mock(Watch);
619621
const list: V1Namespace[] = [];
@@ -721,6 +723,7 @@ describe('ListWatchCache', () => {
721723

722724
expect(addedList.length).to.equal(1);
723725
});
726+
724727
it('should resolve start promise after seeding the cache', async () => {
725728
const fakeWatch = mock.mock(Watch);
726729
const list: V1Namespace[] = [
@@ -823,6 +826,66 @@ describe('ListWatchCache', () => {
823826
expect(addedList1.length).to.equal(2);
824827
expect(addedList2.length).to.equal(1);
825828
});
829+
830+
it('should not auto-restart after explicitly stopping until restarted again', async () => {
831+
832+
const fakeWatch = mock.mock(Watch);
833+
const list: V1Pod[] = [
834+
{
835+
metadata: {
836+
name: 'name1',
837+
namespace: 'ns1',
838+
} as V1ObjectMeta,
839+
} as V1Pod,
840+
{
841+
metadata: {
842+
name: 'name2',
843+
namespace: 'ns2',
844+
} as V1ObjectMeta,
845+
} as V1Pod,
846+
];
847+
const listObj = {
848+
metadata: {
849+
resourceVersion: '12345',
850+
} as V1ListMeta,
851+
items: list,
852+
} as V1NamespaceList;
853+
854+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
855+
response: http.IncomingMessage;
856+
body: V1NamespaceList;
857+
}> {
858+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
859+
(resolve, reject) => {
860+
resolve({ response: {} as http.IncomingMessage, body: listObj });
861+
},
862+
);
863+
};
864+
let promise = new Promise((resolve) => {
865+
mock.when(
866+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
867+
).thenCall(() => {
868+
resolve(new FakeRequest());
869+
});
870+
});
871+
872+
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
873+
await promise;
874+
875+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
876+
877+
// stop the informer
878+
cache.stop();
879+
880+
await doneHandler(null);
881+
882+
mock.verify(fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything())).once();
883+
884+
// restart the informer
885+
await cache.start();
886+
887+
mock.verify(fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything())).twice();
888+
});
826889
});
827890

828891
describe('delete items', () => {

src/informer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export interface Informer<T> {
2121
on(verb: string, fn: ObjectCallback<T>): void;
2222
off(verb: string, fn: ObjectCallback<T>): void;
2323
start(): Promise<void>;
24+
stop(): Promise<void>;
2425
}
2526

2627
export function makeInformer<T>(

0 commit comments

Comments
 (0)