Skip to content
5 changes: 5 additions & 0 deletions app/lib/shared/exceptions.dart
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@ class NotAcceptableException extends ResponseException {
: super._(406, 'NotAcceptable', message);
}

/// Thrown when the underlying analysis task has been aborted.
class TaskAbortedException extends ResponseException {
TaskAbortedException(String message) : super._(400, 'TaskAborted', message);
}

/// Thrown when request input is invalid, bad payload, wrong querystring, etc.
class InvalidInputException extends ResponseException {
InvalidInputException._(String message)
Expand Down
85 changes: 56 additions & 29 deletions app/lib/task/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ import 'package:pub_dev/task/global_lock.dart';
import 'package:pub_dev/task/handlers.dart';
import 'package:pub_dev/task/models.dart'
show
AbortedTokenInfo,
PackageState,
PackageStateInfo,
PackageVersionStateInfo,
Expand Down Expand Up @@ -436,9 +437,19 @@ class TaskBackend {

// List of versions that are tracked, but don't exist. These have
// probably been deselected by _versionsToTrack.
final deselectedVersions = [
...state.versions!.keys.whereNot(versions.contains),
];
final deselectedVersions = <String>[];
for (final e in state.versions!.entries) {
if (!versions.contains(e.key)) {
deselectedVersions.add(e.key);

final token = e.value.secretToken;
if (token != null) {
state.abortedTokens ??= <AbortedTokenInfo>[];
state.abortedTokens!.add(AbortedTokenInfo(
token: token, expires: clock.fromNow(days: 1)));
}
}
}

// There should never be an overlap between versions untracked and
// versions that tracked by now deselected.
Expand Down Expand Up @@ -467,6 +478,7 @@ class TaskBackend {
),
});
state.derivePendingAt();
state.abortedTokens?.removeWhere((t) => t.expires.isAfter(clock.now()));

_log.info('Update state tracking for $packageName');
tx.insert(state);
Expand Down Expand Up @@ -611,18 +623,8 @@ class TaskBackend {

final key = PackageState.createKey(_db, runtimeVersion, package);
final state = await _db.lookupOrNull<PackageState>(key);
if (state == null || state.versions![version] == null) {
throw NotFoundException.resource('$package/$version');
}
final versionState = state.versions![version]!;

// Check the secret token
if (!versionState.isAuthorized(_extractBearerToken(request))) {
throw AuthenticationException.authenticationRequired();
}
assert(versionState.scheduled != initialTimestamp);
assert(versionState.instance != null);
assert(versionState.zone != null);
final versionState =
_extractAndVerifyVersionState(package, version, state, request);

// Set expiration of signed URLs to remaining execution time + 5 min to
// allow for clock skew.
Expand Down Expand Up @@ -685,23 +687,13 @@ class TaskBackend {
await withRetryTransaction(_db, (tx) async {
final key = PackageState.createKey(_db, runtimeVersion, package);
final state = await tx.lookupOrNull<PackageState>(key);
if (state == null || state.versions![version] == null) {
throw NotFoundException.resource('$package/$version');
}
final versionState = state.versions![version]!;

// Check the secret token
if (!versionState.isAuthorized(_extractBearerToken(request))) {
throw AuthenticationException.authenticationRequired();
}
assert(versionState.scheduled != initialTimestamp);
assert(versionState.instance != null);
assert(versionState.zone != null);
final versionState =
_extractAndVerifyVersionState(package, version, state, request);

// Update dependencies, if pana summary has dependencies
if (summary != null && summary.allDependencies != null) {
final updatedDependencies = _updatedDependencies(
state.dependencies,
state!.dependencies,
summary.allDependencies,
// for logging only
package: package,
Expand All @@ -719,7 +711,7 @@ class TaskBackend {
instance = versionState.instance!;

// Remove instanceName, zone, secretToken, and set attempts = 0
state.versions![version] = PackageVersionStateInfo(
state!.versions![version] = PackageVersionStateInfo(
scheduled: versionState.scheduled,
docs: hasDocIndexHtml,
pana: summary != null,
Expand Down Expand Up @@ -1169,6 +1161,41 @@ String? _extractBearerToken(shelf.Request request) {
return parts.last.trim();
}

PackageVersionStateInfo _extractAndVerifyVersionState(
String package,
String version,
PackageState? state,
shelf.Request request,
) {
final token = _extractBearerToken(request);
if (token == null) {
throw AuthenticationException.authenticationRequired();
}
if (state == null) {
throw NotFoundException.resource('$package/$version');
}
final versionState = state.versions![version];
if (versionState == null) {
// check if the task was aborted
final abortedToken =
state.abortedTokens?.firstWhereOrNull((t) => t.token == token);
if (abortedToken != null && abortedToken.expires.isBefore(clock.now())) {
throw TaskAbortedException('$package/$version has been aborted.');
}
// otherwise throw a generic not found error
throw NotFoundException.resource('$package/$version');
}

// Check the secret token
if (!versionState.isAuthorized(token)) {
throw AuthenticationException.authenticationRequired();
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is vulnerable to timing attacks!

I'd suggest adding an isAuthorized method to AbortedTokenInfo object.

Also versionState != null doesn't imply that the token couldn't be aborted.

I'd suggest:

  • Always check abortedTokens, ideally always compare all of them (not just the first, because we want fixed time behavior)
  • Then check versionState.isAuthorized

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done.

assert(versionState.scheduled != initialTimestamp);
assert(versionState.instance != null);
assert(versionState.zone != null);
return versionState;
}

/// Given a list of versions return the list of versions that should be
/// tracked for analysis.
///
Expand Down
56 changes: 56 additions & 0 deletions app/lib/task/models.dart
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import 'dart:convert' show json;

import 'package:clock/clock.dart';
import 'package:json_annotation/json_annotation.dart';
import 'package:pub_dev/admin/actions/actions.dart';

import '../shared/datastore.dart' as db;
import '../shared/versions.dart' as shared_versions;
Expand Down Expand Up @@ -107,6 +108,12 @@ class PackageState extends db.ExpandoModel<String> {
@PackageVersionStateMapProperty(required: true)
Map<String, PackageVersionStateInfo>? versions;

/// The list of tokens that were removed from this [PackageState].
/// When a worker reports back using one of these tokens, they will
/// recieve a [TaskAbortedException].
@AbortedTokenListProperty()
List<AbortedTokenInfo>? abortedTokens;

/// Next [DateTime] at which point some package version becomes pending.
@db.DateTimeProperty(required: true, indexed: true)
DateTime? pendingAt;
Expand Down Expand Up @@ -407,3 +414,52 @@ enum PackageVersionStatus {
/// Analysis failed to report a result.
failed,
}

/// Tracks a token that was removed from the [PackageState], but a worker
/// may still use it to report a completed task. Such workers may recieve
/// an error code that says they shouldn't really panic on the rejection.
@JsonSerializable()
class AbortedTokenInfo {
final String token;
final DateTime expires;

AbortedTokenInfo({
required this.token,
required this.expires,
});

factory AbortedTokenInfo.fromJson(Map<String, dynamic> m) =>
_$AbortedTokenInfoFromJson(m);
Map<String, dynamic> toJson() => _$AbortedTokenInfoToJson(this);
}

/// A [db.Property] encoding a List os [AbortedTokenInfo] as JSON.
class AbortedTokenListProperty extends db.Property {
const AbortedTokenListProperty({String? propertyName, bool required = false})
: super(propertyName: propertyName, required: required, indexed: false);

@override
Object? encodeValue(
db.ModelDB mdb,
Object? value, {
bool forComparison = false,
}) =>
json.encode(
(value as List<AbortedTokenInfo>?)?.map((e) => e.toJson()).toList());

@override
Object? decodePrimitiveValue(
db.ModelDB mdb,
Object? value,
) =>
value == null
? null
: (json.decode(value as String) as List?)
?.map((e) => AbortedTokenInfo.fromJson(e as Map<String, dynamic>))
.toList();

@override
bool validate(db.ModelDB mdb, Object? value) =>
super.validate(mdb, value) &&
(value == null || value is List<AbortedTokenInfo>);
}
12 changes: 12 additions & 0 deletions app/lib/task/models.g.dart

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 4 additions & 4 deletions pkg/pub_worker/lib/src/analyze.dart
Original file line number Diff line number Diff line change
Expand Up @@ -92,12 +92,12 @@ Future<void> analyze(Payload payload) async {
'analyze.',
);
}
} on TaskAbortedException catch (e, st) {
_log.warning(
'failed to upload ${payload.package} / ${p.version}', e, st);
} catch (e, st) {
_log.shout(
'failed to process ${payload.package} / ${p.version}',
e,
st,
);
'failed to process ${payload.package} / ${p.version}', e, st);
}
}
} finally {
Expand Down
32 changes: 32 additions & 0 deletions pkg/pub_worker/lib/src/upload.dart
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
// BSD-style license that can be found in the LICENSE file.

import 'dart:async';
import 'dart:convert';
import 'dart:io';

import 'package:_pub_shared/data/package_api.dart' show UploadInfo;
Expand Down Expand Up @@ -40,6 +41,15 @@ Future<void> upload(
));
final res = await Response.fromStream(await client.send(req));

// Special case `TaskAborted` response code, it means that the analysis
// is no longer selected or the secret token timed out / was replaced
// (it may need a different analysis round).
if (res.statusCode == 400 &&
_extractExceptionCode(res) == 'TaskAborted') {
_log.warning(
'Task aborted, failed to upload: $filename, status = ${res.statusCode}');
throw TaskAbortedException(res.body);
}
if (400 <= res.statusCode && res.statusCode < 500) {
_log.shout('Failed to upload: $filename, status = ${res.statusCode}');
throw UploadException(
Expand Down Expand Up @@ -80,3 +90,25 @@ final class UploadException implements Exception {
final class IntermittentUploadException extends UploadException {
IntermittentUploadException(String message) : super(message);
}

final class TaskAbortedException extends UploadException {
TaskAbortedException(String message) : super(message);
}

String? _extractExceptionCode(Response res) {
try {
final map = json.decode(res.body);
if (map is! Map) {
return null;
}
final error = map['error'];
if (error is! Map) {
return null;
}
final code = error['code'];
return code?.toString();
} on FormatException catch (_) {
// ignore
}
return null;
}
Loading