Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,19 @@
<snapshots><enabled>true</enabled></snapshots>
</repository>
</repositories>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.28.11</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>com.uid2</groupId>
Expand Down Expand Up @@ -133,6 +146,10 @@
<version>5.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
7 changes: 7 additions & 0 deletions src/main/java/com/uid2/optout/Const.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ public static class Config extends com.uid2.shared.Const.Config {
public static final String OptOutDeleteExpiredProp = "optout_delete_expired";
public static final String PartnersConfigPathProp = "partners_config_path";
public static final String PartnersMetadataPathProp = "partners_metadata_path";
public static final String OptOutSqsQueueUrlProp = "optout_sqs_queue_url";
public static final String OptOutSqsEnabledProp = "optout_sqs_enabled";
public static final String OptOutSqsS3FolderProp = "optout_sqs_s3_folder"; // Default: "sqs-delta" - folder within same S3 bucket as regular optout
public static final String OptOutSqsDeltaWindowSecondsProp = "optout_sqs_delta_window_seconds"; // interval for each delta file
public static final String OptOutSqsPollIntervalMsProp = "optout_sqs_poll_interval_ms"; // interval for polling SQS
public static final String OptOutSqsMaxMessagesPerPollProp = "optout_sqs_max_messages_per_poll"; // max messages per poll
public static final String OptOutSqsVisibilityTimeoutProp = "optout_sqs_visibility_timeout"; // visibility timeout for messages
}

public static class Event {
Expand Down
23 changes: 23 additions & 0 deletions src/main/java/com/uid2/optout/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class Main {
private final ICloudStorage fsPartnerConfig;
private final RotatingOperatorKeyProvider operatorKeyProvider;
private final boolean observeOnly;
private final boolean sqsEnabled;
private final UidInstanceIdProvider uidInstanceIdProvider;

public Main(Vertx vertx, JsonObject config) throws Exception {
Expand All @@ -73,6 +74,7 @@ public Main(Vertx vertx, JsonObject config) throws Exception {
if (this.observeOnly) {
LOGGER.warn("Running Observe ONLY mode: no producer, no sender");
}
this.sqsEnabled = config.getBoolean(Const.Config.OptOutSqsEnabledProp, false);
this.uidInstanceIdProvider = new UidInstanceIdProvider(config);

boolean useStorageMock = config.getBoolean(Const.Config.StorageMockProp, false);
Expand Down Expand Up @@ -267,6 +269,27 @@ public void run(String[] args) throws IOException {

// upload last delta produced and potentially not uploaded yet
futs.add((this.uploadLastDelta(cs, logProducer, cloudSyncVerticle.eventUpload(), cloudSyncVerticle.eventRefresh())));

// Deploy SQS producer if enabled
if (this.sqsEnabled) {
LOGGER.info("SQS enabled, deploying OptOutSqsLogProducer");
try {
// Create SQS-specific cloud sync with custom folder (default: "sqs-delta")
// Uses same S3 bucket as regular optout (fsOptOut) but different folder
String sqsFolder = this.config.getString(Const.Config.OptOutSqsS3FolderProp, "sqs-delta");
JsonObject sqsConfig = new JsonObject().mergeIn(this.config)
.put(Const.Config.OptOutS3FolderProp, sqsFolder);
OptOutCloudSync sqsCs = new OptOutCloudSync(sqsConfig, true);

// Deploy SQS log producer - reuses fsOptOut for S3 access
OptOutSqsLogProducer sqsLogProducer = new OptOutSqsLogProducer(this.config, this.fsOptOut, sqsCs);
futs.add(this.deploySingleInstance(sqsLogProducer));

LOGGER.info("SQS log producer deployed - same bucket as optout, folder: {}", sqsFolder);
} catch (IOException e) {
LOGGER.error("Failed to initialize SQS log producer: " + e.getMessage(), e);
}
}
}

Supplier<Verticle> svcSupplier = () -> {
Expand Down
1 change: 1 addition & 0 deletions src/main/java/com/uid2/optout/vertx/Endpoints.java
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ public enum Endpoints {
OPTOUT_REFRESH("/optout/refresh"),
OPTOUT_WRITE("/optout/write"),
OPTOUT_REPLICATE("/optout/replicate"),
OPTOUT_QUEUE("/optout/queue"),
OPTOUT_PARTNER_MOCK("/optout/partner_mock");
private final String path;

Expand Down
167 changes: 167 additions & 0 deletions src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

import java.net.URL;
import java.time.Instant;
Expand Down Expand Up @@ -61,6 +64,9 @@ public class OptOutServiceVerticle extends AbstractVerticle {
private final boolean enableOptOutPartnerMock;
private final String internalApiKey;
private final InternalAuthMiddleware internalAuth;
private final SqsClient sqsClient;
private final String sqsQueueUrl;
private final boolean sqsEnabled;

public OptOutServiceVerticle(Vertx vertx,
IAuthorizableProvider clientKeyProvider,
Expand Down Expand Up @@ -106,6 +112,37 @@ public OptOutServiceVerticle(Vertx vertx,
this.internalApiKey = jsonConfig.getString(Const.Config.OptOutInternalApiTokenProp);
this.internalAuth = new InternalAuthMiddleware(this.internalApiKey, "optout");
this.enableOptOutPartnerMock = jsonConfig.getBoolean(Const.Config.OptOutPartnerEndpointMockProp);

// Initialize SQS client
this.sqsEnabled = jsonConfig.getBoolean(Const.Config.OptOutSqsEnabledProp, false);
this.sqsQueueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp);

LOGGER.info("=== SQS Configuration ===");
LOGGER.info("SQS Enabled: " + this.sqsEnabled);
LOGGER.info("SQS Queue URL: " + this.sqsQueueUrl);
LOGGER.info("AWS_REGION env: " + System.getenv("AWS_REGION"));
LOGGER.info("aws_region config: " + jsonConfig.getString("aws_region"));

SqsClient tempSqsClient = null;
if (this.sqsEnabled) {
if (this.sqsQueueUrl == null || this.sqsQueueUrl.isEmpty()) {
LOGGER.warn("SQS enabled but queue URL not configured");
} else {
try {
tempSqsClient = SqsClient.builder().build();
LOGGER.info("SQS client initialized successfully");
LOGGER.info("SQS client region: " + tempSqsClient.serviceClientConfiguration().region());
LOGGER.info("SQS queue URL configured: " + this.sqsQueueUrl);
} catch (Exception e) {
LOGGER.error("Failed to initialize SQS client: " + e.getMessage(), e);
tempSqsClient = null;
}
}
} else {
LOGGER.info("SQS integration disabled");
}
this.sqsClient = tempSqsClient;
LOGGER.info("=== End SQS Configuration ===");
}

public static void sendStatus(int statusCode, HttpServerResponse response) {
Expand Down Expand Up @@ -136,6 +173,14 @@ public void start(Promise<Void> startPromise) {
@Override
public void stop() {
LOGGER.info("Shutting down OptOutServiceVerticle");
if (this.sqsClient != null) {
try {
this.sqsClient.close();
LOGGER.info("SQS client closed");
} catch (Exception e) {
LOGGER.error("Error closing SQS client", e);
}
}
}

public void setCloudPaths(Collection<String> paths) {
Expand Down Expand Up @@ -246,6 +291,11 @@ private void handleHealthCheck(RoutingContext rc) {
}

private void handleReplicate(RoutingContext routingContext) {

if(this.sqsEnabled){
this.handleQueue(routingContext);
}

HttpServerRequest req = routingContext.request();
MultiMap params = req.params();
String identityHash = req.getParam(IDENTITY_HASH);
Expand Down Expand Up @@ -308,6 +358,123 @@ private void handleReplicate(RoutingContext routingContext) {
}
}

private void handleQueue(RoutingContext routingContext) {
HttpServerRequest req = routingContext.request();
MultiMap params = req.params();
String identityHash = req.getParam(IDENTITY_HASH);
String advertisingId = req.getParam(ADVERTISING_ID);
JsonObject body = routingContext.body().asJsonObject();

HttpServerResponse resp = routingContext.response();

// Validate parameters - same as replicate endpoint
if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) {
this.sendBadRequestError(resp);
return;
}
if (advertisingId == null || params.getAll(ADVERTISING_ID).size() != 1) {
this.sendBadRequestError(resp);
return;
}

if (!this.isGetOrPost(req)) {
this.sendBadRequestError(resp);
return;
}
if (body != null) {
this.sendBadRequestError(resp);
return;
}

// Check if SQS is enabled and configured
if (!this.sqsEnabled || this.sqsClient == null) {
this.sendServiceUnavailableError(resp, "SQS integration not enabled or configured");
return;
}

try {
final String maskedId1 = Utils.maskPii(identityHash);
final String maskedId2 = Utils.maskPii(advertisingId);

LOGGER.info("=== Processing SQS Queue Request ===");
LOGGER.info("Identity Hash (masked): " + maskedId1);
LOGGER.info("Advertising ID (masked): " + maskedId2);
LOGGER.info("SQS Client null? " + (this.sqsClient == null));
LOGGER.info("Queue URL: " + this.sqsQueueUrl);

// Create message body as JSON
JsonObject messageBody = new JsonObject()
.put(IDENTITY_HASH, identityHash)
.put(ADVERTISING_ID, advertisingId)
.put("timestamp", OptOutUtils.nowEpochSeconds());

LOGGER.info("Message body: " + messageBody.encode());

// Send message to SQS queue
vertx.executeBlocking(promise -> {
try {
LOGGER.info("About to send message to SQS...");
LOGGER.info("SQS Client region in worker thread: " + this.sqsClient.serviceClientConfiguration().region());

SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
.queueUrl(this.sqsQueueUrl)
.messageBody(messageBody.encode())
.build();

LOGGER.info("SendMessageRequest created, calling sendMessage...");
SendMessageResponse response = this.sqsClient.sendMessage(sendMsgRequest);

LOGGER.info("SQS sendMessage succeeded, messageId: " + response.messageId());
promise.complete(response.messageId());
} catch (Exception e) {
LOGGER.error("!!! Exception in SQS sendMessage !!!");
LOGGER.error("Exception type: " + e.getClass().getName());
LOGGER.error("Exception message: " + e.getMessage());
LOGGER.error("Full stack trace:", e);

// Log additional SQS-specific details if available
if (e.getCause() != null) {
LOGGER.error("Caused by: " + e.getCause().getClass().getName());
LOGGER.error("Cause message: " + e.getCause().getMessage());
}

promise.fail(e);
}
}, res -> {
if (res.failed()) {
LOGGER.error("=== SQS Queue Request FAILED ===");
LOGGER.error("Failed to queue optout request - identity_hash: " + maskedId1 +
", advertising_id: " + maskedId2);
LOGGER.error("Failure cause: " + res.cause().getMessage());
LOGGER.error("Full error:", res.cause());

this.sendInternalServerError(resp, "Failed to queue message: " + res.cause().getMessage());
} else {
String messageId = (String) res.result();
LOGGER.info("=== SQS Queue Request SUCCEEDED ===");
LOGGER.info("Queued optout request - identity_hash: " + maskedId1 +
", advertising_id: " + maskedId2 +
", messageId: " + messageId);

// Return success with message ID
JsonObject responseJson = new JsonObject()
.put("messageId", messageId)
.put("status", "queued");

resp.setStatusCode(200)
.putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
.setChunked(true)
.write(responseJson.encode());
resp.end();
}
});
} catch (Exception ex) {
LOGGER.error("=== Exception in handleQueue wrapper ===");
LOGGER.error("Error processing queue request: " + ex.getMessage(), ex);
this.sendInternalServerError(resp, ex.getMessage());
}
}

private void handleWrite(RoutingContext routingContext) {
HttpServerRequest req = routingContext.request();
MultiMap params = req.params();
Expand Down
Loading