@@ -40,6 +40,8 @@ export type SerializeMessage<T> = (
4040export interface MessageProtocolHandler < U , D > {
4141 validate : ValidateProtocolMessage < D > ;
4242 serialize : SerializeMessage < U > ;
43+ // Optional: create a synthetic protocol message when the websocket closes
44+ createClosedMessage ?: ( event : CloseEvent ) => D ;
4345}
4446
4547export class WebSocketClient < U , D > implements AsyncIterable < D > {
@@ -50,6 +52,7 @@ export class WebSocketClient<U, D> implements AsyncIterable<D> {
5052 private messageQueue : D [ ] = [ ] ;
5153 private validate : ValidateProtocolMessage < D > ;
5254 private serialize : SerializeMessage < U > ;
55+ private createClosedMessage ?: ( event : CloseEvent ) => D ;
5356
5457 private receiverQueue : [ ResolveFn < D > , RejectFn < Error > ] [ ] = [ ] ;
5558 private done : boolean = false ;
@@ -60,6 +63,7 @@ export class WebSocketClient<U, D> implements AsyncIterable<D> {
6063 ) {
6164 this . validate = handler . validate ;
6265 this . serialize = handler . serialize ;
66+ this . createClosedMessage = handler . createClosedMessage ;
6367 this . connectedPromise = new Promise ( async ( resolve , reject ) => {
6468 this . socket = await getWebsocket ( settings ) ;
6569 this . socket . onopen = ( ) => {
@@ -88,7 +92,17 @@ export class WebSocketClient<U, D> implements AsyncIterable<D> {
8892 private getClosedHandler (
8993 closeResolve : ( _ : void ) => void ,
9094 ) : ( _ : CloseEvent ) => void {
91- return ( _ : CloseEvent ) => {
95+ return ( event : CloseEvent ) => {
96+ // If provided, enqueue a synthetic closed message for consumers
97+ if ( this . createClosedMessage ) {
98+ const closedMsg = this . createClosedMessage ( event ) ;
99+ if ( this . receiverQueue . length > 0 ) {
100+ const [ resolve , _ ] = this . receiverQueue . shift ( ) ! ;
101+ resolve ( { value : closedMsg , done : false } ) ;
102+ } else {
103+ this . messageQueue . push ( closedMsg ) ;
104+ }
105+ }
92106 this . done = true ;
93107 while ( this . receiverQueue . length > 0 ) {
94108 const [ resolve , reject ] = this . receiverQueue . shift ( ) ! ;
@@ -126,11 +140,11 @@ export class WebSocketClient<U, D> implements AsyncIterable<D> {
126140 next : ( ) : Promise < IteratorResult < D > > => {
127141 if ( this . error ) {
128142 return Promise . reject ( this . error ) ;
129- } else if ( this . done ) {
130- return Promise . resolve ( { value : undefined , done : true } ) ;
131143 } else if ( this . messageQueue . length > 0 ) {
132144 const message = this . messageQueue . shift ( ) ! ;
133145 return Promise . resolve ( { value : message , done : false } ) ;
146+ } else if ( this . done ) {
147+ return Promise . resolve ( { value : undefined , done : true } ) ;
134148 } else {
135149 return new Promise ( ( resolve , reject ) => {
136150 this . receiverQueue . push ( [ resolve , reject ] ) ;
0 commit comments