Skip to content

Commit 3783b77

Browse files
authored
Merge pull request #169 from brendandburns/cache
Add an initial watch cache implementation.
2 parents a9d5851 + 00a4d9f commit 3783b77

File tree

5 files changed

+334
-0
lines changed

5 files changed

+334
-0
lines changed

examples/cache-example.js

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
const k8s = require('@kubernetes/client-node');
2+
3+
const kc = new k8s.KubeConfig();
4+
kc.loadFromDefault();
5+
6+
const k8sApi = kc.makeApiClient(k8s.Core_v1Api);
7+
8+
const path = '/api/v1/namespaces/default/pods';
9+
const watch = new k8s.Watch(kc);
10+
const listFn = (fn) => {
11+
k8sApi.listNamespacedPod('default')
12+
.then((res) => {
13+
fn(res.body.items);
14+
})
15+
.catch((err) => {
16+
console.log(err);
17+
});
18+
}
19+
const cache = new k8s.ListWatch(path, watch, listFn);
20+
21+
const looper = () => {
22+
const list = cache.list('default');
23+
if (list) {
24+
let names = [];
25+
for (let i = 0; i < list.length; i++) {
26+
names.push(list[i].metadata.name);
27+
}
28+
console.log(names.join(','));
29+
}
30+
setTimeout(looper, 2000);
31+
}
32+
33+
looper();

src/cache.ts

Lines changed: 105 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,105 @@
1+
import { KubernetesObject } from './types';
2+
import { Watch } from './watch';
3+
4+
export interface ObjectCache<T> {
5+
get(name: string, namespace?: string): T | undefined;
6+
list(namespace?: string): ReadonlyArray<T>;
7+
}
8+
9+
export type ListCallback<T extends KubernetesObject> = (list: T[]) => void;
10+
11+
export class ListWatch<T extends KubernetesObject> implements ObjectCache<T> {
12+
private objects: T[] = [];
13+
private readonly indexCache: { [key: string]: T[] } = {};
14+
15+
public constructor(private readonly path: string,
16+
private readonly watch: Watch,
17+
private readonly listFn: (callback: ListCallback<T>) => void) {
18+
this.watch = watch;
19+
this.listFn = listFn;
20+
this.doneHandler(null);
21+
}
22+
23+
public get(name: string, namespace?: string): T | undefined {
24+
return this.objects.find((obj: T): boolean => {
25+
return (obj.metadata.name === name &&
26+
(!namespace || obj.metadata.namespace === namespace));
27+
});
28+
}
29+
30+
public list(namespace?: string | undefined): ReadonlyArray<T> {
31+
if (!namespace) {
32+
return this.objects;
33+
}
34+
return this.indexCache[namespace] as ReadonlyArray<T>;
35+
}
36+
37+
private doneHandler(err: any) {
38+
this.listFn((result: T[]) => {
39+
this.objects = result;
40+
for (const elt of this.objects) {
41+
this.indexObj(elt);
42+
}
43+
this.watch.watch(this.path, {}, this.watchHandler.bind(this), this.doneHandler.bind(this));
44+
});
45+
}
46+
47+
private indexObj(obj: T) {
48+
let namespaceList = this.indexCache[obj.metadata.namespace] as T[];
49+
if (!namespaceList) {
50+
namespaceList = [];
51+
this.indexCache[obj.metadata.namespace] = namespaceList;
52+
}
53+
addOrUpdateObject(namespaceList, obj);
54+
}
55+
56+
private watchHandler(phase: string, obj: T) {
57+
switch (phase) {
58+
case 'ADDED':
59+
case 'MODIFIED':
60+
addOrUpdateObject(this.objects, obj);
61+
if (obj.metadata.namespace) {
62+
this.indexObj(obj);
63+
}
64+
break;
65+
case 'DELETED':
66+
deleteObject(this.objects, obj);
67+
if (obj.metadata.namespace) {
68+
const namespaceList = this.indexCache[obj.metadata.namespace] as T[];
69+
if (namespaceList) {
70+
deleteObject(namespaceList, obj);
71+
}
72+
}
73+
break;
74+
}
75+
}
76+
}
77+
78+
// Only public for testing.
79+
export function addOrUpdateObject<T extends KubernetesObject>(objects: T[], obj: T) {
80+
const ix = findKubernetesObject(objects, obj);
81+
if (ix === -1) {
82+
objects.push(obj);
83+
} else {
84+
objects[ix] = obj;
85+
}
86+
}
87+
88+
function isSameObject<T extends KubernetesObject>(o1: T, o2: T): boolean {
89+
return o1.metadata.name === o2.metadata.name &&
90+
o1.metadata.namespace === o2.metadata.namespace;
91+
}
92+
93+
function findKubernetesObject<T extends KubernetesObject>(objects: T[], obj: T): number {
94+
return objects.findIndex((elt: T) => {
95+
return isSameObject(elt, obj);
96+
});
97+
}
98+
99+
// Public for testing.
100+
export function deleteObject<T extends KubernetesObject>(objects: T[], obj: T) {
101+
const ix = findKubernetesObject(objects, obj);
102+
if (ix !== -1) {
103+
objects.splice(ix, 1);
104+
}
105+
}

src/cache_test.ts

Lines changed: 180 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,180 @@
1+
import { expect } from 'chai';
2+
import * as mock from 'ts-mockito';
3+
4+
import { V1Namespace, V1ObjectMeta, V1Pod } from './api';
5+
import { deleteObject, ListCallback, ListWatch } from './cache';
6+
import { Watch } from './watch';
7+
8+
describe('ListWatchCache', () => {
9+
it('should perform basic caching', () => {
10+
const fakeWatch = mock.mock(Watch);
11+
const list: V1Namespace[] = [
12+
{
13+
metadata: {
14+
name: 'name1',
15+
} as V1ObjectMeta,
16+
} as V1Namespace,
17+
{
18+
metadata: {
19+
name: 'name2',
20+
} as V1ObjectMeta,
21+
} as V1Namespace,
22+
];
23+
const listFn = (callback: ListCallback<V1Namespace>) => {
24+
callback(list);
25+
};
26+
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
27+
const [pathOut, , watchHandler] = mock.capture(fakeWatch.watch).last();
28+
expect(pathOut).to.equal('/some/path');
29+
expect(cache.list()).to.deep.equal(list);
30+
31+
expect(cache.get('name1')).to.equal(list[0]);
32+
expect(cache.get('name2')).to.equal(list[1]);
33+
34+
watchHandler('ADDED', {
35+
metadata: {
36+
name: 'name3',
37+
} as V1ObjectMeta,
38+
} as V1Namespace);
39+
40+
expect(cache.list().length).to.equal(3);
41+
expect(cache.get('name3')).to.not.equal(null);
42+
43+
watchHandler('MODIFIED', {
44+
metadata: {
45+
name: 'name3',
46+
resourceVersion: 'baz',
47+
} as V1ObjectMeta,
48+
} as V1Namespace);
49+
expect(cache.list().length).to.equal(3);
50+
const obj3 = cache.get('name3');
51+
expect(obj3).to.not.equal(null);
52+
if (obj3) {
53+
expect(obj3.metadata.name).to.equal('name3');
54+
expect(obj3.metadata.resourceVersion).to.equal('baz');
55+
}
56+
57+
watchHandler('DELETED', {
58+
metadata: {
59+
name: 'name2',
60+
} as V1ObjectMeta,
61+
} as V1Namespace);
62+
expect(cache.list().length).to.equal(2);
63+
expect(cache.get('name2')).to.equal(undefined);
64+
});
65+
66+
it('should perform namespace caching', () => {
67+
const fakeWatch = mock.mock(Watch);
68+
const list: V1Pod[] = [
69+
{
70+
metadata: {
71+
name: 'name1',
72+
namespace: 'ns1',
73+
} as V1ObjectMeta,
74+
} as V1Pod,
75+
{
76+
metadata: {
77+
name: 'name2',
78+
namespace: 'ns2',
79+
} as V1ObjectMeta,
80+
} as V1Pod,
81+
];
82+
const listFn = (callback: ListCallback<V1Pod>) => {
83+
callback(list);
84+
};
85+
const cache = new ListWatch('/some/path', mock.instance(fakeWatch), listFn);
86+
const [pathOut, , watchHandler] = mock.capture(fakeWatch.watch).last();
87+
expect(pathOut).to.equal('/some/path');
88+
expect(cache.list()).to.deep.equal(list);
89+
90+
expect(cache.get('name1')).to.equal(list[0]);
91+
expect(cache.get('name2')).to.equal(list[1]);
92+
93+
expect(cache.list('ns1').length).to.equal(1);
94+
expect(cache.list('ns1')[0].metadata.name).to.equal('name1');
95+
96+
expect(cache.list('ns2').length).to.equal(1);
97+
expect(cache.list('ns2')[0].metadata.name).to.equal('name2');
98+
99+
watchHandler('ADDED', {
100+
metadata: {
101+
name: 'name3',
102+
namespace: 'ns3',
103+
} as V1ObjectMeta,
104+
} as V1Pod);
105+
106+
expect(cache.list().length).to.equal(3);
107+
expect(cache.get('name3', 'ns3')).to.not.equal(null);
108+
109+
watchHandler('MODIFIED', {
110+
metadata: {
111+
name: 'name3',
112+
namespace: 'ns3',
113+
resourceVersion: 'baz',
114+
} as V1ObjectMeta,
115+
} as V1Pod);
116+
expect(cache.list().length).to.equal(3);
117+
const obj3 = cache.get('name3', 'ns3');
118+
expect(obj3).to.not.equal(null);
119+
if (obj3) {
120+
expect(obj3.metadata.name).to.equal('name3');
121+
expect(obj3.metadata.resourceVersion).to.equal('baz');
122+
}
123+
124+
watchHandler('DELETED', {
125+
metadata: {
126+
name: 'name2',
127+
namespace: 'other-ns',
128+
} as V1ObjectMeta,
129+
} as V1Pod);
130+
expect(cache.list().length).to.equal(3);
131+
expect(cache.get('name2')).to.not.equal(null);
132+
133+
watchHandler('DELETED', {
134+
metadata: {
135+
name: 'name2',
136+
namespace: 'ns2',
137+
} as V1ObjectMeta,
138+
} as V1Pod);
139+
expect(cache.list().length).to.equal(2);
140+
expect(cache.list('ns2').length).to.equal(0);
141+
expect(cache.get('name2', 'ns2')).to.equal(undefined);
142+
});
143+
it('should delete an object correctly', () => {
144+
const list: V1Pod[] = [
145+
{
146+
metadata: {
147+
name: 'name1',
148+
namespace: 'ns1',
149+
} as V1ObjectMeta,
150+
} as V1Pod,
151+
{
152+
metadata: {
153+
name: 'name2',
154+
namespace: 'ns2',
155+
} as V1ObjectMeta,
156+
} as V1Pod,
157+
];
158+
deleteObject(list, {
159+
metadata: {
160+
name: 'other',
161+
namespace: 'ns1',
162+
},
163+
} as V1Pod);
164+
expect(list.length).to.equal(2);
165+
deleteObject(list, {
166+
metadata: {
167+
name: 'name1',
168+
namespace: 'ns2',
169+
},
170+
} as V1Pod);
171+
expect(list.length).to.equal(2);
172+
deleteObject(list, {
173+
metadata: {
174+
name: 'name1',
175+
namespace: 'ns1',
176+
},
177+
} as V1Pod);
178+
expect(list.length).to.equal(1);
179+
});
180+
});

src/index.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,9 @@
11
export * from './config';
2+
export * from './cache';
23
export * from './api';
34
export * from './attach';
45
export * from './watch';
56
export * from './exec';
67
export * from './portforward';
8+
export * from './types';
79
export * from './yaml';

src/types.ts

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
import { V1ListMeta, V1ObjectMeta } from './api';
2+
3+
export interface KubernetesObject {
4+
apiVersion: string;
5+
kind: string;
6+
metadata: V1ObjectMeta;
7+
}
8+
9+
export interface KubernetesListObject {
10+
apiVersion: string;
11+
kind: string;
12+
metadata: V1ListMeta;
13+
items: KubernetesObject;
14+
}

0 commit comments

Comments
 (0)