Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,17 @@
<snapshots><enabled>true</enabled></snapshots>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>bom</artifactId>
<version>2.27.2</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>com.uid2</groupId>
Expand Down Expand Up @@ -133,6 +144,10 @@
<version>5.12.0</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sqs</artifactId>
</dependency>
</dependencies>

<build>
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/uid2/optout/Const.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ public static class Config extends com.uid2.shared.Const.Config {
public static final String OptOutDeleteExpiredProp = "optout_delete_expired";
public static final String PartnersConfigPathProp = "partners_config_path";
public static final String PartnersMetadataPathProp = "partners_metadata_path";
public static final String OptOutSqsQueueUrlProp = "optout_sqs_queue_url";
public static final String OptOutSqsEnabledProp = "optout_enqueue_sqs_enabled";
}

public static class Event {
Expand Down
2 changes: 2 additions & 0 deletions src/main/java/com/uid2/optout/Main.java
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ public class Main {
private final ICloudStorage fsPartnerConfig;
private final RotatingOperatorKeyProvider operatorKeyProvider;
private final boolean observeOnly;
private final boolean enqueueSqsEnabled;
private final UidInstanceIdProvider uidInstanceIdProvider;

public Main(Vertx vertx, JsonObject config) throws Exception {
Expand All @@ -73,6 +74,7 @@ public Main(Vertx vertx, JsonObject config) throws Exception {
if (this.observeOnly) {
LOGGER.warn("Running Observe ONLY mode: no producer, no sender");
}
this.enqueueSqsEnabled = config.getBoolean(Const.Config.OptOutSqsEnabledProp, false);
this.uidInstanceIdProvider = new UidInstanceIdProvider(config);

boolean useStorageMock = config.getBoolean(Const.Config.StorageMockProp, false);
Expand Down
123 changes: 123 additions & 0 deletions src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,9 @@
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;
import io.vertx.ext.web.handler.CorsHandler;
import software.amazon.awssdk.services.sqs.SqsClient;
import software.amazon.awssdk.services.sqs.model.SendMessageRequest;
import software.amazon.awssdk.services.sqs.model.SendMessageResponse;

import java.net.URL;
import java.time.Instant;
Expand All @@ -45,6 +48,10 @@
public class OptOutServiceVerticle extends AbstractVerticle {
public static final String IDENTITY_HASH = "identity_hash";
public static final String ADVERTISING_ID = "advertising_id";
public static final String UID_TRACE_ID = "UID-Trace-Id";
public static final String CLIENT_IP = "client_ip";
public static final String EMAIL = "email";
public static final String PHONE = "phone";
public static final long MAX_REQUEST_BODY_SIZE = 1 << 20; // 1MB

private static final Logger LOGGER = LoggerFactory.getLogger(OptOutServiceVerticle.class);
Expand All @@ -61,6 +68,9 @@ public class OptOutServiceVerticle extends AbstractVerticle {
private final boolean enableOptOutPartnerMock;
private final String internalApiKey;
private final InternalAuthMiddleware internalAuth;
private final SqsClient sqsClient;
private final String sqsQueueUrl;
private final boolean sqsEnabled;

public OptOutServiceVerticle(Vertx vertx,
IAuthorizableProvider clientKeyProvider,
Expand Down Expand Up @@ -106,6 +116,29 @@ public OptOutServiceVerticle(Vertx vertx,
this.internalApiKey = jsonConfig.getString(Const.Config.OptOutInternalApiTokenProp);
this.internalAuth = new InternalAuthMiddleware(this.internalApiKey, "optout");
this.enableOptOutPartnerMock = jsonConfig.getBoolean(Const.Config.OptOutPartnerEndpointMockProp);

this.sqsEnabled = jsonConfig.getBoolean(Const.Config.OptOutSqsEnabledProp, false);
this.sqsQueueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp);

SqsClient tempSqsClient = null;
if (this.sqsEnabled) {
if (this.sqsQueueUrl == null || this.sqsQueueUrl.isEmpty()) {
LOGGER.warn("SQS enabled but queue URL not configured");
} else {
try {
tempSqsClient = SqsClient.builder().build();
LOGGER.info("SQS client initialized successfully");
LOGGER.info("SQS client region: " + tempSqsClient.serviceClientConfiguration().region());
LOGGER.info("SQS queue URL configured: " + this.sqsQueueUrl);
} catch (Exception e) {
LOGGER.error("Failed to initialize SQS client: " + e.getMessage(), e);
tempSqsClient = null;
}
}
} else {
LOGGER.info("SQS integration disabled");
}
this.sqsClient = tempSqsClient;
}

public static void sendStatus(int statusCode, HttpServerResponse response) {
Expand Down Expand Up @@ -136,6 +169,14 @@ public void start(Promise<Void> startPromise) {
@Override
public void stop() {
LOGGER.info("Shutting down OptOutServiceVerticle");
if (this.sqsClient != null) {
try {
this.sqsClient.close();
LOGGER.info("SQS client closed");
} catch (Exception e) {
LOGGER.error("Error closing SQS client", e);
}
}
}

public void setCloudPaths(Collection<String> paths) {
Expand Down Expand Up @@ -246,6 +287,11 @@ private void handleHealthCheck(RoutingContext rc) {
}

private void handleReplicate(RoutingContext routingContext) {

if(this.sqsEnabled && this.sqsClient != null){
this.handleQueue(routingContext);
}

HttpServerRequest req = routingContext.request();
MultiMap params = req.params();
String identityHash = req.getParam(IDENTITY_HASH);
Expand Down Expand Up @@ -307,6 +353,83 @@ private void handleReplicate(RoutingContext routingContext) {
}
}
}

private void handleQueue(RoutingContext routingContext) {
HttpServerRequest req = routingContext.request();
MultiMap params = req.params();
String identityHash = req.getParam(IDENTITY_HASH);
String advertisingId = req.getParam(ADVERTISING_ID);
JsonObject body = routingContext.body().asJsonObject();
String traceId = req.getHeader(UID_TRACE_ID);
String clientIp = body != null ? body.getString(CLIENT_IP) : null;
String email = body != null ? body.getString(EMAIL) : null;
String phone = body != null ? body.getString(PHONE) : null;

HttpServerResponse resp = routingContext.response();

// while old delta production is enabled, response is handled by replicate logic

// Validate parameters - same as replicate
if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) {
// this.sendBadRequestError(resp);
return;
}
if (advertisingId == null || params.getAll(ADVERTISING_ID).size() != 1) {
// this.sendBadRequestError(resp);
return;
}

if (!this.isGetOrPost(req)) {
// this.sendBadRequestError(resp);
return;
}

try {
JsonObject messageBody = new JsonObject()
.put(IDENTITY_HASH, identityHash)
.put(ADVERTISING_ID, advertisingId)
.put("trace_id", traceId)
.put("client_ip", clientIp)
.put("email", email)
.put("phone", phone);

// Send message to SQS queue
vertx.executeBlocking(promise -> {
try {
SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
.queueUrl(this.sqsQueueUrl)
.messageBody(messageBody.encode())
.build();

SendMessageResponse response = this.sqsClient.sendMessage(sendMsgRequest);
promise.complete(response.messageId());
} catch (Exception e) {
promise.fail(e);
}
}, res -> {
if (res.failed()) {
// this.sendInternalServerError(resp, "Failed to queue message: " + res.cause().getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add logging

LOGGER.error("Failed to queue message: " + res.cause().getMessage());
} else {
String messageId = (String) res.result();

JsonObject responseJson = new JsonObject()
.put("status", "queued");

LOGGER.info("Message queued successfully: " + messageId);

// resp.setStatusCode(200)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add logging

// .putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
// .setChunked(true)
// .write(responseJson.encode());
// resp.end();
}
});
} catch (Exception ex) {
// this.sendInternalServerError(resp, ex.getMessage());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add logging ?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added

LOGGER.error("Error processing queue request: " + ex.getMessage(), ex);
}
}

private void handleWrite(RoutingContext routingContext) {
HttpServerRequest req = routingContext.request();
Expand Down