Skip to content

Commit eacba32

Browse files
authored
Do not retry (or panic in logs) on upload exceptions when the version is no longer scheduled for analysis. (#8196)
1 parent a92e6fd commit eacba32

File tree

6 files changed

+183
-24
lines changed

6 files changed

+183
-24
lines changed

app/lib/shared/exceptions.dart

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,11 @@ class NotAcceptableException extends ResponseException {
5151
: super._(406, 'NotAcceptable', message);
5252
}
5353

54+
/// Thrown when part of the underlying analysis task has been aborted.
55+
class TaskAbortedException extends ResponseException {
56+
TaskAbortedException(String message) : super._(400, 'TaskAborted', message);
57+
}
58+
5459
/// Thrown when request input is invalid, bad payload, wrong querystring, etc.
5560
class InvalidInputException extends ResponseException {
5661
InvalidInputException._(String message)

app/lib/task/backend.dart

Lines changed: 64 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ import 'package:pub_dev/task/global_lock.dart';
3838
import 'package:pub_dev/task/handlers.dart';
3939
import 'package:pub_dev/task/models.dart'
4040
show
41+
AbortedTokenInfo,
4142
PackageState,
4243
PackageStateInfo,
4344
PackageVersionStateInfo,
@@ -454,6 +455,20 @@ class TaskBackend {
454455
return false;
455456
}
456457

458+
state.abortedTokens = [
459+
...state.versions!.entries
460+
.where((e) => deselectedVersions.contains(e.key))
461+
.map((e) => e.value)
462+
.where((vs) => vs.secretToken != null)
463+
.map(
464+
(vs) => AbortedTokenInfo(
465+
token: vs.secretToken!,
466+
expires: vs.scheduled.add(maxTaskExecutionTime),
467+
),
468+
),
469+
...?state.abortedTokens,
470+
].where((t) => t.isNotExpired).take(50).toList();
471+
457472
// Make changes!
458473
state.versions!
459474
// Remove versions that have been deselected
@@ -467,6 +482,7 @@ class TaskBackend {
467482
),
468483
});
469484
state.derivePendingAt();
485+
state.abortedTokens?.removeWhere((t) => t.expires.isAfter(clock.now()));
470486

471487
_log.info('Update state tracking for $packageName');
472488
tx.insert(state);
@@ -609,20 +625,18 @@ class TaskBackend {
609625
InvalidInputException.checkPackageName(package);
610626
version = InvalidInputException.checkSemanticVersion(version);
611627

628+
final token = _extractBearerToken(request);
629+
if (token == null) {
630+
throw AuthenticationException.authenticationRequired();
631+
}
632+
612633
final key = PackageState.createKey(_db, runtimeVersion, package);
613634
final state = await _db.lookupOrNull<PackageState>(key);
614-
if (state == null || state.versions![version] == null) {
635+
if (state == null) {
615636
throw NotFoundException.resource('$package/$version');
616637
}
617-
final versionState = state.versions![version]!;
618-
619-
// Check the secret token
620-
if (!versionState.isAuthorized(_extractBearerToken(request))) {
621-
throw AuthenticationException.authenticationRequired();
622-
}
623-
assert(versionState.scheduled != initialTimestamp);
624-
assert(versionState.instance != null);
625-
assert(versionState.zone != null);
638+
final versionState =
639+
_authorizeWorkerCallback(package, version, state, token);
626640

627641
// Set expiration of signed URLs to remaining execution time + 5 min to
628642
// allow for clock skew.
@@ -669,6 +683,11 @@ class TaskBackend {
669683
InvalidInputException.checkPackageName(package);
670684
version = InvalidInputException.checkSemanticVersion(version);
671685

686+
final token = _extractBearerToken(request);
687+
if (token == null) {
688+
throw AuthenticationException.authenticationRequired();
689+
}
690+
672691
String? zone, instance;
673692
bool isInstanceDone = false;
674693
final index = await _loadTaskResultIndex(
@@ -685,18 +704,11 @@ class TaskBackend {
685704
await withRetryTransaction(_db, (tx) async {
686705
final key = PackageState.createKey(_db, runtimeVersion, package);
687706
final state = await tx.lookupOrNull<PackageState>(key);
688-
if (state == null || state.versions![version] == null) {
707+
if (state == null) {
689708
throw NotFoundException.resource('$package/$version');
690709
}
691-
final versionState = state.versions![version]!;
692-
693-
// Check the secret token
694-
if (!versionState.isAuthorized(_extractBearerToken(request))) {
695-
throw AuthenticationException.authenticationRequired();
696-
}
697-
assert(versionState.scheduled != initialTimestamp);
698-
assert(versionState.instance != null);
699-
assert(versionState.zone != null);
710+
final versionState =
711+
_authorizeWorkerCallback(package, version, state, token);
700712

701713
// Update dependencies, if pana summary has dependencies
702714
if (summary != null && summary.allDependencies != null) {
@@ -1169,6 +1181,38 @@ String? _extractBearerToken(shelf.Request request) {
11691181
return parts.last.trim();
11701182
}
11711183

1184+
/// Authorize a worker callback for [package] / [version].
1185+
///
1186+
/// Returns the [PackageVersionStateInfo] that the worker is authenticated for.
1187+
/// Or throw [ResponseException] if authorization is not possible.
1188+
PackageVersionStateInfo _authorizeWorkerCallback(
1189+
String package,
1190+
String version,
1191+
PackageState state,
1192+
String token,
1193+
) {
1194+
// fixed-time verification of aborted tokens
1195+
final isKnownAbortedToken = state.abortedTokens
1196+
?.map((t) => t.isAuthorized(token))
1197+
.fold<bool>(false, (a, b) => a || b);
1198+
if (isKnownAbortedToken ?? false) {
1199+
throw TaskAbortedException('$package/$version has been aborted.');
1200+
}
1201+
1202+
final versionState = state.versions![version];
1203+
if (versionState == null) {
1204+
throw NotFoundException.resource('$package/$version');
1205+
}
1206+
// Check the secret token
1207+
if (!versionState.isAuthorized(token)) {
1208+
throw AuthenticationException.authenticationRequired();
1209+
}
1210+
assert(versionState.scheduled != initialTimestamp);
1211+
assert(versionState.instance != null);
1212+
assert(versionState.zone != null);
1213+
return versionState;
1214+
}
1215+
11721216
/// Given a list of versions return the list of versions that should be
11731217
/// tracked for analysis.
11741218
///

app/lib/task/models.dart

Lines changed: 63 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import 'dart:convert' show json;
66

77
import 'package:clock/clock.dart';
88
import 'package:json_annotation/json_annotation.dart';
9+
import 'package:pub_dev/admin/actions/actions.dart';
10+
import 'package:pub_dev/shared/utils.dart';
911

1012
import '../shared/datastore.dart' as db;
1113
import '../shared/versions.dart' as shared_versions;
@@ -107,6 +109,12 @@ class PackageState extends db.ExpandoModel<String> {
107109
@PackageVersionStateMapProperty(required: true)
108110
Map<String, PackageVersionStateInfo>? versions;
109111

112+
/// The list of tokens that were removed from this [PackageState].
113+
/// When a worker reports back using one of these tokens, they will
114+
/// recieve a [TaskAbortedException].
115+
@AbortedTokenListProperty()
116+
List<AbortedTokenInfo>? abortedTokens;
117+
110118
/// Next [DateTime] at which point some package version becomes pending.
111119
@db.DateTimeProperty(required: true, indexed: true)
112120
DateTime? pendingAt;
@@ -407,3 +415,58 @@ enum PackageVersionStatus {
407415
/// Analysis failed to report a result.
408416
failed,
409417
}
418+
419+
/// Tracks a token that was removed from the [PackageState], but a worker
420+
/// may still use it to report a completed task. Such workers may recieve
421+
/// an error code that says they shouldn't really panic on the rejection.
422+
@JsonSerializable()
423+
class AbortedTokenInfo {
424+
final String token;
425+
final DateTime expires;
426+
427+
AbortedTokenInfo({
428+
required this.token,
429+
required this.expires,
430+
});
431+
432+
factory AbortedTokenInfo.fromJson(Map<String, dynamic> m) =>
433+
_$AbortedTokenInfoFromJson(m);
434+
Map<String, dynamic> toJson() => _$AbortedTokenInfoToJson(this);
435+
436+
bool get isNotExpired => clock.now().isBefore(expires);
437+
438+
bool isAuthorized(String token) {
439+
return fixedTimeEquals(this.token, token) && isNotExpired;
440+
}
441+
}
442+
443+
/// A [db.Property] encoding a List os [AbortedTokenInfo] as JSON.
444+
class AbortedTokenListProperty extends db.Property {
445+
const AbortedTokenListProperty({String? propertyName, bool required = false})
446+
: super(propertyName: propertyName, required: required, indexed: false);
447+
448+
@override
449+
Object? encodeValue(
450+
db.ModelDB mdb,
451+
Object? value, {
452+
bool forComparison = false,
453+
}) =>
454+
json.encode(
455+
(value as List<AbortedTokenInfo>?)?.map((e) => e.toJson()).toList());
456+
457+
@override
458+
Object? decodePrimitiveValue(
459+
db.ModelDB mdb,
460+
Object? value,
461+
) =>
462+
value == null
463+
? null
464+
: (json.decode(value as String) as List?)
465+
?.map((e) => AbortedTokenInfo.fromJson(e as Map<String, dynamic>))
466+
.toList();
467+
468+
@override
469+
bool validate(db.ModelDB mdb, Object? value) =>
470+
super.validate(mdb, value) &&
471+
(value == null || value is List<AbortedTokenInfo>);
472+
}

app/lib/task/models.g.dart

Lines changed: 12 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pkg/pub_worker/lib/src/analyze.dart

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -92,12 +92,14 @@ Future<void> analyze(Payload payload) async {
9292
'analyze.',
9393
);
9494
}
95+
} on TaskAbortedException catch (e, st) {
96+
_log.warning(
97+
'Task was aborted when uploading ${payload.package} / ${p.version}',
98+
e,
99+
st);
95100
} catch (e, st) {
96101
_log.shout(
97-
'failed to process ${payload.package} / ${p.version}',
98-
e,
99-
st,
100-
);
102+
'failed to process ${payload.package} / ${p.version}', e, st);
101103
}
102104
}
103105
} finally {

pkg/pub_worker/lib/src/upload.dart

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
// BSD-style license that can be found in the LICENSE file.
44

55
import 'dart:async';
6+
import 'dart:convert';
67
import 'dart:io';
78

89
import 'package:_pub_shared/data/package_api.dart' show UploadInfo;
@@ -40,6 +41,15 @@ Future<void> upload(
4041
));
4142
final res = await Response.fromStream(await client.send(req));
4243

44+
// Special case `TaskAborted` response code, it means that the analysis
45+
// is no longer selected or the secret token timed out / was replaced
46+
// (it may need a different analysis round).
47+
if (res.statusCode == 400 &&
48+
_extractExceptionCode(res) == 'TaskAborted') {
49+
_log.warning(
50+
'Task aborted, failed to upload: $filename, status = ${res.statusCode}');
51+
throw TaskAbortedException(res.body);
52+
}
4353
if (400 <= res.statusCode && res.statusCode < 500) {
4454
_log.shout('Failed to upload: $filename, status = ${res.statusCode}');
4555
throw UploadException(
@@ -80,3 +90,26 @@ final class UploadException implements Exception {
8090
final class IntermittentUploadException extends UploadException {
8191
IntermittentUploadException(String message) : super(message);
8292
}
93+
94+
final class TaskAbortedException extends UploadException {
95+
TaskAbortedException(String message) : super(message);
96+
}
97+
98+
/// Extract `error.code` from JSON body in [res].
99+
String? _extractExceptionCode(Response res) {
100+
try {
101+
final map = json.decode(res.body);
102+
if (map is! Map) {
103+
return null;
104+
}
105+
final error = map['error'];
106+
if (error is! Map) {
107+
return null;
108+
}
109+
final code = error['code'];
110+
return code?.toString();
111+
} on FormatException catch (_) {
112+
// ignore
113+
}
114+
return null;
115+
}

0 commit comments

Comments
 (0)