Skip to content

Commit bb8bcfe

Browse files
committed
send connection_init and await connection_ack
1 parent ca0c7db commit bb8bcfe

File tree

3 files changed

+69
-28
lines changed

3 files changed

+69
-28
lines changed

src/GraphQL.Client.Abstractions.Websocket/GraphQLWebSocketMessageType.cs

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,7 @@ public static class GraphQLWebSocketMessageType
3838
public const string GQL_CONNECTION_KEEP_ALIVE = "ka"; // Server -> Client
3939

4040
/// <summary>
41-
/// Client sends this message in order to stop a running GraphQL operation execution (for example: unsubscribe)
42-
/// id: string : operation id
41+
/// Client sends this message to terminate the connection.
4342
/// </summary>
4443
public const string GQL_CONNECTION_TERMINATE = "connection_terminate"; // Client -> Server
4544

src/GraphQL.Client/Websocket/GraphQLHttpWebSocket.cs

Lines changed: 43 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
using System.Reactive.Linq;
99
using System.Reactive.Subjects;
1010
using System.Reactive.Threading.Tasks;
11+
using System.Text;
1112
using System.Threading;
1213
using System.Threading.Tasks;
1314
using GraphQL.Client.Abstractions.Websocket;
@@ -111,12 +112,6 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
111112
Id = startRequest.Id,
112113
Type = GraphQLWebSocketMessageType.GQL_STOP
113114
};
114-
var initRequest = new GraphQLWebSocketRequest
115-
{
116-
Id = startRequest.Id,
117-
Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT,
118-
Payload = Options.ConfigureWebSocketConnectionInitPayload(Options)
119-
};
120115

121116
var observable = Observable.Create<GraphQLResponse<TResponse>>(o =>
122117
IncomingMessageStream
@@ -187,20 +182,8 @@ public IObservable<GraphQLResponse<TResponse>> CreateSubscriptionStream<TRespons
187182
catch (OperationCanceledException) { }
188183
})
189184
);
190-
191-
// send connection init
192-
Debug.WriteLine($"sending connection init on subscription {startRequest.Id}");
193-
try
194-
{
195-
await QueueWebSocketRequest(initRequest);
196-
}
197-
catch (Exception e)
198-
{
199-
Debug.WriteLine(e);
200-
throw;
201-
}
202-
203-
Debug.WriteLine($"sending initial message on subscription {startRequest.Id}");
185+
186+
Debug.WriteLine($"sending start message on subscription {startRequest.Id}");
204187
// send subscription request
205188
try
206189
{
@@ -354,12 +337,7 @@ private async Task<Unit> SendWebSocketRequestAsync(GraphQLWebSocketRequest reque
354337
}
355338

356339
await InitializeWebSocket();
357-
var requestBytes = _client.JsonSerializer.SerializeToBytes(request);
358-
await _clientWebSocket.SendAsync(
359-
new ArraySegment<byte>(requestBytes),
360-
WebSocketMessageType.Text,
361-
true,
362-
_internalCancellationToken);
340+
await SendWebSocketMessageAsync(request, _internalCancellationToken);
363341
request.SendCompleted();
364342
}
365343
catch (Exception e)
@@ -369,6 +347,16 @@ await _clientWebSocket.SendAsync(
369347
return Unit.Default;
370348
}
371349

350+
private async Task SendWebSocketMessageAsync(GraphQLWebSocketRequest request, CancellationToken cancellationToken = default)
351+
{
352+
var requestBytes = _client.JsonSerializer.SerializeToBytes(request);
353+
await _clientWebSocket.SendAsync(
354+
new ArraySegment<byte>(requestBytes),
355+
WebSocketMessageType.Text,
356+
true,
357+
cancellationToken);
358+
}
359+
372360
#endregion
373361

374362
public Task InitializeWebSocket()
@@ -469,9 +457,38 @@ private async Task ConnectAsync(CancellationToken token)
469457
Debug.WriteLine($"new incoming message stream {_incomingMessages.GetHashCode()} created");
470458

471459
_incomingMessagesConnection = new CompositeDisposable(maintenanceSubscription, connection);
460+
461+
var initRequest = new GraphQLWebSocketRequest
462+
{
463+
Type = GraphQLWebSocketMessageType.GQL_CONNECTION_INIT,
464+
Payload = Options.ConfigureWebSocketConnectionInitPayload(Options)
465+
};
466+
467+
// setup task to await connection_ack message
468+
var ackTask = _incomingMessages
469+
.Where(response => response != null )
470+
.TakeUntil(response => response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ACK ||
471+
response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ERROR)
472+
.FirstAsync()
473+
.ToTask();
474+
475+
// send connection init
476+
Debug.WriteLine($"sending connection init message");
477+
await SendWebSocketMessageAsync(initRequest);
478+
var response = await ackTask;
479+
480+
if (response.Type == GraphQLWebSocketMessageType.GQL_CONNECTION_ACK)
481+
Debug.WriteLine($"connection acknowledged: {Encoding.UTF8.GetString(response.MessageBytes)}");
482+
else
483+
{
484+
var errorPayload = Encoding.UTF8.GetString(response.MessageBytes);
485+
Debug.WriteLine($"connection error received: {errorPayload}");
486+
throw new GraphQLWebsocketConnectionException(errorPayload);
487+
}
472488
}
473489
catch (Exception e)
474490
{
491+
Debug.WriteLine($"failed to establish websocket connection");
475492
_stateSubject.OnNext(GraphQLWebsocketConnectionState.Disconnected);
476493
_exceptionSubject.OnNext(e);
477494
throw;
Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,25 @@
1+
using System;
2+
using System.Runtime.Serialization;
3+
4+
namespace GraphQL.Client.Http.Websocket
5+
{
6+
[Serializable]
7+
public class GraphQLWebsocketConnectionException: Exception
8+
{
9+
public GraphQLWebsocketConnectionException()
10+
{
11+
}
12+
13+
protected GraphQLWebsocketConnectionException(SerializationInfo info, StreamingContext context) : base(info, context)
14+
{
15+
}
16+
17+
public GraphQLWebsocketConnectionException(string message) : base(message)
18+
{
19+
}
20+
21+
public GraphQLWebsocketConnectionException(string message, Exception innerException) : base(message, innerException)
22+
{
23+
}
24+
}
25+
}

0 commit comments

Comments
 (0)