88using System . Reactive . Linq ;
99using System . Reactive . Subjects ;
1010using System . Reactive . Threading . Tasks ;
11+ using System . Text ;
1112using System . Threading ;
1213using System . Threading . Tasks ;
1314using GraphQL . Client . Abstractions . Websocket ;
@@ -111,12 +112,6 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
111112 Id = startRequest . Id ,
112113 Type = GraphQLWebSocketMessageType . GQL_STOP
113114 } ;
114- var initRequest = new GraphQLWebSocketRequest
115- {
116- Id = startRequest . Id ,
117- Type = GraphQLWebSocketMessageType . GQL_CONNECTION_INIT ,
118- Payload = Options . ConfigureWebSocketConnectionInitPayload ( Options )
119- } ;
120115
121116 var observable = Observable . Create < GraphQLResponse < TResponse > > ( o =>
122117 IncomingMessageStream
@@ -187,20 +182,8 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
187182 catch ( OperationCanceledException ) { }
188183 } )
189184 ) ;
190-
191- // send connection init
192- Debug . WriteLine ( $ "sending connection init on subscription { startRequest . Id } ") ;
193- try
194- {
195- await QueueWebSocketRequest ( initRequest ) ;
196- }
197- catch ( Exception e )
198- {
199- Debug . WriteLine ( e ) ;
200- throw ;
201- }
202-
203- Debug . WriteLine ( $ "sending initial message on subscription { startRequest . Id } ") ;
185+
186+ Debug . WriteLine ( $ "sending start message on subscription { startRequest . Id } ") ;
204187 // send subscription request
205188 try
206189 {
@@ -354,12 +337,7 @@ private async Task<Unit> SendWebSocketRequestAsync(GraphQLWebSocketRequest reque
354337 }
355338
356339 await InitializeWebSocket ( ) ;
357- var requestBytes = _client . JsonSerializer . SerializeToBytes ( request ) ;
358- await _clientWebSocket . SendAsync (
359- new ArraySegment < byte > ( requestBytes ) ,
360- WebSocketMessageType . Text ,
361- true ,
362- _internalCancellationToken ) ;
340+ await SendWebSocketMessageAsync ( request , _internalCancellationToken ) ;
363341 request . SendCompleted ( ) ;
364342 }
365343 catch ( Exception e )
@@ -369,6 +347,16 @@ await _clientWebSocket.SendAsync(
369347 return Unit . Default ;
370348 }
371349
350+ private async Task SendWebSocketMessageAsync ( GraphQLWebSocketRequest request , CancellationToken cancellationToken = default )
351+ {
352+ var requestBytes = _client . JsonSerializer . SerializeToBytes ( request ) ;
353+ await _clientWebSocket . SendAsync (
354+ new ArraySegment < byte > ( requestBytes ) ,
355+ WebSocketMessageType . Text ,
356+ true ,
357+ cancellationToken ) ;
358+ }
359+
372360 #endregion
373361
374362 public Task InitializeWebSocket ( )
@@ -469,9 +457,38 @@ private async Task ConnectAsync(CancellationToken token)
469457 Debug . WriteLine ( $ "new incoming message stream { _incomingMessages . GetHashCode ( ) } created") ;
470458
471459 _incomingMessagesConnection = new CompositeDisposable ( maintenanceSubscription , connection ) ;
460+
461+ var initRequest = new GraphQLWebSocketRequest
462+ {
463+ Type = GraphQLWebSocketMessageType . GQL_CONNECTION_INIT ,
464+ Payload = Options . ConfigureWebSocketConnectionInitPayload ( Options )
465+ } ;
466+
467+ // setup task to await connection_ack message
468+ var ackTask = _incomingMessages
469+ . Where ( response => response != null )
470+ . TakeUntil ( response => response . Type == GraphQLWebSocketMessageType . GQL_CONNECTION_ACK ||
471+ response . Type == GraphQLWebSocketMessageType . GQL_CONNECTION_ERROR )
472+ . FirstAsync ( )
473+ . ToTask ( ) ;
474+
475+ // send connection init
476+ Debug . WriteLine ( $ "sending connection init message") ;
477+ await SendWebSocketMessageAsync ( initRequest ) ;
478+ var response = await ackTask ;
479+
480+ if ( response . Type == GraphQLWebSocketMessageType . GQL_CONNECTION_ACK )
481+ Debug . WriteLine ( $ "connection acknowledged: { Encoding . UTF8 . GetString ( response . MessageBytes ) } ") ;
482+ else
483+ {
484+ var errorPayload = Encoding . UTF8 . GetString ( response . MessageBytes ) ;
485+ Debug . WriteLine ( $ "connection error received: { errorPayload } ") ;
486+ throw new GraphQLWebsocketConnectionException ( errorPayload ) ;
487+ }
472488 }
473489 catch ( Exception e )
474490 {
491+ Debug . WriteLine ( $ "failed to establish websocket connection") ;
475492 _stateSubject . OnNext ( GraphQLWebsocketConnectionState . Disconnected ) ;
476493 _exceptionSubject . OnNext ( e ) ;
477494 throw ;
@@ -600,6 +617,9 @@ private async Task CloseAsync()
600617 return ;
601618 }
602619
620+ Debug . WriteLine ( $ "send \" connection_terminate\" message") ;
621+ await SendWebSocketMessageAsync ( new GraphQLWebSocketRequest { Type = GraphQLWebSocketMessageType . GQL_CONNECTION_TERMINATE } ) ;
622+
603623 Debug . WriteLine ( $ "closing websocket { _clientWebSocket . GetHashCode ( ) } ") ;
604624 await _clientWebSocket . CloseAsync ( WebSocketCloseStatus . NormalClosure , "" , CancellationToken . None ) ;
605625 _stateSubject . OnNext ( GraphQLWebsocketConnectionState . Disconnected ) ;
0 commit comments