diff --git a/src/main/java/com/uid2/optout/Const.java b/src/main/java/com/uid2/optout/Const.java index fc39c2f1..a4182507 100644 --- a/src/main/java/com/uid2/optout/Const.java +++ b/src/main/java/com/uid2/optout/Const.java @@ -22,6 +22,16 @@ public static class Config extends com.uid2.shared.Const.Config { public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // Default: "sqs-delta" - folder within same S3 bucket as regular optout public static final String OptOutSqsMaxMessagesPerPollProp = "optout_sqs_max_messages_per_poll"; public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout"; + public static final String OptOutDeltaJobTimeoutSecondsProp = "optout_delta_job_timeout_seconds"; + public static final String OptOutS3BucketDroppedRequestsProp = "optout_s3_bucket_dropped_requests"; + public static final String OptOutMaxMessagesPerFileProp = "optout_max_messages_per_file"; + public static final String TrafficFilterConfigPathProp = "traffic_filter_config_path"; + public static final String TrafficCalcConfigPathProp = "traffic_calc_config_path"; + public static final String ManualOverrideS3PathProp = "manual_override_s3_path"; + public static final String OptOutTrafficCalcBaselineTrafficProp = "traffic_calc_baseline_traffic"; + public static final String OptOutTrafficCalcThresholdMultiplierProp = "traffic_calc_threshold_multiplier"; + public static final String OptOutTrafficCalcEvaluationWindowSecondsProp = "traffic_calc_evaluation_window_seconds"; + public static final String OptOutTrafficCalcAllowlistRangesProp = "traffic_calc_allowlist_ranges"; } public static class Event { diff --git a/src/main/java/com/uid2/optout/vertx/DeltaProduceJobStatus.java b/src/main/java/com/uid2/optout/vertx/DeltaProduceJobStatus.java new file mode 100644 index 00000000..e503e200 --- /dev/null +++ b/src/main/java/com/uid2/optout/vertx/DeltaProduceJobStatus.java @@ -0,0 +1,85 @@ +package com.uid2.optout.vertx; + +import io.vertx.core.json.JsonObject; +import java.time.Instant; + +/** + * Represents the status and result of an async delta production job on a pod. + * + * This class tracks the lifecycle of a delta production job including its state + * (running, completed, failed), timing information, and result or error details. + * + */ +public class DeltaProduceJobStatus { + private final Instant startTime; + private volatile JobState state; + private volatile JsonObject result; + private volatile String errorMessage; + private volatile Instant endTime; + + public enum JobState { + RUNNING, + COMPLETED, + FAILED + } + + public DeltaProduceJobStatus() { + this.startTime = Instant.now(); + this.state = JobState.RUNNING; + } + + /** + * Mark the job as completed with the given result. + * @param result The result details as a JsonObject + */ + public void complete(JsonObject result) { + this.result = result; + this.state = JobState.COMPLETED; + this.endTime = Instant.now(); + } + + /** + * Mark the job as failed with the given error message. + * @param errorMessage Description of the failure + */ + public void fail(String errorMessage) { + this.errorMessage = errorMessage; + this.state = JobState.FAILED; + this.endTime = Instant.now(); + } + + /** + * Get the current state of the job. + * @return The job state + */ + public JobState getState() { + return state; + } + + /** + * Convert the job status to a JSON representation for API responses. + * @return JsonObject with state, timing, and result/error information + */ + public JsonObject toJson() { + JsonObject json = new JsonObject() + .put("state", state.name().toLowerCase()) + .put("start_time", startTime.toString()); + + if (endTime != null) { + json.put("end_time", endTime.toString()); + long durationSeconds = endTime.getEpochSecond() - startTime.getEpochSecond(); + json.put("duration_seconds", durationSeconds); + } + + if (state == JobState.COMPLETED && result != null) { + json.put("result", result); + } + + if (state == JobState.FAILED && errorMessage != null) { + json.put("error", errorMessage); + } + + return json; + } +} + diff --git a/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java b/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java index 54a70117..7f2eb8fb 100644 --- a/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java +++ b/src/main/java/com/uid2/optout/vertx/DeltaProductionResult.java @@ -7,9 +7,17 @@ public class DeltaProductionResult { private final int deltasProduced; private final int entriesProcessed; - public DeltaProductionResult(int deltasProduced, int entriesProcessed) { + /* + * indicates that there are still messages in the queue, however, + * not enough time has elapsed to produce a delta file. + * We produce in batches of (5 minutes) + */ + private final boolean stoppedDueToMessagesTooRecent; + + public DeltaProductionResult(int deltasProduced, int entriesProcessed, boolean stoppedDueToMessagesTooRecent) { this.deltasProduced = deltasProduced; this.entriesProcessed = entriesProcessed; + this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent; } public int getDeltasProduced() { @@ -19,5 +27,9 @@ public int getDeltasProduced() { public int getEntriesProcessed() { return entriesProcessed; } + + public boolean stoppedDueToMessagesTooRecent() { + return stoppedDueToMessagesTooRecent; + } } diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index bc238fc0..910a2c61 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -26,7 +26,51 @@ import java.nio.ByteOrder; import java.util.ArrayList; import java.util.List; - +import java.util.concurrent.atomic.AtomicReference; + +/** + * SQS-based opt-out log producer that creates delta files asynchronously. + * + *

Async Job Processing

+ * The /optout/deltaproduce endpoint starts jobs asynchronously and returns immediately with HTTP 202. + * Jobs run on worker threads via {@link Vertx#executeBlocking(java.util.concurrent.Callable)} and can + * take some time to complete. Clients should poll GET /optout/deltaproduce/status to check progress. + * + *

Mutual Exclusion (Per-Pod)

+ * Each pod prevents concurrent jobs using {@link AtomicReference#compareAndSet}. + * Only ONE job can run per pod at a time. Attempting to start a second job returns HTTP 409 Conflict. + * + *

Job Lifecycle and Auto-Clearing

+ * Completed or failed jobs are automatically cleared when a new POST request is made. + * + * + *

Kubernetes Deployment with Session Affinity

+ * In K8s with multiple pods, each pod maintains its own independent job state. To ensure requests + * from the same client (e.g., a cronjob) consistently hit the same pod for job creation and status + * polling, the Service must be configured with session affinity: + *
+ * sessionAffinity: ClientIP
+ * sessionAffinityConfig:
+ *   clientIP:
+ *     timeoutSeconds: 10800  # 3 hours, adjust based on job duration
+ * 
+ * + *

This ensures all requests from the same source IP are routed to the same pod, allowing proper + * job lifecycle management (POST to start, GET to poll)

+ * + *

Note: SQS visibility timeout provides natural coordination across pods, + * limiting duplicate message processing even if multiple pods run jobs concurrently.

+ * + *

API Endpoints

+ * + */ public class OptOutSqsLogProducer extends AbstractVerticle { private static final Logger LOGGER = LoggerFactory.getLogger(OptOutSqsLogProducer.class); private final HealthComponent healthComponent = HealthManager.instance.registerComponent("sqs-log-producer"); @@ -40,6 +84,8 @@ public class OptOutSqsLogProducer extends AbstractVerticle { private final int maxMessagesPerPoll; private final int visibilityTimeout; private final int deltaWindowSeconds; // Time window for each delta file (5 minutes = 300 seconds) + private final int jobTimeoutSeconds; + private final int maxMessagesPerFile; // Memory protection: max messages per delta file private final int listenPort; private final String internalApiKey; private final InternalAuthMiddleware internalAuth; @@ -56,6 +102,12 @@ public class OptOutSqsLogProducer extends AbstractVerticle { private ByteBuffer buffer; private boolean shutdownInProgress = false; + + //Tracks the current delta production job status for this pod. + private final AtomicReference currentJob = new AtomicReference<>(null); + + // Helper for reading complete 5-minute windows from SQS + private final SqsWindowReader windowReader; public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync) throws IOException { this(jsonConfig, cloudStorage, cloudSync, Const.Event.DeltaProduce); @@ -86,6 +138,8 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O this.maxMessagesPerPoll = 10; // SQS max is 10 this.visibilityTimeout = jsonConfig.getInteger(Const.Config.OptOutSqsVisibilityTimeoutProp, 240); // 4 minutes default this.deltaWindowSeconds = 300; // Fixed 5 minutes for all deltas + this.jobTimeoutSeconds = jsonConfig.getInteger(Const.Config.OptOutDeltaJobTimeoutSecondsProp, 10800); // 3 hours default + this.maxMessagesPerFile = jsonConfig.getInteger(Const.Config.OptOutMaxMessagesPerFileProp, 10000); // Memory protection limit // HTTP server configuration - use port offset + 1 to avoid conflicts this.listenPort = Const.Port.ServicePortForOptOut + Utils.getPortOffset() + 1; @@ -96,6 +150,13 @@ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, O int bufferSize = jsonConfig.getInteger(Const.Config.OptOutProducerBufferSizeProp); this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN); + + // Initialize window reader with memory protection limit + this.windowReader = new SqsWindowReader( + this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, + this.visibilityTimeout, this.deltaWindowSeconds, this.maxMessagesPerFile + ); + LOGGER.info("OptOutSqsLogProducer initialized with maxMessagesPerFile: {}", this.maxMessagesPerFile); } @Override @@ -125,41 +186,6 @@ public void start(Promise startPromise) { } } - private Router createRouter() { - Router router = Router.router(vertx); - - router.post(Endpoints.OPTOUT_DELTA_PRODUCE.toString()) - .handler(internalAuth.handleWithAudit(this::handleDeltaProduce)); - - LOGGER.info("Registered endpoint: POST {}", Endpoints.OPTOUT_DELTA_PRODUCE); - - return router; - } - - private void handleDeltaProduce(RoutingContext routingContext) { - HttpServerResponse resp = routingContext.response(); - - LOGGER.info("Delta production requested via /deltaproduce endpoint"); - - // Call the producer method - event loop guarantees serial execution - this.produceDeltasOnDemand() - .onSuccess(result -> { - LOGGER.info("Delta production completed successfully: {}", result.encode()); - resp.setStatusCode(200) - .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(result.encode()); - }) - .onFailure(error -> { - LOGGER.error("Delta production failed", error); - resp.setStatusCode(500) - .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("status", "error") - .put("message", error.getMessage()) - .encode()); - }); - } - @Override public void stop(Promise stopPromise) { LOGGER.info("Stopping SQS Log Producer..."); @@ -178,151 +204,247 @@ public void stop(Promise stopPromise) { LOGGER.info("SQS Log Producer stopped"); } + private Router createRouter() { + Router router = Router.router(vertx); + + // POST endpoint to start delta production job (async, returns immediately) + router.post(Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .handler(internalAuth.handleWithAudit(this::handleDeltaProduceStart)); + + // GET endpoint to poll job status + router.get(Endpoints.OPTOUT_DELTA_PRODUCE.toString() + "/status") + .handler(internalAuth.handleWithAudit(this::handleDeltaProduceStatus)); + + return router; + } + + /** - * Produce delta files from SQS queue on-demand. - * Processes 5-minute batches of messages until the queue is empty. - * Serial execution is guaranteed by Vert.x event loop - no manual synchronization needed. - * @return Future with status information + * Handler for GET /optout/deltaproduce/status + * Returns the status of the current or most recent delta production job on this pod */ - private Future produceDeltasOnDemand() { - if (this.shutdownInProgress) { - return Future.failedFuture("Producer is shutting down"); + private void handleDeltaProduceStatus(RoutingContext routingContext) { + HttpServerResponse resp = routingContext.response(); + + DeltaProduceJobStatus job = currentJob.get(); + + if (job == null) { + resp.setStatusCode(200) + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(new JsonObject() + .put("state", "idle") + .put("message", "No job running on this pod") + .encode()); + return; } - Promise promise = Promise.promise(); + resp.setStatusCode(200) + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(job.toJson().encode()); + } - vertx.executeBlocking(blockingPromise -> { - try { - JsonObject result = new JsonObject(); - - LOGGER.info("Starting on-demand delta production from SQS queue"); - - // Receive all available messages from SQS (up to 10000) - List allMessages = SqsMessageOperations.receiveAllAvailableMessages( - this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, - this.visibilityTimeout, 1000); // process at most 10000 optout requests per optout API call - - if (allMessages.isEmpty()) { - LOGGER.info("No messages in queue"); - result.put("status", "success"); - result.put("deltas_produced", 0); - result.put("entries_processed", 0); - blockingPromise.complete(result); - return; - } - - LOGGER.info("Received {} messages from SQS", allMessages.size()); - - // Parse and sort messages by timestamp - List parsedMessages = SqsMessageParser.parseAndSortMessages(allMessages); - - if (parsedMessages.isEmpty()) { - LOGGER.warn("No valid messages after parsing"); - result.put("status", "success"); - result.put("deltas_produced", 0); - result.put("entries_processed", 0); - blockingPromise.complete(result); - return; - } - - // Filter messages: only process those where 5 minutes have elapsed since their timestamp vs current "now" time - long currentTime = OptOutUtils.nowEpochSeconds(); - List eligibleMessages = SqsMessageParser.filterEligibleMessages( - parsedMessages, this.deltaWindowSeconds, currentTime); - - if (eligibleMessages.isEmpty()) { - LOGGER.info("All {} messages are too recent (< {}s old), skipping processing", - parsedMessages.size(), this.deltaWindowSeconds); - result.put("status", "skipped"); - result.put("reason", "All messages too recent"); - result.put("deltas_produced", 0); - result.put("entries_processed", 0); - blockingPromise.complete(result); - return; - } - - if (eligibleMessages.size() < parsedMessages.size()) { - LOGGER.info("Filtered out {} too-recent messages, processing {} eligible messages", - parsedMessages.size() - eligibleMessages.size(), eligibleMessages.size()); - } - - // Process eligible messages in 5-minute batches (based on message timestamp) - DeltaProductionResult deltaResult = this.produceBatchedDeltas(eligibleMessages); - - result.put("status", "success"); - result.put("deltas_produced", deltaResult.getDeltasProduced()); - result.put("entries_processed", deltaResult.getEntriesProcessed()); - LOGGER.info("Delta production complete: {} deltas, {} entries", - deltaResult.getDeltasProduced(), deltaResult.getEntriesProcessed()); - - blockingPromise.complete(result); + /** + * Handler for POST /optout/deltaproduce + * Starts an async delta production job and returns immediately. + * + *

Auto-clearing behavior: + *

    + *
  • If no job exists or previous job is completed/failed: Starts new job immediately
  • + *
  • If a job is currently running: Returns 409 Conflict (cannot replace running jobs)
  • + *
+ * + */ + private void handleDeltaProduceStart(RoutingContext routingContext) { + HttpServerResponse resp = routingContext.response(); - } catch (Exception e) { - LOGGER.error("Error in on-demand delta production", e); - blockingPromise.fail(e); + LOGGER.info("Delta production job requested via /deltaproduce endpoint"); + + + DeltaProduceJobStatus existingJob = currentJob.get(); + + // If there's an existing job, check if it's still running + if (existingJob != null) { + if (existingJob.getState() == DeltaProduceJobStatus.JobState.RUNNING) { + // Cannot replace a running job - 409 Conflict + LOGGER.warn("Delta production job already running on this pod"); + resp.setStatusCode(409) + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(new JsonObject() + .put("status", "conflict") + .put("message", "A delta production job is already running on this pod") + .put("current_job", existingJob.toJson()) + .encode()); + return; } - }, promise); + + LOGGER.info("Auto-clearing previous {} job to start new one", existingJob.getState()); + } + + DeltaProduceJobStatus newJob = new DeltaProduceJobStatus(); + + // Try to set the new job + if (!currentJob.compareAndSet(existingJob, newJob)) { + resp.setStatusCode(409) + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(new JsonObject() + .put("status", "conflict") + .put("message", "Job state changed, please retry") + .encode()); + return; + } - return promise.future(); + // Start the job asynchronously + LOGGER.info("Starting delta production job"); + this.startDeltaProductionJob(newJob); + + // Return immediately with 202 Accepted + resp.setStatusCode(202) + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(new JsonObject() + .put("status", "accepted") + .put("message", "Delta production job started on this pod") + .encode()); } + /** + * Starts the delta production job asynchronously + * The job runs on a worker thread and updates the DeltaProduceJobStatus when complete + */ + private void startDeltaProductionJob(DeltaProduceJobStatus job) { + vertx.executeBlocking(() -> { + LOGGER.info("Executing delta production job"); + return produceDeltasBlocking(); + }).onComplete(ar -> { + if (ar.succeeded()) { + JsonObject result = ar.result(); + job.complete(result); + LOGGER.info("Delta production job succeeded: {}", result.encode()); + } else { + String errorMsg = ar.cause().getMessage(); + job.fail(errorMsg); + LOGGER.error("Delta production job failed: {}", errorMsg, ar.cause()); + } + }); + } - private DeltaProductionResult produceBatchedDeltas(List messages) throws IOException { - int deltasProduced = 0; - int entriesProcessed = 0; + /** + * Produce delta files from SQS queue (blocking operation). + * Reads messages from queue and processes them in 5-minute windows. + * Continues until queue is empty or all remaining messages are too recent. + * This method should be called from a worker thread via executeBlocking. + * @return JsonObject with status information + * @throws Exception if production fails + */ + private JsonObject produceDeltasBlocking() throws Exception { + if (this.shutdownInProgress) { + throw new Exception("Producer is shutting down"); + } - ByteArrayOutputStream currentDeltaStream = null; - String currentDeltaName = null; - Long currentDeltaWindowStart = null; - List currentDeltaMessages = new ArrayList<>(); + JsonObject result = new JsonObject(); + LOGGER.info("Starting delta production from SQS queue"); + + // Process messages until queue is empty or messages are too recent + DeltaProductionResult deltaResult = this.produceBatchedDeltas(); + + // Determine status based on results + if (deltaResult.getDeltasProduced() == 0 && deltaResult.stoppedDueToMessagesTooRecent()) { + // No deltas produced because all messages were too recent + result.put("status", "skipped"); + result.put("reason", "All messages too recent"); + LOGGER.info("Delta production skipped: all messages too recent"); + } else { + result.put("status", "success"); + LOGGER.info("Delta production complete: {} deltas, {} entries", + deltaResult.getDeltasProduced(), deltaResult.getEntriesProcessed()); + } + + result.put("deltas_produced", deltaResult.getDeltasProduced()); + result.put("entries_processed", deltaResult.getEntriesProcessed()); - // Group messages into 5-minute windows and produce deltas - for (SqsParsedMessage parsed : messages) { - // Check if we need to start a new delta based on the message timestamp - boolean needNewDelta = false; + return result; + } - if (currentDeltaWindowStart == null) { - needNewDelta = true; - } else { - long windowEnd = currentDeltaWindowStart + this.deltaWindowSeconds; - if (parsed.getTimestamp() >= windowEnd) { - // Upload current delta - if (currentDeltaStream != null) { - this.uploadDeltaAndDeleteMessages(currentDeltaStream, currentDeltaName, currentDeltaWindowStart, currentDeltaMessages); - deltasProduced++; - currentDeltaMessages.clear(); - } - needNewDelta = true; - } - } - if (needNewDelta) { - // Start a new delta for this time window (round down to the nearest window boundary) - currentDeltaWindowStart = (parsed.getTimestamp() / this.deltaWindowSeconds) * this.deltaWindowSeconds; - currentDeltaName = OptOutUtils.newDeltaFileName(this.replicaId); - currentDeltaStream = new ByteArrayOutputStream(); - - this.writeStartOfDelta(currentDeltaStream, currentDeltaWindowStart); + /** + * Reads messages from SQS and produces delta files in 5 minute batches. + * Continues until queue is empty or messages are too recent. + * Windows are limited to maxMessagesPerFile for memory protection. + * + * @return DeltaProductionResult with counts and stop reason + * @throws IOException if delta production fails + */ + private DeltaProductionResult produceBatchedDeltas() throws IOException { + int deltasProduced = 0; + int totalEntriesProcessed = 0; + boolean stoppedDueToMessagesTooRecent = false; + + long jobStartTime = OptOutUtils.nowEpochSeconds(); + LOGGER.info("Starting delta production from SQS queue (maxMessagesPerFile: {})", this.maxMessagesPerFile); - LOGGER.info("Started new delta: {} for time window [{}, {})", - currentDeltaName, currentDeltaWindowStart, currentDeltaWindowStart + this.deltaWindowSeconds); + // Read and process windows until done + while (true) { + if(checkJobTimeout(jobStartTime)){ + break; + } + + // Read one complete 5-minute window (limited to maxMessagesPerFile) + SqsWindowReader.WindowReadResult windowResult = windowReader.readWindow(); + + // If no messages, we're done (queue empty or messages too recent) + if (windowResult.isEmpty()) { + stoppedDueToMessagesTooRecent = windowResult.stoppedDueToMessagesTooRecent(); + LOGGER.info("Delta production complete - no more eligible messages"); + break; } + + // Produce delta for this window + long windowStart = windowResult.getWindowStart(); + List messages = windowResult.getMessages(); + + // Create delta file + String deltaName = OptOutUtils.newDeltaFileName(this.replicaId); + ByteArrayOutputStream deltaStream = new ByteArrayOutputStream(); + writeStartOfDelta(deltaStream, windowStart); + + // Write all messages + List sqsMessages = new ArrayList<>(); + for (SqsParsedMessage msg : messages) { + writeOptOutEntry(deltaStream, msg.getHashBytes(), msg.getIdBytes(), msg.getTimestamp()); + sqsMessages.add(msg.getOriginalMessage()); + } + + // Upload and delete + uploadDeltaAndDeleteMessages(deltaStream, deltaName, windowStart, sqsMessages); + deltasProduced++; + totalEntriesProcessed += messages.size(); + + LOGGER.info("Produced delta for window [{}, {}] with {} messages", + windowStart, windowStart + this.deltaWindowSeconds, messages.size()); + } - // Write entry to current delta - this.writeOptOutEntry(currentDeltaStream, parsed.getHashBytes(), parsed.getIdBytes(), parsed.getTimestamp()); + long totalDuration = OptOutUtils.nowEpochSeconds() - jobStartTime; + LOGGER.info("Delta production complete: took {}s, produced {} deltas, processed {} entries", + totalDuration, deltasProduced, totalEntriesProcessed); - // Track this message with the current delta - currentDeltaMessages.add(parsed.getOriginalMessage()); - entriesProcessed++; - } + return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, stoppedDueToMessagesTooRecent); + } - // Upload the last delta if any - if (currentDeltaStream != null && !currentDeltaMessages.isEmpty()) { - this.uploadDeltaAndDeleteMessages(currentDeltaStream, currentDeltaName, currentDeltaWindowStart, currentDeltaMessages); - deltasProduced++; + /** + * Checks if job has exceeded timeout + */ + private boolean checkJobTimeout(long jobStartTime) { + long elapsedTime = OptOutUtils.nowEpochSeconds() - jobStartTime; + if (elapsedTime > 3600) { // 1 hour - log warning message + LOGGER.error("Delta production job has been running for {} seconds", + elapsedTime); + return false; } - - return new DeltaProductionResult(deltasProduced, entriesProcessed); + if (elapsedTime > this.jobTimeoutSeconds) { + LOGGER.error("Delta production job has been running for {} seconds (exceeds timeout of {}s)", + elapsedTime, this.jobTimeoutSeconds); + return true; // deadline exceeded + } + return false; } /** diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java new file mode 100644 index 00000000..f0c7a7c5 --- /dev/null +++ b/src/main/java/com/uid2/optout/vertx/OptOutTrafficCalculator.java @@ -0,0 +1,574 @@ +package com.uid2.optout.vertx; + +import com.uid2.shared.cloud.ICloudStorage; +import com.uid2.shared.optout.OptOutCollection; +import com.uid2.shared.optout.OptOutEntry; +import com.uid2.shared.optout.OptOutUtils; +import com.uid2.optout.Const; +import io.vertx.core.json.JsonObject; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; + +import java.nio.charset.StandardCharsets; + +import java.nio.file.Files; +import java.nio.file.Paths; +import java.io.InputStream; +import java.io.IOException; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +/** + * Calculates opt-out traffic patterns to determine DEFAULT or DELAYED_PROCESSING status. + * + * Compares recent ~24h traffic (sumCurrent) against a configurable baseline (baselineTraffic) of expected traffic in 24 hours. + * The baseline is multiplied by (thresholdMultiplier) to determine the threshold. + * sumCurrent excludes records in allowlist ranges (surge windows determined by engineers). + * + * Returns DELAYED_PROCESSING if sumCurrent >= thresholdMultiplier * baselineTraffic, indicating abnormal traffic spike. + */ +public class OptOutTrafficCalculator { + private static final Logger LOGGER = LoggerFactory.getLogger(OptOutTrafficCalculator.class); + + private static final int HOURS_24 = 24 * 3600; // 24 hours in seconds + + private final Map deltaFileCache = new ConcurrentHashMap<>(); + private final ICloudStorage cloudStorage; + private final String s3DeltaPrefix; // (e.g. "optout-v2/delta/") + private final String trafficCalcConfigPath; + private int baselineTraffic; + private int thresholdMultiplier; + private int evaluationWindowSeconds; + private List> allowlistRanges; + + public enum TrafficStatus { + DELAYED_PROCESSING, + DEFAULT + } + + /** + * Cache entry for a delta file containing all record timestamps. + * + * Memory usage: ~8 bytes per timestamp (long) + * 1GB of memory can store ~130 million timestamps (1024^3)/8 + */ + private static class FileRecordCache { + final List timestamps; // All non-sentinel record timestamps + final long newestTimestamp; // evict delta from cache based on oldest record timestamp + + FileRecordCache(List timestamps) { + this.timestamps = timestamps; + this.newestTimestamp = timestamps.isEmpty() ? 0 : Collections.max(timestamps); + } + } + + /** + * Exception thrown by malformed traffic calculator config + */ + public static class MalformedTrafficCalcConfigException extends Exception { + public MalformedTrafficCalcConfigException(String message) { + super(message); + } + } + + /** + * Constructor for OptOutTrafficCalculator + * + * @param cloudStorage Cloud storage for reading delta files + * @param s3DeltaPrefix S3 prefix for delta files + * @param trafficCalcConfigS3Path S3 path for traffic calc config + */ + public OptOutTrafficCalculator(ICloudStorage cloudStorage, String s3DeltaPrefix, String trafficCalcConfigPath) throws MalformedTrafficCalcConfigException { + this.cloudStorage = cloudStorage; + this.s3DeltaPrefix = s3DeltaPrefix; + this.trafficCalcConfigPath = trafficCalcConfigPath; + reloadTrafficCalcConfig(); // Load ConfigMap + + LOGGER.info("OptOutTrafficCalculator initialized: s3DeltaPrefix={}, threshold={}x", + s3DeltaPrefix, thresholdMultiplier); + } + + /** + * Reload traffic calc config from ConfigMap. + * Expected format: + * { + * "traffic_calc_evaluation_window_seconds": 86400, + * "traffic_calc_baseline_traffic": 100, + * "traffic_calc_threshold_multiplier": 5, + * "traffic_calc_allowlist_ranges": [ + * [startTimestamp1, endTimestamp1], + * [startTimestamp2, endTimestamp2] + * ], + * } + * + * Can be called periodically to pick up config changes without restarting. + */ + public void reloadTrafficCalcConfig() throws MalformedTrafficCalcConfigException { + LOGGER.info("Loading traffic calc config from ConfigMap"); + try (InputStream is = Files.newInputStream(Paths.get(trafficCalcConfigPath))) { + String content = new String(is.readAllBytes(), StandardCharsets.UTF_8); + JsonObject trafficCalcConfig = new JsonObject(content); + + // Validate required fields exist + if (!trafficCalcConfig.containsKey(Const.Config.OptOutTrafficCalcEvaluationWindowSecondsProp)) { + throw new MalformedTrafficCalcConfigException("Missing required field: traffic_calc_evaluation_window_seconds"); + } + if (!trafficCalcConfig.containsKey(Const.Config.OptOutTrafficCalcBaselineTrafficProp)) { + throw new MalformedTrafficCalcConfigException("Missing required field: traffic_calc_baseline_traffic"); + } + if (!trafficCalcConfig.containsKey(Const.Config.OptOutTrafficCalcThresholdMultiplierProp)) { + throw new MalformedTrafficCalcConfigException("Missing required field: traffic_calc_threshold_multiplier"); + } + if (!trafficCalcConfig.containsKey(Const.Config.OptOutTrafficCalcAllowlistRangesProp)) { + throw new MalformedTrafficCalcConfigException("Missing required field: traffic_calc_allowlist_ranges"); + } + + this.evaluationWindowSeconds = trafficCalcConfig.getInteger(Const.Config.OptOutTrafficCalcEvaluationWindowSecondsProp); + this.baselineTraffic = trafficCalcConfig.getInteger(Const.Config.OptOutTrafficCalcBaselineTrafficProp); + this.thresholdMultiplier = trafficCalcConfig.getInteger(Const.Config.OptOutTrafficCalcThresholdMultiplierProp); + + List> ranges = parseAllowlistRanges(trafficCalcConfig); + this.allowlistRanges = ranges; + + LOGGER.info("Successfully loaded traffic calc config from ConfigMap: evaluationWindowSeconds={}, baselineTraffic={}, thresholdMultiplier={}, allowlistRanges={}", + this.evaluationWindowSeconds, this.baselineTraffic, this.thresholdMultiplier, ranges.size()); + + } catch (MalformedTrafficCalcConfigException e) { + LOGGER.warn("Failed to load traffic calc config. Config is malformed: {}", trafficCalcConfigPath, e); + throw e; + } catch (Exception e) { + LOGGER.warn("Failed to load traffic calc config. Config is malformed or missing: {}", trafficCalcConfigPath, e); + throw new MalformedTrafficCalcConfigException("Failed to load traffic calc config: " + e.getMessage()); + } + } + + /** + * Parse allowlist ranges from JSON config + */ + List> parseAllowlistRanges(JsonObject config) throws MalformedTrafficCalcConfigException { + List> ranges = new ArrayList<>(); + + try { + var rangesArray = config.getJsonArray(Const.Config.OptOutTrafficCalcAllowlistRangesProp); + if (rangesArray != null) { + for (int i = 0; i < rangesArray.size(); i++) { + var rangeArray = rangesArray.getJsonArray(i); + if (rangeArray != null && rangeArray.size() >= 2) { + long start = rangeArray.getLong(0); + long end = rangeArray.getLong(1); + + if(start >= end) { + LOGGER.error("Invalid allowlist range: start must be less than end: [{}, {}]", start, end); + throw new MalformedTrafficCalcConfigException("Invalid allowlist range at index " + i + ": start must be less than end"); + } + + if (end - start > 86400) { + LOGGER.error("Invalid allowlist range: range must be less than 24 hours: [{}, {}]", start, end); + throw new MalformedTrafficCalcConfigException("Invalid allowlist range at index " + i + ": range must be less than 24 hours"); + } + + List range = Arrays.asList(start, end); + ranges.add(range); + LOGGER.info("Loaded allowlist range: [{}, {}]", start, end); + } + } + } + + ranges.sort(Comparator.comparing(range -> range.get(0))); + + // Validate no overlapping ranges + for (int i = 0; i < ranges.size() - 1; i++) { + long currentEnd = ranges.get(i).get(1); + long nextStart = ranges.get(i + 1).get(0); + if (currentEnd >= nextStart) { + LOGGER.error("Overlapping allowlist ranges detected: [{}, {}] overlaps with [{}, {}]", + ranges.get(i).get(0), currentEnd, nextStart, ranges.get(i + 1).get(1)); + throw new MalformedTrafficCalcConfigException( + "Overlapping allowlist ranges detected at indices " + i + " and " + (i + 1)); + } + } + + } catch (MalformedTrafficCalcConfigException e) { + throw e; + } catch (Exception e) { + LOGGER.error("Failed to parse allowlist ranges", e); + throw new MalformedTrafficCalcConfigException("Failed to parse allowlist ranges: " + e.getMessage()); + } + + return ranges; + } + + /** + * Calculate traffic status based on delta files and SQS queue messages. + * + * Uses the newest delta file timestamp to anchor the 24-hour delta traffic window, + * and the oldest queue timestamp to anchor the 5-minute queue window. + * + * @param sqsMessages List of SQS messages + * @return TrafficStatus (DELAYED_PROCESSING or DEFAULT) + */ + public TrafficStatus calculateStatus(List sqsMessages) { + + try { + // Get list of delta files from S3 (sorted newest to oldest) + List deltaS3Paths = listDeltaFiles(); + + if (deltaS3Paths.isEmpty()) { + LOGGER.warn("No delta files found in S3 with prefix: {}", s3DeltaPrefix); + return TrafficStatus.DEFAULT; + } + + // Find newest delta file timestamp for delta traffic window + long newestDeltaTs = findNewestDeltaTimestamp(deltaS3Paths); + LOGGER.info("Traffic calculation: newestDeltaTs={}", newestDeltaTs); + + // Find oldest SQS queue message timestamp for queue window + long oldestQueueTs = findOldestQueueTimestamp(sqsMessages); + LOGGER.info("Traffic calculation: oldestQueueTs={}", oldestQueueTs); + + // Define start time of the delta evaluation window + // We need evaluationWindowSeconds of non-allowlisted time, so we iteratively extend + // the window to account for any allowlist ranges in the extended portion + long deltaWindowStart = calculateWindowStartWithAllowlist(newestDeltaTs, this.evaluationWindowSeconds); + + // Evict old cache entries (older than delta window start) + evictOldCacheEntries(deltaWindowStart); + + // Process delta files and count records in [deltaWindowStart, newestDeltaTs] + int sum = 0; + + for (String s3Path : deltaS3Paths) { + List timestamps = getTimestampsFromFile(s3Path); + + boolean shouldStop = false; + for (long ts : timestamps) { + // Stop condition: record is older than our window + if (ts < deltaWindowStart) { + LOGGER.debug("Stopping delta file processing at timestamp {} (older than window start {})", ts, deltaWindowStart); + break; + } + + // skip records in allowlisted ranges + if (isInAllowlist(ts)) { + continue; + } + + // increment sum if record is in delta window + if (ts >= deltaWindowStart) { + sum++; + } + + } + + if (shouldStop) { + break; + } + } + + // Count SQS messages in [oldestQueueTs, oldestQueueTs + 5m] + if (sqsMessages != null && !sqsMessages.isEmpty()) { + int sqsCount = countSqsMessages(sqsMessages, oldestQueueTs); + sum += sqsCount; + } + + // Determine status + TrafficStatus status = determineStatus(sum, this.baselineTraffic); + + LOGGER.info("Traffic calculation complete: sum={}, baselineTraffic={}, thresholdMultiplier={}, status={}", + sum, this.baselineTraffic, this.thresholdMultiplier, status); + + return status; + + } catch (Exception e) { + LOGGER.error("Error calculating traffic status", e); + return TrafficStatus.DEFAULT; + } + } + + /** + * Find the newest timestamp from delta files. + * Reads the newest delta file and returns its maximum timestamp. + */ + private long findNewestDeltaTimestamp(List deltaS3Paths) throws IOException { + if (deltaS3Paths == null || deltaS3Paths.isEmpty()) { + return System.currentTimeMillis() / 1000; + } + + // Delta files are sorted (ISO 8601 format, lexicographically sortable) so first file is newest + String newestDeltaPath = deltaS3Paths.get(0); + List timestamps = getTimestampsFromFile(newestDeltaPath); + + if (timestamps.isEmpty()) { + LOGGER.warn("Newest delta file has no timestamps: {}", newestDeltaPath); + return System.currentTimeMillis() / 1000; + } + + long newestTs = Collections.max(timestamps); + LOGGER.debug("Found newest delta timestamp {} from file {}", newestTs, newestDeltaPath); + return newestTs; + } + + /** + * List all delta files from S3, sorted newest to oldest + */ + private List listDeltaFiles() { + try { + // List all objects with the delta prefix + List allFiles = cloudStorage.list(s3DeltaPrefix); + + // Filter to only .dat delta files and sort newest to oldest + return allFiles.stream() + .filter(OptOutUtils::isDeltaFile) + .sorted(OptOutUtils.DeltaFilenameComparatorDescending) + .collect(Collectors.toList()); + + } catch (Exception e) { + LOGGER.error("Failed to list delta files from S3 with prefix: {}", s3DeltaPrefix, e); + return Collections.emptyList(); + } + } + + /** + * Get timestamps from a delta file (S3 path), using cache if available + */ + private List getTimestampsFromFile(String s3Path) throws IOException { + // Extract filename from S3 path for cache key + String filename = s3Path.substring(s3Path.lastIndexOf('/') + 1); + + // Check cache first + FileRecordCache cached = deltaFileCache.get(filename); + if (cached != null) { + LOGGER.debug("Using cached timestamps for file: {}", filename); + return cached.timestamps; + } + + // Cache miss - download from S3 + LOGGER.debug("Downloading and reading timestamps from S3: {}", s3Path); + List timestamps = readTimestampsFromS3(s3Path); + + // Store in cache + deltaFileCache.put(filename, new FileRecordCache(timestamps)); + + return timestamps; + } + + /** + * Read all non-sentinel record timestamps from a delta file in S3 + */ + private List readTimestampsFromS3(String s3Path) throws IOException { + try (InputStream is = cloudStorage.download(s3Path)) { + byte[] data = is.readAllBytes(); + OptOutCollection collection = new OptOutCollection(data); + + List timestamps = new ArrayList<>(); + for (int i = 0; i < collection.size(); i++) { + OptOutEntry entry = collection.get(i); + + // Skip sentinel entries + if (entry.isSpecialHash()) { + continue; + } + + timestamps.add(entry.timestamp); + } + + return timestamps; + } catch (Exception e) { + LOGGER.error("Failed to read delta file from S3: {}", s3Path, e); + throw new IOException("Failed to read delta file from S3: " + s3Path, e); + } + } + + /** + * Calculate total duration of allowlist ranges that overlap with the given time window. + */ + long getAllowlistDuration(long t, long windowStart) { + long totalDuration = 0; + for (List range : this.allowlistRanges) { + long start = range.get(0); + long end = range.get(1); + + // Clip range to window boundaries + if (start < windowStart) { + start = windowStart; + } + if (end > t) { + end = t; + } + + // Only add duration if there's actual overlap (start < end) + if (start < end) { + totalDuration += end - start; + } + } + return totalDuration; + } + + /** + * Calculate the window start time that provides evaluationWindowSeconds of non-allowlisted time. + * Iteratively extends the window to account for allowlist ranges that may fall in extended portions. + */ + long calculateWindowStartWithAllowlist(long newestDeltaTs, int evaluationWindowSeconds) { + long allowlistDuration = getAllowlistDuration(newestDeltaTs, newestDeltaTs - evaluationWindowSeconds); + + // Each iteration discovers at least one new allowlist range, so max iterations = number of ranges + int maxIterations = this.allowlistRanges.size() + 1; + + for (int i = 0; i < maxIterations && allowlistDuration > 0; i++) { + long newWindowStart = newestDeltaTs - evaluationWindowSeconds - allowlistDuration; + long newAllowlistDuration = getAllowlistDuration(newestDeltaTs, newWindowStart); + + if (newAllowlistDuration == allowlistDuration) { + // No new allowlist time in extended portion, we've converged + break; + } + + allowlistDuration = newAllowlistDuration; + } + + return newestDeltaTs - evaluationWindowSeconds - allowlistDuration; + } + + /** + * Find the oldest SQS queue message timestamp + */ + private long findOldestQueueTimestamp(List sqsMessages) throws IOException { + long oldest = System.currentTimeMillis() / 1000; + + if (sqsMessages != null && !sqsMessages.isEmpty()) { + for (Message msg : sqsMessages) { + Long ts = extractTimestampFromMessage(msg); + if (ts != null && ts < oldest) { + oldest = ts; + } + } + } + + return oldest; + } + + /** + * Extract timestamp from SQS message (from SentTimestamp attribute) + */ + private Long extractTimestampFromMessage(Message msg) { + // Get SentTimestamp attribute (milliseconds) + String sentTimestamp = msg.attributes().get(MessageSystemAttributeName.SENT_TIMESTAMP); + if (sentTimestamp != null) { + try { + return Long.parseLong(sentTimestamp) / 1000; // Convert ms to seconds + } catch (NumberFormatException e) { + LOGGER.debug("Invalid SentTimestamp: {}", sentTimestamp); + } + } + + // Fallback: use current time + return System.currentTimeMillis() / 1000; + } + + /** + * Count SQS messages from oldestQueueTs to oldestQueueTs + 5 minutes + */ + private int countSqsMessages(List sqsMessages, long oldestQueueTs) { + + int count = 0; + long windowEnd = oldestQueueTs + 5 * 60; + + for (Message msg : sqsMessages) { + Long ts = extractTimestampFromMessage(msg); + + if (ts < oldestQueueTs || ts > windowEnd) { + continue; + } + + if (isInAllowlist(ts)) { + continue; + } + count++; + + } + + LOGGER.info("SQS messages: {} in window [oldestQueueTs={}, oldestQueueTs+5m={}]", count, oldestQueueTs, windowEnd); + return count; + } + + /** + * Check if a timestamp falls within any allowlist range + */ + boolean isInAllowlist(long timestamp) { + if (allowlistRanges == null || allowlistRanges.isEmpty()) { + return false; + } + + for (List range : allowlistRanges) { + if (range.size() < 2) { + continue; + } + + long start = range.get(0); + long end = range.get(1); + + if (timestamp >= start && timestamp <= end) { + return true; + } + } + + return false; + } + + /** + * Evict cache entries with data older than the cutoff timestamp + */ + private void evictOldCacheEntries(long cutoffTimestamp) { + int beforeSize = deltaFileCache.size(); + + deltaFileCache.entrySet().removeIf(entry -> + entry.getValue().newestTimestamp < cutoffTimestamp + ); + + int afterSize = deltaFileCache.size(); + if (beforeSize != afterSize) { + LOGGER.info("Evicted {} old cache entries (before={}, after={})", + beforeSize - afterSize, beforeSize, afterSize); + } + } + + /** + * Determine traffic status based on current vs past counts + */ + TrafficStatus determineStatus(int sumCurrent, int baselineTraffic) { + if (baselineTraffic == 0 || thresholdMultiplier == 0) { + // Avoid division by zero - if no baseline traffic, return DEFAULT status + LOGGER.warn("baselineTraffic is 0 or thresholdMultiplier is 0 returning DEFAULT status."); + return TrafficStatus.DEFAULT; + } + + if (sumCurrent >= thresholdMultiplier * baselineTraffic) { + LOGGER.warn("DELAYED_PROCESSING threshold breached: sumCurrent={} >= {}×baselineTraffic={}", + sumCurrent, thresholdMultiplier, baselineTraffic); + return TrafficStatus.DELAYED_PROCESSING; + } + + LOGGER.info("Traffic within normal range: sumCurrent={} < {}×baselineTraffic={}", + sumCurrent, thresholdMultiplier, baselineTraffic); + return TrafficStatus.DEFAULT; + } + + /** + * Get cache statistics for monitoring + */ + public Map getCacheStats() { + Map stats = new HashMap<>(); + stats.put("cached_files", deltaFileCache.size()); + + int totalTimestamps = deltaFileCache.values().stream() + .mapToInt(cache -> cache.timestamps.size()) + .sum(); + stats.put("total_cached_timestamps", totalTimestamps); + + return stats; + } + +} diff --git a/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java new file mode 100644 index 00000000..e8bd04b8 --- /dev/null +++ b/src/main/java/com/uid2/optout/vertx/OptOutTrafficFilter.java @@ -0,0 +1,172 @@ +package com.uid2.optout.vertx; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.List; +import java.util.Collections; +import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Paths; +import java.nio.charset.StandardCharsets; +import io.vertx.core.json.JsonObject; +import io.vertx.core.json.JsonArray; + +public class OptOutTrafficFilter { + private static final Logger LOGGER = LoggerFactory.getLogger(OptOutTrafficFilter.class); + + private final String trafficFilterConfigPath; + List filterRules; + + /** + * Traffic filter rule defining a time range and a list of IP addresses to exclude + */ + private static class TrafficFilterRule { + private final List range; + private final List ipAddresses; + + TrafficFilterRule(List range, List ipAddresses) { + this.range = range; + this.ipAddresses = ipAddresses; + } + + public long getRangeStart() { + return range.get(0); + } + public long getRangeEnd() { + return range.get(1); + } + public List getIpAddresses() { + return ipAddresses; + } + } + + public static class MalformedTrafficFilterConfigException extends Exception { + public MalformedTrafficFilterConfigException(String message) { + super(message); + } + } + + /** + * Constructor for OptOutTrafficFilter + * + * @param trafficFilterConfigPath S3 path for traffic filter config + * @throws MalformedTrafficFilterConfigException if the traffic filter config is invalid + */ + public OptOutTrafficFilter(String trafficFilterConfigPath) throws MalformedTrafficFilterConfigException { + this.trafficFilterConfigPath = trafficFilterConfigPath; + // Initial filter rules load + this.filterRules = Collections.emptyList(); // start empty + reloadTrafficFilterConfig(); // load ConfigMap + + LOGGER.info("OptOutTrafficFilter initialized: filterRules={}", + filterRules.size()); + } + + /** + * Reload traffic filter config from ConfigMap. + * Expected format: + * { + * "denylist_requests": [ + * {range: [startTimestamp, endTimestamp], IPs: ["ip1"]}, + * {range: [startTimestamp, endTimestamp], IPs: ["ip1", "ip2"]}, + * {range: [startTimestamp, endTimestamp], IPs: ["ip1", "ip3"]}, + * ] + * } + * + * Can be called periodically to pick up config changes without restarting. + */ + public void reloadTrafficFilterConfig() throws MalformedTrafficFilterConfigException { + LOGGER.info("Loading traffic filter config from ConfigMap"); + try (InputStream is = Files.newInputStream(Paths.get(trafficFilterConfigPath))) { + String content = new String(is.readAllBytes(), StandardCharsets.UTF_8); + JsonObject filterConfigJson = new JsonObject(content); + + this.filterRules = parseFilterRules(filterConfigJson); + + LOGGER.info("Successfully loaded traffic filter config from ConfigMap: filterRules={}", + filterRules.size()); + + } catch (Exception e) { + LOGGER.warn("No traffic filter config found at: {}", trafficFilterConfigPath, e); + throw new MalformedTrafficFilterConfigException(e.getMessage()); + } + } + + /** + * Parse request filtering rules from JSON config + */ + List parseFilterRules(JsonObject config) throws MalformedTrafficFilterConfigException { + List rules = new ArrayList<>(); + try { + JsonArray denylistRequests = config.getJsonArray("denylist_requests"); + if (denylistRequests == null) { + LOGGER.error("Invalid traffic filter config: denylist_requests is null"); + throw new MalformedTrafficFilterConfigException("Invalid traffic filter config: denylist_requests is null"); + } + for (int i = 0; i < denylistRequests.size(); i++) { + JsonObject ruleJson = denylistRequests.getJsonObject(i); + + // parse range + var rangeJson = ruleJson.getJsonArray("range"); + List range = new ArrayList<>(); + if (rangeJson != null && rangeJson.size() == 2) { + long start = rangeJson.getLong(0); + long end = rangeJson.getLong(1); + + if (start >= end) { + LOGGER.error("Invalid traffic filter rule: range start must be less than end: {}", ruleJson.encode()); + throw new MalformedTrafficFilterConfigException("Invalid traffic filter rule: range start must be less than end"); + } + range.add(start); + range.add(end); + } + + // parse IPs + var ipAddressesJson = ruleJson.getJsonArray("IPs"); + List ipAddresses = new ArrayList<>(); + if (ipAddressesJson != null) { + for (int j = 0; j < ipAddressesJson.size(); j++) { + ipAddresses.add(ipAddressesJson.getString(j)); + } + } + + // log error and throw exception if rule is invalid + if (range.size() != 2 || ipAddresses.size() == 0 || range.get(1) - range.get(0) > 86400) { // range must be 24 hours or less + LOGGER.error("Invalid traffic filter rule, range must be 24 hours or less: {}", ruleJson.encode()); + throw new MalformedTrafficFilterConfigException("Invalid traffic filter rule, range must be 24 hours or less"); + } + + TrafficFilterRule rule = new TrafficFilterRule(range, ipAddresses); + + LOGGER.info("Loaded traffic filter rule: range=[{}, {}], IPs={}", rule.getRangeStart(), rule.getRangeEnd(), rule.getIpAddresses()); + rules.add(rule); + } + return rules; + } catch (Exception e) { + LOGGER.error("Failed to parse traffic filter rules: config={}, error={}", config.encode(), e.getMessage()); + throw new MalformedTrafficFilterConfigException(e.getMessage()); + } + } + + public boolean isDenylisted(SqsParsedMessage message) { + long timestamp = message.getTimestamp(); + String clientIp = message.getClientIp(); + + if (clientIp == null || clientIp.isEmpty()) { + LOGGER.error("Request does not contain client IP, timestamp={}", timestamp); + return false; + } + + for (TrafficFilterRule rule : filterRules) { + if(timestamp >= rule.getRangeStart() && timestamp <= rule.getRangeEnd()) { + if(rule.getIpAddresses().contains(clientIp)) { + return true; + } + }; + } + return false; + } + +} \ No newline at end of file diff --git a/src/main/java/com/uid2/optout/vertx/SqsBatchProcessor.java b/src/main/java/com/uid2/optout/vertx/SqsBatchProcessor.java new file mode 100644 index 00000000..0a656e98 --- /dev/null +++ b/src/main/java/com/uid2/optout/vertx/SqsBatchProcessor.java @@ -0,0 +1,165 @@ +package com.uid2.optout.vertx; + +import com.uid2.shared.optout.OptOutUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +/** + * Applies parsing, validation, filtering, and deletion of corrupted SQS messages. + * Used by SqsWindowReader + */ +public class SqsBatchProcessor { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchProcessor.class); + + private final SqsClient sqsClient; + private final String queueUrl; + private final int deltaWindowSeconds; + + public SqsBatchProcessor(SqsClient sqsClient, String queueUrl, int deltaWindowSeconds) { + this.sqsClient = sqsClient; + this.queueUrl = queueUrl; + this.deltaWindowSeconds = deltaWindowSeconds; + } + + /** + * Result of processing a batch of messages from SQS. + * Encapsulates eligible messages and metadata about the processing. + */ + public static class BatchProcessingResult { + private final List eligibleMessages; + private final boolean shouldStopProcessing; + + private BatchProcessingResult(List eligibleMessages, boolean shouldStopProcessing) { + this.eligibleMessages = eligibleMessages; + this.shouldStopProcessing = shouldStopProcessing; + } + + public static BatchProcessingResult withEligibleMessages(List messages) { + return new BatchProcessingResult(messages, false); + } + + public static BatchProcessingResult stopProcessing() { + return new BatchProcessingResult(new ArrayList<>(), true); + } + + public static BatchProcessingResult empty() { + return new BatchProcessingResult(new ArrayList<>(), false); + } + + public boolean isEmpty() { + return eligibleMessages.isEmpty(); + } + + public boolean shouldStopProcessing() { + return shouldStopProcessing; + } + + public List getEligibleMessages() { + return eligibleMessages; + } + } + + /** + * Processes a batch of messages: parses, validates, cleans up invalid messages, + * and filters for eligible messages based on age threshold (message is less than 5 minutes old) + * + * @param messageBatch Raw messages from SQS + * @param batchNumber The batch number (for logging) + * @return BatchProcessingResult containing eligible messages and processing metadata + */ + public BatchProcessingResult processBatch(List messageBatch, int batchNumber) { + // Parse and sort messages by timestamp + List parsedBatch = SqsMessageParser.parseAndSortMessages(messageBatch); + + // Identify and delete corrupt messages + if (parsedBatch.size() < messageBatch.size()) { + List invalidMessages = identifyInvalidMessages(messageBatch, parsedBatch); + if (!invalidMessages.isEmpty()) { + LOGGER.error("Found {} invalid messages in batch {} (failed parsing). Deleting from queue.", + invalidMessages.size(), batchNumber); + SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, invalidMessages); + } + } + + // If no valid messages, return empty result + if (parsedBatch.isEmpty()) { + LOGGER.warn("No valid messages in batch {} (all failed parsing)", batchNumber); + return BatchProcessingResult.empty(); + } + + // Check if the oldest message in this batch is too recent + long currentTime = OptOutUtils.nowEpochSeconds(); + SqsParsedMessage oldestMessage = parsedBatch.get(0); + long messageAge = currentTime - oldestMessage.getTimestamp(); + + if (messageAge < this.deltaWindowSeconds) { + // Signal to stop processing - messages are too recent + return BatchProcessingResult.stopProcessing(); + } + + // Filter for eligible messages (>= 5 minutes old) + List eligibleMessages = filterEligibleMessages(parsedBatch, currentTime); + + if (eligibleMessages.isEmpty()) { + LOGGER.debug("No eligible messages in batch {} (all too recent)", batchNumber); + return BatchProcessingResult.empty(); + } + + return BatchProcessingResult.withEligibleMessages(eligibleMessages); + } + + /** + * Filters messages to only include those where sufficient time has elapsed. +. * + * @param messages List of parsed messages + * @param currentTime Current time in seconds + * @return List of messages that meet the time threshold + */ + public List filterEligibleMessages( + List messages, + long currentTime) { + + List eligibleMessages = new ArrayList<>(); + + for (SqsParsedMessage pm : messages) { + if (currentTime - pm.getTimestamp() >= this.deltaWindowSeconds) { + eligibleMessages.add(pm); + } + } + + return eligibleMessages; + } + + /** + * Identifies messages that failed to parse by comparing the original batch with parsed results. + * + * @param originalBatch The original list of messages from SQS + * @param parsedBatch The list of successfully parsed messages + * @return List of messages that failed to parse + */ + private List identifyInvalidMessages(List originalBatch, List parsedBatch) { + // Create a set of message IDs from successfully parsed messages + Set validMessageIds = new HashSet<>(); + for (SqsParsedMessage parsed : parsedBatch) { + validMessageIds.add(parsed.getOriginalMessage().messageId()); + } + + // Find messages that were not successfully parsed + List invalidMessages = new ArrayList<>(); + for (Message msg : originalBatch) { + if (!validMessageIds.contains(msg.messageId())) { + invalidMessages.add(msg); + } + } + + return invalidMessages; + } +} + diff --git a/src/main/java/com/uid2/optout/vertx/SqsMessageParser.java b/src/main/java/com/uid2/optout/vertx/SqsMessageParser.java index eba3d443..44a6c5e9 100644 --- a/src/main/java/com/uid2/optout/vertx/SqsMessageParser.java +++ b/src/main/java/com/uid2/optout/vertx/SqsMessageParser.java @@ -34,6 +34,10 @@ public static List parseAndSortMessages(List messages JsonObject body = new JsonObject(message.body()); String identityHash = body.getString("identity_hash"); String advertisingId = body.getString("advertising_id"); + String traceId = body.getString("trace_id"); + String clientIp = body.getString("client_ip"); + String email = body.getString("email"); + String phone = body.getString("phone"); if (identityHash == null || advertisingId == null) { LOGGER.error("Invalid message format, skipping: {}", message.body()); @@ -48,7 +52,7 @@ public static List parseAndSortMessages(List messages continue; } - parsedMessages.add(new SqsParsedMessage(message, hashBytes, idBytes, timestampSeconds)); + parsedMessages.add(new SqsParsedMessage(message, hashBytes, idBytes, timestampSeconds, email, phone, clientIp, traceId)); } catch (Exception e) { LOGGER.error("Error parsing SQS message", e); } @@ -74,29 +78,5 @@ private static long extractTimestamp(Message message) { } return Long.parseLong(sentTimestampStr) / 1000; // ms to seconds } - - /** - * Filters messages to only include those where sufficient time has elapsed. - * - * @param messages List of parsed messages - * @param deltaWindowSeconds Minimum time window in seconds - * @param currentTime Current time in seconds - * @return List of messages that meet the time threshold - */ - public static List filterEligibleMessages( - List messages, - int deltaWindowSeconds, - long currentTime) { - - List eligibleMessages = new ArrayList<>(); - - for (SqsParsedMessage pm : messages) { - if (currentTime - pm.getTimestamp() >= deltaWindowSeconds) { - eligibleMessages.add(pm); - } - } - - return eligibleMessages; - } } diff --git a/src/main/java/com/uid2/optout/vertx/SqsParsedMessage.java b/src/main/java/com/uid2/optout/vertx/SqsParsedMessage.java index 57a5ab1b..1ad8ba77 100644 --- a/src/main/java/com/uid2/optout/vertx/SqsParsedMessage.java +++ b/src/main/java/com/uid2/optout/vertx/SqsParsedMessage.java @@ -10,12 +10,20 @@ public class SqsParsedMessage { private final byte[] hashBytes; private final byte[] idBytes; private final long timestamp; + private final String email; + private final String phone; + private final String clientIp; + private final String traceId; - public SqsParsedMessage(Message originalMessage, byte[] hashBytes, byte[] idBytes, long timestamp) { + public SqsParsedMessage(Message originalMessage, byte[] hashBytes, byte[] idBytes, long timestamp, String email, String phone, String clientIp, String traceId) { this.originalMessage = originalMessage; this.hashBytes = hashBytes; this.idBytes = idBytes; this.timestamp = timestamp; + this.email = email; + this.phone = phone; + this.clientIp = clientIp; + this.traceId = traceId; } public Message getOriginalMessage() { @@ -33,5 +41,21 @@ public byte[] getIdBytes() { public long getTimestamp() { return timestamp; } + + public String getEmail() { + return email; + } + + public String getPhone() { + return phone; + } + + public String getClientIp() { + return clientIp; + } + + public String getTraceId() { + return traceId; + } } diff --git a/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java b/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java new file mode 100644 index 00000000..75368c62 --- /dev/null +++ b/src/main/java/com/uid2/optout/vertx/SqsWindowReader.java @@ -0,0 +1,129 @@ +package com.uid2.optout.vertx; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.util.ArrayList; +import java.util.List; + +/** + * Reads messages from SQS for complete 5-minute time windows. + * Handles accumulation of all messages for a window before returning. + * Limits messages per window to prevent memory issues. + */ +public class SqsWindowReader { + private static final Logger LOGGER = LoggerFactory.getLogger(SqsWindowReader.class); + + private final SqsClient sqsClient; + private final String queueUrl; + private final int maxMessagesPerPoll; + private final int visibilityTimeout; + private final int deltaWindowSeconds; + private final int maxMessagesPerFile; + private final SqsBatchProcessor batchProcessor; + + public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerPoll, + int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerFile) { + this.sqsClient = sqsClient; + this.queueUrl = queueUrl; + this.maxMessagesPerPoll = maxMessagesPerPoll; + this.visibilityTimeout = visibilityTimeout; + this.deltaWindowSeconds = deltaWindowSeconds; + this.maxMessagesPerFile = maxMessagesPerFile; + this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds); + LOGGER.info("SqsWindowReader initialized with: maxMessagesPerFile: {}, maxMessagesPerPoll: {}, visibilityTimeout: {}, deltaWindowSeconds: {}", + maxMessagesPerFile, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds); + } + + /** + * Result of reading messages for a 5-minute window. + */ + public static class WindowReadResult { + private final List messages; + private final long windowStart; + private final boolean stoppedDueToMessagesTooRecent; + + public WindowReadResult(List messages, long windowStart, + boolean stoppedDueToMessagesTooRecent) { + this.messages = messages; + this.windowStart = windowStart; + this.stoppedDueToMessagesTooRecent = stoppedDueToMessagesTooRecent; + } + + public List getMessages() { return messages; } + public long getWindowStart() { return windowStart; } + public boolean isEmpty() { return messages.isEmpty(); } + public boolean stoppedDueToMessagesTooRecent() { return stoppedDueToMessagesTooRecent; } + } + + /** + * Reads messages from SQS for one complete 5-minute window. + * Keeps reading batches and accumulating messages until: + * - We discover the next window + * - Queue is empty (no more messages) + * - Messages are too recent (all messages younger than 5 minutes) + * - Message limit is reached (memory protection) + * + * @return WindowReadResult with messages for the window, or empty if done + */ + public WindowReadResult readWindow() { + List windowMessages = new ArrayList<>(); + long currentWindowStart = 0; + + while (true) { + // Check if we've hit the message limit + if (windowMessages.size() >= this.maxMessagesPerFile) { + LOGGER.warn("Window message limit reached ({} messages). Truncating window starting at {} for memory protection.", + this.maxMessagesPerFile, currentWindowStart); + return new WindowReadResult(windowMessages, currentWindowStart, false); + } + + // Read one batch from SQS (up to 10 messages) + List rawBatch = SqsMessageOperations.receiveMessagesFromSqs( + this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, this.visibilityTimeout); + + if (rawBatch.isEmpty()) { + // Queue empty - return what we have + return new WindowReadResult(windowMessages, currentWindowStart, false); + } + + // Process batch: parse, validate, filter + SqsBatchProcessor.BatchProcessingResult batchResult = batchProcessor.processBatch(rawBatch, 0); + + if (batchResult.isEmpty()) { + if (batchResult.shouldStopProcessing()) { + // Messages too recent - return what we have + return new WindowReadResult(windowMessages, currentWindowStart, true); + } + // corrupt messages deleted, read next messages + continue; + } + + // Add eligible messages to current window + boolean newWindow = false; + for (SqsParsedMessage msg : batchResult.getEligibleMessages()) { + long msgWindowStart = (msg.getTimestamp() / this.deltaWindowSeconds) * this.deltaWindowSeconds; + + // discover start of window + if (currentWindowStart == 0) { + currentWindowStart = msgWindowStart; + } + + // discover new window + if (msgWindowStart > currentWindowStart + this.deltaWindowSeconds) { + newWindow = true; + } + + windowMessages.add(msg); + } + + if (newWindow) { + // close current window and return + return new WindowReadResult(windowMessages, currentWindowStart, false); + } + } + } +} + diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java index 199adb51..a46023f6 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java @@ -116,8 +116,9 @@ public void testDeltaProduceEndpoint_successWithMessages(TestContext context) th // Mock S3 upload doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); - // Call endpoint via HTTP int port = Const.Port.ServicePortForOptOut + 1; + + // Step 1: Start the job (POST) - returns 202 immediately vertx.createHttpClient() .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", Endpoints.OPTOUT_DELTA_PRODUCE.toString()) @@ -125,14 +126,23 @@ public void testDeltaProduceEndpoint_successWithMessages(TestContext context) th .putHeader("Authorization", "Bearer " + TEST_API_KEY) .send()) .compose(resp -> { - context.assertEquals(200, resp.statusCode()); + context.assertEquals(202, resp.statusCode()); return resp.body(); }) - .onComplete(context.asyncAssertSuccess(body -> { + .compose(body -> { JsonObject response = new JsonObject(body.toString()); - context.assertEquals("success", response.getString("status")); - context.assertTrue(response.getInteger("deltas_produced") >= 1); - context.assertTrue(response.getInteger("entries_processed") >= 2); + context.assertEquals("accepted", response.getString("status")); + + // Step 2: Poll for job completion + return pollForCompletion(context, port, 100, 50); // Poll every 100ms, max 50 times (5 seconds) + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertNotNull(result); + context.assertEquals("success", result.getString("status")); + context.assertTrue(result.getInteger("deltas_produced") >= 1); + context.assertTrue(result.getInteger("entries_processed") >= 2); // Verify S3 was called try { @@ -148,6 +158,48 @@ public void testDeltaProduceEndpoint_successWithMessages(TestContext context) th })); } + /** + * Helper method to poll for job completion + */ + private io.vertx.core.Future pollForCompletion(TestContext context, int port, long intervalMs, int maxAttempts) { + return pollForCompletionRecursive(context, port, intervalMs, maxAttempts, 0); + } + + private io.vertx.core.Future pollForCompletionRecursive(TestContext context, int port, long intervalMs, int maxAttempts, int attempt) { + if (attempt >= maxAttempts) { + return io.vertx.core.Future.failedFuture("Job did not complete within timeout"); + } + + return vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.GET, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString() + "/status") + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + if (resp.statusCode() != 200) { + return io.vertx.core.Future.failedFuture("Status check failed: " + resp.statusCode()); + } + return resp.body(); + }) + .compose(body -> { + JsonObject status = new JsonObject(body.toString()); + String state = status.getString("state"); + + if ("completed".equals(state) || "failed".equals(state)) { + return io.vertx.core.Future.succeededFuture(status); + } + + // Still running or idle, wait and poll again + io.vertx.core.Promise promise = io.vertx.core.Promise.promise(); + vertx.setTimer(intervalMs, id -> { + pollForCompletionRecursive(context, port, intervalMs, maxAttempts, attempt + 1) + .onComplete(promise); + }); + return promise.future(); + }); + } + @Test public void testDeltaProduceEndpoint_noMessages(TestContext context) { Async async = context.async(); @@ -157,6 +209,8 @@ public void testDeltaProduceEndpoint_noMessages(TestContext context) { .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); int port = Const.Port.ServicePortForOptOut + 1; + + // Start job vertx.createHttpClient() .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", Endpoints.OPTOUT_DELTA_PRODUCE.toString()) @@ -164,14 +218,23 @@ public void testDeltaProduceEndpoint_noMessages(TestContext context) { .putHeader("Authorization", "Bearer " + TEST_API_KEY) .send()) .compose(resp -> { - context.assertEquals(200, resp.statusCode()); + context.assertEquals(202, resp.statusCode()); return resp.body(); }) - .onComplete(context.asyncAssertSuccess(body -> { + .compose(body -> { JsonObject response = new JsonObject(body.toString()); - context.assertEquals("success", response.getString("status")); - context.assertEquals(0, response.getInteger("deltas_produced")); - context.assertEquals(0, response.getInteger("entries_processed")); + context.assertEquals("accepted", response.getString("status")); + + // Poll for completion + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertNotNull(result); + context.assertEquals("success", result.getString("status")); + context.assertEquals(0, result.getInteger("deltas_produced")); + context.assertEquals(0, result.getInteger("entries_processed")); // Verify no operations try { @@ -200,6 +263,8 @@ public void testDeltaProduceEndpoint_allMessagesTooRecent(TestContext context) { .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); int port = Const.Port.ServicePortForOptOut + 1; + + // Start job vertx.createHttpClient() .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", Endpoints.OPTOUT_DELTA_PRODUCE.toString()) @@ -207,13 +272,22 @@ public void testDeltaProduceEndpoint_allMessagesTooRecent(TestContext context) { .putHeader("Authorization", "Bearer " + TEST_API_KEY) .send()) .compose(resp -> { - context.assertEquals(200, resp.statusCode()); + context.assertEquals(202, resp.statusCode()); return resp.body(); }) - .onComplete(context.asyncAssertSuccess(body -> { + .compose(body -> { JsonObject response = new JsonObject(body.toString()); - context.assertEquals("skipped", response.getString("status")); - context.assertEquals("All messages too recent", response.getString("reason")); + context.assertEquals("accepted", response.getString("status")); + + // Poll for completion + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(finalStatus -> { + context.assertEquals("completed", finalStatus.getString("state")); + JsonObject result = finalStatus.getJsonObject("result"); + context.assertNotNull(result); + context.assertEquals("skipped", result.getString("status")); + context.assertEquals("All messages too recent", result.getString("reason")); // No processing should occur try { @@ -246,5 +320,145 @@ public void testDeltaProduceEndpoint_unauthorized(TestContext context) { async.complete(); })); } + + @Test + public void testDeltaProduceEndpoint_concurrentJobPrevention(TestContext context) throws Exception { + Async async = context.async(); + + // Create messages that will take some time to process + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime) + ); + + // Mock SQS to return messages + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Start first job + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + + // Immediately try to start a second job (should be rejected) + return vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()); + }) + .compose(resp -> { + context.assertEquals(409, resp.statusCode()); // Conflict - job already running + return resp.body(); + }) + .onComplete(context.asyncAssertSuccess(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("conflict", response.getString("status")); + context.assertTrue(response.getString("message").contains("already running")); + + async.complete(); + })); + } + + @Test + public void testDeltaProduceEndpoint_autoClearCompletedJob(TestContext context) throws Exception { + Async async = context.async(); + + // Create messages for first job + long oldTime1 = System.currentTimeMillis() - 400_000; + List messages1 = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime1) + ); + + // Create messages for second job + long oldTime2 = System.currentTimeMillis() - 400_000; + List messages2 = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime2) + ); + + // Mock SQS to return different messages for each job + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + // First job + .thenReturn(ReceiveMessageResponse.builder().messages(messages1).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()) + // Second job + .thenReturn(ReceiveMessageResponse.builder().messages(messages2).build()) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + + when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) + .thenReturn(DeleteMessageBatchResponse.builder().build()); + + doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + + int port = Const.Port.ServicePortForOptOut + 1; + + // Start first job + vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + + // Wait for first job to complete + return pollForCompletion(context, port, 100, 50); + }) + .compose(firstJobStatus -> { + context.assertEquals("completed", firstJobStatus.getString("state")); + + // Now start a second job - should auto-clear the completed first job + return vertx.createHttpClient() + .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", + Endpoints.OPTOUT_DELTA_PRODUCE.toString()) + .compose(req -> req + .putHeader("Authorization", "Bearer " + TEST_API_KEY) + .send()); + }) + .compose(resp -> { + context.assertEquals(202, resp.statusCode()); // Should succeed (auto-cleared) + return resp.body(); + }) + .compose(body -> { + JsonObject response = new JsonObject(body.toString()); + context.assertEquals("accepted", response.getString("status")); + + // Wait for second job to complete + return pollForCompletion(context, port, 100, 50); + }) + .onComplete(context.asyncAssertSuccess(secondJobStatus -> { + context.assertEquals("completed", secondJobStatus.getString("state")); + + // Verify both jobs processed messages + verify(sqsClient, atLeast(2)).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + + async.complete(); + })); + } } diff --git a/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java b/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java new file mode 100644 index 00000000..f977233d --- /dev/null +++ b/src/test/java/com/uid2/optout/vertx/OptOutTrafficCalculatorTest.java @@ -0,0 +1,1510 @@ +package com.uid2.optout.vertx; + +import com.uid2.shared.cloud.CloudStorageException; +import com.uid2.shared.cloud.ICloudStorage; +import com.uid2.shared.optout.OptOutCollection; +import com.uid2.shared.optout.OptOutEntry; +import com.uid2.optout.Const; +import io.vertx.core.json.JsonArray; +import io.vertx.core.json.JsonObject; +import java.nio.file.Files; +import java.nio.file.Path; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.CsvSource; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; +import org.mockito.junit.jupiter.MockitoSettings; +import org.mockito.quality.Strictness; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; + +import com.uid2.optout.vertx.OptOutTrafficCalculator.MalformedTrafficCalcConfigException; +import java.io.ByteArrayInputStream; +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.Mockito.*; + + +@ExtendWith(MockitoExtension.class) +@MockitoSettings(strictness = Strictness.LENIENT) +public class OptOutTrafficCalculatorTest { + + @Mock + private ICloudStorage cloudStorage; + + private static final String S3_DELTA_PREFIX = "optout-v2/delta/"; + private static final String TRAFFIC_CONFIG_PATH = "./traffic-config.json"; + private static final int BASELINE_TRAFFIC = 100; + private static final int THRESHOLD_MULTIPLIER = 5; + private static final int EVALUATION_WINDOW_SECONDS = 24 * 3600; + + @BeforeEach + void setUp() { + // default config + JsonObject config = new JsonObject(); + config.put(Const.Config.OptOutTrafficCalcBaselineTrafficProp, BASELINE_TRAFFIC); + config.put(Const.Config.OptOutTrafficCalcThresholdMultiplierProp, THRESHOLD_MULTIPLIER); + config.put(Const.Config.OptOutTrafficCalcEvaluationWindowSecondsProp, EVALUATION_WINDOW_SECONDS); + config.put(Const.Config.OptOutTrafficCalcAllowlistRangesProp, new JsonArray()); + try { + createTrafficConfigFile(config.toString()); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @AfterEach + void tearDown() { + if (Files.exists(Path.of(TRAFFIC_CONFIG_PATH))) { + try { + Files.delete(Path.of(TRAFFIC_CONFIG_PATH)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + private void createTrafficConfigFile(String content) { + try { + Path configPath = Path.of(TRAFFIC_CONFIG_PATH); + Files.writeString(configPath, content); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Helper to create config by merging partial JSON with defaults + */ + private void createConfigFromPartialJson(String partialJson) { + JsonObject partial = new JsonObject(partialJson); + JsonObject config = new JsonObject(); + + // Set defaults + if (!partial.containsKey(Const.Config.OptOutTrafficCalcBaselineTrafficProp)) { + config.put(Const.Config.OptOutTrafficCalcBaselineTrafficProp, BASELINE_TRAFFIC); + } + if (!partial.containsKey(Const.Config.OptOutTrafficCalcThresholdMultiplierProp)) { + config.put(Const.Config.OptOutTrafficCalcThresholdMultiplierProp, THRESHOLD_MULTIPLIER); + } + if (!partial.containsKey(Const.Config.OptOutTrafficCalcEvaluationWindowSecondsProp)) { + config.put(Const.Config.OptOutTrafficCalcEvaluationWindowSecondsProp, EVALUATION_WINDOW_SECONDS); + } + if (!partial.containsKey(Const.Config.OptOutTrafficCalcAllowlistRangesProp)) { + config.put(Const.Config.OptOutTrafficCalcAllowlistRangesProp, new JsonArray()); + } + + // Merge in partial config (overrides defaults) + partial.forEach(entry -> config.put(entry.getKey(), entry.getValue())); + + createTrafficConfigFile(config.toString()); + } + + /** + * Helper to create config with custom threshold + */ + private void createConfigWithThreshold(int threshold) { + createConfigFromPartialJson("{\"" + Const.Config.OptOutTrafficCalcThresholdMultiplierProp + "\": " + threshold + "}"); + } + + // ============================================================================ + // SECTION 1: Constructor & Initialization Tests + // ============================================================================ + + @Test + void testConstructor_defaultThreshold() throws Exception { + // Setup - default threshold of 5 + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert - DEFAULT when below threshold, DELAYED_PROCESSING when above threshold + OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(10, 3); + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); // 10 < 5*3 + + status = calculator.determineStatus(15, 3); + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); // 15 >= 5*3 + } + + @Test + void testConstructor_customThreshold() throws Exception { + // Setup - custom threshold of 10 + createConfigWithThreshold(10); + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert - DEFAULT when below threshold, DELAYED_PROCESSING when above threshold + OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(49, 5); + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); // 49 < 10*5 + status = calculator.determineStatus(50, 5); + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); // 50 >= 10*5 + } + + @Test + void testConstructor_trafficCalcConfigLoadFailure() throws Exception { + // Setup - traffic calc config load failure + createTrafficConfigFile("Invalid JSON"); + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + }); + + // Create valid config to test reload failure + createConfigFromPartialJson("{}"); + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + createTrafficConfigFile("Invalid JSON"); + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.reloadTrafficCalcConfig(); + }); + } + + // ============================================================================ + // SECTION 2: parseTrafficCalcConfigRanges() + // ============================================================================ + + @Test + void testParseTrafficCalcConfigRanges_emptyConfig() throws Exception { + // Setup - no config + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + JsonObject emptyConfig = new JsonObject(); + + // Act + List> ranges = calculator.parseAllowlistRanges(emptyConfig); + + // Assert - empty ranges + assertTrue(ranges.isEmpty()); + } + + @Test + void testParseTrafficCalcConfigRanges_singleRange() throws Exception { + // Setup - single range + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + JsonObject configWithRanges = new JsonObject(); + JsonArray ranges = new JsonArray() + .add(new JsonArray().add(1000L).add(2000L)); + configWithRanges.put("traffic_calc_allowlist_ranges", ranges); + + // Act + List> result = calculator.parseAllowlistRanges(configWithRanges); + + // Assert - single range + assertEquals(1, result.size()); + assertEquals(1000L, result.get(0).get(0)); + assertEquals(2000L, result.get(0).get(1)); + } + + @Test + void testParseTrafficCalcConfigRanges_multipleRanges() throws Exception { + // Setup - multiple ranges + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + JsonObject configWithRanges = new JsonObject(); + JsonArray ranges = new JsonArray() + .add(new JsonArray().add(1000L).add(2000L)) + .add(new JsonArray().add(3000L).add(4000L)) + .add(new JsonArray().add(5000L).add(6000L)); + configWithRanges.put("traffic_calc_allowlist_ranges", ranges); + + // Act + List> result = calculator.parseAllowlistRanges(configWithRanges); + + // Assert - multiple ranges + assertEquals(3, result.size()); + assertEquals(1000L, result.get(0).get(0)); + assertEquals(3000L, result.get(1).get(0)); + assertEquals(5000L, result.get(2).get(0)); + } + + @Test + void testParseTrafficCalcConfigRanges_misorderedRange() throws Exception { + // Setup - range with end < start is malformed + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + JsonObject configWithRanges = new JsonObject(); + JsonArray ranges = new JsonArray() + .add(new JsonArray().add(2000L).add(1000L)); // End before start + configWithRanges.put("traffic_calc_allowlist_ranges", ranges); + + // Act + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.parseAllowlistRanges(configWithRanges); + }); + } + + @Test + void testParseTrafficCalcConfigRanges_rangeTooLong() throws Exception { + // Setup - range longer than 24 hours is malformed + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + JsonObject configWithRanges = new JsonObject(); + JsonArray ranges = new JsonArray() + .add(new JsonArray().add(2000L).add(200000L)); // Longer than 24 hours + configWithRanges.put("traffic_calc_allowlist_ranges", ranges); + + // Act + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.parseAllowlistRanges(configWithRanges); + }); + } + + @Test + void testParseTrafficCalcConfigRanges_sortsByStartTime() throws Exception { + // Setup - ranges added out of order + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + JsonObject configWithRanges = new JsonObject(); + JsonArray ranges = new JsonArray() + .add(new JsonArray().add(5000L).add(6000L)) + .add(new JsonArray().add(1000L).add(2000L)) + .add(new JsonArray().add(3000L).add(4000L)); + configWithRanges.put("traffic_calc_allowlist_ranges", ranges); + + // Act + List> result = calculator.parseAllowlistRanges(configWithRanges); + + // Assert - should be sorted by start time + assertEquals(3, result.size()); + assertEquals(1000L, result.get(0).get(0)); + assertEquals(3000L, result.get(1).get(0)); + assertEquals(5000L, result.get(2).get(0)); + } + + @Test + void testParseTrafficCalcConfigRanges_invalidRangeTooFewElements() throws Exception { + // Setup - invalid range with only 1 element; + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + JsonObject configWithRanges = new JsonObject(); + JsonArray ranges = new JsonArray() + .add(new JsonArray().add(1000L)) // Only 1 element + .add(new JsonArray().add(2000L).add(3000L)); // Valid + configWithRanges.put("traffic_calc_allowlist_ranges", ranges); + + // Act + List> result = calculator.parseAllowlistRanges(configWithRanges); + + // Assert - should skip invalid range + assertEquals(1, result.size()); + assertEquals(2000L, result.get(0).get(0)); + } + + @Test + void testParseTrafficCalcConfigRanges_nullArray() throws Exception { + // Setup - null array + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + JsonObject configWithRanges = new JsonObject(); + configWithRanges.put("traffic_calc_allowlist_ranges", (JsonArray) null); + + // Act + List> result = calculator.parseAllowlistRanges(configWithRanges); + + // Assert - empty ranges + assertTrue(result.isEmpty()); + } + + @Test + void testParseTrafficCalcConfigRanges_overlappingRanges() throws Exception { + // Setup - overlapping ranges + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + JsonObject configWithRanges = new JsonObject(); + JsonArray ranges = new JsonArray() + .add(new JsonArray().add(1000L).add(2000L)) + .add(new JsonArray().add(1500L).add(2500L)); // Overlaps with first range + configWithRanges.put("traffic_calc_allowlist_ranges", ranges); + + // Act & Assert - should throw exception due to overlap + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.parseAllowlistRanges(configWithRanges); + }); + } + + @Test + void testParseTrafficCalcConfigRanges_adjacentRangesWithSameBoundary() throws Exception { + // Setup - ranges where end of first equals start of second (touching but not overlapping semantically, but we treat as overlap) + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + JsonObject configWithRanges = new JsonObject(); + JsonArray ranges = new JsonArray() + .add(new JsonArray().add(1000L).add(2000L)) + .add(new JsonArray().add(2000L).add(3000L)); // Starts exactly where first ends + configWithRanges.put("traffic_calc_allowlist_ranges", ranges); + + // Act & Assert - should throw exception because ranges touch at boundary + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.parseAllowlistRanges(configWithRanges); + }); + } + + @Test + void testParseTrafficCalcConfigRanges_nonOverlappingRanges() throws Exception { + // Setup - ranges that don't overlap (with gap between them) + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + JsonObject configWithRanges = new JsonObject(); + JsonArray ranges = new JsonArray() + .add(new JsonArray().add(1000L).add(2000L)) + .add(new JsonArray().add(2001L).add(3000L)); // Starts after first ends + configWithRanges.put("traffic_calc_allowlist_ranges", ranges); + + // Act + List> result = calculator.parseAllowlistRanges(configWithRanges); + + // Assert - should succeed with 2 ranges + assertEquals(2, result.size()); + } + + // ============================================================================ + // SECTION 3: isInTrafficCalcConfig() + // ============================================================================ + + @Test + void testIsInTrafficCalcConfig_withinSingleRange() throws Exception { + // Setup - load traffic calc config with single range [1000, 2000] + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000, 2000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert - true when within range + assertTrue(calculator.isInAllowlist(1500L)); + } + + @Test + void testIsInTrafficCalcConfig_exactlyAtStart() throws Exception { + // Setup - load traffic calc config with single range [1000, 2000] + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000, 2000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert - true when exactly at start of range + assertTrue(calculator.isInAllowlist(1000L)); + } + + @Test + void testIsInTrafficCalcConfig_exactlyAtEnd() throws Exception { + // Setup - load traffic calc config with single range [1000, 2000] + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000, 2000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert - true when exactly at end of range + assertTrue(calculator.isInAllowlist(2000L)); + } + + @Test + void testIsInTrafficCalcConfig_beforeRange() throws Exception { + // Setup - load traffic calc config with single range [1000, 2000] + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000, 2000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert - false when before range + assertFalse(calculator.isInAllowlist(999L)); + } + + @Test + void testIsInTrafficCalcConfig_afterRange() throws Exception { + // Setup - load traffic calc config with single range [1000, 2000] + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000, 2000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert - false when after range + assertFalse(calculator.isInAllowlist(2001L)); + } + + @Test + void testIsInTrafficCalcConfig_betweenRanges() throws Exception { + // Setup - load traffic calc config with two ranges [1000, 2000] and [3000, 4000] + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000, 2000], + [3000, 4000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert - false when between ranges + assertFalse(calculator.isInAllowlist(2500L)); + } + + @Test + void testIsInTrafficCalcConfig_emptyRanges() throws Exception { + // Setup uses default config from setUp() which has empty traffic calc config ranges + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert - false when empty ranges + assertFalse(calculator.isInAllowlist(1500L)); + } + + @Test + void testIsInTrafficCalcConfig_nullRanges() throws Exception { + // Setup - no traffic calc config ranges loaded (will fail and set empty) + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": null + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert - false when null/empty ranges + assertFalse(calculator.isInAllowlist(1500L)); + } + + @Test + void testIsInTrafficCalcConfig_invalidRangeSize() throws Exception { + // Setup - load traffic calc config with invalid range (only 1 element) and valid range + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000], + [2000, 3000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert + assertFalse(calculator.isInAllowlist(1500L)); // Should not match invalid range + assertTrue(calculator.isInAllowlist(2500L)); // Should match valid range + } + + @Test + void testIsInTrafficCalcConfig_multipleRanges() throws Exception { + // Setup - load traffic calc config with multiple ranges + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000, 2000], + [3000, 4000], + [5000, 6000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert + assertTrue(calculator.isInAllowlist(1500L)); // In first range + assertTrue(calculator.isInAllowlist(3500L)); // In second range + assertTrue(calculator.isInAllowlist(5500L)); // In third range + assertFalse(calculator.isInAllowlist(2500L)); // Between first and second + } + + // ============================================================================ + // SECTION 4: getTrafficCalcConfigDuration() + // ============================================================================ + + @Test + void testGetTrafficCalcConfigDuration_noRanges() throws Exception { + // Setup - no ranges + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Assert + assertEquals(0L, calculator.getAllowlistDuration(10000L, 5000L)); // 0 duration when no ranges + } + + @Test + void testGetTrafficCalcConfigDuration_rangeFullyWithinWindow() throws Exception { + // Setup - range fully within window + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [6000, 7000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - window [5000, 10000], range [6000, 7000] + long duration = calculator.getAllowlistDuration(10000L, 5000L); + + // Assert - full range duration + assertEquals(1000L, duration); + } + + @Test + void testGetTrafficCalcConfigDuration_rangePartiallyOverlapsStart() throws Exception { + // Setup - range partially overlaps start of window + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [3000, 7000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - window [5000, 10000], range [3000, 7000] + long duration = calculator.getAllowlistDuration(10000L, 5000L); + + // Assert - should clip to [5000, 7000] = 2000 + assertEquals(2000L, duration); + } + + @Test + void testGetTrafficCalcConfigDuration_rangePartiallyOverlapsEnd() throws Exception { + // Setup - range partially overlaps end of window + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [8000, 12000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - window [5000, 10000], range [8000, 12000] + long duration = calculator.getAllowlistDuration(10000L, 5000L); + + // Assert - should clip to [8000, 10000] = 2000 + assertEquals(2000L, duration); + } + + @Test + void testGetTrafficCalcConfigDuration_rangeCompletelyOutsideWindow() throws Exception { + // Setup - range completely outside window + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000, 2000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - window [5000, 10000], range [1000, 2000] + long duration = calculator.getAllowlistDuration(10000L, 5000L); + + // Assert - 0 duration when range completely outside window + assertEquals(0L, duration); + } + + @Test + void testGetTrafficCalcConfigDuration_multipleRanges() throws Exception { + // Setup - multiple ranges + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [6000, 7000], + [8000, 9000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - window [5000, 10000], ranges [6000, 7000] and [8000, 9000] + long duration = calculator.getAllowlistDuration(10000L, 5000L); + + // Assert - 1000 + 1000 = 2000 + assertEquals(2000L, duration); + } + + @Test + void testGetTrafficCalcConfigDuration_rangeSpansEntireWindow() throws Exception { + // Setup - range spans entire window + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [3000, 12000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - window [5000, 10000], range [3000, 12000] + long duration = calculator.getAllowlistDuration(10000L, 5000L); + + // Assert - entire window is in traffic calc config ranges = 5000 + assertEquals(5000L, duration); + } + + // ============================================================================ + // SECTION 4.5: calculateWindowStartWithAllowlist() + // ============================================================================ + + @Test + void testCalculateWindowStartWithAllowlist_noAllowlist() throws Exception { + // Setup - no allowlist ranges + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - window should be [3, 8] with no extension + long windowStart = calculator.calculateWindowStartWithAllowlist(8L, 5); + + // Assert - no allowlist, so window start is simply newestDeltaTs - evaluationWindowSeconds + assertEquals(3L, windowStart); + } + + @Test + void testCalculateWindowStartWithAllowlist_allowlistInOriginalWindowOnly() throws Exception { + // Setup - allowlist range only in original window, not in extended portion + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [6, 7] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - newestDeltaTs=8, evaluationWindow=5 + // Original window [3, 8] has [6,7] allowlisted (1 hour) + // Extended portion [2, 3] has no allowlist + // So window start should be 8 - 5 - 1 = 2 + long windowStart = calculator.calculateWindowStartWithAllowlist(8L, 5); + + assertEquals(2L, windowStart); + } + + @Test + void testCalculateWindowStartWithAllowlist_allowlistInExtendedPortion() throws Exception { + // Setup - allowlist ranges in both original window AND extended portion + // This is the user's example: evaluationWindow=5, newestDeltaTs=8, allowlist={[2,3], [6,7]} + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [2, 3], + [6, 7] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + // Original window [3, 8]: [6,7] allowlisted = 1 hour + // First extension to [2, 8]: [2,3] and [6,7] allowlisted = 2 hours total + // Second extension to [1, 8]: still [2,3] and [6,7] = 2 hours (no new allowlist) + // Final: windowStart = 8 - 5 - 2 = 1 + long windowStart = calculator.calculateWindowStartWithAllowlist(8L, 5); + + assertEquals(1L, windowStart); + } + + @Test + void testCalculateWindowStartWithAllowlist_allowlistBeforeWindow() throws Exception { + // Setup - allowlist range entirely before the initial window + // This tests that we don't over-extend when allowlist is old + // evaluationWindow=5, newestDeltaTs=20, allowlist=[10,13] + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [10, 13] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + // Initial window [15, 20]: no allowlist overlap, allowlistDuration = 0 + // No extension needed + // Final: windowStart = 20 - 5 - 0 = 15 + long windowStart = calculator.calculateWindowStartWithAllowlist(20L, 5); + + // Verify: window [15, 20] has 5 hours, 0 allowlisted = 5 non-allowlisted + assertEquals(15L, windowStart); + } + + // ============================================================================ + // SECTION 5: determineStatus() + // ============================================================================ + + @Test + void testDetermineStatus_belowThreshold() throws Exception { + // Setup - below threshold + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - 10 < 5 * 3 + OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(10, 3); + + // Assert - DEFAULT when below threshold + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testDetermineStatus_atThreshold() throws Exception { + // Setup - at threshold + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - 15 == 5 * 3 + OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(15, 3); + + // Assert - DELAYED_PROCESSING when at threshold + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); + } + + @Test + void testDetermineStatus_aboveThreshold() throws Exception { + // Setup - above threshold + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - 20 > 5 * 3 + OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(20, 3); + + // Assert - DELAYED_PROCESSING when above threshold + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); + } + + @Test + void testDetermineStatus_sumPastZero() throws Exception { + // Setup - sumPast is 0 + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - should return DEFAULT to avoid crash + OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(100, 0); + + // Assert + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testDetermineStatus_bothZero() throws Exception { + // Setup - both sumCurrent and sumPast are 0; + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - should return DEFAULT to avoid crash + OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(0, 0); + + // Assert + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testDetermineStatus_sumCurrentZero() throws Exception { + // Setup - sumCurrent is 0 + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - 0 < 5 * 10 + OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(0, 10); + + // Assert - DEFAULT when sumCurrent is 0 + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @ParameterizedTest + @CsvSource({ + "1, 1, 1, DELAYED_PROCESSING", // threshold=1: 1 >= 1*1 + "2, 4, 2, DELAYED_PROCESSING", // threshold=2: 4 >= 2*2 + "5, 10, 2, DELAYED_PROCESSING", // threshold=5: 10 >= 5*2 + "10, 100, 10, DELAYED_PROCESSING", // threshold=10: 100 >= 10*10 + "5, 24, 5, DEFAULT", // threshold=5: 24 < 5*5 + "100, 1000, 11, DEFAULT" // threshold=100: 1000 < 100*11 + }) + void testDetermineStatus_variousThresholds(int threshold, int sumCurrent, int sumPast, String expectedStatus) throws Exception { + // Setup - various thresholds + createConfigWithThreshold(threshold); + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(sumCurrent, sumPast); + + // Assert + assertEquals(OptOutTrafficCalculator.TrafficStatus.valueOf(expectedStatus), status); + } + + @Test + void testDetermineStatus_largeNumbers() throws Exception { + // Setup - test with large numbers + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + OptOutTrafficCalculator.TrafficStatus status = calculator.determineStatus(1_000_000, 200_000); + + // Assert - 1M >= 5 * 200K = 1M + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); + } + + // ============================================================================ + // SECTION 6: S3 Config Reload Tests + // ============================================================================ + + @Test + void testReloadTrafficCalcConfig_success() throws Exception { + // Setup - initial traffic calc config + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000, 2000], + [3000, 4000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Change the traffic calc config to a new range + String newTrafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [5000, 6000] + ] + } + """; + createConfigFromPartialJson(newTrafficCalcConfigJson); + + // Act - reload the traffic calc config + calculator.reloadTrafficCalcConfig(); + + // Assert - verify new traffic calc config is loaded + assertTrue(calculator.isInAllowlist(5500L)); + } + + @Test + void testReloadTrafficCalcConfig_failure() throws Exception { + // Setup - initial traffic calc config + String trafficCalcConfigJson = """ + { + "traffic_calc_allowlist_ranges": [ + [1000, 2000] + ] + } + """; + createConfigFromPartialJson(trafficCalcConfigJson); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Now make it fail + createTrafficConfigFile("Invalid JSON"); + + // Act - should not throw exception + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.reloadTrafficCalcConfig(); + }); + + } + + @Test + public void testReloadTrafficCalcConfig_failure_missingKeys() throws Exception { + // Setup + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act & Assert missing threshold multiplier + createTrafficConfigFile("{\"traffic_calc_evaluation_window_seconds\": 86400, \"traffic_calc_baseline_traffic\": 100, \"traffic_calc_allowlist_ranges\": [ [1000, 2000] ]}"); + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.reloadTrafficCalcConfig(); + }); + + // Act & Assert missing evaluation window seconds + createTrafficConfigFile("{\"traffic_calc_threshold_multiplier\": 5, \"traffic_calc_baseline_traffic\": 100, \"traffic_calc_allowlist_ranges\": [ [1000, 2000] ]}"); + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.reloadTrafficCalcConfig(); + }); + + // Act & Assert missing baseline traffic + createTrafficConfigFile("{\"traffic_calc_threshold_multiplier\": 5, \"traffic_calc_evaluation_window_seconds\": 86400, \"traffic_calc_allowlist_ranges\": [ [1000, 2000] ]}"); + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.reloadTrafficCalcConfig(); + }); + + // Act & Assert missing traffic calc config ranges + createTrafficConfigFile("{\"traffic_calc_threshold_multiplier\": 5, \"traffic_calc_evaluation_window_seconds\": 86400, \"traffic_calc_baseline_traffic\": 100}"); + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.reloadTrafficCalcConfig(); + }); + } + + @Test + public void testReloadTrafficCalcConfig_failure_misorderedRanges() throws Exception { + // Setup - misordered ranges + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + createConfigFromPartialJson("{\"traffic_calc_allowlist_ranges\": [ [2000, 1000] ]}"); + + // Act & Assert + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.reloadTrafficCalcConfig(); + }); + } + + @Test + public void testReloadTrafficCalcConfig_failure_rangeTooLong() throws Exception { + // Setup - range greater than 24 hours + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + createConfigFromPartialJson("{\"traffic_calc_allowlist_ranges\": [ [1000, 200000] ]}"); + + // Act & Assert + assertThrows(MalformedTrafficCalcConfigException.class, () -> { + calculator.reloadTrafficCalcConfig(); + }); + } + + // ============================================================================ + // SECTION 7: Cache Management Tests (also tested in section 9) + // ============================================================================ + + @Test + void testGetCacheStats_emptyCache() throws Exception { + // Setup + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + Map stats = calculator.getCacheStats(); + + // Assert - should return empty stats + assertEquals(0, stats.get("cached_files")); + assertEquals(0, stats.get("total_cached_timestamps")); + } + + // ============================================================================ + // SECTION 8: Helper Methods for Test Data Creation + // ============================================================================ + + /** + * Create a mock SQS message with specified timestamp + */ + private Message createSqsMessage(long timestampSeconds) { + Map attributes = new HashMap<>(); + attributes.put(MessageSystemAttributeName.SENT_TIMESTAMP, String.valueOf(timestampSeconds * 1000)); + + return Message.builder() + .messageId("test-msg-" + timestampSeconds) + .body("{\"test\": \"data\"}") + .attributes(attributes) + .build(); + } + + /** + * Create a mock SQS message without timestamp + */ + private Message createSqsMessageWithoutTimestamp() { + return Message.builder() + .messageId("test-msg-no-timestamp") + .body("{\"test\": \"data\"}") + .attributes(new HashMap<>()) + .build(); + } + + /** + * Create delta file bytes with specified timestamps + */ + private byte[] createDeltaFileBytes(List timestamps) throws Exception { + // Create OptOutEntry objects using newTestEntry + List entries = new ArrayList<>(); + + long idCounter = 1000; // Use incrementing IDs for test entries + for (long timestamp : timestamps) { + entries.add(OptOutEntry.newTestEntry(idCounter++, timestamp)); + } + + // Create OptOutCollection + OptOutCollection collection = new OptOutCollection(entries.toArray(new OptOutEntry[0])); + return collection.getStore(); + } + + + // ============================================================================ + // SECTION 9: Tests for calculateStatus() + // ============================================================================ + + @Test + void testCalculateStatus_noDeltaFiles() throws Exception { + // Setup - no delta files + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Collections.emptyList()); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList()); + + // Assert - should return DEFAULT when no delta files + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_normalTraffic() throws Exception { + // Setup - setup time: current time + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create delta files with timestamps distributed over 48 hours + List timestamps = new ArrayList<>(); + + // add 499 entries in current window + for (int i = 0; i < 49; i++) { + timestamps.add(t - 23*3600 + i * 60); + } + + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + List sqsMessages = Arrays.asList(createSqsMessage(t)); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages); + + // Assert - 100+1 < 5 * 50 = 250, so should be DEFAULT + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_delayedProcessing() throws Exception { + // Setup - create delta files with spike in current window + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Create delta files with spike in current window + List timestamps = new ArrayList<>(); + + // add 500 entries in current window + for (int i = 0; i < 500; i++) { + timestamps.add(t - 23*3600 + i * 60); + } + + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + List sqsMessages = Arrays.asList(createSqsMessage(t)); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages); + + // Assert - 100+1 >= 5 * 10 = 50, DELAYED_PROCESSING + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); + } + + @Test + void testCalculateStatus_noSqsMessages() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600, t - 7200); // Some entries + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - null SQS messages + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(null); + + // Assert - should still calculate based on delta files, DEFAULT + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_emptySqsMessages() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - empty SQS messages + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList()); + + // Assert - should still calculate based on delta files, DEFAULT + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_multipleSqsMessages() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = new ArrayList<>(); + // add 470 entries in window + for (int i = 0; i < 470; i++) { + timestamps.add(t - 24*3600 + i * 60); + } + + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Add 30 SQS entries in [t, t+5min] + List sqsMessages = new ArrayList<>(); + for (int i = 0; i < 30; i++) { + sqsMessages.add(createSqsMessage(t - i * 10)); + } + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages); + + // Assert - DELAYED_PROCESSING + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); + } + + @Test + void testCalculateStatus_withTrafficCalcConfig() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // Traffic calc config that covers part of window + String trafficCalcConfigJson = String.format(""" + { + "traffic_calc_allowlist_ranges": [ + [%d, %d] + ] + } + """, t - 12*3600, t - 6*3600); + + List timestamps = new ArrayList<>(); + + // window - 600 entries (300 in traffic calc config range, 300 outside) + for (int i = 0; i < 300; i++) { + timestamps.add(t - 12*3600 + i); + } + for (int i = 0; i < 300; i++) { + timestamps.add(t - 3600 + i); + } + + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + createConfigFromPartialJson(trafficCalcConfigJson); + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/delta-001.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + List sqsMessages = Arrays.asList(createSqsMessage(t)); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages); + + // Assert - should filter out entries in traffic calc config ranges + // Only 300 from window count (not in traffic calc config ranges) + 1 SQS = 301 + // 301 < 5*100, so DEFAULT + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_cacheUtilization() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600, t - 7200); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - first call should populate cache + List sqsMessages = Arrays.asList(createSqsMessage(t)); + calculator.calculateStatus(sqsMessages); + + Map stats = calculator.getCacheStats(); + int cachedFiles = (Integer) stats.get("cached_files"); + + // Second call should use cache (no additional S3 download) + calculator.calculateStatus(sqsMessages); + + Map stats2 = calculator.getCacheStats(); + int cachedFiles2 = (Integer) stats2.get("cached_files"); + + // Assert - cache should be populated and remain consistent + assertEquals(1, cachedFiles); + assertEquals(cachedFiles, cachedFiles2); + + // Verify S3 download was called only once per file + verify(cloudStorage, times(1)).download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat"); + } + + @Test + void testCalculateStatus_s3Exception() throws Exception { + // Setup - S3 list error + when(cloudStorage.list(S3_DELTA_PREFIX)).thenThrow(new RuntimeException("S3 error")); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - should not throw exception + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList()); + + // Assert - DEFAULT on error + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_deltaFileReadException() throws Exception { + // Setup - S3 download error + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenThrow(new CloudStorageException("Failed to download")); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - empty SQS messages + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(Collections.emptyList()); + + // Assert - DEFAULT on error + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_invalidSqsMessageTimestamp() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act - SQS message without timestamp (should use current time) + List sqsMessages = Arrays.asList(createSqsMessageWithoutTimestamp()); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages); + + // Assert - DEFAULT + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + } + + @Test + void testCalculateStatus_multipleDeltaFiles() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + // File 1 - recent entries + List timestamps1 = new ArrayList<>(); + for (int i = 0; i < 50; i++) { + timestamps1.add(t - 12*3600 + i * 1000); + } + byte[] deltaFileBytes1 = createDeltaFileBytes(timestamps1); + + // File 2 - older entries + List timestamps2 = new ArrayList<>(); + for (int i = 0; i < 30; i++) { + timestamps2.add(t - 36*3600 + i * 1000); + } + byte[] deltaFileBytes2 = createDeltaFileBytes(timestamps2); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList( + "optout-v2/delta/optout-delta--01_2025-11-13T02.00.00Z_bbbbbbbb.dat", + "optout-v2/delta/optout-delta--01_2025-11-13T01.00.00Z_aaaaaaaa.dat" + )); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T02.00.00Z_bbbbbbbb.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes1)); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T01.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes2)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + List sqsMessages = Arrays.asList(createSqsMessage(t)); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages); + + // Assert - DEFAULT + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + + // Verify cache has both files + Map stats = calculator.getCacheStats(); + assertEquals(2, stats.get("cached_files")); + } + + @Test + void testCalculateStatus_windowBoundaryTimestamp() throws Exception { + // Setup - create delta file with timestamps at window boundary + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + long currentWindowStart = t - 24*3600; + List timestamps = new ArrayList<>(); + for (int i = 0; i < 250; i++) { + timestamps.add(t); + } + for (int i = 0; i < 250; i++) { + timestamps.add(currentWindowStart); + } + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + List sqsMessages = Arrays.asList(createSqsMessage(t)); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages); + + // Assert - DEFAULT + assertEquals(OptOutTrafficCalculator.TrafficStatus.DELAYED_PROCESSING, status); + } + + @Test + void testCalculateStatus_timestampsCached() throws Exception { + // Setup - create delta files with some entries + long currentTime = System.currentTimeMillis() / 1000; + long t = currentTime; + + List timestamps = Arrays.asList(t - 3600, t - 7200); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + when(cloudStorage.list(S3_DELTA_PREFIX)).thenReturn(Arrays.asList("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download("optout-v2/delta/optout-delta--01_2025-11-13T00.00.00Z_aaaaaaaa.dat")) + .thenReturn(new ByteArrayInputStream(deltaFileBytes)); + + OptOutTrafficCalculator calculator = new OptOutTrafficCalculator( + cloudStorage, S3_DELTA_PREFIX, TRAFFIC_CONFIG_PATH); + + // Act + List sqsMessages = Arrays.asList(createSqsMessage(t)); + OptOutTrafficCalculator.TrafficStatus status = calculator.calculateStatus(sqsMessages); + + // Assert + assertEquals(OptOutTrafficCalculator.TrafficStatus.DEFAULT, status); + + // Cache should contain the timestamps + Map stats = calculator.getCacheStats(); + assertEquals(2, stats.get("total_cached_timestamps")); + } + +} diff --git a/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java b/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java new file mode 100644 index 00000000..63f6807c --- /dev/null +++ b/src/test/java/com/uid2/optout/vertx/OptOutTrafficFilterTest.java @@ -0,0 +1,424 @@ +package com.uid2.optout.vertx; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import software.amazon.awssdk.services.sqs.model.Message; + +import java.nio.file.Files; +import java.nio.file.Path; + +import static org.junit.Assert.*; + +public class OptOutTrafficFilterTest { + + private static final String TEST_CONFIG_PATH = "./traffic-config.json"; + + @Before + public void setUp() { + try { + Files.deleteIfExists(Path.of(TEST_CONFIG_PATH)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @After + public void tearDown() { + try { + Files.deleteIfExists(Path.of(TEST_CONFIG_PATH)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + @Test + public void testParseFilterRules_emptyRules() throws Exception { + // Setup - empty denylist + String config = """ + { + "denylist_requests": [] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + // Act & Assert - no rules + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + assertEquals(0, filter.filterRules.size()); + } + + @Test + public void testParseFilterRules_singleRule() throws Exception { + // Setup - config with one rule + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": ["192.168.1.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + // Act & Assert - one rule + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + assertEquals(1, filter.filterRules.size()); + } + + @Test + public void testParseFilterRules_multipleRules() throws Exception { + // Setup - config with multiple rules + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": ["192.168.1.1"] + }, + { + "range": [1700010000, 1700013600], + "IPs": ["10.0.0.1", "10.0.0.2"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + // Act & Assert - two rules + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + assertEquals(2, filter.filterRules.size()); + } + + @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class) + public void testParseFilterRules_missingDenylistRequests() throws Exception { + // Setup - config without denylist_requests field + String config = """ + { + "other_field": "value" + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + // Act & Assert - throws exception + new OptOutTrafficFilter(TEST_CONFIG_PATH); + } + + @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class) + public void testParseFilterRules_invalidRange_startAfterEnd() throws Exception { + // Setup - range where start > end + String config = """ + { + "denylist_requests": [ + { + "range": [1700003600, 1700000000], + "IPs": ["192.168.1.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + // Act & Assert - throws exception + new OptOutTrafficFilter(TEST_CONFIG_PATH); + } + + @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class) + public void testParseFilterRules_invalidRange_startEqualsEnd() throws Exception { + // Setup - range where start == end + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700000000], + "IPs": ["192.168.1.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + // Act & Assert - throws exception + new OptOutTrafficFilter(TEST_CONFIG_PATH); + } + + @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class) + public void testParseFilterRules_rangeExceeds24Hours() throws Exception { + // Setup - range longer than 24 hours (86400 seconds) + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700086401], + "IPs": ["192.168.1.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + // Act & Assert - throws exception + new OptOutTrafficFilter(TEST_CONFIG_PATH); + } + + @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class) + public void testParseFilterRules_emptyIPs() throws Exception { + // Setup - rule with empty IP list + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": [] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + // Act & Assert - throws exception + new OptOutTrafficFilter(TEST_CONFIG_PATH); + } + + @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class) + public void testParseFilterRules_missingIPs() throws Exception { + // Setup - rule without IPs field + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + // Act & Assert - throws exception + new OptOutTrafficFilter(TEST_CONFIG_PATH); + } + + @Test + public void testIsDenylisted_matchingIPAndTimestamp() throws Exception { + // Setup - filter with denylist rule + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": ["192.168.1.1", "10.0.0.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + SqsParsedMessage message = createTestMessage(1700001800, "192.168.1.1"); + + // Act & Assert - denylisted + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + assertTrue(filter.isDenylisted(message)); + } + + @Test + public void testIsDenylisted_matchingIPOutsideTimeRange() throws Exception { + // Setup - filter with denylist rule + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": ["192.168.1.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + + // Act & Assert - message before range not denylisted + SqsParsedMessage messageBefore = createTestMessage(1699999999, "192.168.1.1"); + assertFalse(filter.isDenylisted(messageBefore)); + // Act & Assert - message after range not denylisted + SqsParsedMessage messageAfter = createTestMessage(1700003601, "192.168.1.1"); + assertFalse(filter.isDenylisted(messageAfter)); + } + + @Test + public void testIsDenylisted_nonMatchingIP() throws Exception { + // Setup - filter with denylist rule + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": ["192.168.1.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + + // Act & Assert - non-matching IP not denylisted + SqsParsedMessage message = createTestMessage(1700001800, "10.0.0.1"); + assertFalse(filter.isDenylisted(message)); + } + + @Test + public void testIsDenylisted_atRangeBoundaries() throws Exception { + // Setup - filter with denylist rule + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": ["192.168.1.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + + // Act & Assert - message at start boundary (inclusive) denylisted + SqsParsedMessage messageAtStart = createTestMessage(1700000000, "192.168.1.1"); + assertTrue(filter.isDenylisted(messageAtStart)); + + // Act & Assert - message at end boundary (inclusive) denylisted + SqsParsedMessage messageAtEnd = createTestMessage(1700003600, "192.168.1.1"); + assertTrue(filter.isDenylisted(messageAtEnd)); + } + + @Test + public void testIsDenylisted_multipleRules() throws Exception { + // Setup - multiple denylist rules + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": ["192.168.1.1"] + }, + { + "range": [1700010000, 1700013600], + "IPs": ["10.0.0.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + + // Act & Assert - message matches first rule + SqsParsedMessage msg1 = createTestMessage(1700001800, "192.168.1.1"); + assertTrue(filter.isDenylisted(msg1)); + + // Act & Assert - message matches second rule + SqsParsedMessage msg2 = createTestMessage(1700011800, "10.0.0.1"); + assertTrue(filter.isDenylisted(msg2)); + + // Act & Assert - message matches neither rule + SqsParsedMessage msg3 = createTestMessage(1700005000, "172.16.0.1"); + assertFalse(filter.isDenylisted(msg3)); + } + + @Test + public void testIsDenylisted_nullClientIp() throws Exception { + // Setup - filter with denylist rule + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": ["192.168.1.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + + // Act & Assert - message with null IP not denylisted + SqsParsedMessage message = createTestMessage(1700001800, null); + assertFalse(filter.isDenylisted(message)); + } + + @Test + public void testReloadTrafficFilterConfig_success() throws Exception { + // Setup - config with one rule + String initialConfig = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": ["192.168.1.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), initialConfig); + + // Act & Assert - one rule + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + assertEquals(1, filter.filterRules.size()); + + // Setup - update config + String updatedConfig = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700003600], + "IPs": ["192.168.1.1"] + }, + { + "range": [1700010000, 1700013600], + "IPs": ["10.0.0.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), updatedConfig); + + // Act & Assert - two rules + filter.reloadTrafficFilterConfig(); + assertEquals(2, filter.filterRules.size()); + } + + @Test(expected = OptOutTrafficFilter.MalformedTrafficFilterConfigException.class) + public void testReloadTrafficFilterConfig_fileNotFound() throws Exception { + // Setup, Act & Assert - try to create filter with non-existent config + new OptOutTrafficFilter("./non-existent-file.json"); + } + + @Test + public void testParseFilterRules_maxValidRange() throws Exception { + // Setup - range exactly 24 hours (86400 seconds) - should be valid + String config = """ + { + "denylist_requests": [ + { + "range": [1700000000, 1700086400], + "IPs": ["192.168.1.1"] + } + ] + } + """; + Files.writeString(Path.of(TEST_CONFIG_PATH), config); + + // Act & Assert - one rule + OptOutTrafficFilter filter = new OptOutTrafficFilter(TEST_CONFIG_PATH); + assertEquals(1, filter.filterRules.size()); + } + + /** + * Helper method to create test SqsParsedMessage + */ + private SqsParsedMessage createTestMessage(long timestamp, String clientIp) { + Message mockMessage = Message.builder().build(); + byte[] hash = new byte[32]; + byte[] id = new byte[32]; + return new SqsParsedMessage(mockMessage, hash, id, timestamp, null, null, clientIp, null); + } +} diff --git a/src/test/java/com/uid2/optout/vertx/SqsBatchProcessorTest.java b/src/test/java/com/uid2/optout/vertx/SqsBatchProcessorTest.java new file mode 100644 index 00000000..6fc1c865 --- /dev/null +++ b/src/test/java/com/uid2/optout/vertx/SqsBatchProcessorTest.java @@ -0,0 +1,171 @@ +package com.uid2.optout.vertx; + +import io.vertx.core.json.JsonObject; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import software.amazon.awssdk.services.sqs.model.Message; +import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; + +import java.util.*; + +import static org.junit.jupiter.api.Assertions.*; + +public class SqsBatchProcessorTest { + + private static final String VALID_HASH_BASE64 = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="; // 32 bytes + private static final String VALID_ID_BASE64 = "AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE="; // 32 bytes + private static final long TEST_TIMESTAMP_MS = 1699308900000L; // Nov 7, 2023 in ms + + private static final int DEFAULT_WINDOW_SECONDS = 300; // 5 minutes + + private SqsBatchProcessor batchProcessor; + + @BeforeEach + public void setUp() { + // Pass null for SqsClient - filterEligibleMessages doesn't use it + batchProcessor = new SqsBatchProcessor(null, "test-queue-url", DEFAULT_WINDOW_SECONDS); + } + + private Message createValidMessage(String identityHash, String advertisingId, long timestampMs) { + JsonObject body = new JsonObject() + .put("identity_hash", identityHash) + .put("advertising_id", advertisingId); + + Map attributes = new HashMap<>(); + attributes.put(MessageSystemAttributeName.SENT_TIMESTAMP, String.valueOf(timestampMs)); + + return Message.builder() + .body(body.encode()) + .attributes(attributes) + .messageId("test-message-id-" + timestampMs) + .receiptHandle("test-receipt-handle") + .build(); + } + + @Test + public void testFilterEligibleMessages_allEligible() { + List messages = new ArrayList<>(); + Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); + + // Create messages from 10 minutes ago (600 seconds) + long currentTime = 1000L; + long oldTimestamp = currentTime - 600; + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], oldTimestamp, null, null, null, null)); + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], oldTimestamp - 100, null, null, null, null)); + + List result = batchProcessor.filterEligibleMessages(messages, currentTime); + + assertEquals(2, result.size()); // All should be eligible (> 5 minutes old) + } + + @Test + public void testFilterEligibleMessages_noneEligible() { + List messages = new ArrayList<>(); + Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); + + // Create messages from 1 minute ago (too recent) + long currentTime = 1000L; + long recentTimestamp = currentTime - 60; + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], recentTimestamp, null, null, null, null)); + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], recentTimestamp + 10, null, null, null, null)); + + List result = batchProcessor.filterEligibleMessages(messages, currentTime); + + assertEquals(0, result.size()); // None should be eligible (< 5 minutes old) + } + + @Test + public void testFilterEligibleMessages_mixedEligibility() { + List messages = new ArrayList<>(); + Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); + + long currentTime = 1000L; + + // Old enough (600 seconds ago) + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - 600, null, null, null, null)); + + // Exactly at threshold (300 seconds ago) + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - 300, null, null, null, null)); + + // Too recent (100 seconds ago) + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - 100, null, null, null, null)); + + List result = batchProcessor.filterEligibleMessages(messages, currentTime); + + assertEquals(2, result.size()); // First two are eligible (>= 300 seconds old) + } + + @Test + public void testFilterEligibleMessages_emptyList() { + List messages = new ArrayList<>(); + long currentTime = 1000L; + + List result = batchProcessor.filterEligibleMessages(messages, currentTime); + + assertEquals(0, result.size()); + } + + @Test + public void testFilterEligibleMessages_boundaryCases() { + List messages = new ArrayList<>(); + Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); + + long currentTime = 1000L; + int windowSeconds = DEFAULT_WINDOW_SECONDS; // 300 + + // One second too new (299 seconds ago) + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - windowSeconds + 1, null, null, null, null)); + + // Exactly at threshold (300 seconds ago) + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - windowSeconds, null, null, null, null)); + + // One second past threshold (301 seconds ago) + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - windowSeconds - 1, null, null, null, null)); + + List result = batchProcessor.filterEligibleMessages(messages, currentTime); + + // Should only include the last two (>= threshold) + assertEquals(2, result.size()); + assertEquals(currentTime - windowSeconds, result.get(0).getTimestamp()); + assertEquals(currentTime - windowSeconds - 1, result.get(1).getTimestamp()); + } + + @Test + public void testFilterEligibleMessages_preservesOrder() { + List messages = new ArrayList<>(); + Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); + + long currentTime = 1000L; + + // Add eligible messages in specific order (all older than 300 seconds) + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], 100, null, null, null, null)); + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], 200, null, null, null, null)); + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], 300, null, null, null, null)); + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], 900, null, null, null, null)); // Too recent (100 seconds ago) + + List result = batchProcessor.filterEligibleMessages(messages, currentTime); + + assertEquals(3, result.size()); + // Verify order is preserved + assertEquals(100, result.get(0).getTimestamp()); + assertEquals(200, result.get(1).getTimestamp()); + assertEquals(300, result.get(2).getTimestamp()); + } + + @Test + public void testFilterEligibleMessages_zeroWindowSeconds() { + // Create processor with 0 window seconds + SqsBatchProcessor zeroWindowProcessor = new SqsBatchProcessor(null, "test-queue-url", 0); + + List messages = new ArrayList<>(); + Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); + + long currentTime = 1000L; + messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime, null, null, null, null)); + + List result = zeroWindowProcessor.filterEligibleMessages(messages, currentTime); + + assertEquals(1, result.size()); // With 0 window, current time messages should be eligible + } +} + diff --git a/src/test/java/com/uid2/optout/vertx/SqsMessageParserTest.java b/src/test/java/com/uid2/optout/vertx/SqsMessageParserTest.java index 552c1efe..810a7a41 100644 --- a/src/test/java/com/uid2/optout/vertx/SqsMessageParserTest.java +++ b/src/test/java/com/uid2/optout/vertx/SqsMessageParserTest.java @@ -4,7 +4,6 @@ import org.junit.jupiter.api.Test; import software.amazon.awssdk.services.sqs.model.Message; import software.amazon.awssdk.services.sqs.model.MessageSystemAttributeName; - import java.util.*; import static org.junit.jupiter.api.Assertions.*; @@ -177,69 +176,6 @@ public void testParseAndSortMessages_mixValidAndInvalid() { assertEquals(TEST_TIMESTAMP_SEC + 1, result.get(1).getTimestamp()); } - @Test - public void testFilterEligibleMessages_allEligible() { - List messages = new ArrayList<>(); - Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); - - // Create messages from 10 minutes ago - long oldTimestamp = System.currentTimeMillis() / 1000 - 600; - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], oldTimestamp)); - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], oldTimestamp - 100)); - - long currentTime = System.currentTimeMillis() / 1000; - List result = SqsMessageParser.filterEligibleMessages(messages, 300, currentTime); - - assertEquals(2, result.size()); // All should be eligible (> 5 minutes old) - } - - @Test - public void testFilterEligibleMessages_noneEligible() { - List messages = new ArrayList<>(); - Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); - - // Create messages from 1 minute ago (too recent) - long recentTimestamp = System.currentTimeMillis() / 1000 - 60; - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], recentTimestamp)); - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], recentTimestamp + 10)); - - long currentTime = System.currentTimeMillis() / 1000; - List result = SqsMessageParser.filterEligibleMessages(messages, 300, currentTime); - - assertEquals(0, result.size()); // None should be eligible (< 5 minutes old) - } - - @Test - public void testFilterEligibleMessages_mixedEligibility() { - List messages = new ArrayList<>(); - Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); - - long currentTime = 1000L; - - // Old enough (600 seconds ago) - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - 600)); - - // Exactly at threshold (300 seconds ago) - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - 300)); - - // Too recent (100 seconds ago) - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - 100)); - - List result = SqsMessageParser.filterEligibleMessages(messages, 300, currentTime); - - assertEquals(2, result.size()); // First two are eligible (>= 300 seconds old) - } - - @Test - public void testFilterEligibleMessages_emptyList() { - List messages = new ArrayList<>(); - long currentTime = System.currentTimeMillis() / 1000; - - List result = SqsMessageParser.filterEligibleMessages(messages, 300, currentTime); - - assertEquals(0, result.size()); - } - @Test public void testParseAndSortMessages_timestampConversion() { // Verify milliseconds to seconds conversion @@ -282,31 +218,6 @@ public void testParseAndSortMessages_sortingOrder() { } } - @Test - public void testFilterEligibleMessages_boundaryCases() { - List messages = new ArrayList<>(); - Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); - - long currentTime = 1000L; - int windowSeconds = 300; - - // One second too new (299 seconds ago) - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - windowSeconds + 1)); - - // Exactly at threshold (300 seconds ago) - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - windowSeconds)); - - // One second past threshold (301 seconds ago) - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime - windowSeconds - 1)); - - List result = SqsMessageParser.filterEligibleMessages(messages, windowSeconds, currentTime); - - // Should only include the last two (>= threshold) - assertEquals(2, result.size()); - assertEquals(currentTime - windowSeconds, result.get(0).getTimestamp()); - assertEquals(currentTime - windowSeconds - 1, result.get(1).getTimestamp()); - } - @Test public void testParseAndSortMessages_parsesHashAndIdBytes() { Message message = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); @@ -320,28 +231,6 @@ public void testParseAndSortMessages_parsesHashAndIdBytes() { assertEquals(32, result.get(0).getIdBytes().length); } - @Test - public void testFilterEligibleMessages_preservesOrder() { - List messages = new ArrayList<>(); - Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); - - long currentTime = 1000L; - - // Add eligible messages in specific order - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], 100)); - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], 200)); - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], 300)); - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], 900)); // Too recent - - List result = SqsMessageParser.filterEligibleMessages(messages, 300, currentTime); - - assertEquals(3, result.size()); - // Verify order is preserved - assertEquals(100, result.get(0).getTimestamp()); - assertEquals(200, result.get(1).getTimestamp()); - assertEquals(300, result.get(2).getTimestamp()); - } - @Test public void testParseAndSortMessages_nullBodyFields() { JsonObject body = new JsonObject() @@ -379,17 +268,4 @@ public void testParseAndSortMessages_multipleValidMessages() { } } - @Test - public void testFilterEligibleMessages_zeroWindowSeconds() { - List messages = new ArrayList<>(); - Message mockMsg = createValidMessage(VALID_HASH_BASE64, VALID_ID_BASE64, TEST_TIMESTAMP_MS); - - long currentTime = 1000L; - messages.add(new SqsParsedMessage(mockMsg, new byte[32], new byte[32], currentTime)); - - List result = SqsMessageParser.filterEligibleMessages(messages, 0, currentTime); - - assertEquals(1, result.size()); // With 0 window, current time messages should be eligible - } } -