Skip to content

Commit e31e2b3

Browse files
committed
add todos
1 parent bf665a0 commit e31e2b3

File tree

4 files changed

+7
-7
lines changed

4 files changed

+7
-7
lines changed

src/main/java/com/uid2/optout/delta/S3UploadService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ public void uploadAndDeleteMessages(byte[] data, String s3Path, List<Message> me
7171
if (onSuccess != null) {
7272
onSuccess.onSuccess(messages.size());
7373
}
74-
} catch (Exception e) {
74+
} catch (Exception e) { // TODO: catch specific exceptions
7575
LOGGER.error("s3_error: failed to upload delta or dropped requests to path={}", s3Path, e);
7676
throw new IOException("s3 upload failed: " + s3Path, e);
7777
}

src/main/java/com/uid2/optout/delta/StopReason.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ public enum StopReason {
2727
MESSAGE_LIMIT_EXCEEDED,
2828

2929
/**
30-
* Pre-existing manual override was set (checked at job start).
30+
* Pre-existing manual override was set to DELAYED_PROCESSING (checked at job start).
3131
*/
3232
MANUAL_OVERRIDE_ACTIVE,
3333

src/main/java/com/uid2/optout/sqs/SqsBatchProcessor.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ public BatchProcessingResult processBatch(List<Message> messageBatch, int batchN
8383
List<Message> invalidMessages = identifyInvalidMessages(messageBatch, parsedBatch);
8484
if (!invalidMessages.isEmpty()) {
8585
LOGGER.error("sqs_error: found {} invalid messages in batch {}, deleting", invalidMessages.size(), batchNumber);
86-
SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, invalidMessages);
86+
SqsMessageOperations.deleteMessagesFromSqs(this.sqsClient, this.queueUrl, invalidMessages); // TODO: send to a folder in the dropped requests bucket before deleting.
8787
}
8888
}
8989

src/main/java/com/uid2/optout/sqs/SqsWindowReader.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,10 @@ public SqsWindowReader(SqsClient sqsClient, String queueUrl, int maxMessagesPerP
2929
int visibilityTimeout, int deltaWindowSeconds, int maxMessagesPerWindow) {
3030
this.sqsClient = sqsClient;
3131
this.queueUrl = queueUrl;
32-
this.maxMessagesPerPoll = maxMessagesPerPoll;
33-
this.visibilityTimeout = visibilityTimeout;
32+
this.maxMessagesPerPoll = maxMessagesPerPoll; // 10 max
33+
this.visibilityTimeout = visibilityTimeout; // TODO: ensure we can process all messages before visibility timeout
3434
this.deltaWindowSeconds = deltaWindowSeconds;
35-
this.maxMessagesPerWindow = maxMessagesPerWindow;
35+
this.maxMessagesPerWindow = maxMessagesPerWindow; // TODO: ensure we can process all messages before visibility timeout
3636
this.batchProcessor = new SqsBatchProcessor(sqsClient, queueUrl, deltaWindowSeconds);
3737
LOGGER.info("initialized: maxMessagesPerWindow={}, maxMessagesPerPoll={}, visibilityTimeout={}, deltaWindowSeconds={}",
3838
maxMessagesPerWindow, maxMessagesPerPoll, visibilityTimeout, deltaWindowSeconds);
@@ -94,7 +94,7 @@ public WindowReadResult readWindow() {
9494
int batchNumber = 0;
9595
int rawMessagesRead = 0; // track total messages pulled from SQS
9696

97-
while (true) {
97+
while (true) { // TODO: add a timeout to the loop
9898
if (windowMessages.size() >= maxMessagesPerWindow) {
9999
LOGGER.warn("high_message_volume: message limit exceeded while reading window, {} messages >= limit {}", windowMessages.size(), maxMessagesPerWindow);
100100
return WindowReadResult.messageLimitExceeded(windowMessages, currentWindowStart, rawMessagesRead);

0 commit comments

Comments
 (0)