Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
143 changes: 143 additions & 0 deletions app/lib/shared/sql_datastore.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
// Copyright (c) 2025, the Dart project authors. Please see the AUTHORS file
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

// ignore_for_file: non_constant_identifier_names, unused_element

import 'dart:convert';

import 'package:gcloud/common.dart';
import 'package:gcloud/datastore.dart';

/// Skeleton of a [Datastore] implementation that is meant to be SQL-backed.
///
/// WARNING: This class is not for production use yet.
///
/// Only [beginTransaction] and [rollback] complete normally. [allocateIds],
/// [query] and [commit] always fail with an [UnimplementedError], and so does
/// [lookup] for any non-empty list of keys. The `PackageState` branches only
/// mark where the SQL-backed logic is going to be added.
class SqlDatastore implements Datastore {
  /// Not supported yet: always completes with an [UnimplementedError].
  @override
  Future<List<Key>> allocateIds(List<Key> keys) async {
    throw UnimplementedError();
  }

  /// Completes with a new placeholder transaction object.
  ///
  /// [crossEntityGroup] is currently ignored.
  @override
  Future<Transaction> beginTransaction({bool crossEntityGroup = false}) async =>
      _Transaction();

  /// Currently does nothing with the given [transaction].
  @override
  Future rollback(Transaction transaction) async {
    // Intentionally a no-op.
  }

  /// Looks up every key in [keys] through [_lookup] and collects the results
  /// in the same order.
  ///
  /// As [_lookup] has no implemented branch yet, this fails with an
  /// [UnimplementedError] unless [keys] is empty.
  @override
  Future<List<Entity?>> lookup(
    List<Key> keys, {
    Transaction? transaction,
  }) async {
    final pending = [for (final key in keys) _lookup(key, transaction)];
    return await Future.wait(pending);
  }

  /// Looks up the single entity identified by [key].
  ///
  /// Always fails with an [UnimplementedError] for now; the `PackageState`
  /// branch below is only a sketch of the planned implementation.
  Future<Entity?> _lookup(Key key, Transaction? transaction) async {
    if (_isPackageState(key)) {
      // Planned implementation:
      //
      // final stateId = key.elements.single.id as String;
      // TODO: store in the transaction object that the entry exists
      //
      // TODO: if (transaction != null) also add FOR UPDATE clause
      // final row = await _db.packageStates.byKey(stateId: stateId);
      //
      // return row == null ? null : _packageStateRowToEntity(row);
    }
    throw UnimplementedError();
  }

  /// Not supported yet: always completes with an [UnimplementedError],
  /// including for queries on the `PackageState` kind.
  @override
  Future<Page<Entity>> query(
    Query query, {
    Partition? partition,
    Transaction? transaction,
  }) async {
    if (query.kind == 'PackageState') {
      // TODO: build query
      // TODO: run query with limit + offset
      // TODO: transform rows + build nextFn
      // TODO: store in the transaction object that the entry exists
    }
    throw UnimplementedError();
  }

  /// Not supported yet: every call completes with an [UnimplementedError].
  ///
  /// The checks run in this order: [autoIdInserts] must be empty, then every
  /// key in [deletes] and every entity in [inserts] must be a `PackageState`.
  /// Even when all checks pass, the final statement still throws because
  /// nothing is written anywhere yet.
  @override
  Future<CommitResult> commit({
    List<Entity> inserts = const [],
    List<Entity> autoIdInserts = const [],
    List<Key> deletes = const [],
    Transaction? transaction,
  }) async {
    if (autoIdInserts.isNotEmpty) {
      throw UnimplementedError();
    }

    for (final key in deletes) {
      if (!_isPackageState(key)) {
        throw UnimplementedError();
      }
      // Planned implementation:
      //
      // final stateId = key.elements.single.id as String;
      // await _db.packageStates.delete(state_id: stateId).execute();
    }

    for (final entity in inserts) {
      if (!_isPackageState(entity.key)) {
        throw UnimplementedError();
      }
      // Planned implementation:
      //
      // final stateId = entity.key.elements.single.id as String;
      // TODO: use the transaction object if the entity exists or use lookup or upsert
      // await _db.packageStates.insert(stateId: stateId, ...).execute();
    }

    // No result can be produced until the branches above are implemented.
    throw UnimplementedError();
  }
}

/// Placeholder [Transaction] handed out by [SqlDatastore.beginTransaction].
///
/// It carries no state yet; the TODO notes in [SqlDatastore] mention storing
/// per-transaction information on this object later.
class _Transaction implements Transaction {}

/// A [Page] of entities whose successor is produced by a callback.
class _Page implements Page<Entity> {
  /// Callback invoked by [next] to load the page that follows this one.
  final Future<Page<Entity>> Function({int? pageSize}) _nextFn;

  /// The entities contained in this page.
  @override
  final List<Entity> items;

  /// Whether this page is the last one of the result set.
  @override
  final bool isLast;

  _Page(this.items, this.isLast, this._nextFn);

  /// Loads the following page by delegating to the callback given at
  /// construction, forwarding [pageSize] unchanged.
  @override
  Future<Page<Entity>> next({int? pageSize}) async =>
      _nextFn(pageSize: pageSize);
}

/// Whether [key] consists of exactly one path element and that element is of
/// the `PackageState` kind.
bool _isPackageState(Key key) {
  final elements = key.elements;
  if (elements.length != 1) {
    return false;
  }
  return elements.single.kind == 'PackageState';
}

/// Converts a [PackageStateRow] into a datastore [Entity] of the
/// `PackageState` kind.
///
/// The entity key has a single element whose id is
/// `<runtime_version>/<package>`. Only `runtimeVersion` and `versions` (the
/// JSON-encoded `versions_blob`) are populated; all other properties are set
/// to `null` for now.
Entity _packageStateRowToEntity(PackageStateRow row) {
  final stateId = '${row.runtime_version}/${row.package}';
  final key = Key([KeyElement('PackageState', stateId)]);
  return Entity(key, {
    'runtimeVersion': row.runtime_version,
    'versions': jsonEncode(row.versions_blob),
    // Not mapped from the row yet.
    'abortedTokens': null,
    'pendingAt': null,
    'dependencies': null,
    'lastDependencyChanged': null,
    'finished': null,
  });
}

/// The row shape consumed by [_packageStateRowToEntity].
///
/// NOTE(review): the snake_case member names presumably mirror SQL column
/// names (the file ignores `non_constant_identifier_names`) - confirm.
abstract class PackageStateRow {
/// Runtime version of the row; becomes the first part of the entity key id
/// and the `runtimeVersion` property.
String get runtime_version;
/// Package name of the row; becomes the second part of the entity key id.
String get package;
/// Per-version data that gets JSON-encoded into the `versions` property.
Map<String, dynamic> get versions_blob;
}
135 changes: 99 additions & 36 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ import 'package:pub_dev/shared/versions.dart'
gcBeforeRuntimeVersion,
shouldGCVersion,
acceptedRuntimeVersions;
import 'package:pub_dev/shared/versions.dart' as shared_versions
show runtimeVersion;
import 'package:pub_dev/task/cloudcompute/cloudcompute.dart';
import 'package:pub_dev/task/global_lock.dart';
import 'package:pub_dev/task/handlers.dart';
Expand Down Expand Up @@ -80,6 +82,9 @@ class TaskBackend {
final DatastoreDB _db;
final Bucket _bucket;

late final _packages = PackageDataAcccess(_db);
late final _tasks = TaskDataAccess(_db);

/// If [stop] has been called to stop background processes.
///
/// `null` when not started yet, or we have been fully stopped.
Expand Down Expand Up @@ -234,14 +239,13 @@ class TaskBackend {
StackTrace? stackTrace;

// For each package we should ensure state is tracked
final pq = _db.query<Package>();
await for (final p in pq.run()) {
packageNames.add(p.name!);
await for (final p in _packages.listPackageNamesToTrack()) {
packageNames.add(p.name);

scheduleMicrotask(() async {
await pool.withResource(() async {
try {
await trackPackage(p.name!, updateDependents: false);
await trackPackage(p.name, updateDependents: false);
} catch (e, st) {
_log.severe('failed to track state for "${p.name}"', e, st);
if (error == null) {
Expand All @@ -254,20 +258,15 @@ class TaskBackend {
}

// Check that all [PackageState] entities have a matching [Package] entity.
final sq = _db.query<PackageState>()
..filter('runtimeVersion =', runtimeVersion);

await for (final state in sq.run()) {
await for (final state in _tasks.listForCurrentRuntime()) {
if (!packageNames.contains(state.package)) {
final r = await pool.request();

scheduleMicrotask(() async {
try {
// Lookup the package to ensure it really doesn't exist
final packageKey = _db.emptyKey.append(Package, id: state.package);
final package = await _db.lookupOrNull<Package>(packageKey);
if (package == null) {
await _db.commit(deletes: [state.key]);
if (!await _packages.exists(state.package)) {
await _tasks.delete(state.package);
}
} catch (e, st) {
_log.severe('failed to untrack "${state.package}"', e, st);
Expand Down Expand Up @@ -313,9 +312,6 @@ class TaskBackend {
var since = clock.ago(days: 3);
while (claim.valid && !abort.isCompleted) {
// Look at all packages changed in [since]
final q = _db.query<Package>()
..filter('updated >', since)
..order('-updated');

if (clock.now().isAfter(nextLongScan)) {
// Next time we'll do a longer scan
Expand All @@ -327,24 +323,24 @@ class TaskBackend {
}

// Look at all packages that has changed
await for (final p in q.run()) {
await for (final p in _packages.listPackagesUpdatedSince(since)) {
// Abort, if claim is invalid or abort has been resolved!
if (!claim.valid || abort.isCompleted) {
return;
}

// Check if the [updated] timestamp has been seen before.
// If so, we skip checking it!
final lastSeen = seen[p.name!];
if (lastSeen != null && lastSeen.toUtc() == p.updated!.toUtc()) {
final lastSeen = seen[p.name];
if (lastSeen != null && lastSeen.toUtc() == p.updated.toUtc()) {
continue;
}
// Remember the updated time for this package, so we don't check it
// again...
seen[p.name!] = p.updated!;
seen[p.name] = p.updated;

// Check the package
await trackPackage(p.name!, updateDependents: true);
await trackPackage(p.name, updateDependents: true);
}

// Cleanup the [seen] map for anything older than [since], as this won't
Expand Down Expand Up @@ -505,10 +501,7 @@ class TaskBackend {
/// Garbage collect [PackageState] and results from old runtimeVersions.
Future<void> garbageCollect() async {
// GC the old [PackageState] entities
await _db.deleteWithQuery(
_db.query<PackageState>()
..filter('runtimeVersion <', gcBeforeRuntimeVersion),
);
await _tasks.deleteOldEntries();

// Limit to 50 concurrent deletion requests
final pool = Pool(50);
Expand Down Expand Up @@ -572,10 +565,9 @@ class TaskBackend {
//
// We only update [PackageState] to have [lastDependencyChanged], this
// ensures that there is no risk of indefinite propagation.
final q = _db.query<PackageState>()
..filter('dependencies =', package)
..filter('lastDependencyChanged <', publishedAt);
await for (final state in q.run()) {
final stream = _tasks.listWhereDependencyChangedBefore(package,
publishedAt: publishedAt);
await for (final state in stream) {
final r = await pool.request();

// Schedule a microtask that attempts to update [lastDependencyChanged],
Expand Down Expand Up @@ -630,8 +622,7 @@ class TaskBackend {
throw AuthenticationException.authenticationRequired();
}

final key = PackageState.createKey(_db, runtimeVersion, package);
final state = await _db.lookupOrNull<PackageState>(key);
final state = await _tasks.lookupState(package);
if (state == null) {
throw NotFoundException.resource('$package/$version');
}
Expand Down Expand Up @@ -1004,8 +995,7 @@ class TaskBackend {
Future<PackageStateInfo> packageStatus(String package) async {
final status = await cache.taskPackageStatus(package).get(() async {
for (final rt in acceptedRuntimeVersions) {
final key = PackageState.createKey(_db, rt, package);
final state = await _db.lookupOrNull<PackageState>(key);
final state = await _tasks.lookupState(package, runtimeVersion: rt);
// skip states where the entry was created, but no analysis has finished yet
if (state == null || state.hasNeverFinished) {
continue;
Expand Down Expand Up @@ -1076,8 +1066,7 @@ class TaskBackend {
final cachedValue =
await cache.latestFinishedVersion(package).get(() async {
for (final rt in acceptedRuntimeVersions) {
final key = PackageState.createKey(_db, rt, package);
final state = await _db.lookupOrNull<PackageState>(key);
final state = await _tasks.lookupState(package, runtimeVersion: rt);
// skip states where the entry was created, but no analysis has finished yet
if (state == null || state.hasNeverFinished) {
continue;
Expand Down Expand Up @@ -1118,8 +1107,7 @@ class TaskBackend {
await cache.closestFinishedVersion(package, version).get(() async {
final semanticVersion = Version.parse(version);
for (final rt in acceptedRuntimeVersions) {
final key = PackageState.createKey(_db, rt, package);
final state = await _db.lookupOrNull<PackageState>(key);
final state = await _tasks.lookupState(package, runtimeVersion: rt);
// Skip states where the entry was created, but the analysis has not finished yet.
if (state == null || state.hasNeverFinished) {
continue;
Expand Down Expand Up @@ -1307,3 +1295,78 @@ List<String> _updatedDependencies(
.takeWhile((p) => (size += p.length + 1) < 1500)
.sorted();
}

/// Read access to the [Package] entities stored in the datastore.
class PackageDataAccess {
  final DatastoreDB _db;

  PackageDataAccess(this._db);

  /// Whether a [Package] entity with the given [name] exists.
  Future<bool> exists(String name) async {
    final key = _db.emptyKey.append(Package, id: name);
    final package = await _db.lookupOrNull<Package>(key);
    return package != null;
  }

  /// Streams the name of every [Package] entity in the datastore.
  ///
  /// Emits an error if an entity without a name is encountered.
  Stream<({String name})> listPackageNamesToTrack() async* {
    await for (final package in _db.query<Package>().run()) {
      yield (name: package.name!);
    }
  }

  /// Streams the name and `updated` timestamp of every [Package] whose
  /// `updated` value is strictly after [since], ordered by `-updated`
  /// (most recently updated first).
  ///
  /// Emits an error if an entity without a name or `updated` timestamp is
  /// encountered.
  Stream<({String name, DateTime updated})> listPackagesUpdatedSince(
    DateTime since,
  ) async* {
    final query = _db.query<Package>()
      ..filter('updated >', since)
      ..order('-updated');
    await for (final package in query.run()) {
      yield (name: package.name!, updated: package.updated!);
    }
  }
}

/// Misspelled former name of [PackageDataAccess].
///
/// Kept as an alias so that existing references keep compiling; new code
/// should use [PackageDataAccess].
typedef PackageDataAcccess = PackageDataAccess;

/// Datastore access for the [PackageState] entities used by the task backend.
class TaskDataAccess {
  final DatastoreDB _db;

  TaskDataAccess(this._db);

  /// Looks up the [PackageState] of [package] for [runtimeVersion].
  ///
  /// When [runtimeVersion] is omitted, the current runtime version is used.
  /// Completes with `null` if no such entity exists.
  Future<PackageState?> lookupState(
    String package, {
    String? runtimeVersion,
  }) async {
    final version = runtimeVersion ?? shared_versions.runtimeVersion;
    final stateKey = PackageState.createKey(_db, version, package);
    return await _db.lookupOrNull<PackageState>(stateKey);
  }

  /// Deletes the [PackageState] of [package] for [runtimeVersion].
  ///
  /// When [runtimeVersion] is omitted, the current runtime version is used.
  Future<void> delete(
    String package, {
    String? runtimeVersion,
  }) async {
    final version = runtimeVersion ?? shared_versions.runtimeVersion;
    final stateKey = PackageState.createKey(_db, version, package);
    await _db.commit(deletes: [stateKey]);
  }

  /// Deletes every [PackageState] whose `runtimeVersion` sorts before
  /// [gcBeforeRuntimeVersion].
  Future<void> deleteOldEntries() async {
    final outdated = _db.query<PackageState>()
      ..filter('runtimeVersion <', gcBeforeRuntimeVersion);
    await _db.deleteWithQuery(outdated);
  }

  /// Streams the package name of every [PackageState] stored for the current
  /// runtime version.
  Stream<({String package})> listForCurrentRuntime() {
    final current = _db.query<PackageState>()
      ..filter('runtimeVersion =', shared_versions.runtimeVersion);
    return current.run().map((state) => (package: state.package));
  }

  /// Streams every [PackageState] that lists [package] among its
  /// `dependencies` and whose `lastDependencyChanged` is before
  /// [publishedAt].
  Stream<PackageState> listWhereDependencyChangedBefore(
    String package, {
    required DateTime publishedAt,
  }) {
    final dependents = _db.query<PackageState>()
      ..filter('dependencies =', package)
      ..filter('lastDependencyChanged <', publishedAt);
    return dependents.run();
  }
}
Loading