Skip to content

Commit 1c1db04

Browse files
committed
add delta production orchestrator and tests
1 parent 4e150d0 commit 1c1db04

File tree

3 files changed

+753
-0
lines changed

3 files changed

+753
-0
lines changed
Lines changed: 287 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,287 @@
1+
package com.uid2.optout.delta;
2+
3+
import com.uid2.optout.sqs.SqsMessageOperations;
4+
import com.uid2.optout.sqs.SqsParsedMessage;
5+
import com.uid2.optout.sqs.SqsWindowReader;
6+
import com.uid2.optout.traffic.TrafficCalculator;
7+
import com.uid2.optout.traffic.TrafficCalculator.TrafficStatus;
8+
import com.uid2.optout.traffic.TrafficFilter;
9+
import com.uid2.shared.optout.OptOutCloudSync;
10+
import com.uid2.shared.optout.OptOutUtils;
11+
import io.vertx.core.json.JsonArray;
12+
import io.vertx.core.json.JsonObject;
13+
import org.slf4j.Logger;
14+
import org.slf4j.LoggerFactory;
15+
import software.amazon.awssdk.services.sqs.SqsClient;
16+
import software.amazon.awssdk.services.sqs.model.Message;
17+
18+
import java.io.ByteArrayOutputStream;
19+
import java.io.IOException;
20+
import java.time.Instant;
21+
import java.time.temporal.ChronoUnit;
22+
import java.util.ArrayList;
23+
import java.util.List;
24+
import java.util.function.Consumer;
25+
import java.util.stream.Collectors;
26+
27+
/**
28+
* Orchestrates the delta production workflow.
29+
*
30+
* <p>This class encapsulates the core delta production logic:</p>
31+
* <ul>
32+
* <li>Reading messages from SQS in 5-minute windows</li>
33+
* <li>Filtering denylisted messages</li>
34+
* <li>Checking circuit breakers (manual override, traffic calculator)</li>
35+
* <li>Constructing delta files and dropped request files</li>
36+
* <li>Uploading to S3 and deleting processed messages</li>
37+
* </ul>
38+
*
39+
*/
40+
public class DeltaProductionOrchestrator {
41+
private static final Logger LOGGER = LoggerFactory.getLogger(DeltaProductionOrchestrator.class);
42+
43+
private final SqsClient sqsClient;
44+
private final String queueUrl;
45+
private final int replicaId;
46+
private final int deltaWindowSeconds;
47+
private final int jobTimeoutSeconds;
48+
49+
private final SqsWindowReader windowReader;
50+
private final DeltaFileWriter deltaFileWriter;
51+
private final S3UploadService deltaUploadService;
52+
private final S3UploadService droppedRequestUploadService;
53+
private final ManualOverrideService manualOverrideService;
54+
private final TrafficFilter trafficFilter;
55+
private final TrafficCalculator trafficCalculator;
56+
private final OptOutCloudSync cloudSync;
57+
private final DeltaProductionMetrics metrics;
58+
59+
public DeltaProductionOrchestrator(
60+
SqsClient sqsClient,
61+
String queueUrl,
62+
int replicaId,
63+
int deltaWindowSeconds,
64+
int jobTimeoutSeconds,
65+
SqsWindowReader windowReader,
66+
DeltaFileWriter deltaFileWriter,
67+
S3UploadService deltaUploadService,
68+
S3UploadService droppedRequestUploadService,
69+
ManualOverrideService manualOverrideService,
70+
TrafficFilter trafficFilter,
71+
TrafficCalculator trafficCalculator,
72+
OptOutCloudSync cloudSync,
73+
DeltaProductionMetrics metrics) {
74+
this.sqsClient = sqsClient;
75+
this.queueUrl = queueUrl;
76+
this.replicaId = replicaId;
77+
this.deltaWindowSeconds = deltaWindowSeconds;
78+
this.jobTimeoutSeconds = jobTimeoutSeconds;
79+
this.windowReader = windowReader;
80+
this.deltaFileWriter = deltaFileWriter;
81+
this.deltaUploadService = deltaUploadService;
82+
this.droppedRequestUploadService = droppedRequestUploadService;
83+
this.manualOverrideService = manualOverrideService;
84+
this.trafficFilter = trafficFilter;
85+
this.trafficCalculator = trafficCalculator;
86+
this.cloudSync = cloudSync;
87+
this.metrics = metrics;
88+
}
89+
90+
/**
91+
* Produces delta files from SQS queue in batched 5-minute windows.
92+
*
93+
* Continues until queue is empty, messages are too recent, circuit breaker is triggered, or job timeout is reached.
94+
*
95+
* @param onDeltaProduced Called with delta filename after each successful delta upload (for event & metrics publishing)
96+
* @return DeltaProductionResult with production statistics
97+
* @throws IOException if delta production fails
98+
*/
99+
public DeltaProductionResult produceBatchedDeltas(Consumer<String> onDeltaProduced) throws IOException {
100+
101+
// check for manual override
102+
if (manualOverrideService.isDelayedProcessing()) {
103+
LOGGER.info("manual override set to DELAYED_PROCESSING, skipping production");
104+
return DeltaProductionResult.builder().stopReason(StopReason.MANUAL_OVERRIDE_ACTIVE).build();
105+
}
106+
107+
DeltaProductionResult.Builder result = DeltaProductionResult.builder();
108+
long jobStartTime = OptOutUtils.nowEpochSeconds();
109+
110+
LOGGER.info("starting delta production from SQS queue (replicaId: {}, deltaWindowSeconds: {}, jobTimeoutSeconds: {})",
111+
this.replicaId, this.deltaWindowSeconds, this.jobTimeoutSeconds);
112+
113+
// read and process windows until done
114+
while (!isJobTimedOut(jobStartTime)) {
115+
116+
// read one complete 5-minute window
117+
SqsWindowReader.WindowReadResult windowResult = windowReader.readWindow();
118+
119+
// if no messages, we're done (queue empty or messages too recent)
120+
if (windowResult.isEmpty()) {
121+
result.stopReason(windowResult.getStopReason());
122+
LOGGER.info("delta production complete - no more eligible messages (reason: {})", windowResult.getStopReason().name());
123+
break;
124+
}
125+
126+
// process this window
127+
boolean isDelayedProcessing = processWindow(windowResult, result, onDeltaProduced);
128+
129+
// circuit breaker triggered
130+
if (isDelayedProcessing) {
131+
result.stopReason(StopReason.CIRCUIT_BREAKER_TRIGGERED);
132+
return result.build();
133+
}
134+
}
135+
136+
return result.build();
137+
}
138+
139+
/**
140+
* Processes a single 5-minute window of messages.
141+
*
142+
* @param windowResult The window data to process
143+
* @param result The builder to accumulate statistics into
144+
* @param onDeltaProduced Callback for when a delta is produced
145+
* @return true if the circuit breaker triggered
146+
*/
147+
private boolean processWindow(SqsWindowReader.WindowReadResult windowResult,
148+
DeltaProductionResult.Builder result,
149+
Consumer<String> onDeltaProduced) throws IOException {
150+
long windowStart = windowResult.getWindowStart();
151+
List<SqsParsedMessage> messages = windowResult.getMessages();
152+
153+
// check for manual override
154+
if (manualOverrideService.isDelayedProcessing()) {
155+
LOGGER.info("manual override set to DELAYED_PROCESSING, stopping production");
156+
return true;
157+
}
158+
159+
// create buffers for current window
160+
ByteArrayOutputStream deltaStream = new ByteArrayOutputStream();
161+
JsonArray droppedRequestStream = new JsonArray();
162+
163+
// get file names for current window
164+
String deltaName = OptOutUtils.newDeltaFileName(this.replicaId);
165+
String droppedRequestName = generateDroppedRequestFileName();
166+
167+
// write start of delta
168+
deltaFileWriter.writeStartOfDelta(deltaStream, windowStart);
169+
170+
// separate messages into delta entries and dropped requests
171+
List<SqsParsedMessage> deltaMessages = new ArrayList<>();
172+
List<SqsParsedMessage> droppedMessages = new ArrayList<>();
173+
174+
for (SqsParsedMessage msg : messages) {
175+
if (trafficFilter.isDenylisted(msg)) {
176+
writeDroppedRequestEntry(droppedRequestStream, msg);
177+
droppedMessages.add(msg);
178+
} else {
179+
deltaFileWriter.writeOptOutEntry(deltaStream, msg.hashBytes(), msg.idBytes(), msg.timestamp());
180+
deltaMessages.add(msg);
181+
}
182+
}
183+
184+
// check traffic calculator - pass counts for accurate invisible message deduplication
185+
int filteredAsTooRecentCount = windowResult.getRawMessagesRead() - messages.size();
186+
SqsMessageOperations.QueueAttributes queueAttributes = SqsMessageOperations.getQueueAttributes(this.sqsClient, this.queueUrl);
187+
TrafficStatus trafficStatus = this.trafficCalculator.calculateStatus(deltaMessages, queueAttributes, droppedMessages.size(), filteredAsTooRecentCount);
188+
189+
if (trafficStatus == TrafficStatus.DELAYED_PROCESSING) {
190+
LOGGER.error("circuit_breaker_triggered: traffic spike detected, stopping production and setting manual override");
191+
manualOverrideService.setDelayedProcessing();
192+
return true;
193+
}
194+
195+
// upload delta file if there are non-denylisted messages
196+
if (!deltaMessages.isEmpty()) {
197+
uploadDelta(deltaStream, deltaName, windowStart, deltaMessages, onDeltaProduced);
198+
result.incrementDeltas(deltaMessages.size());
199+
}
200+
201+
// upload dropped request file if there are denylisted messages
202+
if (!droppedMessages.isEmpty() && droppedRequestUploadService != null) {
203+
uploadDroppedRequests(droppedRequestStream, droppedRequestName, windowStart, droppedMessages);
204+
result.incrementDroppedRequests(droppedMessages.size());
205+
}
206+
207+
LOGGER.info("processed window [{}, {}]: {} entries, {} dropped requests",
208+
windowStart, windowStart + this.deltaWindowSeconds,
209+
deltaMessages.size(), droppedMessages.size());
210+
211+
return false;
212+
}
213+
214+
private void uploadDelta(ByteArrayOutputStream deltaStream, String deltaName,
215+
long windowStart, List<SqsParsedMessage> messages,
216+
Consumer<String> onDeltaProduced) throws IOException {
217+
// add end-of-delta entry
218+
long endTimestamp = windowStart + this.deltaWindowSeconds;
219+
deltaFileWriter.writeEndOfDelta(deltaStream, endTimestamp);
220+
221+
// convert delta stream to byte array
222+
byte[] deltaData = deltaStream.toByteArray();
223+
String s3Path = this.cloudSync.toCloudPath(deltaName);
224+
225+
// get original messages for deletion
226+
List<Message> originalMessages = messages.stream().map(SqsParsedMessage::originalMessage).collect(Collectors.toList());
227+
228+
// upload and delete
229+
deltaUploadService.uploadAndDeleteMessages(deltaData, s3Path, originalMessages, (count) -> {
230+
metrics.recordDeltaProduced(count);
231+
onDeltaProduced.accept(deltaName);
232+
});
233+
}
234+
235+
private void uploadDroppedRequests(JsonArray droppedRequestStream, String droppedRequestName,
236+
long windowStart, List<SqsParsedMessage> messages) throws IOException {
237+
238+
// convert dropped request stream to byte array
239+
byte[] droppedRequestData = droppedRequestStream.encode().getBytes();
240+
241+
// get original messages for deletion
242+
List<Message> originalMessages = messages.stream().map(SqsParsedMessage::originalMessage).collect(Collectors.toList());
243+
244+
// upload and delete
245+
droppedRequestUploadService.uploadAndDeleteMessages(droppedRequestData, droppedRequestName, originalMessages,
246+
metrics::recordDroppedRequestsProduced);
247+
}
248+
249+
/**
250+
* Writes a dropped request entry to the JSON array.
251+
*/
252+
private void writeDroppedRequestEntry(JsonArray droppedRequestArray, SqsParsedMessage parsed) {
253+
String messageBody = parsed.originalMessage().body();
254+
JsonObject messageJson = new JsonObject(messageBody);
255+
droppedRequestArray.add(messageJson);
256+
}
257+
258+
/**
259+
* Generates a unique filename for dropped requests.
260+
*/
261+
private String generateDroppedRequestFileName() {
262+
return String.format("%s%03d_%s_%08x.json",
263+
"optout-dropped-",
264+
replicaId,
265+
Instant.now().truncatedTo(ChronoUnit.SECONDS).toString().replace(':', '.'),
266+
OptOutUtils.rand.nextInt());
267+
}
268+
269+
/**
270+
* Checks if the job has exceeded its timeout.
271+
*/
272+
private boolean isJobTimedOut(long jobStartTime) {
273+
long elapsedTime = OptOutUtils.nowEpochSeconds() - jobStartTime;
274+
275+
if (elapsedTime > 3600) { // 1 hour - log warning
276+
LOGGER.error("delta_job_timeout: job has been running for {} seconds", elapsedTime);
277+
}
278+
279+
if (elapsedTime > this.jobTimeoutSeconds) {
280+
LOGGER.error("delta_job_timeout: job exceeded timeout, running for {} seconds (timeout: {}s)",
281+
elapsedTime, this.jobTimeoutSeconds);
282+
return true;
283+
}
284+
return false;
285+
}
286+
}
287+

src/main/java/com/uid2/optout/delta/DeltaProductionResult.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ public Builder incrementDroppedRequests(int count) {
111111
return this;
112112
}
113113

114+
public Builder stopReason(StopReason reason) {
115+
this.stopReason = reason;
116+
return this;
117+
}
118+
114119
/**
115120
* Builds the DeltaProductionResult with the accumulated statistics.
116121
*/

0 commit comments

Comments
 (0)