File tree Expand file tree Collapse file tree 3 files changed +28
-26
lines changed
Expand file tree Collapse file tree 3 files changed +28
-26
lines changed Original file line number Diff line number Diff line change @@ -114,5 +114,29 @@ export function SqsFifo<
114114 _setCurrentGroup ( group ?: string ) : void {
115115 this . _currentGroupId = group ;
116116 }
117+
118+ /**
119+ * Determines whether the current group should be short-circuited.
120+ * If we have any failed messages, we should then short circuit the process and
121+ * fail remaining messages unless `skipGroupOnError` is true
122+ */
123+ _shouldShortCircuit ( ) : boolean {
124+ return (
125+ ! this . options ?. skipGroupOnError && this . failureMessages . length !== 0
126+ ) ;
127+ }
128+
129+ /**
130+ * Determines whether the current group should be skipped.
131+ * If `skipGroupOnError` is true and the current group has previously failed,
132+ * then we should skip processing the current group.
133+ */
134+ _shouldSkipCurrentGroup ( ) : boolean {
135+ return (
136+ ( this . options ?. skipGroupOnError ?? false ) &&
137+ this . _currentGroupId &&
138+ this . _failedGroupIds . has ( this . _currentGroupId )
139+ ) ;
140+ }
117141 } ;
118142}
Original file line number Diff line number Diff line change @@ -66,22 +66,11 @@ class SqsFifoPartialProcessor extends SqsFifo(BatchProcessorSync) {
6666 for ( const record of this . records ) {
6767 this . _setCurrentGroup ( ( record as SQSRecord ) . attributes ?. MessageGroupId ) ;
6868
69- // If we have any failed messages, we should then short circuit the process and
70- // fail remaining messages unless `skipGroupOnError` is true
71- const shouldShortCircuit =
72- ! this . options ?. skipGroupOnError && this . failureMessages . length !== 0 ;
73- if ( shouldShortCircuit ) {
69+ if ( this . _shouldShortCircuit ( ) ) {
7470 return this . _shortCircuitProcessing ( currentIndex , processedRecords ) ;
7571 }
7672
77- // If `skipGroupOnError` is true and the current group has previously failed,
78- // then we should skip processing the current group.
79- const shouldSkipCurrentGroup =
80- this . options ?. skipGroupOnError &&
81- this . _currentGroupId &&
82- this . _failedGroupIds . has ( this . _currentGroupId ) ;
83-
84- const result = shouldSkipCurrentGroup
73+ const result = this . _shouldSkipCurrentGroup ( )
8574 ? this . _processFailRecord (
8675 record ,
8776 new SqsFifoMessageGroupShortCircuitError ( )
Original file line number Diff line number Diff line change @@ -66,22 +66,11 @@ class SqsFifoPartialProcessorAsync extends SqsFifo(BatchProcessor) {
6666 for ( const record of this . records ) {
6767 this . _setCurrentGroup ( ( record as SQSRecord ) . attributes ?. MessageGroupId ) ;
6868
69- // If we have any failed messages, we should then short circuit the process and
70- // fail remaining messages unless `skipGroupOnError` is true
71- const shouldShortCircuit =
72- ! this . options ?. skipGroupOnError && this . failureMessages . length !== 0 ;
73- if ( shouldShortCircuit ) {
69+ if ( this . _shouldShortCircuit ( ) ) {
7470 return this . _shortCircuitProcessing ( currentIndex , processedRecords ) ;
7571 }
7672
77- // If `skipGroupOnError` is true and the current group has previously failed,
78- // then we should skip processing the current group.
79- const shouldSkipCurrentGroup =
80- this . options ?. skipGroupOnError &&
81- this . _currentGroupId &&
82- this . _failedGroupIds . has ( this . _currentGroupId ) ;
83-
84- const result = shouldSkipCurrentGroup
73+ const result = this . _shouldSkipCurrentGroup ( )
8574 ? this . _processFailRecord (
8675 record ,
8776 new SqsFifoMessageGroupShortCircuitError ( )
You can’t perform that action at this time.
0 commit comments