-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathlocal_api_runner.dart
More file actions
638 lines (592 loc) · 21.6 KB
/
local_api_runner.dart
File metadata and controls
638 lines (592 loc) · 21.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
import 'dart:async';
import 'dart:convert';
import 'dart:io';
import 'package:celest_ast/celest_ast.dart' as ast;
import 'package:celest_cli/src/compiler/frontend_server_client.dart';
import 'package:celest_cli/src/context.dart';
import 'package:celest_cli/src/process/port_finder.dart';
import 'package:celest_cli/src/sdk/dart_sdk.dart';
import 'package:celest_cli/src/utils/cli.dart';
import 'package:celest_cli/src/utils/error.dart';
import 'package:celest_cli/src/utils/json.dart';
import 'package:celest_cli/src/utils/run.dart';
import 'package:collection/collection.dart';
import 'package:logging/logging.dart';
import 'package:meta/meta.dart';
import 'package:vm_service/vm_service.dart';
final Logger _logger = Logger('LocalApiRunner');
/// Like [EntrypointCompiler], this class runs Celest API functions as a local
/// server, watching for changes and hot-reloading when functions are changed.
final class LocalApiRunner {
/// Creates a runner around an already-started frontend server [client] and
/// local API process.
///
/// Private: instances are created by [start], which also calls [_init].
LocalApiRunner._({
required this.path,
required this.verbose,
required this.port,
required FrontendServerClient client,
required Process localApiProcess,
}) : _client = client,
_localApiProcess = localApiProcess;
/// Whether verbose output was requested when the runner was started.
final bool verbose;
/// The entrypoint path that was compiled and is being run.
final String path;
/// The port that the local API is running on.
final int port;
/// Frontend server used to recompile sources in [hotReload]; killed in
/// [close].
final FrontendServerClient _client;
/// The child process running the compiled local API.
final Process _localApiProcess;
/// The VM service connection; `null` until [_init] has connected.
VmService? _vmService;
/// ID of the isolate that [hotReload] reloads. Assigned once in [_init].
late final String _vmIsolateId;
/// The WebSocket URI of the VM service this runner is connected to.
///
/// Both the service and its URI are null-asserted, so this throws if it is
/// read before the VM service connection has been established.
String get wsUri => _vmService!.wsUri!;
/// Subscription to the child process' stdout lines. Assigned in [_init].
late final StreamSubscription<String> _stdoutSub;
/// Subscription to the child process' stderr lines. Assigned in [_init].
late final StreamSubscription<String> _stderrSub;
/// Compiles the entrypoint at [path] and launches it as a local API process.
///
/// The steps are, in order:
///
/// 1. Generate an initial kernel (`.dill`) file that links the platform.
/// 2. Start a [FrontendServerClient] for later incremental compilations.
/// 3. Reserve a port for the VM service and resolve the HTTP [port].
/// 4. Write `celest.json` to the outputs directory and spawn the process
///    with [configValues], `PORT` and `CELEST_ENVIRONMENT` in its
///    environment.
/// 5. Wait, via [_init], for the VM service and the server to come up.
///
/// [additionalSources] is accepted for API compatibility but is currently not
/// forwarded to the frontend server.
///
/// Throws a [CompilationException] if the initial kernel cannot be generated.
/// If start-up fails after the process has been spawned, the process and the
/// frontend server are shut down before the original error is rethrown.
static Future<LocalApiRunner> start({
  required ast.ResolvedProject resolvedProject,
  required String path,
  required String environmentId,
  required Map<String, String> configValues,
  required bool verbose,
  List<String> additionalSources = const [],
  int? port,
  @visibleForTesting Duration? vmServiceTimeout,
  @visibleForTesting StringSink? stdoutPipe,
  @visibleForTesting StringSink? stderrPipe,
  // Local API should use random port since it's being proxied by the user
  // hub and is never exposed to the user.
  @visibleForTesting PortFinder portFinder = const RandomPortFinder(),
}) async {
  final (target, platformDill, sdkRoot) =
      switch (resolvedProject.sdkConfig.targetSdk) {
    SdkType.flutter => (
        'flutter',
        Sdk.current.flutterPlatformDill!,
        Sdk.current.flutterPatchedSdk!,
      ),
    SdkType.dart => (
        'vm',
        Sdk.current.vmPlatformDill,
        p.join(Sdk.current.sdkPath, 'lib', '_internal'),
      ),
    final unknown => unreachable('Unknown SDK type: $unknown'),
  };

  // Create initial kernel file so that it links the platform
  //
  // Incremental compilations do not need the platform since it will be
  // loaded into memory already.
  var outputDill = p.setExtension(path, '.dill');
  var outputDillFile = fileSystem.file(outputDill);
  var index = 0;
  while (outputDillFile.existsSync()) {
    try {
      await outputDillFile.delete();
    } on Object {
      // Windows gets fussy about deleting files sometimes.
      // Just use a different name.
      outputDill = p.setExtension('$path.${index++}', '.dill');
      outputDillFile = fileSystem.file(outputDill);
    }
  }

  // // Copy SQLite3 to the output directory on Windows.
  // if (platform.isWindows) {
  //   final sqlite3Out = fileSystem
  //       .directory(p.dirname(path))
  //       .childFile('sqlite3.dll');
  //   if (!sqlite3Out.existsSync()) {
  //     final cachedSqlite3 = celestProject.config.configDir.childFile(
  //       'sqlite3.dll',
  //     );
  //     await cachedSqlite3.copy(sqlite3Out.path);
  //   }
  // }

  // NOTE: FE server requires file: URIs for *some* paths on Windows.
  final genKernelRes = await processManager.start(<String>[
    Sdk.current.dartAotRuntime,
    Sdk.current.frontendServerAotSnapshot,
    '--sdk-root',
    sdkRoot, // Must be path
    '--platform',
    Uri.file(platformDill).toString(), // Must be URI
    '--link-platform',
    '--target',
    target,
    '--packages',
    Uri.file(projectPaths.packagesConfig).toString(),
    '--filesystem-root=${projectPaths.projectRoot}',
    '--filesystem-scheme=celest',
    '--output-dill',
    outputDill, // Must be path
    _projectFsUri(path).toString(),
  ], workingDirectory: projectPaths.outputsDir);
  final genKernelLogs = StringBuffer();
  genKernelRes.captureStdout(
    sink: genKernelLogs.writeln,
    prefix: '[stdout] ',
  );
  genKernelRes.captureStderr(
    sink: genKernelLogs.writeln,
    prefix: '[stderr] ',
  );
  if (await genKernelRes.exitCode != 0) {
    _logger.finer('Error generating initial kernel file:\n$genKernelLogs');
    throw CompilationException('Error generating initial kernel file');
  }

  // This is so confusing but it seems to work.
  //
  // To enable incremental compilation, we need to pass the output dill file
  // as the incremental output dill file. If we set it the other way around
  // Windows will complain about the file being in use.
  final incrementalOutputDill = p.setExtension(
    outputDill,
    '.incremental.dill',
  ); // Must be path
  final client = await FrontendServerClient.start(
    entrypoint: _projectFsUri(path).toString(), // must be URI
    outputDillPath: incrementalOutputDill, // must be path
    platformKernel: Uri.file(platformDill).toString(), // must be URI
    incrementalOutputDill: outputDill,
    fileSystemRoots: [projectPaths.projectRoot],
    fileSystemScheme: 'celest',
    workingDirectory: projectPaths.projectRoot,
    target: target,
    verbose: verbose,
    sdkRoot: sdkRoot,
    enabledExperiments: celestProject.analysisOptions.enabledExperiments,
    frontendServerPath: Sdk.current.frontendServerAotSnapshot,
    // additionalSources: additionalSources,
    additionalArgs: [
      '--no-support-mirrors', // Since it won't be supported in the cloud.
      '--incremental-serialization', // Faster hot reload.
    ],
  );

  _logger.fine('Compiling local API...');

  final flutterCacheDir = await fileSystem.systemTempDirectory.createTemp(
    'celest_',
  );

  // Give the VM service a deterministic port, since allowing it to find one
  // can lead to a race condition with our random port finder picking the same
  // port.
  //
  // When we check the port below, it's valid because the VM service is not
  // started yet, but later the API fails because it picked the same port.
  final vmServicePort = await const RandomPortFinder()
      // If we've specified a port, though, that must be reserved for us to
      // use, so start the search from the next port.
      .findOpenPort(port == null ? null : port + 1);
  final command = switch (resolvedProject.sdkConfig.targetSdk) {
    SdkType.dart => <String>[
        Sdk.current.dart,
        'run',
        '--enable-vm-service=$vmServicePort', // Start VM service
        '--no-dds', // We want to talk directly to VM service.
        '--enable-asserts',
        '--packages',
        projectPaths.packagesConfig,
        outputDill,
      ],
    SdkType.flutter => <String>[
        Sdk.current.flutterTester,
        '--non-interactive',
        '--vm-service-port=$vmServicePort',
        '--run-forever',
        '--icu-data-file-path='
            '${p.join(Sdk.current.flutterOsArtifacts, 'icudtl.dat')}',
        '--packages=${projectPaths.packagesConfig}',
        '--log-tag=_CELEST',
        if (verbose) '--verbose-logging',
        '--enable-platform-isolates',
        '--force-multithreading',
        '--cache-dir-path=${flutterCacheDir.absolute.path}',
        // '--enable-impeller',
        outputDill,
      ],
    final unknown => unreachable('Unknown SDK type: $unknown'),
  };

  port = await portFinder.checkOrUpdatePort(port, excluding: [vmServicePort]);
  _logger.finer('Starting local API on port $port...');

  final celestConfig = prettyPrintJson(
    resolvedProject.toProto().toProto3Json(),
  );
  await fileSystem
      .directory(projectPaths.outputsDir)
      .childFile('celest.json')
      .writeAsString(celestConfig);

  final localApiProcess = await processManager.start(
    command,
    workingDirectory: projectPaths.outputsDir,
    environment: {
      ...configValues,
      // The HTTP port to serve Celest on.
      'PORT': platform.environment['PORT'] ?? '$port',
      'CELEST_ENVIRONMENT': environmentId,
    },
  );

  final runner = LocalApiRunner._(
    path: path,
    verbose: verbose,
    port: port,
    client: client,
    localApiProcess: localApiProcess,
  );
  try {
    await runner._init(
      stdoutPipe: stdoutPipe,
      stderrPipe: stderrPipe,
      vmServiceTimeout: vmServiceTimeout,
    );
  } on Object {
    // The caller never receives `runner` when start-up fails, so nothing else
    // could stop the child process or the frontend server. Tear them down
    // here, but never let a clean-up failure mask the original error.
    try {
      await runner.close();
    } on Object catch (e, st) {
      _logger.finest('Error cleaning up after failed start', e, st);
    }
    rethrow;
  }
  return runner;
}
/// Maps [path] to its `celest:` URI, expressed relative to the project root.
///
/// The relative path is prefixed with a root separator: a backslash on
/// Windows and a forward slash elsewhere.
static Uri _projectFsUri(String path) {
  final separator = platform.isWindows ? r'\' : '/';
  final projectRelative = p.relative(path, from: projectPaths.projectRoot);
  return Uri(scheme: 'celest', path: '$separator$projectRelative');
}
/// Matches the line in which the VM service announces its URL.
///
/// Applied to each stdout line in [_init]; group 1 captures the URL.
static final _vmServicePattern = RegExp(
r'The Dart VM service is listening on ([^\s]+)',
);
/// Matches the line inviting a debugger to connect to the VM service.
///
/// Applied to each stderr line in [_init]; group 1 captures the URL.
static final _warnOnNoDebuggerPattern = RegExp(
r'Connect to the Dart VM service at ([^\s]+) to debug.',
);
/// Waits for the VM to report at least one isolate and returns the ID of the
/// main isolate.
///
/// Despite the name, the isolate is not resumed here: the VM is polled every
/// 50ms for up to 10 seconds until its isolate list is non-empty.
///
/// The first isolate that is not flagged as a system isolate is preferred,
/// since that is the one running user code. If every reported isolate is a
/// system isolate, the first one is used.
///
/// Throws a [TimeoutException] if no isolate is reported in time, and a
/// [StateError] if the chosen isolate has no ID.
static Future<String> _waitForIsolatesAndResume(VmService vmService) async {
  const timeout = Duration(seconds: 10);
  const pollInterval = Duration(milliseconds: 50);

  var vm = await vmService.getVM();
  var isolates = vm.isolates;
  final stopwatch = Stopwatch()..start();
  _logger.finest('Waiting for VM service to report isolates...');
  while (isolates == null || isolates.isEmpty) {
    if (stopwatch.elapsed > timeout) {
      throw TimeoutException('Timed out waiting for VM to start.');
    }
    await Future<void>.delayed(pollInterval);
    vm = await vmService.getVM();
    isolates = vm.isolates;
  }
  stopwatch.stop();
  _logger.finest(
    'VM started in ${stopwatch.elapsedMilliseconds}ms. '
    'Isolates: $isolates',
  );

  // A missing `isSystemIsolate` flag is treated as "not a system isolate".
  final isolateRef = isolates.firstWhereOrNull(
        (isolate) => !(isolate.isSystemIsolate ?? false),
      ) ??
      isolates.firstOrNull;
  final isolateId = isolateRef?.id;
  if (isolateId == null) {
    throw StateError('Could not determine main isolate ID.');
  }
  return isolateId;
}
/// Waits for the isolate [isolateId] to report a `PauseStart` event and then
/// resumes it.
///
/// The isolate is polled every 50ms; a [TimeoutException] is thrown if it has
/// not paused within 5 seconds.
///
/// Pause-on-start no longer appears to be needed, so this is currently
/// unused. It is kept in case it is needed later.
// ignore: unused_element
static Future<void> _resumeIsolate(
  VmService vmService,
  String isolateId,
) async {
  const pollInterval = Duration(milliseconds: 50);
  const maxWait = Duration(seconds: 5);

  _logger.finest('[Isolate $isolateId] Waiting for pause on start event...');
  var snapshot = await vmService.getIsolate(isolateId);
  final elapsed = Stopwatch()..start();
  while (true) {
    if (snapshot.pauseEvent?.kind == EventKind.kPauseStart) {
      break;
    }
    if (elapsed.elapsed > maxWait) {
      throw TimeoutException(
        'Timed out waiting for isolate to report PauseStart event.',
      );
    }
    await Future<void>.delayed(pollInterval);
    snapshot = await vmService.getIsolate(isolateId);
  }

  // Only needed if `--observe` is used.
  // Disable pause-on-exit and pause-on-unhandled-exceptions.
  //
  // This must be done here instead of as a flag since the VM service just
  // exits immediately for some reason.
  // _logger.finest('[Isolate $isolateId] Disabling pause on exit/exception...');
  // await vmService.setIsolatePauseMode(
  //   isolateId,
  //   exceptionPauseMode: 'None',
  //   shouldPauseOnExit: false,
  // );

  _logger.finest('[Isolate $isolateId] Resuming isolate...');
  await vmService.resume(isolateId);
}
/// Attaches to the child process' output, connects to its VM service and
/// waits until the server reports that it is serving.
///
/// Output lines that are not recognized as internal are forwarded to
/// [stdoutPipe] and [stderrPipe], which default to the process-wide [stdout]
/// and [stderr].
///
/// [vmServiceTimeout] bounds the wait for the VM service URL and defaults to
/// 10 seconds; a zero or negative duration disables that timeout.
///
/// Throws a [TimeoutException] if the VM service or the server does not come
/// up in time, and a [StateError] if the process exits before serving.
Future<void> _init({
  StringSink? stdoutPipe,
  StringSink? stderrPipe,
  Duration? vmServiceTimeout,
}) async {
  // Bind the sinks to non-nullable locals so the stream callbacks below do
  // not need null assertions.
  final outSink = stdoutPipe ?? stdout;
  final errSink = stderrPipe ?? stderr;

  final vmServiceCompleter = Completer<VmService>();
  void completeVmService(String rawObservatoryUrl) {
    // Turn the reported `http` URL into the WebSocket endpoint by swapping
    // the scheme prefix and appending `ws`.
    final observatoryUri =
        '${rawObservatoryUrl.replaceFirst('http', 'ws')}ws';
    _logger.finer('Connecting to local API at: $observatoryUri');
    vmServiceCompleter.complete(_vmServiceConnectUri(observatoryUri));
  }

  final serverStartedCompleter = Completer<void>();
  _stdoutSub = _localApiProcess.stdout
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .listen((line) {
    _logger.finest('[stdout] $line');
    if (!vmServiceCompleter.isCompleted) {
      final vmServiceInfo = _vmServicePattern.firstMatch(line)?.group(1);
      if (vmServiceInfo != null) {
        return completeVmService(vmServiceInfo);
      }
    }
    if (line.startsWith('The Dart') ||
        line.startsWith('vm-service') ||
        line.contains('_CELEST')) {
      // Ignore
    } else if (line.startsWith('Serving on')) {
      if (!serverStartedCompleter.isCompleted) {
        serverStartedCompleter.complete();
      }
    } else if (line.startsWith('/')) {
      analytics.capture('local_api_call', properties: {'route': line});
    } else {
      outSink.writeln(line);
    }
  });
  _stderrSub = _localApiProcess.stderr
      .transform(utf8.decoder)
      .transform(const LineSplitter())
      .listen((line) {
    _logger.finest('[stderr] $line');
    if (!vmServiceCompleter.isCompleted) {
      final vmServiceInfo =
          _warnOnNoDebuggerPattern.firstMatch(line)?.group(1);
      if (vmServiceInfo != null) {
        return completeVmService(vmServiceInfo);
      }
    }
    if (line.startsWith('vm-service')) {
      // Ignore
    } else {
      errSink.writeln(line);
    }
  });

  try {
    final timeout = vmServiceTimeout ?? const Duration(seconds: 10);
    _logger.finer('Waiting for local API to report VM URI...');
    var pendingVmService = vmServiceCompleter.future;
    if (timeout > Duration.zero) {
      pendingVmService = pendingVmService.timeout(
        timeout,
        onTimeout: () {
          throw TimeoutException(
            'Could not connect to local API VM service.',
            timeout,
          );
        },
      );
    }
    final vmService = await pendingVmService;
    _vmService = vmService;

    // Pipe logs to output
    vmService.onLoggingEvent.listen((event) {
      assert(event.kind == EventKind.kLogging);
      final record = event.logRecord!;
      // TODO(dnys1): Should this be the project name or something to help
      // distinguish logs?
      const defaultLoggerName = '';
      final loggerName =
          record.loggerName?.valueAsString ?? defaultLoggerName;
      final logger = Logger(loggerName);
      // The level arrives as a plain integer. A missing value, or one that
      // does not correspond to a standard level, falls back to FINE instead
      // of throwing from inside this listener.
      final levelValue = record.level;
      final level = Level.LEVELS.firstWhereOrNull(
            (candidate) => candidate.value == levelValue,
          ) ??
          Level.FINE;
      if (!logger.isLoggable(level)) {
        return;
      }
      logger.log(
        level,
        record.message?.valueAsString ?? '<no message>',
        switch (record.error?.valueAsString) {
          null || 'null' || '' => null,
          final error => error,
        },
        switch (record.stackTrace?.valueAsString) {
          null || 'null' || '' => null,
          final stackTrace => StackTrace.fromString(stackTrace),
        },
      );
    });
    await vmService.streamListen(EventStreams.kLogging);

    _vmIsolateId = await _waitForIsolatesAndResume(vmService);

    // Whichever happens first wins: the server announcing itself, or the
    // process exiting, which is reported as an error.
    await Future.any([
      serverStartedCompleter.future,
      _localApiProcess.exitCode.then((exitCode) {
        throw StateError(
          'Local API process exited before serving (exitCode=$exitCode)',
        );
      }),
    ]).timeout(
      const Duration(seconds: 5),
      onTimeout: () {
        throw TimeoutException('Local API did not start in time.');
      },
    );
    _logger.fine('Connected to local API.');
  } on Object catch (e, st) {
    _logger.finer('Failure starting local API runner', e, st);
    rethrow;
  }
}
/// Recompiles the sources in [pathsToInvalidate] and hot reloads the running
/// local API with the result.
///
/// Paths inside the project root are passed to the compiler as `celest:`
/// URIs; all other paths are converted with `p.toUri`.
///
/// Throws a [CompilationException] if recompilation reports errors. A reload
/// that the VM reports as unsuccessful is logged as a warning rather than
/// thrown.
Future<void> hotReload(List<String> pathsToInvalidate) async {
  _logger.fine('Recompiling local API...');
  final result = await _client.compile([
    for (final path in pathsToInvalidate)
      if (p.isWithin(projectPaths.projectRoot, path))
        _projectFsUri(path)
      else
        p.toUri(path),
  ]);
  final dillOutput = _client.expectOutput(result);
  _logger.fine('Hot reloading local API with entrypoint: $dillOutput');
  final report = await _vmService!.reloadSources(
    _vmIsolateId,
    rootLibUri: dillOutput,
  );
  // The VM reports a rejected reload through the report instead of throwing,
  // so surface it here rather than silently keeping the old code running.
  if (report.success == false) {
    _logger.warning('Hot reload was rejected by the VM: ${report.json}');
  }
}
/// Kills every isolate reported by the VM service, if one is connected.
///
/// A [SentinelException], or an [RPCError] with the "isolate must be
/// runnable" code, is ignored for an individual isolate. Any other error is
/// logged and propagated.
///
/// Copied from `package:flutter_tools/src/run_hot.dart`.
// ignore: unused_element
Future<void> _killIsolates() async {
  final vmService = _vmService;
  if (vmService == null) {
    return;
  }

  final vm = await vmService.getVM();
  // A missing isolate list, or a ref without an ID, leaves nothing to kill.
  final isolateIds = [
    for (final ref in vm.isolates ?? const <IsolateRef>[])
      if (ref.id case final id?) id,
  ];
  _logger.finest('Killing isolates: ${isolateIds.join(', ')}');

  Future<void> killIsolate(String isolateId) async {
    try {
      await vmService.kill(isolateId);
      _logger.finest('Killed isolate $isolateId');
    } on Object catch (error, st) {
      // A SentinelException means the isolate has already been killed.
      // Error code 105 indicates the isolate is not yet runnable, and might
      // be triggered if the tool is attempting to kill the asset parsing
      // isolate before it has finished starting up.
      final isIgnorable = error is SentinelException ||
          (error is RPCError &&
              error.code == RPCErrorKind.kIsolateMustBeRunnable.code);
      if (isIgnorable) {
        return;
      }
      _logger.finer('Error killing isolate $isolateId', error, st);
      rethrow;
    }
  }

  await Future.wait(isolateIds.map(killIsolate));
}
/// Shuts the runner down.
///
/// Kills the frontend server if it is not already closed, sends `SIGKILL` to
/// the local API process and disposes the VM service. It then waits for the
/// output subscriptions to cancel, the process to exit and the VM service to
/// finish.
Future<void> close() async {
  _logger.finer('Shutting down local API...');
  final clientAlreadyClosed = await Future(() => _client.closed);
  if (!clientAlreadyClosed) {
    _client.kill();
  }

  _logger.finest('Killing local process');
  // flutter_tester requires a gentle nudge when using `--run-forever`.
  _localApiProcess.kill(ProcessSignal.sigkill);

  _logger.finest('Closing VM service...');
  await _vmService?.dispose();

  Future<void> stdoutClosed() async {
    await _stdoutSub.cancel();
    _logger.finest('Stdout closed');
  }

  Future<void> stderrClosed() async {
    await _stderrSub.cancel();
    _logger.finest('Stderr closed');
  }

  Future<void> processExited() async {
    final exitCode = await _localApiProcess.exitCode;
    _logger.finest('Exit code: $exitCode');
  }

  Future<void> vmServiceDone() async {
    await _vmService?.onDone;
    _logger.finest('VM service done');
  }

  await Future.wait([
    stdoutClosed(),
    stderrClosed(),
    processExited(),
    vmServiceDone(),
  ]);
  _logger.finer('Shut down local API.');
}
}
/// Signals that compiling the local API failed.
final class CompilationException implements Exception {
  /// Creates an exception carrying the given failure [message].
  CompilationException(this.message);

  /// A description of what went wrong during compilation.
  final String message;

  /// Returns the bare [message], without a type prefix.
  @override
  String toString() {
    return message;
  }
}
extension on FrontendServerClient {
  /// Returns the kernel file path produced by a successful [result].
  ///
  /// Throws a [CompilationException] when [result] reports errors. Otherwise
  /// the result is accepted on this client before its output is returned.
  String expectOutput(CompileResult result) {
    _logger.finest(
      'Compile result: dillOutput=${result.dillOutput} '
      'errors=${result.errorCount}',
    );

    if (result case CompileResult(errorCount: > 0)) {
      final details = result.debugResult;
      _logger.finest('Error compiling local API', details);
      throw CompilationException('Error compiling local API: $details');
    }

    if (result.dillOutput case final dillOutput?) {
      accept();
      return dillOutput;
    }

    _logger.finest('Compile result:\n${result.debugResult}');
    // `dillOutput` should never be null (see its docs).
    unreachable('An unknown error occurred compiling local API.');
  }
}
extension on CompileResult {
  /// A multi-line, human-readable dump of this result.
  ///
  /// Lists the output path and the error count, followed by the compiler
  /// output, new sources and removed sources, one entry per line.
  String get debugResult {
    final lines = <String>[
      'dillOutput: $dillOutput',
      'Error count: $errorCount',
      'Compiler output:',
      for (final outputLine in compilerOutputLines) ' $outputLine',
      'New sources:',
      for (final added in newSources) ' $added',
      'Removed sources:',
      for (final removed in removedSources) ' $removed',
    ];
    // Every entry, including the last, is terminated by a newline.
    return lines.map((line) => '$line\n').join();
  }
}
/// Connects to the VM service listening at the WebSocket [wsUri].
///
/// Copied from `package:vm_service/vm_service_io.dart` to provide better
/// logging and debugging support.
///
/// Incoming socket messages are forwarded to the returned [VmService]. The
/// service is told the stream has closed both when the socket finishes
/// normally and when it fails with an error.
Future<VmService> _vmServiceConnectUri(String wsUri) async {
  final socket = await WebSocket.connect(wsUri);
  final controller = StreamController<dynamic>();
  final streamClosedCompleter = Completer<void>();

  // Signals end-of-stream exactly once, whichever way the socket ended.
  void markStreamClosed() {
    if (!streamClosedCompleter.isCompleted) {
      streamClosedCompleter.complete();
    }
    if (!controller.isClosed) {
      unawaited(controller.close());
    }
  }

  socket.listen(
    (data) {
      controller.add(data);
      if (verbose) {
        _logger.finest('VM service WS data: $data');
      }
    },
    onError: (Object error, StackTrace stackTrace) {
      _logger.finest('VM service WS error', error, stackTrace);
      // With `cancelOnError`, the subscription is cancelled after this error
      // and `onDone` never fires, so the stream must be closed here.
      markStreamClosed();
    },
    cancelOnError: true,
    onDone: () {
      _logger.finest('VM service WS closed');
      markStreamClosed();
    },
  );
  return VmService.defaultFactory(
    inStream: controller.stream,
    writeMessage: socket.add,
    log: VmServiceLogs(),
    disposeHandler: socket.close,
    streamClosed: streamClosedCompleter.future,
    wsUri: wsUri,
  );
}
/// A [Log] for the VM service client that forwards its messages to this
/// library's logger at the `finest` level.
final class VmServiceLogs implements Log {
  /// Writes [message] to the logger, tagged with the given [severity] label.
  void _forward(String severity, String message) {
    _logger.finest('[vm-service] $severity: $message');
  }

  @override
  void severe(String message) => _forward('SEVERE', message);

  @override
  void warning(String message) => _forward('WARNING', message);
}