diff --git a/pkgs/dart_services/lib/server.dart b/pkgs/dart_services/lib/server.dart index 0df8abcd1..bb847ef40 100644 --- a/pkgs/dart_services/lib/server.dart +++ b/pkgs/dart_services/lib/server.dart @@ -146,6 +146,10 @@ Middleware exceptionResponse() { return (Request request) async { try { return await handler(request); + } on HijackException { + // We ignore hijack exceptions as they are not error conditions; they're + // used used for control flow when upgrading websocket connections. + rethrow; } catch (e, st) { if (e is BadRequest) { return Response.badRequest(body: e.message); diff --git a/pkgs/dart_services/lib/src/common_server.dart b/pkgs/dart_services/lib/src/common_server.dart index f347ab1af..4c0f6756f 100644 --- a/pkgs/dart_services/lib/src/common_server.dart +++ b/pkgs/dart_services/lib/src/common_server.dart @@ -7,11 +7,14 @@ import 'dart:convert'; import 'dart:io'; import 'package:dartpad_shared/model.dart' as api; +import 'package:dartpad_shared/ws.dart'; import 'package:http/http.dart' as http; import 'package:logging/logging.dart'; import 'package:shelf/shelf.dart'; import 'package:shelf_router/shelf_router.dart'; import 'package:shelf_static/shelf_static.dart'; +import 'package:shelf_web_socket/shelf_web_socket.dart'; +import 'package:web_socket_channel/web_socket_channel.dart'; import 'analysis.dart'; import 'caching.dart'; @@ -37,9 +40,9 @@ class CommonServerImpl { final Sdk sdk; final ServerCache cache; - late Analyzer analyzer; - late Compiler compiler; - final ai = GenerativeAI(); + late final Analyzer analyzer; + late final Compiler compiler; + final GenerativeAI ai = GenerativeAI(); CommonServerImpl(this.sdk, this.cache); @@ -73,6 +76,9 @@ class CommonServerApi { // general requests (GET) router.get(r'/api//version', handleVersion); + // websocket requests + router.get(r'/ws', webSocketHandler(handleWebSocket)); + // serve the compiled artifacts final artifactsDir = Directory('artifacts'); if (artifactsDir.existsSync()) { @@ -115,6 +121,52 @@ class CommonServerApi { return ok(version().toJson()); } + /// Handle a new websocket connection request. + /// + /// Handle incoming requests, convert them to exising commands and dispatch + /// them appropriately. The commands and responses mirror the existing REST + /// protocol. + /// + /// This will be a long-running conneciton to the client. + void handleWebSocket(WebSocketChannel webSocket, String? subprotocol) { + webSocket.stream.listen( + (message) { + try { + // Handle incoming WebSocket messages + final request = JsonRpcRequest.fromJson(message as String); + log.genericInfo('ws request: ${request.method}'); + JsonRpcResponse? response; + + switch (request.method) { + case 'version': + final v = version(); + response = request.createResultResponse(v.toJson()); + break; + default: + response = request.createErrorResponse( + 'unknown command: ${request.method}', + ); + break; + } + + webSocket.sink.add(jsonEncode(response.toJson())); + log.genericInfo( + 'ws response: ' + '${request.method} ${response.error != null ? '500' : '200'}', + ); + } catch (e) { + log.genericSevere('error handling websocket request', error: e); + } + }, + onDone: () { + // Nothing to clean up here. + }, + onError: (Object error) { + log.genericSevere('error from websocket connection', error: error); + }, + ); + } + Future handleAnalyze(Request request, String apiVersion) async { if (apiVersion != api3) return unhandledVersion(apiVersion); @@ -512,7 +564,6 @@ Middleware logRequestsToLogger(DartPadLogger log) { final watch = Stopwatch()..start(); final ctx = DartPadRequestContext.fromRequest(request); - log.genericInfo('received request, enableLogging=${ctx.enableLogging}'); return Future.sync(() => innerHandler(request)).then( (response) { @@ -524,7 +575,11 @@ Middleware logRequestsToLogger(DartPadLogger log) { return response; }, onError: (Object error, StackTrace stackTrace) { - if (error is HijackException) throw error; + if (error is HijackException) { + log.info(_formatMessage(request, watch.elapsed), ctx); + + throw error; + } log.info(_formatMessage(request, watch.elapsed, error: error), ctx); diff --git a/pkgs/dart_services/pubspec.yaml b/pkgs/dart_services/pubspec.yaml index 1c042fed0..51ebec00b 100644 --- a/pkgs/dart_services/pubspec.yaml +++ b/pkgs/dart_services/pubspec.yaml @@ -26,6 +26,8 @@ dependencies: shelf_gzip: ^4.1.0 shelf_router: ^1.1.4 shelf_static: ^1.1.0 + shelf_web_socket: ^3.0.0 + web_socket_channel: ^3.0.0 yaml: ^3.1.3 dev_dependencies: diff --git a/pkgs/dartpad_shared/lib/backend_client.dart b/pkgs/dartpad_shared/lib/backend_client.dart index 8bcc581b3..dca3cc530 100644 --- a/pkgs/dartpad_shared/lib/backend_client.dart +++ b/pkgs/dartpad_shared/lib/backend_client.dart @@ -9,7 +9,6 @@ import 'package:http/http.dart'; import 'headers.dart'; class DartServicesHttpClient { - final Client _client = Client(); static Map _headers = DartPadRequestHeaders( enableLogging: true, ).encoded; @@ -19,7 +18,7 @@ class DartServicesHttpClient { _headers = DartPadRequestHeaders(enableLogging: false).encoded; } - void close() => _client.close(); + final Client _client = Client(); Future get(String url) async { return await _client.get(Uri.parse(url), headers: _headers); @@ -46,4 +45,6 @@ class DartServicesHttpClient { return await _client.send(httpRequest); } + + void close() => _client.close(); } diff --git a/pkgs/dartpad_shared/lib/services.dart b/pkgs/dartpad_shared/lib/services.dart index 1d43bbc76..5e5cc7d11 100644 --- a/pkgs/dartpad_shared/lib/services.dart +++ b/pkgs/dartpad_shared/lib/services.dart @@ -2,10 +2,14 @@ // for details. All rights reserved. Use of this source code is governed by a // BSD-style license that can be found in the LICENSE file. +import 'dart:async'; import 'dart:convert'; +import 'package:web_socket/web_socket.dart'; + import 'backend_client.dart'; import 'model.dart'; +import 'ws.dart'; export 'model.dart'; @@ -131,6 +135,81 @@ class DartServicesClient { } } +/// A websocket analog to [DartServicesClient]. +class WebsocketServicesClient { + final Uri wsUrl; + final WebSocket socket; + final IDFactory idFactory = IDFactory(); + + final Map> responseCompleters = {}; + final Map)> responseDecoders = {}; + + final Completer _closedCompleter = Completer(); + + WebsocketServicesClient._(this.wsUrl, this.socket); + + static Future connect(String rootUrl) async { + final url = Uri.parse(rootUrl); + final wsUrl = url.replace( + scheme: url.scheme == 'https' ? 'wss' : 'ws', + path: 'ws', + ); + final socket = await WebSocket.connect(wsUrl); + final client = WebsocketServicesClient._(wsUrl, socket); + client._init(); + return client; + } + + void _init() { + socket.events.listen((e) async { + switch (e) { + case TextDataReceived(text: final text): + _dispatch(JsonRpcResponse.fromJson(text)); + break; + case BinaryDataReceived(data: final _): + // Ignore - binary data is unsupported. + break; + case CloseReceived(code: final _, reason: final _): + // Notify that the server connection has closed. + _closedCompleter.complete(); + break; + } + }); + } + + Future get onClosed => _closedCompleter.future; + + Future version() { + final requestId = idFactory.generateNextId(); + final completer = Completer(); + + responseCompleters[requestId] = completer; + responseDecoders[requestId] = VersionResponse.fromJson; + + socket.sendText( + jsonEncode(JsonRpcRequest(method: 'version', id: requestId).toJson()), + ); + + return completer.future; + } + + Future dispose() => socket.close(); + + void _dispatch(JsonRpcResponse response) { + final id = response.id; + + final completer = responseCompleters[id]!; + final decoder = responseDecoders[id]!; + + if (response.error != null) { + completer.completeError(response.error!); + } else { + final result = decoder((response.result! as Map).cast()); + completer.complete(result); + } + } +} + class ApiRequestError implements Exception { ApiRequestError(this.message, this.body); diff --git a/pkgs/dartpad_shared/lib/ws.dart b/pkgs/dartpad_shared/lib/ws.dart new file mode 100644 index 000000000..9f6e16120 --- /dev/null +++ b/pkgs/dartpad_shared/lib/ws.dart @@ -0,0 +1,134 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +import 'dart:convert'; + +import 'model.dart'; + +/// A request in JSON-RPC format. +class JsonRpcRequest { + /// The name of the method to be invoked. + final String method; + + /// The event ID; if null, this event is a notification. + final int? id; + + /// A structured value that holds the parameter values to be used during the + /// invocation of the method. + final Map? params; + + JsonRpcRequest({required this.method, this.id, this.params}); + + factory JsonRpcRequest.fromJson(String val) { + final json = (jsonDecode(val) as Map).cast(); + return JsonRpcRequest( + method: json['method'] as String, + id: json['id'] as int?, + params: (json['params'] as Map?)?.cast(), + ); + } + + JsonRpcResponse createResultResponse(Map result) => + JsonRpcResponse(id: id!, result: result); + + JsonRpcResponse createErrorResponse(Object error) => + JsonRpcResponse(id: id!, error: error); + + Map toJson() => { + 'id': id, + 'method': method, + if (params != null) 'params': params, + }; +} + +/// A JSON-RPC response. +class JsonRpcResponse { + /// This must be the same as the value of the id member in the request object. + final int id; + + /// This member is required on success. + /// + /// This member must not exist if there was an error invoking the method. + /// + /// The value of this member is determined by the method invoked on the + /// server. + final Object? result; + + /// This member is required on error. + /// + /// This member must not exist if there was no error triggered during + /// invocation. + final Object? error; + + JsonRpcResponse({required this.id, this.result, this.error}); + + factory JsonRpcResponse.fromJson(String val) { + final json = (jsonDecode(val) as Map).cast(); + return JsonRpcResponse( + id: json['id'] as int, + result: json['result'], + error: json['error'], + ); + } + + Map toJson() => { + 'id': id, + if (result != null) 'result': result, + if (error != null) 'error': error, + }; +} + +/// This represents a websocket command that can be sent over the wire (aka, a +/// version command, and analyze command, ...). +abstract class WsCommand { + /// Convert this command into a websocket formatted request. + JsonRpcRequest createRequest(IDFactory idFactory); + + /// Given a json response to this command, parse it into the expected format. + /// + /// For example, a `VersionCommand` might return a `VersionResponse` from this + /// method. + T parseResponse(Map response); +} + +class VersionCommand extends WsCommand { + static const name = 'version'; + + /// This command takes no parameters. + VersionCommand(); + + @override + JsonRpcRequest createRequest(IDFactory idFactory) { + return JsonRpcRequest(method: name, id: idFactory.generateNextId()); + } + + @override + VersionResponse parseResponse(Map response) { + return _handleParseResponse(VersionResponse.fromJson, response); + } +} + +T _handleParseResponse( + T Function(Map) decode, + Map response, +) { + if (response.containsKey('error')) { + // ignore: only_throw_errors + throw response['error']!; + } else { + final result = (response['result'] as Map).cast(); + return decode(result); + } +} + +/// A class to generate a monotonically increasing sequence of numbers. +class IDFactory { + int _next = 0; + + int generateNextId() { + final id = _next; + _next++; + return id; + } +} diff --git a/pkgs/dartpad_shared/pubspec.yaml b/pkgs/dartpad_shared/pubspec.yaml index fe5c3febc..13d314825 100644 --- a/pkgs/dartpad_shared/pubspec.yaml +++ b/pkgs/dartpad_shared/pubspec.yaml @@ -16,6 +16,7 @@ dependencies: http: ^1.3.0 json_annotation: ^4.9.0 meta: ^1.15.0 + web_socket: ^1.0.0 dev_dependencies: build_runner: ^2.4.15 diff --git a/pkgs/dartpad_ui/lib/model/model.dart b/pkgs/dartpad_ui/lib/model/model.dart index 7a286b201..c837e6c0f 100644 --- a/pkgs/dartpad_ui/lib/model/model.dart +++ b/pkgs/dartpad_ui/lib/model/model.dart @@ -14,6 +14,12 @@ import '../primitives/gists.dart'; import '../primitives/samples.g.dart'; import '../primitives/utils.dart'; +/// A compile-time flag to control making requests over websockets. +/// +/// Do not check this in `true`; this will create a long-lived connection to the +/// backend and we don't yet know how well that will scale. +const useWebsockets = false; + abstract class ExecutionService { Future execute( Channel usingChannel, @@ -191,8 +197,8 @@ class AppServices { final AppModel appModel; final ValueNotifier _channel = ValueNotifier(Channel.defaultChannel); - final _httpClient = DartServicesHttpClient(); late DartServicesClient services; + WebsocketServicesClient? webSocketServices; ExecutionService? _executionService; EditorService? _editorService; @@ -213,7 +219,10 @@ class AppServices { AppServices(this.appModel, Channel channel) { _channel.value = channel; - services = DartServicesClient(_httpClient, rootUrl: channel.url); + services = DartServicesClient( + DartServicesHttpClient(), + rootUrl: channel.url, + ); appModel.sourceCodeController.addListener(_handleCodeChanged); appModel.analysisIssues.addListener(_updateEditorProblemsStatus); @@ -234,7 +243,15 @@ class AppServices { ValueListenable get channel => _channel; Future setChannel(Channel channel) async { - services = DartServicesClient(_httpClient, rootUrl: channel.url); + services = DartServicesClient(services.client, rootUrl: channel.url); + + // Tear this down if using websockets; this will be recreated automatically + // as necessary. + if (useWebsockets) { + webSocketServices?.dispose(); + webSocketServices = null; + } + final versionResponse = await populateVersions(); _channel.value = channel; return versionResponse; @@ -270,7 +287,20 @@ class AppServices { } Future populateVersions() async { - final version = await services.version(); + VersionResponse version; + + if (useWebsockets) { + // Since using websockets is a compile-time config option, and it's only + // used for the version call, we create it here on demand. + webSocketServices ??= await WebsocketServicesClient.connect( + services.rootUrl, + ); + + version = await webSocketServices!.version(); + } else { + version = await services.version(); + } + appModel.runtimeVersions.value = version; return version; } @@ -576,7 +606,7 @@ class AppServices { } void dispose() { - _httpClient.close(); + services.dispose(); appModel.sourceCodeController.removeListener(_handleCodeChanged); } diff --git a/pkgs/dartpad_ui/pubspec.yaml b/pkgs/dartpad_ui/pubspec.yaml index 27cf71ce6..e0e5cfa91 100644 --- a/pkgs/dartpad_ui/pubspec.yaml +++ b/pkgs/dartpad_ui/pubspec.yaml @@ -33,16 +33,17 @@ dependencies: url_launcher: ^6.3.0 vtable: ^0.4.0 web: ^1.1.0 + web_socket: ^1.0.0 dev_dependencies: - integration_test: - sdk: flutter + dart_services: + path: ../dart_services + flutter_lints: ^6.0.0 flutter_test: sdk: flutter - flutter_lints: ^6.0.0 + integration_test: + sdk: flutter test: ^1.25.7 - dart_services: - path: ../dart_services flutter: uses-material-design: true