Skip to content

Commit 0265ae2

Browse files
committed
Merge branch 'ian-UID2-6337-asynchronous-full-queue-process' into ian-UID2-6146-update-traffic-calculator-to-hardcoded-baseline
2 parents 019f426 + 2873ea2 commit 0265ae2

File tree

4 files changed

+48
-19
lines changed

4 files changed

+48
-19
lines changed

src/main/java/com/uid2/optout/Const.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public static class Config extends com.uid2.shared.Const.Config {
2424
public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout";
2525
public static final String OptOutDeltaJobTimeoutSecondsProp = "optout_delta_job_timeout_seconds";
2626
public static final String OptOutS3BucketDroppedRequestsProp = "optout_s3_bucket_dropped_requests";
27+
public static final String OptOutMaxMessagesPerFileProp = "optout_max_messages_per_file";
2728
public static final String TrafficFilterConfigPathProp = "traffic_filter_config_path";
2829
public static final String TrafficCalcConfigPathProp = "traffic_calc_config_path";
2930
public static final String ManualOverrideS3PathProp = "manual_override_s3_path";

src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,18 @@
66
public class DeltaProductionResult {
77
private final int deltasProduced;
88
private final int entriesProcessed;
9-
private final boolean stoppedDueToRecentMessages;
109

11-
public DeltaProductionResult(int deltasProduced, int entriesProcessed, boolean stoppedDueToRecentMessages) {
10+
/*
11+
* indicates that there are still messages in the queue, however,
12+
* not enough time has elapsed to produce a delta file.
13+
* We produce in batches of (5 minutes)
14+
*/
15+
private final boolean stoppedDueToMessagesTooRecent;
16+
17+
public DeltaProductionResult(int deltasProduced, int entriesProcessed, boolean stoppedDueToMessagesTooRecent) {
1218
this.deltasProduced = deltasProduced;
1319
this.entriesProcessed = entriesProcessed;
14-
this.stoppedDueToRecentMessages = stoppedDueToRecentMessages;
20+
this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent;
1521
}
1622

1723
public int getDeltasProduced() {
@@ -22,8 +28,8 @@ public int getEntriesProcessed() {
2228
return entriesProcessed;
2329
}
2430

25-
public boolean stoppedDueToRecentMessages() {
26-
return stoppedDueToRecentMessages;
31+
public boolean stoppedDueToMessagesTooRecent() {
32+
return stoppedDueToMessagesTooRecent;
2733
}
2834
}
2935

src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public class OptOutSqsLogProducer extends AbstractVerticle {
8585
private final int visibilityTimeout;
8686
private final int deltaWindowSeconds; // Time window for each delta file (5 minutes = 300 seconds)
8787
private final int jobTimeoutSeconds;
88+
private final int maxMessagesPerFile; // Memory protection: max messages per delta file
8889
private final int listenPort;
8990
private final String internalApiKey;
9091
private final InternalAuthMiddleware internalAuth;
@@ -138,6 +139,7 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O
138139
this.visibilityTimeout = jsonConfig.getInteger(Const.Config.OptOutSqsVisibilityTimeoutProp, 240); // 4 minutes default
139140
this.deltaWindowSeconds = 300; // Fixed 5 minutes for all deltas
140141
this.jobTimeoutSeconds = jsonConfig.getInteger(Const.Config.OptOutDeltaJobTimeoutSecondsProp, 10800); // 3 hours default
142+
this.maxMessagesPerFile = jsonConfig.getInteger(Const.Config.OptOutMaxMessagesPerFileProp, 10000); // Memory protection limit
141143

142144
// HTTP server configuration - use port offset + 1 to avoid conflicts
143145
this.listenPort = Const.Port.ServicePortForOptOut + Utils.getPortOffset() + 1;
@@ -149,11 +151,12 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O
149151
int bufferSize = jsonConfig.getInteger(Const.Config.OptOutProducerBufferSizeProp);
150152
this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN);
151153

152-
// Initialize window reader
154+
// Initialize window reader with memory protection limit
153155
this.windowReader = new SqsWindowReader(
154156
this.sqsClient, this.queueUrl, this.maxMessagesPerPoll,
155-
this.visibilityTimeout, this.deltaWindowSeconds
157+
this.visibilityTimeout, this.deltaWindowSeconds, this.maxMessagesPerFile
156158
);
159+
LOGGER.info("OptOutSqsLogProducer initialized with maxMessagesPerFile: {}", this.maxMessagesPerFile);
157160
}
158161

159162
@Override
@@ -344,7 +347,7 @@ private JsonObject produceDeltasBlocking() throws Exception {
344347
DeltaProductionResult deltaResult = this.produceBatchedDeltas();
345348

346349
// Determine status based on results
347-
if (deltaResult.getDeltasProduced() == 0 && deltaResult.stoppedDueToRecentMessages()) {
350+
if (deltaResult.getDeltasProduced() == 0 && deltaResult.stoppedDueToMessagesTooRecent()) {
348351
// No deltas produced because all messages were too recent
349352
result.put("status", "skipped");
350353
result.put("reason", "All messages too recent");
@@ -365,30 +368,31 @@ private JsonObject produceDeltasBlocking() throws Exception {
365368
/**
366369
* Reads messages from SQS and produces delta files in 5 minute batches.
367370
* Continues until queue is empty or messages are too recent.
371+
* Windows are limited to maxMessagesPerFile for memory protection.
368372
*
369373
* @return DeltaProductionResult with counts and stop reason
370374
* @throws IOException if delta production fails
371375
*/
372376
private DeltaProductionResult produceBatchedDeltas() throws IOException {
373377
int deltasProduced = 0;
374378
int totalEntriesProcessed = 0;
375-
boolean stoppedDueToRecentMessages = false;
379+
boolean stoppedDueToMessagesTooRecent = false;
376380

377381
long jobStartTime = OptOutUtils.nowEpochSeconds();
378-
LOGGER.info("Starting delta production from SQS queue");
382+
LOGGER.info("Starting delta production from SQS queue (maxMessagesPerFile: {})", this.maxMessagesPerFile);
379383

380384
// Read and process windows until done
381385
while (true) {
382386
if(checkJobTimeout(jobStartTime)){
383387
break;
384388
}
385389

386-
// Read one complete 5-minute window
390+
// Read one complete 5-minute window (limited to maxMessagesPerFile)
387391
SqsWindowReader.WindowReadResult windowResult = windowReader.readWindow();
388392

389393
// If no messages, we're done (queue empty or messages too recent)
390394
if (windowResult.isEmpty()) {
391-
stoppedDueToRecentMessages = windowResult.stoppedDueToRecentMessages();
395+
stoppedDueToMessagesTooRecent = windowResult.stoppedDueToMessagesTooRecent();
392396
LOGGER.info("Delta production complete - no more eligible messages");
393397
break;
394398
}
@@ -422,7 +426,7 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException {
422426
LOGGER.info("Delta production complete: took {}s, produced {} deltas, processed {} entries",
423427
totalDuration, deltasProduced, totalEntriesProcessed);
424428

425-
return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, stoppedDueToRecentMessages);
429+
return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, stoppedDueToMessagesTooRecent);
426430
}
427431

428432
/**

src/main/java/com/uid2/optout/vertx/SqsWindowReader.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.uid2.optout.vertx;
22

3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
35
import software.amazon.awssdk.services.sqs.SqsClient;
46
import software.amazon.awssdk.services.sqs.model.Message;
57

@@ -9,23 +11,30 @@
911
/**
1012
* Reads messages from SQS for complete 5-minute time windows.
1113
* Handles accumulation of all messages for a window before returning.
14+
* Limits messages per window to prevent memory issues.
1215
*/
1316
public class SqsWindowReader {
17+
private static final Logger LOGGER = LoggerFactory.getLogger(SqsWindowReader.class);
18+
1419
private final SqsClient sqsClient;
1520
private final String queueUrl;
1621
private final int maxMessagesPerPoll;
1722
private final int visibilityTimeout;
1823
private final int deltaWindowSeconds;
24+
private final int maxMessagesPerFile;
1925
private final SqsBatchProcessor batchProcessor;
20-
26+
2127
public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerPoll,
22-
int visibilityTimeout, int deltaWindowSeconds) {
28+
int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerFile) {
2329
this.sqsClient = sqsClient;
2430
this.queueUrl = queueUrl;
2531
this.maxMessagesPerPoll = maxMessagesPerPoll;
2632
this.visibilityTimeout = visibilityTimeout;
2733
this.deltaWindowSeconds = deltaWindowSeconds;
34+
this.maxMessagesPerFile = maxMessagesPerFile;
2835
this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds);
36+
LOGGER.info("SqsWindowReader initialized with: maxMessagesPerFile: {}, maxMessagesPerPoll: {}, visibilityTimeout: {}, deltaWindowSeconds: {}",
37+
maxMessagesPerFile, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds);
2938
}
3039

3140
/**
@@ -34,18 +43,19 @@ public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerP
3443
public static class WindowReadResult {
3544
private final List<SqsParsedMessage> messages;
3645
private final long windowStart;
37-
private final boolean stoppedDueToRecentMessages;
46+
private final boolean stoppedDueToMessagesTooRecent;
3847

39-
public WindowReadResult(List<SqsParsedMessage> messages, long windowStart, boolean stoppedDueToRecentMessages) {
48+
public WindowReadResult(List<SqsParsedMessage> messages, long windowStart,
49+
boolean stoppedDueToMessagesTooRecent) {
4050
this.messages = messages;
4151
this.windowStart = windowStart;
42-
this.stoppedDueToRecentMessages = stoppedDueToRecentMessages;
52+
this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent;
4353
}
4454

4555
public List<SqsParsedMessage> getMessages() { return messages; }
4656
public long getWindowStart() { return windowStart; }
4757
public boolean isEmpty() { return messages.isEmpty(); }
48-
public boolean stoppedDueToRecentMessages() { return stoppedDueToRecentMessages; }
58+
public boolean stoppedDueToMessagesTooRecent() { return stoppedDueToMessagesTooRecent; }
4959
}
5060

5161
/**
@@ -54,6 +64,7 @@ public WindowReadResult(List<SqsParsedMessage> messages, long windowStart, boole
5464
* - We discover the next window
5565
* - Queue is empty (no more messages)
5666
* - Messages are too recent (all messages younger than 5 minutes)
67+
* - Message limit is reached (memory protection)
5768
*
5869
* @return WindowReadResult with messages for the window, or empty if done
5970
*/
@@ -62,6 +73,13 @@ public WindowReadResult readWindow() {
6273
long currentWindowStart = 0;
6374

6475
while (true) {
76+
// Check if we've hit the message limit
77+
if (windowMessages.size() >= this.maxMessagesPerFile) {
78+
LOGGER.warn("Window message limit reached ({} messages). Truncating window starting at {} for memory protection.",
79+
this.maxMessagesPerFile, currentWindowStart);
80+
return new WindowReadResult(windowMessages, currentWindowStart, false);
81+
}
82+
6583
// Read one batch from SQS (up to 10 messages)
6684
List<Message> rawBatch = SqsMessageOperations.receiveMessagesFromSqs(
6785
this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, this.visibilityTimeout);

0 commit comments

Comments
 (0)