@@ -246,7 +246,12 @@ public void run(String[] args) throws IOException {
246246
247247 List <Future > futs = new ArrayList <>();
248248
249- // create optout cloud sync verticle
249+ // determine folder configuration for old producer vs readers
250+ String legacyFolder = this .config .getString (Const .Config .OptOutLegacyProducerS3FolderProp );
251+ String mainFolder = this .config .getString (Const .Config .OptOutS3FolderProp );
252+ boolean useLegacyFolder = legacyFolder != null && !legacyFolder .equals (mainFolder );
253+
254+ // main CloudSyncVerticle - downloads from optout_s3_folder (for readers: partition generator, log sender)
250255 OptOutCloudSync cs = new OptOutCloudSync (this .config , true );
251256 CloudSyncVerticle cloudSyncVerticle = new CloudSyncVerticle ("optout" , this .fsOptOut , this .fsLocal , cs , this .config );
252257
@@ -257,19 +262,40 @@ public void run(String[] args) throws IOException {
257262 futs .add (this .createOperatorKeyRotator ());
258263
259264 if (!this .observeOnly ) {
260- // enable partition producing
265+ // enable partition producing (reads from main folder via cs)
261266 cs .enableDeltaMerging (vertx , Const .Event .PartitionProduce );
262267
263268 // create partners config monitor
264269 futs .add (this .createPartnerConfigMonitor (cloudSyncVerticle .eventDownloaded ()));
265270
271+ // create CloudSyncVerticle for old producer uploads
272+ // if legacy folder is configured, create upload-only verticle; otherwise reuse main verticle
273+ OptOutCloudSync uploadCs ;
274+ CloudSyncVerticle uploadVerticle ;
275+ if (useLegacyFolder ) {
276+ LOGGER .info ("old producer uploads configured to use folder: {} (readers use: {})" , legacyFolder , mainFolder );
277+ JsonObject legacyConfig = new JsonObject ().mergeIn (this .config )
278+ .put (Const .Config .OptOutS3FolderProp , legacyFolder );
279+ uploadCs = new OptOutCloudSync (legacyConfig , true , true );
280+ // upload-only verticle: won't download legacy files
281+ uploadVerticle = new CloudSyncVerticle ("optout-legacy" , this .fsOptOut , this .fsLocal , uploadCs , this .config );
282+ futs .add (this .deploySingleInstance (uploadVerticle ));
283+ } else {
284+ // no legacy folder configured, old producer uploads to main folder
285+ uploadCs = cs ;
286+ uploadVerticle = cloudSyncVerticle ;
287+ }
288+
266289 // create & deploy log producer verticle
267- String eventUpload = cloudSyncVerticle .eventUpload ();
268- OptOutLogProducer logProducer = new OptOutLogProducer (this .config , eventUpload , eventUpload );
290+ // deltas go to uploadVerticle (legacy folder in legacy mode, main folder otherwise)
291+ // partitions always go to main folder (cloudSyncVerticle) since they read from main folder
292+ OptOutLogProducer logProducer = new OptOutLogProducer (this .config , uploadVerticle .eventUpload (), cloudSyncVerticle .eventUpload ());
269293 futs .add (this .deploySingleInstance (logProducer ));
270294
271295 // upload last delta produced and potentially not uploaded yet
272- futs .add ((this .uploadLastDelta (cs , logProducer , cloudSyncVerticle .eventUpload (), cloudSyncVerticle .eventRefresh ())));
296+ // always use main cs/cloudSyncVerticle for refresh mechanism (legacy verticle is upload-only)
297+ // uploadCs is used only for determining the cloud path (legacy folder vs main folder)
298+ futs .add ((this .uploadLastDelta (cs , uploadCs , logProducer , cloudSyncVerticle .eventRefresh ())));
273299 }
274300
275301 // deploy sqs producer if enabled
@@ -297,7 +323,8 @@ public void run(String[] args) throws IOException {
297323 ICloudStorage fsSqsDroppedRequests = CloudUtils .createStorage (optoutBucketDroppedRequests , this .config );
298324
299325 // deploy sqs log producer
300- OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer (this .config , fsSqs , fsSqsDroppedRequests , sqsCs , Const .Event .DeltaProduce , null );
326+ // fires DeltaProduced (notification) not DeltaProduce (trigger) to avoid triggering old producer
327+ OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer (this .config , fsSqs , fsSqsDroppedRequests , sqsCs , Const .Event .DeltaProduced , null );
301328 futs .add (this .deploySingleInstance (sqsLogProducer ));
302329
303330 LOGGER .info ("sqs log producer deployed, bucket={}, folder={}" ,
@@ -340,7 +367,8 @@ public void run(String[] args) throws IOException {
340367 });
341368 }
342369
343- private Future uploadLastDelta (OptOutCloudSync cs , OptOutLogProducer logProducer , String eventUpload , String eventRefresh ) {
370+
371+ private Future uploadLastDelta (OptOutCloudSync cs , OptOutCloudSync uploadCs , OptOutLogProducer logProducer , String eventRefresh ) {
344372 final String deltaLocalPath ;
345373 try {
346374 deltaLocalPath = logProducer .getLastDelta ();
@@ -359,7 +387,8 @@ private Future uploadLastDelta(OptOutCloudSync cs, OptOutLogProducer logProducer
359387 handler .set (cs .registerNewCloudPathsHandler (cloudPaths -> {
360388 try {
361389 cs .unregisterNewCloudPathsHandler (handler .get ());
362- final String deltaCloudPath = cs .toCloudPath (deltaLocalPath );
390+ // use uploadCs for cloud path (may be different folder than cs)
391+ final String deltaCloudPath = uploadCs .toCloudPath (deltaLocalPath );
363392 if (cloudPaths .contains (deltaCloudPath )) {
364393 // if delta is already uploaded, the work is already done
365394 LOGGER .info ("found no last delta that needs to be uploaded" );
0 commit comments