diff --git a/pom.xml b/pom.xml
index f5de4ae9..4bd637f7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -30,6 +30,19 @@
true
+
+    <dependencyManagement>
+        <dependencies>
+            <dependency>
+                <groupId>software.amazon.awssdk</groupId>
+                <artifactId>bom</artifactId>
+                <version>2.28.11</version>
+                <type>pom</type>
+                <scope>import</scope>
+            </dependency>
+        </dependencies>
+    </dependencyManagement>
+
com.uid2
@@ -133,6 +146,10 @@
<version>5.12.0</version>
<scope>test</scope>
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>sqs</artifactId>
+        </dependency>
diff --git a/src/main/java/com/uid2/optout/Const.java b/src/main/java/com/uid2/optout/Const.java
index 0b51bd4e..52f7d50c 100644
--- a/src/main/java/com/uid2/optout/Const.java
+++ b/src/main/java/com/uid2/optout/Const.java
@@ -17,6 +17,13 @@ public static class Config extends com.uid2.shared.Const.Config {
public static final String OptOutDeleteExpiredProp = "optout_delete_expired";
public static final String PartnersConfigPathProp = "partners_config_path";
public static final String PartnersMetadataPathProp = "partners_metadata_path";
+ public static final String OptOutSqsQueueUrlProp = "optout_sqs_queue_url";
+ public static final String OptOutSqsEnabledProp = "optout_sqs_enabled";
+ public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // Default: "sqs-delta" - folder within same S3 bucket as regular optout
+ public static final String OptOutSqsDeltaWindowSecondsProp = "optout_sqs_delta_window_seconds"; // interval for each delta file
+ public static final String OptOutSqsPollIntervalMsProp = "optout_sqs_poll_interval_ms"; // interval for polling SQS
+ public static final String OptOutSqsMaxMessagesPerPollProp = "optout_sqs_max_messages_per_poll"; // max messages per poll
+ public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout"; // visibility timeout for messages
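+
+ // Example configuration (hypothetical queue URL; the numeric values are the defaults used by this change):
+ // {
+ //   "optout_sqs_enabled": true,
+ //   "optout_sqs_queue_url": "https://sqs.us-east-1.amazonaws.com/123456789012/optout-queue",
+ //   "optout_sqs_s3_folder": "sqs-delta",
+ //   "optout_sqs_delta_window_seconds": 300,
+ //   "optout_sqs_poll_interval_ms": 5000,
+ //   "optout_sqs_max_messages_per_poll": 10,
+ //   "optout_sqs_visibility_timeout": 300
+ // }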
}
public static class Event {
diff --git a/src/main/java/com/uid2/optout/Main.java b/src/main/java/com/uid2/optout/Main.java
index 50c01267..7aeb1f27 100644
--- a/src/main/java/com/uid2/optout/Main.java
+++ b/src/main/java/com/uid2/optout/Main.java
@@ -64,6 +64,7 @@ public class Main {
private final ICloudStorage fsPartnerConfig;
private final RotatingOperatorKeyProvider operatorKeyProvider;
private final boolean observeOnly;
+ private final boolean sqsEnabled;
private final UidInstanceIdProvider uidInstanceIdProvider;
public Main(Vertx vertx, JsonObject config) throws Exception {
@@ -73,6 +74,7 @@ public Main(Vertx vertx, JsonObject config) throws Exception {
if (this.observeOnly) {
LOGGER.warn("Running Observe ONLY mode: no producer, no sender");
}
+ this.sqsEnabled = config.getBoolean(Const.Config.OptOutSqsEnabledProp, false);
this.uidInstanceIdProvider = new UidInstanceIdProvider(config);
boolean useStorageMock = config.getBoolean(Const.Config.StorageMockProp, false);
@@ -267,6 +269,27 @@ public void run(String[] args) throws IOException {
// upload last delta produced and potentially not uploaded yet
futs.add((this.uploadLastDelta(cs, logProducer, cloudSyncVerticle.eventUpload(), cloudSyncVerticle.eventRefresh())));
+
+ // Deploy SQS producer if enabled
+ if (this.sqsEnabled) {
+ LOGGER.info("SQS enabled, deploying OptOutSqsLogProducer");
+ try {
+ // Create SQS-specific cloud sync with custom folder (default: "sqs-delta")
+ // Uses same S3 bucket as regular optout (fsOptOut) but different folder
+ String sqsFolder = this.config.getString(Const.Config.OptOutSqsS3FolderProp, "sqs-delta");
+ JsonObject sqsConfig = new JsonObject().mergeIn(this.config)
+ .put(Const.Config.OptOutS3FolderProp, sqsFolder);
+ OptOutCloudSync sqsCs = new OptOutCloudSync(sqsConfig, true);
+
+ // Deploy SQS log producer - reuses fsOptOut for S3 access
+ OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, this.fsOptOut, sqsCs);
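+ // Note: the producer reads its polling/queue settings from the shared config; only the cloud sync
+ // above uses the folder override, which keeps SQS-produced deltas under the sqs-delta prefix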
+ futs.add(this.deploySingleInstance(sqsLogProducer));
+
+ LOGGER.info("SQS log producer deployed - same bucket as optout, folder: {}", sqsFolder);
+ } catch (IOException e) {
+ LOGGER.error("Failed to initialize SQS log producer: " + e.getMessage(), e);
+ }
+ }
}
Supplier svcSupplier = () -> {
diff --git a/src/main/java/com/uid2/optout/vertx/Endpoints.java b/src/main/java/com/uid2/optout/vertx/Endpoints.java
index 4f900651..6b028e72 100644
--- a/src/main/java/com/uid2/optout/vertx/Endpoints.java
+++ b/src/main/java/com/uid2/optout/vertx/Endpoints.java
@@ -9,6 +9,7 @@ public enum Endpoints {
OPTOUT_REFRESH("/optout/refresh"),
OPTOUT_WRITE("/optout/write"),
OPTOUT_REPLICATE("/optout/replicate"),
+ OPTOUT_QUEUE("/optout/queue"),
OPTOUT_PARTNER_MOCK("/optout/partner_mock");
private final String path;
diff --git a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java
index 0f8b5128..eadcd97f 100644
--- a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java
+++ b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java
@@ -33,6 +33,9 @@
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
+import software.amazon.awssdk.services.sqs.model.SendMessageResponse;
import java.net.URL;
import java.time.Instant;
@@ -61,6 +64,9 @@ public class OptOutServiceVerticle extends AbstractVerticle {
private final boolean enableOptOutPartnerMock;
private final String internalApiKey;
private final InternalAuthMiddleware internalAuth;
+ private final SqsClient sqsClient;
+ private final String sqsQueueUrl;
+ private final boolean sqsEnabled;
public OptOutServiceVerticle(Vertx vertx,
IAuthorizableProvider clientKeyProvider,
@@ -106,6 +112,37 @@ public OptOutServiceVerticle(Vertx vertx,
this.internalApiKey = jsonConfig.getString(Const.Config.OptOutInternalApiTokenProp);
this.internalAuth = new InternalAuthMiddleware(this.internalApiKey, "optout");
this.enableOptOutPartnerMock = jsonConfig.getBoolean(Const.Config.OptOutPartnerEndpointMockProp);
+
+ // Initialize SQS client
+ this.sqsEnabled = jsonConfig.getBoolean(Const.Config.OptOutSqsEnabledProp, false);
+ this.sqsQueueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp);
+
+ LOGGER.info("=== SQS Configuration ===");
+ LOGGER.info("SQS Enabled: " + this.sqsEnabled);
+ LOGGER.info("SQS Queue URL: " + this.sqsQueueUrl);
+ LOGGER.info("AWS_REGION env: " + System.getenv("AWS_REGION"));
+ LOGGER.info("aws_region config: " + jsonConfig.getString("aws_region"));
+
+ SqsClient tempSqsClient = null;
+ if (this.sqsEnabled) {
+ if (this.sqsQueueUrl == null || this.sqsQueueUrl.isEmpty()) {
+ LOGGER.warn("SQS enabled but queue URL not configured");
+ } else {
+ try {
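+ // SqsClient.builder().build() resolves region and credentials from the AWS SDK default
+ // provider chains (e.g. AWS_REGION, instance profile), which is why the region is logged below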
+ tempSqsClient = SqsClient.builder().build();
+ LOGGER.info("SQS client initialized successfully");
+ LOGGER.info("SQS client region: " + tempSqsClient.serviceClientConfiguration().region());
+ LOGGER.info("SQS queue URL configured: " + this.sqsQueueUrl);
+ } catch (Exception e) {
+ LOGGER.error("Failed to initialize SQS client: " + e.getMessage(), e);
+ tempSqsClient = null;
+ }
+ }
+ } else {
+ LOGGER.info("SQS integration disabled");
+ }
+ this.sqsClient = tempSqsClient;
}
public static void sendStatus(int statusCode, HttpServerResponse response) {
@@ -136,6 +173,14 @@ public void start(Promise startPromise) {
@Override
public void stop() {
LOGGER.info("Shutting down OptOutServiceVerticle");
+ if (this.sqsClient != null) {
+ try {
+ this.sqsClient.close();
+ LOGGER.info("SQS client closed");
+ } catch (Exception e) {
+ LOGGER.error("Error closing SQS client", e);
+ }
+ }
}
public void setCloudPaths(Collection<String> paths) {
@@ -246,6 +291,11 @@ private void handleHealthCheck(RoutingContext rc) {
}
private void handleReplicate(RoutingContext routingContext) {
+
+ // When SQS is enabled, route opt-out requests to the queue handler instead of replicating directly
+ if (this.sqsEnabled) {
+ this.handleQueue(routingContext);
+ return;
+ }
+
HttpServerRequest req = routingContext.request();
MultiMap params = req.params();
String identityHash = req.getParam(IDENTITY_HASH);
@@ -308,6 +358,123 @@ private void handleReplicate(RoutingContext routingContext) {
}
}
+ private void handleQueue(RoutingContext routingContext) {
+ HttpServerRequest req = routingContext.request();
+ MultiMap params = req.params();
+ String identityHash = req.getParam(IDENTITY_HASH);
+ String advertisingId = req.getParam(ADVERTISING_ID);
+ JsonObject body = routingContext.body().asJsonObject();
+
+ HttpServerResponse resp = routingContext.response();
+
+ // Validate parameters - same as replicate endpoint
+ if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) {
+ this.sendBadRequestError(resp);
+ return;
+ }
+ if (advertisingId == null || params.getAll(ADVERTISING_ID).size() != 1) {
+ this.sendBadRequestError(resp);
+ return;
+ }
+
+ if (!this.isGetOrPost(req)) {
+ this.sendBadRequestError(resp);
+ return;
+ }
+ if (body != null) {
+ this.sendBadRequestError(resp);
+ return;
+ }
+
+ // Check if SQS is enabled and configured
+ if (!this.sqsEnabled || this.sqsClient == null) {
+ this.sendServiceUnavailableError(resp, "SQS integration not enabled or configured");
+ return;
+ }
+
+ try {
+ final String maskedId1 = Utils.maskPii(identityHash);
+ final String maskedId2 = Utils.maskPii(advertisingId);
+
+ LOGGER.info("=== Processing SQS Queue Request ===");
+ LOGGER.info("Identity Hash (masked): " + maskedId1);
+ LOGGER.info("Advertising ID (masked): " + maskedId2);
+ LOGGER.info("SQS Client null? " + (this.sqsClient == null));
+ LOGGER.info("Queue URL: " + this.sqsQueueUrl);
+
+ // Create message body as JSON
+ JsonObject messageBody = new JsonObject()
+ .put(IDENTITY_HASH, identityHash)
+ .put(ADVERTISING_ID, advertisingId)
+ .put("timestamp", OptOutUtils.nowEpochSeconds());
+
+
+ // Send message to SQS queue
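+ // sendMessage is a blocking AWS SDK call, so run it on a worker thread rather than the event loop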
+ vertx.executeBlocking(promise -> {
+ try {
+ LOGGER.info("About to send message to SQS...");
+ LOGGER.info("SQS Client region in worker thread: " + this.sqsClient.serviceClientConfiguration().region());
+
+ SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
+ .queueUrl(this.sqsQueueUrl)
+ .messageBody(messageBody.encode())
+ .build();
+
+ LOGGER.info("SendMessageRequest created, calling sendMessage...");
+ SendMessageResponse response = this.sqsClient.sendMessage(sendMsgRequest);
+
+ LOGGER.info("SQS sendMessage succeeded, messageId: " + response.messageId());
+ promise.complete(response.messageId());
+ } catch (Exception e) {
+ LOGGER.error("!!! Exception in SQS sendMessage !!!");
+ LOGGER.error("Exception type: " + e.getClass().getName());
+ LOGGER.error("Exception message: " + e.getMessage());
+ LOGGER.error("Full stack trace:", e);
+
+ // Log additional SQS-specific details if available
+ if (e.getCause() != null) {
+ LOGGER.error("Caused by: " + e.getCause().getClass().getName());
+ LOGGER.error("Cause message: " + e.getCause().getMessage());
+ }
+
+ promise.fail(e);
+ }
+ }, res -> {
+ if (res.failed()) {
+ LOGGER.error("=== SQS Queue Request FAILED ===");
+ LOGGER.error("Failed to queue optout request - identity_hash: " + maskedId1 +
+ ", advertising_id: " + maskedId2);
+ LOGGER.error("Failure cause: " + res.cause().getMessage());
+ LOGGER.error("Full error:", res.cause());
+
+ this.sendInternalServerError(resp, "Failed to queue message: " + res.cause().getMessage());
+ } else {
+ String messageId = (String) res.result();
+ LOGGER.info("=== SQS Queue Request SUCCEEDED ===");
+ LOGGER.info("Queued optout request - identity_hash: " + maskedId1 +
+ ", advertising_id: " + maskedId2 +
+ ", messageId: " + messageId);
+
+ // Return success with message ID
+ JsonObject responseJson = new JsonObject()
+ .put("messageId", messageId)
+ .put("status", "queued");
+
+ resp.setStatusCode(200)
+ .putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
+ .setChunked(true)
+ .write(responseJson.encode());
+ resp.end();
+ }
+ });
+ } catch (Exception ex) {
+ LOGGER.error("=== Exception in handleQueue wrapper ===");
+ LOGGER.error("Error processing queue request: " + ex.getMessage(), ex);
+ this.sendInternalServerError(resp, ex.getMessage());
+ }
+ }
+
private void handleWrite(RoutingContext routingContext) {
HttpServerRequest req = routingContext.request();
MultiMap params = req.params();
diff --git a/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java
new file mode 100644
index 00000000..b7802a37
--- /dev/null
+++ b/src/main/java/com/uid2/optout/vertx/OptOutSqsLogProducer.java
@@ -0,0 +1,473 @@
+package com.uid2.optout.vertx;
+
+import com.uid2.optout.Const;
+import com.uid2.shared.cloud.ICloudStorage;
+import com.uid2.shared.health.HealthComponent;
+import com.uid2.shared.health.HealthManager;
+import com.uid2.shared.optout.*;
+import io.micrometer.core.instrument.Counter;
+import io.micrometer.core.instrument.Metrics;
+import io.vertx.core.*;
+import io.vertx.core.json.JsonObject;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.sqs.SqsClient;
+import software.amazon.awssdk.services.sqs.model.*;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.ArrayList;
+import java.util.List;
+
+public class OptOutSqsLogProducer extends AbstractVerticle {
+ private static final Logger LOGGER = LoggerFactory.getLogger(OptOutSqsLogProducer.class);
+ private final HealthComponent healthComponent = HealthManager.instance.registerComponent("sqs-log-producer");
+
+ private final SqsClient sqsClient;
+ private final String queueUrl;
+ private final String eventDeltaProduced;
+ private final int pollIntervalMs;
+ private final int maxMessagesPerPoll;
+ private final int visibilityTimeout;
+ private final int deltaWindowSeconds; // Time window for each delta file (e.g., 5 minutes = 300 seconds)
+ private final int replicaId;
+ private final ICloudStorage cloudStorage;
+ private final OptOutCloudSync cloudSync;
+
+ private Counter counterDeltaProduced = Counter
+ .builder("uid2_optout_sqs_delta_produced_total")
+ .description("counter for how many optout delta files are produced from SQS")
+ .register(Metrics.globalRegistry);
+
+ private Counter counterEntriesProcessed = Counter
+ .builder("uid2_optout_sqs_entries_processed_total")
+ .description("counter for how many optout entries are processed from SQS")
+ .register(Metrics.globalRegistry);
+
+ private ByteBuffer buffer;
+ private ByteArrayOutputStream currentDeltaStream = null;
+ private String currentDeltaName = null;
+ private Long currentDeltaWindowStart = null; // Unix epoch seconds for the start of current delta's time window
+ private boolean writeInProgress = false;
+ private boolean shutdownInProgress = false;
+
+ private long pollTimerId = -1;
+
+ // Track messages in current delta - only delete from SQS after successful S3 upload
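+ // If an upload fails the tracked messages are not deleted, so they become visible again after the
+ // visibility timeout and are re-polled, giving at-least-once delivery of entries into delta files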
+ private final List<Message> currentDeltaMessages = new ArrayList<>();
+
+ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync) throws IOException {
+ this(jsonConfig, cloudStorage, cloudSync, Const.Event.DeltaProduced);
+ }
+
+ public OptOutSqsLogProducer(JsonObject jsonConfig, ICloudStorage cloudStorage, OptOutCloudSync cloudSync, String eventDeltaProduced) throws IOException {
+ this.eventDeltaProduced = eventDeltaProduced;
+ this.replicaId = OptOutUtils.getReplicaId(jsonConfig);
+ this.cloudStorage = cloudStorage;
+ this.cloudSync = cloudSync;
+
+ // Initialize SQS client
+ this.queueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp);
+ if (this.queueUrl == null || this.queueUrl.isEmpty()) {
+ throw new IOException("SQS queue URL not configured");
+ }
+
+ this.sqsClient = SqsClient.builder().build();
+ LOGGER.info("SQS client initialized for queue: " + this.queueUrl);
+
+ // SQS polling configuration
+ this.pollIntervalMs = jsonConfig.getInteger(Const.Config.OptOutSqsPollIntervalMsProp, 5000); // 5 seconds default
+ this.maxMessagesPerPoll = jsonConfig.getInteger(Const.Config.OptOutSqsMaxMessagesPerPollProp, 10); // SQS max is 10
+ this.visibilityTimeout = jsonConfig.getInteger(Const.Config.OptOutSqsVisibilityTimeoutProp, 300); // 5 minutes default
+
+ // Delta time window configuration - each delta file represents this many seconds of opt-out data
+ this.deltaWindowSeconds = jsonConfig.getInteger(Const.Config.OptOutSqsDeltaWindowSecondsProp, 300); // 5 minutes default
+
+ // Initialize buffer (same value as OptOutLogProducer)
+ int bufferSize = jsonConfig.getInteger(Const.Config.OptOutProducerBufferSizeProp);
+ this.buffer = ByteBuffer.allocate(bufferSize).order(ByteOrder.LITTLE_ENDIAN);
+
+ LOGGER.info("SQS Log Producer initialized - poll interval: {}ms, max messages: {}, visibility timeout: {}s, delta window: {}s, replica id: {}",
+ this.pollIntervalMs, this.maxMessagesPerPoll, this.visibilityTimeout, this.deltaWindowSeconds, this.replicaId);
+ }
+
+ @Override
+ public void start(Promise<Void> startPromise) {
+ LOGGER.info("Starting SQS Log Producer...");
+
+ try {
+ // Start SQS polling timer - delta rotation happens automatically based on message timestamps
+ this.pollTimerId = vertx.setPeriodic(this.pollIntervalMs, id -> this.pollSqsAndWrite());
+
+ this.healthComponent.setHealthStatus(true);
+ startPromise.complete();
+ LOGGER.info("SQS Log Producer deployed successfully - polling every {}ms, delta window {}s",
+ this.pollIntervalMs, this.deltaWindowSeconds);
+
+ } catch (Exception e) {
+ LOGGER.error("Failed to start SQS Log Producer", e);
+ this.healthComponent.setHealthStatus(false, e.getMessage());
+ startPromise.fail(e);
+ }
+ }
+
+ @Override
+ public void stop(Promise<Void> stopPromise) {
+ LOGGER.info("Stopping SQS Log Producer...");
+ this.shutdownInProgress = true;
+
+ // Cancel polling timer
+ if (this.pollTimerId >= 0) {
+ vertx.cancelTimer(this.pollTimerId);
+ }
+
+ // Upload current delta if any
+ vertx.executeBlocking(promise -> {
+ try {
+ this.uploadCurrentDelta(true);
+ promise.complete();
+ } catch (Exception e) {
+ LOGGER.error("Error during shutdown upload", e);
+ promise.fail(e);
+ }
+ }, res -> {
+ // Close SQS client
+ if (this.sqsClient != null) {
+ try {
+ this.sqsClient.close();
+ LOGGER.info("SQS client closed");
+ } catch (Exception e) {
+ LOGGER.error("Error closing SQS client", e);
+ }
+ }
+
+ stopPromise.complete();
+ LOGGER.info("SQS Log Producer stopped");
+ });
+ }
+
+ private void pollSqsAndWrite() {
+ if (this.shutdownInProgress || this.writeInProgress) {
+ return;
+ }
+
+ this.writeInProgress = true;
+
+ vertx.executeBlocking(promise -> {
+ try {
+ List<Message> messages = this.receiveMessagesFromSqs();
+
+ if (messages.isEmpty()) {
+ promise.complete(0);
+ return;
+ }
+
+ // Process messages based on their timestamps, creating new deltas as needed
+ int totalProcessed = this.processMessagesWithTimeWindowing(messages);
+
+ promise.complete(totalProcessed);
+
+ } catch (Exception e) {
+ LOGGER.error("Error in SQS poll and write", e);
+ promise.fail(e);
+ }
+ }, res -> {
+ this.writeInProgress = false;
+
+ if (res.succeeded()) {
+ int written = (Integer) res.result();
+ if (written > 0) {
+ LOGGER.debug("Processed {} entries from SQS", written);
+ this.counterEntriesProcessed.increment(written);
+ }
+ } else {
+ LOGGER.error("Failed to process SQS messages", res.cause());
+ }
+ });
+ }
+
+ private List<Message> receiveMessagesFromSqs() {
+ try {
+ ReceiveMessageRequest receiveRequest = ReceiveMessageRequest.builder()
+ .queueUrl(this.queueUrl)
+ .maxNumberOfMessages(this.maxMessagesPerPoll)
+ .visibilityTimeout(this.visibilityTimeout)
+ .waitTimeSeconds(0) // Non-blocking poll
+ .build();
+
+ ReceiveMessageResponse response = sqsClient.receiveMessage(receiveRequest);
+
+ LOGGER.debug("Received {} messages from SQS", response.messages().size());
+ return response.messages();
+
+ } catch (Exception e) {
+ LOGGER.error("Error receiving messages from SQS", e);
+ return new ArrayList<>();
+ }
+ }
+
+ // Process messages with time-based windowing - creates multiple deltas if messages span multiple time windows
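+ // e.g. with a 300s window, messages stamped t=100, t=350 and t=620 are written into three separate
+ // deltas covering the windows [0, 300), [300, 600) and [600, 900) respectively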
+ private int processMessagesWithTimeWindowing(List<Message> messages) {
+ // Parse and sort messages by timestamp
+ List<ParsedMessage> parsedMessages = new ArrayList<>();
+ for (Message message : messages) {
+ try {
+ JsonObject body = new JsonObject(message.body());
+ String identityHash = body.getString("identity_hash");
+ String advertisingId = body.getString("advertising_id");
+ Long timestamp = body.getLong("timestamp"); // epoch seconds set by OptOutServiceVerticle.handleQueue when the opt-out request was received
+
+ if (identityHash == null || advertisingId == null || timestamp == null) {
+ LOGGER.error("Invalid message format, skipping: {}", message.body());
+ continue;
+ }
+
+ byte[] hashBytes = OptOutUtils.base64StringTobyteArray(identityHash);
+ byte[] idBytes = OptOutUtils.base64StringTobyteArray(advertisingId);
+
+ if (hashBytes == null || idBytes == null) {
+ LOGGER.error("Invalid base64 encoding, skipping message");
+ continue;
+ }
+
+ parsedMessages.add(new ParsedMessage(message, hashBytes, idBytes, timestamp));
+ } catch (Exception e) {
+ LOGGER.error("Error parsing SQS message", e);
+ }
+ }
+
+ // Sort by timestamp
+ parsedMessages.sort((a, b) -> Long.compare(a.timestamp, b.timestamp));
+
+ int totalProcessed = 0;
+
+ // Process each message, rotating deltas as needed based on time windows
+ for (ParsedMessage parsed : parsedMessages) {
+ try {
+ // Check if we need to rotate to a new delta based on the message timestamp
+ if (this.shouldRotateDelta(parsed.timestamp)) {
+ // Upload current delta if it exists (this will delete messages from SQS after successful upload)
+ if (this.currentDeltaStream != null) {
+ this.uploadCurrentDelta(false);
+ }
+ // Start a new delta for this time window
+ this.startNewDelta(parsed.timestamp);
+ }
+
+ // Ensure we have a delta stream
+ if (this.currentDeltaStream == null) {
+ this.startNewDelta(parsed.timestamp);
+ }
+
+ // Write entry to current delta
+ this.writeEntryToDelta(parsed.hashBytes, parsed.idBytes, parsed.timestamp);
+
+ // Track this message with the current delta - will be deleted from SQS after successful S3 upload
+ this.currentDeltaMessages.add(parsed.originalMessage);
+ totalProcessed++;
+
+ } catch (Exception e) {
+ LOGGER.error("Error processing message with timestamp " + parsed.timestamp, e);
+ }
+ }
+
+ return totalProcessed;
+ }
+
+ // Helper class to hold parsed message data
+ private static class ParsedMessage {
+ final Message originalMessage;
+ final byte[] hashBytes;
+ final byte[] idBytes;
+ final long timestamp;
+
+ ParsedMessage(Message originalMessage, byte[] hashBytes, byte[] idBytes, long timestamp) {
+ this.originalMessage = originalMessage;
+ this.hashBytes = hashBytes;
+ this.idBytes = idBytes;
+ this.timestamp = timestamp;
+ }
+ }
+
+ // Check if we should rotate to a new delta based on message timestamp
+ private boolean shouldRotateDelta(long messageTimestamp) {
+ if (this.currentDeltaWindowStart == null) {
+ return false; // No current delta
+ }
+
+ long windowEnd = this.currentDeltaWindowStart + this.deltaWindowSeconds;
+ return messageTimestamp >= windowEnd;
+ }
+
+ // Start a new delta for the time window containing the given timestamp
+ private void startNewDelta(long messageTimestamp) throws IOException {
+ // Calculate the window start for this message (round down to delta window boundary)
+ long windowStart = (messageTimestamp / this.deltaWindowSeconds) * this.deltaWindowSeconds; // TODO: decide whether to keep fixed-width windows or only produce deltas when messages arrive
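+ // e.g. deltaWindowSeconds=300 and messageTimestamp=1700000459 give windowStart=1700000400, i.e. the window [1700000400, 1700000700)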
+
+ this.currentDeltaWindowStart = windowStart;
+ this.currentDeltaName = OptOutUtils.newDeltaFileName(this.replicaId);
+ this.currentDeltaStream = new ByteArrayOutputStream();
+
+ // Add a special first entry with null hash and the window start timestamp
+ buffer.put(OptOutUtils.nullHashBytes);
+ buffer.put(OptOutUtils.nullHashBytes);
+ buffer.putLong(windowStart);
+ buffer.flip();
+ byte[] firstEntry = new byte[buffer.remaining()];
+ buffer.get(firstEntry);
+ this.currentDeltaStream.write(firstEntry);
+ buffer.clear();
+
+ LOGGER.info("Started new delta: {} for time window [{}, {})",
+ this.currentDeltaName, windowStart, windowStart + this.deltaWindowSeconds);
+ }
+
+ // Write a single entry to the current delta
+ private void writeEntryToDelta(byte[] hashBytes, byte[] idBytes, long timestamp) throws IOException {
+ this.checkBufferSize(OptOutConst.EntrySize);
+
+ OptOutEntry.writeTo(buffer, hashBytes, idBytes, timestamp);
+ buffer.flip();
+ byte[] data = new byte[buffer.remaining()];
+ buffer.get(data);
+ this.currentDeltaStream.write(data);
+ buffer.clear();
+ }
+
+ private void deleteMessagesFromSqs(List<Message> messages) {
+ if (messages.isEmpty()) {
+ return;
+ }
+
+ vertx.executeBlocking(promise -> {
+ try {
+ // SQS allows batch delete of up to 10 messages at a time
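+ // e.g. 23 tracked messages result in three DeleteMessageBatch calls with 10, 10 and 3 entries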
+ List<DeleteMessageBatchRequestEntry> entries = new ArrayList<>();
+ int batchId = 0;
+ int totalDeleted = 0;
+
+ for (Message msg : messages) {
+ entries.add(DeleteMessageBatchRequestEntry.builder()
+ .id(String.valueOf(batchId++))
+ .receiptHandle(msg.receiptHandle())
+ .build());
+
+ // Send batch when we reach 10 messages or at the end
+ if (entries.size() == 10 || batchId == messages.size()) {
+ DeleteMessageBatchRequest deleteRequest = DeleteMessageBatchRequest.builder()
+ .queueUrl(this.queueUrl)
+ .entries(entries)
+ .build();
+
+ DeleteMessageBatchResponse deleteResponse = sqsClient.deleteMessageBatch(deleteRequest);
+
+ if (!deleteResponse.failed().isEmpty()) {
+ LOGGER.error("Failed to delete {} messages from SQS", deleteResponse.failed().size());
+ } else {
+ totalDeleted += entries.size();
+ }
+
+ entries.clear();
+ }
+ }
+
+ LOGGER.debug("Deleted {} messages from SQS", totalDeleted);
+ promise.complete();
+
+ } catch (Exception e) {
+ LOGGER.error("Error deleting messages from SQS", e);
+ promise.fail(e);
+ }
+ });
+ }
+
+ // Upload current delta to S3 (called when time window is complete or during shutdown)
+ private void uploadCurrentDelta(boolean shuttingDown) {
+ if (this.currentDeltaStream == null || this.currentDeltaName == null) {
+ if (shuttingDown) {
+ this.shutdownInProgress = true;
+ }
+ return;
+ }
+
+ try {
+ // Add a special last entry with ffff hash and the window end timestamp
+ long endTimestamp = this.currentDeltaWindowStart != null ?
+ this.currentDeltaWindowStart + this.deltaWindowSeconds :
+ OptOutUtils.nowEpochSeconds();
+
+ buffer.put(OptOutUtils.onesHashBytes);
+ buffer.put(OptOutUtils.onesHashBytes);
+ buffer.putLong(endTimestamp);
+ buffer.flip();
+ byte[] lastEntry = new byte[buffer.remaining()];
+ buffer.get(lastEntry);
+ this.currentDeltaStream.write(lastEntry);
+ buffer.clear();
+
+ // Get the complete delta data
+ byte[] deltaData = this.currentDeltaStream.toByteArray();
+
+ // Generate S3 path using cloud sync
+ String s3Path = this.cloudSync.toCloudPath(this.currentDeltaName);
+
+ // Upload to S3 using ICloudStorage
+ LOGGER.info("Uploading delta to S3: {} ({} bytes, window: [{}, {}))",
+ s3Path, deltaData.length, this.currentDeltaWindowStart, endTimestamp);
+
+ boolean uploadSucceeded = false;
+ try (ByteArrayInputStream inputStream = new ByteArrayInputStream(deltaData)) {
+ this.cloudStorage.upload(inputStream, s3Path);
+ LOGGER.info("Successfully uploaded delta to S3: {}", s3Path);
+ uploadSucceeded = true;
+
+ // Publish event
+ this.publishDeltaProducedEvent(this.currentDeltaName);
+
+ } catch (Exception uploadEx) {
+ LOGGER.error("Failed to upload delta to S3: " + uploadEx.getMessage(), uploadEx);
+ // Don't delete messages from SQS if upload failed - they'll be retried
+ }
+
+ // CRITICAL: Only delete messages from SQS after successful S3 upload
+ if (uploadSucceeded && !this.currentDeltaMessages.isEmpty()) {
+ LOGGER.info("Deleting {} messages from SQS after successful S3 upload", this.currentDeltaMessages.size());
+ this.deleteMessagesFromSqs(new ArrayList<>(this.currentDeltaMessages));
+ this.currentDeltaMessages.clear();
+ }
+
+ // Close the stream and reset
+ this.currentDeltaStream.close();
+ this.currentDeltaStream = null;
+ this.currentDeltaName = null;
+ this.currentDeltaWindowStart = null;
+
+ } catch (Exception ex) {
+ LOGGER.error("Error uploading delta: " + ex.getMessage(), ex);
+ // Don't clear currentDeltaMessages - messages will remain in SQS for retry
+ }
+
+ if (shuttingDown) {
+ this.shutdownInProgress = true;
+ }
+ }
+
+ private void publishDeltaProducedEvent(String newDelta) {
+ this.counterDeltaProduced.increment();
+ vertx.eventBus().publish(this.eventDeltaProduced, newDelta);
+ LOGGER.info("Published delta.produced event for: {}", newDelta);
+ }
+
+ private void checkBufferSize(int dataSize) {
+ ByteBuffer b = this.buffer;
+ if (b.capacity() < dataSize) {
+ int newCapacity = Integer.highestOneBit(dataSize) << 1;
+ LOGGER.warn("Expanding buffer size: current {}, need {}, new {}", b.capacity(), dataSize, newCapacity);
+ this.buffer = ByteBuffer.allocate(newCapacity).order(ByteOrder.LITTLE_ENDIAN);
+ }
+ }
+}
+