Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 35 additions & 2 deletions app/lib/package/api_export/exported_api.dart
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,12 @@ import 'dart:io';
import 'package:_pub_shared/data/advisories_api.dart';
import 'package:_pub_shared/data/package_api.dart';
import 'package:clock/clock.dart';
import 'package:crypto/crypto.dart';
import 'package:gcloud/storage.dart';
import 'package:logging/logging.dart';
import 'package:pool/pool.dart';
import 'package:pub_dev/shared/utils.dart';
import '../../shared/storage.dart';
import '../../shared/versions.dart'
show runtimeVersion, runtimeVersionPattern, shouldGCVersion;

Expand Down Expand Up @@ -268,7 +270,7 @@ final class ExportedJsonFile<T> extends ExportedObject {
final gzipped = _jsonGzip.encode(data);
await Future.wait(_owner._prefixes.map((prefix) async {
await _owner._pool.withResource(() async {
await _owner._bucket.writeBytes(
await _owner._bucket.writeBytesIfDifferent(
prefix + _objectName,
gzipped,
metadata: _metadata,
Expand Down Expand Up @@ -307,7 +309,7 @@ final class ExportedBlob extends ExportedObject {
Future<void> write(List<int> data) async {
await Future.wait(_owner._prefixes.map((prefix) async {
await _owner._pool.withResource(() async {
await _owner._bucket.writeBytes(
await _owner._bucket.writeBytesIfDifferent(
prefix + _objectName,
data,
metadata: _metadata,
Expand All @@ -333,3 +335,34 @@ final class ExportedBlob extends ExportedObject {
}));
}
}

extension on Bucket {
  /// Uploads [bytes] as the object [name], unless the bucket already holds an
  /// object with that name and identical content.
  ///
  /// The optional [metadata] is forwarded to the upload. Only the content
  /// (length and MD5 hash) is compared: a change that affects [metadata] alone
  /// does not trigger a new upload.
  Future<void> writeBytesIfDifferent(
    String name,
    List<int> bytes, {
    ObjectMetadata? metadata,
  }) async {
    final unchanged = await _hasSameContent(name, bytes);
    if (!unchanged) {
      await uploadWithRetry(
        this,
        name,
        bytes.length,
        () => Stream.value(bytes),
        metadata: metadata,
      );
    }
  }

  /// Whether the object [name] already exists with the same length and the
  /// same MD5 hash as [bytes].
  ///
  /// Returns `false` when `tryInfo` yields no info for [name].
  Future<bool> _hasSameContent(String name, List<int> bytes) async {
    final existing = await tryInfo(name);
    // The length check comes first so the hash is only computed when needed.
    if (existing == null || existing.length != bytes.length) {
      return false;
    }
    final digest = md5.convert(bytes).bytes;
    return fixedTimeIntListEquals(existing.md5Hash, digest);
  }
}
85 changes: 8 additions & 77 deletions app/lib/package/export_api_to_bucket.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,8 @@
// for details. All rights reserved. Use of this source code is governed by a
// BSD-style license that can be found in the LICENSE file.

import 'dart:io';

import 'package:basics/basics.dart';
import 'package:clock/clock.dart';
import 'package:crypto/crypto.dart';
import 'package:gcloud/service_scope.dart' as ss;
import 'package:gcloud/storage.dart';
import 'package:logging/logging.dart';
Expand All @@ -17,9 +14,9 @@ import 'package:retry/retry.dart';
import '../search/backend.dart';
import '../shared/datastore.dart';
import '../shared/storage.dart';
import '../shared/utils.dart';
import '../shared/versions.dart';
import '../task/global_lock.dart';
import 'api_export/exported_api.dart';
import 'backend.dart';
import 'models.dart';

Expand All @@ -28,20 +25,6 @@ final Logger _logger = Logger('export_api_to_bucket');
/// The default concurrency to upload API JSON files to the bucket.
const _defaultBucketUpdateConcurrency = 8;

/// The default cache timeout for content.
const _pkgApiMaxCacheAge = Duration(minutes: 10);
const _pkgNameCompletionDataMaxAge = Duration(hours: 8);

/// Returns the bucket object names under which the API data of [package] is
/// stored: one below the `runtimeVersion` prefix and one below `current/`.
List<String> _apiPkgObjectNames(String package) {
  return <String>[
    '$runtimeVersion/api/packages/$package',
    'current/api/packages/$package',
  ];
}

/// Returns the bucket object names of the package name completion data: one
/// below the `runtimeVersion` prefix and one below `current/`.
List<String> _apiPkgNameCompletionDataNames() {
  return <String>[
    '$runtimeVersion/api/package-name-completion-data',
    'current/api/package-name-completion-data',
  ];
}

/// Registers [value] as the API exporter service under the `#_apiExporter`
/// key of the service scope.
void registerApiExporter(ApiExporter value) {
  ss.register(#_apiExporter, value);
}
Expand All @@ -50,14 +33,16 @@ void registerApiExporter(ApiExporter value) =>
ApiExporter? get apiExporter => ss.lookup(#_apiExporter) as ApiExporter?;

class ApiExporter {
final ExportedApi _api;
final Bucket _bucket;
final int _concurrency;
final _pkgLastUpdated = <String, _PkgUpdatedEvent>{};

ApiExporter({
required Bucket bucket,
int concurrency = _defaultBucketUpdateConcurrency,
}) : _bucket = bucket,
}) : _api = ExportedApi(storageService, bucket),
_bucket = bucket,
_concurrency = concurrency;

/// Runs a forever loop and tries to get a global lock.
Expand Down Expand Up @@ -89,12 +74,8 @@ class ApiExporter {

/// Gets and uploads the package name completion data.
Future<void> uploadPkgNameCompletionData() async {
final bytes = await searchBackend.getPackageNameCompletionDataJsonGz();
final bytesAndHash = _BytesAndHash(bytes);
for (final objectName in _apiPkgNameCompletionDataNames()) {
await _upsert(objectName, bytesAndHash,
maxAge: _pkgNameCompletionDataMaxAge);
}
await _api.packageNameCompletionData
.write(await searchBackend.getPackageNameCompletionData());
}

/// Note: there is no global locking here, the full scan should be called
Expand Down Expand Up @@ -187,19 +168,11 @@ class ApiExporter {
/// the endpoint name in the file location.
Future<void> _uploadPackageToBucket(String package) async {
final data = await retry(() => packageBackend.listVersions(package));
final rawBytes = jsonUtf8Encoder.convert(data.toJson());
final bytes = gzip.encode(rawBytes);
final bytesAndHash = _BytesAndHash(bytes);

for (final objectName in _apiPkgObjectNames(package)) {
await _upsert(objectName, bytesAndHash, maxAge: _pkgApiMaxCacheAge);
}
await _api.package(package).versions.write(data);
}

Future<void> _deletePackageFromBucket(String package) async {
for (final objectName in _apiPkgObjectNames(package)) {
await _bucket.tryDelete(objectName);
}
await _api.package(package).delete();
}

Stream<_PkgUpdatedEvent> _queryRecentPkgUpdatedEvents(DateTime since) async* {
Expand Down Expand Up @@ -250,39 +223,6 @@ class ApiExporter {
await deleteBucketFolderRecursively(_bucket, '$v/', concurrency: 4);
}
}

/// Uploads the gzipped JSON payload in [bytesAndHash] to [objectName], unless
/// the bucket already holds identical content under that name.
///
/// [maxAge] sets the `max-age` of the object's public `Cache-Control` header.
Future<void> _upsert(
  String objectName,
  _BytesAndHash bytesAndHash, {
  required Duration maxAge,
}) async {
  final alreadyCurrent = await _isSameContent(objectName, bytesAndHash);
  if (alreadyCurrent) {
    return;
  }

  final payload = bytesAndHash.bytes;
  final objectMetadata = ObjectMetadata(
    contentType: 'application/json; charset="utf-8"',
    contentEncoding: 'gzip',
    cacheControl: 'public, max-age=${maxAge.inSeconds}',
  );
  await uploadWithRetry(
    _bucket,
    objectName,
    payload.length,
    () => Stream.value(payload),
    metadata: objectMetadata,
  );
}

/// Whether the bucket object [objectName] already exists with the same length
/// and the same MD5 hash as [bytesAndHash].
///
/// Returns `false` when `tryInfo` yields no info for [objectName].
Future<bool> _isSameContent(
    String objectName, _BytesAndHash bytesAndHash) async {
  final existing = await _bucket.tryInfo(objectName);
  // Compare lengths first so the MD5 hash is only computed when needed.
  if (existing == null || existing.length != bytesAndHash.length) {
    return false;
  }
  return fixedTimeIntListEquals(existing.md5Hash, bytesAndHash.md5Hash);
}
}

typedef _PkgUpdatedEvent = ({String package, DateTime updated, bool isVisible});
Expand All @@ -296,12 +236,3 @@ extension on Package {
_PkgUpdatedEvent asPkgUpdatedEvent() =>
(package: name!, updated: updated!, isVisible: isVisible);
}

/// Pairs a byte payload with its MD5 digest, which is computed on demand.
class _BytesAndHash {
  /// The raw payload.
  final List<int> bytes;

  /// The MD5 digest of [bytes], computed on first access and then reused.
  late final md5Hash = md5.convert(bytes).bytes;

  _BytesAndHash(this.bytes);

  /// The number of bytes in the payload.
  int get length => bytes.length;
}
35 changes: 20 additions & 15 deletions app/lib/search/backend.dart
Original file line number Diff line number Diff line change
Expand Up @@ -495,24 +495,29 @@ class SearchBackend {
'delete-old-search-snapshots cleared $counts entries ($runtimeVersion)');
}

/// Builds the JSON-serializable content served by the
/// /api/package-name-completion-data endpoint.
///
/// The returned map has a single `packages` entry listing the package names
/// of the hits returned by a regular search limited to 20000 results.
Future<Map<String, Object?>> getPackageNameCompletionData() async {
  final query = ServiceSearchQuery.parse(
    tagsPredicate: TagsPredicate.regularSearch(),
    limit: 20000,
  );
  final result = await searchClient.search(
    query,
    // Skip the search client's response cache: the processed form of this
    // data is cached separately and for much longer.
    skipCache: true,
    // A null source IP means no rate limit is applied to this query.
    sourceIp: null,
  );
  final names = [for (final hit in result.packageHits) hit.package];
  return {
    'packages': names,
  };
}

/// Creates the gzipped byte content for the /api/package-name-completion-data endpoint.
Future<List<int>> getPackageNameCompletionDataJsonGz() async {
final bytes = await cache.packageNameCompletionDataJsonGz().get(() async {
final rs = await searchClient.search(
ServiceSearchQuery.parse(
tagsPredicate: TagsPredicate.regularSearch(),
limit: 20000,
),
// Do not cache response at the search client level, as we'll be caching
// it in a processed form much longer.
skipCache: true,
// Do not apply rate limit here.
sourceIp: null,
);

return gzip.encode(jsonUtf8Encoder.convert({
'packages': rs.packageHits.map((p) => p.package).toList(),
}));
final data = await getPackageNameCompletionData();
return gzip.encode(jsonUtf8Encoder.convert(data));
});
return bytes!;
}
Expand Down
16 changes: 8 additions & 8 deletions app/test/package/export_api_to_bucket_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -45,20 +45,20 @@ void main() {
.toList();
expect(files.toSet(), {
'$runtimeVersion/api/package-name-completion-data',
'current/api/package-name-completion-data',
'latest/api/package-name-completion-data',
'$runtimeVersion/api/packages/flutter_titanium',
'$runtimeVersion/api/packages/neon',
'$runtimeVersion/api/packages/oxygen',
'current/api/packages/flutter_titanium',
'current/api/packages/neon',
'current/api/packages/oxygen',
'latest/api/packages/flutter_titanium',
'latest/api/packages/neon',
'latest/api/packages/oxygen',
});

Future<Object?> readAndDecodeJson(String path) async => json
.decode(utf8.decode(gzip.decode(await bucket.readAsBytes(path))));

expect(
await readAndDecodeJson('current/api/packages/neon'),
await readAndDecodeJson('latest/api/packages/neon'),
{
'name': 'neon',
'latest': isNotEmpty,
Expand All @@ -67,7 +67,7 @@ void main() {
);

expect(
await readAndDecodeJson('current/api/package-name-completion-data'),
await readAndDecodeJson('latest/api/package-name-completion-data'),
{
'packages': hasLength(3),
},
Expand All @@ -81,7 +81,7 @@ void main() {
final bucket =
storageService.bucket(activeConfiguration.exportedApiBucketName!);
final originalBytes =
await bucket.readAsBytes('current/api/packages/oxygen');
await bucket.readAsBytes('latest/api/packages/oxygen');

final pubspecContent = generatePubspecYaml('oxygen', '9.0.0');
final message = await createPubApiClient(authToken: adminClientToken)
Expand All @@ -91,7 +91,7 @@ void main() {

await Future.delayed(Duration(seconds: 1));
final updatedBytes =
await bucket.readAsBytes('current/api/packages/oxygen');
await bucket.readAsBytes('latest/api/packages/oxygen');
expect(originalBytes.length, lessThan(updatedBytes.length));
});
});
Expand Down
Loading