Skip to content

Commit 7cb148e

Browse files
committed
fix preprocessing of queries and mutations when sent via websocket
1 parent fd2d88d commit 7cb148e

File tree

2 files changed

+49
-50
lines changed

2 files changed

+49
-50
lines changed

src/GraphQL.Client.LocalExecution/GraphQLLocalExecutionClient.cs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
using System.Linq;
55
using System.Reactive.Linq;
66
using System.Reactive.Threading.Tasks;
7-
using System.Text;
87
using System.Threading;
98
using System.Threading.Tasks;
109
using GraphQL.Client.Abstractions;

src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs

Lines changed: 49 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
182182
catch (OperationCanceledException) { }
183183
})
184184
);
185-
185+
186186
Debug.WriteLine($"sending start message on subscription {startRequest.Id}");
187187
// send subscription request
188188
try
@@ -265,55 +265,55 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
265265
/// <returns></returns>
266266
public Task<GraphQLResponse<TResponse>> SendRequest<TResponse>(GraphQLRequest request, CancellationToken cancellationToken = default) =>
267267
Observable.Create<GraphQLResponse<TResponse>>(async observer =>
268+
{
269+
var preprocessedRequest = await _client.Options.PreprocessRequest(request, _client);
270+
var websocketRequest = new GraphQLWebSocketRequest
268271
{
269-
await _client.Options.PreprocessRequest(request, _client);
270-
var websocketRequest = new GraphQLWebSocketRequest
272+
Id = Guid.NewGuid().ToString("N"),
273+
Type = GraphQLWebSocketMessageType.GQL_START,
274+
Payload = preprocessedRequest
275+
};
276+
var observable = IncomingMessageStream
277+
.Where(response => response != null && response.Id == websocketRequest.Id)
278+
.TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE)
279+
.Select(response =>
271280
{
272-
Id = Guid.NewGuid().ToString("N"),
273-
Type = GraphQLWebSocketMessageType.GQL_START,
274-
Payload = request
275-
};
276-
var observable = IncomingMessageStream
277-
.Where(response => response != null && response.Id == websocketRequest.Id)
278-
.TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_COMPLETE)
279-
.Select(response =>
280-
{
281-
Debug.WriteLine($"received response for request {websocketRequest.Id}");
282-
var typedResponse =
283-
_client.JsonSerializer.DeserializeToWebsocketResponse<TResponse>(
284-
response.MessageBytes);
285-
return typedResponse.Payload;
286-
});
281+
Debug.WriteLine($"received response for request {websocketRequest.Id}");
282+
var typedResponse =
283+
_client.JsonSerializer.DeserializeToWebsocketResponse<TResponse>(
284+
response.MessageBytes);
285+
return typedResponse.Payload;
286+
});
287287

288-
try
289-
{
290-
// initialize websocket (completes immediately if socket is already open)
291-
await InitializeWebSocket();
292-
}
293-
catch (Exception e)
294-
{
295-
// subscribe observer to failed observable
296-
return Observable.Throw<GraphQLResponse<TResponse>>(e).Subscribe(observer);
297-
}
288+
try
289+
{
290+
// initialize websocket (completes immediately if socket is already open)
291+
await InitializeWebSocket();
292+
}
293+
catch (Exception e)
294+
{
295+
// subscribe observer to failed observable
296+
return Observable.Throw<GraphQLResponse<TResponse>>(e).Subscribe(observer);
297+
}
298298

299-
var disposable = new CompositeDisposable(
300-
observable.Subscribe(observer)
301-
);
299+
var disposable = new CompositeDisposable(
300+
observable.Subscribe(observer)
301+
);
302302

303-
Debug.WriteLine($"submitting request {websocketRequest.Id}");
304-
// send request
305-
try
306-
{
307-
await QueueWebSocketRequest(websocketRequest);
308-
}
309-
catch (Exception e)
310-
{
311-
Debug.WriteLine(e);
312-
throw;
313-
}
303+
Debug.WriteLine($"submitting request {websocketRequest.Id}");
304+
// send request
305+
try
306+
{
307+
await QueueWebSocketRequest(websocketRequest);
308+
}
309+
catch (Exception e)
310+
{
311+
Debug.WriteLine(e);
312+
throw;
313+
}
314314

315-
return disposable;
316-
})
315+
return disposable;
316+
})
317317
// complete sequence on OperationCanceledException, this is triggered by the cancellation token
318318
.Catch<GraphQLResponse<TResponse>, OperationCanceledException>(exception =>
319319
Observable.Empty<GraphQLResponse<TResponse>>())
@@ -410,7 +410,7 @@ public Task InitializeWebSocket()
410410
}
411411
catch (NotImplementedException)
412412
{
413-
Debug.WriteLine("property 'ClientWebSocketOptions.ClientCertificates' not implemented by current platform");
413+
Debug.WriteLine("property 'ClientWebSocketOptions.ClientCertificates' not implemented by current platform");
414414
}
415415
catch (PlatformNotSupportedException)
416416
{
@@ -423,7 +423,7 @@ public Task InitializeWebSocket()
423423
}
424424
catch (NotImplementedException)
425425
{
426-
Debug.WriteLine("property 'ClientWebSocketOptions.UseDefaultCredentials' not implemented by current platform");
426+
Debug.WriteLine("property 'ClientWebSocketOptions.UseDefaultCredentials' not implemented by current platform");
427427
}
428428
catch (PlatformNotSupportedException)
429429
{
@@ -479,7 +479,7 @@ private async Task ConnectAsync(CancellationToken token)
479479
Debug.WriteLine($"new incoming message stream {_incomingMessages.GetHashCode()} created");
480480

481481
_incomingMessagesConnection = new CompositeDisposable(maintenanceSubscription, connection);
482-
482+
483483
var initRequest = new GraphQLWebSocketRequest
484484
{
485485
Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT,
@@ -488,7 +488,7 @@ private async Task ConnectAsync(CancellationToken token)
488488

489489
// setup task to await connection_ack message
490490
var ackTask = _incomingMessages
491-
.Where(response => response != null )
491+
.Where(response => response != null)
492492
.TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ACK ||
493493
response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ERROR)
494494
.LastAsync()
@@ -640,7 +640,7 @@ private async Task CloseAsync()
640640
}
641641

642642
Debug.WriteLine($"send \"connection_terminate\" message");
643-
await SendWebSocketMessageAsync(new GraphQLWebSocketRequest{Type = GraphQLWebSocketMessageType.GQL_CONNECTION_TERMINATE});
643+
await SendWebSocketMessageAsync(new GraphQLWebSocketRequest { Type = GraphQLWebSocketMessageType.GQL_CONNECTION_TERMINATE });
644644

645645
Debug.WriteLine($"closing websocket {_clientWebSocket.GetHashCode()}");
646646
await _clientWebSocket.CloseAsync(WebSocketCloseStatus.NormalClosure, "", CancellationToken.None);

0 commit comments

Comments
 (0)