Skip to content

Commit 6e60cb7

Browse files
committed
pass read messages as combined number for invisible deduplication
1 parent 5f85b57 commit 6e60cb7

File tree

4 files changed

+23
-26
lines changed

4 files changed

+23
-26
lines changed

src/main/java/com/uid2/optout/delta/DeltaProductionOrchestrator.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -181,10 +181,9 @@ private boolean processWindow(SqsWindowReader.WindowReadResult windowResult,
181181
}
182182
}
183183

184-
// check traffic calculator - pass counts for accurate invisible message deduplication
185-
int filteredAsTooRecentCount = windowResult.getRawMessagesRead() - messages.size();
184+
// check traffic calculator
186185
SqsMessageOperations.QueueAttributes queueAttributes = SqsMessageOperations.getQueueAttributes(this.sqsClient, this.queueUrl);
187-
TrafficStatus trafficStatus = this.trafficCalculator.calculateStatus(deltaMessages, queueAttributes, droppedMessages.size(), filteredAsTooRecentCount);
186+
TrafficStatus trafficStatus = this.trafficCalculator.calculateStatus(deltaMessages, queueAttributes, windowResult.getRawMessagesRead());
188187

189188
if (trafficStatus == TrafficStatus.DELAYED_PROCESSING) {
190189
LOGGER.error("circuit_breaker_triggered: traffic spike detected, stopping production and setting manual override");

src/main/java/com/uid2/optout/traffic/TrafficCalculator.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ List<List<Long>> parseAllowlistRanges(JsonObject config) throws MalformedTraffic
216216
* @param filteredAsTooRecentCount Number of messages filtered as "too recent" by window reader
217217
* @return TrafficStatus (DELAYED_PROCESSING or DEFAULT)
218218
*/
219-
public TrafficStatus calculateStatus(List<SqsParsedMessage> sqsMessages, QueueAttributes queueAttributes, int denylistedCount, int filteredAsTooRecentCount) {
219+
public TrafficStatus calculateStatus(List<SqsParsedMessage> sqsMessages, QueueAttributes queueAttributes, int rawMessagesRead) {
220220

221221
try {
222222
// get list of delta files from s3 (sorted newest to oldest)
@@ -298,15 +298,13 @@ public TrafficStatus calculateStatus(List<SqsParsedMessage> sqsMessages, QueueAt
298298

299299
// add invisible messages being processed by other consumers
300300
// (notVisible count includes our messages, so subtract what we've read to avoid double counting)
301-
// ourMessages = delta messages + denylisted messages + filtered as "too recent" messages
302301
int otherConsumersMessages = 0;
303302
if (queueAttributes != null) {
304303
int totalInvisible = queueAttributes.getApproximateNumberOfMessagesNotVisible();
305-
int ourMessages = (sqsMessages != null ? sqsMessages.size() : 0) + denylistedCount + filteredAsTooRecentCount;
306-
otherConsumersMessages = Math.max(0, totalInvisible - ourMessages);
304+
otherConsumersMessages = Math.max(0, totalInvisible - rawMessagesRead);
307305
totalRecords += otherConsumersMessages;
308306
LOGGER.info("traffic calculation: adding {} invisible messages from other consumers (totalInvisible={}, ourMessages={})",
309-
otherConsumersMessages, totalInvisible, ourMessages);
307+
otherConsumersMessages, totalInvisible, rawMessagesRead);
310308
}
311309

312310
// determine status

src/test/java/com/uid2/optout/delta/DeltaProductionOrchestratorTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -60,7 +60,7 @@ void setUp() {
6060

6161
// default behavior
6262
when(mockManualOverrideService.isDelayedProcessing()).thenReturn(false);
63-
when(mockTrafficCalculator.calculateStatus(anyList(), any(), anyInt(), anyInt()))
63+
when(mockTrafficCalculator.calculateStatus(anyList(), any(), anyInt()))
6464
.thenReturn(TrafficCalculator.TrafficStatus.DEFAULT);
6565
when(mockCloudSync.toCloudPath(anyString())).thenAnswer(inv -> "delta/" + inv.getArgument(0));
6666

@@ -357,7 +357,7 @@ void testProduceBatchedDeltas_circuitBreakerTriggered_stopsAndSetsOverride() thr
357357
when(mockTrafficFilter.isDenylisted(any())).thenReturn(false);
358358

359359
// Setup - circuit breaker triggered
360-
when(mockTrafficCalculator.calculateStatus(anyList(), any(), anyInt(), anyInt()))
360+
when(mockTrafficCalculator.calculateStatus(anyList(), any(), anyInt()))
361361
.thenReturn(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING);
362362

363363
DeltaProductionResult result = orchestrator.produceBatchedDeltas(name -> {});

src/test/java/com/uid2/optout/traffic/TrafficCalculatorTest.java

Lines changed: 16 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1106,7 +1106,7 @@ void testCalculateStatus_noDeltaFiles() throws Exception {
11061106
cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
11071107

11081108
// Act & Assert - should throw exception when no delta files
1109-
assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0, 0));
1109+
assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0));
11101110
}
11111111

11121112
@Test
@@ -1134,7 +1134,7 @@ void testCalculateStatus_normalTraffic() throws Exception {
11341134

11351135
// Act
11361136
List<SqsParsedMessage> sqsMessages = Arrays.asList(createSqsMessage(t));
1137-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
1137+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
11381138

11391139
// Assert - 100+1 < 5 * 50 = 250, so should be DEFAULT
11401140
assertEquals(TrafficCalculator.TrafficStatus.DEFAULT, status);
@@ -1165,7 +1165,7 @@ void testCalculateStatus_delayedProcessing() throws Exception {
11651165

11661166
// Act
11671167
List<SqsParsedMessage> sqsMessages = Arrays.asList(createSqsMessage(t));
1168-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
1168+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
11691169

11701170
// Assert - 100+1 >= 5 * 10 = 50, DELAYED_PROCESSING
11711171
assertEquals(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
@@ -1188,7 +1188,7 @@ void testCalculateStatus_noSqsMessages() throws Exception {
11881188
cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
11891189

11901190
// Act - null SQS messages
1191-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(null, null, 0, 0);
1191+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(null, null, 0);
11921192

11931193
// Assert - should still calculate based on delta files, DEFAULT
11941194
assertEquals(TrafficCalculator.TrafficStatus.DEFAULT, status);
@@ -1211,7 +1211,7 @@ void testCalculateStatus_emptySqsMessages() throws Exception {
12111211
cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
12121212

12131213
// Act - empty SQS messages
1214-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList(), null, 0, 0);
1214+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList(), null, 0);
12151215

12161216
// Assert - should still calculate based on delta files, DEFAULT
12171217
assertEquals(TrafficCalculator.TrafficStatus.DEFAULT, status);
@@ -1243,7 +1243,7 @@ void testCalculateStatus_multipleSqsMessages() throws Exception {
12431243
for (int i = 0; i < 30; i++) {
12441244
sqsMessages.add(createSqsMessage(t - i * 10));
12451245
}
1246-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
1246+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
12471247

12481248
// Assert - DELAYED_PROCESSING
12491249
assertEquals(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
@@ -1286,7 +1286,7 @@ void testCalculateStatus_withTrafficCalcConfig() throws Exception {
12861286

12871287
// Act
12881288
List<SqsParsedMessage> sqsMessages = Arrays.asList(createSqsMessage(t));
1289-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
1289+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
12901290

12911291
// Assert - should filter out entries in traffic calc config ranges
12921292
// Only 300 from window count (not in traffic calc config ranges) + 1 SQS = 301
@@ -1312,13 +1312,13 @@ void testCalculateStatus_cacheUtilization() throws Exception {
13121312

13131313
// Act - first call should populate cache
13141314
List<SqsParsedMessage> sqsMessages = Arrays.asList(createSqsMessage(t));
1315-
calculator.calculateStatus(sqsMessages, null, 0, 0);
1315+
calculator.calculateStatus(sqsMessages, null, 0);
13161316

13171317
Map<String, Object> stats = calculator.getCacheStats();
13181318
int cachedFiles = (Integer) stats.get("cached_files");
13191319

13201320
// Second call should use cache (no additional S3 download)
1321-
calculator.calculateStatus(sqsMessages, null, 0, 0);
1321+
calculator.calculateStatus(sqsMessages, null, 0);
13221322

13231323
Map<String, Object> stats2 = calculator.getCacheStats();
13241324
int cachedFiles2 = (Integer) stats2.get("cached_files");
@@ -1340,7 +1340,7 @@ void testCalculateStatus_s3Exception() throws Exception {
13401340
cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
13411341

13421342
// Act & Assert - should throw exception on S3 error
1343-
assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0, 0));
1343+
assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0));
13441344
}
13451345

13461346
@Test
@@ -1354,7 +1354,7 @@ void testCalculateStatus_deltaFileReadException() throws Exception {
13541354
cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
13551355

13561356
// Act & Assert - should throw exception on S3 download error
1357-
assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0, 0));
1357+
assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0));
13581358
}
13591359

13601360
@Test
@@ -1391,7 +1391,7 @@ void testCalculateStatus_multipleDeltaFiles() throws Exception {
13911391

13921392
// Act
13931393
List<SqsParsedMessage> sqsMessages = Arrays.asList(createSqsMessage(t));
1394-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
1394+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
13951395

13961396
// Assert - DEFAULT
13971397
assertEquals(TrafficCalculator.TrafficStatus.DEFAULT, status);
@@ -1425,7 +1425,7 @@ void testCalculateStatus_windowBoundaryTimestamp() throws Exception {
14251425

14261426
// Act
14271427
List<SqsParsedMessage> sqsMessages = Arrays.asList(createSqsMessage(t));
1428-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
1428+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
14291429

14301430
// Assert - DEFAULT
14311431
assertEquals(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
@@ -1449,7 +1449,7 @@ void testCalculateStatus_timestampsCached() throws Exception {
14491449

14501450
// Act
14511451
List<SqsParsedMessage> sqsMessages = Arrays.asList(createSqsMessage(t));
1452-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
1452+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
14531453

14541454
// Assert
14551455
assertEquals(TrafficCalculator.TrafficStatus.DEFAULT, status);
@@ -1493,7 +1493,7 @@ void testCalculateStatus_delayedProcessingFromQueueAttributesOnly() throws Excep
14931493
SqsMessageOperations.QueueAttributes queueAttributes =
14941494
new SqsMessageOperations.QueueAttributes(0, 600, 0);
14951495

1496-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes, 0, 0);
1496+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes, 0);
14971497

14981498
// Assert - DELAYED_PROCESSING due to high invisible message count from other consumers
14991499
assertEquals(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
@@ -1533,7 +1533,7 @@ void testCalculateStatus_delayedProcessingFromBothQueueAndMessages() throws Exce
15331533
SqsMessageOperations.QueueAttributes queueAttributes =
15341534
new SqsMessageOperations.QueueAttributes(0, 450, 0);
15351535

1536-
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes, 0, 0);
1536+
TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes, 0);
15371537

15381538
// Assert - DELAYED_PROCESSING due to combined count exceeding threshold
15391539
assertEquals(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);

0 commit comments

Comments
 (0)