Skip to content

Commit ad8aec7

Browse files
Merge pull request #1060 from maximilianmaihoefner/graphql-transport-ws
Add support for graphql-transport-ws protocol
2 parents 3f8e0e0 + 493ef75 commit ad8aec7

File tree

3 files changed

+164
-23
lines changed

3 files changed

+164
-23
lines changed

packages/graphql/lib/src/links/websocket_link/websocket_client.dart

Lines changed: 85 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -3,20 +3,17 @@ import 'dart:collection';
33
import 'dart:convert';
44
import 'dart:typed_data';
55

6+
import 'package:gql_exec/gql_exec.dart';
7+
import 'package:graphql/src/core/query_options.dart' show WithType;
68
import 'package:graphql/src/links/gql_links.dart';
79
import 'package:graphql/src/utilities/platform.dart';
810
import 'package:meta/meta.dart';
9-
10-
import 'package:graphql/src/core/query_options.dart' show WithType;
11-
import 'package:gql_exec/gql_exec.dart';
12-
13-
import 'package:stream_channel/stream_channel.dart';
14-
import 'package:web_socket_channel/web_socket_channel.dart';
15-
import 'package:web_socket_channel/status.dart' as ws_status;
16-
1711
import 'package:rxdart/rxdart.dart';
12+
import 'package:stream_channel/stream_channel.dart';
1813
import 'package:uuid/uuid.dart';
1914
import 'package:uuid/uuid_util.dart';
15+
import 'package:web_socket_channel/status.dart' as ws_status;
16+
import 'package:web_socket_channel/web_socket_channel.dart';
2017

2118
import './websocket_messages.dart';
2219

@@ -38,7 +35,7 @@ class SubscriptionListener {
3835
SubscriptionListener(this.callback, this.hasBeenTriggered);
3936
}
4037

41-
enum SocketConnectionState { notConnected, connecting, connected }
38+
enum SocketConnectionState { notConnected, handshake, connecting, connected }
4239

4340
class SocketClientConfig {
4441
const SocketClientConfig({
@@ -144,6 +141,13 @@ class SocketClientConfig {
144141
}
145142
}
146143

144+
class SocketSubProtocol {
145+
SocketSubProtocol._();
146+
147+
static const String graphqlWs = "graphql-ws";
148+
static const String graphqlTransportWs = "graphql-transport-ws";
149+
}
150+
147151
/// Wraps a standard web socket instance to marshal and un-marshal the server /
148152
/// client payloads into dart object representation.
149153
///
@@ -155,7 +159,7 @@ class SocketClientConfig {
155159
class SocketClient {
156160
SocketClient(
157161
this.url, {
158-
this.protocols = const ['graphql-ws'],
162+
this.protocol = SocketSubProtocol.graphqlWs,
159163
this.config = const SocketClientConfig(),
160164
@visibleForTesting this.randomBytesForUuid,
161165
@visibleForTesting this.onMessage,
@@ -166,7 +170,7 @@ class SocketClient {
166170

167171
Uint8List? randomBytesForUuid;
168172
final String url;
169-
final Iterable<String>? protocols;
173+
final String protocol;
170174
final SocketClientConfig config;
171175

172176
final BehaviorSubject<SocketConnectionState> _connectionStateController =
@@ -179,6 +183,7 @@ class SocketClient {
179183
bool _wasDisposed = false;
180184

181185
Timer? _reconnectTimer;
186+
Timer? _pingTimer;
182187

183188
@visibleForTesting
184189
GraphQLWebSocketChannel? socketChannel;
@@ -226,11 +231,11 @@ class SocketClient {
226231
/// Connects to the server.
227232
///
228233
/// If this instance is disposed, this method does nothing.
229-
Future<void> _connect() async {
234+
Future<SocketClient> _connect() async {
230235
final InitOperation initOperation = await config.initOperation;
231236

232237
if (_connectionStateController.isClosed || _wasDisposed) {
233-
return;
238+
return this;
234239
}
235240

236241
_connectionStateController.add(SocketConnectionState.connecting);
@@ -239,17 +244,47 @@ class SocketClient {
239244
// Even though config.connect is sync, we call async in order to make the
240245
// SocketConnectionState.connected attribution not overload SocketConnectionState.connecting
241246
var connection =
242-
await config.connect(uri: Uri.parse(url), protocols: protocols);
247+
await config.connect(uri: Uri.parse(url), protocols: [protocol]);
243248
socketChannel = connection.forGraphQL();
244-
_connectionStateController.add(SocketConnectionState.connected);
249+
250+
if (protocol == SocketSubProtocol.graphqlTransportWs) {
251+
_connectionStateController.add(SocketConnectionState.handshake);
252+
} else {
253+
_connectionStateController.add(SocketConnectionState.connected);
254+
}
255+
print('Initialising connection');
245256
_write(initOperation);
257+
if (protocol == SocketSubProtocol.graphqlTransportWs) {
258+
// wait for ack
259+
// this blocks to prevent ping from being called before ack is recieved
260+
await _messages.firstWhere(
261+
(message) => message.type == MessageTypes.connectionAck);
262+
_connectionStateController.add(SocketConnectionState.connected);
263+
}
246264

247265
if (config.inactivityTimeout != null) {
248-
_disconnectOnKeepAliveTimeout(_messages);
266+
if (protocol == SocketSubProtocol.graphqlWs) {
267+
_disconnectOnKeepAliveTimeout(_messages);
268+
}
269+
if (protocol == SocketSubProtocol.graphqlTransportWs) {
270+
_enqueuePing();
271+
}
249272
}
250273

251274
_messageSubscription = _messages.listen(
252-
onMessage,
275+
(message) {
276+
if (onMessage != null) {
277+
onMessage!(message);
278+
}
279+
280+
if (protocol == SocketSubProtocol.graphqlTransportWs) {
281+
if (message.type == 'ping') {
282+
_write(PongMessage());
283+
} else if (message.type == 'pong') {
284+
_enqueuePing();
285+
}
286+
}
287+
},
253288
onDone: onConnectionLost,
254289
// onDone will not be triggered if the subscription is
255290
// auto-cancelled on error; make sure to pass false
@@ -267,6 +302,7 @@ class SocketClient {
267302
} catch (e) {
268303
onConnectionLost(e);
269304
}
305+
return this;
270306
}
271307

272308
void onConnectionLost([Object? e]) async {
@@ -276,6 +312,7 @@ class SocketClient {
276312
}
277313
print('Disconnected from websocket.');
278314
_reconnectTimer?.cancel();
315+
_pingTimer?.cancel();
279316
_keepAliveSubscription?.cancel();
280317
_messageSubscription?.cancel();
281318

@@ -302,6 +339,14 @@ class SocketClient {
302339
}
303340
}
304341

342+
void _enqueuePing() {
343+
_pingTimer?.cancel();
344+
_pingTimer = new Timer(
345+
config.inactivityTimeout!,
346+
() => _write(PingMessage()),
347+
);
348+
}
349+
305350
/// Closes the underlying socket if connected, and stops reconnection attempts.
306351
/// After calling this method, this [SocketClient] instance must be considered
307352
/// unusable. Instead, create a new instance of this class.
@@ -314,6 +359,7 @@ class SocketClient {
314359
_wasDisposed = true;
315360
print('Disposing socket client..');
316361
_reconnectTimer?.cancel();
362+
_pingTimer?.cancel();
317363
_keepAliveSubscription?.cancel();
318364

319365
await Future.wait([
@@ -385,6 +431,10 @@ class SocketClient {
385431
return message.id == id;
386432
}
387433

434+
if (message is SubscriptionNext) {
435+
return message.id == id;
436+
}
437+
388438
if (message is SubscriptionError) {
389439
return message.id == id;
390440
}
@@ -422,18 +472,30 @@ class SocketClient {
422472
parse(message.toJson()),
423473
));
424474

475+
dataErrorComplete
476+
.where((message) => message is SubscriptionNext)
477+
.whereType<SubscriptionNext>()
478+
.listen((message) => response.add(
479+
parse(message.toJson()),
480+
));
481+
425482
dataErrorComplete
426483
.where((message) => message is SubscriptionError)
427484
.cast<SubscriptionError>()
428485
.listen((message) => response.addError(message));
429486

430487
if (!_subscriptionInitializers[id]!.hasBeenTriggered) {
431-
_write(
432-
StartOperation(
488+
GraphQLSocketMessage operation = StartOperation(
489+
id,
490+
serialize(payload),
491+
);
492+
if (protocol == SocketSubProtocol.graphqlTransportWs) {
493+
operation = SubscribeOperation(
433494
id,
434495
serialize(payload),
435-
),
436-
);
496+
);
497+
}
498+
_write(operation);
437499
_subscriptionInitializers[id]!.hasBeenTriggered = true;
438500
}
439501
});
@@ -445,7 +507,8 @@ class SocketClient {
445507
_subscriptionInitializers.remove(id);
446508

447509
sub?.cancel();
448-
if (_connectionStateController.value == SocketConnectionState.connected &&
510+
if (protocol == SocketSubProtocol.graphqlWs &&
511+
_connectionStateController.value == SocketConnectionState.connected &&
449512
socketChannel != null) {
450513
_write(StopOperation(id));
451514
}

packages/graphql/lib/src/links/websocket_link/websocket_link.dart

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
1-
import 'package:gql_link/gql_link.dart';
21
import 'package:gql_exec/gql_exec.dart';
2+
import 'package:gql_link/gql_link.dart';
33

44
import './websocket_client.dart';
55

@@ -16,9 +16,11 @@ class WebSocketLink extends Link {
1616
WebSocketLink(
1717
this.url, {
1818
this.config = const SocketClientConfig(),
19+
this.subProtocol = SocketSubProtocol.graphqlWs,
1920
});
2021

2122
final String url;
23+
final String subProtocol;
2224
final SocketClientConfig config;
2325

2426
// cannot be final because we're changing the instance upon a header change.
@@ -39,6 +41,7 @@ class WebSocketLink extends Link {
3941
_socketClient = SocketClient(
4042
url,
4143
config: config,
44+
protocol: subProtocol,
4245
);
4346
}
4447

packages/graphql/lib/src/links/websocket_link/websocket_messages.dart

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,11 +20,16 @@ class MessageTypes {
2020
static const String connectionKeepAlive = "ka";
2121

2222
// client operations
23+
static const String subscribe = "subscribe";
2324
static const String start = "start";
2425
static const String stop = "stop";
2526

27+
static const String ping = "ping";
28+
static const String pong = "pong";
29+
2630
// server operations
2731
static const String data = "data";
32+
static const String next = "next";
2833
static const String error = "error";
2934
static const String complete = "complete";
3035

@@ -71,13 +76,21 @@ abstract class GraphQLSocketMessage extends JsonSerializable {
7176
return ConnectionKeepAlive();
7277

7378
// for completeness
79+
case MessageTypes.subscribe:
80+
return SubscribeOperation(id, payload);
7481
case MessageTypes.start:
7582
return StartOperation(id, payload);
7683
case MessageTypes.stop:
7784
return StopOperation(id);
85+
case MessageTypes.ping:
86+
return PingMessage(payload);
87+
case MessageTypes.pong:
88+
return PongMessage(payload);
7889

7990
case MessageTypes.data:
8091
return SubscriptionData(id, payload['data'], payload['errors']);
92+
case MessageTypes.next:
93+
return SubscriptionNext(id, payload['data'], payload['errors']);
8194
case MessageTypes.error:
8295
return SubscriptionError(id, payload);
8396
case MessageTypes.complete:
@@ -131,6 +144,46 @@ class QueryPayload extends JsonSerializable {
131144
};
132145
}
133146

147+
class SubscribeOperation extends GraphQLSocketMessage {
148+
SubscribeOperation(this.id, this.payload) : super(MessageTypes.subscribe);
149+
150+
final String id;
151+
152+
final Map<String, dynamic> payload;
153+
154+
@override
155+
toJson() => {
156+
"type": type,
157+
"id": id,
158+
"payload": payload,
159+
};
160+
}
161+
162+
class PingMessage extends GraphQLSocketMessage {
163+
PingMessage([this.payload = const <String, dynamic>{}])
164+
: super(MessageTypes.ping);
165+
166+
final Map<String, dynamic> payload;
167+
168+
@override
169+
toJson() => {
170+
"type": type,
171+
"payload": payload,
172+
};
173+
}
174+
175+
class PongMessage extends GraphQLSocketMessage {
176+
PongMessage([this.payload]) : super(MessageTypes.pong);
177+
178+
final Map<String, dynamic>? payload;
179+
180+
@override
181+
toJson() => {
182+
"type": type,
183+
"payload": payload,
184+
};
185+
}
186+
134187
/// A message to tell the server to create a subscription. The contents of the
135188
/// query will be defined by the payload request. The id provided will be used
136189
/// to tag messages such that they can be identified for this subscription
@@ -209,6 +262,28 @@ class SubscriptionData extends GraphQLSocketMessage {
209262
other is SubscriptionData && jsonEncode(other) == jsonEncode(this);
210263
}
211264

265+
class SubscriptionNext extends GraphQLSocketMessage {
266+
SubscriptionNext(this.id, this.data, this.errors) : super(MessageTypes.next);
267+
268+
final String id;
269+
final dynamic data;
270+
final dynamic errors;
271+
272+
@override
273+
toJson() => {
274+
"type": type,
275+
"data": data,
276+
"errors": errors,
277+
};
278+
279+
@override
280+
int get hashCode => toJson().hashCode;
281+
282+
@override
283+
bool operator ==(dynamic other) =>
284+
other is SubscriptionNext && jsonEncode(other) == jsonEncode(this);
285+
}
286+
212287
/// Errors sent from the server to the client if the subscription operation was
213288
/// not successful, usually due to GraphQL validation errors.
214289
class SubscriptionError extends GraphQLSocketMessage {

0 commit comments

Comments
 (0)