11import type { SQSRecord } from 'aws-lambda' ;
22import { BatchProcessor } from './BatchProcessor.js' ;
3- import { SqsFifo } from './SqsFifo.js' ;
43import { EventType } from './constants.js' ;
5- import { SqsFifoMessageGroupShortCircuitError } from './errors.js' ;
6- import type { FailureResponse , SuccessResponse } from './types.js' ;
4+ import {
5+ type BatchProcessingError ,
6+ SqsFifoMessageGroupShortCircuitError ,
7+ SqsFifoShortCircuitError ,
8+ } from './errors.js' ;
9+ import type {
10+ BaseRecord ,
11+ EventSourceDataClassTypes ,
12+ FailureResponse ,
13+ SuccessResponse ,
14+ } from './types.js' ;
715
816/**
917 * Batch processor for SQS FIFO queues
@@ -36,10 +44,39 @@ import type { FailureResponse, SuccessResponse } from './types.js';
3644 * });
3745 * ```
3846 */
39- class SqsFifoPartialProcessorAsync extends SqsFifo ( BatchProcessor ) {
47+ class SqsFifoPartialProcessorAsync extends BatchProcessor {
48+ /**
49+ * The ID of the current message group being processed.
50+ */
51+ #currentGroupId?: string ;
52+ /**
53+ * A set of group IDs that have already encountered failures.
54+ */
55+ #failedGroupIds: Set < string > ;
56+
4057 public constructor ( ) {
4158 super ( EventType . SQS ) ;
59+ this . #failedGroupIds = new Set < string > ( ) ;
4260 }
61+
62+ /**
63+ * Handles a failure for a given record.
64+ * Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true.
65+ * @param record - The record that failed.
66+ * @param exception - The error that occurred.
67+ * @returns The failure response.
68+ */
69+ public failureHandler (
70+ record : EventSourceDataClassTypes ,
71+ exception : Error
72+ ) : FailureResponse {
73+ if ( this . options ?. skipGroupOnError && this . #currentGroupId) {
74+ this . #addToFailedGroup( this . #currentGroupId) ;
75+ }
76+
77+ return super . failureHandler ( record , exception ) ;
78+ }
79+
4380 /**
4481 * Process a record with a asynchronous handler
4582 *
@@ -64,14 +101,14 @@ class SqsFifoPartialProcessorAsync extends SqsFifo(BatchProcessor) {
64101 const processedRecords : ( SuccessResponse | FailureResponse ) [ ] = [ ] ;
65102 let currentIndex = 0 ;
66103 for ( const record of this . records ) {
67- this . _setCurrentGroup ( ( record as SQSRecord ) . attributes ?. MessageGroupId ) ;
104+ this . #setCurrentGroup ( ( record as SQSRecord ) . attributes ?. MessageGroupId ) ;
68105
69- if ( this . _shouldShortCircuit ( ) ) {
70- return this . _shortCircuitProcessing ( currentIndex , processedRecords ) ;
106+ if ( this . #shouldShortCircuit ( ) ) {
107+ return this . shortCircuitProcessing ( currentIndex , processedRecords ) ;
71108 }
72109
73- const result = this . _shouldSkipCurrentGroup ( )
74- ? this . _processFailRecord (
110+ const result = this . #shouldSkipCurrentGroup ( )
111+ ? this . #processFailRecord (
75112 record ,
76113 new SqsFifoMessageGroupShortCircuitError ( )
77114 )
@@ -85,6 +122,90 @@ class SqsFifoPartialProcessorAsync extends SqsFifo(BatchProcessor) {
85122
86123 return processedRecords ;
87124 }
125+
126+ /**
127+ * Starting from the first failure index, fail all remaining messages regardless
128+ * of their group ID.
129+ *
130+ * This short circuit mechanism is used when we detect a failed message in the batch.
131+ *
132+ * Since messages in a FIFO queue are processed in order, we must stop processing any
133+ * remaining messages in the batch to prevent out-of-order processing.
134+ *
135+ * @param firstFailureIndex Index of first message that failed
136+ * @param processedRecords Array of response items that have been processed both successfully and unsuccessfully
137+ */
138+ protected shortCircuitProcessing (
139+ firstFailureIndex : number ,
140+ processedRecords : ( SuccessResponse | FailureResponse ) [ ]
141+ ) : ( SuccessResponse | FailureResponse ) [ ] {
142+ const remainingRecords = this . records . slice ( firstFailureIndex ) ;
143+
144+ for ( const record of remainingRecords ) {
145+ this . #processFailRecord( record , new SqsFifoShortCircuitError ( ) ) ;
146+ }
147+
148+ this . clean ( ) ;
149+
150+ return processedRecords ;
151+ }
152+
153+ /**
154+ * Adds the specified group ID to the set of failed group IDs.
155+ *
156+ * @param group - The group ID to be added to the set of failed group IDs.
157+ */
158+ #addToFailedGroup( group : string ) : void {
159+ this . #failedGroupIds. add ( group ) ;
160+ }
161+
162+ /**
163+ * Processes a fail record.
164+ *
165+ * @param record - The record that failed.
166+ * @param exception - The error that occurred.
167+ */
168+ #processFailRecord(
169+ record : BaseRecord ,
170+ exception : BatchProcessingError
171+ ) : FailureResponse {
172+ const data = this . toBatchType ( record , this . eventType ) ;
173+
174+ return this . failureHandler ( data , exception ) ;
175+ }
176+
177+ /**
178+ * Sets the current group ID for the message being processed.
179+ *
180+ * @param group - The group ID of the current message being processed.
181+ */
182+ #setCurrentGroup( group ?: string ) : void {
183+ this . #currentGroupId = group ;
184+ }
185+
186+ /**
187+ * Determines whether the current group should be short-circuited.
188+ *
189+ * If we have any failed messages, we should then short circuit the process and
190+ * fail remaining messages unless `skipGroupOnError` is true
191+ */
192+ #shouldShortCircuit( ) : boolean {
193+ return ! this . options ?. skipGroupOnError && this . failureMessages . length !== 0 ;
194+ }
195+
196+ /**
197+ * Determines whether the current group should be skipped.
198+ *
199+ * If `skipGroupOnError` is true and the current group has previously failed,
200+ * then we should skip processing the current group.
201+ */
202+ #shouldSkipCurrentGroup( ) : boolean {
203+ return (
204+ ( this . options ?. skipGroupOnError ?? false ) &&
205+ this . #currentGroupId &&
206+ this . #failedGroupIds. has ( this . #currentGroupId)
207+ ) ;
208+ }
88209}
89210
90211export { SqsFifoPartialProcessorAsync } ;
0 commit comments