|
| 1 | +import 'dart:async'; |
| 2 | +import 'dart:convert'; |
| 3 | +import 'dart:io'; |
| 4 | + |
| 5 | +import 'package:clock/clock.dart'; |
| 6 | +import 'package:gcloud/storage.dart'; |
| 7 | +import 'package:logging/logging.dart'; |
| 8 | +import 'package:pool/pool.dart'; |
| 9 | +import 'package:pub_dev/shared/utils.dart'; |
| 10 | +import '../../shared/versions.dart' |
| 11 | + show runtimeVersion, runtimeVersionPattern, shouldGCVersion; |
| 12 | + |
/// Logger for all exported-API bucket operations in this library.
final _log = Logger('api_export:exported_bucket');
| 14 | + |
/// Interface for [Bucket] containing exported API that is served directly from
/// Google Cloud Storage.
///
/// This interface is responsible for:
///  * Naming of files in the bucket.
///  * Deletion of all files related to a package.
///  * Garbage collection of unknown package entries.
///  * Maintaining two prefixes with files in the bucket.
///    * "latest" (that all runtimeVersions write to)
///    * "<runtimeVersion>" (that only this runtimeVersion writes to)
///  * Garbage collecting old prefixes.
///  * Limit concurrency of operations.
///
/// All writes to the bucket containing the exported API should go through this
/// interface.
final class ExportedApi {
  /// Bounds concurrency of all storage operations issued by this interface.
  final Pool _pool = Pool(50);
  final Storage _storage;
  final Bucket _bucket;

  /// Prefixes this instance writes to: `latest` (shared by all runtime
  /// versions) and the prefix owned exclusively by this [runtimeVersion].
  final List<String> _prefixes = [
    'latest',
    runtimeVersion,
  ];

  ExportedApi(this._storage, this._bucket);

  /// Interface for writing all files related to [packageName].
  ExportedPackage package(String packageName) =>
      ExportedPackage._(this, packageName);

  /// Interface for writing `/api/package-name-completion-data`
  ExportedJsonFile get packageNameCompletionData => ExportedJsonFile._(
        this,
        '/api/package-name-completion-data',
        Duration(hours: 8),
      );

  /// Run garbage collection on the bucket.
  ///
  /// This will remove all packages from `latest/` and `<runtimeVersion>/`,
  /// where:
  ///  * The name of the package is not in [allPackageNames], and,
  ///  * The file is more than one day old.
  ///
  /// This will remove prefixes other than `latest/` where [shouldGCVersion]
  /// returns true.
  Future<void> garbageCollect(Set<String> allPackageNames) async {
    await Future.wait([
      _gcOldPrefixes(),
      ..._prefixes.map((prefix) => _gcPrefix(prefix, allPackageNames)),
    ]);
  }

  /// Garbage collect unknown packages from [prefix].
  ///
  /// This will remove all packages from the `<prefix>/` where:
  ///  * The name of the package is not in [allPackageNames], and,
  ///  * The file is more than one day old.
  Future<void> _gcPrefix(String prefix, Set<String> allPackageNames) async {
    _log.info('Garbage collecting "$prefix"');

    // Package listing files live directly under `<prefix>/api/packages/`.
    await _listBucket(prefix: '$prefix/api/packages/', delimiter: '/',
        (item) async {
      final String packageName;
      if (item.isObject) {
        assert(!item.name.endsWith('/'));
        packageName = item.name.split('/').last;
      } else {
        assert(item.name.endsWith('/'));
        packageName = item.name.without(suffix: '/').split('/').last;
      }
      await _deletePackageIfStale(packageName, item.name, allPackageNames);
    });

    // Archives are named `<package>-<version>.tar.gz`; listing with '-' as
    // delimiter groups them under a `<package>-` prefix per package.
    await _listBucket(prefix: '$prefix/api/archives/', delimiter: '-',
        (item) async {
      if (item.isObject) {
        throw AssertionError('Unknown package archive at ${item.name}');
      }
      assert(item.name.endsWith('-'));
      final packageName = item.name.without(suffix: '-').split('/').last;
      // NOTE(review): item.name is a directory-like prefix here, not an
      // object name — confirm [Bucket.info] resolves such prefixes, otherwise
      // this lookup in _deletePackageIfStale may throw for stale archives.
      await _deletePackageIfStale(packageName, item.name, allPackageNames);
    });
  }

  /// Delete all files for [packageName], if it is not in [allPackageNames]
  /// and the entry at [itemName] is more than one day old.
  ///
  /// The one-day grace period avoids races where we delete files that were
  /// just created for a newly published package.
  Future<void> _deletePackageIfStale(
    String packageName,
    String itemName,
    Set<String> allPackageNames,
  ) async {
    if (allPackageNames.contains(packageName)) {
      return;
    }
    final info = await _bucket.info(itemName);
    if (info.updated.isBefore(clock.ago(days: 1))) {
      // Only delete if the item is more than one day old
      // This avoids any races where we delete files we've just created
      await package(packageName).delete();
    }
  }

  /// Garbage collect old prefixes.
  ///
  /// This will remove prefixes other than `latest/` where [shouldGCVersion]
  /// returns true.
  Future<void> _gcOldPrefixes() async {
    // List all top-level prefixes, and delete the ones we don't need
    final topLevelPrefixes = await _pool.withResource(
      () async => await _bucket.list(prefix: '', delimiter: '/').toList(),
    );
    await Future.wait(topLevelPrefixes.map((entry) async {
      if (entry.isObject) {
        return; // ignore top-level files
      }

      final topLevelPrefix = entry.name.without(suffix: '/');
      if (_prefixes.contains(topLevelPrefix)) {
        return; // Don't GC prefixes we are writing to
      }

      if (!runtimeVersionPattern.hasMatch(topLevelPrefix)) {
        return; // Don't GC non-runtimeVersions
      }

      if (shouldGCVersion(topLevelPrefix)) {
        _log.info(
          'Garbage collecting old prefix "$topLevelPrefix/" '
          '(removing all objects under it)',
        );

        assert(entry.name.endsWith('/'));
        await _listBucket(
          prefix: entry.name,
          delimiter: '',
          (entry) async => await _bucket.delete(entry.name),
        );
      }
    }));
  }

  /// List all entries under [prefix] using [delimiter], calling [each] for
  /// every entry, with concurrency bounded by [_pool].
  ///
  /// Pages through results 1000 entries at a time.
  Future<void> _listBucket(
    FutureOr<void> Function(BucketEntry entry) each, {
    required String prefix,
    required String delimiter,
  }) async {
    var p = await _pool.withResource(() async => await _bucket.page(
          prefix: prefix,
          delimiter: delimiter,
          pageSize: 1000,
        ));
    while (true) {
      await Future.wait(p.items.map((item) async {
        await _pool.withResource(() async => await each(item));
      }));

      if (p.isLast) break;
      p = await _pool.withResource(() async => await p.next(pageSize: 1000));
    }
  }
}
| 173 | + |
/// Interface for writing data about a package to the exported API bucket.
final class ExportedPackage {
  final ExportedApi _owner;
  final String _package;

  ExportedPackage._(this._owner, this._package);

  /// JSON file under `/api/packages/<package>` with the given [suffix],
  /// cached for 10 minutes.
  ExportedJsonFile _suffix(String suffix) => ExportedJsonFile._(
        _owner,
        '/api/packages/$_package$suffix',
        Duration(minutes: 10),
      );

  /// Interface for writing `/api/packages/<package>`.
  ///
  /// Which contains version listing information.
  ExportedJsonFile get versions => _suffix('');

  /// Interface for writing `/api/packages/<package>/advisories`.
  ExportedJsonFile get advisories => _suffix('/advisories');

  /// Interface for writing `/api/archives/<package>-<version>.tar.gz`.
  ExportedBlob tarball(String version) => ExportedBlob._(
        _owner,
        '/api/archives/$_package-$version.tar.gz',
        '$_package-$version.tar.gz',
        // Fixed: 'application/octet' is not a registered MIME type; the
        // standard binary content-type is 'application/octet-stream'.
        'application/octet-stream',
        Duration(hours: 2),
      );

  /// Delete all files related to this package.
  ///
  /// Removes the version listing, advisories, and every archive under each
  /// prefix this runtime writes to.
  Future<void> delete() async {
    await Future.wait([
      _owner._pool.withResource(() async => await versions.delete()),
      _owner._pool.withResource(() async => await advisories.delete()),
      ..._owner._prefixes.map((prefix) async {
        await _owner._listBucket(
          prefix: '$prefix/api/archives/$_package-',
          delimiter: '',
          (item) async => await _owner._bucket.delete(item.name),
        );
      }),
    ]);
  }
}
| 219 | + |
/// Interface for an exported file.
sealed class ExportedObject {
  final ExportedApi _owner;
  final String _objectName;
  ExportedObject._(this._owner, this._objectName);

  /// Delete this file.
  ///
  /// Removes the object under every prefix the owner writes to, with
  /// concurrency bounded by the owner's pool.
  Future<void> delete() async {
    final deletions = [
      for (final prefix in _owner._prefixes)
        _owner._pool.withResource(
          () => _owner._bucket.delete(prefix + _objectName),
        ),
    ];
    await Future.wait(deletions);
  }
}
| 235 | + |
/// Interface for an exported JSON file.
///
/// This will write JSON as gzipped UTF-8, adding headers for
///  * `Content-Type`,
///  * `Content-Encoding`, and,
///  * `Cache-Control`.
final class ExportedJsonFile extends ExportedObject {
  /// Codec that encodes a JSON value to gzipped UTF-8 bytes.
  static final _jsonGzip = json.fuse(utf8).fuse(gzip);
  final Duration _maxAge;

  ExportedJsonFile._(
    ExportedApi owner,
    String objectName,
    this._maxAge,
  ) : super._(owner, objectName);

  /// HTTP headers applied to every object written by this file.
  late final _metadata = ObjectMetadata(
    contentType: 'application/json; charset="utf-8"',
    contentEncoding: 'gzip',
    cacheControl: 'public, max-age=${_maxAge.inSeconds}',
  );

  /// Write [data] as gzipped JSON in UTF-8 format.
  Future<void> write(Map<String, Object?> data) async {
    // Encode once, then fan out one write per prefix.
    final body = _jsonGzip.encode(data);
    await Future.wait([
      for (final prefix in _owner._prefixes)
        _owner._pool.withResource(
          () => _owner._bucket.writeBytes(
            prefix + _objectName,
            body,
            metadata: _metadata,
          ),
        ),
    ]);
  }
}
| 272 | + |
/// Interface for an exported binary file.
///
/// This will write a binary blob as is, adding headers for
///  * `Content-Type`,
///  * `Content-Disposition`, and,
///  * `Cache-Control`.
final class ExportedBlob extends ExportedObject {
  final String _contentType;
  final Duration _maxAge;
  final String _filename;

  ExportedBlob._(
    ExportedApi owner,
    String objectName,
    this._filename,
    this._contentType,
    this._maxAge,
  ) : super._(owner, objectName);

  /// HTTP headers applied to every object written by this blob.
  late final _metadata = ObjectMetadata(
    contentType: _contentType,
    cacheControl: 'public, max-age=${_maxAge.inSeconds}',
    contentDisposition: 'attachment; filename="$_filename"',
  );

  /// Write binary blob to this file.
  Future<void> write(List<int> data) async {
    // One write per prefix, bounded by the owner's pool.
    await Future.wait([
      for (final prefix in _owner._prefixes)
        _owner._pool.withResource(
          () => _owner._bucket.writeBytes(
            prefix + _objectName,
            data,
            metadata: _metadata,
          ),
        ),
    ]);
  }

  /// Copy binary blob from [absoluteObjectName] to this file.
  ///
  /// Notice that [absoluteObjectName] must be an a GCS URI including `gs://`.
  /// This means that it must include bucket name.
  /// Such URIs can be created with [Bucket.absoluteObjectName].
  Future<void> copyFrom(String absoluteObjectName) async {
    // One server-side copy per prefix, bounded by the owner's pool.
    await Future.wait([
      for (final prefix in _owner._prefixes)
        _owner._pool.withResource(
          () => _owner._storage.copyObject(
            absoluteObjectName,
            _owner._bucket.absoluteObjectName(prefix + _objectName),
            metadata: _metadata,
          ),
        ),
    ]);
  }
}
0 commit comments