@@ -174,38 +174,74 @@ export class SynthesizeStream extends tts.SynthesizeStream {
174174 }
175175 } ;
176176
177- ws . on ( 'message' , ( data ) => {
178- const json = JSON . parse ( data . toString ( ) ) ;
179-
180- if ( json ?. data ?. audio ) {
181- const audio = new Int8Array ( Buffer . from ( json . data . audio , 'base64' ) ) ;
182- for ( const frame of bstream . write ( audio ) ) {
183- sendLastFrame ( requestId , false ) ;
184- lastFrame = frame ;
185- // this.queue.put({frame, requestId, segmentId: requestId, final: false})
186- }
177+ while ( ! closing ) {
178+ try {
179+ await new Promise < void > ( ( resolve , reject ) => {
180+ ws . removeAllListeners ( ) ;
181+
182+ ws . on ( 'message' , ( data ) => {
183+ try {
184+ const json = JSON . parse ( data . toString ( ) ) ;
185+
186+ if ( json ?. data ?. audio ) {
187+ const audio = new Int8Array ( Buffer . from ( json . data . audio , 'base64' ) ) ;
188+ for ( const frame of bstream . write ( audio ) ) {
189+ sendLastFrame ( requestId , false ) ;
190+ lastFrame = frame ;
191+ }
187192
188- if ( json ?. data ?. stop ) {
189- // This is a bool flag, it is True when audio reaches "<STOP>"
190- for ( const frame of bstream . flush ( ) ) {
191- sendLastFrame ( requestId , false ) ;
192- lastFrame = frame ;
193- }
194- sendLastFrame ( requestId , true ) ;
195- this . queue . put ( SynthesizeStream . END_OF_STREAM ) ;
193+ if ( json ?. data ?. stop ) {
194+ // This is a bool flag, it is True when audio reaches "<STOP>"
195+ for ( const frame of bstream . flush ( ) ) {
196+ sendLastFrame ( requestId , false ) ;
197+ lastFrame = frame ;
198+ }
199+ sendLastFrame ( requestId , true ) ;
200+ this . queue . put ( SynthesizeStream . END_OF_STREAM ) ;
201+
202+ closing = true ;
203+ ws . close ( ) ;
204+ resolve ( ) ;
205+ return ;
206+ }
207+ }
208+ resolve ( ) ;
209+ } catch ( error ) {
210+ this . #logger. error ( `Error parsing WebSocket message: ${ error } ` ) ;
211+ reject ( error ) ;
212+ }
213+ } ) ;
196214
197- closing = true ;
198- ws . close ( ) ;
199- return ;
215+ ws . on ( 'error' , ( error ) => {
216+ this . #logger. error ( `WebSocket error: ${ error } ` ) ;
217+ if ( ! closing ) {
218+ closing = true ;
219+ this . queue . put ( SynthesizeStream . END_OF_STREAM ) ;
220+ ws . close ( ) ;
221+ }
222+ reject ( error ) ;
223+ } ) ;
224+
225+ ws . on ( 'close' , ( code , reason ) => {
226+ if ( ! closing ) {
227+ this . #logger. error ( `WebSocket closed with code ${ code } : ${ reason } ` ) ;
228+ this . queue . put ( SynthesizeStream . END_OF_STREAM ) ;
229+ }
230+ // Only reject if we haven't processed all expected frames
231+ if ( ! closing ) {
232+ reject ( new Error ( `WebSocket closed prematurely with code ${ code } : ${ reason } ` ) ) ;
233+ } else {
234+ resolve ( ) ;
235+ }
236+ } ) ;
237+ } ) ;
238+ } catch ( err ) {
239+ if ( err instanceof Error && ! err . message . includes ( 'WebSocket closed prematurely' ) ) {
240+ this . #logger. error ( { err } , 'Error in recvTask from Neuphonic WebSocket' ) ;
200241 }
242+ break ;
201243 }
202- } ) ;
203- ws . on ( 'close' , ( code , reason ) => {
204- if ( ! closing ) {
205- this . #logger. error ( `WebSocket closed with code ${ code } : ${ reason } ` ) ;
206- }
207- ws . removeAllListeners ( ) ;
208- } ) ;
244+ }
209245 } ;
210246
211247 const url = `wss://${ API_BASE_URL } /speak/en?${ getQueryParamString ( this . #opts) } &api_key=${ this . #opts. apiKey } ` ;
0 commit comments