Skip to content

Commit 4360068

Browse files
authored
Merge pull request #252 from IABTechLab/ian-UID2-6337-asynchronous-full-queue-process
process async with polling, no message limit
2 parents cabdce5 + 4388f1b commit 4360068

15 files changed

+3794
-326
lines changed

src/main/java/com/uid2/optout/Const.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,16 @@ public static class Config extends com.uid2.shared.Const.Config {
2222
public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // Default: "sqs-delta" - folder within same S3 bucket as regular optout
2323
public static final String OptOutSqsMaxMessagesPerPollProp = "optout_sqs_max_messages_per_poll";
2424
public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout";
25+
public static final String OptOutDeltaJobTimeoutSecondsProp = "optout_delta_job_timeout_seconds";
26+
public static final String OptOutS3BucketDroppedRequestsProp = "optout_s3_bucket_dropped_requests";
27+
public static final String OptOutMaxMessagesPerFileProp = "optout_max_messages_per_file";
28+
public static final String TrafficFilterConfigPathProp = "traffic_filter_config_path";
29+
public static final String TrafficCalcConfigPathProp = "traffic_calc_config_path";
30+
public static final String ManualOverrideS3PathProp = "manual_override_s3_path";
31+
public static final String OptOutTrafficCalcBaselineTrafficProp = "traffic_calc_baseline_traffic";
32+
public static final String OptOutTrafficCalcThresholdMultiplierProp = "traffic_calc_threshold_multiplier";
33+
public static final String OptOutTrafficCalcEvaluationWindowSecondsProp = "traffic_calc_evaluation_window_seconds";
34+
public static final String OptOutTrafficCalcAllowlistRangesProp = "traffic_calc_allowlist_ranges";
2535
}
2636

2737
public static class Event {
Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package com.uid2.optout.vertx;
2+
3+
import io.vertx.core.json.JsonObject;
4+
import java.time.Instant;
5+
6+
/**
7+
* Represents the status and result of an async delta production job on a pod.
8+
*
9+
* This class tracks the lifecycle of a delta production job including its state
10+
* (running, completed, failed), timing information, and result or error details.
11+
*
12+
*/
13+
public class DeltaProduceJobStatus {
14+
private final Instant startTime;
15+
private volatile JobState state;
16+
private volatile JsonObject result;
17+
private volatile String errorMessage;
18+
private volatile Instant endTime;
19+
20+
public enum JobState {
21+
RUNNING,
22+
COMPLETED,
23+
FAILED
24+
}
25+
26+
public DeltaProduceJobStatus() {
27+
this.startTime = Instant.now();
28+
this.state = JobState.RUNNING;
29+
}
30+
31+
/**
32+
* Mark the job as completed with the given result.
33+
* @param result The result details as a JsonObject
34+
*/
35+
public void complete(JsonObject result) {
36+
this.result = result;
37+
this.state = JobState.COMPLETED;
38+
this.endTime = Instant.now();
39+
}
40+
41+
/**
42+
* Mark the job as failed with the given error message.
43+
* @param errorMessage Description of the failure
44+
*/
45+
public void fail(String errorMessage) {
46+
this.errorMessage = errorMessage;
47+
this.state = JobState.FAILED;
48+
this.endTime = Instant.now();
49+
}
50+
51+
/**
52+
* Get the current state of the job.
53+
* @return The job state
54+
*/
55+
public JobState getState() {
56+
return state;
57+
}
58+
59+
/**
60+
* Convert the job status to a JSON representation for API responses.
61+
* @return JsonObject with state, timing, and result/error information
62+
*/
63+
public JsonObject toJson() {
64+
JsonObject json = new JsonObject()
65+
.put("state", state.name().toLowerCase())
66+
.put("start_time", startTime.toString());
67+
68+
if (endTime != null) {
69+
json.put("end_time", endTime.toString());
70+
long durationSeconds = endTime.getEpochSecond() - startTime.getEpochSecond();
71+
json.put("duration_seconds", durationSeconds);
72+
}
73+
74+
if (state == JobState.COMPLETED && result != null) {
75+
json.put("result", result);
76+
}
77+
78+
if (state == JobState.FAILED && errorMessage != null) {
79+
json.put("error", errorMessage);
80+
}
81+
82+
return json;
83+
}
84+
}
85+

src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,17 @@ public class DeltaProductionResult {
77
private final int deltasProduced;
88
private final int entriesProcessed;
99

10-
public DeltaProductionResult(int deltasProduced, int entriesProcessed) {
10+
/*
11+
* indicates that there are still messages in the queue, however,
12+
* not enough time has elapsed to produce a delta file.
13+
* We produce in batches of (5 minutes)
14+
*/
15+
private final boolean stoppedDueToMessagesTooRecent;
16+
17+
public DeltaProductionResult(int deltasProduced, int entriesProcessed, boolean stoppedDueToMessagesTooRecent) {
1118
this.deltasProduced = deltasProduced;
1219
this.entriesProcessed = entriesProcessed;
20+
this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent;
1321
}
1422

1523
public int getDeltasProduced() {
@@ -19,5 +27,9 @@ public int getDeltasProduced() {
1927
public int getEntriesProcessed() {
2028
return entriesProcessed;
2129
}
30+
31+
public boolean stoppedDueToMessagesTooRecent() {
32+
return stoppedDueToMessagesTooRecent;
33+
}
2234
}
2335

0 commit comments

Comments
 (0)