@@ -246,6 +246,11 @@ public void run(String[] args) throws IOException {
246246
247247 List <Future > futs = new ArrayList <>();
248248
249+ // determine folder configuration for old producer vs readers
250+ String legacyFolder = this .config .getString (Const .Config .OptOutLegacyProducerS3FolderProp );
251+ String mainFolder = this .config .getString (Const .Config .OptOutS3FolderProp );
252+ boolean useLegacyFolder = legacyFolder != null && !legacyFolder .equals (mainFolder );
253+
249254 // main CloudSyncVerticle - downloads from optout_s3_folder (for readers: partition generator, log sender)
250255 OptOutCloudSync cs = new OptOutCloudSync (this .config , true );
251256 CloudSyncVerticle cloudSyncVerticle = new CloudSyncVerticle ("optout" , this .fsOptOut , this .fsLocal , cs , this .config );
@@ -263,20 +268,16 @@ public void run(String[] args) throws IOException {
263268 // create partners config monitor
264269 futs .add (this .createPartnerConfigMonitor (cloudSyncVerticle .eventDownloaded ()));
265270
266- // determine where old producer uploads should go
267- String legacyFolder = this .config .getString (Const .Config .OptOutLegacyProducerS3FolderProp );
268- String mainFolder = this .config .getString (Const .Config .OptOutS3FolderProp );
269- boolean useLegacyFolder = legacyFolder != null && !legacyFolder .equals (mainFolder );
270-
271- // create OptOutCloudSync and CloudSyncVerticle for old producer uploads
272- // if legacy folder is configured, uploads go there; otherwise they go to main folder
271+ // create CloudSyncVerticle for old producer uploads
272+ // if legacy folder is configured, create upload-only verticle; otherwise reuse main verticle
273273 OptOutCloudSync uploadCs ;
274274 CloudSyncVerticle uploadVerticle ;
275275 if (useLegacyFolder ) {
276276 LOGGER .info ("old producer uploads configured to use folder: {} (readers use: {})" , legacyFolder , mainFolder );
277277 JsonObject legacyConfig = new JsonObject ().mergeIn (this .config )
278278 .put (Const .Config .OptOutS3FolderProp , legacyFolder );
279- uploadCs = new OptOutCloudSync (legacyConfig , true );
279+ uploadCs = new OptOutCloudSync (legacyConfig , true , true );
280+ // upload-only verticle: won't download legacy files
280281 uploadVerticle = new CloudSyncVerticle ("optout-legacy" , this .fsOptOut , this .fsLocal , uploadCs , this .config );
281282 futs .add (this .deploySingleInstance (uploadVerticle ));
282283 } else {
@@ -285,9 +286,12 @@ public void run(String[] args) throws IOException {
285286 uploadVerticle = cloudSyncVerticle ;
286287 }
287288
288- // create & deploy log producer verticle (fires upload events to uploadVerticle)
289- String eventUpload = uploadVerticle .eventUpload ();
290- OptOutLogProducer logProducer = new OptOutLogProducer (this .config , eventUpload , eventUpload );
289+ // create & deploy log producer verticle
290+ // deltas go to uploadVerticle (legacy folder in legacy mode, main folder otherwise)
291+ // partitions always go to main folder (cloudSyncVerticle) since they read from main folder
292+ String eventDeltaProduced = uploadVerticle .eventUpload ();
293+ String eventPartitionProduced = cloudSyncVerticle .eventUpload ();
294+ OptOutLogProducer logProducer = new OptOutLogProducer (this .config , eventDeltaProduced , eventPartitionProduced );
291295 futs .add (this .deploySingleInstance (logProducer ));
292296
293297 // upload last delta produced and potentially not uploaded yet
0 commit comments