Skip to content

Commit 52d1029

Browse files
authored
Merge pull request #276 from IABTechLab/ian-UID2-6345-update-sqs-log-producer-with-circuit-breaker
update sqs log producer with circuit breaker
2 parents 3dd6205 + 3c7b595 commit 52d1029

File tree

14 files changed

+1081
-753
lines changed

14 files changed

+1081
-753
lines changed

conf/default-config.json

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,5 +32,17 @@
3232
"partners_metadata_path": null,
3333
"partners_config_path": "partners/config.json",
3434
"operator_type": "public",
35-
"uid_instance_id_prefix": "local-optout"
35+
"uid_instance_id_prefix": "local-optout",
36+
"optout_enqueue_sqs_enabled": false,
37+
"optout_sqs_queue_url": null,
38+
"optout_sqs_s3_folder": "sqs-delta",
39+
"optout_sqs_max_queue_size": 0,
40+
"optout_sqs_max_messages_per_poll": 10,
41+
"optout_sqs_visibility_timeout": 300,
42+
"optout_delta_job_timeout_seconds": 10800,
43+
"optout_s3_bucket_dropped_requests": null,
44+
"optout_max_messages_per_file": 10000,
45+
"traffic_filter_config_path": null,
46+
"traffic_calc_config_path": null,
47+
"manual_override_s3_path": null
3648
}

pom.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -160,6 +160,11 @@
160160
<groupId>software.amazon.awssdk</groupId>
161161
<artifactId>sqs</artifactId>
162162
</dependency>
163+
<dependency>
164+
<groupId>commons-logging</groupId>
165+
<artifactId>commons-logging</artifactId>
166+
<version>1.2</version>
167+
</dependency>
163168
</dependencies>
164169

165170
<build>

src/main/java/com/uid2/optout/Const.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@ public static class Config extends com.uid2.shared.Const.Config {
3232
public static final String OptOutTrafficCalcThresholdMultiplierProp = "traffic_calc_threshold_multiplier";
3333
public static final String OptOutTrafficCalcEvaluationWindowSecondsProp = "traffic_calc_evaluation_window_seconds";
3434
public static final String OptOutTrafficCalcAllowlistRangesProp = "traffic_calc_allowlist_ranges";
35+
public static final String OptOutSqsDeltaWindowSecondsProp = "optout_sqs_delta_window_seconds";
36+
public static final String OptOutSqsMaxQueueSizeProp = "optout_sqs_max_queue_size";
3537
}
3638

3739
public static class Event {

src/main/java/com/uid2/optout/Main.java

Lines changed: 21 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,8 @@
11
package com.uid2.optout;
22

33
import com.uid2.optout.vertx.*;
4+
import com.uid2.optout.traffic.TrafficFilter.MalformedTrafficFilterConfigException;
5+
import com.uid2.optout.traffic.TrafficCalculator.MalformedTrafficCalcConfigException;
46
import com.uid2.shared.ApplicationVersion;
57
import com.uid2.shared.Utils;
68
import com.uid2.shared.attest.AttestationResponseHandler;
@@ -27,7 +29,6 @@
2729
import io.vertx.config.ConfigRetriever;
2830
import io.vertx.core.*;
2931
import io.vertx.core.http.HttpServerOptions;
30-
import io.vertx.core.http.impl.HttpUtils;
3132
import io.vertx.core.json.JsonObject;
3233
import io.vertx.micrometer.MetricsDomain;
3334
import org.slf4j.Logger;
@@ -271,39 +272,42 @@ public void run(String[] args) throws IOException {
271272
futs.add((this.uploadLastDelta(cs, logProducer, cloudSyncVerticle.eventUpload(), cloudSyncVerticle.eventRefresh())));
272273
}
273274

274-
// Deploy SQS producer if enabled
275+
// deploy sqs producer if enabled
275276
if (this.enqueueSqsEnabled) {
276-
LOGGER.info("SQS enabled, deploying OptOutSqsLogProducer");
277+
LOGGER.info("sqs enabled, deploying OptOutSqsLogProducer");
277278
try {
278-
// Create SQS-specific cloud sync with custom folder (default: "sqs-delta")
279+
// sqs delta production uses a separate s3 folder (default: "sqs-delta")
280+
// OptOutCloudSync reads from optout_s3_folder, so we override it with optout_sqs_s3_folder
279281
String sqsFolder = this.config.getString(Const.Config.OptOutSqsS3FolderProp, "sqs-delta");
280-
LOGGER.info("SQS Config - optout_sqs_s3_folder: {}, will override optout_s3_folder to: {}",
281-
sqsFolder, sqsFolder);
282-
JsonObject sqsConfig = new JsonObject().mergeIn(this.config)
282+
JsonObject sqsCloudSyncConfig = new JsonObject().mergeIn(this.config)
283283
.put(Const.Config.OptOutS3FolderProp, sqsFolder);
284-
LOGGER.info("SQS Config after merge - optout_s3_folder: {}", sqsConfig.getString(Const.Config.OptOutS3FolderProp));
285-
OptOutCloudSync sqsCs = new OptOutCloudSync(sqsConfig, true);
284+
OptOutCloudSync sqsCs = new OptOutCloudSync(sqsCloudSyncConfig, true);
286285

287-
// Create SQS-specific cloud storage instance (same bucket, different folder handling)
286+
// create cloud storage instances
288287
ICloudStorage fsSqs;
289288
boolean useStorageMock = this.config.getBoolean(Const.Config.StorageMockProp, false);
290289
if (useStorageMock) {
291-
// Reuse the same LocalStorageMock for testing
292290
fsSqs = this.fsOptOut;
293291
} else {
294-
// Create fresh CloudStorage for SQS (no path conversion wrapper)
295292
String optoutBucket = this.config.getString(Const.Config.OptOutS3BucketProp);
296-
fsSqs = CloudUtils.createStorage(optoutBucket, sqsConfig);
293+
fsSqs = CloudUtils.createStorage(optoutBucket, this.config);
297294
}
298295

299-
// Deploy SQS log producer with its own storage instance
300-
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, sqsCs);
296+
String optoutBucketDroppedRequests = this.config.getString(Const.Config.OptOutS3BucketDroppedRequestsProp);
297+
ICloudStorage fsSqsDroppedRequests = CloudUtils.createStorage(optoutBucketDroppedRequests, this.config);
298+
299+
// deploy sqs log producer
300+
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduce, null);
301301
futs.add(this.deploySingleInstance(sqsLogProducer));
302302

303-
LOGGER.info("SQS log producer deployed - bucket: {}, folder: {}",
303+
LOGGER.info("sqs log producer deployed, bucket={}, folder={}",
304304
this.config.getString(Const.Config.OptOutS3BucketProp), sqsFolder);
305305
} catch (IOException e) {
306-
LOGGER.error("Failed to initialize SQS log producer: " + e.getMessage(), e);
306+
LOGGER.error("circuit_breaker_config_error: failed to initialize sqs log producer, delta production will be disabled: {}", e.getMessage(), e);
307+
} catch (MalformedTrafficFilterConfigException e) {
308+
LOGGER.error("circuit_breaker_config_error: traffic filter config is malformed, delta production will be disabled: {}", e.getMessage(), e);
309+
} catch (MalformedTrafficCalcConfigException e) {
310+
LOGGER.error("circuit_breaker_config_error: traffic calc config is malformed, delta production will be disabled: {}", e.getMessage(), e);
307311
}
308312
}
309313

src/main/java/com/uid2/optout/delta/DeltaFileWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -105,7 +105,7 @@ private void flushToStream(ByteArrayOutputStream stream) throws IOException {
105105
private void ensureCapacity(int dataSize) {
106106
if (buffer.capacity() < dataSize) {
107107
int newCapacity = Integer.highestOneBit(dataSize) << 1;
108-
LOGGER.info("expanding buffer size: current {}, need {}, new {}", buffer.capacity(), dataSize, newCapacity);
108+
LOGGER.info("expanding buffer: currentSize={}, neededSize={}, newSize={}", buffer.capacity(), dataSize, newCapacity);
109109
this.buffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN);
110110
}
111111
}

src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -86,10 +86,10 @@ public List<SqsParsedMessage> getMessages() {
8686
* @return BatchProcessingResult containing eligible messages and processing metadata
8787
*/
8888
public BatchProcessingResult processBatch(List<Message> messageBatch, int batchNumber) throws IOException {
89-
// Parse and sort messages by timestamp
89+
// parse and sort messages by timestamp
9090
List<SqsParsedMessage> parsedBatch = SqsMessageParser.parseAndSortMessages(messageBatch);
9191

92-
// Identify and delete corrupt messages
92+
// identify and delete corrupt messages
9393
if (parsedBatch.size() < messageBatch.size()) {
9494
List<Message> invalidMessages = identifyInvalidMessages(messageBatch, parsedBatch);
9595
if (!invalidMessages.isEmpty()) {
@@ -98,21 +98,21 @@ public BatchProcessingResult processBatch(List<Message> messageBatch, int batchN
9898
}
9999
}
100100

101-
// No valid messages after deleting corrupt ones, continue reading
101+
// no valid messages after deleting corrupt ones, continue reading
102102
if (parsedBatch.isEmpty()) {
103103
LOGGER.info("no valid messages in batch {} after removing invalid messages", batchNumber);
104104
return BatchProcessingResult.corruptMessagesDeleted();
105105
}
106106

107-
// Check if the oldest message in this batch is too recent
107+
// check if the oldest message in this batch is too recent
108108
long currentTime = OptOutUtils.nowEpochSeconds();
109109
SqsParsedMessage oldestMessage = parsedBatch.get(0);
110110

111111
if (!isMessageEligible(oldestMessage, currentTime)) {
112112
return BatchProcessingResult.messagesTooRecent();
113113
}
114114

115-
// Filter for eligible messages (>= deltaWindowSeconds old)
115+
// filter for eligible messages (>= deltaWindowSeconds old)
116116
List<SqsParsedMessage> eligibleMessages = filterEligibleMessages(parsedBatch, currentTime);
117117

118118
return BatchProcessingResult.withMessages(eligibleMessages);

src/main/java/com/uid2/optout/sqs/SqsMessageOperations.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,7 +87,7 @@ public static QueueAttributes getQueueAttributes(SqsClient sqsClient, String que
8787
int delayed = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED), 0);
8888

8989
QueueAttributes queueAttributes = new QueueAttributes(visible, invisible, delayed);
90-
LOGGER.info("queue attributes: {}", queueAttributes);
90+
LOGGER.info("sqs_queue_attributes={}", queueAttributes);
9191
return queueAttributes;
9292

9393
} catch (Exception e) {

src/main/java/com/uid2/optout/sqs/SqsMessageParser.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ public static List<SqsParsedMessage> parseAndSortMessages(List<Message> messages
3232
try {
3333
// parse message body
3434
JsonObject body = new JsonObject(message.body());
35-
traceId = body.getString("trace_id");
35+
traceId = body.getString("uid_trace_id");
3636

3737
String identityHash = body.getString("identity_hash");
3838
String advertisingId = body.getString("advertising_id");

src/main/java/com/uid2/optout/sqs/SqsWindowReader.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ public static class WindowReadResult {
5050
private final List<SqsParsedMessage> messages;
5151
private final long windowStart;
5252
private final StopReason stopReason;
53-
private final int rawMessagesRead; // total messages pulled from SQS
53+
private final int rawMessagesRead; // total messages pulled from sqs
5454

5555
private WindowReadResult(List<SqsParsedMessage> messages, long windowStart, StopReason stopReason, int rawMessagesRead) {
5656
this.messages = messages;
@@ -97,15 +97,15 @@ public WindowReadResult readWindow() throws IOException {
9797
List<SqsParsedMessage> windowMessages = new ArrayList<>();
9898
long currentWindowStart = 0;
9999
int batchNumber = 0;
100-
int rawMessagesRead = 0; // track total messages pulled from SQS
100+
int rawMessagesRead = 0; // track total messages pulled from sqs
101101

102102
while (true) {
103103
if (windowMessages.size() >= maxMessagesPerWindow) {
104104
LOGGER.warn("high_message_volume: message limit exceeded while reading window, {} messages >= limit {}", windowMessages.size(), maxMessagesPerWindow);
105105
return WindowReadResult.messageLimitExceeded(windowMessages, currentWindowStart, rawMessagesRead);
106106
}
107107

108-
// Read one batch from SQS (up to 10 messages)
108+
// read one batch from sqs (up to 10 messages)
109109
List<Message> rawBatch = SqsMessageOperations.receiveMessagesFromSqs(
110110
this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, this.visibilityTimeout);
111111

@@ -122,21 +122,21 @@ public WindowReadResult readWindow() throws IOException {
122122
if (batchResult.getStopReason() == StopReason.MESSAGES_TOO_RECENT) {
123123
return WindowReadResult.messagesTooRecent(windowMessages, currentWindowStart, rawMessagesRead);
124124
}
125-
// Corrupt messages were deleted, continue reading
125+
// corrupt messages were deleted, continue reading
126126
continue;
127127
}
128128

129-
// Add eligible messages to current window
129+
// add eligible messages to current window
130130
boolean newWindow = false;
131131
for (SqsParsedMessage msg : batchResult.getMessages()) {
132132
long msgWindowStart = msg.timestamp();
133133

134-
// Discover start of window
134+
// discover start of window
135135
if (currentWindowStart == 0) {
136136
currentWindowStart = msgWindowStart;
137137
}
138138

139-
// Discover next window
139+
// discover next window
140140
if (msgWindowStart > currentWindowStart + this.deltaWindowSeconds) {
141141
newWindow = true;
142142
}

src/main/java/com/uid2/optout/traffic/TrafficCalculator.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -322,8 +322,8 @@ public TrafficStatus calculateStatus(List<SqsParsedMessage> sqsMessages, QueueAt
322322
}
323323

324324
/**
325-
* find the newest timestamp from delta files.
326-
* reads the newest delta file and returns its maximum timestamp.
325+
* Find the newest timestamp from delta files.
326+
* Reads the newest delta file and returns its maximum timestamp.
327327
*/
328328
private long findNewestDeltaTimestamp(List<String> deltaS3Paths) throws IOException {
329329
if (deltaS3Paths == null || deltaS3Paths.isEmpty()) {

0 commit comments

Comments
 (0)