Skip to content

Commit 9e4e2d7

Browse files
committed
remove all reactive schedulers
1 parent 8c763ad commit 9e4e2d7

File tree

1 file changed

+6
-13
lines changed

1 file changed

+6
-13
lines changed

src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs

Lines changed: 6 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ internal class GraphQLHttpWebSocket : IDisposable
3030
private readonly BehaviorSubject<GraphQLWebsocketConnectionState> _stateSubject =
3131
new BehaviorSubject<GraphQLWebsocketConnectionState>(GraphQLWebsocketConnectionState.Disconnected);
3232
private readonly IDisposable _requestSubscription;
33-
private readonly EventLoopScheduler _receiveLoopScheduler = new EventLoopScheduler();
34-
private readonly EventLoopScheduler _sendLoopScheduler = new EventLoopScheduler();
3533

3634
private int _connectionAttempt = 0;
3735
private IConnectableObservable<WebsocketMessageWrapper> _incomingMessages;
@@ -80,8 +78,6 @@ public GraphQLHttpWebSocket(Uri webSocketUri, GraphQLHttpClient client)
8078
_client = client;
8179
_buffer = new ArraySegment<byte>(new byte[8192]);
8280
IncomingMessageStream = GetMessageStream();
83-
_receiveLoopScheduler.Schedule(() =>
84-
Debug.WriteLine($"receive loop scheduler thread id: {Thread.CurrentThread.ManagedThreadId}"));
8581

8682
_requestSubscription = _requestSubject
8783
.Select(SendWebSocketRequestAsync)
@@ -436,7 +432,7 @@ private async Task ConnectAsync(CancellationToken token)
436432

437433
// create receiving observable
438434
_incomingMessages = Observable
439-
.Defer(() => GetReceiveTask().ToObservable().ObserveOn(_receiveLoopScheduler))
435+
.Defer(() => GetReceiveTask().ToObservable())
440436
.Repeat()
441437
// complete sequence on OperationCanceledException, this is triggered by the cancellation token on disposal
442438
.Catch<WebsocketMessageWrapper, OperationCanceledException>(exception => Observable.Empty<WebsocketMessageWrapper>())
@@ -489,13 +485,13 @@ private Task BackOff()
489485
}
490486

491487
private IObservable<WebsocketMessageWrapper> GetMessageStream() =>
492-
Observable.Using(() => new EventLoopScheduler(), scheduler =>
493-
Observable.Create<WebsocketMessageWrapper>(async observer =>
488+
Observable.Create<WebsocketMessageWrapper>(async observer =>
494489
{
495490
// make sure the websocket is connected
496491
await InitializeWebSocket();
497492
// subscribe observer to message stream
498-
var subscription = new CompositeDisposable(_incomingMessages.ObserveOn(scheduler).Subscribe(observer))
493+
var subscription = new CompositeDisposable(_incomingMessages
494+
.Subscribe(observer))
499495
{
500496
// register the observer's OnCompleted method with the cancellation token to complete the sequence on disposal
501497
_internalCancellationTokenSource.Token.Register(observer.OnCompleted)
@@ -507,7 +503,7 @@ private IObservable<WebsocketMessageWrapper> GetMessageStream() =>
507503
Debug.WriteLine($"new incoming message subscription {hashCode} created");
508504

509505
return subscription;
510-
}));
506+
});
511507

512508
private Task<WebsocketMessageWrapper> _receiveAsyncTask = null;
513509
private readonly object _receiveTaskLocker = new object();
@@ -634,10 +630,7 @@ private async Task CompleteAsync()
634630
_exceptionSubject?.OnCompleted();
635631
_exceptionSubject?.Dispose();
636632
_internalCancellationTokenSource.Dispose();
637-
638-
_sendLoopScheduler?.Dispose();
639-
_receiveLoopScheduler?.Dispose();
640-
633+
641634
Debug.WriteLine("GraphQLHttpWebSocket disposed");
642635
}
643636

0 commit comments

Comments
 (0)