diff --git a/app/lib/shared/sql_datastore.dart b/app/lib/shared/sql_datastore.dart new file mode 100644 index 0000000000..8d9b8343d8 --- /dev/null +++ b/app/lib/shared/sql_datastore.dart @@ -0,0 +1,143 @@ +// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file +// for details. All rights reserved. Use of this source code is governed by a +// BSD-style license that can be found in the LICENSE file. + +// ignore_for_file: non_constant_identifier_names, unused_element + +import 'dart:convert'; + +import 'package:gcloud/common.dart'; +import 'package:gcloud/datastore.dart'; + +/// WARNING: This class is not for production use yet. +class SqlDatastore implements Datastore { + @override + Future> allocateIds(List keys) async { + throw UnimplementedError(); + } + + @override + Future beginTransaction({bool crossEntityGroup = false}) async { + return _Transaction(); + } + + @override + Future rollback(Transaction transaction) async { + // no-op + } + + @override + Future> lookup( + List keys, { + Transaction? transaction, + }) async { + return await Future.wait( + keys.map((k) async => await _lookup(k, transaction))); + } + + Future _lookup(Key key, Transaction? transaction) async { + if (_isPackageState(key)) { + // final stateId = key.elements.single.id as String; + // TODO: store in the transaction object that the entry exists + // + // TODO: if (transaction != null) also add FOR UPDATE clause + // final row = await _db.packageStates.byKey(stateId: stateId); + // + // return row == null ? : _packageStateRowToEntity(row); + } + throw UnimplementedError(); + } + + @override + Future> query( + Query query, { + Partition? partition, + Transaction? transaction, + }) async { + if (query.kind == 'PackageState') { + // TODO: build query + // TODO: run query with limit + offset + // TODO: transform rows + build nextFn + // TODO: store in the transaction object that the entry exists + } + throw UnimplementedError(); + } + + @override + Future commit({ + List inserts = const [], + List autoIdInserts = const [], + List deletes = const [], + Transaction? transaction, + }) async { + if (autoIdInserts.isNotEmpty) { + throw UnimplementedError(); + } + for (final key in deletes) { + if (_isPackageState(key)) { + // final stateId = key.elements.single.id as String; + // await _db.packageStates.delete(state_id: stateId).execute(); + continue; + } + throw UnimplementedError(); + } + + for (final entity in inserts) { + if (_isPackageState(entity.key)) { + // final stateId = key.elements.single.id as String; + // TODO: use the transaction object if the entity exists or use lookup or upsert + // await _db.packageStates.insert(stateId: stateId, ...).execute(); + continue; + } + throw UnimplementedError(); + } + + throw UnimplementedError(); + } +} + +class _Transaction implements Transaction {} + +class _Page implements Page { + @override + final List items; + + @override + final bool isLast; + + final Future> Function({int? pageSize}) _nextFn; + + _Page(this.items, this.isLast, this._nextFn); + + @override + Future> next({int? pageSize}) async { + return _nextFn(pageSize: pageSize); + } +} + +bool _isPackageState(Key key) { + return (key.elements.length == 1 && + key.elements.single.kind == 'PackageState'); +} + +Entity _packageStateRowToEntity(PackageStateRow row) { + return Entity( + Key([ + KeyElement('PackageState', '${row.runtime_version}/${row.package}') + ]), + { + 'runtimeVersion': row.runtime_version, + 'versions': json.encode(row.versions_blob), + 'abortedTokens': null, + 'pendingAt': null, + 'dependencies': null, + 'lastDependencyChanged': null, + 'finished': null, + }); +} + +abstract class PackageStateRow { + String get runtime_version; + String get package; + Map get versions_blob; +} diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index 7be584283a..cbbbe61aad 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -34,6 +34,8 @@ import 'package:pub_dev/shared/versions.dart' gcBeforeRuntimeVersion, shouldGCVersion, acceptedRuntimeVersions; +import 'package:pub_dev/shared/versions.dart' as shared_versions + show runtimeVersion; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; import 'package:pub_dev/task/global_lock.dart'; import 'package:pub_dev/task/handlers.dart'; @@ -80,6 +82,9 @@ class TaskBackend { final DatastoreDB _db; final Bucket _bucket; + late final _packages = PackageDataAcccess(_db); + late final _tasks = TaskDataAccess(_db); + /// If [stop] has been called to stop background processes. /// /// `null` when not started yet, or we have been fully stopped. @@ -234,14 +239,13 @@ class TaskBackend { StackTrace? stackTrace; // For each package we should ensure state is tracked - final pq = _db.query(); - await for (final p in pq.run()) { - packageNames.add(p.name!); + await for (final p in _packages.listPackageNamesToTrack()) { + packageNames.add(p.name); scheduleMicrotask(() async { await pool.withResource(() async { try { - await trackPackage(p.name!, updateDependents: false); + await trackPackage(p.name, updateDependents: false); } catch (e, st) { _log.severe('failed to track state for "${p.name}"', e, st); if (error == null) { @@ -254,20 +258,15 @@ class TaskBackend { } // Check that all [PackageState] entities have a matching [Package] entity. - final sq = _db.query() - ..filter('runtimeVersion =', runtimeVersion); - - await for (final state in sq.run()) { + await for (final state in _tasks.listForCurrentRuntime()) { if (!packageNames.contains(state.package)) { final r = await pool.request(); scheduleMicrotask(() async { try { // Lookup the package to ensure it really doesn't exist - final packageKey = _db.emptyKey.append(Package, id: state.package); - final package = await _db.lookupOrNull(packageKey); - if (package == null) { - await _db.commit(deletes: [state.key]); + if (!await _packages.exists(state.package)) { + await _tasks.delete(state.package); } } catch (e, st) { _log.severe('failed to untrack "${state.package}"', e, st); @@ -313,9 +312,6 @@ class TaskBackend { var since = clock.ago(days: 3); while (claim.valid && !abort.isCompleted) { // Look at all packages changed in [since] - final q = _db.query() - ..filter('updated >', since) - ..order('-updated'); if (clock.now().isAfter(nextLongScan)) { // Next time we'll do a longer scan @@ -327,7 +323,7 @@ class TaskBackend { } // Look at all packages that has changed - await for (final p in q.run()) { + await for (final p in _packages.listPackagesUpdatedSince(since)) { // Abort, if claim is invalid or abort has been resolved! if (!claim.valid || abort.isCompleted) { return; @@ -335,16 +331,16 @@ class TaskBackend { // Check if the [updated] timestamp has been seen before. // If so, we skip checking it! - final lastSeen = seen[p.name!]; - if (lastSeen != null && lastSeen.toUtc() == p.updated!.toUtc()) { + final lastSeen = seen[p.name]; + if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) { continue; } // Remember the updated time for this package, so we don't check it // again... - seen[p.name!] = p.updated!; + seen[p.name] = p.updated; // Check the package - await trackPackage(p.name!, updateDependents: true); + await trackPackage(p.name, updateDependents: true); } // Cleanup the [seen] map for anything older than [since], as this won't @@ -505,10 +501,7 @@ class TaskBackend { /// Garbage collect [PackageState] and results from old runtimeVersions. Future garbageCollect() async { // GC the old [PackageState] entities - await _db.deleteWithQuery( - _db.query() - ..filter('runtimeVersion <', gcBeforeRuntimeVersion), - ); + await _tasks.deleteOldEntries(); // Limit to 50 concurrent deletion requests final pool = Pool(50); @@ -572,10 +565,9 @@ class TaskBackend { // // We only update [PackageState] to have [lastDependencyChanged], this // ensures that there is no risk of indefinite propagation. - final q = _db.query() - ..filter('dependencies =', package) - ..filter('lastDependencyChanged <', publishedAt); - await for (final state in q.run()) { + final stream = _tasks.listWhereDependencyChangedBefore(package, + publishedAt: publishedAt); + await for (final state in stream) { final r = await pool.request(); // Schedule a microtask that attempts to update [lastDependencyChanged], @@ -630,8 +622,7 @@ class TaskBackend { throw AuthenticationException.authenticationRequired(); } - final key = PackageState.createKey(_db, runtimeVersion, package); - final state = await _db.lookupOrNull(key); + final state = await _tasks.lookupState(package); if (state == null) { throw NotFoundException.resource('$package/$version'); } @@ -1004,8 +995,7 @@ class TaskBackend { Future packageStatus(String package) async { final status = await cache.taskPackageStatus(package).get(() async { for (final rt in acceptedRuntimeVersions) { - final key = PackageState.createKey(_db, rt, package); - final state = await _db.lookupOrNull(key); + final state = await _tasks.lookupState(package, runtimeVersion: rt); // skip states where the entry was created, but no analysis has not finished yet if (state == null || state.hasNeverFinished) { continue; @@ -1076,8 +1066,7 @@ class TaskBackend { final cachedValue = await cache.latestFinishedVersion(package).get(() async { for (final rt in acceptedRuntimeVersions) { - final key = PackageState.createKey(_db, rt, package); - final state = await _db.lookupOrNull(key); + final state = await _tasks.lookupState(package, runtimeVersion: rt); // skip states where the entry was created, but no analysis has not finished yet if (state == null || state.hasNeverFinished) { continue; @@ -1118,8 +1107,7 @@ class TaskBackend { await cache.closestFinishedVersion(package, version).get(() async { final semanticVersion = Version.parse(version); for (final rt in acceptedRuntimeVersions) { - final key = PackageState.createKey(_db, rt, package); - final state = await _db.lookupOrNull(key); + final state = await _tasks.lookupState(package, runtimeVersion: rt); // Skip states where the entry was created, but the analysis has not finished yet. if (state == null || state.hasNeverFinished) { continue; @@ -1307,3 +1295,78 @@ List _updatedDependencies( .takeWhile((p) => (size += p.length + 1) < 1500) .sorted(); } + +class PackageDataAcccess { + final DatastoreDB _db; + + PackageDataAcccess(this._db); + + Future exists(String name) async { + final p = + await _db.lookupOrNull(_db.emptyKey.append(Package, id: name)); + return p != null; + } + + Stream<({String name})> listPackageNamesToTrack() async* { + await for (final p in _db.query().run()) { + yield (name: p.name!); + } + } + + Stream<({String name, DateTime updated})> listPackagesUpdatedSince( + DateTime since) async* { + final query = _db.query() + ..filter('updated >', since) + ..order('-updated'); + await for (final p in query.run()) { + yield (name: p.name!, updated: p.updated!); + } + } +} + +class TaskDataAccess { + final DatastoreDB _db; + + TaskDataAccess(this._db); + + Future lookupState( + String package, { + String? runtimeVersion, + }) async { + final key = PackageState.createKey( + _db, runtimeVersion ?? shared_versions.runtimeVersion, package); + return await _db.lookupOrNull(key); + } + + Future delete( + String package, { + String? runtimeVersion, + }) async { + final key = PackageState.createKey( + _db, runtimeVersion ?? shared_versions.runtimeVersion, package); + await _db.commit(deletes: [key]); + } + + Future deleteOldEntries() async { + await _db.deleteWithQuery( + _db.query() + ..filter('runtimeVersion <', gcBeforeRuntimeVersion), + ); + } + + Stream<({String package})> listForCurrentRuntime() { + final query = _db.query() + ..filter('runtimeVersion =', runtimeVersion); + return query.run().map((s) => (package: s.package)); + } + + Stream listWhereDependencyChangedBefore( + String package, { + required DateTime publishedAt, + }) { + final query = _db.query() + ..filter('dependencies =', package) + ..filter('lastDependencyChanged <', publishedAt); + return query.run(); + } +}