Skip to content

Commit 7e6db35

Browse files
committed
[ DWDS ] Disconnect non-DDS clients when DDS connects
The native VM service disconnects all non-DDS clients when DDS connects in order to keep state consistent. DWDS was never configured to work this way and, instead, would not let DDS connect if there were already existing clients. This is fine most of the time, as DDS is typically the first client, but in cases where multiple DDS instances attempt to start at once (e.g., a simultaneous `flutter run` and `flutter attach`), this inconsistency prevents tools from falling back to using an already connected DDS instance, as seen in flutter/flutter#171758. Fixes #2399
1 parent b947357 commit 7e6db35

File tree

4 files changed

+174
-141
lines changed

4 files changed

+174
-141
lines changed

dwds/lib/src/services/chrome_proxy_service.dart

Lines changed: 46 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,38 @@ import 'package:vm_service/vm_service.dart' hide vmServiceVersion;
3333
import 'package:vm_service_interface/vm_service_interface.dart';
3434
import 'package:webkit_inspection_protocol/webkit_inspection_protocol.dart';
3535

36+
// This event is identical to the one sent by the VM service from
37+
// sdk/lib/vmservice/vmservice.dart before existing VM service clients are
38+
// disconnected.
39+
final class DartDevelopmentServiceConnectedEvent extends Event {
40+
DartDevelopmentServiceConnectedEvent({
41+
required super.timestamp,
42+
required this.uri,
43+
}) : message =
44+
'A Dart Developer Service instance has connected and this direct '
45+
'connection to the VM service will now be closed. Please reconnect to '
46+
'the Dart Development Service at $uri.',
47+
super(kind: 'DartDevelopmentServiceConnected');
48+
49+
final String message;
50+
final String uri;
51+
52+
@override
53+
Map<String, Object?> toJson() => {
54+
...super.toJson(),
55+
'uri': uri,
56+
'message': message,
57+
};
58+
}
59+
60+
final class DisconnectNonDartDevelopmentServiceClients extends RPCError {
61+
DisconnectNonDartDevelopmentServiceClients()
62+
: super('_yieldControlToDDS', kErrorCode);
63+
64+
// Arbitrary error code that's unlikely to be used elsewhere.
65+
static const kErrorCode = -199328;
66+
}
67+
3668
/// A proxy from the chrome debug protocol to the dart vm service protocol.
3769
class ChromeProxyService extends ProxyService {
3870
/// Signals when isolate starts.
@@ -1563,15 +1595,21 @@ class ChromeProxyService extends ProxyService {
15631595

15641596
@override
15651597
Future<void> yieldControlToDDS(String uri) async {
1566-
final canYield = ChromeDebugService.yieldControlToDDS(uri);
1598+
// This will throw an RPCError if there's already an existing DDS instance.
1599+
ChromeDebugService.yieldControlToDDS(uri);
1600+
1601+
// Notify existing clients that DDS has connected and they're about to be
1602+
// disconnected.
1603+
final event = DartDevelopmentServiceConnectedEvent(
1604+
timestamp: DateTime.now().millisecondsSinceEpoch,
1605+
uri: uri,
1606+
);
1607+
streamNotify(EventStreams.kService, event);
15671608

1568-
if (!canYield) {
1569-
throw RPCError(
1570-
'yieldControlToDDS',
1571-
RPCErrorKind.kFeatureDisabled.code,
1572-
'Existing VM service clients prevent DDS from taking control.',
1573-
);
1574-
}
1609+
// We throw since we have no other way to control what the response content
1610+
// is for this RPC. The debug service will check for this particular
1611+
// exception as a signal to close connections to all other clients.
1612+
throw DisconnectNonDartDevelopmentServiceClients();
15751613
}
15761614

15771615
Future<InstanceRef> _instanceRef(RemoteObject? obj) async {

dwds/lib/src/services/debug_service.dart

Lines changed: 98 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -25,57 +25,101 @@ import 'package:shelf/shelf.dart' as shelf;
2525
import 'package:shelf/shelf.dart' hide Response;
2626
import 'package:shelf_web_socket/shelf_web_socket.dart';
2727
import 'package:sse/server/sse_handler.dart';
28+
import 'package:stream_channel/stream_channel.dart';
29+
import 'package:vm_service/vm_service.dart';
2830
import 'package:vm_service_interface/vm_service_interface.dart';
2931
import 'package:web_socket_channel/web_socket_channel.dart';
3032

3133
const _kSseHandlerPath = '\$debugHandler';
3234

3335
bool _acceptNewConnections = true;
34-
int _clientsConnected = 0;
36+
37+
final _clientConnections = <int, StreamChannel>{};
38+
int _clientId = 0;
3539

3640
Logger _logger = Logger('DebugService');
3741

42+
void _handleConnection(
43+
StreamChannel channel,
44+
ChromeProxyService chromeProxyService,
45+
ServiceExtensionRegistry serviceExtensionRegistry, {
46+
void Function(Map<String, Object>)? onRequest,
47+
void Function(Map<String, Object?>)? onResponse,
48+
}) {
49+
final clientId = _clientId++;
50+
final responseController = StreamController<Map<String, Object?>>();
51+
responseController.stream
52+
.asyncMap((response) async {
53+
// This error indicates a successful invocation to _yieldControlToDDS.
54+
// We don't have a good way to access the list of connected clients
55+
// while also being able to determine which client invoked the RPC
56+
// without some form of client ID.
57+
//
58+
// We can probably do better than this, but it will likely involve some
59+
// refactoring.
60+
if (response case {
61+
'error': {
62+
'code': DisconnectNonDartDevelopmentServiceClients.kErrorCode,
63+
},
64+
}) {
65+
final nonDdsClients = _clientConnections.entries
66+
.where((MapEntry<int, StreamChannel> e) => e.key != clientId)
67+
.map((e) => e.value);
68+
await Future.wait([
69+
for (final client in nonDdsClients) client.sink.close(),
70+
]);
71+
// Remove the artificial error and return Success.
72+
response.remove('error');
73+
response['result'] = Success().toJson();
74+
}
75+
if (onResponse != null) onResponse(response);
76+
channel.sink.add(jsonEncode(response));
77+
})
78+
.listen(channel.sink.add, onError: channel.sink.addError);
79+
final inputStream = channel.stream.map((value) {
80+
if (value is List<int>) {
81+
value = utf8.decode(value);
82+
} else if (value is! String) {
83+
throw StateError(
84+
'Got value with unexpected type ${value.runtimeType} from web '
85+
'socket, expected a List<int> or String.',
86+
);
87+
}
88+
final request = Map<String, Object>.from(jsonDecode(value));
89+
if (onRequest != null) onRequest(request);
90+
return request;
91+
});
92+
VmServerConnection(
93+
inputStream,
94+
responseController.sink,
95+
serviceExtensionRegistry,
96+
chromeProxyService,
97+
).done.whenComplete(() {
98+
_clientConnections.remove(clientId);
99+
if (!_acceptNewConnections && _clientConnections.isEmpty) {
100+
// DDS has disconnected so we can allow for clients to connect directly
101+
// to DWDS.
102+
ChromeDebugService._ddsUri = null;
103+
_acceptNewConnections = true;
104+
}
105+
});
106+
_clientConnections[clientId] = channel;
107+
}
108+
38109
void Function(WebSocketChannel, String?) _createNewConnectionHandler(
39110
ChromeProxyService chromeProxyService,
40111
ServiceExtensionRegistry serviceExtensionRegistry, {
41112
void Function(Map<String, Object>)? onRequest,
42113
void Function(Map<String, Object?>)? onResponse,
43114
}) {
44115
return (webSocket, subprotocol) {
45-
final responseController = StreamController<Map<String, Object?>>();
46-
webSocket.sink.addStream(
47-
responseController.stream.map((response) {
48-
if (onResponse != null) onResponse(response);
49-
return jsonEncode(response);
50-
}),
51-
);
52-
final inputStream = webSocket.stream.map((value) {
53-
if (value is List<int>) {
54-
value = utf8.decode(value);
55-
} else if (value is! String) {
56-
throw StateError(
57-
'Got value with unexpected type ${value.runtimeType} from web '
58-
'socket, expected a List<int> or String.',
59-
);
60-
}
61-
final request = Map<String, Object>.from(jsonDecode(value));
62-
if (onRequest != null) onRequest(request);
63-
return request;
64-
});
65-
++_clientsConnected;
66-
VmServerConnection(
67-
inputStream,
68-
responseController.sink,
69-
serviceExtensionRegistry,
116+
_handleConnection(
117+
webSocket,
70118
chromeProxyService,
71-
).done.whenComplete(() {
72-
--_clientsConnected;
73-
if (!_acceptNewConnections && _clientsConnected == 0) {
74-
// DDS has disconnected so we can allow for clients to connect directly
75-
// to DWDS.
76-
_acceptNewConnections = true;
77-
}
78-
});
119+
serviceExtensionRegistry,
120+
onRequest: onRequest,
121+
onResponse: onResponse,
122+
);
79123
};
80124
}
81125

@@ -88,41 +132,12 @@ Future<void> _handleSseConnections(
88132
}) async {
89133
while (await handler.connections.hasNext) {
90134
final connection = await handler.connections.next;
91-
final responseController = StreamController<Map<String, Object?>>();
92-
final sub = responseController.stream
93-
.map((response) {
94-
if (onResponse != null) onResponse(response);
95-
return jsonEncode(response);
96-
})
97-
.listen(connection.sink.add);
98-
safeUnawaited(
99-
chromeProxyService.remoteDebugger.onClose.first.whenComplete(() {
100-
connection.sink.close();
101-
sub.cancel();
102-
}),
103-
);
104-
final inputStream = connection.stream.map((value) {
105-
final request = jsonDecode(value) as Map<String, Object>;
106-
if (onRequest != null) onRequest(request);
107-
return request;
108-
});
109-
++_clientsConnected;
110-
final vmServerConnection = VmServerConnection(
111-
inputStream,
112-
responseController.sink,
113-
serviceExtensionRegistry,
135+
_handleConnection(
136+
connection,
114137
chromeProxyService,
115-
);
116-
safeUnawaited(
117-
vmServerConnection.done.whenComplete(() {
118-
--_clientsConnected;
119-
if (!_acceptNewConnections && _clientsConnected == 0) {
120-
// DDS has disconnected so we can allow for clients to connect directly
121-
// to DWDS.
122-
_acceptNewConnections = true;
123-
}
124-
return sub.cancel();
125-
}),
138+
serviceExtensionRegistry,
139+
onRequest: onRequest,
140+
onResponse: onResponse,
126141
);
127142
}
128143
}
@@ -224,15 +239,19 @@ class ChromeDebugService implements DebugService {
224239
return _encodedUri = encoded;
225240
}
226241

227-
// TODO(https://github.com/dart-lang/webdev/issues/2399): yieldControlToDDS
228-
// should disconnect existing non-DDS clients.
229-
static bool yieldControlToDDS(String uri) {
230-
if (_clientsConnected > 1) {
231-
return false;
242+
static void yieldControlToDDS(String uri) {
243+
if (_ddsUri != null) {
244+
// This exception is identical to the one thrown from
245+
// sdk/lib/vmservice/vmservice.dart
246+
throw RPCError(
247+
'_yieldControlToDDS',
248+
RPCErrorKind.kFeatureDisabled.code,
249+
'A DDS instance is already connected at $_ddsUri.',
250+
{'ddsUri': _ddsUri.toString()},
251+
);
232252
}
233-
_ddsUri = uri;
234253
_acceptNewConnections = false;
235-
return true;
254+
_ddsUri = uri;
236255
}
237256

238257
static Future<ChromeDebugService> start(
@@ -441,6 +460,7 @@ class WebSocketDebugService implements DebugService {
441460
WebSocketProxyService webSocketProxyService,
442461
) {
443462
return webSocketHandler((WebSocketChannel webSocket, String? subprotocol) {
463+
final clientId = _clientId++;
444464
final responseController = StreamController<Map<String, Object?>>();
445465
webSocket.sink.addStream(responseController.stream.map(jsonEncode));
446466

@@ -455,14 +475,15 @@ class WebSocketDebugService implements DebugService {
455475
return Map<String, Object>.from(jsonDecode(value));
456476
});
457477

458-
++_clientsConnected;
478+
_clientConnections[clientId] = webSocket;
479+
459480
VmServerConnection(
460481
inputStream,
461482
responseController.sink,
462483
serviceExtensionRegistry,
463484
webSocketProxyService,
464485
).done.whenComplete(() {
465-
--_clientsConnected;
486+
_clientConnections.remove(clientId);
466487
});
467488
});
468489
}

dwds/pubspec.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ dependencies:
3232
shelf_web_socket: '>=2.0.0 <4.0.0'
3333
source_maps: ^0.10.10
3434
stack_trace: ^1.10.0
35+
stream_channel: ^2.1.4
3536
sse: ^4.1.2
3637
uuid: ^4.0.0
3738
vm_service: '>=14.2.4 <16.0.0'

0 commit comments

Comments
 (0)