Skip to content

Commit e7f758e

Browse files
committed
Attempt to start list watch from last resourceVersion
This change should avoid unnecessary calls to the listFn that populates the ListWatch cache by first attempting to start the watch from the last resourceVersion. If a 410 Gone is detected it will fallback to the full list call.
1 parent 1d5d466 commit e7f758e

File tree

2 files changed

+148
-18
lines changed

2 files changed

+148
-18
lines changed

src/cache.ts

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
120120

121121
private async doneHandler(err: any): Promise<any> {
122122
this._stop();
123-
if (err) {
123+
if (err?.message === 'Gone') {
124+
this.resourceVersion = '';
125+
} else if (err) {
124126
this.callbackCache[ERROR].forEach((elt: ErrorCallback) => elt(err));
125127
return;
126128
}
@@ -129,28 +131,28 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
129131
return;
130132
}
131133
this.callbackCache[CONNECT].forEach((elt: ErrorCallback) => elt(undefined));
132-
// TODO: Don't always list here for efficiency
133-
// try to restart the watch from resourceVersion, but detect 410 GONE and relist in that case.
134-
// Or if resourceVersion is empty.
135-
const promise = this.listFn();
136-
const result = await promise;
137-
const list = result.body;
138-
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
139-
Object.keys(this.indexCache).forEach((key) => {
140-
const updateObjects = deleteItems(this.indexCache[key], list.items);
141-
if (updateObjects.length !== 0) {
142-
this.indexCache[key] = updateObjects;
143-
} else {
144-
delete this.indexCache[key];
145-
}
146-
});
147-
this.addOrUpdateItems(list.items);
148134
const queryParams = {
149-
resourceVersion: list.metadata!.resourceVersion,
135+
resourceVersion: this.resourceVersion
150136
} as {
151137
resourceVersion: string | undefined;
152138
labelSelector: string | undefined;
153139
};
140+
if (!this.resourceVersion) {
141+
const promise = this.listFn();
142+
const result = await promise;
143+
const list = result.body;
144+
this.objects = deleteItems(this.objects, list.items, this.callbackCache[DELETE].slice());
145+
Object.keys(this.indexCache).forEach((key) => {
146+
const updateObjects = deleteItems(this.indexCache[key], list.items);
147+
if (updateObjects.length !== 0) {
148+
this.indexCache[key] = updateObjects;
149+
} else {
150+
delete this.indexCache[key];
151+
}
152+
});
153+
this.addOrUpdateItems(list.items);
154+
queryParams.resourceVersion = list.metadata!.resourceVersion;
155+
}
154156
if (this.labelSelector !== undefined) {
155157
queryParams.labelSelector = ObjectSerializer.serialize(this.labelSelector, 'string');
156158
}

src/cache_test.ts

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1054,6 +1054,134 @@ describe('ListWatchCache', () => {
10541054
expect(errorEmitted).to.equal(true);
10551055
});
10561056

1057+
it('should not re-list if the watch can be restarted from the latest resourceVersion', async () => {
1058+
let listCalls = 0;
1059+
const fakeWatch = mock.mock(Watch);
1060+
const list: V1Namespace[] = [];
1061+
const listObj = {
1062+
metadata: {
1063+
resourceVersion: '12345',
1064+
} as V1ListMeta,
1065+
items: list,
1066+
} as V1NamespaceList;
1067+
1068+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
1069+
response: http.IncomingMessage;
1070+
body: V1NamespaceList;
1071+
}> {
1072+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => {
1073+
listCalls++;
1074+
resolve({ response: {} as http.IncomingMessage, body: listObj });
1075+
});
1076+
};
1077+
let promise = new Promise((resolve) => {
1078+
mock.when(
1079+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1080+
).thenCall(() => {
1081+
resolve(new FakeRequest());
1082+
});
1083+
});
1084+
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
1085+
1086+
informer.start();
1087+
await promise;
1088+
1089+
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();
1090+
watchHandler(
1091+
'ADDED',
1092+
{
1093+
metadata: {
1094+
name: 'name3',
1095+
} as V1ObjectMeta,
1096+
} as V1Namespace,
1097+
{ metadata: { resourceVersion: '23456' } },
1098+
);
1099+
1100+
await informer.stop();
1101+
promise = new Promise((resolve) => {
1102+
mock.when(
1103+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1104+
).thenCall(() => {
1105+
resolve(new FakeRequest());
1106+
});
1107+
});
1108+
informer.start();
1109+
await promise;
1110+
expect(listCalls).to.be.equal(1);
1111+
});
1112+
1113+
it('should list if the watch cannot be restarted from the latest resourceVersion', async () => {
1114+
const fakeWatch = mock.mock(Watch);
1115+
const list: V1Pod[] = [];
1116+
const listObj = {
1117+
metadata: {
1118+
resourceVersion: '12345',
1119+
} as V1ListMeta,
1120+
items: list,
1121+
} as V1NamespaceList;
1122+
1123+
let listCalls = 0;
1124+
const listFn: ListPromise<V1Namespace> = function(): Promise<{
1125+
response: http.IncomingMessage;
1126+
body: V1NamespaceList;
1127+
}> {
1128+
return new Promise<{ response: http.IncomingMessage; body: V1NamespaceList }>((resolve) => {
1129+
listCalls++;
1130+
resolve({ response: {} as http.IncomingMessage, body: listObj });
1131+
});
1132+
};
1133+
let promise = new Promise((resolve) => {
1134+
mock.when(
1135+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1136+
).thenCall(() => {
1137+
resolve(new FakeRequest());
1138+
});
1139+
});
1140+
1141+
const informer = new ListWatch('/some/path', mock.instance(fakeWatch), listFn, false);
1142+
1143+
informer.start();
1144+
await promise;
1145+
1146+
const [, , watchHandler] = mock.capture(fakeWatch.watch).last();
1147+
watchHandler(
1148+
'ADDED',
1149+
{
1150+
metadata: {
1151+
name: 'name3',
1152+
} as V1ObjectMeta,
1153+
} as V1Namespace,
1154+
{ metadata: { resourceVersion: '23456' } },
1155+
);
1156+
1157+
await informer.stop();
1158+
1159+
let errorEmitted = false;
1160+
informer.on('error', () => (errorEmitted = true));
1161+
1162+
promise = new Promise((resolve) => {
1163+
mock.when(
1164+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1165+
).thenCall(() => {
1166+
resolve(new FakeRequest());
1167+
});
1168+
});
1169+
1170+
informer.start();
1171+
await promise;
1172+
1173+
const [, , , doneHandler] = mock.capture(fakeWatch.watch).last();
1174+
1175+
const error = new Error('Gone');
1176+
await doneHandler(error);
1177+
1178+
mock.verify(
1179+
fakeWatch.watch(mock.anything(), mock.anything(), mock.anything(), mock.anything()),
1180+
).thrice();
1181+
expect(errorEmitted).to.equal(false);
1182+
expect(listCalls).to.be.equal(2);
1183+
});
1184+
10571185
it('should send label selector', async () => {
10581186
const APP_LABEL_SELECTOR = 'app=foo';
10591187

0 commit comments

Comments
 (0)