1- package com .uid2 .optout .vertx ;
1+ package com .uid2 .optout .sqs ;
22
3+ import com .uid2 .optout .delta .StopReason ;
34import com .uid2 .shared .optout .OptOutUtils ;
45import org .slf4j .Logger ;
56import org .slf4j .LoggerFactory ;
67import software .amazon .awssdk .services .sqs .SqsClient ;
78import software .amazon .awssdk .services .sqs .model .Message ;
89
9- import java .util .ArrayList ;
10- import java .util .HashSet ;
1110import java .util .List ;
1211import java .util .Set ;
12+ import java .util .stream .Collectors ;
1313
1414/**
1515 * Applies parsing, validation, filtering, and deletion of corrupted SQS messages.
@@ -29,46 +29,46 @@ public SqsBatchProcessor(SqsClient sqsClient, String queueUrl, int deltaWindowSe
2929 }
3030
3131 /**
32- * Result of processing a batch of messages from SQS.
33- * Encapsulates eligible messages and metadata about the processing .
32+ * Result of processing a batch of (10) messages from SQS.
33+ * Encapsulates eligible messages and the reason for stopping (if any) .
3434 */
3535 public static class BatchProcessingResult {
3636 private final List <SqsParsedMessage > eligibleMessages ;
37- private final boolean shouldStopProcessing ;
37+ private final StopReason stopReason ;
3838
39- private BatchProcessingResult (List <SqsParsedMessage > eligibleMessages , boolean shouldStopProcessing ) {
39+ private BatchProcessingResult (List <SqsParsedMessage > eligibleMessages , StopReason stopReason ) {
4040 this .eligibleMessages = eligibleMessages ;
41- this .shouldStopProcessing = shouldStopProcessing ;
41+ this .stopReason = stopReason ;
4242 }
4343
44- public static BatchProcessingResult withEligibleMessages (List <SqsParsedMessage > messages ) {
45- return new BatchProcessingResult (messages , false );
44+ public static BatchProcessingResult withMessages (List <SqsParsedMessage > messages ) {
45+ return new BatchProcessingResult (messages , StopReason . NONE );
4646 }
4747
48- public static BatchProcessingResult stopProcessing () {
49- return new BatchProcessingResult (new ArrayList <> (), true );
48+ public static BatchProcessingResult messagesTooRecent () {
49+ return new BatchProcessingResult (List . of (), StopReason . MESSAGES_TOO_RECENT );
5050 }
5151
52- public static BatchProcessingResult empty () {
53- return new BatchProcessingResult (new ArrayList <> (), false );
52+ public static BatchProcessingResult corruptMessagesDeleted () {
53+ return new BatchProcessingResult (List . of (), StopReason . NONE );
5454 }
5555
56- public boolean isEmpty () {
57- return eligibleMessages .isEmpty ();
56+ public boolean hasMessages () {
57+ return ! eligibleMessages .isEmpty ();
5858 }
5959
60- public boolean shouldStopProcessing () {
61- return shouldStopProcessing ;
60+ public StopReason getStopReason () {
61+ return stopReason ;
6262 }
6363
64- public List <SqsParsedMessage > getEligibleMessages () {
64+ public List <SqsParsedMessage > getMessages () {
6565 return eligibleMessages ;
6666 }
6767 }
6868
6969 /**
7070 * Processes a batch of messages: parses, validates, cleans up invalid messages,
71- * and filters for eligible messages based on age threshold (message is less than 5 minutes old )
71+ * and filters for eligible messages based on age threshold (message is older than deltaWindowSeconds )
7272 *
7373 * @param messageBatch Raw messages from SQS
7474 * @param batchNumber The batch number (for logging)
@@ -82,59 +82,53 @@ public BatchProcessingResult processBatch(List<Message> messageBatch, int batchN
8282 if (parsedBatch .size () < messageBatch .size ()) {
8383 List <Message > invalidMessages = identifyInvalidMessages (messageBatch , parsedBatch );
8484 if (!invalidMessages .isEmpty ()) {
85- LOGGER .error ("Found {} invalid messages in batch {} (failed parsing). Deleting from queue." ,
86- invalidMessages .size (), batchNumber );
85+ LOGGER .error ("sqs_error: found {} invalid messages in batch {}, deleting" , invalidMessages .size (), batchNumber );
8786 SqsMessageOperations .deleteMessagesFromSqs (this .sqsClient , this .queueUrl , invalidMessages );
8887 }
8988 }
9089
91- // If no valid messages, return empty result
90+ // No valid messages after deleting corrupt ones, continue reading
9291 if (parsedBatch .isEmpty ()) {
93- LOGGER .warn ( "No valid messages in batch {} (all failed parsing) " , batchNumber );
94- return BatchProcessingResult .empty ();
92+ LOGGER .info ( "no valid messages in batch {} after removing invalid messages " , batchNumber );
93+ return BatchProcessingResult .corruptMessagesDeleted ();
9594 }
9695
9796 // Check if the oldest message in this batch is too recent
9897 long currentTime = OptOutUtils .nowEpochSeconds ();
9998 SqsParsedMessage oldestMessage = parsedBatch .get (0 );
100- long messageAge = currentTime - oldestMessage .getTimestamp ();
10199
102- if (messageAge < this .deltaWindowSeconds ) {
103- // Signal to stop processing - messages are too recent
104- return BatchProcessingResult .stopProcessing ();
100+ if (!isMessageEligible (oldestMessage , currentTime )) {
101+ return BatchProcessingResult .messagesTooRecent ();
105102 }
106103
107- // Filter for eligible messages (>= 5 minutes old)
104+ // Filter for eligible messages (>= deltaWindowSeconds old)
108105 List <SqsParsedMessage > eligibleMessages = filterEligibleMessages (parsedBatch , currentTime );
109106
110- if (eligibleMessages .isEmpty ()) {
111- LOGGER .debug ("No eligible messages in batch {} (all too recent)" , batchNumber );
112- return BatchProcessingResult .empty ();
113- }
107+ return BatchProcessingResult .withMessages (eligibleMessages );
108+ }
114109
115- return BatchProcessingResult .withEligibleMessages (eligibleMessages );
110+ /**
111+ * Checks if a message is old enough to be processed.
112+ *
113+ * @param message The parsed message to check
114+ * @param currentTime Current time in epoch seconds
115+ * @return true if the message is at least deltaWindowSeconds old
116+ */
117+ private boolean isMessageEligible (SqsParsedMessage message , long currentTime ) {
118+ return currentTime - message .timestamp () >= this .deltaWindowSeconds ;
116119 }
117120
118121 /**
119122 * Filters messages to only include those where sufficient time has elapsed.
120- . *
123+ *
121124 * @param messages List of parsed messages
122125 * @param currentTime Current time in seconds
123126 * @return List of messages that meet the time threshold
124127 */
125- public List <SqsParsedMessage > filterEligibleMessages (
126- List <SqsParsedMessage > messages ,
127- long currentTime ) {
128-
129- List <SqsParsedMessage > eligibleMessages = new ArrayList <>();
130-
131- for (SqsParsedMessage pm : messages ) {
132- if (currentTime - pm .getTimestamp () >= this .deltaWindowSeconds ) {
133- eligibleMessages .add (pm );
134- }
135- }
136-
137- return eligibleMessages ;
128+ List <SqsParsedMessage > filterEligibleMessages (List <SqsParsedMessage > messages , long currentTime ) {
129+ return messages .stream ()
130+ .filter (msg -> isMessageEligible (msg , currentTime ))
131+ .collect (Collectors .toList ());
138132 }
139133
140134 /**
@@ -145,21 +139,12 @@ public List<SqsParsedMessage> filterEligibleMessages(
145139 * @return List of messages that failed to parse
146140 */
147141 private List <Message > identifyInvalidMessages (List <Message > originalBatch , List <SqsParsedMessage > parsedBatch ) {
148- // Create a set of message IDs from successfully parsed messages
149- Set <String > validMessageIds = new HashSet <>();
150- for (SqsParsedMessage parsed : parsedBatch ) {
151- validMessageIds .add (parsed .getOriginalMessage ().messageId ());
152- }
142+ Set <String > validIds = parsedBatch .stream ()
143+ .map (p -> p .originalMessage ().messageId ())
144+ .collect (Collectors .toSet ());
153145
154- // Find messages that were not successfully parsed
155- List <Message > invalidMessages = new ArrayList <>();
156- for (Message msg : originalBatch ) {
157- if (!validMessageIds .contains (msg .messageId ())) {
158- invalidMessages .add (msg );
159- }
160- }
161-
162- return invalidMessages ;
146+ return originalBatch .stream ()
147+ .filter (msg -> !validIds .contains (msg .messageId ()))
148+ .collect (Collectors .toList ());
163149 }
164150}
165-
0 commit comments