-
Notifications
You must be signed in to change notification settings - Fork 7
enable enqueue SQS #233
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
enable enqueue SQS #233
Changes from 7 commits
70b65a8
085135c
87e8f62
3393981
dbf0cff
89b13a2
22395fd
ee7f006
a02a3ae
e7ee9b3
206bf05
f4eddb4
abc53b7
bec3e11
01c6721
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,6 +33,9 @@ | |
| import io.vertx.ext.web.RoutingContext; | ||
| import io.vertx.ext.web.handler.BodyHandler; | ||
| import io.vertx.ext.web.handler.CorsHandler; | ||
| import software.amazon.awssdk.services.sqs.SqsClient; | ||
| import software.amazon.awssdk.services.sqs.model.SendMessageRequest; | ||
| import software.amazon.awssdk.services.sqs.model.SendMessageResponse; | ||
|
|
||
| import java.net.URL; | ||
| import java.time.Instant; | ||
|
|
@@ -45,6 +48,10 @@ | |
| public class OptOutServiceVerticle extends AbstractVerticle { | ||
| public static final String IDENTITY_HASH = "identity_hash"; | ||
| public static final String ADVERTISING_ID = "advertising_id"; | ||
| public static final String UID_TRACE_ID = "UID-Trace-Id"; | ||
| public static final String CLIENT_IP = "client_ip"; | ||
| public static final String EMAIL = "email"; | ||
| public static final String PHONE = "phone"; | ||
| public static final long MAX_REQUEST_BODY_SIZE = 1 << 20; // 1MB | ||
|
|
||
| private static final Logger LOGGER = LoggerFactory.getLogger(OptOutServiceVerticle.class); | ||
|
|
@@ -61,6 +68,9 @@ public class OptOutServiceVerticle extends AbstractVerticle { | |
| private final boolean enableOptOutPartnerMock; | ||
| private final String internalApiKey; | ||
| private final InternalAuthMiddleware internalAuth; | ||
| private final SqsClient sqsClient; | ||
| private final String sqsQueueUrl; | ||
| private final boolean sqsEnabled; | ||
|
|
||
| public OptOutServiceVerticle(Vertx vertx, | ||
| IAuthorizableProvider clientKeyProvider, | ||
|
|
@@ -106,6 +116,29 @@ public OptOutServiceVerticle(Vertx vertx, | |
| this.internalApiKey = jsonConfig.getString(Const.Config.OptOutInternalApiTokenProp); | ||
| this.internalAuth = new InternalAuthMiddleware(this.internalApiKey, "optout"); | ||
| this.enableOptOutPartnerMock = jsonConfig.getBoolean(Const.Config.OptOutPartnerEndpointMockProp); | ||
|
|
||
| this.sqsEnabled = jsonConfig.getBoolean(Const.Config.OptOutSqsEnabledProp, false); | ||
| this.sqsQueueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp); | ||
|
|
||
| SqsClient tempSqsClient = null; | ||
| if (this.sqsEnabled) { | ||
| if (this.sqsQueueUrl == null || this.sqsQueueUrl.isEmpty()) { | ||
| LOGGER.warn("SQS enabled but queue URL not configured"); | ||
| } else { | ||
| try { | ||
| tempSqsClient = SqsClient.builder().build(); | ||
| LOGGER.info("SQS client initialized successfully"); | ||
| LOGGER.info("SQS client region: " + tempSqsClient.serviceClientConfiguration().region()); | ||
| LOGGER.info("SQS queue URL configured: " + this.sqsQueueUrl); | ||
| } catch (Exception e) { | ||
| LOGGER.error("Failed to initialize SQS client: " + e.getMessage(), e); | ||
| tempSqsClient = null; | ||
| } | ||
| } | ||
| } else { | ||
| LOGGER.info("SQS integration disabled"); | ||
| } | ||
| this.sqsClient = tempSqsClient; | ||
| } | ||
|
|
||
| public static void sendStatus(int statusCode, HttpServerResponse response) { | ||
|
|
@@ -136,6 +169,14 @@ public void start(Promise<Void> startPromise) { | |
| @Override | ||
| public void stop() { | ||
| LOGGER.info("Shutting down OptOutServiceVerticle"); | ||
| if (this.sqsClient != null) { | ||
| try { | ||
| this.sqsClient.close(); | ||
| LOGGER.info("SQS client closed"); | ||
| } catch (Exception e) { | ||
| LOGGER.error("Error closing SQS client", e); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| public void setCloudPaths(Collection<String> paths) { | ||
|
|
@@ -246,6 +287,11 @@ private void handleHealthCheck(RoutingContext rc) { | |
| } | ||
|
|
||
| private void handleReplicate(RoutingContext routingContext) { | ||
|
|
||
| if(this.sqsEnabled){ | ||
| this.handleQueue(routingContext); | ||
| } | ||
|
|
||
| HttpServerRequest req = routingContext.request(); | ||
| MultiMap params = req.params(); | ||
| String identityHash = req.getParam(IDENTITY_HASH); | ||
|
|
@@ -307,6 +353,82 @@ private void handleReplicate(RoutingContext routingContext) { | |
| } | ||
| } | ||
| } | ||
|
|
||
| private void handleQueue(RoutingContext routingContext) { | ||
| HttpServerRequest req = routingContext.request(); | ||
| MultiMap params = req.params(); | ||
| String identityHash = req.getParam(IDENTITY_HASH); | ||
| String advertisingId = req.getParam(ADVERTISING_ID); | ||
| JsonObject body = routingContext.body().asJsonObject(); | ||
| String traceId = req.getHeader(UID_TRACE_ID); | ||
| String clientIp = body.getString(CLIENT_IP); | ||
|
||
| String email = body.getString(EMAIL); | ||
| String phone = body.getString(PHONE); | ||
|
|
||
| HttpServerResponse resp = routingContext.response(); | ||
|
|
||
| // while old delta production is enabled, response is handled by replicate logic | ||
|
|
||
| // Validate parameters - same as replicate | ||
| if (identityHash == null || params.getAll(IDENTITY_HASH).size() != 1) { | ||
| // this.sendBadRequestError(resp); | ||
| return; | ||
| } | ||
| if (advertisingId == null || params.getAll(ADVERTISING_ID).size() != 1) { | ||
| // this.sendBadRequestError(resp); | ||
| return; | ||
| } | ||
|
|
||
| if (!this.isGetOrPost(req)) { | ||
| // this.sendBadRequestError(resp); | ||
| return; | ||
| } | ||
|
|
||
| try { | ||
| JsonObject messageBody = new JsonObject() | ||
| .put(IDENTITY_HASH, identityHash) | ||
| .put(ADVERTISING_ID, advertisingId) | ||
| .put("trace_id", traceId) | ||
| .put("client_ip", clientIp) | ||
| .put("email", email) | ||
| .put("phone", phone); | ||
|
|
||
| // Send message to SQS queue | ||
| vertx.executeBlocking(promise -> { | ||
| try { | ||
| SendMessageRequest sendMsgRequest = SendMessageRequest.builder() | ||
| .queueUrl(this.sqsQueueUrl) | ||
| .messageBody(messageBody.encode()) | ||
| .build(); | ||
|
|
||
| SendMessageResponse response = this.sqsClient.sendMessage(sendMsgRequest); | ||
| promise.complete(response.messageId()); | ||
| } catch (Exception e) { | ||
| promise.fail(e); | ||
| } | ||
| }, res -> { | ||
| if (res.failed()) { | ||
| // this.sendInternalServerError(resp, "Failed to queue message: " + res.cause().getMessage()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add logging |
||
| LOGGER.error("Failed to queue message: " + res.cause().getMessage()); | ||
| } else { | ||
| String messageId = (String) res.result(); | ||
|
|
||
| JsonObject responseJson = new JsonObject() | ||
| .put("status", "queued"); | ||
|
|
||
| LOGGER.info("Message queued successfully: " + messageId); | ||
|
|
||
| // resp.setStatusCode(200) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add logging |
||
| // .putHeader(HttpHeaders.CONTENT_TYPE, "application/json") | ||
| // .setChunked(true) | ||
| // .write(responseJson.encode()); | ||
| // resp.end(); | ||
| } | ||
| }); | ||
| } catch (Exception ex) { | ||
| // this.sendInternalServerError(resp, ex.getMessage()); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. add logging ?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. added |
||
| } | ||
| } | ||
|
|
||
| private void handleWrite(RoutingContext routingContext) { | ||
| HttpServerRequest req = routingContext.request(); | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if(this.sqsEnabled && this.sqsClient != null)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated