@@ -246,18 +246,8 @@ public void run(String[] args) throws IOException {
246246
247247 List <Future > futs = new ArrayList <>();
248248
249- // create optout cloud sync verticle
250- // if optout_legacy_producer_s3_folder is set, use it for the old producer's CloudSync
251- // this allows redirecting old producer output to a different folder during migration
252- String legacyFolder = this .config .getString (Const .Config .OptOutLegacyProducerS3FolderProp );
253- String mainFolder = this .config .getString (Const .Config .OptOutS3FolderProp );
254- JsonObject cloudSyncConfig = this .config ;
255- if (legacyFolder != null && !legacyFolder .equals (mainFolder )) {
256- cloudSyncConfig = new JsonObject ().mergeIn (this .config )
257- .put (Const .Config .OptOutS3FolderProp , legacyFolder );
258- LOGGER .info ("old producer configured to use folder: {} (main folder: {})" , legacyFolder , mainFolder );
259- }
260- OptOutCloudSync cs = new OptOutCloudSync (cloudSyncConfig , true );
249+ // main CloudSyncVerticle - downloads from optout_s3_folder (for readers: partition generator, log sender)
250+ OptOutCloudSync cs = new OptOutCloudSync (this .config , true );
261251 CloudSyncVerticle cloudSyncVerticle = new CloudSyncVerticle ("optout" , this .fsOptOut , this .fsLocal , cs , this .config );
262252
263253 // deploy optout cloud sync verticle
@@ -267,19 +257,41 @@ public void run(String[] args) throws IOException {
267257 futs .add (this .createOperatorKeyRotator ());
268258
269259 if (!this .observeOnly ) {
270- // enable partition producing
260+ // enable partition producing (reads from main folder via cs)
271261 cs .enableDeltaMerging (vertx , Const .Event .PartitionProduce );
272262
273263 // create partners config monitor
274264 futs .add (this .createPartnerConfigMonitor (cloudSyncVerticle .eventDownloaded ()));
275265
276- // create & deploy log producer verticle
277- String eventUpload = cloudSyncVerticle .eventUpload ();
266+ // determine where old producer uploads should go
267+ String legacyFolder = this .config .getString (Const .Config .OptOutLegacyProducerS3FolderProp );
268+ String mainFolder = this .config .getString (Const .Config .OptOutS3FolderProp );
269+ boolean useLegacyFolder = legacyFolder != null && !legacyFolder .equals (mainFolder );
270+
271+ // create OptOutCloudSync and CloudSyncVerticle for old producer uploads
272+ // if legacy folder is configured, uploads go there; otherwise they go to main folder
273+ OptOutCloudSync uploadCs ;
274+ CloudSyncVerticle uploadVerticle ;
275+ if (useLegacyFolder ) {
276+ LOGGER .info ("old producer uploads configured to use folder: {} (readers use: {})" , legacyFolder , mainFolder );
277+ JsonObject legacyConfig = new JsonObject ().mergeIn (this .config )
278+ .put (Const .Config .OptOutS3FolderProp , legacyFolder );
279+ uploadCs = new OptOutCloudSync (legacyConfig , true );
280+ uploadVerticle = new CloudSyncVerticle ("optout-legacy" , this .fsOptOut , this .fsLocal , uploadCs , this .config );
281+ futs .add (this .deploySingleInstance (uploadVerticle ));
282+ } else {
283+ // no legacy folder configured, old producer uploads to main folder
284+ uploadCs = cs ;
285+ uploadVerticle = cloudSyncVerticle ;
286+ }
287+
288+ // create & deploy log producer verticle (fires upload events to uploadVerticle)
289+ String eventUpload = uploadVerticle .eventUpload ();
278290 OptOutLogProducer logProducer = new OptOutLogProducer (this .config , eventUpload , eventUpload );
279291 futs .add (this .deploySingleInstance (logProducer ));
280292
281293 // upload last delta produced and potentially not uploaded yet
282- futs .add ((this .uploadLastDelta (cs , logProducer , cloudSyncVerticle .eventUpload (), cloudSyncVerticle .eventRefresh ())));
294+ futs .add ((this .uploadLastDelta (uploadCs , logProducer , uploadVerticle .eventUpload (), uploadVerticle .eventRefresh ())));
283295 }
284296
285297 // deploy sqs producer if enabled
0 commit comments