Skip to content

Commit 437218d

Browse files
authored
Merge pull request #555 from brendandburns/informer
split watch error and watch done handlers.
2 parents be4ecd1 + 9e175e8 commit 437218d

File tree

4 files changed

+117
-43
lines changed

4 files changed

+117
-43
lines changed

src/cache.ts

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
1212
private resourceVersion: string;
1313
private readonly indexCache: { [key: string]: T[] } = {};
1414
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};
15+
private stopped: boolean;
1516

1617
public constructor(
1718
private readonly path: string,
@@ -24,13 +25,19 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
2425
this.callbackCache[DELETE] = [];
2526
this.callbackCache[ERROR] = [];
2627
this.resourceVersion = '';
28+
this.stopped = true;
2729
if (autoStart) {
28-
this.doneHandler(null);
30+
this.start();
2931
}
3032
}
3133

3234
public async start(): Promise<void> {
33-
await this.doneHandler(null);
35+
this.stopped = false;
36+
await this.doneHandler();
37+
}
38+
39+
public stop(): void {
40+
this.stopped = true;
3441
}
3542

3643
public on(verb: string, cb: ObjectCallback<T>): void {
@@ -72,9 +79,15 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
7279
return this.resourceVersion;
7380
}
7481

75-
private async doneHandler(err: any): Promise<any> {
82+
private async errorHandler(err: any): Promise<void> {
7683
if (err) {
7784
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
85+
}
86+
this.stopped = true;
87+
}
88+
89+
private async doneHandler(): Promise<any> {
90+
if (this.stopped) {
7891
return;
7992
}
8093
// TODO: Don't always list here for efficiency
@@ -90,6 +103,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
90103
{ resourceVersion: list.metadata!.resourceVersion },
91104
this.watchHandler.bind(this),
92105
this.doneHandler.bind(this),
106+
this.errorHandler.bind(this),
93107
);
94108
}
95109

src/cache_test.ts

Lines changed: 72 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,13 @@ describe('ListWatchCache', () => {
6767
};
6868
const promise = new Promise((resolve) => {
6969
mock.when(
70-
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
70+
fakeWatch.watch(
71+
mock.anything(),
72+
mock.anything(),
73+
mock.anything(),
74+
mock.anything(),
75+
mock.anything(),
76+
),
7177
).thenCall(() => {
7278
resolve();
7379
});
@@ -146,7 +152,13 @@ describe('ListWatchCache', () => {
146152
};
147153
const promise = new Promise((resolve) => {
148154
mock.when(
149-
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
155+
fakeWatch.watch(
156+
mock.anything(),
157+
mock.anything(),
158+
mock.anything(),
159+
mock.anything(),
160+
mock.anything(),
161+
),
150162
).thenCall(() => {
151163
resolve();
152164
});
@@ -227,7 +239,13 @@ describe('ListWatchCache', () => {
227239
};
228240
const promise = new Promise((resolve) => {
229241
mock.when(
230-
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
242+
fakeWatch.watch(
243+
mock.anything(),
244+
mock.anything(),
245+
mock.anything(),
246+
mock.anything(),
247+
mock.anything(),
248+
),
231249
).thenCall(() => {
232250
resolve();
233251
});
@@ -298,7 +316,13 @@ describe('ListWatchCache', () => {
298316
};
299317
let promise = new Promise((resolve) => {
300318
mock.when(
301-
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
319+
fakeWatch.watch(
320+
mock.anything(),
321+
mock.anything(),
322+
mock.anything(),
323+
mock.anything(),
324+
mock.anything(),
325+
),
302326
).thenCall(() => {
303327
resolve();
304328
});
@@ -320,12 +344,18 @@ describe('ListWatchCache', () => {
320344

321345
promise = new Promise((resolve) => {
322346
mock.when(
323-
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
347+
fakeWatch.watch(
348+
mock.anything(),
349+
mock.anything(),
350+
mock.anything(),
351+
mock.anything(),
352+
mock.anything(),
353+
),
324354
).thenCall(() => {
325355
resolve();
326356
});
327357
});
328-
doneHandler(null);
358+
doneHandler();
329359
await promise;
330360
expect(addObjects).to.deep.equal(list);
331361
expect(updateObjects).to.deep.equal(list);
@@ -371,7 +401,13 @@ describe('ListWatchCache', () => {
371401
};
372402
let promise = new Promise((resolve) => {
373403
mock.when(
374-
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
404+
fakeWatch.watch(
405+
mock.anything(),
406+
mock.anything(),
407+
mock.anything(),
408+
mock.anything(),
409+
mock.anything(),
410+
),
375411
).thenCall(() => {
376412
resolve();
377413
});
@@ -394,13 +430,19 @@ describe('ListWatchCache', () => {
394430

395431
promise = new Promise((resolve) => {
396432
mock.when(
397-
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
433+
fakeWatch.watch(
434+
mock.anything(),
435+
mock.anything(),
436+
mock.anything(),
437+
mock.anything(),
438+
mock.anything(),
439+
),
398440
).thenCall(() => {
399441
resolve();
400442
});
401443
});
402444
listObj.items = list2;
403-
doneHandler(null);
445+
doneHandler();
404446
await promise;
405447
expect(addObjects).to.deep.equal(list);
406448
expect(updateObjects).to.deep.equal(list2);
@@ -448,7 +490,13 @@ describe('ListWatchCache', () => {
448490
};
449491
const promise = new Promise((resolve) => {
450492
mock.when(
451-
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
493+
fakeWatch.watch(
494+
mock.anything(),
495+
mock.anything(),
496+
mock.anything(),
497+
mock.anything(),
498+
mock.anything(),
499+
),
452500
).thenCall(() => {
453501
resolve();
454502
});
@@ -568,7 +616,13 @@ describe('ListWatchCache', () => {
568616
};
569617
const watchCalled = new Promise((resolve) => {
570618
mock.when(
571-
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
619+
fakeWatch.watch(
620+
mock.anything(),
621+
mock.anything(),
622+
mock.anything(),
623+
mock.anything(),
624+
mock.anything(),
625+
),
572626
).thenCall(resolve);
573627
});
574628
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
@@ -627,7 +681,13 @@ describe('ListWatchCache', () => {
627681
};
628682
const watchCalled = new Promise((resolve) => {
629683
mock.when(
630-
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
684+
fakeWatch.watch(
685+
mock.anything(),
686+
mock.anything(),
687+
mock.anything(),
688+
mock.anything(),
689+
mock.anything(),
690+
),
631691
).thenCall(resolve);
632692
});
633693
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);

src/watch.ts

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ export class Watch {
4646
path: string,
4747
queryParams: any,
4848
callback: (phase: string, apiObj: any, watchObj?: any) => void,
49-
done: (err: any) => void,
49+
done: () => void,
50+
error: (err: any) => void,
5051
): Promise<any> {
5152
const cluster = this.config.getCurrentCluster();
5253
if (!cluster) {
@@ -76,20 +77,18 @@ export class Watch {
7677
// ignore parse errors
7778
}
7879
});
79-
let errOut: Error | null = null;
80-
stream.on('error', (err) => {
81-
errOut = err;
82-
done(err);
83-
});
84-
stream.on('close', () => done(errOut));
80+
stream.on('error', error);
81+
stream.on('close', done);
8582

86-
const req = this.requestImpl.webRequest(requestOptions, (error, response, body) => {
87-
if (error) {
88-
done(error);
83+
const req = this.requestImpl.webRequest(requestOptions, (err, response, body) => {
84+
if (err) {
85+
error(err);
86+
done();
8987
} else if (response && response.statusCode !== 200) {
90-
done(new Error(response.statusMessage));
88+
error(new Error(response.statusMessage));
89+
done();
9190
} else {
92-
done(null);
91+
done();
9392
}
9493
});
9594
req.pipe(stream);

src/watch_test.ts

Lines changed: 17 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
import { expect } from 'chai';
22
import request = require('request');
3-
import { ReadableStreamBuffer, WritableStreamBuffer } from 'stream-buffers';
43
import { anyFunction, anything, capture, instance, mock, reset, verify, when } from 'ts-mockito';
54

65
import { KubeConfig } from './config';
@@ -60,8 +59,10 @@ describe('Watch', () => {
6059
path,
6160
{},
6261
(phase: string, obj: string) => {},
63-
(err: any) => {
62+
() => {
6463
doneCalled = true;
64+
},
65+
(err: any) => {
6566
doneErr = err;
6667
},
6768
);
@@ -130,8 +131,10 @@ describe('Watch', () => {
130131
receivedTypes.push(phase);
131132
receivedObjects.push(obj);
132133
},
133-
(err: any) => {
134+
() => {
134135
doneCalled = true;
136+
},
137+
(err: any) => {
135138
doneErr = err;
136139
},
137140
);
@@ -153,7 +156,7 @@ describe('Watch', () => {
153156
doneCallback(null, null, null);
154157

155158
expect(doneCalled).to.equal(true);
156-
expect(doneErr).to.equal(null);
159+
expect(doneErr).to.be.undefined;
157160

158161
const errIn = { error: 'err' };
159162
doneCallback(errIn, null, null);
@@ -198,10 +201,8 @@ describe('Watch', () => {
198201
receivedTypes.push(phase);
199202
receivedObjects.push(obj);
200203
},
201-
(err: any) => {
202-
doneCalled = true;
203-
doneErr.push(err);
204-
},
204+
() => (doneCalled = true),
205+
(err: any) => doneErr.push(err),
205206
);
206207

207208
verify(fakeRequestor.webRequest(anything(), anyFunction()));
@@ -217,9 +218,8 @@ describe('Watch', () => {
217218
expect(receivedObjects).to.deep.equal([obj1.object]);
218219

219220
expect(doneCalled).to.equal(true);
220-
expect(doneErr.length).to.equal(2);
221+
expect(doneErr.length).to.equal(1);
221222
expect(doneErr[0]).to.deep.equal(errIn);
222-
expect(doneErr[1]).to.deep.equal(errIn);
223223
});
224224

225225
it('should handle server side close correctly', async () => {
@@ -258,10 +258,8 @@ describe('Watch', () => {
258258
receivedTypes.push(phase);
259259
receivedObjects.push(obj);
260260
},
261-
(err: any) => {
262-
doneCalled = true;
263-
doneErr = err;
264-
},
261+
() => (doneCalled = true),
262+
(err: any) => (doneErr = err),
265263
);
266264

267265
verify(fakeRequestor.webRequest(anything(), anyFunction()));
@@ -277,7 +275,7 @@ describe('Watch', () => {
277275
expect(receivedObjects).to.deep.equal([obj1.object]);
278276

279277
expect(doneCalled).to.equal(true);
280-
expect(doneErr).to.be.null;
278+
expect(doneErr).to.be.undefined;
281279
});
282280

283281
it('should ignore JSON parse errors', async () => {
@@ -317,6 +315,9 @@ describe('Watch', () => {
317315
() => {
318316
/* ignore */
319317
},
318+
() => {
319+
/* ignore */
320+
},
320321
);
321322

322323
verify(fakeRequestor.webRequest(anything(), anyFunction()));
@@ -332,7 +333,7 @@ describe('Watch', () => {
332333
const kc = new KubeConfig();
333334
const watch = new Watch(kc);
334335

335-
const promise = watch.watch('/some/path', {}, () => {}, () => {});
336+
const promise = watch.watch('/some/path', {}, () => {}, () => {}, () => {});
336337
expect(promise).to.be.rejected;
337338
});
338339
});

0 commit comments

Comments
 (0)