Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions conf/default-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,9 @@
"optout_delete_expired": true,
"optout_s3_bucket": null,
"optout_s3_folder": null,
"optout_legacy_producer_s3_folder": null,
"optout_sqs_s3_folder": null,
"optout_sqs_queue_url": null,
"cloud_download_threads": 8,
"cloud_upload_threads": 2,
"cloud_refresh_interval": 60,
Expand All @@ -34,8 +37,6 @@
"operator_type": "public",
"uid_instance_id_prefix": "local-optout",
"optout_enqueue_sqs_enabled": false,
"optout_sqs_queue_url": null,
"optout_sqs_s3_folder": "sqs-delta",
"optout_sqs_max_queue_size": 0,
"optout_sqs_max_messages_per_poll": 10,
"optout_sqs_visibility_timeout": 300,
Expand Down
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
<vertx.version>4.5.21</vertx.version>
<!-- check micrometer.version vertx-micrometer-metrics consumes before bumping up -->
<micrometer.version>1.1.0</micrometer.version>
<uid2-shared.version>11.1.91</uid2-shared.version>
<uid2-shared.version>11.2.0</uid2-shared.version>
<image.version>${project.version}</image.version>
<junit-jupiter.version>5.10.1</junit-jupiter.version>
<junit-vintage.version>5.10.1</junit-vintage.version>
Expand Down
3 changes: 2 additions & 1 deletion src/main/java/com/uid2/optout/Const.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ public static class Config extends com.uid2.shared.Const.Config {
public static final String PartnersMetadataPathProp = "partners_metadata_path";
public static final String OptOutSqsQueueUrlProp = "optout_sqs_queue_url";
public static final String OptOutSqsEnabledProp = "optout_enqueue_sqs_enabled";
public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // Default: "sqs-delta" - folder within same S3 bucket as regular optout
public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // sqs delta producer writes to this folder
public static final String OptOutLegacyProducerS3FolderProp = "optout_legacy_producer_s3_folder"; // legacy producer writes to this folder
public static final String OptOutSqsMaxMessagesPerPollProp = "optout_sqs_max_messages_per_poll";
public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout";
public static final String OptOutDeltaJobTimeoutSecondsProp = "optout_delta_job_timeout_seconds";
Expand Down
45 changes: 37 additions & 8 deletions src/main/java/com/uid2/optout/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,12 @@ public void run(String[] args) throws IOException {

List<Future> futs = new ArrayList<>();

// create optout cloud sync verticle
// determine folder configuration for old producer vs readers
String legacyFolder = this.config.getString(Const.Config.OptOutLegacyProducerS3FolderProp);
String mainFolder = this.config.getString(Const.Config.OptOutS3FolderProp);
boolean useLegacyFolder = legacyFolder != null && !legacyFolder.equals(mainFolder);

// main CloudSyncVerticle - downloads from optout_s3_folder (for readers: partition generator, log sender)
OptOutCloudSync cs = new OptOutCloudSync(this.config, true);
CloudSyncVerticle cloudSyncVerticle = new CloudSyncVerticle("optout", this.fsOptOut, this.fsLocal, cs, this.config);

Expand All @@ -257,19 +262,40 @@ public void run(String[] args) throws IOException {
futs.add(this.createOperatorKeyRotator());

if (!this.observeOnly) {
// enable partition producing
// enable partition producing (reads from main folder via cs)
cs.enableDeltaMerging(vertx, Const.Event.PartitionProduce);

// create partners config monitor
futs.add(this.createPartnerConfigMonitor(cloudSyncVerticle.eventDownloaded()));

// create CloudSyncVerticle for old producer uploads
// if legacy folder is configured, create upload-only verticle; otherwise reuse main verticle
OptOutCloudSync uploadCs;
CloudSyncVerticle uploadVerticle;
if (useLegacyFolder) {
LOGGER.info("old producer uploads configured to use folder: {} (readers use: {})", legacyFolder, mainFolder);
JsonObject legacyConfig = new JsonObject().mergeIn(this.config)
.put(Const.Config.OptOutS3FolderProp, legacyFolder);
uploadCs = new OptOutCloudSync(legacyConfig, true, true);
// upload-only verticle: won't download legacy files
uploadVerticle = new CloudSyncVerticle("optout-legacy", this.fsOptOut, this.fsLocal, uploadCs, this.config);
futs.add(this.deploySingleInstance(uploadVerticle));
} else {
// no legacy folder configured, old producer uploads to main folder
uploadCs = cs;
uploadVerticle = cloudSyncVerticle;
}

// create & deploy log producer verticle
String eventUpload = cloudSyncVerticle.eventUpload();
OptOutLogProducer logProducer = new OptOutLogProducer(this.config, eventUpload, eventUpload);
// deltas go to uploadVerticle (legacy folder in legacy mode, main folder otherwise)
// partitions always go to main folder (cloudSyncVerticle) since they read from main folder
OptOutLogProducer logProducer = new OptOutLogProducer(this.config, uploadVerticle.eventUpload(), cloudSyncVerticle.eventUpload());
futs.add(this.deploySingleInstance(logProducer));

// upload last delta produced and potentially not uploaded yet
futs.add((this.uploadLastDelta(cs, logProducer, cloudSyncVerticle.eventUpload(), cloudSyncVerticle.eventRefresh())));
// always use main cs/cloudSyncVerticle for refresh mechanism (legacy verticle is upload-only)
// uploadCs is used only for determining the cloud path (legacy folder vs main folder)
futs.add((this.uploadLastDelta(cs, uploadCs, logProducer, cloudSyncVerticle.eventRefresh())));
}

// deploy sqs producer if enabled
Expand Down Expand Up @@ -297,7 +323,8 @@ public void run(String[] args) throws IOException {
ICloudStorage fsSqsDroppedRequests = CloudUtils.createStorage(optoutBucketDroppedRequests, this.config);

// deploy sqs log producer
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduce, null);
// fires DeltaProduced (notification) not DeltaProduce (trigger) to avoid triggering old producer
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduced, null);
futs.add(this.deploySingleInstance(sqsLogProducer));

LOGGER.info("sqs log producer deployed, bucket={}, folder={}",
Expand Down Expand Up @@ -340,7 +367,8 @@ public void run(String[] args) throws IOException {
});
}

private Future uploadLastDelta(OptOutCloudSync cs, OptOutLogProducer logProducer, String eventUpload, String eventRefresh) {

private Future uploadLastDelta(OptOutCloudSync cs, OptOutCloudSync uploadCs, OptOutLogProducer logProducer, String eventRefresh) {
final String deltaLocalPath;
try {
deltaLocalPath = logProducer.getLastDelta();
Expand All @@ -359,7 +387,8 @@ private Future uploadLastDelta(OptOutCloudSync cs, OptOutLogProducer logProducer
handler.set(cs.registerNewCloudPathsHandler(cloudPaths -> {
try {
cs.unregisterNewCloudPathsHandler(handler.get());
final String deltaCloudPath = cs.toCloudPath(deltaLocalPath);
// use uploadCs for cloud path (may be different folder than cs)
final String deltaCloudPath = uploadCs.toCloudPath(deltaLocalPath);
if (cloudPaths.contains(deltaCloudPath)) {
// if delta is already uploaded, the work is already done
LOGGER.info("found no last delta that needs to be uploaded");
Expand Down
Loading