Skip to content

Commit 8d36178

Browse files
DanTupCommit Queue
authored andcommitted
[analysis_server] Move support for locking requests to the scheduler
This removes the original `lockRequestsWhile` functionality of the LSP server which worked by pausing reading from stdin to instead pause the processing of messages in the scheduler. Change-Id: I737e977adee11fe6fc70f44185cfaa22c22805da Reviewed-on: https://dart-review.googlesource.com/c/sdk/+/432342 Reviewed-by: Brian Wilkerson <[email protected]> Reviewed-by: Keerti Parthasarathy <[email protected]> Commit-Queue: Brian Wilkerson <[email protected]>
1 parent 87575b2 commit 8d36178

12 files changed

+243
-102
lines changed

pkg/analysis_server/lib/src/analysis_server.dart

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -273,6 +273,13 @@ abstract class AnalysisServer {
273273
/// A [TimingByteStore] that records timings for reads from the byte store.
274274
TimingByteStore? _timingByteStore;
275275

276+
/// Whether notifications caused by analysis should be suppressed.
277+
///
278+
/// This is used when an operation is temporarily modifying overlays and does
279+
/// not want the client to be notified of any analysis happening on the
280+
/// temporary content.
281+
bool suppressAnalysisResults = false;
282+
276283
AnalysisServer(
277284
this.options,
278285
this.sdkManager,
@@ -884,6 +891,39 @@ abstract class AnalysisServer {
884891
/// given [path] was changed - added, updated, or removed.
885892
void notifyFlutterWidgetDescriptions(String path) {}
886893

894+
/// Prevents the scheduler from processing new messages until [operation]
895+
/// completes.
896+
///
897+
/// This can be used to obtain analysis results/resolved units consistent with
898+
/// the state of a file at the time this method was called, preventing
899+
/// changes by incoming file modifications.
900+
///
901+
/// The contents of [operation] should be kept as short as possible and since
902+
/// cancellation requests will also be blocked for the duration of this
903+
/// operation, handlers should generally check the cancellation flag
904+
/// immediately after this function returns.
905+
Future<T> pauseSchedulerWhile<T>(FutureOr<T> Function() operation) async {
906+
// TODO(dantup): Prevent this method from locking responses from the client
907+
// because this can lead to deadlocks if called during initialization where
908+
// the server may wait for something (configuration) from the client. This
909+
// might fit in with potential upcoming scheduler changes.
910+
//
911+
// This is currently used by Completion+FixAll (which are less likely, but
912+
// possible to be called during init).
913+
//
914+
// https://github.com/dart-lang/sdk/issues/56311#issuecomment-2250089185
915+
916+
messageScheduler.pause();
917+
try {
918+
// `await` here is important to ensure `finally` doesn't execute until
919+
// `operation()` completes (`whenComplete` is not available on
920+
// `FutureOr`).
921+
return await operation();
922+
} finally {
923+
messageScheduler.resume();
924+
}
925+
}
926+
887927
/// Read all files, resolve all URIs, and perform required analysis in
888928
/// all current analysis drivers.
889929
Future<void> reanalyze() async {
@@ -1087,6 +1127,10 @@ abstract class CommonServerContextManagerCallbacks
10871127
@override
10881128
@mustCallSuper
10891129
void handleFileResult(FileResult result) {
1130+
if (analysisServer.suppressAnalysisResults) {
1131+
return;
1132+
}
1133+
10901134
var path = result.path;
10911135
filesToFlush.add(path);
10921136

pkg/analysis_server/lib/src/lsp/handlers/commands/fix_all.dart

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
import 'dart:async';
66

77
import 'package:analysis_server/lsp_protocol/protocol.dart';
8+
import 'package:analysis_server/src/analysis_server.dart';
89
import 'package:analysis_server/src/lsp/constants.dart';
910
import 'package:analysis_server/src/lsp/error_or.dart';
1011
import 'package:analysis_server/src/lsp/handlers/commands/simple_edit_handler.dart';
@@ -89,22 +90,22 @@ class FixAllCommandHandler extends SimpleEditCommandHandler<LspAnalysisServer> {
8990

9091
/// Computes edits for iterative fix-all using temporary overlays.
9192
class _FixAllOperation extends TemporaryOverlayOperation
92-
with HandlerHelperMixin<LspAnalysisServer> {
93+
with HandlerHelperMixin<AnalysisServer> {
9394
final MessageInfo message;
9495
final CancellationToken cancellationToken;
9596
final String path;
9697
final bool autoTriggered;
9798

9899
_FixAllOperation({
99-
required LspAnalysisServer server,
100+
required AnalysisServer server,
100101
required this.message,
101102
required this.path,
102103
required this.cancellationToken,
103104
required this.autoTriggered,
104105
}) : super(server);
105106

106107
Future<ErrorOr<WorkspaceEdit?>> computeEdits() async {
107-
return await lockRequestsWithTemporaryOverlays(_computeEditsImpl);
108+
return await pauseSchedulerWithTemporaryOverlays(_computeEditsImpl);
108109
}
109110

110111
Future<ErrorOr<WorkspaceEdit?>> _computeEditsImpl() async {

pkg/analysis_server/lib/src/lsp/handlers/handler_completion.dart

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ class CompletionHandler
119119
// unit and LineInfo.
120120
late ErrorOr<LineInfo> lineInfo;
121121
late ErrorOr<ResolvedUnitResult> unit;
122-
await server.lockRequestsWhile(() async {
122+
await server.pauseSchedulerWhile(() async {
123123
unit = await path.mapResult(requireResolvedUnit);
124124
lineInfo = await unit.map(
125125
// If we don't have a unit, we can still try to obtain the line info from

pkg/analysis_server/lib/src/lsp/lsp_analysis_server.dart

Lines changed: 1 addition & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -106,9 +106,6 @@ class LspAnalysisServer extends AnalysisServer {
106106
@visibleForTesting
107107
int contextBuilds = 0;
108108

109-
/// The subscription to the stream of incoming messages from the client.
110-
late final StreamSubscription<void> _channelSubscription;
111-
112109
/// An optional manager to handle file systems which may not always be
113110
/// available.
114111
final DetachableFileSystemManager? detachableFileSystemManager;
@@ -117,13 +114,6 @@ class LspAnalysisServer extends AnalysisServer {
117114
/// `sendStatusNotification` was invoked.
118115
bool wasAnalyzing = false;
119116

120-
/// Whether notifications caused by analysis should be suppressed.
121-
///
122-
/// This is used when an operation is temporarily modifying overlays and does
123-
/// not want the client to be notified of any analysis happening on the
124-
/// temporary content.
125-
bool suppressAnalysisResults = false;
126-
127117
/// Tracks files that have non-empty diagnostics on the client.
128118
///
129119
/// This is an optimization to avoid sending empty diagnostics when they are
@@ -181,11 +171,7 @@ class LspAnalysisServer extends AnalysisServer {
181171
.listen(handleAnalysisEvent);
182172
analysisDriverScheduler.start();
183173

184-
_channelSubscription = channel.listen(
185-
scheduleMessage,
186-
onDone: done,
187-
onError: socketError,
188-
);
174+
channel.listen(scheduleMessage, onDone: done, onError: socketError);
189175

190176
if (AnalysisServer.supportsPlugins) {
191177
_pluginChangeSubscription = pluginManager.pluginsChanged.listen(
@@ -572,45 +558,6 @@ class LspAnalysisServer extends AnalysisServer {
572558
}, socketError);
573559
}
574560

575-
/// Locks the server from processing incoming messages until [operation]
576-
/// completes.
577-
///
578-
/// This can be used to obtain analysis results/resolved units consistent with
579-
/// the state of a file at the time this method was called, preventing
580-
/// changes by incoming file modifications.
581-
///
582-
/// The contents of [operation] should be kept as short as possible and since
583-
/// cancellation requests will also be blocked for the duration of this
584-
/// operation, handles should generally check the cancellation flag
585-
/// immediately after this function returns.
586-
Future<T> lockRequestsWhile<T>(FutureOr<T> Function() operation) async {
587-
// TODO(dantup): Prevent this method from locking responses from the client
588-
// because this can lead to deadlocks if called during initialization where
589-
// the server may wait for something (configuration) from the client. This
590-
// might fit in with potential upcoming scheduler changes.
591-
//
592-
// This is currently used by Completion+FixAll (which are less likely, but
593-
// possible to be called during init).
594-
//
595-
// https://github.com/dart-lang/sdk/issues/56311#issuecomment-2250089185
596-
var completer = Completer<void>();
597-
598-
// Pause handling incoming messages until `operation` completes.
599-
//
600-
// If this method is called multiple times, the pauses will stack, meaning
601-
// the subscription will not resume until all operations complete.
602-
_channelSubscription.pause(completer.future);
603-
604-
try {
605-
// `await` here is important to ensure `finally` doesn't execute until
606-
// `operation()` completes (`whenComplete` is not available on
607-
// `FutureOr`).
608-
return await operation();
609-
} finally {
610-
completer.complete();
611-
}
612-
}
613-
614561
/// Logs the error on the client using window/logMessage.
615562
void logErrorToClient(String message) {
616563
channel.sendNotification(
@@ -1304,15 +1251,6 @@ class LspServerContextManagerCallbacks
13041251
}
13051252
}
13061253

1307-
@override
1308-
void handleFileResult(FileResult result) {
1309-
if (analysisServer.suppressAnalysisResults) {
1310-
return;
1311-
}
1312-
1313-
super.handleFileResult(result);
1314-
}
1315-
13161254
@override
13171255
void handleResolvedUnitResult(ResolvedUnitResult result) {
13181256
var path = result.path;

pkg/analysis_server/lib/src/lsp/temporary_overlay_operation.dart

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,8 @@
44

55
import 'dart:async';
66

7+
import 'package:analysis_server/src/analysis_server.dart';
78
import 'package:analysis_server/src/context_manager.dart';
8-
import 'package:analysis_server/src/lsp/lsp_analysis_server.dart';
99
import 'package:analysis_server/src/protocol_server.dart';
1010
import 'package:analyzer/dart/analysis/analysis_context.dart';
1111
import 'package:analyzer/file_system/overlay_file_system.dart';
@@ -18,7 +18,7 @@ import 'package:analyzer/src/dart/analysis/driver.dart';
1818
/// need to be merged together (to be mappable to LSP document changes) and then
1919
/// reverted to allow the client to apply the change.
2020
abstract class TemporaryOverlayOperation {
21-
final LspAnalysisServer server;
21+
final AnalysisServer server;
2222
final ContextManager contextManager;
2323
final OverlayResourceProvider resourceProvider;
2424

@@ -78,16 +78,16 @@ abstract class TemporaryOverlayOperation {
7878
}
7979

8080
/// Locks the server from processing incoming messages until [operation]
81-
/// completes just like [LspAnalysisServer.lockRequestsWhile] but
81+
/// completes just like [AnalysisServer.pauseSchedulerWhile] but
8282
/// additionally provides a function for writing temporary overlays that will
8383
/// be reverted when the operation completes.
8484
///
8585
/// Additionally, sending diagnostics, outlines, etc. are suppressed by the
8686
/// temporary overlays and re-enabled after the overlays are restored.
87-
Future<T> lockRequestsWithTemporaryOverlays<T>(
87+
Future<T> pauseSchedulerWithTemporaryOverlays<T>(
8888
Future<T> Function() operation,
8989
) {
90-
return server.lockRequestsWhile(() async {
90+
return server.pauseSchedulerWhile(() async {
9191
// Wait for any in-progress analysis to complete before we start
9292
// suppressing analysis results.
9393
server.contextManager.pauseWatchers();

pkg/analysis_server/lib/src/scheduler/message_scheduler.dart

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,11 @@ final class MessageScheduler {
4545
/// Whether the [MessageScheduler] is currently processing messages.
4646
bool _isProcessing = false;
4747

48+
/// The number of times [pause] has been called without matching [resume]s.
49+
///
50+
/// If zero, the queue is not paused.
51+
int _pauseCount = 0;
52+
4853
/// The completer used to indicate that message handling has been completed.
4954
Completer<void> _completer = Completer();
5055

@@ -58,6 +63,9 @@ final class MessageScheduler {
5863
/// around.
5964
MessageScheduler({required this.listener});
6065

66+
/// Whether the queue is currently paused.
67+
bool get isPaused => _pauseCount > 0;
68+
6169
/// Add the [message] to the end of the pending messages queue.
6270
///
6371
/// Some incoming messages are handled immediately rather than being added to
@@ -177,12 +185,28 @@ final class MessageScheduler {
177185
}
178186
}
179187

188+
/// Pauses processing messages.
189+
///
190+
/// Any messages that are already being processed will continue until they
191+
/// complete, but no new messages will be processed.
192+
///
193+
/// If this method is called multiple times, [resume] will need to be called
194+
/// an equal number of times for processing to continue.
195+
void pause() {
196+
_pauseCount++;
197+
listener?.pauseProcessingMessages(_pauseCount);
198+
}
199+
180200
/// Dispatch the first message in the queue to be executed.
181201
void processMessages() async {
202+
if (isPaused) {
203+
return;
204+
}
205+
182206
_isProcessing = true;
183207
listener?.startProcessingMessages();
184208
try {
185-
while (_pendingMessages.isNotEmpty) {
209+
while (_pendingMessages.isNotEmpty && !isPaused) {
186210
var currentMessage = _pendingMessages.removeFirst();
187211
_activeMessages.addLast(currentMessage);
188212
listener?.addActiveMessage(currentMessage);
@@ -249,6 +273,21 @@ final class MessageScheduler {
249273
listener?.endProcessingMessages();
250274
}
251275

276+
/// Resumes processing messages.
277+
void resume() {
278+
if (!isPaused) {
279+
throw StateError('Cannot resume if not paused');
280+
}
281+
_pauseCount--;
282+
listener?.resumeProcessingMessages(_pauseCount);
283+
if (!isPaused && !_isProcessing) {
284+
// Process on the next tick so that the caller to resume() doesn't get
285+
// messages in the queue attributed to their time (or run before they
286+
// complete).
287+
Future.delayed(Duration.zero, processMessages);
288+
}
289+
}
290+
252291
/// Returns the parameters of a cancellation [message].
253292
lsp.CancelParams? _getCancelParams(lsp.NotificationMessage message) {
254293
try {
@@ -477,6 +516,14 @@ abstract class MessageSchedulerListener {
477516
/// This implies that the message was active and wasn't cancelled.
478517
void messageCompleted(ScheduledMessage message);
479518

519+
/// Report that the pause counter was increased to [newPauseCount], and that
520+
/// processing will be paused.
521+
void pauseProcessingMessages(int newPauseCount);
522+
523+
/// Report that the pause counter was decreased to [newPauseCount] which, if
524+
/// zero, indicates processing will resume.
525+
void resumeProcessingMessages(int newPauseCount);
526+
480527
/// Report that the loop that processes messages has started to run.
481528
void startProcessingMessages();
482529
}

pkg/analysis_server/lib/src/scheduler/scheduler_tracking_listener.dart

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -113,23 +113,26 @@ class SchedulerTrackingListener extends MessageSchedulerListener {
113113

114114
@override
115115
void cancelActiveMessage(ScheduledMessage message) {
116-
var messageData = _messageDataMap.remove(message);
116+
var messageData = _messageDataMap[message];
117117
if (messageData == null) {
118118
return;
119119
}
120120
messageData.completeTime = _now;
121121
messageData.wasCancelled = true;
122-
_activeMessageCount--;
123-
_reportMessageData(messageData);
122+
// Don't decrement counts or report message data yet because
123+
// cancelled messages still complete and call messageCompleted().
124124
}
125125

126126
@override
127127
void cancelPendingMessage(ScheduledMessage message) {
128-
var messageData = _messageDataMap.remove(message)!;
128+
var messageData = _messageDataMap[message];
129+
if (messageData == null) {
130+
return;
131+
}
129132
messageData.completeTime = _now;
130133
messageData.wasCancelled = true;
131-
_pendingMessageCount--;
132-
_reportMessageData(messageData);
134+
// Don't decrement counts or report message data yet because
135+
// cancelled messages still complete and call messageCompleted().
133136
}
134137

135138
/// Report that the loop that processes messages has stopped running.
@@ -148,6 +151,16 @@ class SchedulerTrackingListener extends MessageSchedulerListener {
148151
_reportMessageData(messageData);
149152
}
150153

154+
@override
155+
void pauseProcessingMessages(int newPauseCount) {
156+
// TODO(dantup): Consider tracking the pause start time if newPauseCount=1.
157+
}
158+
159+
@override
160+
void resumeProcessingMessages(int newPauseCount) {
161+
// TODO(dantup): Consider recording the pause duration if newPauseCount=0.
162+
}
163+
151164
@override
152165
void startProcessingMessages() {
153166
processingStartTime = _now;

0 commit comments

Comments
 (0)