@@ -246,8 +246,18 @@ public void run(String[] args) throws IOException {
246246
247247 List <Future > futs = new ArrayList <>();
248248
249- // create optout cloud sync verticle (reads from optout_s3_folder)
250- OptOutCloudSync cs = new OptOutCloudSync (this .config , true );
249+ // create optout cloud sync verticle
250+ // if optout_legacy_producer_s3_folder is set, use it for the old producer's CloudSync
251+ // this allows redirecting old producer output to a different folder during migration
252+ String legacyFolder = this .config .getString (Const .Config .OptOutLegacyProducerS3FolderProp );
253+ String mainFolder = this .config .getString (Const .Config .OptOutS3FolderProp );
254+ JsonObject cloudSyncConfig = this .config ;
255+ if (legacyFolder != null && !legacyFolder .equals (mainFolder )) {
256+ cloudSyncConfig = new JsonObject ().mergeIn (this .config )
257+ .put (Const .Config .OptOutS3FolderProp , legacyFolder );
258+ LOGGER .info ("old producer configured to use folder: {} (main folder: {})" , legacyFolder , mainFolder );
259+ }
260+ OptOutCloudSync cs = new OptOutCloudSync (cloudSyncConfig , true );
251261 CloudSyncVerticle cloudSyncVerticle = new CloudSyncVerticle ("optout" , this .fsOptOut , this .fsLocal , cs , this .config );
252262
253263 // deploy optout cloud sync verticle
@@ -268,21 +278,8 @@ public void run(String[] args) throws IOException {
268278 OptOutLogProducer logProducer = new OptOutLogProducer (this .config , eventUpload , eventUpload );
269279 futs .add (this .deploySingleInstance (logProducer ));
270280
271- // create cloud sync for legacy producer uploads
272- // if optout_legacy_producer_s3_folder is set, old producer writes to that folder instead of optout_s3_folder
273- String legacyFolder = this .config .getString (Const .Config .OptOutLegacyProducerS3FolderProp );
274- OptOutCloudSync legacyProducerCs ;
275- if (legacyFolder != null ) {
276- JsonObject legacyConfig = new JsonObject ().mergeIn (this .config )
277- .put (Const .Config .OptOutS3FolderProp , legacyFolder );
278- legacyProducerCs = new OptOutCloudSync (legacyConfig , true );
279- LOGGER .info ("legacy producer will write to separate folder: {}" , legacyFolder );
280- } else {
281- legacyProducerCs = cs ;
282- }
283-
284281 // upload last delta produced and potentially not uploaded yet
285- futs .add ((this .uploadLastDelta (legacyProducerCs , logProducer , cloudSyncVerticle .eventUpload (), cloudSyncVerticle .eventRefresh ())));
282+ futs .add ((this .uploadLastDelta (cs , logProducer , cloudSyncVerticle .eventUpload (), cloudSyncVerticle .eventRefresh ())));
286283 }
287284
288285 // deploy sqs producer if enabled
0 commit comments