Skip to content

Commit c5bc653

Browse files
jakemac53Commit Queue
authored andcommitted
match analysis server logs when replaying scenarios
Change-Id: I68fa02bbcbfa667d65fe07abe69a9f04a47daa13 Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/466240 Reviewed-by: Brian Wilkerson <[email protected]> Auto-Submit: Jake Macdonald <[email protected]> Commit-Queue: Jake Macdonald <[email protected]>
1 parent 11effd9 commit c5bc653

File tree

2 files changed

+97
-75
lines changed

2 files changed

+97
-75
lines changed

pkg/analysis_server/tool/log_player/log_player.dart

Lines changed: 84 additions & 72 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@
22
// for details. All rights reserved. Use of this source code is governed by a
33
// BSD-style license that can be found in the LICENSE file.
44

5+
import 'dart:async';
6+
import 'dart:convert';
7+
import 'dart:io';
8+
59
import 'package:analysis_server/src/server/driver.dart';
610
import 'package:analysis_server/src/session_logger/entry_kind.dart';
711
import 'package:analysis_server/src/session_logger/log_entry.dart';
812
import 'package:analysis_server/src/session_logger/process_id.dart';
13+
import 'package:collection/collection.dart';
914

1015
import 'log.dart';
1116
import 'server_driver.dart';
@@ -18,9 +23,6 @@ class LogPlayer {
1823
/// The log to be played.
1924
Log log;
2025

21-
/// The object used to communicate with the running server.
22-
ServerDriver? server;
23-
2426
/// Whether the `shutdown` method has been seen.
2527
bool _hasSeenShutdown = false;
2628

@@ -37,84 +39,76 @@ class LogPlayer {
3739
Future<void> play() async {
3840
var entries = log.entries;
3941
var nextIndex = 0;
40-
while (nextIndex < entries.length) {
41-
// TODO(brianwilkerson): This doesn't currently attempt to retain the same
42-
// timing of messages as was recorded in the log.
43-
var entry = entries[nextIndex];
44-
switch (entry.kind) {
45-
case EntryKind.commandLine:
46-
if (this.server != null) {
47-
throw StateError(
48-
'Analysis server already started, only one instance is allowed.',
49-
);
50-
}
51-
var server = this.server = ServerDriver(arguments: entry.argList);
52-
await server.start();
53-
case EntryKind.message:
54-
if (entry.receiver == ProcessId.server) {
55-
await _sendMessageToServer(entry);
56-
} else if (entry.sender == ProcessId.server) {
57-
_handleMessageFromServer(entry);
58-
} else {
59-
throw StateError('''
42+
ServerDriver? server;
43+
var pendingServerMessageExpectations = <Message>[];
44+
try {
45+
while (nextIndex < entries.length) {
46+
// TODO(brianwilkerson): This doesn't currently attempt to retain the same
47+
// timing of messages as was recorded in the log.
48+
var entry = entries[nextIndex];
49+
switch (entry.kind) {
50+
case EntryKind.commandLine:
51+
if (server != null) {
52+
throw StateError(
53+
'Analysis server already started, only one instance is allowed.',
54+
);
55+
}
56+
server = ServerDriver(arguments: entry.argList);
57+
await server.start();
58+
server.serverMessages.listen((message) {
59+
var entryToRemove = pendingServerMessageExpectations
60+
.firstWhereOrNull(
61+
(expectation) => const MapEquality().equals(
62+
expectation.map,
63+
message.map,
64+
),
65+
);
66+
if (entryToRemove != null) {
67+
pendingServerMessageExpectations.remove(entryToRemove);
68+
} else {
69+
stderr.writeln(
70+
'Unexpected message from analysis server:\n'
71+
'${jsonEncode(message)}',
72+
);
73+
}
74+
});
75+
case EntryKind.message:
76+
if (entry.receiver == ProcessId.server) {
77+
await _sendMessageToServer(entry, server);
78+
} else if (entry.sender == ProcessId.server) {
79+
pendingServerMessageExpectations.add(entry.message);
80+
// TODO(jakemac): Remove this once are not reliant on consistent
81+
// ordering.
82+
await _waitForMessagesFromServer(
83+
pendingServerMessageExpectations,
84+
);
85+
} else {
86+
throw StateError('''
6087
Unexpected sender/receiver for message:
6188
6289
sender: ${entry.sender}
6390
receiver: ${entry.receiver}
6491
''');
65-
}
66-
}
67-
nextIndex++;
68-
}
69-
await _readMessagesFromServer();
70-
if (!_hasSeenShutdown) {
71-
server?.shutdown();
72-
}
73-
if (!_hasSeenExit) {
74-
server?.exit();
75-
}
76-
await _readMessagesFromServer();
77-
server = null;
78-
}
79-
80-
/// Responds to a message sent from the server to some other process.
81-
void _handleMessageFromServer(LogEntry entry) {
82-
var message = entry.message;
83-
switch (entry.receiver) {
84-
case ProcessId.dtd:
85-
throw UnimplementedError();
86-
case ProcessId.ide:
87-
if (message.isLogMessage ||
88-
message.isShowDocument ||
89-
message.isShowMessage ||
90-
message.isShowMessageRequest) {
91-
// The response from the client should be recorded in the log, so it
92-
// will eventually be sent to the server.
93-
return;
92+
}
9493
}
95-
// throw UnimplementedError();
96-
case ProcessId.plugin:
97-
throw UnimplementedError();
98-
case ProcessId.server:
99-
throw StateError(
100-
'Cannot send a message from the server to the server.',
101-
);
102-
case ProcessId.watcher:
103-
throw StateError(
104-
'Cannot send a message from the server to the file watcher.',
105-
);
94+
nextIndex++;
95+
}
96+
await _waitForMessagesFromServer(pendingServerMessageExpectations);
97+
} finally {
98+
if (!_hasSeenShutdown) {
99+
server?.shutdown();
100+
}
101+
if (!_hasSeenExit) {
102+
server?.exit();
103+
}
106104
}
107105
}
108106

109-
/// Wait for the server to process any messages that it may have received and
110-
/// for any responses to be sent back.
111-
Future<void> _readMessagesFromServer() async {
112-
await Future.delayed(const Duration(seconds: 1));
113-
}
114-
115107
/// Sends the message in the [entry] to the server.
116-
Future<void> _sendMessageToServer(LogEntry entry) async {
117-
var server = this.server;
108+
Future<void> _sendMessageToServer(
109+
LogEntry entry,
110+
ServerDriver? server,
111+
) async {
118112
if (server == null) {
119113
throw StateError('Analysis server not started.');
120114
}
@@ -136,7 +130,6 @@ receiver: ${entry.receiver}
136130
_hasSeenShutdown = true;
137131
} else if (message.isExit) {
138132
_hasSeenExit = true;
139-
this.server = null;
140133
}
141134
server.sendMessageFromIde(message);
142135
case ProcessId.plugin:
@@ -149,4 +142,23 @@ receiver: ${entry.receiver}
149142
server.sendMessageFromFileWatcher(message);
150143
}
151144
}
145+
146+
/// Waits up to 5 seconds for [pendingServerMessageExpectations] to be
147+
/// emptied out.
148+
Future<void> _waitForMessagesFromServer(
149+
List<Message> pendingServerMessageExpectations,
150+
) async {
151+
if (pendingServerMessageExpectations.isEmpty) return;
152+
var watch = Stopwatch()..start();
153+
while (watch.elapsed < const Duration(seconds: 5)) {
154+
if (pendingServerMessageExpectations.isEmpty) {
155+
return;
156+
}
157+
await Future.delayed(const Duration(milliseconds: 50));
158+
}
159+
throw TimeoutException(
160+
'Timed out waiting for analysis server messages:\n\n'
161+
'${pendingServerMessageExpectations.join('\n\n')}',
162+
);
163+
}
152164
}

pkg/analysis_server/tool/log_player/server_driver.dart

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
// for details. All rights reserved. Use of this source code is governed by a
33
// BSD-style license that can be found in the LICENSE file.
44

5+
import 'dart:async';
56
import 'dart:convert';
67
import 'dart:io';
78

@@ -26,8 +27,9 @@ class ServerDriver {
2627
/// server has not been connected to DTD using [connectToDtd].
2728
WebSocket? _dtdSocket;
2829

29-
/// The messages read from the analysis server's stdout.
30-
final List<String> _messagesFromServer = [];
30+
/// Stream controller for analysis server output messages.
31+
final StreamController<Message> _serverMessagesController =
32+
StreamController();
3133

3234
/// Creates a new driver that can be used to communicate with a server.
3335
///
@@ -68,6 +70,9 @@ class ServerDriver {
6870
ServerDriver._({required this.arguments, required ServerProtocol protocol})
6971
: _protocol = protocol;
7072

73+
/// The messages read from the analysis server's stdout.
74+
Stream<Message> get serverMessages => _serverMessagesController.stream;
75+
7176
/// Returns the path to the `dart` executable.
7277
String get _dartExecutable {
7378
return Platform.resolvedExecutable;
@@ -100,6 +105,7 @@ class ServerDriver {
100105
_stdinSink = null;
101106
_dtdSocket?.close();
102107
_dtdSocket = null;
108+
_serverMessagesController.close();
103109
}
104110

105111
void sendMessageFromDTD(Message message) {
@@ -203,7 +209,11 @@ class ServerDriver {
203209
}
204210

205211
void _receiveMessageFromServer(String message) {
206-
_messagesFromServer.add(message);
212+
if (_serverMessagesController.isClosed) {
213+
stderr.writeln('Got analysis server message after shutdown:\n$message');
214+
} else {
215+
_serverMessagesController.add(jsonDecode(message) as Message);
216+
}
207217
}
208218
}
209219

0 commit comments

Comments
 (0)