Skip to content

Commit c13bf36

Browse files
authored
Merge pull request #606 from dominykas/stop
Expose a public Informer.stop()
2 parents f7c075b + 9d959ea commit c13bf36

File tree

3 files changed

+143
-7
lines changed

3 files changed

+143
-7
lines changed

src/cache.ts

Lines changed: 17 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
1313
private readonly indexCache: { [key: string]: T[] } = {};
1414
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};
1515
private request: RequestResult | undefined;
16+
private stopped: boolean = false;
1617

1718
public constructor(
1819
private readonly path: string,
@@ -31,14 +32,13 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
3132
}
3233

3334
public async start(): Promise<void> {
35+
this.stopped = false;
3436
await this.doneHandler(null);
3537
}
3638

37-
public stop(): void {
38-
if (this.request) {
39-
this.request.abort();
40-
this.request = undefined;
41-
}
39+
public async stop(): Promise<void> {
40+
this.stopped = true;
41+
this._stop();
4242
}
4343

4444
public on(verb: string, cb: ObjectCallback<T>): void {
@@ -80,12 +80,23 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
8080
return this.resourceVersion;
8181
}
8282

83+
private _stop(): void {
84+
if (this.request) {
85+
this.request.abort();
86+
this.request = undefined;
87+
}
88+
}
89+
8390
private async doneHandler(err: any): Promise<any> {
84-
this.stop();
91+
this._stop();
8592
if (err) {
8693
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
8794
return;
8895
}
96+
if (this.stopped) {
97+
// do not auto-restart
98+
return;
99+
}
89100
// TODO: Don't always list here for efficiency
90101
// try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case.
91102
// Or if resourceVersion is empty.

src/cache_test.ts

Lines changed: 125 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ import { EventEmitter } from 'ws';
99

1010
import { V1Namespace, V1NamespaceList, V1ObjectMeta, V1Pod, V1ListMeta } from './api';
1111
import { deleteObject, ListWatch, deleteItems } from './cache';
12-
import { ADD, UPDATE, DELETE, ListPromise } from './informer';
12+
import { ADD, UPDATE, DELETE, ERROR, ListPromise } from './informer';
1313

1414
use(chaiAsPromised);
1515

@@ -577,6 +577,7 @@ describe('ListWatchCache', () => {
577577
expect(cache.list('ns2').length).to.equal(0);
578578
expect(cache.get('name2', 'ns2')).to.equal(undefined);
579579
});
580+
580581
it('should delete an object correctly', () => {
581582
const list: V1Pod[] = [
582583
{
@@ -614,6 +615,7 @@ describe('ListWatchCache', () => {
614615
} as V1Pod);
615616
expect(list.length).to.equal(1);
616617
});
618+
617619
it('should not call handlers which have been unregistered', async () => {
618620
const fakeWatch = mock.mock(Watch);
619621
const list: V1Namespace[] = [];
@@ -721,6 +723,7 @@ describe('ListWatchCache', () => {
721723

722724
expect(addedList.length).to.equal(1);
723725
});
726+
724727
it('should resolve start promise after seeding the cache', async () => {
725728
const fakeWatch = mock.mock(Watch);
726729
const list: V1Namespace[] = [
@@ -823,6 +826,127 @@ describe('ListWatchCache', () => {
823826
expect(addedList1.length).to.equal(2);
824827
expect(addedList2.length).to.equal(1);
825828
});
829+
830+
it('should not auto-restart after explicitly stopping until restarted again', async () => {
831+
const fakeWatch = mock.mock(Watch);
832+
const list: V1Pod[] = [
833+
{
834+
metadata: {
835+
name: 'name1',
836+
namespace: 'ns1',
837+
} as V1ObjectMeta,
838+
} as V1Pod,
839+
{
840+
metadata: {
841+
name: 'name2',
842+
namespace: 'ns2',
843+
} as V1ObjectMeta,
844+
} as V1Pod,
845+
];
846+
const listObj = {
847+
metadata: {
848+
resourceVersion: '12345',
849+
} as V1ListMeta,
850+
items: list,
851+
} as V1NamespaceList;
852+
853+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
854+
response: http.IncomingMessage;
855+
body: V1NamespaceList;
856+
}> {
857+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
858+
(resolve, reject) => {
859+
resolve({ response: {} as http.IncomingMessage, body: listObj });
860+
},
861+
);
862+
};
863+
let promise = new Promise((resolve) => {
864+
mock.when(
865+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
866+
).thenCall(() => {
867+
resolve(new FakeRequest());
868+
});
869+
});
870+
871+
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
872+
await promise;
873+
874+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
875+
876+
// stop the informer
877+
cache.stop();
878+
879+
await doneHandler(null);
880+
881+
mock.verify(
882+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
883+
).once();
884+
885+
// restart the informer
886+
await cache.start();
887+
888+
mock.verify(
889+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
890+
).twice();
891+
});
892+
893+
it('does not auto-restart after an error', async () => {
894+
const fakeWatch = mock.mock(Watch);
895+
const list: V1Pod[] = [
896+
{
897+
metadata: {
898+
name: 'name1',
899+
namespace: 'ns1',
900+
} as V1ObjectMeta,
901+
} as V1Pod,
902+
{
903+
metadata: {
904+
name: 'name2',
905+
namespace: 'ns2',
906+
} as V1ObjectMeta,
907+
} as V1Pod,
908+
];
909+
const listObj = {
910+
metadata: {
911+
resourceVersion: '12345',
912+
} as V1ListMeta,
913+
items: list,
914+
} as V1NamespaceList;
915+
916+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
917+
response: http.IncomingMessage;
918+
body: V1NamespaceList;
919+
}> {
920+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>(
921+
(resolve, reject) => {
922+
resolve({ response: {} as http.IncomingMessage, body: listObj });
923+
},
924+
);
925+
};
926+
let promise = new Promise((resolve) => {
927+
mock.when(
928+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
929+
).thenCall(() => {
930+
resolve(new FakeRequest());
931+
});
932+
});
933+
934+
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
935+
await promise;
936+
937+
let errorEmitted = false;
938+
cache.on(ERROR, () => (errorEmitted = true));
939+
940+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
941+
942+
const error = new Error('testing');
943+
await doneHandler(error);
944+
945+
mock.verify(
946+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
947+
).once();
948+
expect(errorEmitted).to.equal(true);
949+
});
826950
});
827951

828952
describe('delete items', () => {

src/informer.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ export interface Informer<T> {
2121
on(verb: string, fn: ObjectCallback<T>): void;
2222
off(verb: string, fn: ObjectCallback<T>): void;
2323
start(): Promise<void>;
24+
stop(): Promise<void>;
2425
}
2526

2627
export function makeInformer<T>(

0 commit comments

Comments
 (0)