@@ -9,6 +9,9 @@ import 'package:gcloud/storage.dart';
99import 'package:logging/logging.dart' ;
1010import 'package:meta/meta.dart' ;
1111import 'package:pool/pool.dart' ;
12+ import 'package:pub_dev/service/security_advisories/backend.dart' ;
13+ import 'package:pub_dev/service/security_advisories/sync_security_advisories.dart' ;
14+ import 'package:pub_dev/shared/parallel_foreach.dart' ;
1215import 'package:retry/retry.dart' ;
1316
1417import '../../search/backend.dart' ;
@@ -78,6 +81,83 @@ class ApiExporter {
7881 .write (await searchBackend.getPackageNameCompletionData ());
7982 }
8083
84+ Future <void > fullSync () async {
85+ final invisiblePackageNames = await dbService
86+ .query <ModeratedPackage >()
87+ .run ()
88+ .map ((mp) => mp.name! )
89+ .toSet ();
90+
91+ final allPackageNames = < String > {};
92+ final packageQuery = dbService.query <Package >();
93+ await packageQuery.run ().parallelForEach (_concurrency, (pkg) async {
94+ final name = pkg.name! ;
95+ if (pkg.isNotVisible) {
96+ invisiblePackageNames.add (name);
97+ return ;
98+ }
99+ allPackageNames.add (name);
100+
101+ // TODO: Consider retries around all this logic
102+ await syncPackage (name);
103+ });
104+
105+ final visibilityConflicts =
106+ allPackageNames.intersection (invisiblePackageNames);
107+ if (visibilityConflicts.isNotEmpty) {
108+ // TODO: Shout into logs
109+ }
110+
111+ await _api.garbageCollect (allPackageNames);
112+ }
113+
114+ /// Sync package and into [ExportedApi] , this will GC, etc.
115+ ///
116+ /// This is intended when:
117+ /// * Running a full background synchronization.
118+ /// * When a change in [Package.updated] is detected (maybe???)
119+ /// * A package is moderated, or other admin action is applied.
120+ Future <void > syncPackage (String package) async {
121+ final versionListing = await packageBackend.listVersions (package);
122+ // TODO: Consider skipping the cache when fetching security advisories
123+ final advisories = await securityAdvisoryBackend.listAdvisoriesResponse (
124+ package,
125+ );
126+
127+ await Future .wait (versionListing.versions.map ((v) async {
128+ // TODO: Will v.version work here, is the canonicalized version number?
129+ final (bucket, prefix) =
130+ packageBackend.packageStorage.getBucketAndPrefix (package, v.version);
131+
132+ await _api.package (package).tarball (v.version).copyFrom (bucket, prefix);
133+ }));
134+
135+ await _api.package (package).advisories.write (advisories);
136+ await _api.package (package).versions.write (versionListing);
137+
138+ // TODO: Is this the canonoical version? (probably)
139+ final allVersions = versionListing.versions.map ((v) => v.version).toSet ();
140+ await _api.package (package).garbageCollect (allVersions);
141+ }
142+
143+ /// Upload a single version of a new package.
144+ ///
145+ /// This is intended to be used when a new version of a package has been
146+ /// published.
147+ Future <void > uploadSingleVersion (
148+ String package,
149+ String version,
150+ ) async {
151+ final versionListing = await packageBackend.listVersions (package);
152+
153+ // TODO: Will v.version work here, is the canonicalized version number?
154+ final (bucket, prefix) =
155+ packageBackend.packageStorage.getBucketAndPrefix (package, version);
156+ await _api.package (package).tarball (version).copyFrom (bucket, prefix);
157+
158+ await _api.package (package).versions.write (versionListing);
159+ }
160+
81161 /// Note: there is no global locking here, the full scan should be called
82162 /// only once every day, and it may be racing against the incremental
83163 /// updates.
0 commit comments