diff --git a/app/lib/task/backend.dart b/app/lib/task/backend.dart index b2bb54533..4700383ff 100644 --- a/app/lib/task/backend.dart +++ b/app/lib/task/backend.dart @@ -1397,9 +1397,7 @@ final class _TaskDataAccess { return await withRetryTransaction(_db, (tx) async { // Reload [state] within a transaction to avoid overwriting changes // made by others trying to update state for another package. - final s = await tx.lookupOrNull( - PackageState.createKey(_db.emptyKey, runtimeVersion, package), - ); + final s = await tx.tasks.lookupOrNull(package); if (s == null) { // No entry has been created yet, probably because of a new deployment rolling out. // We can ignore it for now. @@ -1419,18 +1417,35 @@ final class _TaskDataAccess { Future bumpPriority(String packageName) async { await withRetryTransaction(_db, (tx) async { - final stateKey = PackageState.createKey( - _db.emptyKey, - runtimeVersion, - packageName, - ); - final state = await tx.lookupOrNull(stateKey); + final state = await tx.tasks.lookupOrNull(packageName); if (state != null) { state.pendingAt = initialTimestamp; tx.insert(state); } }); } + + /// Restores the previous versions map state when starting the tasks on [instanceName] failed. + Future restorePreviousVersionsState( + String packageName, + String instanceName, + Map previousVersionsMap, + ) async { + await withRetryTransaction(_db, (tx) async { + final s = await tx.tasks.lookupOrNull(packageName); + if (s == null) { + return; // Presumably, the package was deleted. + } + + s.versions!.addEntries( + s.versions!.entries + .where((e) => e.value.instance == instanceName) + .map((e) => MapEntry(e.key, previousVersionsMap[e.key]!)), + ); + s.derivePendingAt(); + await tx.tasks.update(s); + }); + } } class _TaskTransactionDataAcccess { diff --git a/app/lib/task/scheduler.dart b/app/lib/task/scheduler.dart index b5b195e59..eb238ff04 100644 --- a/app/lib/task/scheduler.dart +++ b/app/lib/task/scheduler.dart @@ -10,7 +10,6 @@ import 'package:pub_dev/package/backend.dart'; import 'package:pub_dev/shared/configuration.dart'; import 'package:pub_dev/shared/datastore.dart'; import 'package:pub_dev/shared/utils.dart'; -import 'package:pub_dev/shared/versions.dart' show runtimeVersion; import 'package:pub_dev/task/backend.dart'; import 'package:pub_dev/task/clock_control.dart'; import 'package:pub_dev/task/cloudcompute/cloudcompute.dart'; @@ -237,26 +236,11 @@ Future schedule( // suppose to run on the instance we just failed to create. // If this doesn't work, we'll eventually retry. Hence, correctness // does not hinge on this transaction being successful. - await withRetryTransaction(db, (tx) async { - final s = await tx.lookupOrNull( - PackageState.createKey( - db.emptyKey, - runtimeVersion, - selected.package, - ), - ); - if (s == null) { - return; // Presumably, the package was deleted. - } - - s.versions!.addEntries( - s.versions!.entries - .where((e) => e.value.instance == instanceName) - .map((e) => MapEntry(e.key, oldVersionsMap[e.key]!)), - ); - s.derivePendingAt(); - tx.insert(s); - }); + await db.tasks.restorePreviousVersionsState( + selected.package, + instanceName, + oldVersionsMap, + ); } } }); @@ -295,9 +279,7 @@ updatePackageStateWithPendingVersions( String instanceName, ) async { return await withRetryTransaction(db, (tx) async { - final s = await tx.lookupOrNull( - PackageState.createKey(db.emptyKey, runtimeVersion, package), - ); + final s = await tx.tasks.lookupOrNull(package); if (s == null) { // presumably the package was deleted. return null; @@ -360,7 +342,7 @@ updatePackageStateWithPendingVersions( ), }); s.derivePendingAt(); - tx.insert(s); + await tx.tasks.update(s); // Create payload final payload = Payload(