Skip to content

Commit 5fa7604

Browse files
committed
Add initial support for the BOOKMARK message in watch.
1 parent 7590f31 commit 5fa7604

File tree

3 files changed

+26
-6
lines changed

3 files changed

+26
-6
lines changed

examples/typescript/watch/watch-example.ts

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,9 +6,11 @@ kc.loadFromDefault();
66
const watch = new k8s.Watch(kc);
77
watch.watch('/api/v1/namespaces',
88
// optional query parameters can go here.
9-
{},
9+
{
10+
allowWatchBookmarks: true,
11+
},
1012
// callback is called for each received object.
11-
(type, obj) => {
13+
(type, apiObj, watchObj) => {
1214
if (type === 'ADDED') {
1315
// tslint:disable-next-line:no-console
1416
console.log('new object:');
@@ -18,12 +20,15 @@ watch.watch('/api/v1/namespaces',
1820
} else if (type === 'DELETED') {
1921
// tslint:disable-next-line:no-console
2022
console.log('deleted object:');
23+
} else if (type === 'BOOKMARK') {
24+
// tslint:disable-next-line:no-console
25+
console.log(`bookmark: ${watchObj.metadata.resourceVersion}`);
2126
} else {
2227
// tslint:disable-next-line:no-console
2328
console.log('unknown type: ' + type);
2429
}
2530
// tslint:disable-next-line:no-console
26-
console.log(obj);
31+
console.log(apiObj);
2732
},
2833
// done callback is called if the watch terminates normally
2934
(err) => {

src/cache.ts

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ export interface ObjectCache<T> {
99

1010
export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, Informer<T> {
1111
private objects: T[] = [];
12+
private resourceVersion: string;
1213
private readonly indexCache: { [key: string]: T[] } = {};
1314
private readonly callbackCache: { [key: string]: Array<ObjectCallback<T>> } = {};
1415

@@ -24,6 +25,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
2425
this.callbackCache[UPDATE] = [];
2526
this.callbackCache[DELETE] = [];
2627
this.callbackCache[ERROR] = [];
28+
this.resourceVersion = '';
2729
if (autoStart) {
2830
this.doneHandler(null);
2931
}
@@ -68,11 +70,18 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
6870
return this.indexCache[namespace] as ReadonlyArray<T>;
6971
}
7072

73+
public latestResourceVersion(): string {
74+
return this.resourceVersion;
75+
}
76+
7177
private async doneHandler(err: any) {
7278
if (err) {
7379
this.callbackCache[ERROR].forEach((elt: ObjectCallback<T>) => elt(err));
7480
return;
7581
}
82+
// TODO: Don't always list here for efficiency
83+
// try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case.
84+
// Or if resourceVersion is empty.
7685
const promise = this.listFn();
7786
const result = await promise;
7887
const list = result.body;
@@ -109,7 +118,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
109118
addOrUpdateObject(namespaceList, obj);
110119
}
111120

112-
private watchHandler(phase: string, obj: T) {
121+
private watchHandler(phase: string, obj: T, watchObj?: any) {
113122
switch (phase) {
114123
case 'ADDED':
115124
case 'MODIFIED':
@@ -132,6 +141,12 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
132141
}
133142
}
134143
break;
144+
case 'BOOKMARK':
145+
// nothing to do, here for documentation, mostly.
146+
break;
147+
}
148+
if (watchObj && watchObj.metadata) {
149+
this.resourceVersion = watchObj.metadata.resourceVersion;
135150
}
136151
}
137152
}

src/watch.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ export class Watch {
3434
public async watch(
3535
path: string,
3636
queryParams: any,
37-
callback: (phase: string, obj: any) => void,
37+
callback: (phase: string, apiObj: any, watchObj?: any) => void,
3838
done: (err: any) => void,
3939
): Promise<any> {
4040
const cluster = this.config.getCurrentCluster();
@@ -60,7 +60,7 @@ export class Watch {
6060
stream.on('data', (line) => {
6161
try {
6262
const data = JSON.parse(line);
63-
callback(data.type, data.object);
63+
callback(data.type, data.object, data);
6464
} catch (ignore) {
6565
// ignore parse errors
6666
}

0 commit comments

Comments
 (0)