Skip to content

Commit b7f7a99

Browse files
committed
Updated ApiExported and added tests
1 parent 98b1dfe commit b7f7a99

File tree

11 files changed

+761
-503
lines changed

11 files changed

+761
-503
lines changed
Lines changed: 247 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,247 @@
1+
// Copyright (c) 2023, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
import 'dart:async';
6+
7+
import 'package:clock/clock.dart';
8+
import 'package:gcloud/service_scope.dart' as ss;
9+
import 'package:gcloud/storage.dart';
10+
import 'package:logging/logging.dart';
11+
import 'package:pub_dev/service/security_advisories/backend.dart';
12+
import 'package:pub_dev/shared/parallel_foreach.dart';
13+
14+
import '../../search/backend.dart';
15+
import '../../shared/datastore.dart';
16+
import '../../shared/versions.dart';
17+
import '../../task/global_lock.dart';
18+
import '../backend.dart';
19+
import '../models.dart';
20+
import 'exported_api.dart';
21+
22+
final Logger _log = Logger('api_exporter');
23+
24+
/// Sets the API Exporter service.
25+
void registerApiExporter(ApiExporter value) =>
26+
ss.register(#_apiExporter, value);
27+
28+
/// The active API Exporter service or null if it hasn't been initialized.
29+
ApiExporter? get apiExporter => ss.lookup(#_apiExporter) as ApiExporter?;
30+
31+
const _concurrency = 30;
32+
33+
class ApiExporter {
34+
final ExportedApi _api;
35+
36+
/// If [stop] has been called to stop background processes.
37+
///
38+
/// `null` when not started yet, or we have been fully stopped.
39+
Completer<void>? _aborted;
40+
41+
/// If background processes created by [start] have stopped.
42+
///
43+
/// This won't be resolved if [start] has not been called!
44+
/// `null` when not started yet.
45+
Completer<void>? _stopped;
46+
47+
ApiExporter({
48+
required Bucket bucket,
49+
}) : _api = ExportedApi(storageService, bucket);
50+
51+
/// Start continuous background processes for scheduling of tasks.
52+
///
53+
/// Calling [start] without first calling [stop] is an error.
54+
Future<void> start() async {
55+
if (_aborted != null) {
56+
throw StateError('ApiExporter.start() has already been called!');
57+
}
58+
// Note: During testing we call [start] and [stop] in a [FakeAsync.run],
59+
// this only works because the completers are created here.
60+
// If we create the completers in the constructor which gets called
61+
// outside [FakeAsync.run], then this won't work.
62+
// In the future we hopefully support running the entire service using
63+
// FakeAsync, but this point we rely on completers being created when
64+
// [start] is called -- and not in the [ApiExporter] constructor.
65+
final aborted = _aborted = Completer();
66+
final stopped = _stopped = Completer();
67+
68+
// Start scanning for packages to be tracked
69+
scheduleMicrotask(() async {
70+
try {
71+
// Create a lock for task scheduling, so tasks
72+
final lock = GlobalLock.create(
73+
'$runtimeVersion/package/scan-sync-export-api',
74+
expiration: Duration(minutes: 25),
75+
);
76+
77+
while (!aborted.isCompleted) {
78+
// Acquire the global lock and scan for package changes while lock is
79+
// valid.
80+
try {
81+
await lock.withClaim((claim) async {
82+
await _scanForPackageUpdates(claim, abort: aborted);
83+
}, abort: aborted);
84+
} catch (e, st) {
85+
// Log this as very bad, and then move on. Nothing good can come
86+
// from straight up stopping.
87+
_log.shout(
88+
'scanning failed (will retry when lock becomes free)',
89+
e,
90+
st,
91+
);
92+
// Sleep 5 minutes to reduce risk of degenerate behavior
93+
await Future.delayed(Duration(minutes: 5));
94+
}
95+
}
96+
} catch (e, st) {
97+
_log.severe('scanning loop crashed', e, st);
98+
} finally {
99+
_log.info('scanning loop stopped');
100+
// Report background processes as stopped
101+
stopped.complete();
102+
}
103+
});
104+
}
105+
106+
/// Stop any background process that may be running.
107+
///
108+
/// Calling this method is always safe.
109+
Future<void> stop() async {
110+
final aborted = _aborted;
111+
if (aborted == null) {
112+
return;
113+
}
114+
if (!aborted.isCompleted) {
115+
aborted.complete();
116+
}
117+
await _stopped!.future;
118+
_aborted = null;
119+
_stopped = null;
120+
}
121+
122+
/// Gets and uploads the package name completion data.
123+
Future<void> synchronizePackageNameCompletionData() async {
124+
await _api.packageNameCompletionData.write(
125+
await searchBackend.getPackageNameCompletionData(),
126+
);
127+
}
128+
129+
/// Synchronize all exported API.
130+
///
131+
/// This is intended to be scheduled from a daily background task.
132+
Future<void> synchronizeExportedApi() async {
133+
final allPackageNames = <String>{};
134+
final packageQuery = dbService.query<Package>();
135+
await packageQuery.run().parallelForEach(_concurrency, (pkg) async {
136+
final name = pkg.name!;
137+
if (pkg.isNotVisible) {
138+
return;
139+
}
140+
allPackageNames.add(name);
141+
142+
// TODO: Consider retries around all this logic
143+
await synchronizePackage(name);
144+
});
145+
146+
await synchronizePackageNameCompletionData();
147+
148+
await _api.garbageCollect(allPackageNames);
149+
}
150+
151+
/// Sync package and into [ExportedApi], this will synchronize package into
152+
/// [ExportedApi].
153+
///
154+
/// This method will update [ExportedApi] ensuring:
155+
/// * Version listing for [package] is up-to-date,
156+
/// * Advisories for [package] is up-to-date,
157+
/// * Tarballs for each version of [package] is up-to-date,
158+
/// * Delete tarballs from old versions that no-longer exist.
159+
///
160+
/// This is intended when:
161+
/// * Running a full background synchronization.
162+
/// * When a change in [Package.updated] is detected.
163+
/// * A package is moderated, or other admin action is applied.
164+
Future<void> synchronizePackage(String package) async {
165+
// TODO: Handle the case where [package] is deleted or invisible!
166+
// TODO: We may need to delete the package, but only if it's not too recent!
167+
final versionListing = await packageBackend.listVersions(package);
168+
// TODO: Consider skipping the cache when fetching security advisories
169+
final advisories = await securityAdvisoryBackend.listAdvisoriesResponse(
170+
package,
171+
);
172+
173+
final versions = await packageBackend.tarballStorage
174+
.listVersionsInCanonicalBucket(package);
175+
176+
// Remove versions that are not exposed in the public API.
177+
versions.removeWhere(
178+
(version, _) => !versionListing.versions.any((v) => v.version == version),
179+
);
180+
181+
await _api.package(package).synchronizeTarballs(versions);
182+
await _api.package(package).advisories.write(advisories);
183+
await _api.package(package).versions.write(versionListing);
184+
}
185+
186+
/// Scan for updates from packages until [abort] is resolved, or [claim]
187+
/// is lost.
188+
Future<void> _scanForPackageUpdates(
189+
GlobalLockClaim claim, {
190+
Completer<void>? abort,
191+
}) async {
192+
abort ??= Completer<void>();
193+
194+
// Map from package to updated that has been seen.
195+
final seen = <String, DateTime>{};
196+
197+
// We will schedule longer overlaps every 6 hours.
198+
var nextLongScan = clock.fromNow(hours: 6);
199+
200+
// In theory 30 minutes overlap should be enough. In practice we should
201+
// allow an ample room for missed windows, and 3 days seems to be large enough.
202+
var since = clock.ago(days: 3);
203+
while (claim.valid && !abort.isCompleted) {
204+
// Look at all packages changed in [since]
205+
final q = dbService.query<Package>()
206+
..filter('updated >', since)
207+
..order('-updated');
208+
209+
if (clock.now().isAfter(nextLongScan)) {
210+
// Next time we'll do a longer scan
211+
since = clock.ago(days: 1);
212+
nextLongScan = clock.fromNow(hours: 6);
213+
} else {
214+
// Next time we'll only consider changes since now - 30 minutes
215+
since = clock.ago(minutes: 30);
216+
}
217+
218+
// Look at all packages that has changed
219+
await for (final p in q.run()) {
220+
// Abort, if claim is invalid or abort has been resolved!
221+
if (!claim.valid || abort.isCompleted) {
222+
return;
223+
}
224+
225+
// Check if the [updated] timestamp has been seen before.
226+
// If so, we skip checking it!
227+
final lastSeen = seen[p.name!];
228+
if (lastSeen != null && lastSeen.toUtc() == p.updated!.toUtc()) {
229+
continue;
230+
}
231+
// Remember the updated time for this package, so we don't check it
232+
// again...
233+
seen[p.name!] = p.updated!;
234+
235+
// Check the package
236+
await synchronizePackage(p.name!);
237+
}
238+
239+
// Cleanup the [seen] map for anything older than [since], as this won't
240+
// be relevant to the next iteration.
241+
seen.removeWhere((_, updated) => updated.isBefore(since));
242+
243+
// Wait until aborted or 10 minutes before scanning again!
244+
await abort.future.timeout(Duration(minutes: 10), onTimeout: () => null);
245+
}
246+
}
247+
}

0 commit comments

Comments
 (0)