Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions pkgs/dart_services/lib/server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,10 @@ Middleware exceptionResponse() {
return (Request request) async {
try {
return await handler(request);
} on HijackException {
// We ignore hijack exceptions as they are not error conditions; they're
// used used for control flow when upgrading websocket connections.
rethrow;
} catch (e, st) {
if (e is BadRequest) {
return Response.badRequest(body: e.message);
Expand Down
65 changes: 60 additions & 5 deletions pkgs/dart_services/lib/src/common_server.dart
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,14 @@ import 'dart:convert';
import 'dart:io';

import 'package:dartpad_shared/model.dart' as api;
import 'package:dartpad_shared/ws.dart';
import 'package:http/http.dart' as http;
import 'package:logging/logging.dart';
import 'package:shelf/shelf.dart';
import 'package:shelf_router/shelf_router.dart';
import 'package:shelf_static/shelf_static.dart';
import 'package:shelf_web_socket/shelf_web_socket.dart';
import 'package:web_socket_channel/web_socket_channel.dart';

import 'analysis.dart';
import 'caching.dart';
Expand All @@ -37,9 +40,9 @@ class CommonServerImpl {
final Sdk sdk;
final ServerCache cache;

late Analyzer analyzer;
late Compiler compiler;
final ai = GenerativeAI();
late final Analyzer analyzer;
late final Compiler compiler;
final GenerativeAI ai = GenerativeAI();

CommonServerImpl(this.sdk, this.cache);

Expand Down Expand Up @@ -73,6 +76,9 @@ class CommonServerApi {
// general requests (GET)
router.get(r'/api/<apiVersion>/version', handleVersion);

// websocket requests
router.get(r'/ws', webSocketHandler(handleWebSocket));

// serve the compiled artifacts
final artifactsDir = Directory('artifacts');
if (artifactsDir.existsSync()) {
Expand Down Expand Up @@ -115,6 +121,52 @@ class CommonServerApi {
return ok(version().toJson());
}

/// Handle a new websocket connection request.
///
/// Handle incoming requests, convert them to exising commands and dispatch
/// them appropriately. The commands and responses mirror the existing REST
/// protocol.
///
/// This will be a long-running conneciton to the client.
void handleWebSocket(WebSocketChannel webSocket, String? subprotocol) {
webSocket.stream.listen(
(message) {
try {
// Handle incoming WebSocket messages
final request = JsonRpcRequest.fromJson(message as String);
log.genericInfo('ws request: ${request.method}');
JsonRpcResponse? response;

switch (request.method) {
case 'version':
final v = version();
response = request.createResultResponse(v.toJson());
break;
default:
response = request.createErrorResponse(
'unknown command: ${request.method}',
);
break;
}

webSocket.sink.add(jsonEncode(response.toJson()));
log.genericInfo(
'ws response: '
'${request.method} ${response.error != null ? '500' : '200'}',
);
} catch (e) {
log.genericSevere('error handling websocket request', error: e);
}
},
onDone: () {
// Nothing to clean up here.
},
onError: (Object error) {
log.genericSevere('error from websocket connection', error: error);
},
);
}

Future<Response> handleAnalyze(Request request, String apiVersion) async {
if (apiVersion != api3) return unhandledVersion(apiVersion);

Expand Down Expand Up @@ -512,7 +564,6 @@ Middleware logRequestsToLogger(DartPadLogger log) {
final watch = Stopwatch()..start();

final ctx = DartPadRequestContext.fromRequest(request);
log.genericInfo('received request, enableLogging=${ctx.enableLogging}');

return Future.sync(() => innerHandler(request)).then(
(response) {
Expand All @@ -524,7 +575,11 @@ Middleware logRequestsToLogger(DartPadLogger log) {
return response;
},
onError: (Object error, StackTrace stackTrace) {
if (error is HijackException) throw error;
if (error is HijackException) {
log.info(_formatMessage(request, watch.elapsed), ctx);

throw error;
}

log.info(_formatMessage(request, watch.elapsed, error: error), ctx);

Expand Down
2 changes: 2 additions & 0 deletions pkgs/dart_services/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ dependencies:
shelf_gzip: ^4.1.0
shelf_router: ^1.1.4
shelf_static: ^1.1.0
shelf_web_socket: ^3.0.0
web_socket_channel: ^3.0.0
yaml: ^3.1.3

dev_dependencies:
Expand Down
5 changes: 3 additions & 2 deletions pkgs/dartpad_shared/lib/backend_client.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import 'package:http/http.dart';
import 'headers.dart';

class DartServicesHttpClient {
final Client _client = Client();
static Map<String, String> _headers = DartPadRequestHeaders(
enableLogging: true,
).encoded;
Expand All @@ -19,7 +18,7 @@ class DartServicesHttpClient {
_headers = DartPadRequestHeaders(enableLogging: false).encoded;
}

void close() => _client.close();
final Client _client = Client();

Future<Response> get(String url) async {
return await _client.get(Uri.parse(url), headers: _headers);
Expand All @@ -46,4 +45,6 @@ class DartServicesHttpClient {

return await _client.send(httpRequest);
}

void close() => _client.close();
}
79 changes: 79 additions & 0 deletions pkgs/dartpad_shared/lib/services.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';

import 'package:web_socket/web_socket.dart';

import 'backend_client.dart';
import 'model.dart';
import 'ws.dart';

export 'model.dart';

Expand Down Expand Up @@ -131,6 +135,81 @@ class DartServicesClient {
}
}

/// A websocket analog to [DartServicesClient].
class WebsocketServicesClient {
final Uri wsUrl;
final WebSocket socket;
final IDFactory idFactory = IDFactory();

final Map<int, Completer<Object>> responseCompleters = {};
final Map<int, Object Function(Map<String, Object?>)> responseDecoders = {};

final Completer<void> _closedCompleter = Completer();

WebsocketServicesClient._(this.wsUrl, this.socket);

static Future<WebsocketServicesClient> connect(String rootUrl) async {
final url = Uri.parse(rootUrl);
final wsUrl = url.replace(
scheme: url.scheme == 'https' ? 'wss' : 'ws',
path: 'ws',
);
final socket = await WebSocket.connect(wsUrl);
final client = WebsocketServicesClient._(wsUrl, socket);
client._init();
return client;
}

void _init() {
socket.events.listen((e) async {
switch (e) {
case TextDataReceived(text: final text):
_dispatch(JsonRpcResponse.fromJson(text));
break;
case BinaryDataReceived(data: final _):
// Ignore - binary data is unsupported.
break;
case CloseReceived(code: final _, reason: final _):
// Notify that the server connection has closed.
_closedCompleter.complete();
break;
}
});
}

Future<void> get onClosed => _closedCompleter.future;

Future<VersionResponse> version() {
final requestId = idFactory.generateNextId();
final completer = Completer<VersionResponse>();

responseCompleters[requestId] = completer;
responseDecoders[requestId] = VersionResponse.fromJson;

socket.sendText(
jsonEncode(JsonRpcRequest(method: 'version', id: requestId).toJson()),
);

return completer.future;
}

Future<void> dispose() => socket.close();

void _dispatch(JsonRpcResponse response) {
final id = response.id;

final completer = responseCompleters[id]!;
final decoder = responseDecoders[id]!;

if (response.error != null) {
completer.completeError(response.error!);
} else {
final result = decoder((response.result! as Map).cast());
completer.complete(result);
}
}
}

class ApiRequestError implements Exception {
ApiRequestError(this.message, this.body);

Expand Down
134 changes: 134 additions & 0 deletions pkgs/dartpad_shared/lib/ws.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:convert';

import 'model.dart';

/// A request in JSON-RPC format.
class JsonRpcRequest {
/// The name of the method to be invoked.
final String method;

/// The event ID; if null, this event is a notification.
final int? id;

/// A structured value that holds the parameter values to be used during the
/// invocation of the method.
final Map<String, Object?>? params;

JsonRpcRequest({required this.method, this.id, this.params});

factory JsonRpcRequest.fromJson(String val) {
final json = (jsonDecode(val) as Map).cast<String, Object?>();
return JsonRpcRequest(
method: json['method'] as String,
id: json['id'] as int?,
params: (json['params'] as Map?)?.cast(),
);
}

JsonRpcResponse createResultResponse(Map<String, Object?> result) =>
JsonRpcResponse(id: id!, result: result);

JsonRpcResponse createErrorResponse(Object error) =>
JsonRpcResponse(id: id!, error: error);

Map<String, Object?> toJson() => {
'id': id,
'method': method,
if (params != null) 'params': params,
};
}

/// A JSON-RPC response.
class JsonRpcResponse {
/// This must be the same as the value of the id member in the request object.
final int id;

/// This member is required on success.
///
/// This member must not exist if there was an error invoking the method.
///
/// The value of this member is determined by the method invoked on the
/// server.
final Object? result;

/// This member is required on error.
///
/// This member must not exist if there was no error triggered during
/// invocation.
final Object? error;

JsonRpcResponse({required this.id, this.result, this.error});

factory JsonRpcResponse.fromJson(String val) {
final json = (jsonDecode(val) as Map).cast<String, Object?>();
return JsonRpcResponse(
id: json['id'] as int,
result: json['result'],
error: json['error'],
);
}

Map<String, Object?> toJson() => {
'id': id,
if (result != null) 'result': result,
if (error != null) 'error': error,
};
}

/// This represents a websocket command that can be sent over the wire (aka, a
/// version command, and analyze command, ...).
abstract class WsCommand<T> {
/// Convert this command into a websocket formatted request.
JsonRpcRequest createRequest(IDFactory idFactory);

/// Given a json response to this command, parse it into the expected format.
///
/// For example, a `VersionCommand` might return a `VersionResponse` from this
/// method.
T parseResponse(Map<String, Object?> response);
}

class VersionCommand extends WsCommand<VersionResponse> {
static const name = 'version';

/// This command takes no parameters.
VersionCommand();

@override
JsonRpcRequest createRequest(IDFactory idFactory) {
return JsonRpcRequest(method: name, id: idFactory.generateNextId());
}

@override
VersionResponse parseResponse(Map<String, Object?> response) {
return _handleParseResponse(VersionResponse.fromJson, response);
}
}

T _handleParseResponse<T>(
T Function(Map<String, Object?>) decode,
Map<String, Object?> response,
) {
if (response.containsKey('error')) {
// ignore: only_throw_errors
throw response['error']!;
} else {
final result = (response['result'] as Map).cast<String, Object?>();
return decode(result);
}
}

/// A class to generate a monotonically increasing sequence of numbers.
class IDFactory {
int _next = 0;

int generateNextId() {
final id = _next;
_next++;
return id;
}
}
1 change: 1 addition & 0 deletions pkgs/dartpad_shared/pubspec.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ dependencies:
http: ^1.3.0
json_annotation: ^4.9.0
meta: ^1.15.0
web_socket: ^1.0.0

dev_dependencies:
build_runner: ^2.4.15
Expand Down
Loading