Skip to content

Commit ba0dc00

Browse files
authored
Merging some aspects of the ExportedApi with the bucket exporter. (#8177)
1 parent e6bb800 commit ba0dc00

File tree

4 files changed

+71
-102
lines changed

4 files changed

+71
-102
lines changed

app/lib/package/api_export/exported_api.dart

Lines changed: 35 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,10 +9,12 @@ import 'dart:io';
99
import 'package:_pub_shared/data/advisories_api.dart';
1010
import 'package:_pub_shared/data/package_api.dart';
1111
import 'package:clock/clock.dart';
12+
import 'package:crypto/crypto.dart';
1213
import 'package:gcloud/storage.dart';
1314
import 'package:logging/logging.dart';
1415
import 'package:pool/pool.dart';
1516
import 'package:pub_dev/shared/utils.dart';
17+
import '../../shared/storage.dart';
1618
import '../../shared/versions.dart'
1719
show runtimeVersion, runtimeVersionPattern, shouldGCVersion;
1820

@@ -268,7 +270,7 @@ final class ExportedJsonFile<T> extends ExportedObject {
268270
final gzipped = _jsonGzip.encode(data);
269271
await Future.wait(_owner._prefixes.map((prefix) async {
270272
await _owner._pool.withResource(() async {
271-
await _owner._bucket.writeBytes(
273+
await _owner._bucket.writeBytesIfDifferent(
272274
prefix + _objectName,
273275
gzipped,
274276
metadata: _metadata,
@@ -307,7 +309,7 @@ final class ExportedBlob extends ExportedObject {
307309
Future<void> write(List<int> data) async {
308310
await Future.wait(_owner._prefixes.map((prefix) async {
309311
await _owner._pool.withResource(() async {
310-
await _owner._bucket.writeBytes(
312+
await _owner._bucket.writeBytesIfDifferent(
311313
prefix + _objectName,
312314
data,
313315
metadata: _metadata,
@@ -333,3 +335,34 @@ final class ExportedBlob extends ExportedObject {
333335
}));
334336
}
335337
}
338+
339+
extension on Bucket {
340+
Future<void> writeBytesIfDifferent(
341+
String name,
342+
List<int> bytes, {
343+
ObjectMetadata? metadata,
344+
}) async {
345+
if (await _hasSameContent(name, bytes)) {
346+
return;
347+
}
348+
await uploadWithRetry(
349+
this,
350+
name,
351+
bytes.length,
352+
() => Stream.value(bytes),
353+
metadata: metadata,
354+
);
355+
}
356+
357+
Future<bool> _hasSameContent(String name, List<int> bytes) async {
358+
final info = await tryInfo(name);
359+
if (info == null) {
360+
return false;
361+
}
362+
if (info.length != bytes.length) {
363+
return false;
364+
}
365+
final md5Hash = md5.convert(bytes).bytes;
366+
return fixedTimeIntListEquals(info.md5Hash, md5Hash);
367+
}
368+
}

app/lib/package/export_api_to_bucket.dart

Lines changed: 8 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,8 @@
22
// for details. All rights reserved. Use of this source code is governed by a
33
// BSD-style license that can be found in the LICENSE file.
44

5-
import 'dart:io';
6-
75
import 'package:basics/basics.dart';
86
import 'package:clock/clock.dart';
9-
import 'package:crypto/crypto.dart';
107
import 'package:gcloud/service_scope.dart' as ss;
118
import 'package:gcloud/storage.dart';
129
import 'package:logging/logging.dart';
@@ -17,9 +14,9 @@ import 'package:retry/retry.dart';
1714
import '../search/backend.dart';
1815
import '../shared/datastore.dart';
1916
import '../shared/storage.dart';
20-
import '../shared/utils.dart';
2117
import '../shared/versions.dart';
2218
import '../task/global_lock.dart';
19+
import 'api_export/exported_api.dart';
2320
import 'backend.dart';
2421
import 'models.dart';
2522

@@ -28,20 +25,6 @@ final Logger _logger = Logger('export_api_to_bucket');
2825
/// The default concurrency to upload API JSON files to the bucket.
2926
const _defaultBucketUpdateConcurrency = 8;
3027

31-
/// The default cache timeout for content.
32-
const _pkgApiMaxCacheAge = Duration(minutes: 10);
33-
const _pkgNameCompletionDataMaxAge = Duration(hours: 8);
34-
35-
List<String> _apiPkgObjectNames(String package) => [
36-
'$runtimeVersion/api/packages/$package',
37-
'current/api/packages/$package',
38-
];
39-
40-
List<String> _apiPkgNameCompletionDataNames() => [
41-
'$runtimeVersion/api/package-name-completion-data',
42-
'current/api/package-name-completion-data',
43-
];
44-
4528
/// Sets the API Exporter service.
4629
void registerApiExporter(ApiExporter value) =>
4730
ss.register(#_apiExporter, value);
@@ -50,14 +33,16 @@ void registerApiExporter(ApiExporter value) =>
5033
ApiExporter? get apiExporter => ss.lookup(#_apiExporter) as ApiExporter?;
5134

5235
class ApiExporter {
36+
final ExportedApi _api;
5337
final Bucket _bucket;
5438
final int _concurrency;
5539
final _pkgLastUpdated = <String, _PkgUpdatedEvent>{};
5640

5741
ApiExporter({
5842
required Bucket bucket,
5943
int concurrency = _defaultBucketUpdateConcurrency,
60-
}) : _bucket = bucket,
44+
}) : _api = ExportedApi(storageService, bucket),
45+
_bucket = bucket,
6146
_concurrency = concurrency;
6247

6348
/// Runs a forever loop and tries to get a global lock.
@@ -89,12 +74,8 @@ class ApiExporter {
8974

9075
/// Gets and uploads the package name completion data.
9176
Future<void> uploadPkgNameCompletionData() async {
92-
final bytes = await searchBackend.getPackageNameCompletionDataJsonGz();
93-
final bytesAndHash = _BytesAndHash(bytes);
94-
for (final objectName in _apiPkgNameCompletionDataNames()) {
95-
await _upsert(objectName, bytesAndHash,
96-
maxAge: _pkgNameCompletionDataMaxAge);
97-
}
77+
await _api.packageNameCompletionData
78+
.write(await searchBackend.getPackageNameCompletionData());
9879
}
9980

10081
/// Note: there is no global locking here, the full scan should be called
@@ -187,19 +168,11 @@ class ApiExporter {
187168
/// the endpoint name in the file location.
188169
Future<void> _uploadPackageToBucket(String package) async {
189170
final data = await retry(() => packageBackend.listVersions(package));
190-
final rawBytes = jsonUtf8Encoder.convert(data.toJson());
191-
final bytes = gzip.encode(rawBytes);
192-
final bytesAndHash = _BytesAndHash(bytes);
193-
194-
for (final objectName in _apiPkgObjectNames(package)) {
195-
await _upsert(objectName, bytesAndHash, maxAge: _pkgApiMaxCacheAge);
196-
}
171+
await _api.package(package).versions.write(data);
197172
}
198173

199174
Future<void> _deletePackageFromBucket(String package) async {
200-
for (final objectName in _apiPkgObjectNames(package)) {
201-
await _bucket.tryDelete(objectName);
202-
}
175+
await _api.package(package).delete();
203176
}
204177

205178
Stream<_PkgUpdatedEvent> _queryRecentPkgUpdatedEvents(DateTime since) async* {
@@ -250,39 +223,6 @@ class ApiExporter {
250223
await deleteBucketFolderRecursively(_bucket, '$v/', concurrency: 4);
251224
}
252225
}
253-
254-
Future<void> _upsert(
255-
String objectName,
256-
_BytesAndHash bytesAndHash, {
257-
required Duration maxAge,
258-
}) async {
259-
if (await _isSameContent(objectName, bytesAndHash)) {
260-
return;
261-
}
262-
await uploadWithRetry(
263-
_bucket,
264-
objectName,
265-
bytesAndHash.bytes.length,
266-
() => Stream.value(bytesAndHash.bytes),
267-
metadata: ObjectMetadata(
268-
contentType: 'application/json; charset="utf-8"',
269-
contentEncoding: 'gzip',
270-
cacheControl: 'public, max-age=${maxAge.inSeconds}',
271-
),
272-
);
273-
}
274-
275-
Future<bool> _isSameContent(
276-
String objectName, _BytesAndHash bytesAndHash) async {
277-
final info = await _bucket.tryInfo(objectName);
278-
if (info == null) {
279-
return false;
280-
}
281-
if (info.length != bytesAndHash.length) {
282-
return false;
283-
}
284-
return fixedTimeIntListEquals(info.md5Hash, bytesAndHash.md5Hash);
285-
}
286226
}
287227

288228
typedef _PkgUpdatedEvent = ({String package, DateTime updated, bool isVisible});
@@ -296,12 +236,3 @@ extension on Package {
296236
_PkgUpdatedEvent asPkgUpdatedEvent() =>
297237
(package: name!, updated: updated!, isVisible: isVisible);
298238
}
299-
300-
class _BytesAndHash {
301-
final List<int> bytes;
302-
303-
_BytesAndHash(this.bytes);
304-
305-
int get length => bytes.length;
306-
late final md5Hash = md5.convert(bytes).bytes;
307-
}

app/lib/search/backend.dart

Lines changed: 20 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -495,24 +495,29 @@ class SearchBackend {
495495
'delete-old-search-snapshots cleared $counts entries ($runtimeVersion)');
496496
}
497497

498+
/// Creates the content for the /api/package-name-completion-data endpoint.
499+
Future<Map<String, Object?>> getPackageNameCompletionData() async {
500+
final rs = await searchClient.search(
501+
ServiceSearchQuery.parse(
502+
tagsPredicate: TagsPredicate.regularSearch(),
503+
limit: 20000,
504+
),
505+
// Do not cache response at the search client level, as we'll be caching
506+
// it in a processed form much longer.
507+
skipCache: true,
508+
// Do not apply rate limit here.
509+
sourceIp: null,
510+
);
511+
return {
512+
'packages': rs.packageHits.map((p) => p.package).toList(),
513+
};
514+
}
515+
498516
/// Creates the gzipped byte content for the /api/package-name-completion-data endpoint.
499517
Future<List<int>> getPackageNameCompletionDataJsonGz() async {
500518
final bytes = await cache.packageNameCompletionDataJsonGz().get(() async {
501-
final rs = await searchClient.search(
502-
ServiceSearchQuery.parse(
503-
tagsPredicate: TagsPredicate.regularSearch(),
504-
limit: 20000,
505-
),
506-
// Do not cache response at the search client level, as we'll be caching
507-
// it in a processed form much longer.
508-
skipCache: true,
509-
// Do not apply rate limit here.
510-
sourceIp: null,
511-
);
512-
513-
return gzip.encode(jsonUtf8Encoder.convert({
514-
'packages': rs.packageHits.map((p) => p.package).toList(),
515-
}));
519+
final data = await getPackageNameCompletionData();
520+
return gzip.encode(jsonUtf8Encoder.convert(data));
516521
});
517522
return bytes!;
518523
}

app/test/package/export_api_to_bucket_test.dart

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -45,20 +45,20 @@ void main() {
4545
.toList();
4646
expect(files.toSet(), {
4747
'$runtimeVersion/api/package-name-completion-data',
48-
'current/api/package-name-completion-data',
48+
'latest/api/package-name-completion-data',
4949
'$runtimeVersion/api/packages/flutter_titanium',
5050
'$runtimeVersion/api/packages/neon',
5151
'$runtimeVersion/api/packages/oxygen',
52-
'current/api/packages/flutter_titanium',
53-
'current/api/packages/neon',
54-
'current/api/packages/oxygen',
52+
'latest/api/packages/flutter_titanium',
53+
'latest/api/packages/neon',
54+
'latest/api/packages/oxygen',
5555
});
5656

5757
Future<Object?> readAndDecodeJson(String path) async => json
5858
.decode(utf8.decode(gzip.decode(await bucket.readAsBytes(path))));
5959

6060
expect(
61-
await readAndDecodeJson('current/api/packages/neon'),
61+
await readAndDecodeJson('latest/api/packages/neon'),
6262
{
6363
'name': 'neon',
6464
'latest': isNotEmpty,
@@ -67,7 +67,7 @@ void main() {
6767
);
6868

6969
expect(
70-
await readAndDecodeJson('current/api/package-name-completion-data'),
70+
await readAndDecodeJson('latest/api/package-name-completion-data'),
7171
{
7272
'packages': hasLength(3),
7373
},
@@ -81,7 +81,7 @@ void main() {
8181
final bucket =
8282
storageService.bucket(activeConfiguration.exportedApiBucketName!);
8383
final originalBytes =
84-
await bucket.readAsBytes('current/api/packages/oxygen');
84+
await bucket.readAsBytes('latest/api/packages/oxygen');
8585

8686
final pubspecContent = generatePubspecYaml('oxygen', '9.0.0');
8787
final message = await createPubApiClient(authToken: adminClientToken)
@@ -91,7 +91,7 @@ void main() {
9191

9292
await Future.delayed(Duration(seconds: 1));
9393
final updatedBytes =
94-
await bucket.readAsBytes('current/api/packages/oxygen');
94+
await bucket.readAsBytes('latest/api/packages/oxygen');
9595
expect(originalBytes.length, lessThan(updatedBytes.length));
9696
});
9797
});

0 commit comments

Comments
 (0)