Skip to content

Commit ca42a73

Browse files
committed
Implement more efficient publishing scheduler
1 parent a3c6a9f commit ca42a73

File tree

1 file changed

+251
-11
lines changed

1 file changed

+251
-11
lines changed
Lines changed: 251 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,36 +1,276 @@
11
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
22
// SPDX-License-Identifier: Apache-2.0
33

4+
import 'dart:async';
5+
import 'dart:io';
6+
47
import 'package:aft/aft.dart';
58

9+
// ANSI escape codes for terminal colors.
10+
const _yellow = '\x1B[33m';
11+
const _green = '\x1B[32m';
12+
const _reset = '\x1B[0m';
13+
14+
/// Initial delay before the first "still alive" reminder is printed.
15+
final _initialAliveMessageDelay =
16+
Platform.environment.containsKey('QA_DURATIONS')
17+
? const Duration(seconds: 5)
18+
: const Duration(minutes: 5);
19+
20+
/// Interval between subsequent "still alive" reminders.
21+
final _aliveMessageReminderInterval =
22+
Platform.environment.containsKey('QA_DURATIONS')
23+
? const Duration(seconds: 5)
24+
: const Duration(minutes: 5);
25+
626
/// A function that publishes a single package (e.g. pre-publish checks +
727
/// actual publish).
828
typedef PublishPackageFn = Future<void> Function(PackageInfo package);
929

10-
/// Schedules and executes the publishing of a list of packages in order.
11-
///
12-
/// Packages are published sequentially. After each package is published the
13-
/// scheduler polls for a pending analysis on pub.dev and waits until the
14-
/// analysis is complete before proceeding to the next package.
30+
/// Schedules and executes the publishing of a list of packages in dependency
31+
/// order.
1532
class PublishScheduler {
1633
PublishScheduler({
1734
required List<PackageInfo> packages,
1835
required PublishPackageFn publishPackage,
1936
required AmplifyCommand command,
20-
}) : _packages = packages,
21-
_publishPackage = publishPackage,
22-
_command = command;
37+
}) : _packages = packages,
38+
_publishPackage = publishPackage,
39+
_command = command;
2340

2441
final List<PackageInfo> _packages;
2542
final PublishPackageFn _publishPackage;
2643
final AmplifyCommand _command;
2744

28-
/// Publishes all packages sequentially, awaiting pending analysis between
29-
/// each one.
45+
/// Packages that are ready to be published (all in-process deps done).
46+
final List<PackageInfo> _packagePublishQueue = [];
47+
48+
/// Signalled whenever a new package is added to [_packagePublishQueue].
49+
Completer<void> _queueNotEmpty = Completer<void>();
50+
51+
/// Builds the initial adjacency lists.
52+
Map<String, List<String>> _buildAdjacencyLists() {
53+
final publishNames = _packages.map((p) => p.name).toSet();
54+
return {
55+
for (final package in _packages)
56+
package.name: [
57+
...package.pubspecInfo.pubspec.dependencies.keys.where(
58+
publishNames.contains,
59+
),
60+
...package.pubspecInfo.pubspec.devDependencies.keys.where(
61+
publishNames.contains,
62+
),
63+
],
64+
};
65+
}
66+
67+
/// Prints the current publish queue to [stdout] if non-empty.
68+
void _printPublishQueue() {
69+
if (_packagePublishQueue.isEmpty) return;
70+
stdout
71+
..writeln('${_green}Publish queue (ready to publish):')
72+
..writeln(
73+
'${_packagePublishQueue.map((p) => ' - ${p.name} (${p.version})').join('\n')}'
74+
'$_reset',
75+
);
76+
}
77+
78+
/// Removes [publishedPackageName] from every adjacency list and enqueues
79+
/// any package whose dependency list is now empty.
80+
void _markAnalysisComplete(
81+
Map<String, List<String>> adjacencyLists,
82+
String publishedPackageName,
83+
Set<String> published,
84+
Set<String> enqueued,
85+
) {
86+
for (final entry in adjacencyLists.entries) {
87+
if (entry.key == publishedPackageName) continue;
88+
entry.value.remove(publishedPackageName);
89+
}
90+
91+
// Enqueue packages whose in-process dependencies are now all done.
92+
for (final candidate in _packages) {
93+
if (published.contains(candidate.name)) continue;
94+
if (enqueued.contains(candidate.name)) continue;
95+
if (adjacencyLists[candidate.name]?.isEmpty ?? true) {
96+
_enqueue(candidate, enqueued);
97+
}
98+
}
99+
}
100+
101+
/// Waits for all [pendingAnalyses] to finish, logging progress as each
102+
/// one completes.
103+
Future<void> _awaitPendingAnalyses(
104+
Map<String, Future<void>> pendingAnalyses,
105+
) async {
106+
if (pendingAnalyses.isNotEmpty) {
107+
// Track which analyses are still running.
108+
final remainingNames = Set<String>.of(pendingAnalyses.keys);
109+
final analysisComplete = Completer<void>();
110+
111+
String timestamp() => DateTime.now().toIso8601String().split('.').first;
112+
113+
stdout.writeln(
114+
'${_yellow}Still awaiting: '
115+
'${remainingNames.join(', ')}$_reset',
116+
);
117+
118+
// Attach a callback to each pending analysis that removes it from
119+
// the remaining set and logs progress.
120+
for (final entry in pendingAnalyses.entries) {
121+
unawaited(
122+
entry.value.whenComplete(() {
123+
remainingNames.remove(entry.key);
124+
if (remainingNames.isNotEmpty) {
125+
stdout.writeln(
126+
'$_yellow[${timestamp()}] Still awaiting: '
127+
'${remainingNames.join(', ')}$_reset',
128+
);
129+
} else {
130+
stdout.writeln('All analyses complete.');
131+
analysisComplete.complete();
132+
}
133+
}),
134+
);
135+
}
136+
137+
// Print periodic reminders while waiting, using the same timing as
138+
// _waitForQueueWork (10 s initial delay, then every 5 s).
139+
unawaited(() async {
140+
await Future<void>.delayed(_initialAliveMessageDelay);
141+
while (!analysisComplete.isCompleted) {
142+
if (remainingNames.isNotEmpty) {
143+
stdout.writeln(
144+
'$_yellow[${timestamp()}] Still waiting for analyses to finish..$_reset',
145+
);
146+
}
147+
await Future<void>.delayed(_aliveMessageReminderInterval);
148+
}
149+
}());
150+
151+
await analysisComplete.future;
152+
} else {
153+
stdout.writeln('All analyses complete.');
154+
}
155+
}
156+
157+
/// Waits for the publish queue to receive new work by logging the
158+
/// currently pending analyses and then awaiting [_queueNotEmpty].
159+
Future<void> _waitForQueueWork(
160+
Map<String, Future<void>> pendingAnalyses,
161+
Set<String> completedAnalyses,
162+
) async {
163+
final pendingList = pendingAnalyses.keys
164+
.where((name) => !completedAnalyses.contains(name))
165+
.map((name) {
166+
final pkg = _packages.firstWhere((p) => p.name == name);
167+
return ' - $name (${pkg.version})';
168+
})
169+
.join('\n');
170+
stdout.writeln(
171+
'$_yellow'
172+
'Queue empty – waiting for one of these analyses to finish..\n'
173+
'$pendingList'
174+
'$_reset',
175+
);
176+
// Reset the completer so we can await the next signal.
177+
_queueNotEmpty = Completer<void>();
178+
179+
// Wait for the queue to become non-empty. Print periodic reminders
180+
// so the process appears alive.
181+
final stopwatch = Stopwatch()..start();
182+
183+
var nextReminder = _initialAliveMessageDelay;
184+
while (!_queueNotEmpty.isCompleted) {
185+
final timeout = nextReminder - stopwatch.elapsed;
186+
if (timeout <= Duration.zero) {
187+
// Already past the reminder point – print immediately.
188+
final now = DateTime.now().toIso8601String().split('.').first;
189+
stdout.writeln(
190+
'$_yellow[$now] Still waiting for analyses to finish..$_reset',
191+
);
192+
nextReminder += _aliveMessageReminderInterval;
193+
continue;
194+
}
195+
// Race: either the queue gets work, or we hit the next reminder.
196+
await Future.any([_queueNotEmpty.future, Future<void>.delayed(timeout)]);
197+
if (_queueNotEmpty.isCompleted) break;
198+
// Timeout fired – loop back to print the reminder.
199+
}
200+
}
201+
202+
/// Adds [package] to the publish queue and signals the main loop.
203+
void _enqueue(PackageInfo package, Set<String> enqueued) {
204+
_packagePublishQueue.add(package);
205+
enqueued.add(package.name);
206+
// Wake up the main loop if it is waiting for work.
207+
if (!_queueNotEmpty.isCompleted) {
208+
_queueNotEmpty.complete();
209+
}
210+
}
211+
212+
/// Publishes all packages, processing the queue without blocking on
213+
/// analysis.
30214
Future<void> run() async {
215+
final adjacencyLists = _buildAdjacencyLists();
216+
217+
/// Tracks which packages have been published (publish call returned).
218+
final published = <String>{};
219+
220+
/// Tracks which packages have been enqueued (to avoid double-enqueue).
221+
final enqueued = <String>{};
222+
223+
/// In-flight analysis futures keyed by package name.
224+
final pendingAnalyses = <String, Future<void>>{};
225+
226+
/// Analyses that have already completed (during the main publish loop).
227+
final completedAnalyses = <String>{};
228+
229+
// Seed the queue with packages that already have no in-process
230+
// dependencies.
31231
for (final package in _packages) {
232+
if (adjacencyLists[package.name]?.isEmpty ?? true) {
233+
_enqueue(package, enqueued);
234+
}
235+
}
236+
237+
while (published.length < _packages.length) {
238+
stdout.writeln('');
239+
_printPublishQueue();
240+
241+
// If the queue is empty, wait for an in-flight analysis to unlock
242+
// new packages.
243+
if (_packagePublishQueue.isEmpty) {
244+
await _waitForQueueWork(pendingAnalyses, completedAnalyses);
245+
continue;
246+
}
247+
248+
// Take the next ready package off the queue.
249+
final package = _packagePublishQueue.removeAt(0);
250+
251+
// Publish it (this includes pre-publish + actual publish).
32252
await _publishPackage(package);
33-
await _command.awaitPendingAnalysis(package.name);
253+
published.add(package.name);
254+
255+
// Kick off analysis in the background. When it completes, update
256+
// the adjacency lists and potentially enqueue new packages.
257+
final packageName = package.name;
258+
final analysisFuture = _command.awaitPendingAnalysis(packageName).then((
259+
_,
260+
) {
261+
_markAnalysisComplete(adjacencyLists, packageName, published, enqueued);
262+
completedAnalyses.add(packageName);
263+
});
264+
pendingAnalyses[packageName] = analysisFuture;
34265
}
266+
267+
stdout.writeln('');
268+
269+
// All packages have been published. Wait for any remaining in-flight
270+
// analyses to finish so the run is fully complete.
271+
// Remove analyses that already completed during the main loop.
272+
pendingAnalyses.removeWhere((name, _) => completedAnalyses.contains(name));
273+
stdout.writeln('All packages published.');
274+
await _awaitPendingAnalyses(pendingAnalyses);
35275
}
36276
}

0 commit comments

Comments
 (0)