diff --git a/conf/default-config.json b/conf/default-config.json index 6190657..0543dca 100644 --- a/conf/default-config.json +++ b/conf/default-config.json @@ -24,6 +24,9 @@ "optout_delete_expired": true, "optout_s3_bucket": null, "optout_s3_folder": null, + "optout_legacy_producer_s3_folder": null, + "optout_sqs_s3_folder": null, + "optout_sqs_queue_url": null, "cloud_download_threads": 8, "cloud_upload_threads": 2, "cloud_refresh_interval": 60, @@ -34,8 +37,6 @@ "operator_type": "public", "uid_instance_id_prefix": "local-optout", "optout_enqueue_sqs_enabled": false, - "optout_sqs_queue_url": null, - "optout_sqs_s3_folder": "sqs-delta", "optout_sqs_max_queue_size": 0, "optout_sqs_max_messages_per_poll": 10, "optout_sqs_visibility_timeout": 300, diff --git a/pom.xml b/pom.xml index 8a41868..2a8877a 100644 --- a/pom.xml +++ b/pom.xml @@ -15,7 +15,7 @@ 4.5.21 1.1.0 - 11.1.91 + 11.2.0 ${project.version} 5.10.1 5.10.1 diff --git a/src/main/java/com/uid2/optout/Const.java b/src/main/java/com/uid2/optout/Const.java index a415fff..296e166 100644 --- a/src/main/java/com/uid2/optout/Const.java +++ b/src/main/java/com/uid2/optout/Const.java @@ -19,7 +19,8 @@ public static class Config extends com.uid2.shared.Const.Config { public static final String PartnersMetadataPathProp = "partners_metadata_path"; public static final String OptOutSqsQueueUrlProp = "optout_sqs_queue_url"; public static final String OptOutSqsEnabledProp = "optout_enqueue_sqs_enabled"; - public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // Default: "sqs-delta" - folder within same S3 bucket as regular optout + public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // sqs delta producer writes to this folder + public static final String OptOutLegacyProducerS3FolderProp = "optout_legacy_producer_s3_folder"; // legacy producer writes to this folder public static final String OptOutSqsMaxMessagesPerPollProp = "optout_sqs_max_messages_per_poll"; public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout"; public static final String OptOutDeltaJobTimeoutSecondsProp = "optout_delta_job_timeout_seconds"; diff --git a/src/main/java/com/uid2/optout/Main.java b/src/main/java/com/uid2/optout/Main.java index 848ab2e..fcd4aa6 100644 --- a/src/main/java/com/uid2/optout/Main.java +++ b/src/main/java/com/uid2/optout/Main.java @@ -246,7 +246,12 @@ public void run(String[] args) throws IOException { List futs = new ArrayList<>(); - // create optout cloud sync verticle + // determine folder configuration for old producer vs readers + String legacyFolder = this.config.getString(Const.Config.OptOutLegacyProducerS3FolderProp); + String mainFolder = this.config.getString(Const.Config.OptOutS3FolderProp); + boolean useLegacyFolder = legacyFolder != null && !legacyFolder.equals(mainFolder); + + // main CloudSyncVerticle - downloads from optout_s3_folder (for readers: partition generator, log sender) OptOutCloudSync cs = new OptOutCloudSync(this.config, true); CloudSyncVerticle cloudSyncVerticle = new CloudSyncVerticle("optout", this.fsOptOut, this.fsLocal, cs, this.config); @@ -257,19 +262,40 @@ public void run(String[] args) throws IOException { futs.add(this.createOperatorKeyRotator()); if (!this.observeOnly) { - // enable partition producing + // enable partition producing (reads from main folder via cs) cs.enableDeltaMerging(vertx, Const.Event.PartitionProduce); // create partners config monitor futs.add(this.createPartnerConfigMonitor(cloudSyncVerticle.eventDownloaded())); + // create CloudSyncVerticle for old producer uploads + // if legacy folder is configured, create upload-only verticle; otherwise reuse main verticle + OptOutCloudSync uploadCs; + CloudSyncVerticle uploadVerticle; + if (useLegacyFolder) { + LOGGER.info("old producer uploads configured to use folder: {} (readers use: {})", legacyFolder, mainFolder); + JsonObject legacyConfig = new JsonObject().mergeIn(this.config) + .put(Const.Config.OptOutS3FolderProp, legacyFolder); + uploadCs = new OptOutCloudSync(legacyConfig, true, true); + // upload-only verticle: won't download legacy files + uploadVerticle = new CloudSyncVerticle("optout-legacy", this.fsOptOut, this.fsLocal, uploadCs, this.config); + futs.add(this.deploySingleInstance(uploadVerticle)); + } else { + // no legacy folder configured, old producer uploads to main folder + uploadCs = cs; + uploadVerticle = cloudSyncVerticle; + } + // create & deploy log producer verticle - String eventUpload = cloudSyncVerticle.eventUpload(); - OptOutLogProducer logProducer = new OptOutLogProducer(this.config, eventUpload, eventUpload); + // deltas go to uploadVerticle (legacy folder in legacy mode, main folder otherwise) + // partitions always go to main folder (cloudSyncVerticle) since they read from main folder + OptOutLogProducer logProducer = new OptOutLogProducer(this.config, uploadVerticle.eventUpload(), cloudSyncVerticle.eventUpload()); futs.add(this.deploySingleInstance(logProducer)); // upload last delta produced and potentially not uploaded yet - futs.add((this.uploadLastDelta(cs, logProducer, cloudSyncVerticle.eventUpload(), cloudSyncVerticle.eventRefresh()))); + // always use main cs/cloudSyncVerticle for refresh mechanism (legacy verticle is upload-only) + // uploadCs is used only for determining the cloud path (legacy folder vs main folder) + futs.add((this.uploadLastDelta(cs, uploadCs, logProducer, cloudSyncVerticle.eventRefresh()))); } // deploy sqs producer if enabled @@ -297,7 +323,8 @@ public void run(String[] args) throws IOException { ICloudStorage fsSqsDroppedRequests = CloudUtils.createStorage(optoutBucketDroppedRequests, this.config); // deploy sqs log producer - OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduce, null); + // fires DeltaProduced (notification) not DeltaProduce (trigger) to avoid triggering old producer + OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduced, null); futs.add(this.deploySingleInstance(sqsLogProducer)); LOGGER.info("sqs log producer deployed, bucket={}, folder={}", @@ -340,7 +367,8 @@ public void run(String[] args) throws IOException { }); } - private Future uploadLastDelta(OptOutCloudSync cs, OptOutLogProducer logProducer, String eventUpload, String eventRefresh) { + + private Future uploadLastDelta(OptOutCloudSync cs, OptOutCloudSync uploadCs, OptOutLogProducer logProducer, String eventRefresh) { final String deltaLocalPath; try { deltaLocalPath = logProducer.getLastDelta(); @@ -359,7 +387,8 @@ private Future uploadLastDelta(OptOutCloudSync cs, OptOutLogProducer logProducer handler.set(cs.registerNewCloudPathsHandler(cloudPaths -> { try { cs.unregisterNewCloudPathsHandler(handler.get()); - final String deltaCloudPath = cs.toCloudPath(deltaLocalPath); + // use uploadCs for cloud path (may be different folder than cs) + final String deltaCloudPath = uploadCs.toCloudPath(deltaLocalPath); if (cloudPaths.contains(deltaCloudPath)) { // if delta is already uploaded, the work is already done LOGGER.info("found no last delta that needs to be uploaded");