Skip to content

Commit f671dd4

Browse files
committed
Merge branch 'ian-UID2-6146-update-traffic-calculator-to-hardcoded-baseline' into ian-UID2-6151-add-traffic-filter-class
2 parents a93b051 + 0265ae2 commit f671dd4

File tree

6 files changed

+241
-21
lines changed

6 files changed

+241
-21
lines changed

src/main/java/com/uid2/optout/Const.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ public static class Config extends com.uid2.shared.Const.Config {
2424
public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout";
2525
public static final String OptOutDeltaJobTimeoutSecondsProp = "optout_delta_job_timeout_seconds";
2626
public static final String OptOutS3BucketDroppedRequestsProp = "optout_s3_bucket_dropped_requests";
27+
public static final String OptOutMaxMessagesPerFileProp = "optout_max_messages_per_file";
2728
public static final String TrafficFilterConfigPathProp = "traffic_filter_config_path";
2829
public static final String TrafficCalcConfigPathProp = "traffic_calc_config_path";
2930
public static final String ManualOverrideS3PathProp = "manual_override_s3_path";

src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -6,12 +6,18 @@
66
public class DeltaProductionResult {
77
private final int deltasProduced;
88
private final int entriesProcessed;
9-
private final boolean stoppedDueToRecentMessages;
109

11-
public DeltaProductionResult(int deltasProduced, int entriesProcessed, boolean stoppedDueToRecentMessages) {
10+
/*
11+
* indicates that there are still messages in the queue, however,
12+
* not enough time has elapsed to produce a delta file.
13+
* We produce in batches of (5 minutes)
14+
*/
15+
private final boolean stoppedDueToMessagesTooRecent;
16+
17+
public DeltaProductionResult(int deltasProduced, int entriesProcessed, boolean stoppedDueToMessagesTooRecent) {
1218
this.deltasProduced = deltasProduced;
1319
this.entriesProcessed = entriesProcessed;
14-
this.stoppedDueToRecentMessages = stoppedDueToRecentMessages;
20+
this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent;
1521
}
1622

1723
public int getDeltasProduced() {
@@ -22,8 +28,8 @@ public int getEntriesProcessed() {
2228
return entriesProcessed;
2329
}
2430

25-
public boolean stoppedDueToRecentMessages() {
26-
return stoppedDueToRecentMessages;
31+
public boolean stoppedDueToMessagesTooRecent() {
32+
return stoppedDueToMessagesTooRecent;
2733
}
2834
}
2935

src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -85,6 +85,7 @@ public class OptOutSqsLogProducer extends AbstractVerticle {
8585
private final int visibilityTimeout;
8686
private final int deltaWindowSeconds; // Time window for each delta file (5 minutes = 300 seconds)
8787
private final int jobTimeoutSeconds;
88+
private final int maxMessagesPerFile; // Memory protection: max messages per delta file
8889
private final int listenPort;
8990
private final String internalApiKey;
9091
private final InternalAuthMiddleware internalAuth;
@@ -138,6 +139,7 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O
138139
this.visibilityTimeout = jsonConfig.getInteger(Const.Config.OptOutSqsVisibilityTimeoutProp, 240); // 4 minutes default
139140
this.deltaWindowSeconds = 300; // Fixed 5 minutes for all deltas
140141
this.jobTimeoutSeconds = jsonConfig.getInteger(Const.Config.OptOutDeltaJobTimeoutSecondsProp, 10800); // 3 hours default
142+
this.maxMessagesPerFile = jsonConfig.getInteger(Const.Config.OptOutMaxMessagesPerFileProp, 10000); // Memory protection limit
141143

142144
// HTTP server configuration - use port offset + 1 to avoid conflicts
143145
this.listenPort = Const.Port.ServicePortForOptOut + Utils.getPortOffset() + 1;
@@ -149,11 +151,12 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O
149151
int bufferSize = jsonConfig.getInteger(Const.Config.OptOutProducerBufferSizeProp);
150152
this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN);
151153

152-
// Initialize window reader
154+
// Initialize window reader with memory protection limit
153155
this.windowReader = new SqsWindowReader(
154156
this.sqsClient, this.queueUrl, this.maxMessagesPerPoll,
155-
this.visibilityTimeout, this.deltaWindowSeconds
157+
this.visibilityTimeout, this.deltaWindowSeconds, this.maxMessagesPerFile
156158
);
159+
LOGGER.info("OptOutSqsLogProducer initialized with maxMessagesPerFile: {}", this.maxMessagesPerFile);
157160
}
158161

159162
@Override
@@ -344,7 +347,7 @@ private JsonObject produceDeltasBlocking() throws Exception {
344347
DeltaProductionResult deltaResult = this.produceBatchedDeltas();
345348

346349
// Determine status based on results
347-
if (deltaResult.getDeltasProduced() == 0 && deltaResult.stoppedDueToRecentMessages()) {
350+
if (deltaResult.getDeltasProduced() == 0 && deltaResult.stoppedDueToMessagesTooRecent()) {
348351
// No deltas produced because all messages were too recent
349352
result.put("status", "skipped");
350353
result.put("reason", "All messages too recent");
@@ -365,30 +368,31 @@ private JsonObject produceDeltasBlocking() throws Exception {
365368
/**
366369
* Reads messages from SQS and produces delta files in 5 minute batches.
367370
* Continues until queue is empty or messages are too recent.
371+
* Windows are limited to maxMessagesPerFile for memory protection.
368372
*
369373
* @return DeltaProductionResult with counts and stop reason
370374
* @throws IOException if delta production fails
371375
*/
372376
private DeltaProductionResult produceBatchedDeltas() throws IOException {
373377
int deltasProduced = 0;
374378
int totalEntriesProcessed = 0;
375-
boolean stoppedDueToRecentMessages = false;
379+
boolean stoppedDueToMessagesTooRecent = false;
376380

377381
long jobStartTime = OptOutUtils.nowEpochSeconds();
378-
LOGGER.info("Starting delta production from SQS queue");
382+
LOGGER.info("Starting delta production from SQS queue (maxMessagesPerFile: {})", this.maxMessagesPerFile);
379383

380384
// Read and process windows until done
381385
while (true) {
382386
if(checkJobTimeout(jobStartTime)){
383387
break;
384388
}
385389

386-
// Read one complete 5-minute window
390+
// Read one complete 5-minute window (limited to maxMessagesPerFile)
387391
SqsWindowReader.WindowReadResult windowResult = windowReader.readWindow();
388392

389393
// If no messages, we're done (queue empty or messages too recent)
390394
if (windowResult.isEmpty()) {
391-
stoppedDueToRecentMessages = windowResult.stoppedDueToRecentMessages();
395+
stoppedDueToMessagesTooRecent = windowResult.stoppedDueToMessagesTooRecent();
392396
LOGGER.info("Delta production complete - no more eligible messages");
393397
break;
394398
}
@@ -422,7 +426,7 @@ private DeltaProductionResult produceBatchedDeltas() throws IOException {
422426
LOGGER.info("Delta production complete: took {}s, produced {} deltas, processed {} entries",
423427
totalDuration, deltasProduced, totalEntriesProcessed);
424428

425-
return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, stoppedDueToRecentMessages);
429+
return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, stoppedDueToMessagesTooRecent);
426430
}
427431

428432
/**

src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java

Lines changed: 43 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,20 @@ List<List<Long>> parseAllowlistRanges(JsonObject config) throws MalformedTraffic
179179

180180
ranges.sort(Comparator.comparing(range -> range.get(0)));
181181

182+
// Validate no overlapping ranges
183+
for (int i = 0; i < ranges.size() - 1; i++) {
184+
long currentEnd = ranges.get(i).get(1);
185+
long nextStart = ranges.get(i + 1).get(0);
186+
if (currentEnd >= nextStart) {
187+
LOGGER.error("Overlapping allowlist ranges detected: [{}, {}] overlaps with [{}, {}]",
188+
ranges.get(i).get(0), currentEnd, nextStart, ranges.get(i + 1).get(1));
189+
throw new MalformedTrafficCalcConfigException(
190+
"Overlapping allowlist ranges detected at indices " + i + " and " + (i + 1));
191+
}
192+
}
193+
194+
} catch (MalformedTrafficCalcConfigException e) {
195+
throw e;
182196
} catch (Exception e) {
183197
LOGGER.error("Failed to parse allowlist ranges", e);
184198
throw new MalformedTrafficCalcConfigException("Failed to parse allowlist ranges: " + e.getMessage());
@@ -215,8 +229,10 @@ public TrafficStatus calculateStatus(List<Message> sqsMessages) {
215229
long oldestQueueTs = findOldestQueueTimestamp(sqsMessages);
216230
LOGGER.info("Traffic calculation: oldestQueueTs={}", oldestQueueTs);
217231

218-
// Define start time of the delta evaluation window [newestDeltaTs - 24h, newestDeltaTs]
219-
long deltaWindowStart = newestDeltaTs - this.evaluationWindowSeconds - getAllowlistDuration(newestDeltaTs, newestDeltaTs - this.evaluationWindowSeconds);
232+
// Define start time of the delta evaluation window
233+
// We need evaluationWindowSeconds of non-allowlisted time, so we iteratively extend
234+
// the window to account for any allowlist ranges in the extended portion
235+
long deltaWindowStart = calculateWindowStartWithAllowlist(newestDeltaTs, this.evaluationWindowSeconds);
220236

221237
// Evict old cache entries (older than delta window start)
222238
evictOldCacheEntries(deltaWindowStart);
@@ -391,6 +407,31 @@ long getAllowlistDuration(long t, long windowStart) {
391407
return totalDuration;
392408
}
393409

410+
/**
411+
* Calculate the window start time that provides evaluationWindowSeconds of non-allowlisted time.
412+
* Iteratively extends the window to account for allowlist ranges that may fall in extended portions.
413+
*/
414+
long calculateWindowStartWithAllowlist(long newestDeltaTs, int evaluationWindowSeconds) {
415+
long allowlistDuration = getAllowlistDuration(newestDeltaTs, newestDeltaTs - evaluationWindowSeconds);
416+
417+
// Each iteration discovers at least one new allowlist range, so max iterations = number of ranges
418+
int maxIterations = this.allowlistRanges.size() + 1;
419+
420+
for (int i = 0; i < maxIterations && allowlistDuration > 0; i++) {
421+
long newWindowStart = newestDeltaTs - evaluationWindowSeconds - allowlistDuration;
422+
long newAllowlistDuration = getAllowlistDuration(newestDeltaTs, newWindowStart);
423+
424+
if (newAllowlistDuration == allowlistDuration) {
425+
// No new allowlist time in extended portion, we've converged
426+
break;
427+
}
428+
429+
allowlistDuration = newAllowlistDuration;
430+
}
431+
432+
return newestDeltaTs - evaluationWindowSeconds - allowlistDuration;
433+
}
434+
394435
/**
395436
* Find the oldest SQS queue message timestamp
396437
*/

src/main/java/com/uid2/optout/vertx/SqsWindowReader.java

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package com.uid2.optout.vertx;
22

3+
import org.slf4j.Logger;
4+
import org.slf4j.LoggerFactory;
35
import software.amazon.awssdk.services.sqs.SqsClient;
46
import software.amazon.awssdk.services.sqs.model.Message;
57

@@ -9,23 +11,30 @@
911
/**
1012
* Reads messages from SQS for complete 5-minute time windows.
1113
* Handles accumulation of all messages for a window before returning.
14+
* Limits messages per window to prevent memory issues.
1215
*/
1316
public class SqsWindowReader {
17+
private static final Logger LOGGER = LoggerFactory.getLogger(SqsWindowReader.class);
18+
1419
private final SqsClient sqsClient;
1520
private final String queueUrl;
1621
private final int maxMessagesPerPoll;
1722
private final int visibilityTimeout;
1823
private final int deltaWindowSeconds;
24+
private final int maxMessagesPerFile;
1925
private final SqsBatchProcessor batchProcessor;
20-
26+
2127
public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerPoll,
22-
int visibilityTimeout, int deltaWindowSeconds) {
28+
int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerFile) {
2329
this.sqsClient = sqsClient;
2430
this.queueUrl = queueUrl;
2531
this.maxMessagesPerPoll = maxMessagesPerPoll;
2632
this.visibilityTimeout = visibilityTimeout;
2733
this.deltaWindowSeconds = deltaWindowSeconds;
34+
this.maxMessagesPerFile = maxMessagesPerFile;
2835
this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds);
36+
LOGGER.info("SqsWindowReader initialized with: maxMessagesPerFile: {}, maxMessagesPerPoll: {}, visibilityTimeout: {}, deltaWindowSeconds: {}",
37+
maxMessagesPerFile, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds);
2938
}
3039

3140
/**
@@ -34,18 +43,19 @@ public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerP
3443
public static class WindowReadResult {
3544
private final List<SqsParsedMessage> messages;
3645
private final long windowStart;
37-
private final boolean stoppedDueToRecentMessages;
46+
private final boolean stoppedDueToMessagesTooRecent;
3847

39-
public WindowReadResult(List<SqsParsedMessage> messages, long windowStart, boolean stoppedDueToRecentMessages) {
48+
public WindowReadResult(List<SqsParsedMessage> messages, long windowStart,
49+
boolean stoppedDueToMessagesTooRecent) {
4050
this.messages = messages;
4151
this.windowStart = windowStart;
42-
this.stoppedDueToRecentMessages = stoppedDueToRecentMessages;
52+
this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent;
4353
}
4454

4555
public List<SqsParsedMessage> getMessages() { return messages; }
4656
public long getWindowStart() { return windowStart; }
4757
public boolean isEmpty() { return messages.isEmpty(); }
48-
public boolean stoppedDueToRecentMessages() { return stoppedDueToRecentMessages; }
58+
public boolean stoppedDueToMessagesTooRecent() { return stoppedDueToMessagesTooRecent; }
4959
}
5060

5161
/**
@@ -54,6 +64,7 @@ public WindowReadResult(List<SqsParsedMessage> messages, long windowStart, boole
5464
* - We discover the next window
5565
* - Queue is empty (no more messages)
5666
* - Messages are too recent (all messages younger than 5 minutes)
67+
* - Message limit is reached (memory protection)
5768
*
5869
* @return WindowReadResult with messages for the window, or empty if done
5970
*/
@@ -62,6 +73,13 @@ public WindowReadResult readWindow() {
6273
long currentWindowStart = 0;
6374

6475
while (true) {
76+
// Check if we've hit the message limit
77+
if (windowMessages.size() >= this.maxMessagesPerFile) {
78+
LOGGER.warn("Window message limit reached ({} messages). Truncating window starting at {} for memory protection.",
79+
this.maxMessagesPerFile, currentWindowStart);
80+
return new WindowReadResult(windowMessages, currentWindowStart, false);
81+
}
82+
6583
// Read one batch from SQS (up to 10 messages)
6684
List<Message> rawBatch = SqsMessageOperations.receiveMessagesFromSqs(
6785
this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, this.visibilityTimeout);

0 commit comments

Comments
 (0)