@@ -22,14 +22,12 @@ export class MetadataStream<T> {
2222 private retryCount = 0 ;
2323 private readonly maxRetries : number ;
2424 private currentChunkIndex = 0 ;
25- private reader : ReadableStreamDefaultReader < T > ;
2625
2726 constructor ( private options : MetadataOptions < T > ) {
2827 const [ serverStream , consumerStream ] = this . createTeeStreams ( ) ;
2928 this . serverStream = serverStream ;
3029 this . consumerStream = consumerStream ;
3130 this . maxRetries = options . maxRetries ?? 10 ;
32- this . reader = this . serverStream . getReader ( ) ;
3331
3432 this . streamPromise = this . initializeServerStream ( ) ;
3533 }
@@ -52,6 +50,8 @@ export class MetadataStream<T> {
5250 }
5351
5452 private async makeRequest ( startFromChunk : number = 0 ) : Promise < void > {
53+ const reader = this . serverStream . getReader ( ) ;
54+
5555 return new Promise ( ( resolve , reject ) => {
5656 const url = new URL ( this . buildUrl ( ) ) ;
5757 const timeout = 15 * 60 * 1000 ; // 15 minutes
@@ -71,15 +71,19 @@ export class MetadataStream<T> {
7171 } ) ;
7272
7373 req . on ( "error" , ( error ) => {
74+ reader . releaseLock ( ) ;
7475 reject ( error ) ;
7576 } ) ;
7677
7778 req . on ( "timeout" , ( ) => {
79+ reader . releaseLock ( ) ;
7880 req . destroy ( new Error ( "Request timed out" ) ) ;
7981 } ) ;
8082
8183 req . on ( "response" , ( res ) => {
8284 if ( res . statusCode === 408 ) {
85+ reader . releaseLock ( ) ;
86+
8387 if ( this . retryCount < this . maxRetries ) {
8488 this . retryCount ++ ;
8589
@@ -112,7 +116,7 @@ export class MetadataStream<T> {
112116 const processStream = async ( ) => {
113117 try {
114118 while ( true ) {
115- const { done, value } = await this . reader . read ( ) ;
119+ const { done, value } = await reader . read ( ) ;
116120
117121 if ( done ) {
118122 req . end ( ) ;
@@ -124,7 +128,7 @@ export class MetadataStream<T> {
124128 this . currentChunkIndex ++ ;
125129 }
126130 } catch ( error ) {
127- req . destroy ( error as Error ) ;
131+ reject ( error ) ;
128132 }
129133 } ;
130134
@@ -135,12 +139,7 @@ export class MetadataStream<T> {
135139 }
136140
137141 private async initializeServerStream ( ) : Promise < void > {
138- try {
139- await this . makeRequest ( 0 ) ;
140- } catch ( error ) {
141- this . reader . releaseLock ( ) ;
142- throw error ;
143- }
142+ await this . makeRequest ( 0 ) ;
144143 }
145144
146145 public async wait ( ) : Promise < void > {
0 commit comments