@@ -5,7 +5,7 @@ import { PassThrough } from 'node:stream';
55import { KubeConfig } from './config.js' ;
66import { Cluster , Context , User } from './config_types.js' ;
77import { Watch } from './watch.js' ;
8- import { IncomingMessage , createServer } from 'node:http' ;
8+ import { ServerResponse , createServer } from 'node:http' ;
99import { AddressInfo } from 'node:net' ;
1010
1111const server = 'https://foo.company.com' ;
@@ -72,11 +72,7 @@ describe('Watch', () => {
7272 s . done ( ) ;
7373 } ) ;
7474
75- it ( 'should not call watch done callback more than once' , async ( ) => {
76- const kc = new KubeConfig ( ) ;
77- Object . assign ( kc , fakeConfig ) ;
78- const watch = new Watch ( kc ) ;
79-
75+ it ( 'should not call watch done callback more than once' , async ( t ) => {
8076 const obj1 = {
8177 type : 'ADDED' ,
8278 object : {
@@ -93,26 +89,13 @@ describe('Watch', () => {
9389
9490 const path = '/some/path/to/object' ;
9591
96- const stream = new PassThrough ( ) ;
97-
98- const [ scope ] = systemUnderTest ( ) ;
99-
100- let response : IncomingMessage | undefined ;
101-
102- const s = scope
103- . get ( path )
104- . query ( {
105- watch : 'true' ,
106- a : 'b' ,
107- } )
108- . reply ( 200 , function ( ) : PassThrough {
109- this . req . on ( 'response' , ( r ) => {
110- response = r ;
111- } ) ;
112- stream . push ( JSON . stringify ( obj1 ) + '\n' ) ;
113- stream . push ( JSON . stringify ( obj2 ) + '\n' ) ;
114- return stream ;
115- } ) ;
92+ let response : ServerResponse | undefined ;
93+ const kc = await setupMockSystem ( t , ( req , res ) => {
94+ response = res ;
95+ res . write ( JSON . stringify ( obj1 ) + '\n' ) ;
96+ res . write ( JSON . stringify ( obj2 ) + '\n' ) ;
97+ } ) ;
98+ const watch = new Watch ( kc ) ;
11699
117100 const receivedTypes : string [ ] = [ ] ;
118101 const receivedObjects : string [ ] = [ ] ;
@@ -154,57 +137,27 @@ describe('Watch', () => {
154137 deepStrictEqual ( receivedObjects , [ obj1 . object , obj2 . object ] ) ;
155138
156139 strictEqual ( doneCalled , 0 ) ;
157-
158- const errIn = new Error ( 'err' ) ;
159- ( response as IncomingMessage ) . destroy ( errIn ) ;
160-
140+ response ! . destroy ( ) ;
161141 await donePromise ;
162-
163142 strictEqual ( doneCalled , 1 ) ;
164- deepStrictEqual ( doneErr , errIn ) ;
165-
166- s . done ( ) ;
167-
168- stream . destroy ( ) ;
143+ strictEqual ( doneErr . code , 'ERR_STREAM_PREMATURE_CLOSE' ) ;
169144 } ) ;
170145
171- it ( 'should not call the done callback more than once on unexpected connection loss' , async ( ) => {
146+ it ( 'should not call the done callback more than once on unexpected connection loss' , async ( t ) => {
172147 // Create a server that accepts the connection and flushes headers, then
173148 // immediately destroys the connection (causing a "Premature close"
174149 // error).
175150 //
176151 // This reproduces a bug where AbortController.abort() inside
177152 // doneCallOnce could cause done() to be invoked twice.
178-
179- const mockServer = createServer ( ( req , res ) => {
153+ const kc = await setupMockSystem ( t , ( req , res ) => {
180154 res . writeHead ( 200 , {
181155 'Content-Type' : 'application/json' ,
182156 'Transfer-Encoding' : 'chunked' ,
183157 } ) ;
184-
185158 res . flushHeaders ( ) ;
186159 res . destroy ( ) ; // Prematurely close the connection
187160 } ) ;
188-
189- const mockServerPort = await new Promise < number > ( ( resolve ) => {
190- mockServer . listen ( 0 , ( ) => {
191- resolve ( ( mockServer . address ( ) as AddressInfo ) . port ) ;
192- } ) ;
193- } ) ;
194-
195- const kc = new KubeConfig ( ) ;
196-
197- kc . loadFromClusterAndUser (
198- {
199- name : 'cluster' ,
200- server : `http://localhost:${ mockServerPort } ` ,
201- skipTLSVerify : true ,
202- } ,
203- {
204- name : 'user' ,
205- } ,
206- ) ;
207-
208161 const watch = new Watch ( kc ) ;
209162
210163 let doneCalled = 0 ;
@@ -225,15 +178,15 @@ describe('Watch', () => {
225178 ) ;
226179
227180 await donePromise ;
228-
229- mockServer . close ( ) ;
230-
231181 strictEqual ( doneCalled , 1 ) ;
232182 } ) ;
233183
234- it ( 'should call setKeepAlive on the socket to extend the default of 5 mins' , async ( ) => {
235- const kc = new KubeConfig ( ) ;
236-
184+ it ( 'should call setKeepAlive on the socket to extend the default of 5 mins' , async ( t ) => {
185+ let response : ServerResponse | undefined ;
186+ const kc = await setupMockSystem ( t , ( req , res ) => {
187+ response = res ;
188+ res . write ( JSON . stringify ( obj1 ) + '\n' ) ;
189+ } ) ;
237190 const mockSocket = {
238191 setKeepAlive : function ( enable : boolean , timeout : number ) {
239192 this . keepAliveEnabled = enable ;
@@ -242,16 +195,16 @@ describe('Watch', () => {
242195 keepAliveEnabled : false ,
243196 keepAliveTimeout : 0 ,
244197 } ;
245- Object . assign ( kc , {
246- ... fakeConfig ,
247- applyToFetchOptions : async ( ) => ( {
198+
199+ ( kc as any ) . applyToFetchOptions = async ( ) => {
200+ return {
248201 agent : {
249202 sockets : {
250203 'mock-url' : [ mockSocket ] ,
251204 } ,
252205 } ,
253- } ) ,
254- } ) ;
206+ } ;
207+ } ;
255208 const watch = new Watch ( kc ) ;
256209
257210 const obj1 = {
@@ -262,27 +215,6 @@ describe('Watch', () => {
262215 } ;
263216
264217 const path = '/some/path/to/object' ;
265-
266- const stream = new PassThrough ( ) ;
267-
268- const [ scope ] = systemUnderTest ( ) ;
269-
270- let response : IncomingMessage | undefined ;
271-
272- const s = scope
273- . get ( path )
274- . query ( {
275- watch : 'true' ,
276- a : 'b' ,
277- } )
278- . reply ( 200 , function ( ) : PassThrough {
279- this . req . on ( 'response' , ( r ) => {
280- response = r ;
281- } ) ;
282- stream . push ( JSON . stringify ( obj1 ) + '\n' ) ;
283- return stream ;
284- } ) ;
285-
286218 const receivedTypes : string [ ] = [ ] ;
287219 const receivedObjects : string [ ] = [ ] ;
288220 let doneCalled = 0 ;
@@ -326,46 +258,28 @@ describe('Watch', () => {
326258 strictEqual ( mockSocket . keepAliveEnabled , true ) ;
327259 strictEqual ( mockSocket . keepAliveTimeout , 30000 ) ;
328260
329- const errIn = new Error ( 'err' ) ;
330- ( response as IncomingMessage ) . destroy ( errIn ) ;
261+ response ! . destroy ( ) ;
331262
332263 await donePromise ;
333264
334265 strictEqual ( doneCalled , 1 ) ;
335- deepStrictEqual ( doneErr , errIn ) ;
336-
337- s . done ( ) ;
338-
339- stream . destroy ( ) ;
266+ strictEqual ( doneErr . code , 'ERR_STREAM_PREMATURE_CLOSE' ) ;
340267 } ) ;
341268
342- it ( 'should handle server errors correctly' , async ( ) => {
343- const kc = new KubeConfig ( ) ;
344- Object . assign ( kc , fakeConfig ) ;
345- const watch = new Watch ( kc ) ;
346-
269+ it ( 'should handle server errors correctly' , async ( t ) => {
347270 const obj1 = {
348271 type : 'ADDED' ,
349272 object : {
350273 foo : 'bar' ,
351274 } ,
352275 } ;
353-
354- const stream = new PassThrough ( ) ;
355-
356- const [ scope ] = systemUnderTest ( ) ;
357-
358276 const path = '/some/path/to/object?watch=true' ;
359-
360- let response : IncomingMessage | undefined ;
361-
362- const s = scope . get ( path ) . reply ( 200 , function ( ) : PassThrough {
363- this . req . on ( 'response' , ( r ) => {
364- response = r ;
365- } ) ;
366- stream . push ( JSON . stringify ( obj1 ) + '\n' ) ;
367- return stream ;
277+ let response : ServerResponse | undefined ;
278+ const kc = await setupMockSystem ( t , ( req , res ) => {
279+ response = res ;
280+ res . write ( JSON . stringify ( obj1 ) + '\n' ) ;
368281 } ) ;
282+ const watch = new Watch ( kc ) ;
369283
370284 const receivedTypes : string [ ] = [ ] ;
371285 const receivedObjects : string [ ] = [ ] ;
@@ -405,16 +319,12 @@ describe('Watch', () => {
405319 strictEqual ( doneErr . length , 0 ) ;
406320
407321 const errIn = new Error ( 'err' ) ;
408- ( response as IncomingMessage ) . destroy ( errIn ) ;
322+ response ! . destroy ( errIn ) ;
409323
410324 await donePromise ;
411325
412326 strictEqual ( doneErr . length , 1 ) ;
413- deepStrictEqual ( doneErr [ 0 ] , errIn ) ;
414-
415- s . done ( ) ;
416-
417- stream . destroy ( ) ;
327+ strictEqual ( doneErr [ 0 ] . code , 'ERR_STREAM_PREMATURE_CLOSE' ) ;
418328 } ) ;
419329
420330 it ( 'should handle server side close correctly' , async ( ) => {
@@ -555,3 +465,31 @@ describe('Watch', () => {
555465 await rejects ( promise ) ;
556466 } ) ;
557467} ) ;
468+
469+ async function setupMockSystem ( ctx , handler ) {
470+ const server = createServer ( handler ) ;
471+ ctx . after ( ( ) => {
472+ try {
473+ server . close ( ) ;
474+ } catch {
475+ // Ignore errors during server close.
476+ }
477+ } ) ;
478+ const port = await new Promise < number > ( ( resolve ) => {
479+ server . listen ( 0 , ( ) => {
480+ resolve ( ( server . address ( ) as AddressInfo ) . port ) ;
481+ } ) ;
482+ } ) ;
483+ const kc = new KubeConfig ( ) ;
484+ kc . loadFromClusterAndUser (
485+ {
486+ name : 'cluster' ,
487+ server : `http://localhost:${ port } ` ,
488+ skipTLSVerify : true ,
489+ } ,
490+ {
491+ name : 'user' ,
492+ } ,
493+ ) ;
494+ return kc ;
495+ }
0 commit comments