Skip to content

Commit c55eedc

Browse files
authored
Merge pull request #292 from IABTechLab/ian-UID2-6345-add-legacy-folder-config
update legacy producer folder
2 parents a1491a2 + c5cefd1 commit c55eedc

File tree

4 files changed

+43
-12
lines changed

4 files changed

+43
-12
lines changed

conf/default-config.json

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,9 @@
2424
"optout_delete_expired": true,
2525
"optout_s3_bucket": null,
2626
"optout_s3_folder": null,
27+
"optout_legacy_producer_s3_folder": null,
28+
"optout_sqs_s3_folder": null,
29+
"optout_sqs_queue_url": null,
2730
"cloud_download_threads": 8,
2831
"cloud_upload_threads": 2,
2932
"cloud_refresh_interval": 60,
@@ -34,8 +37,6 @@
3437
"operator_type": "public",
3538
"uid_instance_id_prefix": "local-optout",
3639
"optout_enqueue_sqs_enabled": false,
37-
"optout_sqs_queue_url": null,
38-
"optout_sqs_s3_folder": "sqs-delta",
3940
"optout_sqs_max_queue_size": 0,
4041
"optout_sqs_max_messages_per_poll": 10,
4142
"optout_sqs_visibility_timeout": 300,

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
<vertx.version>4.5.21</vertx.version>
1616
<!-- check micrometer.version vertx-micrometer-metrics consumes before bumping up -->
1717
<micrometer.version>1.1.0</micrometer.version>
18-
<uid2-shared.version>11.1.91</uid2-shared.version>
18+
<uid2-shared.version>11.2.0</uid2-shared.version>
1919
<image.version>${project.version}</image.version>
2020
<junit-jupiter.version>5.10.1</junit-jupiter.version>
2121
<junit-vintage.version>5.10.1</junit-vintage.version>

src/main/java/com/uid2/optout/Const.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,8 @@ public static class Config extends com.uid2.shared.Const.Config {
1919
public static final String PartnersMetadataPathProp = "partners_metadata_path";
2020
public static final String OptOutSqsQueueUrlProp = "optout_sqs_queue_url";
2121
public static final String OptOutSqsEnabledProp = "optout_enqueue_sqs_enabled";
22-
public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // Default: "sqs-delta" - folder within same S3 bucket as regular optout
22+
public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // sqs delta producer writes to this folder
23+
public static final String OptOutLegacyProducerS3FolderProp = "optout_legacy_producer_s3_folder"; // legacy producer writes to this folder
2324
public static final String OptOutSqsMaxMessagesPerPollProp = "optout_sqs_max_messages_per_poll";
2425
public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout";
2526
public static final String OptOutDeltaJobTimeoutSecondsProp = "optout_delta_job_timeout_seconds";

src/main/java/com/uid2/optout/Main.java

Lines changed: 37 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -246,7 +246,12 @@ public void run(String[] args) throws IOException {
246246

247247
List<Future> futs = new ArrayList<>();
248248

249-
// create optout cloud sync verticle
249+
// determine folder configuration for old producer vs readers
250+
String legacyFolder = this.config.getString(Const.Config.OptOutLegacyProducerS3FolderProp);
251+
String mainFolder = this.config.getString(Const.Config.OptOutS3FolderProp);
252+
boolean useLegacyFolder = legacyFolder != null && !legacyFolder.equals(mainFolder);
253+
254+
// main CloudSyncVerticle - downloads from optout_s3_folder (for readers: partition generator, log sender)
250255
OptOutCloudSync cs = new OptOutCloudSync(this.config, true);
251256
CloudSyncVerticle cloudSyncVerticle = new CloudSyncVerticle("optout", this.fsOptOut, this.fsLocal, cs, this.config);
252257

@@ -257,19 +262,40 @@ public void run(String[] args) throws IOException {
257262
futs.add(this.createOperatorKeyRotator());
258263

259264
if (!this.observeOnly) {
260-
// enable partition producing
265+
// enable partition producing (reads from main folder via cs)
261266
cs.enableDeltaMerging(vertx, Const.Event.PartitionProduce);
262267

263268
// create partners config monitor
264269
futs.add(this.createPartnerConfigMonitor(cloudSyncVerticle.eventDownloaded()));
265270

271+
// create CloudSyncVerticle for old producer uploads
272+
// if legacy folder is configured, create upload-only verticle; otherwise reuse main verticle
273+
OptOutCloudSync uploadCs;
274+
CloudSyncVerticle uploadVerticle;
275+
if (useLegacyFolder) {
276+
LOGGER.info("old producer uploads configured to use folder: {} (readers use: {})", legacyFolder, mainFolder);
277+
JsonObject legacyConfig = new JsonObject().mergeIn(this.config)
278+
.put(Const.Config.OptOutS3FolderProp, legacyFolder);
279+
uploadCs = new OptOutCloudSync(legacyConfig, true, true);
280+
// upload-only verticle: won't download legacy files
281+
uploadVerticle = new CloudSyncVerticle("optout-legacy", this.fsOptOut, this.fsLocal, uploadCs, this.config);
282+
futs.add(this.deploySingleInstance(uploadVerticle));
283+
} else {
284+
// no legacy folder configured, old producer uploads to main folder
285+
uploadCs = cs;
286+
uploadVerticle = cloudSyncVerticle;
287+
}
288+
266289
// create & deploy log producer verticle
267-
String eventUpload = cloudSyncVerticle.eventUpload();
268-
OptOutLogProducer logProducer = new OptOutLogProducer(this.config, eventUpload, eventUpload);
290+
// deltas go to uploadVerticle (legacy folder in legacy mode, main folder otherwise)
291+
// partitions always go to main folder (cloudSyncVerticle) since they read from main folder
292+
OptOutLogProducer logProducer = new OptOutLogProducer(this.config, uploadVerticle.eventUpload(), cloudSyncVerticle.eventUpload());
269293
futs.add(this.deploySingleInstance(logProducer));
270294

271295
// upload last delta produced and potentially not uploaded yet
272-
futs.add((this.uploadLastDelta(cs, logProducer, cloudSyncVerticle.eventUpload(), cloudSyncVerticle.eventRefresh())));
296+
// always use main cs/cloudSyncVerticle for refresh mechanism (legacy verticle is upload-only)
297+
// uploadCs is used only for determining the cloud path (legacy folder vs main folder)
298+
futs.add((this.uploadLastDelta(cs, uploadCs, logProducer, cloudSyncVerticle.eventRefresh())));
273299
}
274300

275301
// deploy sqs producer if enabled
@@ -297,7 +323,8 @@ public void run(String[] args) throws IOException {
297323
ICloudStorage fsSqsDroppedRequests = CloudUtils.createStorage(optoutBucketDroppedRequests, this.config);
298324

299325
// deploy sqs log producer
300-
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduce, null);
326+
// fires DeltaProduced (notification) not DeltaProduce (trigger) to avoid triggering old producer
327+
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduced, null);
301328
futs.add(this.deploySingleInstance(sqsLogProducer));
302329

303330
LOGGER.info("sqs log producer deployed, bucket={}, folder={}",
@@ -340,7 +367,8 @@ public void run(String[] args) throws IOException {
340367
});
341368
}
342369

343-
private Future uploadLastDelta(OptOutCloudSync cs, OptOutLogProducer logProducer, String eventUpload, String eventRefresh) {
370+
371+
private Future uploadLastDelta(OptOutCloudSync cs, OptOutCloudSync uploadCs, OptOutLogProducer logProducer, String eventRefresh) {
344372
final String deltaLocalPath;
345373
try {
346374
deltaLocalPath = logProducer.getLastDelta();
@@ -359,7 +387,8 @@ private Future uploadLastDelta(OptOutCloudSync cs, OptOutLogProducer logProducer
359387
handler.set(cs.registerNewCloudPathsHandler(cloudPaths -> {
360388
try {
361389
cs.unregisterNewCloudPathsHandler(handler.get());
362-
final String deltaCloudPath = cs.toCloudPath(deltaLocalPath);
390+
// use uploadCs for cloud path (may be different folder than cs)
391+
final String deltaCloudPath = uploadCs.toCloudPath(deltaLocalPath);
363392
if (cloudPaths.contains(deltaCloudPath)) {
364393
// if delta is already uploaded, the work is already done
365394
LOGGER.info("found no last delta that needs to be uploaded");

0 commit comments

Comments
 (0)