diff --git a/src/main/java/com/uid2/optout/delta/DeltaProductionOrchestrator.java b/src/main/java/com/uid2/optout/delta/DeltaProductionOrchestrator.java
new file mode 100644
index 0000000..a4ffc19
--- /dev/null
+++ b/src/main/java/com/uid2/optout/delta/DeltaProductionOrchestrator.java
@@ -0,0 +1,293 @@
+package com.uid2.optout.delta;
+
+import com.uid2.optout.sqs.SqsMessageOperations;
+import com.uid2.optout.sqs.SqsParsedMessage;
+import com.uid2.optout.sqs.SqsWindowReader;
+import com.uid2.optout.traffic.TrafficCalculator;
+import com.uid2.optout.traffic.TrafficCalculator.TrafficStatus;
+import com.uid2.optout.traffic.TrafficFilter;
+import com.uid2.shared.optout.OptOutCloudSync;
+import com.uid2.shared.optout.OptOutUtils;
+import io.vertx.core.json.JsonArray;
+import io.vertx.core.json.JsonObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.Message;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.time.Instant;
+import java.time.temporal.ChronoUnit;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+/**
+ * Orchestrates the delta production workflow.
+ *
+ *
This class encapsulates the core delta production logic:
+ *
+ * - Reading messages from SQS in 5-minute windows
+ * - Filtering denylisted messages
+ * - Checking circuit breakers (manual override, traffic calculator)
+ * - Constructing delta files and dropped request files
+ * - Uploading to S3 and deleting processed messages
+ *
+ *
+ */
+public class DeltaProductionOrchestrator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(DeltaProductionOrchestrator.class);
+
+ private final SqsClient sqsClient;
+ private final String queueUrl;
+ private final int replicaId;
+ private final int deltaWindowSeconds;
+ private final int jobTimeoutSeconds;
+
+ private final SqsWindowReader windowReader;
+ private final DeltaFileWriter deltaFileWriter;
+ private final S3UploadService deltaUploadService;
+ private final S3UploadService droppedRequestUploadService;
+ private final ManualOverrideService manualOverrideService;
+ private final TrafficFilter trafficFilter;
+ private final TrafficCalculator trafficCalculator;
+ private final OptOutCloudSync cloudSync;
+ private final DeltaProductionMetrics metrics;
+
+ public DeltaProductionOrchestrator(
+ SqsClient sqsClient,
+ String queueUrl,
+ int replicaId,
+ int deltaWindowSeconds,
+ int jobTimeoutSeconds,
+ SqsWindowReader windowReader,
+ DeltaFileWriter deltaFileWriter,
+ S3UploadService deltaUploadService,
+ S3UploadService droppedRequestUploadService,
+ ManualOverrideService manualOverrideService,
+ TrafficFilter trafficFilter,
+ TrafficCalculator trafficCalculator,
+ OptOutCloudSync cloudSync,
+ DeltaProductionMetrics metrics) {
+ this.sqsClient = sqsClient;
+ this.queueUrl = queueUrl;
+ this.replicaId = replicaId;
+ this.deltaWindowSeconds = deltaWindowSeconds;
+ this.jobTimeoutSeconds = jobTimeoutSeconds;
+ this.windowReader = windowReader;
+ this.deltaFileWriter = deltaFileWriter;
+ this.deltaUploadService = deltaUploadService;
+ this.droppedRequestUploadService = droppedRequestUploadService;
+ this.manualOverrideService = manualOverrideService;
+ this.trafficFilter = trafficFilter;
+ this.trafficCalculator = trafficCalculator;
+ this.cloudSync = cloudSync;
+ this.metrics = metrics;
+ }
+
+ /**
+ * Produces delta files from SQS queue in batched 5-minute windows.
+ *
+ * Continues until queue is empty, messages are too recent, circuit breaker is triggered, or job timeout is reached.
+ *
+ * @param onDeltaProduced Called with delta filename after each successful delta upload (for event & metrics publishing)
+ * @return DeltaProductionResult with production statistics
+ * @throws IOException if delta production fails
+ */
+ public DeltaProductionResult produceBatchedDeltas(Consumer onDeltaProduced) throws IOException {
+
+ // check for manual override
+ if (manualOverrideService.isDelayedProcessing()) {
+ LOGGER.info("manual override set to DELAYED_PROCESSING, skipping production");
+ return DeltaProductionResult.builder().stopReason(StopReason.MANUAL_OVERRIDE_ACTIVE).build();
+ }
+
+ DeltaProductionResult.Builder result = DeltaProductionResult.builder();
+ long jobStartTime = OptOutUtils.nowEpochSeconds();
+
+ LOGGER.info("starting delta production from SQS queue (replicaId: {}, deltaWindowSeconds: {}, jobTimeoutSeconds: {})",
+ this.replicaId, this.deltaWindowSeconds, this.jobTimeoutSeconds);
+
+ // read and process windows until done
+ while (!isJobTimedOut(jobStartTime)) {
+
+ // read one complete 5-minute window
+ SqsWindowReader.WindowReadResult windowResult = windowReader.readWindow();
+
+ // if no messages, we're done (queue empty or messages too recent)
+ if (windowResult.isEmpty()) {
+ result.stopReason(windowResult.getStopReason());
+ LOGGER.info("delta production complete - no more eligible messages (reason: {})", windowResult.getStopReason().name());
+ break;
+ }
+
+ // process this window
+ boolean isDelayedProcessing = processWindow(windowResult, result, onDeltaProduced);
+
+ // circuit breaker triggered
+ if (isDelayedProcessing) {
+ result.stopReason(StopReason.CIRCUIT_BREAKER_TRIGGERED);
+ return result.build();
+ }
+ }
+
+ return result.build();
+ }
+
+ /**
+ * Processes a single 5-minute window of messages.
+ *
+ * @param windowResult The window data to process
+ * @param result The builder to accumulate statistics into
+ * @param onDeltaProduced Callback for when a delta is produced
+ * @return true if the circuit breaker triggered
+ */
+ private boolean processWindow(SqsWindowReader.WindowReadResult windowResult,
+ DeltaProductionResult.Builder result,
+ Consumer onDeltaProduced) throws IOException {
+ long windowStart = windowResult.getWindowStart();
+ List messages = windowResult.getMessages();
+
+ // check for manual override
+ if (manualOverrideService.isDelayedProcessing()) {
+ LOGGER.info("manual override set to DELAYED_PROCESSING, stopping production");
+ return true;
+ }
+
+ // create buffers for current window
+ ByteArrayOutputStream deltaStream = new ByteArrayOutputStream();
+ JsonArray droppedRequestStream = new JsonArray();
+
+ // get file names for current window
+ String deltaName = OptOutUtils.newDeltaFileName(this.replicaId);
+ String droppedRequestName = generateDroppedRequestFileName();
+
+ // write start of delta
+ deltaFileWriter.writeStartOfDelta(deltaStream, windowStart);
+
+ // separate messages into delta entries and dropped requests
+ List deltaMessages = new ArrayList<>();
+ List droppedMessages = new ArrayList<>();
+
+ for (SqsParsedMessage msg : messages) {
+ if (trafficFilter.isDenylisted(msg)) {
+ writeDroppedRequestEntry(droppedRequestStream, msg);
+ droppedMessages.add(msg);
+ } else {
+ deltaFileWriter.writeOptOutEntry(deltaStream, msg.hashBytes(), msg.idBytes(), msg.timestamp());
+ deltaMessages.add(msg);
+ }
+ }
+
+ // check traffic calculator
+ SqsMessageOperations.QueueAttributes queueAttributes = SqsMessageOperations.getQueueAttributes(this.sqsClient, this.queueUrl);
+ TrafficStatus trafficStatus = this.trafficCalculator.calculateStatus(deltaMessages, queueAttributes, windowResult.getRawMessagesRead());
+
+ if (trafficStatus == TrafficStatus.DELAYED_PROCESSING) {
+ LOGGER.error("circuit_breaker_triggered: traffic spike detected, stopping production and setting manual override");
+ manualOverrideService.setDelayedProcessing();
+ return true;
+ }
+
+ // upload delta file if there are non-denylisted messages
+ if (!deltaMessages.isEmpty()) {
+ uploadDelta(deltaStream, deltaName, windowStart, deltaMessages, onDeltaProduced);
+ result.incrementDeltas(deltaMessages.size());
+ }
+
+ // upload dropped request file if there are denylisted messages
+ if (!droppedMessages.isEmpty() && droppedRequestUploadService != null) {
+ uploadDroppedRequests(droppedRequestStream, droppedRequestName, windowStart, droppedMessages);
+ result.incrementDroppedRequests(droppedMessages.size());
+ }
+
+ LOGGER.info("processed window [{}, {}]: {} entries, {} dropped requests",
+ windowStart, windowStart + this.deltaWindowSeconds,
+ deltaMessages.size(), droppedMessages.size());
+
+ return false;
+ }
+
+ /**
+ * Adds end-of-delta entry to delta stream and converts to byte array,
+ * then uploads delta file to S3 and deletes associated messages from SQS.
+ */
+ private void uploadDelta(ByteArrayOutputStream deltaStream, String deltaName,
+ long windowStart, List messages,
+ Consumer onDeltaProduced) throws IOException {
+ // add end-of-delta entry
+ long endTimestamp = windowStart + this.deltaWindowSeconds;
+ deltaFileWriter.writeEndOfDelta(deltaStream, endTimestamp);
+
+ // convert delta stream to byte array
+ byte[] deltaData = deltaStream.toByteArray();
+ String s3Path = this.cloudSync.toCloudPath(deltaName);
+
+ // get original messages for deletion
+ List originalMessages = messages.stream().map(SqsParsedMessage::originalMessage).collect(Collectors.toList());
+
+ // upload and delete
+ deltaUploadService.uploadAndDeleteMessages(deltaData, s3Path, originalMessages, (count) -> {
+ metrics.recordDeltaProduced(count);
+ onDeltaProduced.accept(deltaName);
+ });
+ }
+
+ /**
+ * Uploads dropped requests to S3 and deletes associated messages from SQS.
+ */
+ private void uploadDroppedRequests(JsonArray droppedRequestStream, String droppedRequestName,
+ long windowStart, List messages) throws IOException {
+
+ // convert dropped request stream to byte array
+ byte[] droppedRequestData = droppedRequestStream.encode().getBytes();
+
+ // get original messages for deletion
+ List originalMessages = messages.stream().map(SqsParsedMessage::originalMessage).collect(Collectors.toList());
+
+ // upload and delete
+ droppedRequestUploadService.uploadAndDeleteMessages(droppedRequestData, droppedRequestName, originalMessages,
+ metrics::recordDroppedRequestsProduced);
+ }
+
+ /**
+ * Writes a dropped request entry to the JSON array.
+ */
+ private void writeDroppedRequestEntry(JsonArray droppedRequestArray, SqsParsedMessage parsed) {
+ String messageBody = parsed.originalMessage().body();
+ JsonObject messageJson = new JsonObject(messageBody);
+ droppedRequestArray.add(messageJson);
+ }
+
+ /**
+ * Generates a unique filename for dropped requests.
+ */
+ private String generateDroppedRequestFileName() {
+ return String.format("%s%03d_%s_%08x.json",
+ "optout-dropped-",
+ replicaId,
+ Instant.now().truncatedTo(ChronoUnit.SECONDS).toString().replace(':', '.'),
+ OptOutUtils.rand.nextInt());
+ }
+
+ /**
+ * Checks if the job has exceeded its timeout.
+ */
+ private boolean isJobTimedOut(long jobStartTime) {
+ long elapsedTime = OptOutUtils.nowEpochSeconds() - jobStartTime;
+
+ if (elapsedTime > 3600) { // 1 hour - log warning
+ LOGGER.error("delta_job_timeout: job has been running for {} seconds", elapsedTime);
+ }
+
+ if (elapsedTime > this.jobTimeoutSeconds) {
+ LOGGER.error("delta_job_timeout: job exceeded timeout, running for {} seconds (timeout: {}s)",
+ elapsedTime, this.jobTimeoutSeconds);
+ return true;
+ }
+ return false;
+ }
+}
+
diff --git a/src/main/java/com/uid2/optout/delta/DeltaProductionResult.java b/src/main/java/com/uid2/optout/delta/DeltaProductionResult.java
index 73be044..aa8f7b7 100644
--- a/src/main/java/com/uid2/optout/delta/DeltaProductionResult.java
+++ b/src/main/java/com/uid2/optout/delta/DeltaProductionResult.java
@@ -111,6 +111,11 @@ public Builder incrementDroppedRequests(int count) {
return this;
}
+ public Builder stopReason(StopReason reason) {
+ this.stopReason = reason;
+ return this;
+ }
+
/**
* Builds the DeltaProductionResult with the accumulated statistics.
*/
diff --git a/src/main/java/com/uid2/optout/traffic/TrafficCalculator.java b/src/main/java/com/uid2/optout/traffic/TrafficCalculator.java
index 6fd5149..7e090b8 100644
--- a/src/main/java/com/uid2/optout/traffic/TrafficCalculator.java
+++ b/src/main/java/com/uid2/optout/traffic/TrafficCalculator.java
@@ -216,7 +216,7 @@ List> parseAllowlistRanges(JsonObject config) throws MalformedTraffic
* @param filteredAsTooRecentCount Number of messages filtered as "too recent" by window reader
* @return TrafficStatus (DELAYED_PROCESSING or DEFAULT)
*/
- public TrafficStatus calculateStatus(List sqsMessages, QueueAttributes queueAttributes, int denylistedCount, int filteredAsTooRecentCount) {
+ public TrafficStatus calculateStatus(List sqsMessages, QueueAttributes queueAttributes, int rawMessagesRead) {
try {
// get list of delta files from s3 (sorted newest to oldest)
@@ -298,15 +298,13 @@ public TrafficStatus calculateStatus(List sqsMessages, QueueAt
// add invisible messages being processed by other consumers
// (notVisible count includes our messages, so subtract what we've read to avoid double counting)
- // ourMessages = delta messages + denylisted messages + filtered as "too recent" messages
int otherConsumersMessages = 0;
if (queueAttributes != null) {
int totalInvisible = queueAttributes.getApproximateNumberOfMessagesNotVisible();
- int ourMessages = (sqsMessages != null ? sqsMessages.size() : 0) + denylistedCount + filteredAsTooRecentCount;
- otherConsumersMessages = Math.max(0, totalInvisible - ourMessages);
+ otherConsumersMessages = Math.max(0, totalInvisible - rawMessagesRead);
totalRecords += otherConsumersMessages;
LOGGER.info("traffic calculation: adding {} invisible messages from other consumers (totalInvisible={}, ourMessages={})",
- otherConsumersMessages, totalInvisible, ourMessages);
+ otherConsumersMessages, totalInvisible, rawMessagesRead);
}
// determine status
diff --git a/src/test/java/com/uid2/optout/delta/DeltaProductionOrchestratorTest.java b/src/test/java/com/uid2/optout/delta/DeltaProductionOrchestratorTest.java
new file mode 100644
index 0000000..429b170
--- /dev/null
+++ b/src/test/java/com/uid2/optout/delta/DeltaProductionOrchestratorTest.java
@@ -0,0 +1,461 @@
+package com.uid2.optout.delta;
+
+import com.uid2.optout.sqs.SqsParsedMessage;
+import com.uid2.optout.sqs.SqsWindowReader;
+import com.uid2.optout.traffic.TrafficCalculator;
+import com.uid2.optout.traffic.TrafficFilter;
+import com.uid2.shared.optout.OptOutCloudSync;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.GetQueueAttributesRequest;
+import software.amazon.awssdk.services.sqs.model.GetQueueAttributesResponse;
+import software.amazon.awssdk.services.sqs.model.Message;
+import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName;
+import software.amazon.awssdk.services.sqs.model.QueueAttributeName;
+
+import java.io.IOException;
+import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
+
+import static org.junit.jupiter.api.Assertions.*;
+import static org.mockito.ArgumentMatchers.*;
+import static org.mockito.Mockito.*;
+
+class DeltaProductionOrchestratorTest {
+
+ // test constants
+ private static final String TEST_QUEUE_URL = "https://sqs.test.amazonaws.com/123456789/test-queue";
+ private static final int REPLICA_ID = 1;
+ private static final int DELTA_WINDOW_SECONDS = 300; // 5 minutes
+ private static final int JOB_TIMEOUT_SECONDS = 3600; // 1 hour
+
+ // mocks
+ private SqsClient mockSqsClient;
+ private SqsWindowReader mockWindowReader;
+ private DeltaFileWriter mockDeltaFileWriter;
+ private S3UploadService mockDeltaUploadService;
+ private S3UploadService mockDroppedRequestUploadService;
+ private ManualOverrideService mockManualOverrideService;
+ private TrafficFilter mockTrafficFilter;
+ private TrafficCalculator mockTrafficCalculator;
+ private OptOutCloudSync mockCloudSync;
+ private DeltaProductionMetrics mockMetrics;
+
+ private DeltaProductionOrchestrator orchestrator;
+
+ @BeforeEach
+ void setUp() {
+ mockSqsClient = mock(SqsClient.class);
+ mockWindowReader = mock(SqsWindowReader.class);
+ mockDeltaFileWriter = mock(DeltaFileWriter.class);
+ mockDeltaUploadService = mock(S3UploadService.class);
+ mockDroppedRequestUploadService = mock(S3UploadService.class);
+ mockManualOverrideService = mock(ManualOverrideService.class);
+ mockTrafficFilter = mock(TrafficFilter.class);
+ mockTrafficCalculator = mock(TrafficCalculator.class);
+ mockCloudSync = mock(OptOutCloudSync.class);
+ mockMetrics = mock(DeltaProductionMetrics.class);
+
+ // default behavior
+ when(mockManualOverrideService.isDelayedProcessing()).thenReturn(false);
+ when(mockTrafficCalculator.calculateStatus(anyList(), any(), anyInt()))
+ .thenReturn(TrafficCalculator.TrafficStatus.DEFAULT);
+ when(mockCloudSync.toCloudPath(anyString())).thenAnswer(inv -> "delta/" + inv.getArgument(0));
+
+ // mock sqs queue attributes
+ GetQueueAttributesResponse queueAttributesResponse = GetQueueAttributesResponse.builder()
+ .attributes(Map.of(
+ QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, "0",
+ QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, "0",
+ QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, "0"
+ ))
+ .build();
+ when(mockSqsClient.getQueueAttributes(any(GetQueueAttributesRequest.class)))
+ .thenReturn(queueAttributesResponse);
+
+ // mock delta upload service to invoke callback
+ try {
+ doAnswer(invocation -> {
+ S3UploadService.UploadSuccessCallback callback = invocation.getArgument(3);
+ if (callback != null) {
+ List messages = invocation.getArgument(2);
+ callback.onSuccess(messages.size());
+ }
+ return null;
+ }).when(mockDeltaUploadService).uploadAndDeleteMessages(any(), anyString(), anyList(), any());
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+
+ orchestrator = new DeltaProductionOrchestrator(
+ mockSqsClient,
+ TEST_QUEUE_URL,
+ REPLICA_ID,
+ DELTA_WINDOW_SECONDS,
+ JOB_TIMEOUT_SECONDS,
+ mockWindowReader,
+ mockDeltaFileWriter,
+ mockDeltaUploadService,
+ mockDroppedRequestUploadService,
+ mockManualOverrideService,
+ mockTrafficFilter,
+ mockTrafficCalculator,
+ mockCloudSync,
+ mockMetrics
+ );
+ }
+
+ // ==================== Manual Override Tests ====================
+
+ @Test
+ void testProduceBatchedDeltas_manualOverrideActive_returnsEarly() throws IOException {
+ // Setup - manual override is active
+ when(mockManualOverrideService.isDelayedProcessing()).thenReturn(true);
+
+ // Act
+ DeltaProductionResult result = orchestrator.produceBatchedDeltas(name -> {});
+
+ // Assert - stop reason, totals
+ assertEquals(StopReason.MANUAL_OVERRIDE_ACTIVE, result.getStopReason());
+ assertEquals(0, result.getDeltasProduced());
+ assertEquals(0, result.getEntriesProcessed());
+
+ // Assert - nothing read
+ verify(mockWindowReader, never()).readWindow();
+ // Assert - nothing uploaded
+ verify(mockDeltaUploadService, never()).uploadAndDeleteMessages(any(), any(), any(), any());
+ verify(mockDroppedRequestUploadService, never()).uploadAndDeleteMessages(any(), any(), any(), any());
+ }
+
+ @Test
+ void testProduceBatchedDeltas_manualOverrideActiveDuringProcessing_stops() throws IOException {
+ // Setup - first call returns false, second call returns true
+ when(mockManualOverrideService.isDelayedProcessing())
+ .thenReturn(false)
+ .thenReturn(true);
+
+ // Setup - window with 5 messages
+ List messages = createParsedMessages(5);
+ SqsWindowReader.WindowReadResult windowResult = SqsWindowReader.WindowReadResult.withMessages(
+ messages, System.currentTimeMillis() / 1000 - 600, 5);
+ when(mockWindowReader.readWindow()).thenReturn(windowResult);
+
+ // Act
+ DeltaProductionResult result = orchestrator.produceBatchedDeltas(name -> {});
+
+ // Assert - stop reason
+ assertEquals(StopReason.CIRCUIT_BREAKER_TRIGGERED, result.getStopReason());
+
+ // Assert - nothing uploaded
+ verify(mockDeltaUploadService, never()).uploadAndDeleteMessages(any(), any(), any(), any());
+ }
+
+ // ==================== Empty Queue Tests ====================
+
+ @Test
+ void testProduceBatchedDeltas_emptyQueue_returnsQueueEmpty() throws IOException {
+ // Setup - empty window
+ SqsWindowReader.WindowReadResult emptyResult = SqsWindowReader.WindowReadResult.queueEmpty(
+ Collections.emptyList(), 0, 0);
+ when(mockWindowReader.readWindow()).thenReturn(emptyResult);
+
+ // Act
+ DeltaProductionResult result = orchestrator.produceBatchedDeltas(name -> {});
+
+ // Assert - stop reason, totals
+ assertEquals(StopReason.QUEUE_EMPTY, result.getStopReason());
+ assertEquals(0, result.getDeltasProduced());
+ assertEquals(0, result.getEntriesProcessed());
+ }
+
+ @Test
+ void testProduceBatchedDeltas_messagesTooRecent_returnsMessagesTooRecent() throws IOException {
+ // Setup - window with messages too recent
+ SqsWindowReader.WindowReadResult tooRecentResult = SqsWindowReader.WindowReadResult.messagesTooRecent(
+ Collections.emptyList(), 0, 5);
+ when(mockWindowReader.readWindow()).thenReturn(tooRecentResult);
+
+ // Act
+ DeltaProductionResult result = orchestrator.produceBatchedDeltas(name -> {});
+
+ // Assert - stop reason, totals
+ assertEquals(StopReason.MESSAGES_TOO_RECENT, result.getStopReason());
+ assertEquals(0, result.getDeltasProduced());
+ }
+
+ // ==================== Normal Processing Tests ====================
+
+ @Test
+ void testProduceBatchedDeltas_singleWindow_producesOneDelta() throws IOException {
+ // Setup - window with 10 messages, then empty
+ List messages = createParsedMessages(10);
+ long windowStart = System.currentTimeMillis() / 1000 - 600;
+
+ SqsWindowReader.WindowReadResult windowResult = SqsWindowReader.WindowReadResult.withMessages(
+ messages, windowStart, 10);
+ SqsWindowReader.WindowReadResult emptyResult = SqsWindowReader.WindowReadResult.queueEmpty(
+ Collections.emptyList(), 0, 0);
+
+ when(mockWindowReader.readWindow())
+ .thenReturn(windowResult)
+ .thenReturn(emptyResult);
+
+ // Setup - no denylisted messages
+ when(mockTrafficFilter.isDenylisted(any())).thenReturn(false);
+
+ // Act
+ AtomicReference producedDelta = new AtomicReference<>();
+ DeltaProductionResult result = orchestrator.produceBatchedDeltas(producedDelta::set);
+
+ // Assert - stop reason, totals
+ assertEquals(StopReason.QUEUE_EMPTY, result.getStopReason());
+ assertEquals(1, result.getDeltasProduced());
+ assertEquals(10, result.getEntriesProcessed());
+ assertEquals(0, result.getDroppedRequestFilesProduced());
+ assertNotNull(producedDelta.get());
+
+ // Assert - delta file constructed
+ verify(mockDeltaFileWriter).writeStartOfDelta(any(), eq(windowStart));
+ verify(mockDeltaFileWriter, times(10)).writeOptOutEntry(any(), any(), any(), anyLong());
+ verify(mockDeltaFileWriter).writeEndOfDelta(any(), eq(windowStart + DELTA_WINDOW_SECONDS));
+
+ // Assert - delta uploaded
+ verify(mockDeltaUploadService).uploadAndDeleteMessages(any(), anyString(), anyList(), any());
+ }
+
+ @Test
+ void testProduceBatchedDeltas_multipleWindows_producesMultipleDeltas() throws IOException {
+ // Setup - window with 5 messages, then window with 3 messages, then empty
+ List messages1 = createParsedMessages(5);
+ List messages2 = createParsedMessages(3);
+ long windowStart1 = System.currentTimeMillis() / 1000 - 900;
+ long windowStart2 = System.currentTimeMillis() / 1000 - 600;
+
+ SqsWindowReader.WindowReadResult windowResult1 = SqsWindowReader.WindowReadResult.withMessages(
+ messages1, windowStart1, 5);
+ SqsWindowReader.WindowReadResult windowResult2 = SqsWindowReader.WindowReadResult.withMessages(
+ messages2, windowStart2, 3);
+ SqsWindowReader.WindowReadResult emptyResult = SqsWindowReader.WindowReadResult.queueEmpty(
+ Collections.emptyList(), 0, 0);
+
+ when(mockWindowReader.readWindow())
+ .thenReturn(windowResult1)
+ .thenReturn(windowResult2)
+ .thenReturn(emptyResult);
+
+ // Setup - no denylisted messages
+ when(mockTrafficFilter.isDenylisted(any())).thenReturn(false);
+
+ // Act
+ AtomicInteger deltaCount = new AtomicInteger(0);
+ DeltaProductionResult result = orchestrator.produceBatchedDeltas(name -> deltaCount.incrementAndGet());
+
+ // Assert - stop reason, totals
+ assertEquals(StopReason.QUEUE_EMPTY, result.getStopReason());
+ assertEquals(2, result.getDeltasProduced());
+ assertEquals(8, result.getEntriesProcessed());
+ assertEquals(2, deltaCount.get());
+
+ // Assert - delta file constructed
+ verify(mockDeltaFileWriter, times(2)).writeStartOfDelta(any(), anyLong());
+ verify(mockDeltaFileWriter, times(2)).writeEndOfDelta(any(), anyLong());
+ verify(mockDeltaFileWriter, times(8)).writeOptOutEntry(any(), any(), any(), anyLong());
+
+ // Assert - delta uploaded
+ verify(mockDeltaUploadService, times(2)).uploadAndDeleteMessages(any(), anyString(), anyList(), any());
+ }
+
+ // ==================== Traffic Filter (Denylist) Tests ====================
+
+ @Test
+ void testProduceBatchedDeltas_withDenylistedMessages_uploadsDroppedRequests() throws IOException {
+ // Setup - window with 10 messages, then empty
+ List messages = createParsedMessages(10);
+ long windowStart = System.currentTimeMillis() / 1000 - 600;
+
+ SqsWindowReader.WindowReadResult windowResult = SqsWindowReader.WindowReadResult.withMessages(
+ messages, windowStart, 10);
+ SqsWindowReader.WindowReadResult emptyResult = SqsWindowReader.WindowReadResult.queueEmpty(
+ Collections.emptyList(), 0, 0);
+
+ when(mockWindowReader.readWindow())
+ .thenReturn(windowResult)
+ .thenReturn(emptyResult);
+
+ // Setup - first 3 messages are denylisted
+ when(mockTrafficFilter.isDenylisted(any()))
+ .thenReturn(true, true, true, false, false, false, false, false, false, false);
+
+ // Act
+ DeltaProductionResult result = orchestrator.produceBatchedDeltas(name -> {});
+
+ // Assert - stop reason, totals
+ assertEquals(StopReason.QUEUE_EMPTY, result.getStopReason());
+ assertEquals(1, result.getDeltasProduced());
+ assertEquals(7, result.getEntriesProcessed());
+ assertEquals(1, result.getDroppedRequestFilesProduced());
+ assertEquals(3, result.getDroppedRequestsProcessed());
+
+ // Assert - delta file constructed
+ verify(mockDeltaFileWriter, times(1)).writeStartOfDelta(any(), anyLong());
+ verify(mockDeltaFileWriter, times(1)).writeEndOfDelta(any(), anyLong());
+ verify(mockDeltaFileWriter, times(7)).writeOptOutEntry(any(), any(), any(), anyLong());
+
+ // Assert - delta & dropped requests uploaded
+ verify(mockDeltaUploadService).uploadAndDeleteMessages(any(), anyString(), anyList(), any());
+ verify(mockDroppedRequestUploadService).uploadAndDeleteMessages(any(), anyString(), anyList(), any());
+ }
+
+ @Test
+ void testProduceBatchedDeltas_allMessagesDenylisted_noDeltaUpload() throws IOException {
+ // Setup - window with 5 messages
+ List messages = createParsedMessages(5);
+ long windowStart = System.currentTimeMillis() / 1000 - 600;
+
+ SqsWindowReader.WindowReadResult windowResult = SqsWindowReader.WindowReadResult.withMessages(
+ messages, windowStart, 5);
+ SqsWindowReader.WindowReadResult emptyResult = SqsWindowReader.WindowReadResult.queueEmpty(
+ Collections.emptyList(), 0, 0);
+ when(mockWindowReader.readWindow())
+ .thenReturn(windowResult)
+ .thenReturn(emptyResult);
+
+ // Setup - all messages are denylisted
+ when(mockTrafficFilter.isDenylisted(any())).thenReturn(true);
+
+ // Act
+ DeltaProductionResult result = orchestrator.produceBatchedDeltas(name -> {});
+
+ // Assert - stop reason, totals
+ assertEquals(StopReason.QUEUE_EMPTY, result.getStopReason());
+ assertEquals(0, result.getDeltasProduced());
+ assertEquals(0, result.getEntriesProcessed());
+ assertEquals(1, result.getDroppedRequestFilesProduced());
+ assertEquals(5, result.getDroppedRequestsProcessed());
+
+ // Assert - delta not uploaded, dropped requests uploaded
+ verify(mockDeltaUploadService, never()).uploadAndDeleteMessages(any(), anyString(), anyList(), any());
+ verify(mockDroppedRequestUploadService).uploadAndDeleteMessages(any(), anyString(), anyList(), any());
+ }
+
+ // ==================== Circuit Breaker Tests ====================
+
+ @Test
+ void testProduceBatchedDeltas_circuitBreakerTriggered_stopsAndSetsOverride() throws IOException {
+ // Setup - window with 10 messages
+ List messages = createParsedMessages(10);
+ long windowStart = System.currentTimeMillis() / 1000 - 600;
+
+ SqsWindowReader.WindowReadResult windowResult = SqsWindowReader.WindowReadResult.withMessages(
+ messages, windowStart, 10);
+
+ when(mockWindowReader.readWindow()).thenReturn(windowResult);
+
+ // Setup - no denylisted messages
+ when(mockTrafficFilter.isDenylisted(any())).thenReturn(false);
+
+ // Setup - circuit breaker triggered
+ when(mockTrafficCalculator.calculateStatus(anyList(), any(), anyInt()))
+ .thenReturn(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING);
+
+ DeltaProductionResult result = orchestrator.produceBatchedDeltas(name -> {});
+
+ // Assert - stop reason, totals
+ assertEquals(StopReason.CIRCUIT_BREAKER_TRIGGERED, result.getStopReason());
+ assertEquals(0, result.getDeltasProduced());
+ assertEquals(0, result.getEntriesProcessed());
+ assertEquals(0, result.getDroppedRequestFilesProduced());
+ assertEquals(0, result.getDroppedRequestsProcessed());
+
+ // Assert - manual override set
+ verify(mockManualOverrideService).setDelayedProcessing();
+
+ // Assert - delta not uploaded, dropped requests not uploaded
+ verify(mockDeltaUploadService, never()).uploadAndDeleteMessages(any(), anyString(), anyList(), any());
+ verify(mockDroppedRequestUploadService, never()).uploadAndDeleteMessages(any(), anyString(), anyList(), any());
+ }
+
+ // ==================== Callback Tests ====================
+
+ @Test
+ void testProduceBatchedDeltas_callbackInvokedWithDeltaName() throws IOException {
+ // Setup - window with 5 messages, then empty
+ List messages = createParsedMessages(5);
+ long windowStart = System.currentTimeMillis() / 1000 - 600;
+
+ SqsWindowReader.WindowReadResult windowResult = SqsWindowReader.WindowReadResult.withMessages(
+ messages, windowStart, 5);
+ SqsWindowReader.WindowReadResult emptyResult = SqsWindowReader.WindowReadResult.queueEmpty(
+ Collections.emptyList(), 0, 0);
+
+ when(mockWindowReader.readWindow())
+ .thenReturn(windowResult)
+ .thenReturn(emptyResult);
+
+ // Setup - no denylisted messages
+ when(mockTrafficFilter.isDenylisted(any())).thenReturn(false);
+
+ // Act
+ List producedDeltas = new ArrayList<>();
+ DeltaProductionResult result = orchestrator.produceBatchedDeltas(producedDeltas::add);
+
+ // Assert - stop reason, totals
+ assertEquals(StopReason.QUEUE_EMPTY, result.getStopReason());
+ assertEquals(1, result.getDeltasProduced());
+ assertEquals(5, result.getEntriesProcessed());
+ assertEquals(0, result.getDroppedRequestFilesProduced());
+ assertEquals(0, result.getDroppedRequestsProcessed());
+
+ // Assert - delta uploaded
+ verify(mockDeltaUploadService).uploadAndDeleteMessages(any(), anyString(), anyList(), any());
+
+ // Assert - callback invoked with delta name
+ assertEquals(1, producedDeltas.size());
+ assertTrue(producedDeltas.get(0).contains("optout-delta"));
+
+ // Assert - metrics recorded
+ verify(mockMetrics).recordDeltaProduced(5);
+ }
+
+ // ==================== Helper Methods ====================
+
+ private List createParsedMessages(int count) {
+ List messages = new ArrayList<>();
+ long baseTimestamp = System.currentTimeMillis() / 1000 - 600;
+
+ for (int i = 0; i < count; i++) {
+ Message rawMessage = createRawMessage("msg-" + i, baseTimestamp + i);
+ byte[] hashBytes = new byte[32];
+ byte[] idBytes = new byte[32];
+ Arrays.fill(hashBytes, (byte) i);
+ Arrays.fill(idBytes, (byte) (i + 1));
+
+ SqsParsedMessage parsed = new SqsParsedMessage(
+ rawMessage,
+ hashBytes,
+ idBytes,
+ baseTimestamp + i,
+ "test" + i + "@email.com", // email
+ null, // phone
+ "192.168.1." + i, // clientIp
+ "trace-" + i // traceId
+ );
+ messages.add(parsed);
+ }
+ return messages;
+ }
+
+ private Message createRawMessage(String messageId, long timestampSeconds) {
+ Map attrs = new HashMap<>();
+ attrs.put(MessageSystemAttributeName.SENT_TIMESTAMP, String.valueOf(timestampSeconds * 1000));
+
+ return Message.builder()
+ .messageId(messageId)
+ .receiptHandle("receipt-" + messageId)
+ .body("{\"identity_hash\":\"test\",\"advertising_id\":\"test\"}")
+ .attributes(attrs)
+ .build();
+ }
+}
diff --git a/src/test/java/com/uid2/optout/traffic/TrafficCalculatorTest.java b/src/test/java/com/uid2/optout/traffic/TrafficCalculatorTest.java
index 4e3d346..f85523e 100644
--- a/src/test/java/com/uid2/optout/traffic/TrafficCalculatorTest.java
+++ b/src/test/java/com/uid2/optout/traffic/TrafficCalculatorTest.java
@@ -1106,7 +1106,7 @@ void testCalculateStatus_noDeltaFiles() throws Exception {
cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
// Act & Assert - should throw exception when no delta files
- assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0, 0));
+ assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0));
}
@Test
@@ -1134,7 +1134,7 @@ void testCalculateStatus_normalTraffic() throws Exception {
// Act
List sqsMessages = Arrays.asList(createSqsMessage(t));
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
// Assert - 100+1 < 5 * 50 = 250, so should be DEFAULT
assertEquals(TrafficCalculator.TrafficStatus.DEFAULT, status);
@@ -1165,7 +1165,7 @@ void testCalculateStatus_delayedProcessing() throws Exception {
// Act
List sqsMessages = Arrays.asList(createSqsMessage(t));
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
// Assert - 100+1 >= 5 * 10 = 50, DELAYED_PROCESSING
assertEquals(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
@@ -1188,7 +1188,7 @@ void testCalculateStatus_noSqsMessages() throws Exception {
cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
// Act - null SQS messages
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(null, null, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(null, null, 0);
// Assert - should still calculate based on delta files, DEFAULT
assertEquals(TrafficCalculator.TrafficStatus.DEFAULT, status);
@@ -1211,7 +1211,7 @@ void testCalculateStatus_emptySqsMessages() throws Exception {
cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
// Act - empty SQS messages
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList(), null, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList(), null, 0);
// Assert - should still calculate based on delta files, DEFAULT
assertEquals(TrafficCalculator.TrafficStatus.DEFAULT, status);
@@ -1243,7 +1243,7 @@ void testCalculateStatus_multipleSqsMessages() throws Exception {
for (int i = 0; i < 30; i++) {
sqsMessages.add(createSqsMessage(t - i * 10));
}
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
// Assert - DELAYED_PROCESSING
assertEquals(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
@@ -1286,7 +1286,7 @@ void testCalculateStatus_withTrafficCalcConfig() throws Exception {
// Act
List sqsMessages = Arrays.asList(createSqsMessage(t));
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
// Assert - should filter out entries in traffic calc config ranges
// Only 300 from window count (not in traffic calc config ranges) + 1 SQS = 301
@@ -1312,13 +1312,13 @@ void testCalculateStatus_cacheUtilization() throws Exception {
// Act - first call should populate cache
List sqsMessages = Arrays.asList(createSqsMessage(t));
- calculator.calculateStatus(sqsMessages, null, 0, 0);
+ calculator.calculateStatus(sqsMessages, null, 0);
Map stats = calculator.getCacheStats();
int cachedFiles = (Integer) stats.get("cached_files");
// Second call should use cache (no additional S3 download)
- calculator.calculateStatus(sqsMessages, null, 0, 0);
+ calculator.calculateStatus(sqsMessages, null, 0);
Map stats2 = calculator.getCacheStats();
int cachedFiles2 = (Integer) stats2.get("cached_files");
@@ -1340,7 +1340,7 @@ void testCalculateStatus_s3Exception() throws Exception {
cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
// Act & Assert - should throw exception on S3 error
- assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0, 0));
+ assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0));
}
@Test
@@ -1354,7 +1354,7 @@ void testCalculateStatus_deltaFileReadException() throws Exception {
cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH);
// Act & Assert - should throw exception on S3 download error
- assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0, 0));
+ assertThrows(RuntimeException.class, () -> calculator.calculateStatus(Collections.emptyList(), null, 0));
}
@Test
@@ -1391,7 +1391,7 @@ void testCalculateStatus_multipleDeltaFiles() throws Exception {
// Act
List sqsMessages = Arrays.asList(createSqsMessage(t));
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
// Assert - DEFAULT
assertEquals(TrafficCalculator.TrafficStatus.DEFAULT, status);
@@ -1425,7 +1425,7 @@ void testCalculateStatus_windowBoundaryTimestamp() throws Exception {
// Act
List sqsMessages = Arrays.asList(createSqsMessage(t));
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
// Assert - DEFAULT
assertEquals(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
@@ -1449,7 +1449,7 @@ void testCalculateStatus_timestampsCached() throws Exception {
// Act
List sqsMessages = Arrays.asList(createSqsMessage(t));
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, null, 0);
// Assert
assertEquals(TrafficCalculator.TrafficStatus.DEFAULT, status);
@@ -1493,7 +1493,7 @@ void testCalculateStatus_delayedProcessingFromQueueAttributesOnly() throws Excep
SqsMessageOperations.QueueAttributes queueAttributes =
new SqsMessageOperations.QueueAttributes(0, 600, 0);
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes, 0);
// Assert - DELAYED_PROCESSING due to high invisible message count from other consumers
assertEquals(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);
@@ -1533,7 +1533,7 @@ void testCalculateStatus_delayedProcessingFromBothQueueAndMessages() throws Exce
SqsMessageOperations.QueueAttributes queueAttributes =
new SqsMessageOperations.QueueAttributes(0, 450, 0);
- TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes, 0, 0);
+ TrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages, queueAttributes, 0);
// Assert - DELAYED_PROCESSING due to combined count exceeding threshold
assertEquals(TrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status);