Skip to content

Commit 704b8b9

Browse files
authored
Refactor Datastore-related task queries. (#9051)
1 parent 7ec7cab commit 704b8b9

File tree

3 files changed

+150
-128
lines changed

3 files changed

+150
-128
lines changed

app/lib/search/backend.dart

Lines changed: 4 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ import '../shared/storage.dart';
4040
import '../shared/versions.dart';
4141
import '../task/backend.dart';
4242
import '../task/global_lock.dart';
43-
import '../task/models.dart';
4443

4544
import 'models.dart';
4645
import 'result_combiner.dart';
@@ -254,18 +253,12 @@ class SearchBackend {
254253
}
255254
}
256255

257-
final q1 = _db.query<Package>()
258-
..filter('updated >=', updatedThreshold)
259-
..order('-updated');
260-
await for (final p in q1.run()) {
261-
addResult(p.name!, p.updated!);
256+
await for (final e in _db.packages.listUpdatedSince(updatedThreshold)) {
257+
addResult(e.name, e.updated);
262258
}
263259

264-
final q3 = _db.query<PackageState>()
265-
..filter('finished >=', updatedThreshold)
266-
..order('-finished');
267-
await for (final s in q3.run()) {
268-
addResult(s.package, s.finished);
260+
await for (final e in _db.tasks.listFinishedSince(updatedThreshold)) {
261+
addResult(e.package, e.finished);
269262
}
270263

271264
return results;

app/lib/task/backend.dart

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1025,15 +1025,16 @@ class TaskBackend {
10251025
Future<void> Function(Payload payload) processPayload,
10261026
) async {
10271027
await backfillTrackingState();
1028-
await for (final state in _db.tasks.listAll()) {
1028+
await for (final state in _db.tasks.listAllForCurrentRuntime()) {
10291029
final zone = taskWorkerCloudCompute.zones.first;
10301030
// ignore: invalid_use_of_visible_for_testing_member
1031-
final payload = await updatePackageStateWithPendingVersions(
1031+
final updated = await updatePackageStateWithPendingVersions(
10321032
_db,
1033-
state,
1033+
state.package,
10341034
zone,
10351035
taskWorkerCloudCompute.generateInstanceName(),
10361036
);
1037+
final payload = updated?.$1;
10371038
if (payload == null) continue;
10381039
await processPayload(payload);
10391040
}
@@ -1331,10 +1332,6 @@ final class _TaskDataAccess {
13311332
);
13321333
}
13331334

1334-
Stream<PackageState> listAll() {
1335-
return _db.query<PackageState>().run();
1336-
}
1337-
13381335
Stream<({String package})> listAllForCurrentRuntime() async* {
13391336
final query = _db.query<PackageState>()
13401337
..filter('runtimeVersion =', runtimeVersion);
@@ -1343,6 +1340,17 @@ final class _TaskDataAccess {
13431340
}
13441341
}
13451342

1343+
Stream<({String package, DateTime finished})> listFinishedSince(
1344+
DateTime since,
1345+
) async* {
1346+
final query = _db.query<PackageState>()
1347+
..filter('finished >=', since)
1348+
..order('-finished');
1349+
await for (final s in query.run()) {
1350+
yield (package: s.package, finished: s.finished);
1351+
}
1352+
}
1353+
13461354
Stream<({String package})> listDependenciesOfPackage(
13471355
String package,
13481356
DateTime publishedAt,
@@ -1355,6 +1363,17 @@ final class _TaskDataAccess {
13551363
}
13561364
}
13571365

1366+
Stream<({String package})> selectSomePending(int limit) async* {
1367+
final query = _db.query<PackageState>()
1368+
..filter('runtimeVersion =', runtimeVersion)
1369+
..filter('pendingAt <=', clock.now())
1370+
..order('pendingAt')
1371+
..limit(limit);
1372+
await for (final ps in query.run()) {
1373+
yield (package: ps.package);
1374+
}
1375+
}
1376+
13581377
/// Returns whether the entry has been updated.
13591378
Future<bool> updateDependencyChanged(
13601379
String package,

app/lib/task/scheduler.dart

Lines changed: 120 additions & 110 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import 'package:pub_dev/shared/configuration.dart';
1111
import 'package:pub_dev/shared/datastore.dart';
1212
import 'package:pub_dev/shared/utils.dart';
1313
import 'package:pub_dev/shared/versions.dart' show runtimeVersion;
14+
import 'package:pub_dev/task/backend.dart';
1415
import 'package:pub_dev/task/clock_control.dart';
1516
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
1617
import 'package:pub_dev/task/global_lock.dart';
@@ -144,120 +145,122 @@ Future<void> schedule(
144145

145146
// Schedule analysis for some packages
146147
var pendingPackagesReviewed = 0;
148+
final selectLimit = min(
149+
_maxInstancesPerIteration,
150+
max(0, activeConfiguration.maxTaskInstances - instances),
151+
);
147152
await Future.wait(
148-
await (db.query<PackageState>()
149-
..filter('runtimeVersion =', runtimeVersion)
150-
..filter('pendingAt <=', clock.now())
151-
..order('pendingAt')
152-
..limit(
153-
min(
154-
_maxInstancesPerIteration,
155-
max(0, activeConfiguration.maxTaskInstances - instances),
156-
),
157-
))
158-
.run()
159-
.map<Future<void>>((state) async {
160-
pendingPackagesReviewed += 1;
153+
await (db.tasks.selectSomePending(selectLimit)).map<Future<void>>((
154+
selected,
155+
) async {
156+
pendingPackagesReviewed += 1;
157+
158+
final instanceName = compute.generateInstanceName();
159+
final zone = pickZone();
161160

162-
final instanceName = compute.generateInstanceName();
163-
final zone = pickZone();
161+
final updated = await updatePackageStateWithPendingVersions(
162+
db,
163+
selected.package,
164+
zone,
165+
instanceName,
166+
);
167+
final payload = updated?.$1;
168+
if (payload == null) {
169+
return;
170+
}
171+
// Create human readable description for GCP console.
172+
final description =
173+
'package:${payload.package} analysis of ${payload.versions.length} '
174+
'versions.';
164175

165-
final payload = await updatePackageStateWithPendingVersions(
166-
db,
167-
state,
168-
zone,
169-
instanceName,
176+
await Future.microtask(() async {
177+
var rollbackPackageState = true;
178+
try {
179+
// Purging cache is important for the edge case, where the new upload happens
180+
// on a different runtime version, and the current one's cache is still stale
181+
// and does not have the version yet.
182+
// TODO(https://github.com/dart-lang/pub-dev/issues/7268) remove after it gets fixed.
183+
await purgePackageCache(payload.package);
184+
_log.info(
185+
'creating instance $instanceName in $zone for '
186+
'package:${selected.package}',
187+
);
188+
await compute.createInstance(
189+
zone: zone,
190+
instanceName: instanceName,
191+
dockerImage: activeConfiguration.taskWorkerImage!,
192+
arguments: [json.encode(payload)],
193+
description: description,
194+
);
195+
rollbackPackageState = false;
196+
} on ZoneExhaustedException catch (e, st) {
197+
// A zone being exhausted is normal operations, we just use another
198+
// zone for 15 minutes.
199+
_log.info(
200+
'zone resources exhausted, banning ${e.zone} for 30 minutes',
201+
e,
202+
st,
203+
);
204+
// Ban usage of zone for 30 minutes
205+
banZone(e.zone, minutes: 30);
206+
} on QuotaExhaustedException catch (e, st) {
207+
// Quota exhausted, this can happen, but it shouldn't. We'll just stop
208+
// doing anything for 10 minutes. Hopefully that'll resolve the issue.
209+
// We log severe, because this is a reason to adjust the quota or
210+
// instance limits.
211+
_log.severe(
212+
'Quota exhausted trying to create $instanceName, banning all zones '
213+
'for 10 minutes',
214+
e,
215+
st,
170216
);
171-
if (payload == null) {
172-
return;
173-
}
174-
// Create human readable description for GCP console.
175-
final description =
176-
'package:${payload.package} analysis of ${payload.versions.length} '
177-
'versions.';
178217

179-
scheduleMicrotask(() async {
180-
var rollbackPackageState = true;
181-
try {
182-
// Purging cache is important for the edge case, where the new upload happens
183-
// on a different runtime version, and the current one's cache is still stale
184-
// and does not have the version yet.
185-
// TODO(https://github.com/dart-lang/pub-dev/issues/7268) remove after it gets fixed.
186-
await purgePackageCache(payload.package);
187-
_log.info(
188-
'creating instance $instanceName in $zone for '
189-
'package:${state.package}',
190-
);
191-
await compute.createInstance(
192-
zone: zone,
193-
instanceName: instanceName,
194-
dockerImage: activeConfiguration.taskWorkerImage!,
195-
arguments: [json.encode(payload)],
196-
description: description,
197-
);
198-
rollbackPackageState = false;
199-
} on ZoneExhaustedException catch (e, st) {
200-
// A zone being exhausted is normal operations, we just use another
201-
// zone for 15 minutes.
202-
_log.info(
203-
'zone resources exhausted, banning ${e.zone} for 30 minutes',
204-
e,
205-
st,
206-
);
207-
// Ban usage of zone for 30 minutes
208-
banZone(e.zone, minutes: 30);
209-
} on QuotaExhaustedException catch (e, st) {
210-
// Quota exhausted, this can happen, but it shouldn't. We'll just stop
211-
// doing anything for 10 minutes. Hopefully that'll resolve the issue.
212-
// We log severe, because this is a reason to adjust the quota or
213-
// instance limits.
214-
_log.severe(
215-
'Quota exhausted trying to create $instanceName, banning all zones '
216-
'for 10 minutes',
217-
e,
218-
st,
218+
// Ban all zones for 10 minutes
219+
for (final zone in compute.zones) {
220+
banZone(zone, minutes: 10);
221+
}
222+
} on Exception catch (e, st) {
223+
// No idea what happened, but for robustness we'll stop using the zone
224+
// and shout into the logs
225+
_log.shout(
226+
'Failed to create instance $instanceName, banning zone "$zone" for '
227+
'15 minutes',
228+
e,
229+
st,
230+
);
231+
// Ban usage of zone for 15 minutes
232+
banZone(zone, minutes: 15);
233+
} finally {
234+
if (rollbackPackageState) {
235+
final oldVersionsMap = updated?.$2 ?? const {};
236+
// Restore the state of the PackageState for versions that were
237+
// suppose to run on the instance we just failed to create.
238+
// If this doesn't work, we'll eventually retry. Hence, correctness
239+
// does not hinge on this transaction being successful.
240+
await withRetryTransaction(db, (tx) async {
241+
final s = await tx.lookupOrNull<PackageState>(
242+
PackageState.createKey(
243+
db.emptyKey,
244+
runtimeVersion,
245+
selected.package,
246+
),
219247
);
220-
221-
// Ban all zones for 10 minutes
222-
for (final zone in compute.zones) {
223-
banZone(zone, minutes: 10);
248+
if (s == null) {
249+
return; // Presumably, the package was deleted.
224250
}
225-
} on Exception catch (e, st) {
226-
// No idea what happened, but for robustness we'll stop using the zone
227-
// and shout into the logs
228-
_log.shout(
229-
'Failed to create instance $instanceName, banning zone "$zone" for '
230-
'15 minutes',
231-
e,
232-
st,
233-
);
234-
// Ban usage of zone for 15 minutes
235-
banZone(zone, minutes: 15);
236-
} finally {
237-
if (rollbackPackageState) {
238-
// Restore the state of the PackageState for versions that were
239-
// suppose to run on the instance we just failed to create.
240-
// If this doesn't work, we'll eventually retry. Hence, correctness
241-
// does not hinge on this transaction being successful.
242-
await withRetryTransaction(db, (tx) async {
243-
final s = await tx.lookupOrNull<PackageState>(state.key);
244-
if (s == null) {
245-
return; // Presumably, the package was deleted.
246-
}
247251

248-
s.versions!.addEntries(
249-
s.versions!.entries
250-
.where((e) => e.value.instance == instanceName)
251-
.map((e) => MapEntry(e.key, state.versions![e.key]!)),
252-
);
253-
s.derivePendingAt();
254-
tx.insert(s);
255-
});
256-
}
257-
}
258-
});
259-
})
260-
.toList(),
252+
s.versions!.addEntries(
253+
s.versions!.entries
254+
.where((e) => e.value.instance == instanceName)
255+
.map((e) => MapEntry(e.key, oldVersionsMap[e.key]!)),
256+
);
257+
s.derivePendingAt();
258+
tx.insert(s);
259+
});
260+
}
261+
}
262+
});
263+
}).toList(),
261264
);
262265

263266
// If there was no pending packages reviewed, and no instances currently
@@ -281,19 +284,25 @@ Future<void> schedule(
281284

282285
/// Updates the package state with versions that are already pending or
283286
/// will be pending soon.
287+
///
288+
/// Returns the payload and the old status of the state info version map
284289
@visibleForTesting
285-
Future<Payload?> updatePackageStateWithPendingVersions(
290+
Future<(Payload, Map<String, PackageVersionStateInfo>)?>
291+
updatePackageStateWithPendingVersions(
286292
DatastoreDB db,
287-
PackageState state,
293+
String package,
288294
String zone,
289295
String instanceName,
290296
) async {
291297
return await withRetryTransaction(db, (tx) async {
292-
final s = await tx.lookupOrNull<PackageState>(state.key);
298+
final s = await tx.lookupOrNull<PackageState>(
299+
PackageState.createKey(db.emptyKey, runtimeVersion, package),
300+
);
293301
if (s == null) {
294302
// presumably the package was deleted.
295303
return null;
296304
}
305+
final oldVersionsMap = {...?s.versions};
297306

298307
final now = clock.now();
299308
final pendingVersions = s
@@ -354,7 +363,7 @@ Future<Payload?> updatePackageStateWithPendingVersions(
354363
tx.insert(s);
355364

356365
// Create payload
357-
return Payload(
366+
final payload = Payload(
358367
package: s.package,
359368
pubHostedUrl: activeConfiguration.defaultServiceBaseUrl,
360369
versions: pendingVersions.map(
@@ -364,5 +373,6 @@ Future<Payload?> updatePackageStateWithPendingVersions(
364373
),
365374
),
366375
);
376+
return (payload, oldVersionsMap);
367377
});
368378
}

0 commit comments

Comments
 (0)