@@ -20,8 +20,11 @@ export interface ObjectCache<T> {
2020 list ( namespace ?: string ) : ReadonlyArray < T > ;
2121}
2222
23+ // exported for testing
24+ export type CacheMap < T extends KubernetesObject > = Map < string , Map < string , T > > ;
25+
2326export class ListWatch < T extends KubernetesObject > implements ObjectCache < T > , Informer < T > {
24- private objects : T [ ] = [ ] ;
27+ private objects : CacheMap < T > = new Map ( ) ;
2528 private resourceVersion : string ;
2629 private readonly indexCache : { [ key : string ] : T [ ] } = { } ;
2730 private readonly callbackCache : { [ key : string ] : ( ObjectCallback < T > | ErrorCallback ) [ ] } = { } ;
@@ -93,16 +96,26 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
9396 }
9497
9598 public get ( name : string , namespace ?: string ) : T | undefined {
96- return this . objects . find ( ( obj : T ) : boolean => {
97- return obj . metadata ! . name === name && ( ! namespace || obj . metadata ! . namespace === namespace ) ;
98- } ) ;
99+ const nsObjects = this . objects . get ( namespace || '' ) ;
100+ if ( nsObjects ) {
101+ return nsObjects . get ( name ) ;
102+ }
103+ return undefined ;
99104 }
100105
101106 public list ( namespace ?: string | undefined ) : ReadonlyArray < T > {
102107 if ( ! namespace ) {
103- return this . objects ;
108+ const allObjects : T [ ] = [ ] ;
109+ for ( const nsObjects of this . objects . values ( ) ) {
110+ allObjects . push ( ...nsObjects . values ( ) ) ;
111+ }
112+ return allObjects ;
113+ }
114+ const namespaceObjects = this . objects . get ( namespace || '' ) ;
115+ if ( ! namespaceObjects ) {
116+ return [ ] ;
104117 }
105- return this . indexCache [ namespace ] as ReadonlyArray < T > ;
118+ return Array . from ( namespaceObjects . values ( ) ) ;
106119 }
107120
108121 public latestResourceVersion ( ) : string {
@@ -116,7 +129,7 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
116129 }
117130 }
118131
119- private async doneHandler ( err : any ) : Promise < any > {
132+ private async doneHandler ( err : any ) : Promise < void > {
120133 this . _stop ( ) ;
121134 if ( err && err . statusCode === 410 ) {
122135 this . resourceVersion = '' ;
@@ -133,16 +146,8 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
133146 const promise = this . listFn ( ) ;
134147 const list = await promise ;
135148 this . objects = deleteItems ( this . objects , list . items , this . callbackCache [ DELETE ] . slice ( ) ) ;
136- Object . keys ( this . indexCache ) . forEach ( ( key ) => {
137- const updateObjects = deleteItems ( this . indexCache [ key ] , list . items ) ;
138- if ( updateObjects . length !== 0 ) {
139- this . indexCache [ key ] = updateObjects ;
140- } else {
141- delete this . indexCache [ key ] ;
142- }
143- } ) ;
144149 this . addOrUpdateItems ( list . items ) ;
145- this . resourceVersion = list . metadata ! . resourceVersion ! ;
150+ this . resourceVersion = list . metadata ! . resourceVersion || '' ;
146151 }
147152 const queryParams = {
148153 resourceVersion : this . resourceVersion ,
@@ -169,21 +174,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
169174 this . callbackCache [ ADD ] . slice ( ) ,
170175 this . callbackCache [ UPDATE ] . slice ( ) ,
171176 ) ;
172- if ( obj . metadata ! . namespace ) {
173- this . indexObj ( obj ) ;
174- }
175177 } ) ;
176178 }
177179
178- private indexObj ( obj : T ) : void {
179- let namespaceList = this . indexCache [ obj . metadata ! . namespace ! ] as T [ ] ;
180- if ( ! namespaceList ) {
181- namespaceList = [ ] ;
182- this . indexCache [ obj . metadata ! . namespace ! ] = namespaceList ;
183- }
184- addOrUpdateObject ( namespaceList , obj ) ;
185- }
186-
187180 private watchHandler ( phase : string , obj : T , watchObj ?: any ) : void {
188181 switch ( phase ) {
189182 case 'ERROR' :
@@ -199,18 +192,9 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
199192 this . callbackCache [ ADD ] . slice ( ) ,
200193 this . callbackCache [ UPDATE ] . slice ( ) ,
201194 ) ;
202- if ( obj . metadata ! . namespace ) {
203- this . indexObj ( obj ) ;
204- }
205195 break ;
206196 case 'DELETED' :
207197 deleteObject ( this . objects , obj , this . callbackCache [ DELETE ] . slice ( ) ) ;
208- if ( obj . metadata ! . namespace ) {
209- const namespaceList = this . indexCache [ obj . metadata ! . namespace ! ] as T [ ] ;
210- if ( namespaceList ) {
211- deleteObject ( namespaceList , obj ) ;
212- }
213- }
214198 break ;
215199 case 'BOOKMARK' :
216200 // nothing to do, here for documentation, mostly.
@@ -222,75 +206,113 @@ export class ListWatch<T extends KubernetesObject> implements ObjectCache<T>, In
222206 }
223207}
224208
209+ // exported for testing
210+ export function cacheMapFromList < T extends KubernetesObject > ( newObjects : T [ ] ) : CacheMap < T > {
211+ const objects : CacheMap < T > = new Map ( ) ;
212+ // build up the new list
213+ for ( const obj of newObjects ) {
214+ let namespaceObjects = objects . get ( obj . metadata ! . namespace || '' ) ;
215+ if ( ! namespaceObjects ) {
216+ namespaceObjects = new Map ( ) ;
217+ objects . set ( obj . metadata ! . namespace || '' , namespaceObjects ) ;
218+ }
219+
220+ const name = obj . metadata ! . name || '' ;
221+ namespaceObjects . set ( name , obj ) ;
222+ }
223+ return objects ;
224+ }
225+
225226// external for testing
226227export function deleteItems < T extends KubernetesObject > (
227- oldObjects : T [ ] ,
228+ oldObjects : CacheMap < T > ,
228229 newObjects : T [ ] ,
229230 deleteCallback ?: ObjectCallback < T > [ ] ,
230- ) : T [ ] {
231- return oldObjects . filter ( ( obj : T ) => {
232- if ( findKubernetesObject ( newObjects , obj ) === - 1 ) {
233- if ( deleteCallback ) {
234- deleteCallback . forEach ( ( fn : ObjectCallback < T > ) => fn ( obj ) ) ;
231+ ) : CacheMap < T > {
232+ const newObjectsMap = cacheMapFromList ( newObjects ) ;
233+
234+ for ( const [ namespace , oldNamespaceObjects ] of oldObjects . entries ( ) ) {
235+ const newNamespaceObjects = newObjectsMap . get ( namespace ) ;
236+ if ( newNamespaceObjects ) {
237+ for ( const [ name , oldObj ] of oldNamespaceObjects . entries ( ) ) {
238+ if ( ! newNamespaceObjects . has ( name ) ) {
239+ oldNamespaceObjects . delete ( name ) ;
240+ if ( deleteCallback ) {
241+ deleteCallback . forEach ( ( fn : ObjectCallback < T > ) => fn ( oldObj ) ) ;
242+ }
243+ }
235244 }
236- return false ;
245+ } else {
246+ oldObjects . delete ( namespace ) ;
247+ oldNamespaceObjects . forEach ( ( obj : T ) => {
248+ if ( deleteCallback ) {
249+ deleteCallback . forEach ( ( fn : ObjectCallback < T > ) => fn ( obj ) ) ;
250+ }
251+ } ) ;
237252 }
238- return true ;
239- } ) ;
253+ }
254+
255+ return oldObjects ;
240256}
241257
242258// Only public for testing.
243259export function addOrUpdateObject < T extends KubernetesObject > (
244- objects : T [ ] ,
260+ objects : CacheMap < T > ,
245261 obj : T ,
246- addCallback ?: ObjectCallback < T > [ ] ,
247- updateCallback ?: ObjectCallback < T > [ ] ,
262+ addCallbacks ?: ObjectCallback < T > [ ] ,
263+ updateCallbacks ?: ObjectCallback < T > [ ] ,
248264) : void {
249- const ix = findKubernetesObject ( objects , obj ) ;
250- if ( ix === - 1 ) {
251- objects . push ( obj ) ;
252- if ( addCallback ) {
253- addCallback . forEach ( ( elt : ObjectCallback < T > ) => elt ( obj ) ) ;
265+ let namespaceObjects = objects . get ( obj . metadata ! . namespace || '' ) ;
266+ if ( ! namespaceObjects ) {
267+ namespaceObjects = new Map ( ) ;
268+ objects . set ( obj . metadata ! . namespace || '' , namespaceObjects ) ;
269+ }
270+
271+ const name = obj . metadata ! . name || '' ;
272+ const found = namespaceObjects . get ( name ) ;
273+ if ( ! found ) {
274+ namespaceObjects . set ( name , obj ) ;
275+ if ( addCallbacks ) {
276+ addCallbacks . forEach ( ( elt : ObjectCallback < T > ) => elt ( obj ) ) ;
254277 }
255278 } else {
256- if ( ! isSameVersion ( objects [ ix ] , obj ) ) {
257- objects [ ix ] = obj ;
258- if ( updateCallback ) {
259- updateCallback . forEach ( ( elt : ObjectCallback < T > ) => elt ( obj ) ) ;
279+ if ( ! isSameVersion ( found , obj ) ) {
280+ namespaceObjects . set ( name , obj ) ;
281+ if ( updateCallbacks ) {
282+ updateCallbacks . forEach ( ( elt : ObjectCallback < T > ) => elt ( obj ) ) ;
260283 }
261284 }
262285 }
263286}
264287
265- function isSameObject < T extends KubernetesObject > ( o1 : T , o2 : T ) : boolean {
266- return o1 . metadata ! . name === o2 . metadata ! . name && o1 . metadata ! . namespace === o2 . metadata ! . namespace ;
267- }
268-
269- function isSameVersion < T extends KubernetesObject > ( o1 : T , o2 : T ) : boolean {
288+ function isSameVersion < T extends KubernetesObject > ( o1 : any , o2 : T ) : boolean {
270289 return (
271290 o1 . metadata ! . resourceVersion !== undefined &&
272291 o1 . metadata ! . resourceVersion !== null &&
273292 o1 . metadata ! . resourceVersion === o2 . metadata ! . resourceVersion
274293 ) ;
275294}
276295
277- function findKubernetesObject < T extends KubernetesObject > ( objects : T [ ] , obj : T ) : number {
278- return objects . findIndex ( ( elt : T ) => {
279- return isSameObject ( elt , obj ) ;
280- } ) ;
281- }
282-
283296// Public for testing.
284297export function deleteObject < T extends KubernetesObject > (
285- objects : T [ ] ,
298+ objects : CacheMap < T > ,
286299 obj : T ,
287- deleteCallback ?: ObjectCallback < T > [ ] ,
300+ deleteCallbacks ?: ObjectCallback < T > [ ] ,
288301) : void {
289- const ix = findKubernetesObject ( objects , obj ) ;
290- if ( ix !== - 1 ) {
291- objects . splice ( ix , 1 ) ;
292- if ( deleteCallback ) {
293- deleteCallback . forEach ( ( elt : ObjectCallback < T > ) => elt ( obj ) ) ;
302+ const namespace = obj . metadata ! . namespace || '' ;
303+ const name = obj . metadata ! . name || '' ;
304+
305+ const namespaceObjects = objects . get ( namespace ) ;
306+ if ( ! namespaceObjects ) {
307+ return ;
308+ }
309+ const deleted = namespaceObjects . delete ( name ) ;
310+ if ( deleted ) {
311+ if ( deleteCallbacks ) {
312+ deleteCallbacks . forEach ( ( elt : ObjectCallback < T > ) => elt ( obj ) ) ;
313+ }
314+ if ( namespaceObjects . size === 0 ) {
315+ objects . delete ( namespace ) ;
294316 }
295317 }
296318}
0 commit comments