Skip to content

Commit 649c9e2

Browse files
wait the ack when a ping is sent
Signed-off-by: Vincenzo Palazzo <[email protected]>
1 parent b73bb36 commit 649c9e2

File tree

1 file changed

+26
-16
lines changed

1 file changed

+26
-16
lines changed

packages/graphql/lib/src/links/websocket_link/websocket_client.dart

Lines changed: 26 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ class SubscriptionListener {
3535
SubscriptionListener(this.callback, this.hasBeenTriggered);
3636
}
3737

38-
enum SocketConnectionState { notConnected, connecting, connected }
38+
enum SocketConnectionState { notConnected, handshake, connecting, connected }
3939

4040
class SocketClientConfig {
4141
const SocketClientConfig({
@@ -231,11 +231,11 @@ class SocketClient {
231231
/// Connects to the server.
232232
///
233233
/// If this instance is disposed, this method does nothing.
234-
Future<void> _connect() async {
234+
Future<SocketClient> _connect() async {
235235
final InitOperation initOperation = await config.initOperation;
236236

237237
if (_connectionStateController.isClosed || _wasDisposed) {
238-
return;
238+
return this;
239239
}
240240

241241
_connectionStateController.add(SocketConnectionState.connecting);
@@ -246,8 +246,21 @@ class SocketClient {
246246
var connection =
247247
await config.connect(uri: Uri.parse(url), protocols: [protocol]);
248248
socketChannel = connection.forGraphQL();
249-
_connectionStateController.add(SocketConnectionState.connected);
249+
250+
if (protocol == SocketSubProtocol.graphqlTransportWs) {
251+
_connectionStateController.add(SocketConnectionState.handshake);
252+
} else {
253+
_connectionStateController.add(SocketConnectionState.connected);
254+
}
255+
print('Initialising connection');
250256
_write(initOperation);
257+
if (protocol == SocketSubProtocol.graphqlTransportWs) {
258+
// wait for ack
259+
// this blocks to prevent ping from being called before ack is recieved
260+
await _messages.firstWhere(
261+
(message) => message.type == MessageTypes.connectionAck);
262+
_connectionStateController.add(SocketConnectionState.connected);
263+
}
251264

252265
if (config.inactivityTimeout != null) {
253266
if (protocol == SocketSubProtocol.graphqlWs) {
@@ -289,6 +302,7 @@ class SocketClient {
289302
} catch (e) {
290303
onConnectionLost(e);
291304
}
305+
return this;
292306
}
293307

294308
void onConnectionLost([Object? e]) async {
@@ -471,21 +485,17 @@ class SocketClient {
471485
.listen((message) => response.addError(message));
472486

473487
if (!_subscriptionInitializers[id]!.hasBeenTriggered) {
488+
GraphQLSocketMessage operation = StartOperation(
489+
id,
490+
serialize(payload),
491+
);
474492
if (protocol == SocketSubProtocol.graphqlTransportWs) {
475-
_write(
476-
SubscribeOperation(
477-
id,
478-
serialize(payload),
479-
),
480-
);
481-
} else {
482-
_write(
483-
StartOperation(
484-
id,
485-
serialize(payload),
486-
),
493+
operation = SubscribeOperation(
494+
id,
495+
serialize(payload),
487496
);
488497
}
498+
_write(operation);
489499
_subscriptionInitializers[id]!.hasBeenTriggered = true;
490500
}
491501
});

0 commit comments

Comments
 (0)