Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 29 additions & 12 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ import 'package:pub_dev/task/models.dart'
PackageStateInfo,
PackageVersionStateInfo,
PackageVersionStatus,
derivePendingAt,
initialTimestamp,
maxTaskExecutionTime;
import 'package:pub_dev/task/scheduler.dart';
Expand Down Expand Up @@ -394,21 +395,25 @@ class TaskBackend {
if (state == null) {
// Create [PackageState] entity to track the package
_log.info('Started state tracking for $packageName');
final versionsMap = {
for (final version in versions)
version: PackageVersionStateInfo(
scheduled: initialTimestamp,
attempts: 0,
),
};
await tx.tasks.insert(
PackageState()
..setId(runtimeVersion, packageName)
..runtimeVersion = runtimeVersion
..versions = {
for (final version in versions)
version: PackageVersionStateInfo(
scheduled: initialTimestamp,
attempts: 0,
),
}
..versions = versionsMap
..dependencies = <String>[]
..lastDependencyChanged = initialTimestamp
..finished = initialTimestamp
..derivePendingAt(),
..pendingAt = derivePendingAt(
versions: versionsMap,
lastDependencyChanged: initialTimestamp,
),
);
return true; // no more work for this package, state is synced
}
Expand Down Expand Up @@ -464,7 +469,10 @@ class TaskBackend {
attempts: 0,
),
});
state.derivePendingAt();
state.pendingAt = derivePendingAt(
versions: state.versions!,
lastDependencyChanged: state.lastDependencyChanged!,
);

_log.info('Update state tracking for $packageName');
await tx.tasks.update(state);
Expand Down Expand Up @@ -726,7 +734,10 @@ class TaskBackend {

// Ensure that we update [state.pendingAt], otherwise it might be
// re-scheduled way too soon.
state.derivePendingAt();
state.pendingAt = derivePendingAt(
versions: state.versions!,
lastDependencyChanged: state.lastDependencyChanged!,
);
state.finished = clock.now().toUtc();

await tx.tasks.update(state);
Expand Down Expand Up @@ -1407,7 +1418,10 @@ final class _TaskDataAccess {
tx.insert(
s
..lastDependencyChanged = publishedAt
..derivePendingAt(),
..pendingAt = derivePendingAt(
versions: s.versions!,
lastDependencyChanged: publishedAt,
),
);
return true;
}
Expand Down Expand Up @@ -1442,7 +1456,10 @@ final class _TaskDataAccess {
.where((e) => e.value.instance == instanceName)
.map((e) => MapEntry(e.key, previousVersionsMap[e.key]!)),
);
s.derivePendingAt();
s.pendingAt = derivePendingAt(
versions: s.versions!,
lastDependencyChanged: s.lastDependencyChanged!,
);
await tx.tasks.update(s);
});
}
Expand Down
184 changes: 95 additions & 89 deletions app/lib/task/models.dart
Original file line number Diff line number Diff line change
Expand Up @@ -133,95 +133,6 @@ class PackageState extends db.ExpandoModel<String> {
@db.DateTimeProperty(required: true, indexed: true)
DateTime finished = initialTimestamp;

/// Derive [pendingAt] using [versions] and [lastDependencyChanged].
///
/// When updating PackageState the pendingAt property is set to the minimum of:
/// * `scheduled + 31 days` for any version,
/// * `scheduled + 24 hours` for any version where `dependencyChanged > scheduled`
/// * `scheduled + 3 hours * attempts^2` for any version where `attempts > 0 && attempts < 3`.
void derivePendingAt() {
final versionStates = versions!.values;
pendingAt = [
// scheduled + 31 days
...versionStates.map((v) => v.scheduled.add(taskRetriggerInterval)),
// scheduled + 24 hours, where scheduled < lastDependencyChanged
...versionStates
.where((v) => v.scheduled.isBefore(lastDependencyChanged!))
.map((v) => v.scheduled.add(taskDependencyRetriggerCoolOff)),
// scheduled + 3 hours * attempts^2, where attempts > 0 && attempts < 3
...versionStates
.where((v) => v.attempts > 0 && v.attempts < taskRetryLimit)
.map((v) => v.scheduled.add(taskRetryDelay(v.attempts))),
// Pick the minimum of the candidates, default scheduling in year 3k
// if there is no date before that.
].fold(DateTime(3000), (a, b) => a!.isBefore(b) ? a : b);
}

/// Return a list of pending versions for this package.
///
/// When scheduling analysis of a package we piggyback along versions that
/// are going to be pending soon too. Hence, we return a version if:
/// * `now - scheduled > 21 days`,
/// * `lastDependencyChanged > scheduled`, or,
/// * `attempts > 0 && attempts < 3 && now - scheduled > 3 hours * attempts^2`
List<String> pendingVersions({DateTime? at}) {
final at_ = at ?? clock.now();
Duration timeSince(DateTime past) => at_.difference(past);

final list = versions!.entries
.where(
// NOTE: Any changes here must be reflected in [derivePendingAt]
(e) =>
// If scheduled more than 21 days ago
timeSince(e.value.scheduled) > minTaskRetriggerInterval ||
// If a dependency has changed since it was last scheduled
lastDependencyChanged!.isAfter(e.value.scheduled) ||
// If:
// - attempts > 0 (analysis is not done, and has been started)
// - no more than 3 attempts have been done,
// - now - scheduled > 3 hours * attempts^2
(e.value.attempts > 0 &&
e.value.attempts < taskRetryLimit &&
timeSince(e.value.scheduled) >
taskRetryDelay(e.value.attempts)),
)
.map((e) => e.key)
.map(Version.parse)
.toList();

// Prioritize stable versions first, prereleases after them (in decreasing order), e.g.
// - 2.5.0
// - 2.4.0
// - 2.0.0
// - 1.2.0
// - 3.0.0-dev2
// - 3.0.0-dev1
// - 2.7.0-beta
// - 1.0.0-dev
list.sort((a, b) => compareSemanticVersionsDesc(a, b, true, true));

// Promote the first prerelease version to the second position, e.g.
// - 2.5.0
// - 3.0.0-dev2
// - 2.4.0
// - 2.0.0
// - 1.2.0
// - 3.0.0-dev1
// - 2.7.0-beta
// - 1.0.0-dev
//
// (applicable only when the second position is a stable version)
if (list.length > 2 && !list[1].isPreRelease) {
final firstPrereleaseIndex = list.indexWhere((v) => v.isPreRelease);
if (firstPrereleaseIndex > 1) {
final v = list.removeAt(firstPrereleaseIndex);
list.insert(1, v);
}
}

return list.map((s) => s.toString()).toList();
}

/// Returns true if the current [PackageState] instance is new, no version analysis
/// has not completed yet (with neither success nor failure).
bool get hasNeverFinished => finished == initialTimestamp;
Expand All @@ -242,6 +153,101 @@ class PackageState extends db.ExpandoModel<String> {
'\n)';
}

/// Derive the `pendingAt` field using [versions] and [lastDependencyChanged].
///
/// When updating PackageState the pendingAt property is set to the minimum of:
/// * `scheduled + 31 days` for any version,
/// * `scheduled + 24 hours` for any version where `dependencyChanged > scheduled`
/// * `scheduled + 3 hours * attempts^2` for any version where `attempts > 0 && attempts < 3`.
DateTime derivePendingAt({
required Map<String, PackageVersionStateInfo> versions,
required DateTime lastDependencyChanged,
}) {
return [
// scheduled + 31 days
...versions.values.map((v) => v.scheduled.add(taskRetriggerInterval)),
// scheduled + 24 hours, where scheduled < lastDependencyChanged
...versions.values
.where((v) => v.scheduled.isBefore(lastDependencyChanged))
.map((v) => v.scheduled.add(taskDependencyRetriggerCoolOff)),
// scheduled + 3 hours * attempts^2, where attempts > 0 && attempts < 3
...versions.values
.where((v) => v.attempts > 0 && v.attempts < taskRetryLimit)
.map((v) => v.scheduled.add(taskRetryDelay(v.attempts))),
// Pick the minimum of the candidates, default scheduling in year 3k
// if there is no date before that.
].fold(DateTime(3000), (a, b) => a.isBefore(b) ? a : b);
}

/// Return a list of pending versions for this package.
///
/// When scheduling analysis of a package we piggyback along versions that
/// are going to be pending soon too. Hence, we return a version if:
/// * `now - scheduled > 21 days`,
/// * `lastDependencyChanged > scheduled`, or,
/// * `attempts > 0 && attempts < 3 && now - scheduled > 3 hours * attempts^2`
List<String> derivePendingVersions({
required Map<String, PackageVersionStateInfo> versions,
required DateTime lastDependencyChanged,
required DateTime? at,
}) {
final at_ = at ?? clock.now();
Duration timeSince(DateTime past) => at_.difference(past);

final list = versions.entries
.where(
// NOTE: Any changes here must be reflected in [derivePendingAt]
(e) =>
// If scheduled more than 21 days ago
timeSince(e.value.scheduled) > minTaskRetriggerInterval ||
// If a dependency has changed since it was last scheduled
lastDependencyChanged.isAfter(e.value.scheduled) ||
// If:
// - attempts > 0 (analysis is not done, and has been started)
// - no more than 3 attempts have been done,
// - now - scheduled > 3 hours * attempts^2
(e.value.attempts > 0 &&
e.value.attempts < taskRetryLimit &&
timeSince(e.value.scheduled) >
taskRetryDelay(e.value.attempts)),
)
.map((e) => e.key)
.map(Version.parse)
.toList();

// Prioritize stable versions first, prereleases after them (in decreasing order), e.g.
// - 2.5.0
// - 2.4.0
// - 2.0.0
// - 1.2.0
// - 3.0.0-dev2
// - 3.0.0-dev1
// - 2.7.0-beta
// - 1.0.0-dev
list.sort((a, b) => compareSemanticVersionsDesc(a, b, true, true));

// Promote the first prerelease version to the second position, e.g.
// - 2.5.0
// - 3.0.0-dev2
// - 2.4.0
// - 2.0.0
// - 1.2.0
// - 3.0.0-dev1
// - 2.7.0-beta
// - 1.0.0-dev
//
// (applicable only when the second position is a stable version)
if (list.length > 2 && !list[1].isPreRelease) {
final firstPrereleaseIndex = list.indexWhere((v) => v.isPreRelease);
if (firstPrereleaseIndex > 1) {
final v = list.removeAt(firstPrereleaseIndex);
list.insert(1, v);
}
}

return list.map((s) => s.toString()).toList();
}

/// State of a given `version` within a [PackageState].
@JsonSerializable()
class PackageVersionStateInfo {
Expand Down
11 changes: 9 additions & 2 deletions app/lib/task/scheduler.dart
Original file line number Diff line number Diff line change
Expand Up @@ -286,7 +286,11 @@ updatePackageStateWithPendingVersions(
final oldVersionsMap = {...?s.versions};

final now = clock.now();
final pendingVersions = s.pendingVersions(at: now).toList();
final pendingVersions = derivePendingVersions(
versions: s.versions!,
lastDependencyChanged: s.lastDependencyChanged!,
at: now,
).toList();
if (pendingVersions.isEmpty) {
// do not schedule anything
return null;
Expand All @@ -304,7 +308,10 @@ updatePackageStateWithPendingVersions(
finished: s.versions![v]!.finished,
),
});
s.derivePendingAt();
s.pendingAt = derivePendingAt(
versions: s.versions!,
lastDependencyChanged: s.lastDependencyChanged!,
);
await tx.tasks.update(s);

// Create payload
Expand Down