Skip to content

Commit 0b66f75

Browse files
committed
RFC: Options to SQL-migration.
1 parent 32ade8b commit 0b66f75

File tree

2 files changed

+242
-36
lines changed

2 files changed

+242
-36
lines changed

app/lib/shared/sql_datastore.dart

Lines changed: 143 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,143 @@
1+
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
2+
// for details. All rights reserved. Use of this source code is governed by a
3+
// BSD-style license that can be found in the LICENSE file.
4+
5+
// ignore_for_file: non_constant_identifier_names, unused_element
6+
7+
import 'dart:convert';
8+
9+
import 'package:gcloud/common.dart';
10+
import 'package:gcloud/datastore.dart';
11+
12+
/// Sketch of a [Datastore] implementation intended to be backed by SQL.
///
/// WARNING: This class is not for production use yet.
///
/// Only [beginTransaction] and [rollback] complete normally. Every other
/// operation currently completes with an [UnimplementedError]; the
/// commented-out statements and TODOs outline the planned handling of
/// `PackageState` entities.
class SqlDatastore implements Datastore {
  /// Not supported yet: always completes with an [UnimplementedError].
  @override
  Future<List<Key>> allocateIds(List<Key> keys) async =>
      throw UnimplementedError();

  /// Returns a fresh, stateless transaction marker.
  ///
  /// The [crossEntityGroup] flag is currently ignored.
  @override
  Future<Transaction> beginTransaction({bool crossEntityGroup = false}) async =>
      _Transaction();

  /// Does nothing; there is no state to roll back yet.
  @override
  Future rollback(Transaction transaction) async {}

  /// Looks up every key in [keys] and completes with the results in the
  /// same order.
  ///
  /// Each key is resolved through [_lookup], which at the moment always
  /// fails with an [UnimplementedError].
  @override
  Future<List<Entity?>> lookup(
    List<Key> keys, {
    Transaction? transaction,
  }) async {
    final pending = [for (final key in keys) _lookup(key, transaction)];
    return Future.wait(pending);
  }

  /// Resolves a single [key]; currently always throws [UnimplementedError].
  Future<Entity?> _lookup(Key key, Transaction? transaction) async {
    if (_isPackageState(key)) {
      // Planned implementation:
      //
      // final stateId = key.elements.single.id as String;
      // TODO: store in the transaction object that the entry exists
      // TODO: if (transaction != null) also add FOR UPDATE clause
      // final row = await _db.packageStates.byKey(stateId: stateId);
      // return row == null ? null : _packageStateRowToEntity(row);
    }
    throw UnimplementedError();
  }

  /// Runs [query]; currently always completes with an [UnimplementedError].
  @override
  Future<Page<Entity>> query(
    Query query, {
    Partition? partition,
    Transaction? transaction,
  }) async {
    if (query.kind == 'PackageState') {
      // TODO: build query
      // TODO: run query with limit + offset
      // TODO: transform rows + build nextFn
      // TODO: store in the transaction object that the entry exists
    }
    throw UnimplementedError();
  }

  /// Applies the given mutations.
  ///
  /// Fails with an [UnimplementedError] when [autoIdInserts] is non-empty or
  /// when any key in [deletes] / [inserts] is not a `PackageState` key. Even
  /// when every key is accepted, the call still ends in an
  /// [UnimplementedError] because nothing is persisted yet.
  @override
  Future<CommitResult> commit({
    List<Entity> inserts = const [],
    List<Entity> autoIdInserts = const [],
    List<Key> deletes = const [],
    Transaction? transaction,
  }) async {
    if (autoIdInserts.isNotEmpty) throw UnimplementedError();

    for (final deleted in deletes) {
      if (!_isPackageState(deleted)) throw UnimplementedError();
      // Planned implementation:
      //
      // final stateId = deleted.elements.single.id as String;
      // await _db.packageStates.delete(state_id: stateId).execute();
    }

    for (final inserted in inserts) {
      if (!_isPackageState(inserted.key)) throw UnimplementedError();
      // Planned implementation:
      //
      // final stateId = inserted.key.elements.single.id as String;
      // TODO: use the transaction object if the entity exists or use lookup or upsert
      // await _db.packageStates.insert(stateId: stateId, ...).execute();
    }

    // No mutation is actually executed yet, so never report success.
    throw UnimplementedError();
  }
}
98+
99+
/// Stateless [Transaction] marker handed out by
/// `SqlDatastore.beginTransaction`.
class _Transaction implements Transaction {}
100+
101+
/// A [Page] of entities whose successor is produced by a callback.
class _Page implements Page<Entity> {
  /// The entities contained in this page.
  @override
  final List<Entity> items;

  /// Whether this is the final page of results.
  @override
  final bool isLast;

  /// Callback that loads the page following this one.
  final Future<Page<Entity>> Function({int? pageSize}) _nextFn;

  _Page(this.items, this.isLast, this._nextFn);

  /// Delegates to the callback supplied at construction, forwarding
  /// [pageSize] unchanged.
  @override
  Future<Page<Entity>> next({int? pageSize}) async =>
      _nextFn(pageSize: pageSize);
}
117+
118+
/// Whether [key] consists of exactly one path element of kind
/// `PackageState`.
bool _isPackageState(Key key) {
  final path = key.elements;
  if (path.length != 1) return false;
  return path.first.kind == 'PackageState';
}
122+
123+
/// Converts [row] into a `PackageState` [Entity].
///
/// The entity id is `<runtime_version>/<package>`. Only `runtimeVersion` and
/// the JSON-encoded `versions` are populated from the row; all remaining
/// properties are set to `null`.
Entity _packageStateRowToEntity(PackageStateRow row) {
  final stateId = '${row.runtime_version}/${row.package}';
  final key = Key([KeyElement('PackageState', stateId)]);
  return Entity(key, {
    'runtimeVersion': row.runtime_version,
    'versions': jsonEncode(row.versions_blob),
    'abortedTokens': null,
    'pendingAt': null,
    'dependencies': null,
    'lastDependencyChanged': null,
    'finished': null,
  });
}
138+
139+
/// Shape of a row that `_packageStateRowToEntity` can convert into a
/// `PackageState` entity.
abstract class PackageStateRow {
  /// First half of the entity id; also stored as the `runtimeVersion`
  /// property.
  String get runtime_version;

  /// Second half of the entity id.
  String get package;

  /// Value that gets JSON-encoded into the entity's `versions` property.
  Map<String, dynamic> get versions_blob;
}

app/lib/task/backend.dart

Lines changed: 99 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,8 @@ import 'package:pub_dev/shared/versions.dart'
3434
gcBeforeRuntimeVersion,
3535
shouldGCVersion,
3636
acceptedRuntimeVersions;
37+
import 'package:pub_dev/shared/versions.dart' as shared_versions
38+
show runtimeVersion;
3739
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
3840
import 'package:pub_dev/task/global_lock.dart';
3941
import 'package:pub_dev/task/handlers.dart';
@@ -80,6 +82,9 @@ class TaskBackend {
8082
final DatastoreDB _db;
8183
final Bucket _bucket;
8284

85+
late final _packages = PackageDataAcccess(_db);
86+
late final _tasks = TaskDataAccess(_db);
87+
8388
/// If [stop] has been called to stop background processes.
8489
///
8590
/// `null` when not started yet, or we have been fully stopped.
@@ -234,14 +239,13 @@ class TaskBackend {
234239
StackTrace? stackTrace;
235240

236241
// For each package we should ensure state is tracked
237-
final pq = _db.query<Package>();
238-
await for (final p in pq.run()) {
239-
packageNames.add(p.name!);
242+
await for (final p in _packages.listPackageNamesToTrack()) {
243+
packageNames.add(p.name);
240244

241245
scheduleMicrotask(() async {
242246
await pool.withResource(() async {
243247
try {
244-
await trackPackage(p.name!, updateDependents: false);
248+
await trackPackage(p.name, updateDependents: false);
245249
} catch (e, st) {
246250
_log.severe('failed to track state for "${p.name}"', e, st);
247251
if (error == null) {
@@ -254,20 +258,15 @@ class TaskBackend {
254258
}
255259

256260
// Check that all [PackageState] entities have a matching [Package] entity.
257-
final sq = _db.query<PackageState>()
258-
..filter('runtimeVersion =', runtimeVersion);
259-
260-
await for (final state in sq.run()) {
261+
await for (final state in _tasks.listForCurrentRuntime()) {
261262
if (!packageNames.contains(state.package)) {
262263
final r = await pool.request();
263264

264265
scheduleMicrotask(() async {
265266
try {
266267
// Lookup the package to ensure it really doesn't exist
267-
final packageKey = _db.emptyKey.append(Package, id: state.package);
268-
final package = await _db.lookupOrNull<Package>(packageKey);
269-
if (package == null) {
270-
await _db.commit(deletes: [state.key]);
268+
if (!await _packages.exists(state.package)) {
269+
await _tasks.delete(state.package);
271270
}
272271
} catch (e, st) {
273272
_log.severe('failed to untrack "${state.package}"', e, st);
@@ -313,9 +312,6 @@ class TaskBackend {
313312
var since = clock.ago(days: 3);
314313
while (claim.valid && !abort.isCompleted) {
315314
// Look at all packages changed in [since]
316-
final q = _db.query<Package>()
317-
..filter('updated >', since)
318-
..order('-updated');
319315

320316
if (clock.now().isAfter(nextLongScan)) {
321317
// Next time we'll do a longer scan
@@ -327,24 +323,24 @@ class TaskBackend {
327323
}
328324

329325
// Look at all packages that has changed
330-
await for (final p in q.run()) {
326+
await for (final p in _packages.listPackagesUpdatedSince(since)) {
331327
// Abort, if claim is invalid or abort has been resolved!
332328
if (!claim.valid || abort.isCompleted) {
333329
return;
334330
}
335331

336332
// Check if the [updated] timestamp has been seen before.
337333
// If so, we skip checking it!
338-
final lastSeen = seen[p.name!];
339-
if (lastSeen != null && lastSeen.toUtc() == p.updated!.toUtc()) {
334+
final lastSeen = seen[p.name];
335+
if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) {
340336
continue;
341337
}
342338
// Remember the updated time for this package, so we don't check it
343339
// again...
344-
seen[p.name!] = p.updated!;
340+
seen[p.name] = p.updated;
345341

346342
// Check the package
347-
await trackPackage(p.name!, updateDependents: true);
343+
await trackPackage(p.name, updateDependents: true);
348344
}
349345

350346
// Cleanup the [seen] map for anything older than [since], as this won't
@@ -505,10 +501,7 @@ class TaskBackend {
505501
/// Garbage collect [PackageState] and results from old runtimeVersions.
506502
Future<void> garbageCollect() async {
507503
// GC the old [PackageState] entities
508-
await _db.deleteWithQuery(
509-
_db.query<PackageState>()
510-
..filter('runtimeVersion <', gcBeforeRuntimeVersion),
511-
);
504+
await _tasks.deleteOldEntries();
512505

513506
// Limit to 50 concurrent deletion requests
514507
final pool = Pool(50);
@@ -572,10 +565,9 @@ class TaskBackend {
572565
//
573566
// We only update [PackageState] to have [lastDependencyChanged], this
574567
// ensures that there is no risk of indefinite propagation.
575-
final q = _db.query<PackageState>()
576-
..filter('dependencies =', package)
577-
..filter('lastDependencyChanged <', publishedAt);
578-
await for (final state in q.run()) {
568+
final stream = _tasks.listWhereDependencyChangedBefore(package,
569+
publishedAt: publishedAt);
570+
await for (final state in stream) {
579571
final r = await pool.request();
580572

581573
// Schedule a microtask that attempts to update [lastDependencyChanged],
@@ -630,8 +622,7 @@ class TaskBackend {
630622
throw AuthenticationException.authenticationRequired();
631623
}
632624

633-
final key = PackageState.createKey(_db, runtimeVersion, package);
634-
final state = await _db.lookupOrNull<PackageState>(key);
625+
final state = await _tasks.lookupState(package);
635626
if (state == null) {
636627
throw NotFoundException.resource('$package/$version');
637628
}
@@ -1004,8 +995,7 @@ class TaskBackend {
1004995
Future<PackageStateInfo> packageStatus(String package) async {
1005996
final status = await cache.taskPackageStatus(package).get(() async {
1006997
for (final rt in acceptedRuntimeVersions) {
1007-
final key = PackageState.createKey(_db, rt, package);
1008-
final state = await _db.lookupOrNull<PackageState>(key);
998+
final state = await _tasks.lookupState(package, runtimeVersion: rt);
1009999
// skip states where the entry was created, but no analysis has finished yet
10101000
if (state == null || state.hasNeverFinished) {
10111001
continue;
@@ -1076,8 +1066,7 @@ class TaskBackend {
10761066
final cachedValue =
10771067
await cache.latestFinishedVersion(package).get(() async {
10781068
for (final rt in acceptedRuntimeVersions) {
1079-
final key = PackageState.createKey(_db, rt, package);
1080-
final state = await _db.lookupOrNull<PackageState>(key);
1069+
final state = await _tasks.lookupState(package, runtimeVersion: rt);
10811070
// skip states where the entry was created, but no analysis has finished yet
10821071
if (state == null || state.hasNeverFinished) {
10831072
continue;
@@ -1118,8 +1107,7 @@ class TaskBackend {
11181107
await cache.closestFinishedVersion(package, version).get(() async {
11191108
final semanticVersion = Version.parse(version);
11201109
for (final rt in acceptedRuntimeVersions) {
1121-
final key = PackageState.createKey(_db, rt, package);
1122-
final state = await _db.lookupOrNull<PackageState>(key);
1110+
final state = await _tasks.lookupState(package, runtimeVersion: rt);
11231111
// Skip states where the entry was created, but the analysis has not finished yet.
11241112
if (state == null || state.hasNeverFinished) {
11251113
continue;
@@ -1307,3 +1295,78 @@ List<String> _updatedDependencies(
13071295
.takeWhile((p) => (size += p.length + 1) < 1500)
13081296
.sorted();
13091297
}
1298+
1299+
/// Read-only access to `Package` entities stored in a [DatastoreDB].
// NOTE(review): the class name is spelled with three c's ("Acccess");
// correcting it requires updating every call site at the same time.
class PackageDataAcccess {
  final DatastoreDB _db;

  PackageDataAcccess(this._db);

  /// Whether a `Package` entity with id [name] exists.
  Future<bool> exists(String name) async {
    final key = _db.emptyKey.append(Package, id: name);
    final found = await _db.lookupOrNull<Package>(key);
    return found != null;
  }

  /// Streams the name of every `Package` entity.
  Stream<({String name})> listPackageNamesToTrack() async* {
    final everyPackage = _db.query<Package>().run();
    await for (final package in everyPackage) {
      yield (name: package.name!);
    }
  }

  /// Streams name and `updated` timestamp of all packages updated after
  /// [since], ordered by `updated` descending.
  Stream<({String name, DateTime updated})> listPackagesUpdatedSince(
    DateTime since,
  ) async* {
    final recentlyUpdated = _db.query<Package>()
      ..filter('updated >', since)
      ..order('-updated');
    await for (final package in recentlyUpdated.run()) {
      yield (name: package.name!, updated: package.updated!);
    }
  }
}
1326+
1327+
/// Access to `PackageState` entities stored in a [DatastoreDB].
class TaskDataAccess {
  final DatastoreDB _db;

  TaskDataAccess(this._db);

  /// Returns [requested], or the current runtime version when it is `null`.
  String _versionOrCurrent(String? requested) =>
      requested ?? shared_versions.runtimeVersion;

  /// Looks up the [PackageState] of [package].
  ///
  /// Uses the current runtime version unless [runtimeVersion] is given.
  /// Completes with `null` when no such entity exists.
  Future<PackageState?> lookupState(
    String package, {
    String? runtimeVersion,
  }) async {
    final version = _versionOrCurrent(runtimeVersion);
    final key = PackageState.createKey(_db, version, package);
    return await _db.lookupOrNull<PackageState>(key);
  }

  /// Deletes the [PackageState] of [package].
  ///
  /// Uses the current runtime version unless [runtimeVersion] is given.
  Future<void> delete(
    String package, {
    String? runtimeVersion,
  }) async {
    final version = _versionOrCurrent(runtimeVersion);
    final key = PackageState.createKey(_db, version, package);
    await _db.commit(deletes: [key]);
  }

  /// Deletes every [PackageState] whose `runtimeVersion` sorts before
  /// [gcBeforeRuntimeVersion].
  Future<void> deleteOldEntries() async {
    final outdated = _db.query<PackageState>()
      ..filter('runtimeVersion <', gcBeforeRuntimeVersion);
    await _db.deleteWithQuery(outdated);
  }

  /// Streams the package name of every [PackageState] stored for the current
  /// runtime version.
  Stream<({String package})> listForCurrentRuntime() {
    final current = _db.query<PackageState>()
      ..filter('runtimeVersion =', runtimeVersion);
    final states = current.run();
    return states.map((state) => (package: state.package));
  }

  /// Streams every [PackageState] that lists [package] among its
  /// `dependencies` and whose `lastDependencyChanged` is before
  /// [publishedAt].
  Stream<PackageState> listWhereDependencyChangedBefore(
    String package, {
    required DateTime publishedAt,
  }) {
    final dependents = _db.query<PackageState>()
      ..filter('dependencies =', package)
      ..filter('lastDependencyChanged <', publishedAt);
    return dependents.run();
  }
}

0 commit comments

Comments
 (0)