Skip to content

Commit f856f29

Browse files
Merge pull request #1145 from othorin/fix-ws-connection-init
fix(graphql): send connection_init message during handshake
2 parents 02be959 + 7d9b8d3 commit f856f29

File tree

4 files changed

+305
-17
lines changed

4 files changed

+305
-17
lines changed

melos.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,7 @@ scripts:
6565

6666
client_test_coverage:
6767
description: Run tests in a specific package.
68-
run: melos exec --concurrency=2 -- "dart run test --coverage="coverage" && dart run coverage:format_coverage --lcov --in=coverage --out=coverage.lcov --packages=.packages --report-on=lib"
68+
run: melos exec --concurrency=2 -- "dart run test --coverage="coverage" && dart run coverage:format_coverage --lcov --in=coverage --out=coverage.lcov --packages=.dart_tool/package_config.json --report-on=lib"
6969
select-package:
7070
scope: "graphql"
7171
dir-exists:

packages/graphql/lib/src/links/websocket_link/websocket_client.dart

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -370,13 +370,18 @@ class SocketClient {
370370
}
371371

372372
void _write(final GraphQLSocketMessage message) {
373-
if (_connectionStateController.value == SocketConnectionState.connected) {
374-
socketChannel!.sink.add(
375-
json.encode(
376-
message,
377-
toEncodable: (dynamic m) => m.toJson(),
378-
),
379-
);
373+
switch (_connectionStateController.value) {
374+
case SocketConnectionState.connected:
375+
case SocketConnectionState.handshake:
376+
socketChannel!.sink.add(
377+
json.encode(
378+
message,
379+
toEncodable: (dynamic m) => m.toJson(),
380+
),
381+
);
382+
break;
383+
default:
384+
break;
380385
}
381386
}
382387

packages/graphql/test/mock_server/ws_echo_server.dart

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
/// to run the test and cover the web socket test
33
///
44
/// author: https://github.com/vincenzopalazzo
5+
import 'dart:convert';
56
import 'dart:io';
67

78
const String forceDisconnectCommand = '___force_disconnect___';
@@ -17,10 +18,19 @@ Future<String> runWebSocketServer(
1718
/// Handle event received on server.
1819
void onWebSocketData(WebSocket client) {
1920
client.listen((data) async {
20-
if (data != null && data.toString().contains(forceDisconnectCommand)) {
21+
if (data == forceDisconnectCommand) {
2122
client.close(WebSocketStatus.normalClosure, 'shutting down');
2223
} else {
23-
client.add(data);
24+
final message = json.decode(data.toString());
25+
if (message['type'] == 'connection_init' &&
26+
message['payload']?['protocol'] == 'graphql-transport-ws') {
27+
client.add(json.encode({
28+
'type': 'connection_ack',
29+
'payload': null,
30+
}));
31+
} else {
32+
client.add(data);
33+
}
2434
}
2535
});
2636
}

packages/graphql/test/websocket_test.dart

Lines changed: 280 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -12,19 +12,24 @@ import './helpers.dart';
1212
import './mock_server/ws_echo_server.dart';
1313
import 'mock_server/ws_echo_server.dart';
1414

15-
SocketClient getTestClient(
16-
{required String wsUrl,
17-
StreamController<dynamic>? controller,
18-
bool autoReconnect = true,
19-
Map<String, dynamic>? customHeaders,
20-
Duration delayBetweenReconnectionAttempts =
21-
const Duration(milliseconds: 1)}) =>
15+
SocketClient getTestClient({
16+
required String wsUrl,
17+
StreamController<dynamic>? controller,
18+
bool autoReconnect = true,
19+
Map<String, dynamic>? customHeaders,
20+
Duration delayBetweenReconnectionAttempts = const Duration(milliseconds: 1),
21+
String protocol = SocketSubProtocol.graphqlWs,
22+
}) =>
2223
SocketClient(
2324
wsUrl,
25+
protocol: protocol,
2426
config: SocketClientConfig(
2527
autoReconnect: autoReconnect,
2628
headers: customHeaders,
2729
delayBetweenReconnectionAttempts: delayBetweenReconnectionAttempts,
30+
initialPayload: {
31+
'protocol': protocol,
32+
},
2833
),
2934
randomBytesForUuid: Uint8List.fromList(
3035
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16],
@@ -324,6 +329,274 @@ Future<void> main() async {
324329
});
325330
}, tags: "integration");
326331

332+
group('SocketClient without payload graphql-transport-ws', () {
333+
late SocketClient socketClient;
334+
StreamController<dynamic> controller;
335+
final expectedMessage = r'{'
336+
r'"type":"subscribe","id":"01020304-0506-4708-890a-0b0c0d0e0f10",'
337+
r'"payload":{"operationName":null,"variables":{},"query":"subscription {\n \n}"}'
338+
r'}';
339+
setUp(overridePrint((log) {
340+
controller = StreamController(sync: true);
341+
socketClient = getTestClient(
342+
controller: controller,
343+
protocol: SocketSubProtocol.graphqlTransportWs,
344+
wsUrl: wsUrl,
345+
);
346+
}));
347+
tearDown(overridePrint(
348+
(log) => socketClient.dispose(),
349+
));
350+
test('connection graphql-transport-ws', () async {
351+
await expectLater(
352+
socketClient.connectionState.asBroadcastStream(),
353+
emitsInOrder(
354+
[
355+
SocketConnectionState.connecting,
356+
SocketConnectionState.handshake,
357+
SocketConnectionState.connected,
358+
],
359+
),
360+
);
361+
});
362+
test('disconnect via dispose graphql-transport-ws', () async {
363+
// First wait for connection to complete
364+
await expectLater(
365+
socketClient.connectionState.asBroadcastStream(),
366+
emitsInOrder(
367+
[
368+
SocketConnectionState.connecting,
369+
SocketConnectionState.handshake,
370+
SocketConnectionState.connected,
371+
],
372+
),
373+
);
374+
375+
// We need to begin waiting on the connectionState
376+
// before we issue the command to disconnect; otherwise
377+
// it can reconnect so fast that it will be reconnected
378+
// by the time that the expectLater check is initiated.
379+
await overridePrint((_) async {
380+
Timer(const Duration(milliseconds: 20), () async {
381+
await socketClient.dispose();
382+
});
383+
})();
384+
// The connectionState BehaviorController emits the current state
385+
// to any new listener, so we expect it to start in the connected
386+
// state and transition to notConnected because of dispose.
387+
await expectLater(
388+
socketClient.connectionState,
389+
emitsInOrder([
390+
SocketConnectionState.connected,
391+
SocketConnectionState.notConnected,
392+
]),
393+
);
394+
395+
// Have to wait for socket close to be fully processed after we reach
396+
// the notConnected state, including updating channel with close code.
397+
await Future<void>.delayed(const Duration(milliseconds: 20));
398+
399+
// The websocket should be in a fully closed state at this point,
400+
// we should have a confirmed close code in the channel.
401+
expect(socketClient.socketChannel, isNotNull);
402+
expect(socketClient.socketChannel!.closeCode, isNotNull);
403+
});
404+
test('subscription data graphql-transport-ws', () async {
405+
final payload = Request(
406+
operation: Operation(document: parseString('subscription {}')),
407+
);
408+
final waitForConnection = true;
409+
final subscriptionDataStream =
410+
socketClient.subscribe(payload, waitForConnection);
411+
await socketClient.connectionState
412+
.where((state) => state == SocketConnectionState.connected)
413+
.first;
414+
415+
// ignore: unawaited_futures
416+
socketClient.socketChannel!.stream
417+
.where((message) => message == expectedMessage)
418+
.first
419+
.then((_) {
420+
socketClient.socketChannel!.sink.add(jsonEncode({
421+
'type': 'next',
422+
'id': '01020304-0506-4708-890a-0b0c0d0e0f10',
423+
'payload': {
424+
'data': {'foo': 'bar'},
425+
'errors': [
426+
{'message': 'error and data can coexist'}
427+
]
428+
}
429+
}));
430+
});
431+
432+
await expectLater(
433+
subscriptionDataStream,
434+
emits(
435+
// todo should ids be included in response context? probably '01020304-0506-4708-890a-0b0c0d0e0f10'
436+
Response(
437+
data: {'foo': 'bar'},
438+
errors: [
439+
GraphQLError(message: 'error and data can coexist'),
440+
],
441+
context: Context().withEntry(ResponseExtensions(null)),
442+
response: {
443+
"type": "next",
444+
"data": {"foo": "bar"},
445+
"errors": [
446+
{"message": "error and data can coexist"}
447+
]
448+
},
449+
),
450+
),
451+
);
452+
});
453+
test('resubscribe', () async {
454+
final payload = Request(
455+
operation: Operation(document: gql('subscription {}')),
456+
);
457+
final waitForConnection = true;
458+
final subscriptionDataStream =
459+
socketClient.subscribe(payload, waitForConnection);
460+
461+
await expectLater(
462+
socketClient.connectionState,
463+
emitsInOrder([
464+
SocketConnectionState.connecting,
465+
SocketConnectionState.handshake,
466+
SocketConnectionState.connected,
467+
]),
468+
);
469+
470+
await overridePrint((_) async {
471+
socketClient.onConnectionLost();
472+
})();
473+
474+
await expectLater(
475+
socketClient.connectionState,
476+
emitsInOrder([
477+
SocketConnectionState.notConnected,
478+
SocketConnectionState.connecting,
479+
SocketConnectionState.handshake,
480+
SocketConnectionState.connected,
481+
]),
482+
);
483+
484+
// ignore: unawaited_futures
485+
socketClient.socketChannel!.stream
486+
.where((message) => message == expectedMessage)
487+
.first
488+
.then((_) {
489+
socketClient.socketChannel!.sink.add(jsonEncode({
490+
'type': 'next',
491+
'id': '01020304-0506-4708-890a-0b0c0d0e0f10',
492+
'payload': {
493+
'data': {'foo': 'bar'},
494+
'errors': [
495+
{'message': 'error and data can coexist'}
496+
]
497+
}
498+
}));
499+
});
500+
501+
await expectLater(
502+
subscriptionDataStream,
503+
emits(
504+
// todo should ids be included in response context? probably '01020304-0506-4708-890a-0b0c0d0e0f10'
505+
Response(
506+
data: {'foo': 'bar'},
507+
errors: [
508+
GraphQLError(message: 'error and data can coexist'),
509+
],
510+
context: Context().withEntry(ResponseExtensions(null)),
511+
response: {
512+
"type": "next",
513+
"data": {"foo": "bar"},
514+
"errors": [
515+
{"message": "error and data can coexist"}
516+
]
517+
},
518+
),
519+
),
520+
);
521+
});
522+
test('resubscribe after server disconnect', () async {
523+
final payload = Request(
524+
operation: Operation(document: gql('subscription {}')),
525+
);
526+
final waitForConnection = true;
527+
final subscriptionDataStream =
528+
socketClient.subscribe(payload, waitForConnection);
529+
530+
await expectLater(
531+
socketClient.connectionState,
532+
emitsInOrder([
533+
SocketConnectionState.connecting,
534+
SocketConnectionState.handshake,
535+
SocketConnectionState.connected,
536+
]),
537+
);
538+
539+
// We need to begin waiting on the connectionState
540+
// before we issue the command to disconnect; otherwise
541+
// it can reconnect so fast that it will be reconnected
542+
// by the time that the expectLater check is initiated.
543+
Timer(const Duration(milliseconds: 20), () async {
544+
socketClient.socketChannel!.sink.add(forceDisconnectCommand);
545+
});
546+
// The connectionState BehaviorController emits the current state
547+
// to any new listener, so we expect it to start in the connected
548+
// state, transition to notConnected, and then reconnect after that.
549+
await expectLater(
550+
socketClient.connectionState,
551+
emitsInOrder([
552+
SocketConnectionState.connected,
553+
SocketConnectionState.notConnected,
554+
SocketConnectionState.connecting,
555+
SocketConnectionState.handshake,
556+
SocketConnectionState.connected,
557+
]),
558+
);
559+
560+
// ignore: unawaited_futures
561+
socketClient.socketChannel!.stream
562+
.where((message) => message == expectedMessage)
563+
.first
564+
.then((_) {
565+
socketClient.socketChannel!.sink.add(jsonEncode({
566+
'type': 'next',
567+
'id': '01020304-0506-4708-890a-0b0c0d0e0f10',
568+
'payload': {
569+
'data': {'foo': 'bar'},
570+
'errors': [
571+
{'message': 'error and data can coexist'}
572+
]
573+
}
574+
}));
575+
});
576+
577+
await expectLater(
578+
subscriptionDataStream,
579+
emits(
580+
// todo should ids be included in response context? probably '01020304-0506-4708-890a-0b0c0d0e0f10'
581+
Response(
582+
data: {'foo': 'bar'},
583+
errors: [
584+
GraphQLError(message: 'error and data can coexist'),
585+
],
586+
context: Context().withEntry(ResponseExtensions(null)),
587+
response: {
588+
"type": "next",
589+
"data": {"foo": "bar"},
590+
"errors": [
591+
{"message": "error and data can coexist"}
592+
]
593+
},
594+
),
595+
),
596+
);
597+
});
598+
}, tags: "integration");
599+
327600
group('SocketClient without autoReconnect', () {
328601
late SocketClient socketClient;
329602
StreamController<dynamic> controller;

0 commit comments

Comments
 (0)