Skip to content

Commit 3dd6205

Browse files
authored
Merge pull request #275 from IABTechLab/ian-UID2-6345-add-delta-production-orchestrator
add delta production orchestrator and tests
2 parents 4e150d0 + 6e60cb7 commit 3dd6205

File tree

5 files changed

+778
-21
lines changed

5 files changed

+778
-21
lines changed
Lines changed: 293 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,293 @@
1+
package com.uid2.optout.delta;
2+
3+
import com.uid2.optout.sqs.SqsMessageOperations;
import com.uid2.optout.sqs.SqsParsedMessage;
import com.uid2.optout.sqs.SqsWindowReader;
import com.uid2.optout.traffic.TrafficCalculator;
import com.uid2.optout.traffic.TrafficCalculator.TrafficStatus;
import com.uid2.optout.traffic.TrafficFilter;
import com.uid2.shared.optout.OptOutCloudSync;
import com.uid2.shared.optout.OptOutUtils;
import io.vertx.core.json.JsonArray;
import io.vertx.core.json.JsonObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.Message;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.List;
import java.util.Locale;
import java.util.function.Consumer;
import java.util.stream.Collectors;
26+
27+
/**
 * Orchestrates the delta production workflow.
 *
 * <p>This class encapsulates the core delta production logic:</p>
 * <ul>
 *   <li>Reading messages from SQS in fixed-length windows (length is {@code deltaWindowSeconds}, nominally 5 minutes)</li>
 *   <li>Filtering denylisted messages</li>
 *   <li>Checking circuit breakers (manual override, traffic calculator)</li>
 *   <li>Constructing delta files and dropped request files</li>
 *   <li>Uploading to S3 and deleting processed messages</li>
 * </ul>
 */
40+
public class DeltaProductionOrchestrator {
41+
private static final Logger LOGGER = LoggerFactory.getLogger(DeltaProductionOrchestrator.class);
42+
43+
private final SqsClient sqsClient;
44+
private final String queueUrl;
45+
private final int replicaId;
46+
private final int deltaWindowSeconds;
47+
private final int jobTimeoutSeconds;
48+
49+
private final SqsWindowReader windowReader;
50+
private final DeltaFileWriter deltaFileWriter;
51+
private final S3UploadService deltaUploadService;
52+
private final S3UploadService droppedRequestUploadService;
53+
private final ManualOverrideService manualOverrideService;
54+
private final TrafficFilter trafficFilter;
55+
private final TrafficCalculator trafficCalculator;
56+
private final OptOutCloudSync cloudSync;
57+
private final DeltaProductionMetrics metrics;
58+
59+
/**
 * Creates an orchestrator from its configuration values and collaborators.
 *
 * <p>Every argument is stored as given; the constructor performs no validation and no I/O.</p>
 *
 * @param sqsClient                   client passed to {@code SqsMessageOperations.getQueueAttributes}
 * @param queueUrl                    URL of the queue whose attributes are queried
 * @param replicaId                   replica identifier embedded in generated file names
 * @param deltaWindowSeconds          window length, added to a window's start to obtain its end timestamp
 * @param jobTimeoutSeconds           elapsed seconds after which the production loop stops
 * @param windowReader                source of message windows
 * @param deltaFileWriter             writer for the start, entries and end of a delta
 * @param deltaUploadService          uploads delta files and deletes their messages
 * @param droppedRequestUploadService uploads dropped request files; when {@code null}, dropped
 *                                    request files are not uploaded
 * @param manualOverrideService       reads and sets the delayed-processing override
 * @param trafficFilter               decides whether a message is denylisted
 * @param trafficCalculator           computes the traffic status used as a circuit breaker
 * @param cloudSync                   maps a delta file name to its cloud path
 * @param metrics                     receives delta and dropped-request production counts
 */
public DeltaProductionOrchestrator(
        SqsClient sqsClient,
        String queueUrl,
        int replicaId,
        int deltaWindowSeconds,
        int jobTimeoutSeconds,
        SqsWindowReader windowReader,
        DeltaFileWriter deltaFileWriter,
        S3UploadService deltaUploadService,
        S3UploadService droppedRequestUploadService,
        ManualOverrideService manualOverrideService,
        TrafficFilter trafficFilter,
        TrafficCalculator trafficCalculator,
        OptOutCloudSync cloudSync,
        DeltaProductionMetrics metrics) {
    // job configuration
    this.replicaId = replicaId;
    this.deltaWindowSeconds = deltaWindowSeconds;
    this.jobTimeoutSeconds = jobTimeoutSeconds;

    // queue access
    this.sqsClient = sqsClient;
    this.queueUrl = queueUrl;
    this.windowReader = windowReader;

    // circuit breakers and filtering
    this.manualOverrideService = manualOverrideService;
    this.trafficFilter = trafficFilter;
    this.trafficCalculator = trafficCalculator;

    // file construction and upload
    this.deltaFileWriter = deltaFileWriter;
    this.deltaUploadService = deltaUploadService;
    this.droppedRequestUploadService = droppedRequestUploadService;
    this.cloudSync = cloudSync;

    // reporting
    this.metrics = metrics;
}
89+
90+
/**
91+
* Produces delta files from SQS queue in batched 5-minute windows.
92+
*
93+
* Continues until queue is empty, messages are too recent, circuit breaker is triggered, or job timeout is reached.
94+
*
95+
* @param onDeltaProduced Called with delta filename after each successful delta upload (for event & metrics publishing)
96+
* @return DeltaProductionResult with production statistics
97+
* @throws IOException if delta production fails
98+
*/
99+
public DeltaProductionResult produceBatchedDeltas(Consumer<String> onDeltaProduced) throws IOException {
100+
101+
// check for manual override
102+
if (manualOverrideService.isDelayedProcessing()) {
103+
LOGGER.info("manual override set to DELAYED_PROCESSING, skipping production");
104+
return DeltaProductionResult.builder().stopReason(StopReason.MANUAL_OVERRIDE_ACTIVE).build();
105+
}
106+
107+
DeltaProductionResult.Builder result = DeltaProductionResult.builder();
108+
long jobStartTime = OptOutUtils.nowEpochSeconds();
109+
110+
LOGGER.info("starting delta production from SQS queue (replicaId: {}, deltaWindowSeconds: {}, jobTimeoutSeconds: {})",
111+
this.replicaId, this.deltaWindowSeconds, this.jobTimeoutSeconds);
112+
113+
// read and process windows until done
114+
while (!isJobTimedOut(jobStartTime)) {
115+
116+
// read one complete 5-minute window
117+
SqsWindowReader.WindowReadResult windowResult = windowReader.readWindow();
118+
119+
// if no messages, we're done (queue empty or messages too recent)
120+
if (windowResult.isEmpty()) {
121+
result.stopReason(windowResult.getStopReason());
122+
LOGGER.info("delta production complete - no more eligible messages (reason: {})", windowResult.getStopReason().name());
123+
break;
124+
}
125+
126+
// process this window
127+
boolean isDelayedProcessing = processWindow(windowResult, result, onDeltaProduced);
128+
129+
// circuit breaker triggered
130+
if (isDelayedProcessing) {
131+
result.stopReason(StopReason.CIRCUIT_BREAKER_TRIGGERED);
132+
return result.build();
133+
}
134+
}
135+
136+
return result.build();
137+
}
138+
139+
/**
140+
* Processes a single 5-minute window of messages.
141+
*
142+
* @param windowResult The window data to process
143+
* @param result The builder to accumulate statistics into
144+
* @param onDeltaProduced Callback for when a delta is produced
145+
* @return true if the circuit breaker triggered
146+
*/
147+
private boolean processWindow(SqsWindowReader.WindowReadResult windowResult,
148+
DeltaProductionResult.Builder result,
149+
Consumer<String> onDeltaProduced) throws IOException {
150+
long windowStart = windowResult.getWindowStart();
151+
List<SqsParsedMessage> messages = windowResult.getMessages();
152+
153+
// check for manual override
154+
if (manualOverrideService.isDelayedProcessing()) {
155+
LOGGER.info("manual override set to DELAYED_PROCESSING, stopping production");
156+
return true;
157+
}
158+
159+
// create buffers for current window
160+
ByteArrayOutputStream deltaStream = new ByteArrayOutputStream();
161+
JsonArray droppedRequestStream = new JsonArray();
162+
163+
// get file names for current window
164+
String deltaName = OptOutUtils.newDeltaFileName(this.replicaId);
165+
String droppedRequestName = generateDroppedRequestFileName();
166+
167+
// write start of delta
168+
deltaFileWriter.writeStartOfDelta(deltaStream, windowStart);
169+
170+
// separate messages into delta entries and dropped requests
171+
List<SqsParsedMessage> deltaMessages = new ArrayList<>();
172+
List<SqsParsedMessage> droppedMessages = new ArrayList<>();
173+
174+
for (SqsParsedMessage msg : messages) {
175+
if (trafficFilter.isDenylisted(msg)) {
176+
writeDroppedRequestEntry(droppedRequestStream, msg);
177+
droppedMessages.add(msg);
178+
} else {
179+
deltaFileWriter.writeOptOutEntry(deltaStream, msg.hashBytes(), msg.idBytes(), msg.timestamp());
180+
deltaMessages.add(msg);
181+
}
182+
}
183+
184+
// check traffic calculator
185+
SqsMessageOperations.QueueAttributes queueAttributes = SqsMessageOperations.getQueueAttributes(this.sqsClient, this.queueUrl);
186+
TrafficStatus trafficStatus = this.trafficCalculator.calculateStatus(deltaMessages, queueAttributes, windowResult.getRawMessagesRead());
187+
188+
if (trafficStatus == TrafficStatus.DELAYED_PROCESSING) {
189+
LOGGER.error("circuit_breaker_triggered: traffic spike detected, stopping production and setting manual override");
190+
manualOverrideService.setDelayedProcessing();
191+
return true;
192+
}
193+
194+
// upload delta file if there are non-denylisted messages
195+
if (!deltaMessages.isEmpty()) {
196+
uploadDelta(deltaStream, deltaName, windowStart, deltaMessages, onDeltaProduced);
197+
result.incrementDeltas(deltaMessages.size());
198+
}
199+
200+
// upload dropped request file if there are denylisted messages
201+
if (!droppedMessages.isEmpty() && droppedRequestUploadService != null) {
202+
uploadDroppedRequests(droppedRequestStream, droppedRequestName, windowStart, droppedMessages);
203+
result.incrementDroppedRequests(droppedMessages.size());
204+
}
205+
206+
LOGGER.info("processed window [{}, {}]: {} entries, {} dropped requests",
207+
windowStart, windowStart + this.deltaWindowSeconds,
208+
deltaMessages.size(), droppedMessages.size());
209+
210+
return false;
211+
}
212+
213+
/**
214+
* Adds end-of-delta entry to delta stream and converts to byte array,
215+
* then uploads delta file to S3 and deletes associated messages from SQS.
216+
*/
217+
private void uploadDelta(ByteArrayOutputStream deltaStream, String deltaName,
218+
long windowStart, List<SqsParsedMessage> messages,
219+
Consumer<String> onDeltaProduced) throws IOException {
220+
// add end-of-delta entry
221+
long endTimestamp = windowStart + this.deltaWindowSeconds;
222+
deltaFileWriter.writeEndOfDelta(deltaStream, endTimestamp);
223+
224+
// convert delta stream to byte array
225+
byte[] deltaData = deltaStream.toByteArray();
226+
String s3Path = this.cloudSync.toCloudPath(deltaName);
227+
228+
// get original messages for deletion
229+
List<Message> originalMessages = messages.stream().map(SqsParsedMessage::originalMessage).collect(Collectors.toList());
230+
231+
// upload and delete
232+
deltaUploadService.uploadAndDeleteMessages(deltaData, s3Path, originalMessages, (count) -> {
233+
metrics.recordDeltaProduced(count);
234+
onDeltaProduced.accept(deltaName);
235+
});
236+
}
237+
238+
/**
239+
* Uploads dropped requests to S3 and deletes associated messages from SQS.
240+
*/
241+
private void uploadDroppedRequests(JsonArray droppedRequestStream, String droppedRequestName,
242+
long windowStart, List<SqsParsedMessage> messages) throws IOException {
243+
244+
// convert dropped request stream to byte array
245+
byte[] droppedRequestData = droppedRequestStream.encode().getBytes();
246+
247+
// get original messages for deletion
248+
List<Message> originalMessages = messages.stream().map(SqsParsedMessage::originalMessage).collect(Collectors.toList());
249+
250+
// upload and delete
251+
droppedRequestUploadService.uploadAndDeleteMessages(droppedRequestData, droppedRequestName, originalMessages,
252+
metrics::recordDroppedRequestsProduced);
253+
}
254+
255+
/**
256+
* Writes a dropped request entry to the JSON array.
257+
*/
258+
private void writeDroppedRequestEntry(JsonArray droppedRequestArray, SqsParsedMessage parsed) {
259+
String messageBody = parsed.originalMessage().body();
260+
JsonObject messageJson = new JsonObject(messageBody);
261+
droppedRequestArray.add(messageJson);
262+
}
263+
264+
/**
265+
* Generates a unique filename for dropped requests.
266+
*/
267+
private String generateDroppedRequestFileName() {
268+
return String.format("%s%03d_%s_%08x.json",
269+
"optout-dropped-",
270+
replicaId,
271+
Instant.now().truncatedTo(ChronoUnit.SECONDS).toString().replace(':', '.'),
272+
OptOutUtils.rand.nextInt());
273+
}
274+
275+
/**
276+
* Checks if the job has exceeded its timeout.
277+
*/
278+
private boolean isJobTimedOut(long jobStartTime) {
279+
long elapsedTime = OptOutUtils.nowEpochSeconds() - jobStartTime;
280+
281+
if (elapsedTime > 3600) { // 1 hour - log warning
282+
LOGGER.error("delta_job_timeout: job has been running for {} seconds", elapsedTime);
283+
}
284+
285+
if (elapsedTime > this.jobTimeoutSeconds) {
286+
LOGGER.error("delta_job_timeout: job exceeded timeout, running for {} seconds (timeout: {}s)",
287+
elapsedTime, this.jobTimeoutSeconds);
288+
return true;
289+
}
290+
return false;
291+
}
292+
}
293+

src/main/java/com/uid2/optout/delta/DeltaProductionResult.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -111,6 +111,11 @@ public Builder incrementDroppedRequests(int count) {
111111
return this;
112112
}
113113

114+
/**
 * Sets the stop reason held by this builder.
 *
 * @param reason the stop reason to store
 * @return this builder, for call chaining
 */
public Builder stopReason(StopReason reason) {
115+
this.stopReason = reason;
116+
return this;
117+
}
118+
114119
/**
115120
* Builds the DeltaProductionResult with the accumulated statistics.
116121
*/

src/main/java/com/uid2/optout/traffic/TrafficCalculator.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -216,7 +216,7 @@ List<List<Long>> parseAllowlistRanges(JsonObject config) throws MalformedTraffic
216216
* @param rawMessagesRead Number of raw messages this consumer has read; subtracted from the queue's not-visible count to estimate messages held by other consumers
217217
* @return TrafficStatus (DELAYED_PROCESSING or DEFAULT)
218218
*/
219-
public TrafficStatus calculateStatus(List<SqsParsedMessage> sqsMessages, QueueAttributes queueAttributes, int denylistedCount, int filteredAsTooRecentCount) {
219+
public TrafficStatus calculateStatus(List<SqsParsedMessage> sqsMessages, QueueAttributes queueAttributes, int rawMessagesRead) {
220220

221221
try {
222222
// get list of delta files from s3 (sorted newest to oldest)
@@ -298,15 +298,13 @@ public TrafficStatus calculateStatus(List<SqsParsedMessage> sqsMessages, QueueAt
298298

299299
// add invisible messages being processed by other consumers
300300
// (notVisible count includes our messages, so subtract what we've read to avoid double counting)
301-
// ourMessages = delta messages + denylisted messages + filtered as "too recent" messages
302301
int otherConsumersMessages = 0;
303302
if (queueAttributes != null) {
304303
int totalInvisible = queueAttributes.getApproximateNumberOfMessagesNotVisible();
305-
int ourMessages = (sqsMessages != null ? sqsMessages.size() : 0) + denylistedCount + filteredAsTooRecentCount;
306-
otherConsumersMessages = Math.max(0, totalInvisible - ourMessages);
304+
otherConsumersMessages = Math.max(0, totalInvisible - rawMessagesRead);
307305
totalRecords += otherConsumersMessages;
308306
LOGGER.info("traffic calculation: adding {} invisible messages from other consumers (totalInvisible={}, ourMessages={})",
309-
otherConsumersMessages, totalInvisible, ourMessages);
307+
otherConsumersMessages, totalInvisible, rawMessagesRead);
310308
}
311309

312310
// determine status

0 commit comments

Comments
 (0)