Skip to content

Commit b1888b4

Browse files
committed
eliminate incomingMessagesSubject
1 parent 2c4aec7 commit b1888b4

File tree

3 files changed

+83
-84
lines changed

3 files changed

+83
-84
lines changed

src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs

Lines changed: 72 additions & 79 deletions
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ internal class GraphQLHttpWebSocket : IDisposable {
3131
private readonly EventLoopScheduler sendLoopScheduler = new EventLoopScheduler();
3232

3333
private int connectionAttempt = 0;
34-
private Subject<WebsocketMessageWrapper> incomingMessagesSubject;
35-
private IDisposable incomingMessagesDisposable;
34+
private IConnectableObservable<WebsocketMessageWrapper> incomingMessages;
35+
private IDisposable incomingMessagesConnection;
3636
private GraphQLHttpClientOptions Options => client.Options;
3737

3838
private Task initializeWebSocketTask = Task.CompletedTask;
@@ -302,7 +302,7 @@ public Task<GraphQLResponse<TResponse>> SendRequest<TResponse>(GraphQLRequest re
302302
.ToTask(cancellationToken);
303303
}
304304

305-
public Task QueueWebSocketRequest(GraphQLWebSocketRequest request) {
305+
private Task QueueWebSocketRequest(GraphQLWebSocketRequest request) {
306306
requestSubject.OnNext(request);
307307
return request.SendTask();
308308
}
@@ -381,13 +381,41 @@ private async Task ConnectAsync(CancellationToken token) {
381381
try {
382382
await BackOff();
383383
stateSubject.OnNext(GraphQLWebsocketConnectionState.Connecting);
384-
Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()}");
384+
Debug.WriteLine($"opening websocket {clientWebSocket.GetHashCode()} (thread {Thread.CurrentThread.ManagedThreadId})");
385385
await clientWebSocket.ConnectAsync(webSocketUri, token);
386386
stateSubject.OnNext(GraphQLWebsocketConnectionState.Connected);
387387
Debug.WriteLine($"connection established on websocket {clientWebSocket.GetHashCode()}, invoking Options.OnWebsocketConnected()");
388388
await (Options.OnWebsocketConnected?.Invoke(client) ?? Task.CompletedTask);
389389
Debug.WriteLine($"invoking Options.OnWebsocketConnected() on websocket {clientWebSocket.GetHashCode()}");
390390
connectionAttempt = 1;
391+
392+
// create receiving observable
393+
incomingMessages = Observable
394+
.Defer(() => GetReceiveTask().ToObservable().ObserveOn(receiveLoopScheduler))
395+
.Repeat()
396+
// complete sequence on OperationCanceledException, this is triggered by the cancellation token on disposal
397+
.Catch<WebsocketMessageWrapper, OperationCanceledException>(exception => Observable.Empty<WebsocketMessageWrapper>())
398+
.Publish();
399+
400+
// subscribe maintenance
401+
var maintenanceSubscription = incomingMessages.Subscribe(_ => { }, ex => {
402+
Debug.WriteLine($"incoming message stream {incomingMessages.GetHashCode()} received an error: {ex}");
403+
exceptionSubject.OnNext(ex);
404+
incomingMessagesConnection?.Dispose();
405+
stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected);
406+
},
407+
() => {
408+
Debug.WriteLine($"incoming message stream {incomingMessages.GetHashCode()} completed");
409+
incomingMessagesConnection?.Dispose();
410+
stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected);
411+
});
412+
413+
414+
// connect observable
415+
var connection = incomingMessages.Connect();
416+
Debug.WriteLine($"new incoming message stream {incomingMessages.GetHashCode()} created");
417+
418+
incomingMessagesConnection = new CompositeDisposable(maintenanceSubscription, connection);
391419
}
392420
catch (Exception e) {
393421
stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected);
@@ -410,61 +438,25 @@ private Task BackOff() {
410438
return Task.Delay(delay, internalCancellationToken);
411439
}
412440

413-
414441
private IObservable<WebsocketMessageWrapper> GetMessageStream() {
415-
return Observable.Create<WebsocketMessageWrapper>(CreateMessageStream)
416-
// complete sequence on OperationCanceledException, this is triggered by the cancellation token on disposal
417-
.Catch<WebsocketMessageWrapper, OperationCanceledException>(exception =>
418-
Observable.Empty<WebsocketMessageWrapper>());
419-
}
420-
421-
private async Task<IDisposable> CreateMessageStream(IObserver<WebsocketMessageWrapper> observer, CancellationToken token) {
422-
var cts = CancellationTokenSource.CreateLinkedTokenSource(token, internalCancellationToken);
423-
cts.Token.ThrowIfCancellationRequested();
424-
425-
426-
if (incomingMessagesSubject == null || incomingMessagesSubject.IsDisposed) {
427-
// create new response subject
428-
incomingMessagesSubject = new Subject<WebsocketMessageWrapper>();
429-
Debug.WriteLine($"creating new incoming message stream {incomingMessagesSubject.GetHashCode()}");
430-
431-
// initialize and connect websocket
432-
await InitializeWebSocket();
433-
434-
// loop the receive task and subscribe the created subject to the results
435-
var receiveLoopSubscription = Observable
436-
.Defer(() => GetReceiveTask().ToObservable())
437-
.Repeat()
438-
.Subscribe(incomingMessagesSubject);
439-
440-
incomingMessagesDisposable = new CompositeDisposable(
441-
incomingMessagesSubject,
442-
receiveLoopSubscription,
443-
Disposable.Create(() => {
444-
Debug.WriteLine($"incoming message stream {incomingMessagesSubject.GetHashCode()} disposed");
442+
return Observable.Using(() => new EventLoopScheduler(), scheduler =>
443+
Observable.Create<WebsocketMessageWrapper>(async observer => {
444+
// make sure the websocket ist connected
445+
await InitializeWebSocket();
446+
// subscribe observer to message stream
447+
var subscription = new CompositeDisposable(incomingMessages.ObserveOn(scheduler).Subscribe(observer));
448+
// register the observer's OnCompleted method with the cancellation token to complete the sequence on disposal
449+
subscription.Add(internalCancellationTokenSource.Token.Register(observer.OnCompleted));
450+
451+
// add some debug output
452+
var hashCode = subscription.GetHashCode();
453+
subscription.Add(Disposable.Create(() => {
454+
Debug.WriteLine($"incoming message subscription {hashCode} disposed");
445455
}));
456+
Debug.WriteLine($"new incoming message subscription {hashCode} created");
446457

447-
// dispose the subject on any error or completion (will be recreated)
448-
incomingMessagesSubject.Subscribe(_ => { }, ex => {
449-
exceptionSubject.OnNext(ex);
450-
incomingMessagesDisposable?.Dispose();
451-
incomingMessagesSubject = null;
452-
stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected);
453-
},
454-
() => {
455-
incomingMessagesDisposable?.Dispose();
456-
incomingMessagesSubject = null;
457-
stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected);
458-
});
459-
}
460-
461-
var subscription = new CompositeDisposable(incomingMessagesSubject.Subscribe(observer));
462-
var hashCode = subscription.GetHashCode();
463-
subscription.Add(Disposable.Create(() => {
464-
Debug.WriteLine($"incoming message subscription {hashCode} disposed");
465-
}));
466-
Debug.WriteLine($"new incoming message subscription {hashCode} created");
467-
return subscription;
458+
return subscription;
459+
}));
468460
}
469461

470462
private Task<WebsocketMessageWrapper> receiveAsyncTask = null;
@@ -490,30 +482,31 @@ private Task<WebsocketMessageWrapper> GetReceiveTask() {
490482
/// </summary>
491483
/// <returns></returns>
492484
private async Task<WebsocketMessageWrapper> ReceiveWebsocketMessagesAsync() {
485+
internalCancellationToken.ThrowIfCancellationRequested();
486+
493487
try {
494488
Debug.WriteLine($"waiting for data on websocket {clientWebSocket.GetHashCode()} (thread {Thread.CurrentThread.ManagedThreadId})...");
495489

496-
using (var ms = new MemoryStream()) {
497-
WebSocketReceiveResult webSocketReceiveResult = null;
498-
do {
499-
internalCancellationToken.ThrowIfCancellationRequested();
500-
webSocketReceiveResult = await clientWebSocket.ReceiveAsync(buffer, CancellationToken.None);
501-
ms.Write(buffer.Array, buffer.Offset, webSocketReceiveResult.Count);
502-
}
503-
while (!webSocketReceiveResult.EndOfMessage);
490+
using var ms = new MemoryStream();
491+
WebSocketReceiveResult webSocketReceiveResult = null;
492+
do {
493+
// cancellation is done implicitly via the close method
494+
webSocketReceiveResult = await clientWebSocket.ReceiveAsync(buffer, CancellationToken.None);
495+
ms.Write(buffer.Array, buffer.Offset, webSocketReceiveResult.Count);
496+
}
497+
while (!webSocketReceiveResult.EndOfMessage && !internalCancellationToken.IsCancellationRequested);
504498

505-
internalCancellationToken.ThrowIfCancellationRequested();
506-
ms.Seek(0, SeekOrigin.Begin);
499+
internalCancellationToken.ThrowIfCancellationRequested();
500+
ms.Seek(0, SeekOrigin.Begin);
507501

508-
if (webSocketReceiveResult.MessageType == WebSocketMessageType.Text) {
509-
var response = await Options.JsonSerializer.DeserializeToWebsocketResponseWrapperAsync(ms);
510-
response.MessageBytes = ms.ToArray();
511-
Debug.WriteLine($"{response.MessageBytes.Length} bytes received on websocket {clientWebSocket.GetHashCode()} (thread {Thread.CurrentThread.ManagedThreadId})...");
512-
return response;
513-
}
514-
else {
515-
throw new NotSupportedException("binary websocket messages are not supported");
516-
}
502+
if (webSocketReceiveResult.MessageType == WebSocketMessageType.Text) {
503+
var response = await Options.JsonSerializer.DeserializeToWebsocketResponseWrapperAsync(ms);
504+
response.MessageBytes = ms.ToArray();
505+
Debug.WriteLine($"{response.MessageBytes.Length} bytes received on websocket {clientWebSocket.GetHashCode()} (thread {Thread.CurrentThread.ManagedThreadId})...");
506+
return response;
507+
}
508+
else {
509+
throw new NotSupportedException("binary websocket messages are not supported");
517510
}
518511
}
519512
catch (Exception e) {
@@ -560,15 +553,15 @@ public void Complete() {
560553
private readonly object completedLocker = new object();
561554
private async Task CompleteAsync() {
562555
Debug.WriteLine($"disposing websocket {clientWebSocket.GetHashCode()}...");
556+
incomingMessagesConnection?.Dispose();
557+
563558
if (!internalCancellationTokenSource.IsCancellationRequested)
564559
internalCancellationTokenSource.Cancel();
560+
565561
await CloseAsync();
566562
requestSubscription?.Dispose();
567563
clientWebSocket?.Dispose();
568-
569-
incomingMessagesSubject?.OnCompleted();
570-
incomingMessagesDisposable?.Dispose();
571-
564+
572565
stateSubject?.OnCompleted();
573566
stateSubject?.Dispose();
574567

tests/GraphQL.Client.Tests.Common/Helpers/ObservableTester.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -89,7 +89,12 @@ public AndWhichConstraint<SubscriptionAssertions<TPayload>, TPayload> HaveReceiv
8989
string because = "", params object[] becauseArgs) {
9090
Execute.Assertion
9191
.BecauseOf(because, becauseArgs)
92-
.Given(() => Subject.updateReceived.Wait(timeout))
92+
.Given(() => {
93+
var isSet = Subject.updateReceived.Wait(timeout);
94+
if(!isSet)
95+
Debug.WriteLine($"waiting for payload on thread {Thread.CurrentThread.ManagedThreadId} timed out!");
96+
return isSet;
97+
})
9398
.ForCondition(isSet => isSet)
9499
.FailWith("Expected {context:Subscription} to receive new payload{reason}, but did not receive an update within {0}", timeout);
95100

tests/GraphQL.Integration.Tests/WebsocketTests/Base.cs

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,24 +169,25 @@ public async void CanReconnectWithSameObservable() {
169169

170170
const string message1 = "Hello World";
171171
Debug.WriteLine($"adding message {message1}");
172-
var response = await ChatClient.AddMessageAsync(message1);
172+
var response = await ChatClient.AddMessageAsync(message1).ConfigureAwait(true);
173173
response.Data.AddMessage.Content.Should().Be(message1);
174174
tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message1);
175175

176176
const string message2 = "How are you?";
177-
response = await ChatClient.AddMessageAsync(message2);
177+
response = await ChatClient.AddMessageAsync(message2).ConfigureAwait(true);
178178
response.Data.AddMessage.Content.Should().Be(message2);
179179
tester.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message2);
180180

181181
Debug.WriteLine("disposing subscription...");
182182
tester.Dispose(); // does not close the websocket connection
183183

184-
Debug.WriteLine("creating new subscription...");
184+
Debug.WriteLine($"creating new subscription from thread {Thread.CurrentThread.ManagedThreadId} ...");
185185
var tester2 = observable.Monitor();
186+
Debug.WriteLine($"waiting for payload on {Thread.CurrentThread.ManagedThreadId} ...");
186187
tester2.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message2);
187188

188189
const string message3 = "lorem ipsum dolor si amet";
189-
response = await ChatClient.AddMessageAsync(message3);
190+
response = await ChatClient.AddMessageAsync(message3).ConfigureAwait(true);
190191
response.Data.AddMessage.Content.Should().Be(message3);
191192
tester2.Should().HaveReceivedPayload().Which.Data.MessageAdded.Content.Should().Be(message3);
192193

0 commit comments

Comments
 (0)