Skip to content

Commit 6b4350f

Browse files
committed
add queue size limit
1 parent 95dd7d5 commit 6b4350f

File tree

2 files changed

+19
-0
lines changed

2 files changed

+19
-0
lines changed

src/main/java/com/uid2/optout/Const.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ public static class Config extends com.uid2.shared.Const.Config {
3333
public static final String OptOutTrafficCalcEvaluationWindowSecondsProp = "traffic_calc_evaluation_window_seconds";
3434
public static final String OptOutTrafficCalcAllowlistRangesProp = "traffic_calc_allowlist_ranges";
3535
public static final String OptOutSqsDeltaWindowSecondsProp = "optout_sqs_delta_window_seconds";
36+
public static final String OptOutSqsMaxQueueSizeProp = "optout_sqs_max_queue_size";
3637
}
3738

3839
public static class Event {

src/main/java/com/uid2/optout/vertx/OptOutServiceVerticle.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
import com.uid2.optout.Const;
44
import com.uid2.optout.auth.InternalAuthMiddleware;
5+
import com.uid2.optout.sqs.SqsMessageOperations;
56
import com.uid2.optout.web.QuorumWebClient;
67
import com.uid2.shared.Utils;
78
import com.uid2.shared.attest.AttestationTokenService;
@@ -71,6 +72,7 @@ public class OptOutServiceVerticle extends AbstractVerticle {
7172
private final SqsClient sqsClient;
7273
private final String sqsQueueUrl;
7374
private final boolean sqsEnabled;
75+
private final int sqsMaxQueueSize;
7476

7577
public OptOutServiceVerticle(Vertx vertx,
7678
IAuthorizableProvider clientKeyProvider,
@@ -119,6 +121,7 @@ public OptOutServiceVerticle(Vertx vertx,
119121

120122
this.sqsEnabled = jsonConfig.getBoolean(Const.Config.OptOutSqsEnabledProp, false);
121123
this.sqsQueueUrl = jsonConfig.getString(Const.Config.OptOutSqsQueueUrlProp);
124+
this.sqsMaxQueueSize = jsonConfig.getInteger(Const.Config.OptOutSqsMaxQueueSizeProp, 0); // 0 = no limit
122125

123126
SqsClient tempSqsClient = null;
124127
if (this.sqsEnabled) {
@@ -396,6 +399,21 @@ private void handleQueue(RoutingContext routingContext) {
396399
// Send message to SQS queue
397400
vertx.executeBlocking(promise -> {
398401
try {
402+
// Check queue size limit before sending
403+
if (this.sqsMaxQueueSize > 0) {
404+
SqsMessageOperations.QueueAttributes queueAttrs =
405+
SqsMessageOperations.getQueueAttributes(this.sqsClient, this.sqsQueueUrl);
406+
if (queueAttrs != null) {
407+
int currentSize = queueAttrs.getTotalMessages();
408+
if (currentSize >= this.sqsMaxQueueSize) {
409+
LOGGER.warn("sqs_queue_full: rejecting message, currentSize={}, maxSize={}",
410+
currentSize, this.sqsMaxQueueSize);
411+
promise.fail(new IllegalStateException("queue size limit exceeded"));
412+
return;
413+
}
414+
}
415+
}
416+
399417
SendMessageRequest sendMsgRequest = SendMessageRequest.builder()
400418
.queueUrl(this.sqsQueueUrl)
401419
.messageBody(messageBody.encode())

0 commit comments

Comments
 (0)