diff --git a/conf/default-config.json b/conf/default-config.json index 2dff0d79..61906576 100644 --- a/conf/default-config.json +++ b/conf/default-config.json @@ -32,5 +32,17 @@ "partners_metadata_path": null, "partners_config_path": "partners/config.json", "operator_type": "public", - "uid_instance_id_prefix": "local-optout" + "uid_instance_id_prefix": "local-optout", + "optout_enqueue_sqs_enabled": false, + "optout_sqs_queue_url": null, + "optout_sqs_s3_folder": "sqs-delta", + "optout_sqs_max_queue_size": 0, + "optout_sqs_max_messages_per_poll": 10, + "optout_sqs_visibility_timeout": 300, + "optout_delta_job_timeout_seconds": 10800, + "optout_s3_bucket_dropped_requests": null, + "optout_max_messages_per_file": 10000, + "traffic_filter_config_path": null, + "traffic_calc_config_path": null, + "manual_override_s3_path": null } \ No newline at end of file diff --git a/pom.xml b/pom.xml index 7e3305b8..29de00c7 100644 --- a/pom.xml +++ b/pom.xml @@ -160,6 +160,11 @@ software.amazon.awssdk sqs + + commons-logging + commons-logging + 1.2 + diff --git a/src/main/java/com/uid2/optout/Const.java b/src/main/java/com/uid2/optout/Const.java index a4182507..a415fff1 100644 --- a/src/main/java/com/uid2/optout/Const.java +++ b/src/main/java/com/uid2/optout/Const.java @@ -32,6 +32,8 @@ public static class Config extends com.uid2.shared.Const.Config { public static final String OptOutTrafficCalcThresholdMultiplierProp = "traffic_calc_threshold_multiplier"; public static final String OptOutTrafficCalcEvaluationWindowSecondsProp = "traffic_calc_evaluation_window_seconds"; public static final String OptOutTrafficCalcAllowlistRangesProp = "traffic_calc_allowlist_ranges"; + public static final String OptOutSqsDeltaWindowSecondsProp = "optout_sqs_delta_window_seconds"; + public static final String OptOutSqsMaxQueueSizeProp = "optout_sqs_max_queue_size"; } public static class Event { diff --git a/src/main/java/com/uid2/optout/Main.java b/src/main/java/com/uid2/optout/Main.java index dbccd32e..848ab2e2 100644 --- a/src/main/java/com/uid2/optout/Main.java +++ b/src/main/java/com/uid2/optout/Main.java @@ -1,6 +1,8 @@ package com.uid2.optout; import com.uid2.optout.vertx.*; +import com.uid2.optout.traffic.TrafficFilter.MalformedTrafficFilterConfigException; +import com.uid2.optout.traffic.TrafficCalculator.MalformedTrafficCalcConfigException; import com.uid2.shared.ApplicationVersion; import com.uid2.shared.Utils; import com.uid2.shared.attest.AttestationResponseHandler; @@ -27,7 +29,6 @@ import io.vertx.config.ConfigRetriever; import io.vertx.core.*; import io.vertx.core.http.HttpServerOptions; -import io.vertx.core.http.impl.HttpUtils; import io.vertx.core.json.JsonObject; import io.vertx.micrometer.MetricsDomain; import org.slf4j.Logger; @@ -271,39 +272,42 @@ public void run(String[] args) throws IOException { futs.add((this.uploadLastDelta(cs, logProducer, cloudSyncVerticle.eventUpload(), cloudSyncVerticle.eventRefresh()))); } - // Deploy SQS producer if enabled + // deploy sqs producer if enabled if (this.enqueueSqsEnabled) { - LOGGER.info("SQS enabled, deploying OptOutSqsLogProducer"); + LOGGER.info("sqs enabled, deploying OptOutSqsLogProducer"); try { - // Create SQS-specific cloud sync with custom folder (default: "sqs-delta") + // sqs delta production uses a separate s3 folder (default: "sqs-delta") + // OptOutCloudSync reads from optout_s3_folder, so we override it with optout_sqs_s3_folder String sqsFolder = this.config.getString(Const.Config.OptOutSqsS3FolderProp, "sqs-delta"); - LOGGER.info("SQS Config - optout_sqs_s3_folder: {}, will override optout_s3_folder to: {}", - sqsFolder, sqsFolder); - JsonObject sqsConfig = new JsonObject().mergeIn(this.config) + JsonObject sqsCloudSyncConfig = new JsonObject().mergeIn(this.config) .put(Const.Config.OptOutS3FolderProp, sqsFolder); - LOGGER.info("SQS Config after merge - optout_s3_folder: {}", sqsConfig.getString(Const.Config.OptOutS3FolderProp)); - OptOutCloudSync sqsCs = new OptOutCloudSync(sqsConfig, true); + OptOutCloudSync sqsCs = new OptOutCloudSync(sqsCloudSyncConfig, true); - // Create SQS-specific cloud storage instance (same bucket, different folder handling) + // create cloud storage instances ICloudStorage fsSqs; boolean useStorageMock = this.config.getBoolean(Const.Config.StorageMockProp, false); if (useStorageMock) { - // Reuse the same LocalStorageMock for testing fsSqs = this.fsOptOut; } else { - // Create fresh CloudStorage for SQS (no path conversion wrapper) String optoutBucket = this.config.getString(Const.Config.OptOutS3BucketProp); - fsSqs = CloudUtils.createStorage(optoutBucket, sqsConfig); + fsSqs = CloudUtils.createStorage(optoutBucket, this.config); } - // Deploy SQS log producer with its own storage instance - OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, sqsCs); + String optoutBucketDroppedRequests = this.config.getString(Const.Config.OptOutS3BucketDroppedRequestsProp); + ICloudStorage fsSqsDroppedRequests = CloudUtils.createStorage(optoutBucketDroppedRequests, this.config); + + // deploy sqs log producer + OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, fsSqs, fsSqsDroppedRequests, sqsCs, Const.Event.DeltaProduce, null); futs.add(this.deploySingleInstance(sqsLogProducer)); - LOGGER.info("SQS log producer deployed - bucket: {}, folder: {}", + LOGGER.info("sqs log producer deployed, bucket={}, folder={}", this.config.getString(Const.Config.OptOutS3BucketProp), sqsFolder); } catch (IOException e) { - LOGGER.error("Failed to initialize SQS log producer: " + e.getMessage(), e); + LOGGER.error("circuit_breaker_config_error: failed to initialize sqs log producer, delta production will be disabled: {}", e.getMessage(), e); + } catch (MalformedTrafficFilterConfigException e) { + LOGGER.error("circuit_breaker_config_error: traffic filter config is malformed, delta production will be disabled: {}", e.getMessage(), e); + } catch (MalformedTrafficCalcConfigException e) { + LOGGER.error("circuit_breaker_config_error: traffic calc config is malformed, delta production will be disabled: {}", e.getMessage(), e); } } diff --git a/src/main/java/com/uid2/optout/delta/DeltaFileWriter.java b/src/main/java/com/uid2/optout/delta/DeltaFileWriter.java index b9d54bcc..008447e7 100644 --- a/src/main/java/com/uid2/optout/delta/DeltaFileWriter.java +++ b/src/main/java/com/uid2/optout/delta/DeltaFileWriter.java @@ -105,7 +105,7 @@ private void flushToStream(ByteArrayOutputStream stream) throws IOException { private void ensureCapacity(int dataSize) { if (buffer.capacity() < dataSize) { int newCapacity = Integer.highestOneBit(dataSize) << 1; - LOGGER.info("expanding buffer size: current {}, need {}, new {}", buffer.capacity(), dataSize, newCapacity); + LOGGER.info("expanding buffer: currentSize={}, neededSize={}, newSize={}", buffer.capacity(), dataSize, newCapacity); this.buffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN); } } diff --git a/src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java b/src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java index 33fdf7e0..baa5f731 100644 --- a/src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java +++ b/src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java @@ -86,10 +86,10 @@ public List getMessages() { * @return BatchProcessingResult containing eligible messages and processing metadata */ public BatchProcessingResult processBatch(List messageBatch, int batchNumber) throws IOException { - // Parse and sort messages by timestamp + // parse and sort messages by timestamp List parsedBatch = SqsMessageParser.parseAndSortMessages(messageBatch); - // Identify and delete corrupt messages + // identify and delete corrupt messages if (parsedBatch.size() < messageBatch.size()) { List invalidMessages = identifyInvalidMessages(messageBatch, parsedBatch); if (!invalidMessages.isEmpty()) { @@ -98,13 +98,13 @@ public BatchProcessingResult processBatch(List messageBatch, int batchN } } - // No valid messages after deleting corrupt ones, continue reading + // no valid messages after deleting corrupt ones, continue reading if (parsedBatch.isEmpty()) { LOGGER.info("no valid messages in batch {} after removing invalid messages", batchNumber); return BatchProcessingResult.corruptMessagesDeleted(); } - // Check if the oldest message in this batch is too recent + // check if the oldest message in this batch is too recent long currentTime = OptOutUtils.nowEpochSeconds(); SqsParsedMessage oldestMessage = parsedBatch.get(0); @@ -112,7 +112,7 @@ public BatchProcessingResult processBatch(List messageBatch, int batchN return BatchProcessingResult.messagesTooRecent(); } - // Filter for eligible messages (>= deltaWindowSeconds old) + // filter for eligible messages (>= deltaWindowSeconds old) List eligibleMessages = filterEligibleMessages(parsedBatch, currentTime); return BatchProcessingResult.withMessages(eligibleMessages); diff --git a/src/main/java/com/uid2/optout/sqs/SqsMessageOperations.java b/src/main/java/com/uid2/optout/sqs/SqsMessageOperations.java index afd651cc..aeedbbb6 100644 --- a/src/main/java/com/uid2/optout/sqs/SqsMessageOperations.java +++ b/src/main/java/com/uid2/optout/sqs/SqsMessageOperations.java @@ -87,7 +87,7 @@ public static QueueAttributes getQueueAttributes(SqsClient sqsClient, String que int delayed = parseIntOrDefault(attrs.get(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED), 0); QueueAttributes queueAttributes = new QueueAttributes(visible, invisible, delayed); - LOGGER.info("queue attributes: {}", queueAttributes); + LOGGER.info("sqs_queue_attributes={}", queueAttributes); return queueAttributes; } catch (Exception e) { diff --git a/src/main/java/com/uid2/optout/sqs/SqsMessageParser.java b/src/main/java/com/uid2/optout/sqs/SqsMessageParser.java index 10841dd8..0cf6a331 100644 --- a/src/main/java/com/uid2/optout/sqs/SqsMessageParser.java +++ b/src/main/java/com/uid2/optout/sqs/SqsMessageParser.java @@ -32,7 +32,7 @@ public static List parseAndSortMessages(List messages try { // parse message body JsonObject body = new JsonObject(message.body()); - traceId = body.getString("trace_id"); + traceId = body.getString("uid_trace_id"); String identityHash = body.getString("identity_hash"); String advertisingId = body.getString("advertising_id"); diff --git a/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java b/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java index c8733e79..4cce7e58 100644 --- a/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java +++ b/src/main/java/com/uid2/optout/sqs/SqsWindowReader.java @@ -50,7 +50,7 @@ public static class WindowReadResult { private final List messages; private final long windowStart; private final StopReason stopReason; - private final int rawMessagesRead; // total messages pulled from SQS + private final int rawMessagesRead; // total messages pulled from sqs private WindowReadResult(List messages, long windowStart, StopReason stopReason, int rawMessagesRead) { this.messages = messages; @@ -97,7 +97,7 @@ public WindowReadResult readWindow() throws IOException { List windowMessages = new ArrayList<>(); long currentWindowStart = 0; int batchNumber = 0; - int rawMessagesRead = 0; // track total messages pulled from SQS + int rawMessagesRead = 0; // track total messages pulled from sqs while (true) { if (windowMessages.size() >= maxMessagesPerWindow) { @@ -105,7 +105,7 @@ public WindowReadResult readWindow() throws IOException { return WindowReadResult.messageLimitExceeded(windowMessages, currentWindowStart, rawMessagesRead); } - // Read one batch from SQS (up to 10 messages) + // read one batch from sqs (up to 10 messages) List rawBatch = SqsMessageOperations.receiveMessagesFromSqs( this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, this.visibilityTimeout); @@ -122,21 +122,21 @@ public WindowReadResult readWindow() throws IOException { if (batchResult.getStopReason() == StopReason.MESSAGES_TOO_RECENT) { return WindowReadResult.messagesTooRecent(windowMessages, currentWindowStart, rawMessagesRead); } - // Corrupt messages were deleted, continue reading + // corrupt messages were deleted, continue reading continue; } - // Add eligible messages to current window + // add eligible messages to current window boolean newWindow = false; for (SqsParsedMessage msg : batchResult.getMessages()) { long msgWindowStart = msg.timestamp(); - // Discover start of window + // discover start of window if (currentWindowStart == 0) { currentWindowStart = msgWindowStart; } - // Discover next window + // discover next window if (msgWindowStart > currentWindowStart + this.deltaWindowSeconds) { newWindow = true; } diff --git a/src/main/java/com/uid2/optout/traffic/TrafficCalculator.java b/src/main/java/com/uid2/optout/traffic/TrafficCalculator.java index 7e090b87..49f657f0 100644 --- a/src/main/java/com/uid2/optout/traffic/TrafficCalculator.java +++ b/src/main/java/com/uid2/optout/traffic/TrafficCalculator.java @@ -322,8 +322,8 @@ public TrafficStatus calculateStatus(List sqsMessages, QueueAt } /** - * find the newest timestamp from delta files. - * reads the newest delta file and returns its maximum timestamp. + * Find the newest timestamp from delta files. + * Reads the newest delta file and returns its maximum timestamp. */ private long findNewestDeltaTimestamp(List deltaS3Paths) throws IOException { if (deltaS3Paths == null || deltaS3Paths.isEmpty()) { diff --git a/src/main/java/com/uid2/optout/util/HttpResponseHelper.java b/src/main/java/com/uid2/optout/util/HttpResponseHelper.java new file mode 100644 index 00000000..6d82f568 --- /dev/null +++ b/src/main/java/com/uid2/optout/util/HttpResponseHelper.java @@ -0,0 +1,70 @@ +package com.uid2.optout.util; + +import io.vertx.core.http.HttpHeaders; +import io.vertx.core.http.HttpServerResponse; +import io.vertx.core.json.JsonObject; + +/** + * Utility class for HTTP JSON response handling. + * Ensures consistent response format across handlers. + */ +public class HttpResponseHelper { + + /** + * Send a JSON response with the specified status code. + */ + public static void sendJson(HttpServerResponse resp, int statusCode, JsonObject body) { + resp.setStatusCode(statusCode) + .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + .end(body.encode()); + } + + /** + * Send a 200 OK response with JSON body. + */ + public static void sendSuccess(HttpServerResponse resp, JsonObject body) { + sendJson(resp, 200, body); + } + + /** + * Send a 200 OK response with status and message. + */ + public static void sendSuccess(HttpServerResponse resp, String status, String message) { + sendJson(resp, 200, new JsonObject().put("status", status).put("message", message)); + } + + /** + * Send a 200 OK response with idle status and message. + */ + public static void sendIdle(HttpServerResponse resp, String message) { + sendJson(resp, 200, new JsonObject().put("status", "idle").put("message", message)); + } + /** + * Send a 202 Accepted response indicating async job started. + */ + public static void sendAccepted(HttpServerResponse resp, String message) { + sendJson(resp, 202, new JsonObject().put("status", "accepted").put("message", message)); + } + + /** + * Send a 409 Conflict response. + */ + public static void sendConflict(HttpServerResponse resp, String reason) { + sendJson(resp, 409, new JsonObject().put("status", "conflict").put("reason", reason)); + } + + /** + * Send a 500 Internal Server Error response. + */ + public static void sendError(HttpServerResponse resp, String error) { + sendJson(resp, 500, new JsonObject().put("status", "failed").put("error", error)); + } + + /** + * Send a 500 Internal Server Error response from an exception. + */ + public static void sendError(HttpServerResponse resp, Exception e) { + sendError(resp, e.getMessage()); + } +} + diff --git a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java index 75073239..d0492893 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java @@ -2,6 +2,7 @@ import com.uid2.optout.Const; import com.uid2.optout.auth.InternalAuthMiddleware; +import com.uid2.optout.sqs.SqsMessageOperations; import com.uid2.optout.web.QuorumWebClient; import com.uid2.shared.Utils; import com.uid2.shared.attest.AttestationTokenService; @@ -71,6 +72,8 @@ public class OptOutServiceVerticle extends AbstractVerticle { private final SqsClient sqsClient; private final String sqsQueueUrl; private final boolean sqsEnabled; + private final int sqsMaxQueueSize; + private final String podName; public OptOutServiceVerticle(Vertx vertx, IAuthorizableProvider clientKeyProvider, @@ -119,6 +122,8 @@ public OptOutServiceVerticle(Vertx vertx, this.sqsEnabled = jsonConfig.getBoolean(Const.Config.OptOutSqsEnabledProp, false); this.sqsQueueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp); + this.sqsMaxQueueSize = jsonConfig.getInteger(Const.Config.OptOutSqsMaxQueueSizeProp, 0); // 0 = no limit + this.podName = jsonConfig.getString("POD_NAME"); SqsClient tempSqsClient = null; if (this.sqsEnabled) { @@ -293,6 +298,7 @@ private void handleReplicate(RoutingContext routingContext) { } HttpServerRequest req = routingContext.request(); + MultiMap params = req.params(); String identityHash = req.getParam(IDENTITY_HASH); String advertisingId = req.getParam(ADVERTISING_ID); @@ -369,7 +375,7 @@ private void handleQueue(RoutingContext routingContext) { // while old delta production is enabled, response is handled by replicate logic - // Validate parameters - same as replicate + // validate parameters - same as replicate if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) { // this.sendBadRequestError(resp); return; @@ -388,42 +394,36 @@ private void handleQueue(RoutingContext routingContext) { JsonObject messageBody = new JsonObject() .put(IDENTITY_HASH, identityHash) .put(ADVERTISING_ID, advertisingId) - .put("trace_id", traceId) + .put("uid_trace_id", traceId) .put("client_ip", clientIp) .put("email", email) .put("phone", phone); - // Send message to SQS queue - vertx.executeBlocking(promise -> { - try { - SendMessageRequest sendMsgRequest = SendMessageRequest.builder() - .queueUrl(this.sqsQueueUrl) - .messageBody(messageBody.encode()) - .build(); - - SendMessageResponse response = this.sqsClient.sendMessage(sendMsgRequest); - promise.complete(response.messageId()); - } catch (Exception e) { - promise.fail(e); - } - }, res -> { - if (res.failed()) { - // this.sendInternalServerError(resp, "Failed to queue message: " + res.cause().getMessage()); - LOGGER.error("Failed to queue message: " + res.cause().getMessage()); - } else { - String messageId = (String) res.result(); - - JsonObject responseJson = new JsonObject() - .put("status", "queued"); - - LOGGER.info("Message queued successfully: " + messageId); - - // resp.setStatusCode(200) - // .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - // .setChunked(true) - // .write(responseJson.encode()); - // resp.end(); + // send message to sqs queue + vertx.executeBlocking(() -> { + // check queue size limit before sending + if (this.sqsMaxQueueSize > 0) { + SqsMessageOperations.QueueAttributes queueAttrs = + SqsMessageOperations.getQueueAttributes(this.sqsClient, this.sqsQueueUrl); + if (queueAttrs != null) { + int currentSize = queueAttrs.getTotalMessages(); + if (currentSize >= this.sqsMaxQueueSize) { + LOGGER.warn("sqs_queue_full: rejecting message, currentSize={}, maxSize={}", + currentSize, this.sqsMaxQueueSize); + throw new IllegalStateException("queue size limit exceeded"); + } + } } + + SendMessageRequest sendMsgRequest = SendMessageRequest.builder() + .queueUrl(this.sqsQueueUrl) + .messageBody(messageBody.encode()) + .build(); + + this.sqsClient.sendMessage(sendMsgRequest); + return null; + }).onFailure(cause -> { + LOGGER.error("failed to queue message, cause={}", cause.getMessage()); }); } catch (Exception ex) { // this.sendInternalServerError(resp, ex.getMessage()); diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java index da9b68ef..26ad9d02 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java @@ -2,34 +2,39 @@ import com.uid2.optout.Const; import com.uid2.optout.auth.InternalAuthMiddleware; -import com.uid2.optout.sqs.SqsWindowReader; -import com.uid2.optout.sqs.SqsParsedMessage; -import com.uid2.optout.sqs.SqsMessageOperations; +import com.uid2.optout.delta.DeltaFileWriter; +import com.uid2.optout.delta.ManualOverrideService; +import com.uid2.optout.delta.DeltaProductionJobStatus; +import com.uid2.optout.delta.DeltaProductionMetrics; +import com.uid2.optout.delta.DeltaProductionOrchestrator; +import com.uid2.optout.delta.DeltaProductionResult; +import com.uid2.optout.delta.S3UploadService; import com.uid2.optout.delta.StopReason; +import com.uid2.optout.sqs.SqsWindowReader; +import com.uid2.optout.traffic.TrafficCalculator; +import com.uid2.optout.traffic.TrafficCalculator.MalformedTrafficCalcConfigException; +import com.uid2.optout.traffic.TrafficFilter; +import com.uid2.optout.traffic.TrafficFilter.MalformedTrafficFilterConfigException; import com.uid2.shared.Utils; import com.uid2.shared.cloud.ICloudStorage; import com.uid2.shared.health.HealthComponent; import com.uid2.shared.health.HealthManager; -import com.uid2.shared.optout.*; -import io.micrometer.core.instrument.Counter; -import io.micrometer.core.instrument.Metrics; +import com.uid2.shared.optout.OptOutCloudSync; +import com.uid2.shared.optout.OptOutUtils; + import io.vertx.core.*; import io.vertx.core.http.*; import io.vertx.core.json.JsonObject; import io.vertx.ext.web.Router; import io.vertx.ext.web.RoutingContext; + import org.slf4j.Logger; import org.slf4j.LoggerFactory; import software.amazon.awssdk.services.sqs.SqsClient; -import software.amazon.awssdk.services.sqs.model.*; -import java.io.ByteArrayInputStream; -import java.io.ByteArrayOutputStream; +import static com.uid2.optout.util.HttpResponseHelper.*; + import java.io.IOException; -import java.nio.ByteBuffer; -import java.nio.ByteOrder; -import java.util.ArrayList; -import java.util.List; import java.util.concurrent.atomic.AtomicReference; /** @@ -69,6 +74,8 @@ *

Note: SQS visibility timeout provides natural coordination across pods, * limiting duplicate message processing even if multiple pods run jobs concurrently.

* + *

Note: The delta construction and circuit breaking is delegated to DeltaProductionOrchestrator.

+ * *

API Endpoints

*
    *
  • POST /optout/deltaproduce - Start async job, auto-clears completed/failed jobs (returns 202 Accepted or 409 Conflict if running)
  • @@ -80,94 +87,88 @@ public class OptOutSqsLogProducer extends AbstractVerticle { private final HealthComponent healthComponent = HealthManager.instance.registerComponent("sqs-log-producer"); private final SqsClient sqsClient; - private final String queueUrl; private final String eventDeltaProduced; - private final int replicaId; - private final ICloudStorage cloudStorage; - private final OptOutCloudSync cloudSync; - private final int maxMessagesPerPoll; - private final int visibilityTimeout; - private final int deltaWindowSeconds; // Time window for each delta file (5 minutes = 300 seconds) - private final int jobTimeoutSeconds; - private final int maxMessagesPerFile; // Memory protection: max messages per delta file private final int listenPort; - private final String internalApiKey; private final InternalAuthMiddleware internalAuth; - - private Counter counterDeltaProduced = Counter - .builder("uid2_optout_sqs_delta_produced_total") - .description("counter for how many optout delta files are produced from SQS") - .register(Metrics.globalRegistry); - - private Counter counterEntriesProcessed = Counter - .builder("uid2_optout_sqs_entries_processed_total") - .description("counter for how many optout entries are processed from SQS") - .register(Metrics.globalRegistry); - - private ByteBuffer buffer; - private boolean shutdownInProgress = false; + private final TrafficFilter trafficFilter; + private final TrafficCalculator trafficCalculator; + private final DeltaProductionOrchestrator orchestrator; - //Tracks the current delta production job status for this pod. - private final AtomicReference currentJob = new AtomicReference<>(null); + /* + * Tracks the current delta production job status for this pod. + * Used to prevent concurrent jobs from running. + */ + private final AtomicReference currentJob = new AtomicReference<>(null); - // Helper for reading complete 5-minute windows from SQS - private final SqsWindowReader windowReader; - - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync) throws IOException { - this(jsonConfig, cloudStorage, cloudSync, Const.Event.DeltaProduce); - } - - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync, String eventDeltaProduced) throws IOException { - this(jsonConfig, cloudStorage, cloudSync, eventDeltaProduced, null); - } + private volatile boolean shutdownInProgress = false; - // Constructor for testing - allows injecting mock SqsClient - public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync, String eventDeltaProduced, SqsClient sqsClient) throws IOException { + public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, ICloudStorage cloudStorageDroppedRequests, OptOutCloudSync cloudSync, String eventDeltaProduced, SqsClient sqsClient) throws IOException, MalformedTrafficCalcConfigException, MalformedTrafficFilterConfigException { this.eventDeltaProduced = eventDeltaProduced; - this.replicaId = OptOutUtils.getReplicaId(jsonConfig); - this.cloudStorage = cloudStorage; - this.cloudSync = cloudSync; - - // Initialize SQS client - this.queueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp); - if (this.queueUrl == null || this.queueUrl.isEmpty()) { - throw new IOException("SQS queue URL not configured"); + + // initialize sqs client + String queueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp); + if (queueUrl == null || queueUrl.isEmpty()) { + throw new IOException("sqs queue url not configured"); } - - // Use injected client for testing, or create new one this.sqsClient = sqsClient != null ? sqsClient : SqsClient.builder().build(); - LOGGER.info("SQS client initialized for queue: " + this.queueUrl); - - // SQS Configuration - this.maxMessagesPerPoll = 10; // SQS max is 10 - this.visibilityTimeout = jsonConfig.getInteger(Const.Config.OptOutSqsVisibilityTimeoutProp, 240); // 4 minutes default - this.deltaWindowSeconds = 300; // Fixed 5 minutes for all deltas - this.jobTimeoutSeconds = jsonConfig.getInteger(Const.Config.OptOutDeltaJobTimeoutSecondsProp, 10800); // 3 hours default - this.maxMessagesPerFile = jsonConfig.getInteger(Const.Config.OptOutMaxMessagesPerFileProp, 10000); // Memory protection limit + LOGGER.info("sqs client initialized for queue: {}", queueUrl); - // HTTP server configuration - use port offset + 1 to avoid conflicts + // http server configuration this.listenPort = Const.Port.ServicePortForOptOut + Utils.getPortOffset() + 1; - // Authentication - this.internalApiKey = jsonConfig.getString(Const.Config.OptOutInternalApiTokenProp); - this.internalAuth = new InternalAuthMiddleware(this.internalApiKey, "optout-sqs"); - + // authentication + String internalApiKey = jsonConfig.getString(Const.Config.OptOutInternalApiTokenProp); + this.internalAuth = new InternalAuthMiddleware(internalApiKey, "optout-sqs"); + + // circuit breaker tools + this.trafficFilter = new TrafficFilter(jsonConfig.getString(Const.Config.TrafficFilterConfigPathProp)); + this.trafficCalculator = new TrafficCalculator(cloudStorage, jsonConfig.getString(Const.Config.OptOutSqsS3FolderProp), jsonConfig.getString(Const.Config.TrafficCalcConfigPathProp)); + + // configuration values for orchestrator setup + int replicaId = OptOutUtils.getReplicaId(jsonConfig); + int maxMessagesPerPoll = 10; // sqs max is 10 + int deltaWindowSeconds = jsonConfig.getInteger(Const.Config.OptOutSqsDeltaWindowSecondsProp, 300); // fixed 5 minutes, allow config for testing + int visibilityTimeout = jsonConfig.getInteger(Const.Config.OptOutSqsVisibilityTimeoutProp, 240); + int jobTimeoutSeconds = jsonConfig.getInteger(Const.Config.OptOutDeltaJobTimeoutSecondsProp, 10800); + int maxMessagesPerFile = jsonConfig.getInteger(Const.Config.OptOutMaxMessagesPerFileProp, 10000); int bufferSize = jsonConfig.getInteger(Const.Config.OptOutProducerBufferSizeProp); - this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN); - - // Initialize window reader with memory protection limit - this.windowReader = new SqsWindowReader( - this.sqsClient, this.queueUrl, this.maxMessagesPerPoll, - this.visibilityTimeout, this.deltaWindowSeconds, this.maxMessagesPerFile, - null, null, // will be done in Orchestrator after refactoring - this.replicaId + + // orchestrator setup + DeltaFileWriter deltaFileWriter = new DeltaFileWriter(bufferSize); + S3UploadService deltaUploadService = new S3UploadService(cloudStorage, this.sqsClient, queueUrl); + S3UploadService droppedRequestUploadService = new S3UploadService(cloudStorageDroppedRequests, this.sqsClient, queueUrl) ; + S3UploadService malformedRequestUploadService = new S3UploadService(cloudStorageDroppedRequests, this.sqsClient, queueUrl); + ManualOverrideService manualOverrideService = new ManualOverrideService(cloudStorage, jsonConfig.getString(Const.Config.ManualOverrideS3PathProp)); + SqsWindowReader windowReader = new SqsWindowReader( + this.sqsClient, queueUrl, maxMessagesPerPoll, + visibilityTimeout, deltaWindowSeconds, maxMessagesPerFile, + malformedRequestUploadService, "malformed", replicaId + ); + + this.orchestrator = new DeltaProductionOrchestrator( + this.sqsClient, + queueUrl, + replicaId, + deltaWindowSeconds, + jobTimeoutSeconds, + windowReader, + deltaFileWriter, + deltaUploadService, + droppedRequestUploadService, + manualOverrideService, + this.trafficFilter, + this.trafficCalculator, + cloudSync, + new DeltaProductionMetrics() ); - LOGGER.info("OptOutSqsLogProducer initialized with maxMessagesPerFile: {}", this.maxMessagesPerFile); + + LOGGER.info("initialized with maxMessagesPerFile={}, maxMessagesPerPoll={}, visibilityTimeout={}, deltaWindowSeconds={}, jobTimeoutSeconds={}", + maxMessagesPerFile, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds, jobTimeoutSeconds); } @Override public void start(Promise startPromise) { - LOGGER.info("Starting SQS Log Producer with HTTP endpoint..."); + LOGGER.info("starting http server on port {}", listenPort); try { vertx.createHttpServer() @@ -175,18 +176,17 @@ public void start(Promise startPromise) { .listen(listenPort, result -> { if (result.succeeded()) { this.healthComponent.setHealthStatus(true); - LOGGER.info("SQS Log Producer HTTP server started on port: {} (delta window: {}s)", - listenPort, this.deltaWindowSeconds); + LOGGER.info("http server started on port {}", listenPort); startPromise.complete(); } else { - LOGGER.error("Failed to start SQS Log Producer HTTP server", result.cause()); + LOGGER.error("failed to start http server", result.cause()); this.healthComponent.setHealthStatus(false, result.cause().getMessage()); startPromise.fail(result.cause()); } }); } catch (Exception e) { - LOGGER.error("Failed to start SQS Log Producer", e); + LOGGER.error("failed to start http server", e); this.healthComponent.setHealthStatus(false, e.getMessage()); startPromise.fail(e); } @@ -194,20 +194,19 @@ public void start(Promise startPromise) { @Override public void stop(Promise stopPromise) { - LOGGER.info("Stopping SQS Log Producer..."); + LOGGER.info("stopping"); this.shutdownInProgress = true; if (this.sqsClient != null) { try { this.sqsClient.close(); - LOGGER.info("SQS client closed"); } catch (Exception e) { - LOGGER.error("Error closing SQS client", e); + LOGGER.error("error closing sqs client", e); } } stopPromise.complete(); - LOGGER.info("SQS Log Producer stopped"); + LOGGER.info("stopped"); } private Router createRouter() { @@ -232,21 +231,14 @@ private Router createRouter() { private void handleDeltaProduceStatus(RoutingContext routingContext) { HttpServerResponse resp = routingContext.response(); - DeltaProduceJobStatus job = currentJob.get(); + DeltaProductionJobStatus job = currentJob.get(); if (job == null) { - resp.setStatusCode(200) - .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("state", "idle") - .put("message", "No job running on this pod") - .encode()); + sendIdle(resp, "no job running on this pod"); return; } - resp.setStatusCode(200) - .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(job.toJson().encode()); + sendSuccess(resp, job.toJson()); } /** @@ -263,72 +255,63 @@ private void handleDeltaProduceStatus(RoutingContext routingContext) { private void handleDeltaProduceStart(RoutingContext routingContext) { HttpServerResponse resp = routingContext.response(); - LOGGER.info("Delta production job requested via /deltaproduce endpoint"); + LOGGER.info("delta production job requested"); + try { + this.trafficFilter.reloadTrafficFilterConfig(); + } catch (MalformedTrafficFilterConfigException e) { + LOGGER.error("circuit_breaker_config_error: failed to reload traffic filter config: {}", e.getMessage(), e); + sendError(resp, e); + return; + } + + try { + this.trafficCalculator.reloadTrafficCalcConfig(); + } catch (MalformedTrafficCalcConfigException e) { + LOGGER.error("circuit_breaker_config_error: failed to reload traffic calc config: {}", e.getMessage(), e); + sendError(resp, e); + return; + } + + DeltaProductionJobStatus existingJob = currentJob.get(); - DeltaProduceJobStatus existingJob = currentJob.get(); - - // If there's an existing job, check if it's still running + // if there's an existing job, check if it's still running if (existingJob != null) { - if (existingJob.getState() == DeltaProduceJobStatus.JobState.RUNNING) { - // Cannot replace a running job - 409 Conflict - LOGGER.warn("Delta production job already running on this pod"); - resp.setStatusCode(409) - .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("status", "conflict") - .put("message", "A delta production job is already running on this pod") - .put("current_job", existingJob.toJson()) - .encode()); + if (existingJob.getState() == DeltaProductionJobStatus.JobState.RUNNING) { + LOGGER.info("job already running, returning conflict"); + sendConflict(resp, "job already running on this pod"); return; } - LOGGER.info("Auto-clearing previous {} job to start new one", existingJob.getState()); + LOGGER.info("clearing previous {} job", existingJob.getState()); } - DeltaProduceJobStatus newJob = new DeltaProduceJobStatus(); + DeltaProductionJobStatus newJob = new DeltaProductionJobStatus(); - // Try to set the new job + // try to set the new job if (!currentJob.compareAndSet(existingJob, newJob)) { - resp.setStatusCode(409) - .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("status", "conflict") - .put("message", "Job state changed, please retry") - .encode()); + sendConflict(resp, "job state changed, please retry"); return; } - // Start the job asynchronously - LOGGER.info("Starting delta production job"); + LOGGER.info("starting new job"); this.startDeltaProductionJob(newJob); - // Return immediately with 202 Accepted - resp.setStatusCode(202) - .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") - .end(new JsonObject() - .put("status", "accepted") - .put("message", "Delta production job started on this pod") - .encode()); + // return immediately with 202 Accepted + sendAccepted(resp, "job started"); } /** * Starts the delta production job asynchronously * The job runs on a worker thread and updates the DeltaProduceJobStatus when complete */ - private void startDeltaProductionJob(DeltaProduceJobStatus job) { - vertx.executeBlocking(() -> { - LOGGER.info("Executing delta production job"); - return produceDeltasBlocking(); - }).onComplete(ar -> { + private void startDeltaProductionJob(DeltaProductionJobStatus job) { + vertx.executeBlocking(() -> produceDeltasBlocking()).onComplete(ar -> { if (ar.succeeded()) { - JsonObject result = ar.result(); - job.complete(result); - LOGGER.info("Delta production job succeeded: {}", result.encode()); + job.complete(ar.result()); } else { - String errorMsg = ar.cause().getMessage(); - job.fail(errorMsg); - LOGGER.error("Delta production job failed: {}", errorMsg, ar.cause()); + job.fail(ar.cause().getMessage()); + LOGGER.error("delta_job_failed: {}", ar.cause().getMessage(), ar.cause()); } }); } @@ -343,221 +326,25 @@ private void startDeltaProductionJob(DeltaProduceJobStatus job) { */ private JsonObject produceDeltasBlocking() throws Exception { if (this.shutdownInProgress) { - throw new Exception("Producer is shutting down"); - } - - JsonObject result = new JsonObject(); - LOGGER.info("Starting delta production from SQS queue"); - - // Process messages until queue is empty or messages are too recent - DeltaProductionResult deltaResult = this.produceBatchedDeltas(); - - // Determine status based on results - if (deltaResult.getDeltasProduced() == 0 && deltaResult.stoppedDueToMessagesTooRecent()) { - // No deltas produced because all messages were too recent - result.put("status", "skipped"); - result.put("reason", "All messages too recent"); - LOGGER.info("Delta production skipped: all messages too recent"); - } else { - result.put("status", "success"); - LOGGER.info("Delta production complete: {} deltas, {} entries", - deltaResult.getDeltasProduced(), deltaResult.getEntriesProcessed()); - } - - result.put("deltas_produced", deltaResult.getDeltasProduced()); - result.put("entries_processed", deltaResult.getEntriesProcessed()); - - return result; - } - - - /** - * Reads messages from SQS and produces delta files in 5 minute batches. - * Continues until queue is empty or messages are too recent. - * Windows are limited to maxMessagesPerFile for memory protection. - * - * @return DeltaProductionResult with counts and stop reason - * @throws IOException if delta production fails - */ - private DeltaProductionResult produceBatchedDeltas() throws IOException { - int deltasProduced = 0; - int totalEntriesProcessed = 0; - boolean stoppedDueToMessagesTooRecent = false; - - long jobStartTime = OptOutUtils.nowEpochSeconds(); - LOGGER.info("Starting delta production from SQS queue (maxMessagesPerFile: {})", this.maxMessagesPerFile); - - // Read and process windows until done - while (true) { - if(checkJobTimeout(jobStartTime)){ - break; - } - - // Read one complete 5-minute window (limited to maxMessagesPerFile) - SqsWindowReader.WindowReadResult windowResult = windowReader.readWindow(); - - // If no messages, we're done (queue empty or messages too recent) - if (windowResult.isEmpty()) { - stoppedDueToMessagesTooRecent = windowResult.getStopReason() == StopReason.MESSAGES_TOO_RECENT; - LOGGER.info("Delta production complete - no more eligible messages"); - break; - } - - // Produce delta for this window - long windowStart = windowResult.getWindowStart(); - List messages = windowResult.getMessages(); - - // Create delta file - String deltaName = OptOutUtils.newDeltaFileName(this.replicaId); - ByteArrayOutputStream deltaStream = new ByteArrayOutputStream(); - writeStartOfDelta(deltaStream, windowStart); - - // Write all messages - List sqsMessages = new ArrayList<>(); - for (SqsParsedMessage msg : messages) { - writeOptOutEntry(deltaStream, msg.hashBytes(), msg.idBytes(), msg.timestamp()); - sqsMessages.add(msg.originalMessage()); - } - - // Upload and delete - uploadDeltaAndDeleteMessages(deltaStream, deltaName, windowStart, sqsMessages); - deltasProduced++; - totalEntriesProcessed += messages.size(); - - LOGGER.info("Produced delta for window [{}, {}] with {} messages", - windowStart, windowStart + this.deltaWindowSeconds, messages.size()); + throw new Exception("producer is shutting down"); } - long totalDuration = OptOutUtils.nowEpochSeconds() - jobStartTime; - LOGGER.info("Delta production complete: took {}s, produced {} deltas, processed {} entries", - totalDuration, deltasProduced, totalEntriesProcessed); + DeltaProductionResult result = orchestrator.produceBatchedDeltas(this::publishDeltaProducedEvent); - return new DeltaProductionResult(deltasProduced, totalEntriesProcessed, stoppedDueToMessagesTooRecent); - } + StopReason stopReason = result.getStopReason(); + boolean producedWork = result.getDeltasProduced() > 0 || result.getDroppedRequestFilesProduced() > 0; + boolean halted = stopReason == StopReason.CIRCUIT_BREAKER_TRIGGERED || stopReason == StopReason.MANUAL_OVERRIDE_ACTIVE; - /** - * Checks if job has exceeded timeout - */ - private boolean checkJobTimeout(long jobStartTime) { - long elapsedTime = OptOutUtils.nowEpochSeconds() - jobStartTime; - if (elapsedTime > 3600) { // 1 hour - log warning message - LOGGER.error("Delta production job has been running for {} seconds", - elapsedTime); - return false; - } - if (elapsedTime > this.jobTimeoutSeconds) { - LOGGER.error("Delta production job has been running for {} seconds (exceeds timeout of {}s)", - elapsedTime, this.jobTimeoutSeconds); - return true; // deadline exceeded - } - return false; - } - - /** - * Writes the start-of-delta entry with null hash and window start timestamp. - */ - private void writeStartOfDelta(ByteArrayOutputStream stream, long windowStart) throws IOException { - - this.checkBufferSize(OptOutConst.EntrySize); - - buffer.put(OptOutUtils.nullHashBytes); - buffer.put(OptOutUtils.nullHashBytes); - buffer.putLong(windowStart); + String status = halted ? "halted" : (producedWork ? "success" : "skipped"); - buffer.flip(); - byte[] entry = new byte[buffer.remaining()]; - buffer.get(entry); - - stream.write(entry); - buffer.clear(); - } - - /** - * Writes a single opt-out entry to the delta stream. - */ - private void writeOptOutEntry(ByteArrayOutputStream stream, byte[] hashBytes, byte[] idBytes, long timestamp) throws IOException { - this.checkBufferSize(OptOutConst.EntrySize); - OptOutEntry.writeTo(buffer, hashBytes, idBytes, timestamp); - buffer.flip(); - byte[] entry = new byte[buffer.remaining()]; - buffer.get(entry); - stream.write(entry); - buffer.clear(); - } - - /** - * Writes the end-of-delta sentinel entry with ones hash and window end timestamp. - */ - private void writeEndOfDelta(ByteArrayOutputStream stream, long windowEnd) throws IOException { - this.checkBufferSize(OptOutConst.EntrySize); - buffer.put(OptOutUtils.onesHashBytes); - buffer.put(OptOutUtils.onesHashBytes); - buffer.putLong(windowEnd); - buffer.flip(); - byte[] entry = new byte[buffer.remaining()]; - buffer.get(entry); - stream.write(entry); - buffer.clear(); - } - + LOGGER.info("delta production {}: {} deltas, {} entries, {} dropped files, {} dropped requests, reason={}", + status, result.getDeltasProduced(), result.getEntriesProcessed(), + result.getDroppedRequestFilesProduced(), result.getDroppedRequestsProcessed(), stopReason); - - // Upload a delta to S3 and delete messages from SQS after successful upload - private void uploadDeltaAndDeleteMessages(ByteArrayOutputStream deltaStream, String deltaName, Long windowStart, List messages) throws IOException { - try { - // Add end-of-delta entry - long endTimestamp = windowStart + this.deltaWindowSeconds; - this.writeEndOfDelta(deltaStream, endTimestamp); - - // upload - byte[] deltaData = deltaStream.toByteArray(); - String s3Path = this.cloudSync.toCloudPath(deltaName); - - LOGGER.info("SQS Delta Upload - fileName: {}, s3Path: {}, size: {} bytes, messages: {}, window: [{}, {})", - deltaName, s3Path, deltaData.length, messages.size(), windowStart, endTimestamp); - - boolean uploadSucceeded = false; - try (ByteArrayInputStream inputStream = new ByteArrayInputStream(deltaData)) { - this.cloudStorage.upload(inputStream, s3Path); - LOGGER.info("Successfully uploaded delta to S3: {}", s3Path); - uploadSucceeded = true; - - // publish event - this.publishDeltaProducedEvent(deltaName); - this.counterDeltaProduced.increment(); - this.counterEntriesProcessed.increment(messages.size()); - - } catch (Exception uploadEx) { - LOGGER.error("Failed to upload delta to S3: " + uploadEx.getMessage(), uploadEx); - throw new IOException("S3 upload failed", uploadEx); - } - - // CRITICAL: Only delete messages from SQS after successful S3 upload - if (uploadSucceeded && !messages.isEmpty()) { - LOGGER.info("Deleting {} messages from SQS after successful S3 upload", messages.size()); - SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, messages); - } - - // Close the stream - deltaStream.close(); - - } catch (Exception ex) { - LOGGER.error("Error uploading delta: " + ex.getMessage(), ex); - throw new IOException("Delta upload failed", ex); - } + return result.toJsonWithStatus(status); } - private void publishDeltaProducedEvent(String newDelta) { - vertx.eventBus().publish(this.eventDeltaProduced, newDelta); - LOGGER.info("Published delta.produced event for: {}", newDelta); - } - - private void checkBufferSize(int dataSize) { - ByteBuffer b = this.buffer; - if (b.capacity() < dataSize) { - int newCapacity = Integer.highestOneBit(dataSize) << 1; - LOGGER.warn("Expanding buffer size: current {}, need {}, new {}", b.capacity(), dataSize, newCapacity); - this.buffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN); - } + private void publishDeltaProducedEvent(String deltaName) { + vertx.eventBus().publish(this.eventDeltaProduced, deltaName); } } diff --git a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java index 85ee532d..556f72ca 100644 --- a/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java +++ b/src/test/java/com/uid2/optout/vertx/OptOutSqsLogProducerTest.java @@ -1,32 +1,43 @@ package com.uid2.optout.vertx; import com.uid2.optout.Const; +import com.uid2.shared.cloud.CloudStorageException; import com.uid2.shared.cloud.ICloudStorage; import com.uid2.shared.optout.OptOutCloudSync; +import com.uid2.shared.optout.OptOutCollection; +import com.uid2.shared.optout.OptOutEntry; import com.uid2.shared.vertx.VertxUtils; +import io.vertx.core.Future; +import io.vertx.core.Promise; import io.vertx.core.Vertx; +import io.vertx.core.http.HttpMethod; import io.vertx.core.json.JsonObject; -import io.vertx.ext.unit.Async; -import io.vertx.ext.unit.TestContext; -import io.vertx.ext.unit.junit.VertxUnitRunner; -import org.junit.*; -import org.junit.runner.RunWith; +import io.vertx.junit5.VertxExtension; +import io.vertx.junit5.VertxTestContext; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.ArgumentCaptor; import software.amazon.awssdk.services.sqs.SqsClient; import software.amazon.awssdk.services.sqs.model.*; +import java.io.ByteArrayInputStream; import java.io.InputStream; +import java.nio.file.Files; +import java.nio.file.Path; import java.util.*; import java.util.concurrent.CountDownLatch; -import static org.mockito.ArgumentMatchers.any; -import static org.mockito.ArgumentMatchers.anyString; +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.*; import static org.mockito.Mockito.*; /** * Integration tests for OptOutSqsLogProducer deltaproduce endpoint. * Requires Java 21 for Mockito compatibility with AWS SDK. */ -@RunWith(VertxUnitRunner.class) +@ExtendWith(VertxExtension.class) public class OptOutSqsLogProducerTest { private Vertx vertx; @@ -34,20 +45,30 @@ public class OptOutSqsLogProducerTest { private SqsClient sqsClient; private ICloudStorage cloudStorage; + private ICloudStorage cloudStorageDroppedRequests; private OptOutCloudSync cloudSync; private static final String TEST_QUEUE_URL = "https://sqs.test.amazonaws.com/123456789/test"; private static final String TEST_API_KEY = "test-api-key"; private static final String VALID_HASH_BASE64 = "AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="; private static final String VALID_ID_BASE64 = "AQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQEBAQE="; + private static final String TRAFFIC_FILTER_CONFIG_PATH = "./traffic-filter.json"; + private static final String TRAFFIC_CALC_CONFIG_PATH = "./traffic-calc.json"; + private static final String MANUAL_OVERRIDE_S3_PATH = "manual-override.json"; + private static final String S3_DELTA_PREFIX = "sqs-delta"; + private static final String TEST_BUCKET_DROPPED_REQUESTS = "test-bucket-dropped-requests"; + private static final int TEST_PORT = Const.Port.ServicePortForOptOut + 1; + private static final String DELTA_PRODUCE_ENDPOINT = Endpoints.OPTOUT_DELTA_PRODUCE.toString(); + private static final String STATUS_ENDPOINT = DELTA_PRODUCE_ENDPOINT + "/status"; - @Before - public void setup(TestContext context) throws Exception { - vertx = Vertx.vertx(); + @BeforeEach + public void setup(Vertx vertx, VertxTestContext testContext) throws Exception { + this.vertx = vertx; - // Create mocks + // create mocks sqsClient = mock(SqsClient.class); cloudStorage = mock(ICloudStorage.class); + cloudStorageDroppedRequests = mock(ICloudStorage.class); cloudSync = mock(OptOutCloudSync.class); JsonObject config = VertxUtils.getJsonConfig(vertx); @@ -55,34 +76,103 @@ public void setup(TestContext context) throws Exception { .put(Const.Config.OptOutSqsVisibilityTimeoutProp, 240) .put(Const.Config.OptOutProducerBufferSizeProp, 65536) .put(Const.Config.OptOutProducerReplicaIdProp, 1) - .put(Const.Config.OptOutInternalApiTokenProp, TEST_API_KEY); - - // Mock cloud sync to return proper S3 paths + .put(Const.Config.OptOutInternalApiTokenProp, TEST_API_KEY) + .put(Const.Config.TrafficFilterConfigPathProp, TRAFFIC_FILTER_CONFIG_PATH) + .put(Const.Config.TrafficCalcConfigPathProp, TRAFFIC_CALC_CONFIG_PATH) + .put(Const.Config.ManualOverrideS3PathProp, MANUAL_OVERRIDE_S3_PATH) + .put(Const.Config.OptOutS3BucketDroppedRequestsProp, TEST_BUCKET_DROPPED_REQUESTS) + .put(Const.Config.OptOutSqsS3FolderProp, S3_DELTA_PREFIX) + .put(Const.Config.OptOutMaxMessagesPerFileProp, 100); + + // mock cloud sync to return proper s3 paths when(cloudSync.toCloudPath(anyString())) .thenAnswer(inv -> "sqs-delta/delta/" + inv.getArgument(0)); - // Mock S3 upload to succeed by default + // mock s3 upload to succeed by default doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + doAnswer(inv -> null).when(cloudStorageDroppedRequests).upload(any(InputStream.class), anyString()); + + // mock s3 list and download for traffic calculator + when(cloudStorage.list(anyString())).thenReturn(Arrays.asList("sqs-delta/delta/optout-delta-001_2025-01-01T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download(MANUAL_OVERRIDE_S3_PATH)).thenThrow(new CloudStorageException("file not found")); + when(cloudStorage.download(argThat(path -> path != null && path.contains("optout-delta")))) + .thenAnswer(inv -> new ByteArrayInputStream(createMinimalDeltaFileBytes())); + + // mock getQueueAttributes to return zero messages by default + Map defaultQueueAttrs = new HashMap<>(); + defaultQueueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, "0"); + defaultQueueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, "0"); + defaultQueueAttrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, "0"); + doReturn(GetQueueAttributesResponse.builder() + .attributes(defaultQueueAttrs) + .build()) + .when(sqsClient).getQueueAttributes(any(GetQueueAttributesRequest.class)); + + + // default config files + try { + String traficFilterConfig = """ + { + "denylist_requests": [ + ] + } + """; + createTrafficConfigFile(traficFilterConfig); + + String trafficCalcConfig = """ + { + "traffic_calc_evaluation_window_seconds": 86400, + "traffic_calc_baseline_traffic": 100, + "traffic_calc_threshold_multiplier": 5, + "traffic_calc_allowlist_ranges": [] + } + """; + createTrafficCalcConfigFile(trafficCalcConfig); + } catch (Exception e) { + throw new RuntimeException(e); + } - // Create producer with mock SqsClient - producer = new OptOutSqsLogProducer(config, cloudStorage, cloudSync, Const.Event.DeltaProduce, sqsClient); + // create producer with mocks + producer = new OptOutSqsLogProducer(config, cloudStorage, cloudStorageDroppedRequests, cloudSync, Const.Event.DeltaProduce, sqsClient); - // Deploy verticle - Async async = context.async(); - vertx.deployVerticle(producer, context.asyncAssertSuccess(v -> async.complete())); + // deploy verticle + vertx.deployVerticle(producer, testContext.succeeding(id -> testContext.completeNow())); } - @After - public void tearDown(TestContext context) { - if (vertx != null) { - vertx.close(context.asyncAssertSuccess()); + @AfterEach + public void tearDown() { + // clean up config files + if (Files.exists(Path.of(TRAFFIC_FILTER_CONFIG_PATH))) { + try { + Files.delete(Path.of(TRAFFIC_FILTER_CONFIG_PATH)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + if (Files.exists(Path.of(TRAFFIC_CALC_CONFIG_PATH))) { + try { + Files.delete(Path.of(TRAFFIC_CALC_CONFIG_PATH)); + } catch (Exception e) { + throw new RuntimeException(e); + } } } + // ==================== Message Creation Helpers ==================== + private Message createMessage(String hash, String id, long timestampMs) { + return createMessage(hash, id, timestampMs, null, null, null, null); + } + + private Message createMessage(String hash, String id, long timestampMs, String email, String phone, String clientIp, String traceId) { JsonObject body = new JsonObject() .put("identity_hash", hash) .put("advertising_id", id); + + if (email != null) body.put("email", email); + if (phone != null) body.put("phone", phone); + if (clientIp != null) body.put("client_ip", clientIp); + if (traceId != null) body.put("uid_trace_id", traceId); Map attrs = new HashMap<>(); attrs.put(MessageSystemAttributeName.SENT_TIMESTAMP, String.valueOf(timestampMs)); @@ -94,381 +184,739 @@ private Message createMessage(String hash, String id, long timestampMs) { .receiptHandle("receipt-" + UUID.randomUUID()) .build(); } - - @Test - public void testDeltaProduceEndpoint_successWithMessages(TestContext context) throws Exception { - Async async = context.async(); - - // Create messages old enough to process (> 5 minutes) - long oldTime = System.currentTimeMillis() - 400_000; - List messages = Arrays.asList( - createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime), - createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime + 1000) - ); - - // Mock SQS operations + + /** Creates N old messages (> 5 minutes ago) with unique IPs */ + private List createOldMessages(int count) { + long baseTime = System.currentTimeMillis() - 400_000; + List messages = new ArrayList<>(); + for (int i = 0; i < count; i++) { + messages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, baseTime + (i * 1000), + null, null, "10.0.0." + (i + 1), null)); + } + return messages; + } + + /** Creates N old messages with specific client IP */ + private List createOldMessagesWithIp(int count, String clientIp) { + long baseTime = System.currentTimeMillis() - 400_000; + List messages = new ArrayList<>(); + for (int i = 0; i < count; i++) { + messages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, baseTime + (i * 1000), + null, null, clientIp, null)); + } + return messages; + } + + // ==================== SQS Mock Helpers ==================== + + /** Mock SQS to return messages once, then empty */ + private void mockSqsToReturnMessages(List messages) { when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); - + } + + /** Mock SQS to return empty queue */ + private void mockSqsEmptyQueue() { + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + } + + /** Mock SQS delete to succeed */ + private void mockSqsDeleteSuccess() { when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) .thenReturn(DeleteMessageBatchResponse.builder().build()); - - // Mock S3 upload - doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); - - int port = Const.Port.ServicePortForOptOut + 1; - - // Step 1: Start the job (POST) - returns 202 immediately - vertx.createHttpClient() - .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", - Endpoints.OPTOUT_DELTA_PRODUCE.toString()) - .compose(req -> req - .putHeader("Authorization", "Bearer " + TEST_API_KEY) - .send()) - .compose(resp -> { - context.assertEquals(202, resp.statusCode()); - return resp.body(); - }) - .compose(body -> { - JsonObject response = new JsonObject(body.toString()); - context.assertEquals("accepted", response.getString("status")); - - // Step 2: Poll for job completion - return pollForCompletion(context, port, 100, 50); // Poll every 100ms, max 50 times (5 seconds) + } + + /** Mock SQS getQueueAttributes to return zero messages */ + private void mockSqsQueueAttributes() { + Map attrs = new HashMap<>(); + attrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES, "0"); + attrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_NOT_VISIBLE, "0"); + attrs.put(QueueAttributeName.APPROXIMATE_NUMBER_OF_MESSAGES_DELAYED, "0"); + doReturn(GetQueueAttributesResponse.builder().attributes(attrs).build()) + .when(sqsClient).getQueueAttributes(any(GetQueueAttributesRequest.class)); + } + + // ==================== Cloud Storage Mock Helpers ==================== + + /** Setup cloud storage mocks for spike detection test */ + private void setupCloudStorageForSpikeTest(byte[] deltaFileBytes) { + try { + doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + doReturn(Arrays.asList("sqs-delta/delta/optout-delta--01_2025-11-13T00.00.00Z_baseline.dat")) + .when(cloudStorage).list("sqs-delta"); + doAnswer(inv -> new ByteArrayInputStream(deltaFileBytes)) + .when(cloudStorage).download("sqs-delta/delta/optout-delta--01_2025-11-13T00.00.00Z_baseline.dat"); + doThrow(new CloudStorageException("file not found")) + .when(cloudStorage).download("manual-override.json"); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** Helper to reset mocks between sequential test phases */ + private void resetMocksForSecondRequest(List messages) { + reset(sqsClient, cloudStorage, cloudStorageDroppedRequests); + + mockSqsToReturnMessages(messages); + mockSqsDeleteSuccess(); + mockSqsQueueAttributes(); + + try { + doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); + doAnswer(inv -> null).when(cloudStorageDroppedRequests).upload(any(InputStream.class), anyString()); + + when(cloudStorage.list(anyString())) + .thenReturn(Arrays.asList("sqs-delta/delta/optout-delta-001_2025-01-01T00.00.00Z_aaaaaaaa.dat")); + when(cloudStorage.download(MANUAL_OVERRIDE_S3_PATH)) + .thenThrow(new CloudStorageException("file not found")); + when(cloudStorage.download(argThat(path -> path != null && path.contains("optout-delta")))) + .thenAnswer(inv -> new ByteArrayInputStream(createMinimalDeltaFileBytes())); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + // ==================== Traffic Config Helpers ==================== + + private void createTrafficConfigFile(String content) { + try { + Path configPath = Path.of(TRAFFIC_FILTER_CONFIG_PATH); + Files.writeString(configPath, content); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + private void createTrafficCalcConfigFile(String content) { + try { + Path configPath = Path.of(TRAFFIC_CALC_CONFIG_PATH); + Files.writeString(configPath, content); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** Create denylist filter config for specific IP and time range */ + private void setupDenylistConfig(String ip) { + long baseTime = System.currentTimeMillis() / 1000 - 400; + String filterConfig = String.format(""" + { + "denylist_requests": [ + { + "range": [%d, %d], + "IPs": ["%s"] + } + ] + } + """, baseTime - 100, baseTime + 100, ip); + createTrafficConfigFile(filterConfig); + } + + // ==================== HTTP Request Helpers ==================== + + /** Make HTTP request and return response with status code and parsed JSON body */ + private Future httpRequest(HttpMethod method, String path, String authToken) { + return vertx.createHttpClient() + .request(method, TEST_PORT, "127.0.0.1", path) + .compose(req -> { + if (authToken != null) { + req.putHeader("Authorization", "Bearer " + authToken); + } + return req.send(); }) - .onComplete(context.asyncAssertSuccess(finalStatus -> { - context.assertEquals("completed", finalStatus.getString("state")); - JsonObject result = finalStatus.getJsonObject("result"); - context.assertNotNull(result); - context.assertEquals("success", result.getString("status")); - context.assertTrue(result.getInteger("deltas_produced") >= 1); - context.assertTrue(result.getInteger("entries_processed") >= 2); - - // Verify S3 was called + .compose(resp -> resp.body().map(body -> { + JsonObject result = new JsonObject().put("_statusCode", resp.statusCode()); + String bodyStr = body.toString(); + // try to parse as JSON, otherwise store as raw string try { - verify(cloudStorage, atLeastOnce()).upload(any(InputStream.class), anyString()); + result.put("_body", new JsonObject(bodyStr)); } catch (Exception e) { - context.fail(e); + result.put("_bodyRaw", bodyStr); } - - // Verify SQS delete was called - verify(sqsClient, atLeastOnce()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); - - async.complete(); + return result; })); } + + /** Make authenticated HTTP request */ + private Future httpRequest(HttpMethod method, String path) { + return httpRequest(method, path, TEST_API_KEY); + } + + /** Start a delta production job and poll until completion */ + private Future startJobAndPollForCompletion() { + return httpRequest(HttpMethod.POST, DELTA_PRODUCE_ENDPOINT) + .compose(resp -> { + assertEquals(202, resp.getInteger("_statusCode")); + assertEquals("accepted", resp.getJsonObject("_body").getString("status")); + return pollForCompletion(); + }); + } + + /** POST to start a job, return status code and body */ + private Future postStartJob() { + return httpRequest(HttpMethod.POST, DELTA_PRODUCE_ENDPOINT); + } + + /** POST without authentication */ + private Future postWithoutAuth() { + return httpRequest(HttpMethod.POST, DELTA_PRODUCE_ENDPOINT, null); + } + + /** POST with specific auth token */ + private Future postWithAuth(String authToken) { + return httpRequest(HttpMethod.POST, DELTA_PRODUCE_ENDPOINT, authToken); + } + + /** Get job status */ + private Future getJobStatus() { + return httpRequest(HttpMethod.GET, STATUS_ENDPOINT) + .compose(resp -> { + assertEquals(200, resp.getInteger("_statusCode")); + return Future.succeededFuture(resp.getJsonObject("_body")); + }); + } + + // ==================== Polling Helpers ==================== - /** - * Helper method to poll for job completion - */ - private io.vertx.core.Future pollForCompletion(TestContext context, int port, long intervalMs, int maxAttempts) { - return pollForCompletionRecursive(context, port, intervalMs, maxAttempts, 0); + /** Poll for job completion with default settings */ + private Future pollForCompletion() { + return pollForCompletion(100, 50); + } + + /** Poll for job completion with custom settings */ + private Future pollForCompletion(long intervalMs, int maxAttempts) { + return pollForCompletionRecursive(intervalMs, maxAttempts, 0); } - private io.vertx.core.Future pollForCompletionRecursive(TestContext context, int port, long intervalMs, int maxAttempts, int attempt) { + private Future pollForCompletionRecursive(long intervalMs, int maxAttempts, int attempt) { if (attempt >= maxAttempts) { - return io.vertx.core.Future.failedFuture("Job did not complete within timeout"); + return Future.failedFuture("Job did not complete within timeout"); } - return vertx.createHttpClient() - .request(io.vertx.core.http.HttpMethod.GET, port, "127.0.0.1", - Endpoints.OPTOUT_DELTA_PRODUCE.toString() + "/status") - .compose(req -> req - .putHeader("Authorization", "Bearer " + TEST_API_KEY) - .send()) + return httpRequest(HttpMethod.GET, STATUS_ENDPOINT) .compose(resp -> { - if (resp.statusCode() != 200) { - return io.vertx.core.Future.failedFuture("Status check failed: " + resp.statusCode()); + if (resp.getInteger("_statusCode") != 200) { + return Future.failedFuture("Status check failed: " + resp.getInteger("_statusCode")); } - return resp.body(); - }) - .compose(body -> { - JsonObject status = new JsonObject(body.toString()); + JsonObject status = resp.getJsonObject("_body"); String state = status.getString("state"); if ("completed".equals(state) || "failed".equals(state)) { - return io.vertx.core.Future.succeededFuture(status); + return Future.succeededFuture(status); } - // Still running or idle, wait and poll again - io.vertx.core.Promise promise = io.vertx.core.Promise.promise(); - vertx.setTimer(intervalMs, id -> { - pollForCompletionRecursive(context, port, intervalMs, maxAttempts, attempt + 1) - .onComplete(promise); - }); + // still running or idle, wait and poll again + Promise promise = Promise.promise(); + vertx.setTimer(intervalMs, id -> + pollForCompletionRecursive(intervalMs, maxAttempts, attempt + 1).onComplete(promise)); return promise.future(); }); } + + // ==================== Delta File Helpers ==================== + + /** + * Create delta file bytes with specified timestamps + */ + private byte[] createDeltaFileBytes(List timestamps) throws Exception { + // Create OptOutEntry objects using newTestEntry + List entries = new ArrayList<>(); + + long idCounter = 1000; // Use incrementing IDs for test entries + for (long timestamp : timestamps) { + entries.add(OptOutEntry.newTestEntry(idCounter++, timestamp)); + } + + // Create OptOutCollection + OptOutCollection collection = new OptOutCollection(entries.toArray(new OptOutEntry[0])); + return collection.getStore(); + } + + /** + * Create minimal delta file bytes with a single recent timestamp for traffic calculator + */ + private static byte[] createMinimalDeltaFileBytes() { + try { + long timestamp = System.currentTimeMillis() / 1000 - 3600; // 1 hour ago + OptOutEntry entry = OptOutEntry.newTestEntry(1, timestamp); + OptOutCollection collection = new OptOutCollection(new OptOutEntry[]{entry}); + return collection.getStore(); + } catch (Exception e) { + throw new RuntimeException("Failed to create minimal delta file bytes", e); + } + } + + // ==================== Assertion Helpers ==================== + + /** Extract and return result from completed job status, asserting the job completed successfully */ + private JsonObject assertJobCompletedWithSuccess(JsonObject status) { + assertEquals("completed", status.getString("state")); + JsonObject result = status.getJsonObject("result"); + assertNotNull(result); + assertEquals("success", result.getString("status")); + return result; + } + + /** Assert job completed with success and verify minimum counts */ + private void assertJobCompletedWithSuccess(JsonObject status, int minEntries, int minDeltas) { + JsonObject result = assertJobCompletedWithSuccess(status); + assertTrue(result.getInteger("entries_processed") >= minEntries, + "Expected at least " + minEntries + " entries, got " + result.getInteger("entries_processed")); + assertTrue(result.getInteger("deltas_produced") >= minDeltas, + "Expected at least " + minDeltas + " deltas, got " + result.getInteger("deltas_produced")); + } + + /** Assert job completed with skipped status (no stop reason check) */ + private JsonObject assertJobCompletedWithSkipped(JsonObject status) { + assertEquals("completed", status.getString("state")); + JsonObject result = status.getJsonObject("result"); + assertNotNull(result); + assertEquals("skipped", result.getString("status")); + return result; + } + + /** Assert job completed with skipped status and specific stop reason */ + private void assertJobCompletedWithSkipped(JsonObject status, String expectedStopReason) { + JsonObject result = assertJobCompletedWithSkipped(status); + assertEquals(expectedStopReason, result.getString("stop_reason")); + } + + /** Assert job completed with halted status and specific stop reason */ + private JsonObject assertJobCompletedWithHalted(JsonObject status, String expectedStopReason) { + assertEquals("completed", status.getString("state")); + JsonObject result = status.getJsonObject("result"); + assertNotNull(result); + assertEquals("halted", result.getString("status")); + assertEquals(expectedStopReason, result.getString("stop_reason")); + return result; + } + + /** Assert job failed */ + private void assertJobFailed(JsonObject status) { + assertEquals("failed", status.getString("state")); + } + + /** Verify cloud storage upload was called at least once */ + private void verifyCloudStorageUploaded() { + try { + verify(cloudStorage, atLeastOnce()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + fail(e); + } + } + + /** Verify cloud storage upload was never called */ + private void verifyCloudStorageNotUploaded() { + try { + verify(cloudStorage, never()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + fail(e); + } + } + + /** Verify dropped requests cloud storage upload was called at least once */ + private void verifyDroppedRequestsUploaded() { + try { + verify(cloudStorageDroppedRequests, atLeastOnce()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + fail(e); + } + } + + /** Verify dropped requests cloud storage upload was never called */ + private void verifyDroppedRequestsNotUploaded() { + try { + verify(cloudStorageDroppedRequests, never()).upload(any(InputStream.class), anyString()); + } catch (Exception e) { + fail(e); + } + } + + /** Verify SQS messages were deleted */ + private void verifySqsMessagesDeleted() { + verify(sqsClient, atLeastOnce()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + /** Verify SQS messages were not deleted */ + private void verifySqsMessagesNotDeleted() { + verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + } + + /** Verify SQS receive was never called (used for auth failure tests) */ + private void verifySqsReceiveNotCalled() { + verify(sqsClient, never()).receiveMessage(any(ReceiveMessageRequest.class)); + } + + // ==================== Tests ==================== @Test - public void testDeltaProduceEndpoint_noMessages(TestContext context) { - Async async = context.async(); - - // Mock empty queue - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) - .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); - - int port = Const.Port.ServicePortForOptOut + 1; - - // Start job - vertx.createHttpClient() - .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", - Endpoints.OPTOUT_DELTA_PRODUCE.toString()) - .compose(req -> req - .putHeader("Authorization", "Bearer " + TEST_API_KEY) - .send()) - .compose(resp -> { - context.assertEquals(202, resp.statusCode()); - return resp.body(); - }) - .compose(body -> { - JsonObject response = new JsonObject(body.toString()); - context.assertEquals("accepted", response.getString("status")); - - // Poll for completion - return pollForCompletion(context, port, 100, 50); - }) - .onComplete(context.asyncAssertSuccess(finalStatus -> { - context.assertEquals("completed", finalStatus.getString("state")); - JsonObject result = finalStatus.getJsonObject("result"); - context.assertNotNull(result); - context.assertEquals("success", result.getString("status")); - context.assertEquals(0, result.getInteger("deltas_produced")); - context.assertEquals(0, result.getInteger("entries_processed")); - - // Verify no operations - try { - verify(cloudStorage, never()).upload(any(InputStream.class), anyString()); - } catch (Exception e) { - context.fail(e); - } - verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); - - async.complete(); + public void testDeltaProduceEndpoint_successWithMessages(VertxTestContext testContext) { + mockSqsToReturnMessages(createOldMessages(2)); + mockSqsDeleteSuccess(); + + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + assertJobCompletedWithSuccess(finalStatus, 2, 1); + verifyCloudStorageUploaded(); + verifySqsMessagesDeleted(); + testContext.completeNow(); })); } @Test - public void testDeltaProduceEndpoint_allMessagesTooRecent(TestContext context) { - Async async = context.async(); - - // Create recent messages (< 5 minutes old) + public void testDeltaProduceEndpoint_noMessages(VertxTestContext testContext) { + mockSqsEmptyQueue(); + + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + JsonObject result = assertJobCompletedWithSkipped(finalStatus); + assertEquals(0, result.getInteger("deltas_produced")); + assertEquals(0, result.getInteger("entries_processed")); + verifyCloudStorageNotUploaded(); + verifySqsMessagesNotDeleted(); + testContext.completeNow(); + })); + } + + @Test + public void testDeltaProduceEndpoint_allMessagesTooRecent(VertxTestContext testContext) { + // create recent messages (< 5 minutes old) long recentTime = System.currentTimeMillis() - 60_000; // 1 minute ago - List messages = Arrays.asList( - createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, recentTime) - ); - - when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) - .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) - .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); - - int port = Const.Port.ServicePortForOptOut + 1; - - // Start job - vertx.createHttpClient() - .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", - Endpoints.OPTOUT_DELTA_PRODUCE.toString()) - .compose(req -> req - .putHeader("Authorization", "Bearer " + TEST_API_KEY) - .send()) - .compose(resp -> { - context.assertEquals(202, resp.statusCode()); - return resp.body(); - }) - .compose(body -> { - JsonObject response = new JsonObject(body.toString()); - context.assertEquals("accepted", response.getString("status")); - - // Poll for completion - return pollForCompletion(context, port, 100, 50); - }) - .onComplete(context.asyncAssertSuccess(finalStatus -> { - context.assertEquals("completed", finalStatus.getString("state")); - JsonObject result = finalStatus.getJsonObject("result"); - context.assertNotNull(result); - context.assertEquals("skipped", result.getString("status")); - context.assertEquals("All messages too recent", result.getString("reason")); - - // No processing should occur - try { - verify(cloudStorage, never()).upload(any(InputStream.class), anyString()); - } catch (Exception e) { - context.fail(e); - } - verify(sqsClient, never()).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); - - async.complete(); + List messages = Arrays.asList(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, recentTime)); + mockSqsToReturnMessages(messages); + + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + assertJobCompletedWithSkipped(finalStatus, "MESSAGES_TOO_RECENT"); + verifyCloudStorageNotUploaded(); + verifySqsMessagesNotDeleted(); + testContext.completeNow(); })); } @Test - public void testDeltaProduceEndpoint_unauthorized(TestContext context) { - Async async = context.async(); - - int port = Const.Port.ServicePortForOptOut + 1; - vertx.createHttpClient() - .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", - Endpoints.OPTOUT_DELTA_PRODUCE.toString()) - .compose(req -> req.send()) // No auth header - .compose(resp -> { - context.assertEquals(401, resp.statusCode()); - return resp.body(); - }) - .onComplete(context.asyncAssertSuccess(body -> { - // Should not call SQS when unauthorized - verify(sqsClient, never()).receiveMessage(any(ReceiveMessageRequest.class)); - async.complete(); + public void testDeltaProduceEndpoint_unauthorized(VertxTestContext testContext) { + postWithoutAuth() + .onComplete(testContext.succeeding(resp -> { + assertEquals(401, resp.getInteger("_statusCode")); + verifySqsReceiveNotCalled(); + testContext.completeNow(); })); } @Test - public void testDeltaProduceEndpoint_concurrentJobPrevention(TestContext context) throws Exception { - Async async = context.async(); - - // Latch to keep job running until we verify the conflict response - CountDownLatch uploadLatch = new CountDownLatch(1); + public void testDeltaProduceEndpoint_concurrentJobPrevention(VertxTestContext testContext) { + List messages = createOldMessages(1); + CountDownLatch processingLatch = new CountDownLatch(1); - // Create messages that will take some time to process - long oldTime = System.currentTimeMillis() - 400_000; - List messages = Arrays.asList( - createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime) - ); - - // Mock SQS to return messages + // mock sqs to wait on latch before returning - keeps job running when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) - .thenReturn(ReceiveMessageResponse.builder().messages(messages).build()) + .thenAnswer(inv -> { + processingLatch.await(); + return ReceiveMessageResponse.builder().messages(messages).build(); + }) .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + mockSqsDeleteSuccess(); - when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) - .thenReturn(DeleteMessageBatchResponse.builder().build()); - - // Block upload until latch is released - doAnswer(inv -> { - uploadLatch.await(); - return null; - }).when(cloudStorage).upload(any(InputStream.class), anyString()); - - int port = Const.Port.ServicePortForOptOut + 1; - - // Start first job - vertx.createHttpClient() - .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", - Endpoints.OPTOUT_DELTA_PRODUCE.toString()) - .compose(req -> req - .putHeader("Authorization", "Bearer " + TEST_API_KEY) - .send()) - .compose(resp -> { - context.assertEquals(202, resp.statusCode()); - return resp.body(); - }) - .compose(body -> { - JsonObject response = new JsonObject(body.toString()); - context.assertEquals("accepted", response.getString("status")); - - // Immediately try to start a second job (should be rejected) - return vertx.createHttpClient() - .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", - Endpoints.OPTOUT_DELTA_PRODUCE.toString()) - .compose(req -> req - .putHeader("Authorization", "Bearer " + TEST_API_KEY) - .send()); - }) + postStartJob() .compose(resp -> { - context.assertEquals(409, resp.statusCode()); // Conflict - job already running - return resp.body(); + assertEquals(202, resp.getInteger("_statusCode")); + assertEquals("accepted", resp.getJsonObject("_body").getString("status")); + // immediately try to start a second job + return postStartJob(); }) - .onComplete(context.asyncAssertSuccess(body -> { - JsonObject response = new JsonObject(body.toString()); - context.assertEquals("conflict", response.getString("status")); - context.assertTrue(response.getString("message").contains("already running")); + .onComplete(testContext.succeeding(resp -> { + assertEquals(409, resp.getInteger("_statusCode")); // conflict - job already running + JsonObject body = resp.getJsonObject("_body"); + assertEquals("conflict", body.getString("status")); + assertTrue(body.getString("reason").contains("already running")); - // Release the latch to let the first job complete - uploadLatch.countDown(); - async.complete(); + processingLatch.countDown(); + testContext.completeNow(); })); } @Test - public void testDeltaProduceEndpoint_autoClearCompletedJob(TestContext context) throws Exception { - Async async = context.async(); - - // Create messages for first job - long oldTime1 = System.currentTimeMillis() - 400_000; - List messages1 = Arrays.asList( - createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime1) - ); - - // Create messages for second job - long oldTime2 = System.currentTimeMillis() - 400_000; - List messages2 = Arrays.asList( - createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime2) - ); + public void testDeltaProduceEndpoint_autoClearCompletedJob(VertxTestContext testContext) { + List messages1 = createOldMessages(1); + List messages2 = createOldMessages(1); - // Mock SQS to return different messages for each job + // mock SQS to return different messages for each job when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) - // First job .thenReturn(ReceiveMessageResponse.builder().messages(messages1).build()) .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()) - // Second job .thenReturn(ReceiveMessageResponse.builder().messages(messages2).build()) .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + mockSqsDeleteSuccess(); - when(sqsClient.deleteMessageBatch(any(DeleteMessageBatchRequest.class))) - .thenReturn(DeleteMessageBatchResponse.builder().build()); - - doAnswer(inv -> null).when(cloudStorage).upload(any(InputStream.class), anyString()); - - int port = Const.Port.ServicePortForOptOut + 1; - - // Start first job - vertx.createHttpClient() - .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", - Endpoints.OPTOUT_DELTA_PRODUCE.toString()) - .compose(req -> req - .putHeader("Authorization", "Bearer " + TEST_API_KEY) - .send()) - .compose(resp -> { - context.assertEquals(202, resp.statusCode()); - return resp.body(); + startJobAndPollForCompletion() + .compose(firstJobStatus -> { + assertEquals("completed", firstJobStatus.getString("state")); + // start a second job - should auto-clear the completed first job + return postStartJob(); }) - .compose(body -> { - JsonObject response = new JsonObject(body.toString()); - context.assertEquals("accepted", response.getString("status")); - - // Wait for first job to complete - return pollForCompletion(context, port, 100, 50); + .compose(resp -> { + assertEquals(202, resp.getInteger("_statusCode")); // should succeed (auto-cleared) + assertEquals("accepted", resp.getJsonObject("_body").getString("status")); + return pollForCompletion(); }) - .compose(firstJobStatus -> { - context.assertEquals("completed", firstJobStatus.getString("state")); - - // Now start a second job - should auto-clear the completed first job - return vertx.createHttpClient() - .request(io.vertx.core.http.HttpMethod.POST, port, "127.0.0.1", - Endpoints.OPTOUT_DELTA_PRODUCE.toString()) - .compose(req -> req - .putHeader("Authorization", "Bearer " + TEST_API_KEY) - .send()); + .onComplete(testContext.succeeding(secondJobStatus -> { + assertEquals("completed", secondJobStatus.getString("state")); + verify(sqsClient, atLeast(2)).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); + testContext.completeNow(); + })); + } + + @Test + public void testTrafficFilter_denylistedMessagesAreDropped(VertxTestContext testContext) { + setupDenylistConfig("192.168.1.100"); + + // create mixed messages: 2 denylisted (matching ip), 2 normal + List denylistedMessages = createOldMessagesWithIp(2, "192.168.1.100"); + List normalMessages = createOldMessages(2); + List allMessages = new ArrayList<>(denylistedMessages); + allMessages.addAll(normalMessages); + + mockSqsToReturnMessages(allMessages); + mockSqsDeleteSuccess(); + + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + JsonObject result = assertJobCompletedWithSuccess(finalStatus); + assertEquals(2, result.getInteger("entries_processed")); + assertEquals(2, result.getInteger("dropped_requests_processed")); + verifyCloudStorageUploaded(); + verifyDroppedRequestsUploaded(); + testContext.completeNow(); + })); + } + + @Test + public void testTrafficFilter_noDenylistedMessages(VertxTestContext testContext) { + setupDenylistConfig("192.168.1.100"); + mockSqsToReturnMessages(createOldMessages(2)); // ips don't match denylist + mockSqsDeleteSuccess(); + + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + JsonObject result = assertJobCompletedWithSuccess(finalStatus); + assertEquals(2, result.getInteger("entries_processed")); + assertEquals(0, result.getInteger("dropped_requests_processed")); + verifyDroppedRequestsNotUploaded(); + testContext.completeNow(); + })); + } + + @Test + public void testTrafficFilter_allMessagesDenylisted(VertxTestContext testContext) { + setupDenylistConfig("192.168.1.100"); + mockSqsToReturnMessages(createOldMessagesWithIp(2, "192.168.1.100")); // all match denylist + mockSqsDeleteSuccess(); + + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + JsonObject result = assertJobCompletedWithSuccess(finalStatus); + assertEquals(0, result.getInteger("entries_processed")); + assertEquals(2, result.getInteger("dropped_requests_processed")); + verifyDroppedRequestsUploaded(); + verifyCloudStorageNotUploaded(); + testContext.completeNow(); + })); + } + + @Test + public void testTrafficFilter_messagesWithoutClientIp(VertxTestContext testContext) { + setupDenylistConfig("192.168.1.100"); + + // create message without client ip (should not be denylisted) + long oldTime = System.currentTimeMillis() - 400_000; + List messages = Arrays.asList( + createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, oldTime, null, null, null, null)); + mockSqsToReturnMessages(messages); + mockSqsDeleteSuccess(); + + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + JsonObject result = assertJobCompletedWithSuccess(finalStatus); + assertEquals(1, result.getInteger("entries_processed")); + assertEquals(0, result.getInteger("dropped_requests_processed")); + testContext.completeNow(); + })); + } + + @Test + public void testTrafficFilterConfig_reloadOnEachBatch(VertxTestContext testContext) { + // initial config with no denylist + createTrafficConfigFile(""" + { + "denylist_requests": [] + } + """); + + List messages = createOldMessagesWithIp(1, "192.168.1.100"); + mockSqsToReturnMessages(messages); + mockSqsDeleteSuccess(); + + // first request - should process normally (no denylist) + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + JsonObject result = assertJobCompletedWithSuccess(finalStatus); + assertEquals(1, result.getInteger("entries_processed")); + assertEquals(0, result.getInteger("dropped_requests_processed")); + + // update config to denylist the IP + setupDenylistConfig("192.168.1.100"); + resetMocksForSecondRequest(messages); + + // second request - should now be denylisted + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus2 -> { + JsonObject result2 = assertJobCompletedWithSuccess(finalStatus2); + assertEquals(0, result2.getInteger("entries_processed")); + assertEquals(1, result2.getInteger("dropped_requests_processed")); + testContext.completeNow(); + })); + })); + } + + @Test + public void testManualOverride_delayedProcessing(VertxTestContext testContext) throws Exception { + // mock manual override set to DELAYED_PROCESSING + JsonObject manualOverride = new JsonObject().put("manual_override", "DELAYED_PROCESSING"); + doReturn(new ByteArrayInputStream(manualOverride.encode().getBytes())) + .when(cloudStorage).download(MANUAL_OVERRIDE_S3_PATH); + + mockSqsToReturnMessages(createOldMessages(2)); + mockSqsDeleteSuccess(); + + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + JsonObject result = assertJobCompletedWithHalted(finalStatus, "MANUAL_OVERRIDE_ACTIVE"); + assertEquals(0, result.getInteger("entries_processed")); + assertEquals(0, result.getInteger("deltas_produced")); + verifySqsMessagesNotDeleted(); + testContext.completeNow(); + })); + } + + @Test + public void testTrafficCalculator_detectsSpikeInCurrentWindow(VertxTestContext testContext) throws Exception { + // threshold = baseline * multiplier = 100 * 5 = 500 + long currentTime = System.currentTimeMillis() / 1000; + // create 2 timestamps in delta file + List timestamps = Arrays.asList(currentTime - 3600, currentTime - 3600 + 1000); + byte[] deltaFileBytes = createDeltaFileBytes(timestamps); + + reset(cloudStorage); + setupCloudStorageForSpikeTest(deltaFileBytes); + + // create 600 SQS messages to exceed threshold + long baseTime = (currentTime - 600) * 1000; + List allMessages = new ArrayList<>(); + for (int i = 0; i < 600; i++) { + allMessages.add(createMessage(VALID_HASH_BASE64, VALID_ID_BASE64, + baseTime - (i * 100), null, null, "10.0.0." + (i % 256), null)); + } + + mockSqsToReturnMessages(allMessages); + mockSqsDeleteSuccess(); + + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + JsonObject result = assertJobCompletedWithHalted(finalStatus, "CIRCUIT_BREAKER_TRIGGERED"); + assertEquals(0, result.getInteger("entries_processed")); + assertEquals(0, result.getInteger("deltas_produced")); + + // verify manual override was set to delayed_processing in s3 + try { + ArgumentCaptor pathCaptor = ArgumentCaptor.forClass(String.class); + verify(cloudStorage, atLeastOnce()).upload(any(InputStream.class), pathCaptor.capture()); + boolean overrideSet = pathCaptor.getAllValues().stream() + .anyMatch(path -> path.equals("manual-override.json")); + assertTrue(overrideSet, "Manual override should be set to DELAYED_PROCESSING after detecting spike"); + } catch (Exception e) { + fail(e); + } + + testContext.completeNow(); + })); + } + + @Test + public void testS3UploadFailure_messagesNotDeletedFromSqs(VertxTestContext testContext) throws Exception { + mockSqsToReturnMessages(createOldMessages(2)); + doThrow(new RuntimeException("S3 upload failed - simulated error")) + .when(cloudStorage).upload(any(InputStream.class), anyString()); + + startJobAndPollForCompletion() + .onComplete(testContext.succeeding(finalStatus -> { + assertJobFailed(finalStatus); + assertTrue(finalStatus.getString("error").contains("S3") || + finalStatus.getString("error").contains("upload")); + verifySqsMessagesNotDeleted(); + testContext.completeNow(); + })); + } + + @Test + public void testStatusEndpoint_showsRunningJob(VertxTestContext testContext) throws Exception { + List messages = createOldMessages(1); + CountDownLatch processingLatch = new CountDownLatch(1); + + // mock sqs to wait on latch before returning - keeps job running + when(sqsClient.receiveMessage(any(ReceiveMessageRequest.class))) + .thenAnswer(inv -> { + processingLatch.await(); + return ReceiveMessageResponse.builder().messages(messages).build(); }) + .thenReturn(ReceiveMessageResponse.builder().messages(Collections.emptyList()).build()); + mockSqsDeleteSuccess(); + + postStartJob() .compose(resp -> { - context.assertEquals(202, resp.statusCode()); // Should succeed (auto-cleared) - return resp.body(); + assertEquals(202, resp.getInteger("_statusCode")); + return getJobStatus(); }) - .compose(body -> { - JsonObject response = new JsonObject(body.toString()); - context.assertEquals("accepted", response.getString("status")); - - // Wait for second job to complete - return pollForCompletion(context, port, 100, 50); + .onComplete(testContext.succeeding(status -> { + assertEquals("running", status.getString("state")); + assertNotNull(status.getString("start_time")); + + processingLatch.countDown(); + testContext.completeNow(); + })); + } + + @Test + public void testStatusEndpoint_showsFailedJob(VertxTestContext testContext) throws Exception { + mockSqsToReturnMessages(createOldMessages(1)); + doThrow(new RuntimeException("simulated s3 failure")) + .when(cloudStorage).upload(any(InputStream.class), anyString()); + + startJobAndPollForCompletion() + .compose(finalStatus -> { + assertJobFailed(finalStatus); + return getJobStatus(); }) - .onComplete(context.asyncAssertSuccess(secondJobStatus -> { - context.assertEquals("completed", secondJobStatus.getString("state")); - - // Verify both jobs processed messages - verify(sqsClient, atLeast(2)).deleteMessageBatch(any(DeleteMessageBatchRequest.class)); - - async.complete(); + .onComplete(testContext.succeeding(status -> { + assertJobFailed(status); + assertNotNull(status.getString("error")); + assertNotNull(status.getString("start_time")); + assertNotNull(status.getString("end_time")); + assertNotNull(status.getInteger("duration_seconds")); + + testContext.completeNow(); })); } -} + @Test + public void testDeltaProduceEndpoint_invalidApiKey(VertxTestContext testContext) { + postWithAuth("wrong-api-key") + .onComplete(testContext.succeeding(resp -> { + assertEquals(401, resp.getInteger("_statusCode")); + verifySqsReceiveNotCalled(); + testContext.completeNow(); + })); + } +}