Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
24 commits
Select commit Hold shift + click to select a range
a74dfed
update to use DeltaProductionOrchestrator
Ian-Nara Dec 12, 2025
eeeebf6
update comment
Ian-Nara Dec 12, 2025
52558a5
update comments
Ian-Nara Dec 12, 2025
07cf7e3
comment updates
Ian-Nara Dec 12, 2025
95dd7d5
rename trace_id to uid_trace_id
Ian-Nara Dec 13, 2025
6b4350f
add queue size limit
Ian-Nara Dec 13, 2025
e86adc4
refactor to non-deprecated version of executeblocking
Ian-Nara Dec 13, 2025
8b7ccb7
add note
Ian-Nara Dec 13, 2025
89fff06
[CI Pipeline] Released Snapshot version: 4.5.1-alpha-129-SNAPSHOT
Dec 13, 2025
3adf35d
add missing dependency for new shared version
Ian-Nara Dec 13, 2025
6dccdac
[CI Pipeline] Released Snapshot version: 4.5.1-alpha-130-SNAPSHOT
Dec 13, 2025
3f2d09e
reset pom version
Ian-Nara Dec 13, 2025
3c7c275
[CI Pipeline] Released Snapshot version: 4.5.1-alpha-132-SNAPSHOT
Dec 13, 2025
3379660
add debugging info
Ian-Nara Dec 13, 2025
fb99b65
[CI Pipeline] Released Snapshot version: 4.5.2-alpha-133-SNAPSHOT
Dec 13, 2025
7975de7
Merge branch 'ian-UID2-6345-update-sqs-log-producer-with-circuit-brea…
Ian-Nara Dec 13, 2025
e2478bb
[CI Pipeline] Released Snapshot version: 4.5.3-alpha-134-SNAPSHOT
Dec 13, 2025
a124eeb
reset pom.xml version
Ian-Nara Dec 14, 2025
0a6f8fa
remove unused parameter
Ian-Nara Dec 14, 2025
72d0126
updating comment format
Ian-Nara Dec 14, 2025
377680e
add default config values
Ian-Nara Dec 14, 2025
2e043b1
[CI Pipeline] Released Snapshot version: 4.5.1-alpha-135-SNAPSHOT
Dec 14, 2025
947a8ff
removd deubg logs
Ian-Nara Dec 14, 2025
3c7b595
reset pom version
Ian-Nara Dec 14, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion conf/default-config.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,5 +32,17 @@
"partners_metadata_path": null,
"partners_config_path": "partners/config.json",
"operator_type": "public",
"uid_instance_id_prefix": "local-optout"
"uid_instance_id_prefix": "local-optout",
"optout_enqueue_sqs_enabled": false,
"optout_sqs_queue_url": null,
"optout_sqs_s3_folder": "sqs-delta",
"optout_sqs_max_queue_size": 0,
"optout_sqs_max_messages_per_poll": 10,
"optout_sqs_visibility_timeout": 300,
"optout_delta_job_timeout_seconds": 10800,
"optout_s3_bucket_dropped_requests": null,
"optout_max_messages_per_file": 10000,
"traffic_filter_config_path": null,
"traffic_calc_config_path": null,
"manual_override_s3_path": null
}
5 changes: 5 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,11 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
<dependency>
<groupId>commons-logging</groupId>
<artifactId>commons-logging</artifactId>
<version>1.2</version>
</dependency>
</dependencies>

<build>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/uid2/optout/Const.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ public static class Config extends com.uid2.shared.Const.Config {
public static final String OptOutTrafficCalcThresholdMultiplierProp = "traffic_calc_threshold_multiplier";
public static final String OptOutTrafficCalcEvaluationWindowSecondsProp = "traffic_calc_evaluation_window_seconds";
public static final String OptOutTrafficCalcAllowlistRangesProp = "traffic_calc_allowlist_ranges";
public static final String OptOutSqsDeltaWindowSecondsProp = "optout_sqs_delta_window_seconds";
public static final String OptOutSqsMaxQueueSizeProp = "optout_sqs_max_queue_size";
}

public static class Event {
Expand Down
38 changes: 21 additions & 17 deletions src/main/java/com/uid2/optout/Main.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package com.uid2.optout;

import com.uid2.optout.vertx.*;
import com.uid2.optout.traffic.TrafficFilter.MalformedTrafficFilterConfigException;
import com.uid2.optout.traffic.TrafficCalculator.MalformedTrafficCalcConfigException;
import com.uid2.shared.ApplicationVersion;
import com.uid2.shared.Utils;
import com.uid2.shared.attest.AttestationResponseHandler;
Expand All @@ -27,7 +29,6 @@
import io.vertx.config.ConfigRetriever;
import io.vertx.core.*;
import io.vertx.core.http.HttpServerOptions;
import io.vertx.core.http.impl.HttpUtils;
import io.vertx.core.json.JsonObject;
import io.vertx.micrometer.MetricsDomain;
import org.slf4j.Logger;
Expand Down Expand Up @@ -271,39 +272,42 @@ public void run(String[] args) throws IOException {
futs.add((this.uploadLastDelta(cs, logProducer, cloudSyncVerticle.eventUpload(), cloudSyncVerticle.eventRefresh())));
}

// Deploy SQS producer if enabled
// deploy sqs producer if enabled
if (this.enqueueSqsEnabled) {
LOGGER.info("SQS enabled, deploying OptOutSqsLogProducer");
LOGGER.info("sqs enabled, deploying OptOutSqsLogProducer");
try {
// Create SQS-specific cloud sync with custom folder (default: "sqs-delta")
// sqs delta production uses a separate s3 folder (default: "sqs-delta")
// OptOutCloudSync reads from optout_s3_folder, so we override it with optout_sqs_s3_folder
String sqsFolder = this.config.getString(Const.Config.OptOutSqsS3FolderProp, "sqs-delta");
LOGGER.info("SQS Config - optout_sqs_s3_folder: {}, will override optout_s3_folder to: {}",
sqsFolder, sqsFolder);
JsonObject sqsConfig = new JsonObject().mergeIn(this.config)
JsonObject sqsCloudSyncConfig = new JsonObject().mergeIn(this.config)
.put(Const.Config.OptOutS3FolderProp, sqsFolder);
LOGGER.info("SQS Config after merge - optout_s3_folder: {}", sqsConfig.getString(Const.Config.OptOutS3FolderProp));
OptOutCloudSync sqsCs = new OptOutCloudSync(sqsConfig, true);
OptOutCloudSync sqsCs = new OptOutCloudSync(sqsCloudSyncConfig, true);

// Create SQS-specific cloud storage instance (same bucket, different folder handling)
// create cloud storage instances
ICloudStorage fsSqs;
boolean useStorageMock = this.config.getBoolean(Const.Config.StorageMockProp, false);
if (useStorageMock) {
// Reuse the same LocalStorageMock for testing
fsSqs = this.fsOptOut;
} else {
// Create fresh CloudStorage for SQS (no path conversion wrapper)
String optoutBucket = this.config.getString(Const.Config.OptOutS3BucketProp);
fsSqs = CloudUtils.createStorage(optoutBucket, sqsConfig);
fsSqs = CloudUtils.createStorage(optoutBucket, this.config);
}

// Deploy SQS log producer with its own storage instance
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, sqsCs);
String optoutBucketDroppedRequests = this.config.getString(Const.Config.OptOutS3BucketDroppedRequestsProp);
ICloudStorage fsSqsDroppedRequests = CloudUtils.createStorage(optoutBucketDroppedRequests, this.config);

// deploy sqs log producer
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduce, null);
futs.add(this.deploySingleInstance(sqsLogProducer));

LOGGER.info("SQS log producer deployed - bucket: {}, folder: {}",
LOGGER.info("sqs log producer deployed, bucket={}, folder={}",
this.config.getString(Const.Config.OptOutS3BucketProp), sqsFolder);
} catch (IOException e) {
LOGGER.error("Failed to initialize SQS log producer: " + e.getMessage(), e);
LOGGER.error("circuit_breaker_config_error: failed to initialize sqs log producer, delta production will be disabled: {}", e.getMessage(), e);
} catch (MalformedTrafficFilterConfigException e) {
LOGGER.error("circuit_breaker_config_error: traffic filter config is malformed, delta production will be disabled: {}", e.getMessage(), e);
} catch (MalformedTrafficCalcConfigException e) {
LOGGER.error("circuit_breaker_config_error: traffic calc config is malformed, delta production will be disabled: {}", e.getMessage(), e);
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/uid2/optout/delta/DeltaFileWriter.java
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ private void flushToStream(ByteArrayOutputStream stream) throws IOException {
private void ensureCapacity(int dataSize) {
if (buffer.capacity() < dataSize) {
int newCapacity = Integer.highestOneBit(dataSize) << 1;
LOGGER.info("expanding buffer size: current {}, need {}, new {}", buffer.capacity(), dataSize, newCapacity);
LOGGER.info("expanding buffer: currentSize={}, neededSize={}, newSize={}", buffer.capacity(), dataSize, newCapacity);
this.buffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN);
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java
Original file line number Diff line number Diff line change
Expand Up @@ -86,10 +86,10 @@ public List<SqsParsedMessage> getMessages() {
* @return BatchProcessingResult containing eligible messages and processing metadata
*/
public BatchProcessingResult processBatch(List<Message> messageBatch, int batchNumber) throws IOException {
// Parse and sort messages by timestamp
// parse and sort messages by timestamp
List<SqsParsedMessage> parsedBatch = SqsMessageParser.parseAndSortMessages(messageBatch);

// Identify and delete corrupt messages
// identify and delete corrupt messages
if (parsedBatch.size() < messageBatch.size()) {
List<Message> invalidMessages = identifyInvalidMessages(messageBatch, parsedBatch);
if (!invalidMessages.isEmpty()) {
Expand All @@ -98,21 +98,21 @@ public BatchProcessingResult processBatch(List<Message> messageBatch, int batchN
}
}

// No valid messages after deleting corrupt ones, continue reading
// no valid messages after deleting corrupt ones, continue reading
if (parsedBatch.isEmpty()) {
LOGGER.info("no valid messages in batch {} after removing invalid messages", batchNumber);
return BatchProcessingResult.corruptMessagesDeleted();
}

// Check if the oldest message in this batch is too recent
// check if the oldest message in this batch is too recent
long currentTime = OptOutUtils.nowEpochSeconds();
SqsParsedMessage oldestMessage = parsedBatch.get(0);

if (!isMessageEligible(oldestMessage, currentTime)) {
return BatchProcessingResult.messagesTooRecent();
}

// Filter for eligible messages (>= deltaWindowSeconds old)
// filter for eligible messages (>= deltaWindowSeconds old)
List<SqsParsedMessage> eligibleMessages = filterEligibleMessages(parsedBatch, currentTime);

return BatchProcessingResult.withMessages(eligibleMessages);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ public static QueueAttributes getQueueAttributes(SqsClient sqsClient, String que
int delayed = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED), 0);

QueueAttributes queueAttributes = new QueueAttributes(visible, invisible, delayed);
LOGGER.info("queue attributes: {}", queueAttributes);
LOGGER.info("sqs_queue_attributes={}", queueAttributes);
return queueAttributes;

} catch (Exception e) {
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/uid2/optout/sqs/SqsMessageParser.java
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public static List<SqsParsedMessage> parseAndSortMessages(List<Message> messages
try {
// parse message body
JsonObject body = new JsonObject(message.body());
traceId = body.getString("trace_id");
traceId = body.getString("uid_trace_id");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you Ian for the update!


String identityHash = body.getString("identity_hash");
String advertisingId = body.getString("advertising_id");
Expand Down
14 changes: 7 additions & 7 deletions src/main/java/com/uid2/optout/sqs/SqsWindowReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public static class WindowReadResult {
private final List<SqsParsedMessage> messages;
private final long windowStart;
private final StopReason stopReason;
private final int rawMessagesRead; // total messages pulled from SQS
private final int rawMessagesRead; // total messages pulled from sqs

private WindowReadResult(List<SqsParsedMessage> messages, long windowStart, StopReason stopReason, int rawMessagesRead) {
this.messages = messages;
Expand Down Expand Up @@ -97,15 +97,15 @@ public WindowReadResult readWindow() throws IOException {
List<SqsParsedMessage> windowMessages = new ArrayList<>();
long currentWindowStart = 0;
int batchNumber = 0;
int rawMessagesRead = 0; // track total messages pulled from SQS
int rawMessagesRead = 0; // track total messages pulled from sqs

while (true) {
if (windowMessages.size() >= maxMessagesPerWindow) {
LOGGER.warn("high_message_volume: message limit exceeded while reading window, {} messages >= limit {}", windowMessages.size(), maxMessagesPerWindow);
return WindowReadResult.messageLimitExceeded(windowMessages, currentWindowStart, rawMessagesRead);
}

// Read one batch from SQS (up to 10 messages)
// read one batch from sqs (up to 10 messages)
List<Message> rawBatch = SqsMessageOperations.receiveMessagesFromSqs(
this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, this.visibilityTimeout);

Expand All @@ -122,21 +122,21 @@ public WindowReadResult readWindow() throws IOException {
if (batchResult.getStopReason() == StopReason.MESSAGES_TOO_RECENT) {
return WindowReadResult.messagesTooRecent(windowMessages, currentWindowStart, rawMessagesRead);
}
// Corrupt messages were deleted, continue reading
// corrupt messages were deleted, continue reading
continue;
}

// Add eligible messages to current window
// add eligible messages to current window
boolean newWindow = false;
for (SqsParsedMessage msg : batchResult.getMessages()) {
long msgWindowStart = msg.timestamp();

// Discover start of window
// discover start of window
if (currentWindowStart == 0) {
currentWindowStart = msgWindowStart;
}

// Discover next window
// discover next window
if (msgWindowStart > currentWindowStart + this.deltaWindowSeconds) {
newWindow = true;
}
Expand Down
4 changes: 2 additions & 2 deletions src/main/java/com/uid2/optout/traffic/TrafficCalculator.java
Original file line number Diff line number Diff line change
Expand Up @@ -322,8 +322,8 @@ public TrafficStatus calculateStatus(List<SqsParsedMessage> sqsMessages, QueueAt
}

/**
* find the newest timestamp from delta files.
* reads the newest delta file and returns its maximum timestamp.
* Find the newest timestamp from delta files.
* Reads the newest delta file and returns its maximum timestamp.
*/
private long findNewestDeltaTimestamp(List<String> deltaS3Paths) throws IOException {
if (deltaS3Paths == null || deltaS3Paths.isEmpty()) {
Expand Down
70 changes: 70 additions & 0 deletions src/main/java/com/uid2/optout/util/HttpResponseHelper.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
package com.uid2.optout.util;

import io.vertx.core.http.HttpHeaders;
import io.vertx.core.http.HttpServerResponse;
import io.vertx.core.json.JsonObject;

/**
* Utility class for HTTP JSON response handling.
* Ensures consistent response format across handlers.
*/
public class HttpResponseHelper {

/**
* Send a JSON response with the specified status code.
*/
public static void sendJson(HttpServerResponse resp, int statusCode, JsonObject body) {
resp.setStatusCode(statusCode)
.putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
.end(body.encode());
}

/**
* Send a 200 OK response with JSON body.
*/
public static void sendSuccess(HttpServerResponse resp, JsonObject body) {
sendJson(resp, 200, body);
}

/**
* Send a 200 OK response with status and message.
*/
public static void sendSuccess(HttpServerResponse resp, String status, String message) {
sendJson(resp, 200, new JsonObject().put("status", status).put("message", message));
}

/**
* Send a 200 OK response with idle status and message.
*/
public static void sendIdle(HttpServerResponse resp, String message) {
sendJson(resp, 200, new JsonObject().put("status", "idle").put("message", message));
}
/**
* Send a 202 Accepted response indicating async job started.
*/
public static void sendAccepted(HttpServerResponse resp, String message) {
sendJson(resp, 202, new JsonObject().put("status", "accepted").put("message", message));
}

/**
* Send a 409 Conflict response.
*/
public static void sendConflict(HttpServerResponse resp, String reason) {
sendJson(resp, 409, new JsonObject().put("status", "conflict").put("reason", reason));
}

/**
* Send a 500 Internal Server Error response.
*/
public static void sendError(HttpServerResponse resp, String error) {
sendJson(resp, 500, new JsonObject().put("status", "failed").put("error", error));
}

/**
* Send a 500 Internal Server Error response from an exception.
*/
public static void sendError(HttpServerResponse resp, Exception e) {
sendError(resp, e.getMessage());
}
}

Loading
Loading