diff --git a/pom.xml b/pom.xml index f5de4ae9..2de56a62 100644 --- a/pom.xml +++ b/pom.xml @@ -30,6 +30,17 @@ true + + + + software.amazon.awssdk + bom + 2.27.2 + pom + import + + + com.uid2 @@ -133,6 +144,10 @@ 5.12.0 test + + software.amazon.awssdk + sqs + diff --git a/src/main/java/com/uid2/optout/Const.java b/src/main/java/com/uid2/optout/Const.java index 0b51bd4e..f1d2004d 100644 --- a/src/main/java/com/uid2/optout/Const.java +++ b/src/main/java/com/uid2/optout/Const.java @@ -17,6 +17,8 @@ public static class Config extends com.uid2.shared.Const.Config { public static final String OptOutDeleteExpiredProp = "optout_delete_expired"; public static final String PartnersConfigPathProp = "partners_config_path"; public static final String PartnersMetadataPathProp = "partners_metadata_path"; + public static final String OptOutSqsQueueUrlProp = "optout_sqs_queue_url"; + public static final String OptOutSqsEnabledProp = "optout_enqueue_sqs_enabled"; } public static class Event { diff --git a/src/main/java/com/uid2/optout/Main.java b/src/main/java/com/uid2/optout/Main.java index 50c01267..1fec6e07 100644 --- a/src/main/java/com/uid2/optout/Main.java +++ b/src/main/java/com/uid2/optout/Main.java @@ -64,6 +64,7 @@ public class Main { private final ICloudStorage fsPartnerConfig; private final RotatingOperatorKeyProvider operatorKeyProvider; private final boolean observeOnly; + private final boolean enqueueSqsEnabled; private final UidInstanceIdProvider uidInstanceIdProvider; public Main(Vertx vertx, JsonObject config) throws Exception { @@ -73,6 +74,7 @@ public Main(Vertx vertx, JsonObject config) throws Exception { if (this.observeOnly) { LOGGER.warn("Running Observe ONLY mode: no producer, no sender"); } + this.enqueueSqsEnabled = config.getBoolean(Const.Config.OptOutSqsEnabledProp, false); this.uidInstanceIdProvider = new UidInstanceIdProvider(config); boolean useStorageMock = config.getBoolean(Const.Config.StorageMockProp, false); diff --git a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java index 0f8b5128..75073239 100644 --- a/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java +++ b/src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java @@ -33,6 +33,9 @@ import io.vertx.ext.web.RoutingContext; import io.vertx.ext.web.handler.BodyHandler; import io.vertx.ext.web.handler.CorsHandler; +import software.amazon.awssdk.services.sqs.SqsClient; +import software.amazon.awssdk.services.sqs.model.SendMessageRequest; +import software.amazon.awssdk.services.sqs.model.SendMessageResponse; import java.net.URL; import java.time.Instant; @@ -45,6 +48,10 @@ public class OptOutServiceVerticle extends AbstractVerticle { public static final String IDENTITY_HASH = "identity_hash"; public static final String ADVERTISING_ID = "advertising_id"; + public static final String UID_TRACE_ID = "UID-Trace-Id"; + public static final String CLIENT_IP = "client_ip"; + public static final String EMAIL = "email"; + public static final String PHONE = "phone"; public static final long MAX_REQUEST_BODY_SIZE = 1 << 20; // 1MB private static final Logger LOGGER = LoggerFactory.getLogger(OptOutServiceVerticle.class); @@ -61,6 +68,9 @@ public class OptOutServiceVerticle extends AbstractVerticle { private final boolean enableOptOutPartnerMock; private final String internalApiKey; private final InternalAuthMiddleware internalAuth; + private final SqsClient sqsClient; + private final String sqsQueueUrl; + private final boolean sqsEnabled; public OptOutServiceVerticle(Vertx vertx, IAuthorizableProvider clientKeyProvider, @@ -106,6 +116,29 @@ public OptOutServiceVerticle(Vertx vertx, this.internalApiKey = jsonConfig.getString(Const.Config.OptOutInternalApiTokenProp); this.internalAuth = new InternalAuthMiddleware(this.internalApiKey, "optout"); this.enableOptOutPartnerMock = jsonConfig.getBoolean(Const.Config.OptOutPartnerEndpointMockProp); + + this.sqsEnabled = jsonConfig.getBoolean(Const.Config.OptOutSqsEnabledProp, false); + this.sqsQueueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp); + + SqsClient tempSqsClient = null; + if (this.sqsEnabled) { + if (this.sqsQueueUrl == null || this.sqsQueueUrl.isEmpty()) { + LOGGER.warn("SQS enabled but queue URL not configured"); + } else { + try { + tempSqsClient = SqsClient.builder().build(); + LOGGER.info("SQS client initialized successfully"); + LOGGER.info("SQS client region: " + tempSqsClient.serviceClientConfiguration().region()); + LOGGER.info("SQS queue URL configured: " + this.sqsQueueUrl); + } catch (Exception e) { + LOGGER.error("Failed to initialize SQS client: " + e.getMessage(), e); + tempSqsClient = null; + } + } + } else { + LOGGER.info("SQS integration disabled"); + } + this.sqsClient = tempSqsClient; } public static void sendStatus(int statusCode, HttpServerResponse response) { @@ -136,6 +169,14 @@ public void start(Promise startPromise) { @Override public void stop() { LOGGER.info("Shutting down OptOutServiceVerticle"); + if (this.sqsClient != null) { + try { + this.sqsClient.close(); + LOGGER.info("SQS client closed"); + } catch (Exception e) { + LOGGER.error("Error closing SQS client", e); + } + } } public void setCloudPaths(Collection paths) { @@ -246,6 +287,11 @@ private void handleHealthCheck(RoutingContext rc) { } private void handleReplicate(RoutingContext routingContext) { + + if(this.sqsEnabled && this.sqsClient != null){ + this.handleQueue(routingContext); + } + HttpServerRequest req = routingContext.request(); MultiMap params = req.params(); String identityHash = req.getParam(IDENTITY_HASH); @@ -307,6 +353,83 @@ private void handleReplicate(RoutingContext routingContext) { } } } + + private void handleQueue(RoutingContext routingContext) { + HttpServerRequest req = routingContext.request(); + MultiMap params = req.params(); + String identityHash = req.getParam(IDENTITY_HASH); + String advertisingId = req.getParam(ADVERTISING_ID); + JsonObject body = routingContext.body().asJsonObject(); + String traceId = req.getHeader(UID_TRACE_ID); + String clientIp = body != null ? body.getString(CLIENT_IP) : null; + String email = body != null ? body.getString(EMAIL) : null; + String phone = body != null ? body.getString(PHONE) : null; + + HttpServerResponse resp = routingContext.response(); + + // while old delta production is enabled, response is handled by replicate logic + + // Validate parameters - same as replicate + if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) { + // this.sendBadRequestError(resp); + return; + } + if (advertisingId == null || params.getAll(ADVERTISING_ID).size() != 1) { + // this.sendBadRequestError(resp); + return; + } + + if (!this.isGetOrPost(req)) { + // this.sendBadRequestError(resp); + return; + } + + try { + JsonObject messageBody = new JsonObject() + .put(IDENTITY_HASH, identityHash) + .put(ADVERTISING_ID, advertisingId) + .put("trace_id", traceId) + .put("client_ip", clientIp) + .put("email", email) + .put("phone", phone); + + // Send message to SQS queue + vertx.executeBlocking(promise -> { + try { + SendMessageRequest sendMsgRequest = SendMessageRequest.builder() + .queueUrl(this.sqsQueueUrl) + .messageBody(messageBody.encode()) + .build(); + + SendMessageResponse response = this.sqsClient.sendMessage(sendMsgRequest); + promise.complete(response.messageId()); + } catch (Exception e) { + promise.fail(e); + } + }, res -> { + if (res.failed()) { + // this.sendInternalServerError(resp, "Failed to queue message: " + res.cause().getMessage()); + LOGGER.error("Failed to queue message: " + res.cause().getMessage()); + } else { + String messageId = (String) res.result(); + + JsonObject responseJson = new JsonObject() + .put("status", "queued"); + + LOGGER.info("Message queued successfully: " + messageId); + + // resp.setStatusCode(200) + // .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") + // .setChunked(true) + // .write(responseJson.encode()); + // resp.end(); + } + }); + } catch (Exception ex) { + // this.sendInternalServerError(resp, ex.getMessage()); + LOGGER.error("Error processing queue request: " + ex.getMessage(), ex); + } + } private void handleWrite(RoutingContext routingContext) { HttpServerRequest req = routingContext.request();