3333import io .vertx .ext .web .RoutingContext ;
3434import io .vertx .ext .web .handler .BodyHandler ;
3535import io .vertx .ext .web .handler .CorsHandler ;
36+ import software .amazon .awssdk .services .sqs .SqsClient ;
37+ import software .amazon .awssdk .services .sqs .model .SendMessageRequest ;
38+ import software .amazon .awssdk .services .sqs .model .SendMessageResponse ;
3639
3740import java .net .URL ;
3841import java .time .Instant ;
4548public class OptOutServiceVerticle extends AbstractVerticle {
4649 public static final String IDENTITY_HASH = "identity_hash" ;
4750 public static final String ADVERTISING_ID = "advertising_id" ;
51+ public static final String UID_TRACE_ID = "UID-Trace-Id" ;
52+ public static final String CLIENT_IP = "client_ip" ;
53+ public static final String EMAIL = "email" ;
54+ public static final String PHONE = "phone" ;
4855 public static final long MAX_REQUEST_BODY_SIZE = 1 << 20 ; // 1MB
4956
5057 private static final Logger LOGGER = LoggerFactory .getLogger (OptOutServiceVerticle .class );
@@ -61,6 +68,9 @@ public class OptOutServiceVerticle extends AbstractVerticle {
6168 private final boolean enableOptOutPartnerMock ;
6269 private final String internalApiKey ;
6370 private final InternalAuthMiddleware internalAuth ;
71+ private final SqsClient sqsClient ;
72+ private final String sqsQueueUrl ;
73+ private final boolean sqsEnabled ;
6474
6575 public OptOutServiceVerticle (Vertx vertx ,
6676 IAuthorizableProvider clientKeyProvider ,
@@ -106,6 +116,29 @@ public OptOutServiceVerticle(Vertx vertx,
106116 this .internalApiKey = jsonConfig .getString (Const .Config .OptOutInternalApiTokenProp );
107117 this .internalAuth = new InternalAuthMiddleware (this .internalApiKey , "optout" );
108118 this .enableOptOutPartnerMock = jsonConfig .getBoolean (Const .Config .OptOutPartnerEndpointMockProp );
119+
120+ this .sqsEnabled = jsonConfig .getBoolean (Const .Config .OptOutSqsEnabledProp , false );
121+ this .sqsQueueUrl = jsonConfig .getString (Const .Config .OptOutSqsQueueUrlProp );
122+
123+ SqsClient tempSqsClient = null ;
124+ if (this .sqsEnabled ) {
125+ if (this .sqsQueueUrl == null || this .sqsQueueUrl .isEmpty ()) {
126+ LOGGER .warn ("SQS enabled but queue URL not configured" );
127+ } else {
128+ try {
129+ tempSqsClient = SqsClient .builder ().build ();
130+ LOGGER .info ("SQS client initialized successfully" );
131+ LOGGER .info ("SQS client region: " + tempSqsClient .serviceClientConfiguration ().region ());
132+ LOGGER .info ("SQS queue URL configured: " + this .sqsQueueUrl );
133+ } catch (Exception e ) {
134+ LOGGER .error ("Failed to initialize SQS client: " + e .getMessage (), e );
135+ tempSqsClient = null ;
136+ }
137+ }
138+ } else {
139+ LOGGER .info ("SQS integration disabled" );
140+ }
141+ this .sqsClient = tempSqsClient ;
109142 }
110143
111144 public static void sendStatus (int statusCode , HttpServerResponse response ) {
@@ -136,6 +169,14 @@ public void start(Promise<Void> startPromise) {
136169 @ Override
137170 public void stop () {
138171 LOGGER .info ("Shutting down OptOutServiceVerticle" );
172+ if (this .sqsClient != null ) {
173+ try {
174+ this .sqsClient .close ();
175+ LOGGER .info ("SQS client closed" );
176+ } catch (Exception e ) {
177+ LOGGER .error ("Error closing SQS client" , e );
178+ }
179+ }
139180 }
140181
141182 public void setCloudPaths (Collection <String > paths ) {
@@ -246,6 +287,11 @@ private void handleHealthCheck(RoutingContext rc) {
246287 }
247288
248289 private void handleReplicate (RoutingContext routingContext ) {
290+
291+ if (this .sqsEnabled && this .sqsClient != null ){
292+ this .handleQueue (routingContext );
293+ }
294+
249295 HttpServerRequest req = routingContext .request ();
250296 MultiMap params = req .params ();
251297 String identityHash = req .getParam (IDENTITY_HASH );
@@ -307,6 +353,83 @@ private void handleReplicate(RoutingContext routingContext) {
307353 }
308354 }
309355 }
356+
357+ private void handleQueue (RoutingContext routingContext ) {
358+ HttpServerRequest req = routingContext .request ();
359+ MultiMap params = req .params ();
360+ String identityHash = req .getParam (IDENTITY_HASH );
361+ String advertisingId = req .getParam (ADVERTISING_ID );
362+ JsonObject body = routingContext .body ().asJsonObject ();
363+ String traceId = req .getHeader (UID_TRACE_ID );
364+ String clientIp = body != null ? body .getString (CLIENT_IP ) : null ;
365+ String email = body != null ? body .getString (EMAIL ) : null ;
366+ String phone = body != null ? body .getString (PHONE ) : null ;
367+
368+ HttpServerResponse resp = routingContext .response ();
369+
370+ // while old delta production is enabled, response is handled by replicate logic
371+
372+ // Validate parameters - same as replicate
373+ if (identityHash == null || params .getAll (IDENTITY_HASH ).size () != 1 ) {
374+ // this.sendBadRequestError(resp);
375+ return ;
376+ }
377+ if (advertisingId == null || params .getAll (ADVERTISING_ID ).size () != 1 ) {
378+ // this.sendBadRequestError(resp);
379+ return ;
380+ }
381+
382+ if (!this .isGetOrPost (req )) {
383+ // this.sendBadRequestError(resp);
384+ return ;
385+ }
386+
387+ try {
388+ JsonObject messageBody = new JsonObject ()
389+ .put (IDENTITY_HASH , identityHash )
390+ .put (ADVERTISING_ID , advertisingId )
391+ .put ("trace_id" , traceId )
392+ .put ("client_ip" , clientIp )
393+ .put ("email" , email )
394+ .put ("phone" , phone );
395+
396+ // Send message to SQS queue
397+ vertx .executeBlocking (promise -> {
398+ try {
399+ SendMessageRequest sendMsgRequest = SendMessageRequest .builder ()
400+ .queueUrl (this .sqsQueueUrl )
401+ .messageBody (messageBody .encode ())
402+ .build ();
403+
404+ SendMessageResponse response = this .sqsClient .sendMessage (sendMsgRequest );
405+ promise .complete (response .messageId ());
406+ } catch (Exception e ) {
407+ promise .fail (e );
408+ }
409+ }, res -> {
410+ if (res .failed ()) {
411+ // this.sendInternalServerError(resp, "Failed to queue message: " + res.cause().getMessage());
412+ LOGGER .error ("Failed to queue message: " + res .cause ().getMessage ());
413+ } else {
414+ String messageId = (String ) res .result ();
415+
416+ JsonObject responseJson = new JsonObject ()
417+ .put ("status" , "queued" );
418+
419+ LOGGER .info ("Message queued successfully: " + messageId );
420+
421+ // resp.setStatusCode(200)
422+ // .putHeader(HttpHeaders.CONTENT_TYPE, "application/json")
423+ // .setChunked(true)
424+ // .write(responseJson.encode());
425+ // resp.end();
426+ }
427+ });
428+ } catch (Exception ex ) {
429+ // this.sendInternalServerError(resp, ex.getMessage());
430+ LOGGER .error ("Error processing queue request: " + ex .getMessage (), ex );
431+ }
432+ }
310433
311434 private void handleWrite (RoutingContext routingContext ) {
312435 HttpServerRequest req = routingContext .request ();
0 commit comments