Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
8391dd1
add traffic calculator
Ian-Nara Nov 12, 2025
2f426ad
update from review
Ian-Nara Nov 13, 2025
7d18fbd
add unit tests
Ian-Nara Nov 14, 2025
726feda
allow custom eval window
Ian-Nara Nov 14, 2025
1d8f050
update comment
Ian-Nara Nov 14, 2025
4bc5e45
switch to configmap for traffic config
Ian-Nara Nov 19, 2025
72a99d0
update to all k8s
Ian-Nara Nov 19, 2025
027f576
update config validations
Ian-Nara Nov 20, 2025
58bd298
small rename
Ian-Nara Nov 21, 2025
b3cbdfd
test fix
Ian-Nara Nov 21, 2025
a802e31
undo accidental change
Ian-Nara Nov 21, 2025
e26a64f
whitespace
Ian-Nara Nov 21, 2025
fd13b69
whitespace
Ian-Nara Nov 21, 2025
14a6c1f
update traffic baseline to hardcoded
Ian-Nara Nov 25, 2025
843f41f
naming improvements
Ian-Nara Nov 25, 2025
1477e2f
naming improvements
Ian-Nara Nov 25, 2025
d75bc0d
naming improvements
Ian-Nara Nov 26, 2025
d7db764
small comment/name update
Ian-Nara Nov 26, 2025
a178540
naming update
Ian-Nara Nov 26, 2025
df7ab91
naming update
Ian-Nara Nov 26, 2025
ac2a613
process async with polling, no message limit
Ian-Nara Dec 2, 2025
38c86b3
add newest delta file logic
Ian-Nara Dec 2, 2025
160c5de
Merge branch 'ian-UID2-6337-asynchronous-full-queue-process' into ian…
Ian-Nara Dec 2, 2025
962b7cc
update comments
Ian-Nara Dec 2, 2025
16dfe53
add traffic filter
Ian-Nara Dec 2, 2025
21bfda3
merge in some message info stuff
Ian-Nara Dec 2, 2025
b520a7a
Merge branch 'ian-UID2-6337-asynchronous-full-queue-process' into ian…
Ian-Nara Dec 2, 2025
51d4a87
config names
Ian-Nara Dec 2, 2025
b7116f9
Merge branch 'ian-UID2-6337-asynchronous-full-queue-process' into ian…
Ian-Nara Dec 2, 2025
6f4cf76
merge
Ian-Nara Dec 2, 2025
458d70a
Merge branch 'ian-UID2-6146-update-traffic-calculator-to-hardcoded-ba…
Ian-Nara Dec 2, 2025
127703a
limit messages per delta file for memory protection
Ian-Nara Dec 2, 2025
2873ea2
imrpove naminig / comment
Ian-Nara Dec 2, 2025
f6b7f46
update traffic calculator
Ian-Nara Dec 4, 2025
a93b051
address comments
Ian-Nara Dec 4, 2025
019f426
undo accidental commit
Ian-Nara Dec 4, 2025
0265ae2
Merge branch 'ian-UID2-6337-asynchronous-full-queue-process' into ian…
Ian-Nara Dec 4, 2025
f671dd4
Merge branch 'ian-UID2-6146-update-traffic-calculator-to-hardcoded-ba…
Ian-Nara Dec 4, 2025
47f63b0
Merge branch 'main' into ian-UID2-6337-asynchronous-full-queue-process
Ian-Nara Dec 5, 2025
213eb5f
Merge branch 'ian-UID2-6337-asynchronous-full-queue-process' into ian…
Ian-Nara Dec 5, 2025
64f1e61
Merge branch 'ian-UID2-6146-update-traffic-calculator-to-hardcoded-ba…
Ian-Nara Dec 5, 2025
8531ec5
Merge pull request #253 from IABTechLab/ian-UID2-6151-add-traffic-fil…
Ian-Nara Dec 6, 2025
4388f1b
Merge pull request #251 from IABTechLab/ian-UID2-6146-update-traffic-…
Ian-Nara Dec 6, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/main/java/com/uid2/optout/Const.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,16 @@ public static class Config extends com.uid2.shared.Const.Config {
public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // Default: "sqs-delta" - folder within same S3 bucket as regular optout
public static final String OptOutSqsMaxMessagesPerPollProp = "optout_sqs_max_messages_per_poll";
public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout";
public static final String OptOutDeltaJobTimeoutSecondsProp = "optout_delta_job_timeout_seconds";
public static final String OptOutS3BucketDroppedRequestsProp = "optout_s3_bucket_dropped_requests";
public static final String OptOutMaxMessagesPerFileProp = "optout_max_messages_per_file";
public static final String TrafficFilterConfigPathProp = "traffic_filter_config_path";
public static final String TrafficCalcConfigPathProp = "traffic_calc_config_path";
public static final String ManualOverrideS3PathProp = "manual_override_s3_path";
public static final String OptOutTrafficCalcBaselineTrafficProp = "traffic_calc_baseline_traffic";
public static final String OptOutTrafficCalcThresholdMultiplierProp = "traffic_calc_threshold_multiplier";
public static final String OptOutTrafficCalcEvaluationWindowSecondsProp = "traffic_calc_evaluation_window_seconds";
public static final String OptOutTrafficCalcAllowlistRangesProp = "traffic_calc_allowlist_ranges";
}

public static class Event {
Expand Down
85 changes: 85 additions & 0 deletions src/main/java/com/uid2/optout/vertx/DeltaProduceJobStatus.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
package com.uid2.optout.vertx;

import io.vertx.core.json.JsonObject;
import java.time.Instant;

/**
* Represents the status and result of an async delta production job on a pod.
*
* This class tracks the lifecycle of a delta production job including its state
* (running, completed, failed), timing information, and result or error details.
*
*/
public class DeltaProduceJobStatus {
private final Instant startTime;
private volatile JobState state;
private volatile JsonObject result;
private volatile String errorMessage;
private volatile Instant endTime;
Comment on lines +15 to +18
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why volatile?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

State is read from the event loop thread and is written from the (single possible) worker thread caused by execute blocking.

The event loop just reads the status occasionally; the worker thread can write to it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

job.encode() to send the response is in the status handler, and job.complete(result) is in the blocking, worker thread code.


public enum JobState {
RUNNING,
COMPLETED,
FAILED
}

public DeltaProduceJobStatus() {
this.startTime = Instant.now();
this.state = JobState.RUNNING;
}

/**
* Mark the job as completed with the given result.
* @param result The result details as a JsonObject
*/
public void complete(JsonObject result) {
this.result = result;
this.state = JobState.COMPLETED;
this.endTime = Instant.now();
}

/**
* Mark the job as failed with the given error message.
* @param errorMessage Description of the failure
*/
public void fail(String errorMessage) {
this.errorMessage = errorMessage;
this.state = JobState.FAILED;
this.endTime = Instant.now();
}

/**
* Get the current state of the job.
* @return The job state
*/
public JobState getState() {
return state;
}

/**
* Convert the job status to a JSON representation for API responses.
* @return JsonObject with state, timing, and result/error information
*/
public JsonObject toJson() {
JsonObject json = new JsonObject()
.put("state", state.name().toLowerCase())
.put("start_time", startTime.toString());

if (endTime != null) {
json.put("end_time", endTime.toString());
long durationSeconds = endTime.getEpochSecond() - startTime.getEpochSecond();
json.put("duration_seconds", durationSeconds);
}

if (state == JobState.COMPLETED && result != null) {
json.put("result", result);
}

if (state == JobState.FAILED && errorMessage != null) {
json.put("error", errorMessage);
}

return json;
}
}

14 changes: 13 additions & 1 deletion src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,17 @@ public class DeltaProductionResult {
private final int deltasProduced;
private final int entriesProcessed;

public DeltaProductionResult(int deltasProduced, int entriesProcessed) {
/*
* indicates that there are still messages in the queue, however,
* not enough time has elapsed to produce a delta file.
* We produce in batches of (5 minutes)
*/
private final boolean stoppedDueToMessagesTooRecent;

public DeltaProductionResult(int deltasProduced, int entriesProcessed, boolean stoppedDueToMessagesTooRecent) {
this.deltasProduced = deltasProduced;
this.entriesProcessed = entriesProcessed;
this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent;
}

public int getDeltasProduced() {
Expand All @@ -19,5 +27,9 @@ public int getDeltasProduced() {
public int getEntriesProcessed() {
return entriesProcessed;
}

public boolean stoppedDueToMessagesTooRecent() {
return stoppedDueToMessagesTooRecent;
}
}

Loading
Loading