@@ -182,7 +182,7 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
182182 catch ( OperationCanceledException ) { }
183183 } )
184184 ) ;
185-
185+
186186 Debug . WriteLine ( $ "sending start message on subscription { startRequest . Id } ") ;
187187 // send subscription request
188188 try
@@ -265,55 +265,55 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
265265 /// <returns></returns>
266266 public Task < GraphQLResponse < TResponse > > SendRequest < TResponse > ( GraphQLRequest request , CancellationToken cancellationToken = default ) =>
267267 Observable . Create < GraphQLResponse < TResponse > > ( async observer =>
268+ {
269+ var preprocessedRequest = await _client . Options . PreprocessRequest ( request , _client ) ;
270+ var websocketRequest = new GraphQLWebSocketRequest
268271 {
269- await _client . Options . PreprocessRequest ( request , _client ) ;
270- var websocketRequest = new GraphQLWebSocketRequest
272+ Id = Guid . NewGuid ( ) . ToString ( "N" ) ,
273+ Type = GraphQLWebSocketMessageType . GQL_START ,
274+ Payload = preprocessedRequest
275+ } ;
276+ var observable = IncomingMessageStream
277+ . Where ( response => response != null && response . Id == websocketRequest . Id )
278+ . TakeUntil ( response => response . Type == GraphQLWebSocketMessageType . GQL_COMPLETE )
279+ . Select ( response =>
271280 {
272- Id = Guid . NewGuid ( ) . ToString ( "N" ) ,
273- Type = GraphQLWebSocketMessageType . GQL_START ,
274- Payload = request
275- } ;
276- var observable = IncomingMessageStream
277- . Where ( response => response != null && response . Id == websocketRequest . Id )
278- . TakeUntil ( response => response . Type == GraphQLWebSocketMessageType . GQL_COMPLETE )
279- . Select ( response =>
280- {
281- Debug . WriteLine ( $ "received response for request { websocketRequest . Id } ") ;
282- var typedResponse =
283- _client . JsonSerializer . DeserializeToWebsocketResponse < TResponse > (
284- response . MessageBytes ) ;
285- return typedResponse . Payload ;
286- } ) ;
281+ Debug . WriteLine ( $ "received response for request { websocketRequest . Id } ") ;
282+ var typedResponse =
283+ _client . JsonSerializer . DeserializeToWebsocketResponse < TResponse > (
284+ response . MessageBytes ) ;
285+ return typedResponse . Payload ;
286+ } ) ;
287287
288- try
289- {
290- // initialize websocket (completes immediately if socket is already open)
291- await InitializeWebSocket ( ) ;
292- }
293- catch ( Exception e )
294- {
295- // subscribe observer to failed observable
296- return Observable . Throw < GraphQLResponse < TResponse > > ( e ) . Subscribe ( observer ) ;
297- }
288+ try
289+ {
290+ // initialize websocket (completes immediately if socket is already open)
291+ await InitializeWebSocket ( ) ;
292+ }
293+ catch ( Exception e )
294+ {
295+ // subscribe observer to failed observable
296+ return Observable . Throw < GraphQLResponse < TResponse > > ( e ) . Subscribe ( observer ) ;
297+ }
298298
299- var disposable = new CompositeDisposable (
300- observable . Subscribe ( observer )
301- ) ;
299+ var disposable = new CompositeDisposable (
300+ observable . Subscribe ( observer )
301+ ) ;
302302
303- Debug . WriteLine ( $ "submitting request { websocketRequest . Id } ") ;
304- // send request
305- try
306- {
307- await QueueWebSocketRequest ( websocketRequest ) ;
308- }
309- catch ( Exception e )
310- {
311- Debug . WriteLine ( e ) ;
312- throw ;
313- }
303+ Debug . WriteLine ( $ "submitting request { websocketRequest . Id } ") ;
304+ // send request
305+ try
306+ {
307+ await QueueWebSocketRequest ( websocketRequest ) ;
308+ }
309+ catch ( Exception e )
310+ {
311+ Debug . WriteLine ( e ) ;
312+ throw ;
313+ }
314314
315- return disposable ;
316- } )
315+ return disposable ;
316+ } )
317317 // complete sequence on OperationCanceledException, this is triggered by the cancellation token
318318 . Catch < GraphQLResponse < TResponse > , OperationCanceledException > ( exception =>
319319 Observable . Empty < GraphQLResponse < TResponse > > ( ) )
@@ -410,7 +410,7 @@ public Task InitializeWebSocket()
410410 }
411411 catch ( NotImplementedException )
412412 {
413- Debug . WriteLine ( "property 'ClientWebSocketOptions.ClientCertificates' not implemented by current platform" ) ;
413+ Debug . WriteLine ( "property 'ClientWebSocketOptions.ClientCertificates' not implemented by current platform" ) ;
414414 }
415415 catch ( PlatformNotSupportedException )
416416 {
@@ -423,7 +423,7 @@ public Task InitializeWebSocket()
423423 }
424424 catch ( NotImplementedException )
425425 {
426- Debug . WriteLine ( "property 'ClientWebSocketOptions.UseDefaultCredentials' not implemented by current platform" ) ;
426+ Debug . WriteLine ( "property 'ClientWebSocketOptions.UseDefaultCredentials' not implemented by current platform" ) ;
427427 }
428428 catch ( PlatformNotSupportedException )
429429 {
@@ -479,7 +479,7 @@ private async Task ConnectAsync(CancellationToken token)
479479 Debug . WriteLine ( $ "new incoming message stream { _incomingMessages . GetHashCode ( ) } created") ;
480480
481481 _incomingMessagesConnection = new CompositeDisposable ( maintenanceSubscription , connection ) ;
482-
482+
483483 var initRequest = new GraphQLWebSocketRequest
484484 {
485485 Type = GraphQLWebSocketMessageType . GQL_CONNECTION_INIT ,
@@ -488,7 +488,7 @@ private async Task ConnectAsync(CancellationToken token)
488488
489489 // setup task to await connection_ack message
490490 var ackTask = _incomingMessages
491- . Where ( response => response != null )
491+ . Where ( response => response != null )
492492 . TakeUntil ( response => response . Type == GraphQLWebSocketMessageType . GQL_CONNECTION_ACK ||
493493 response . Type == GraphQLWebSocketMessageType . GQL_CONNECTION_ERROR )
494494 . LastAsync ( )
@@ -640,7 +640,7 @@ private async Task CloseAsync()
640640 }
641641
642642 Debug . WriteLine ( $ "send \" connection_terminate\" message") ;
643- await SendWebSocketMessageAsync ( new GraphQLWebSocketRequest { Type = GraphQLWebSocketMessageType . GQL_CONNECTION_TERMINATE } ) ;
643+ await SendWebSocketMessageAsync ( new GraphQLWebSocketRequest { Type = GraphQLWebSocketMessageType . GQL_CONNECTION_TERMINATE } ) ;
644644
645645 Debug . WriteLine ( $ "closing websocket { _clientWebSocket . GetHashCode ( ) } ") ;
646646 await _clientWebSocket . CloseAsync ( WebSocketCloseStatus . NormalClosure , "" , CancellationToken . None ) ;
0 commit comments