Skip to content

Commit 97a0682

Browse files
committed
feat: revert to original implementation for SqsFifoPartialProcessor
1 parent 8c407bc commit 97a0682

File tree

1 file changed

+117
-9
lines changed

1 file changed

+117
-9
lines changed

packages/batch/src/SqsFifoPartialProcessor.ts

Lines changed: 117 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,17 @@
11
import type { SQSRecord } from 'aws-lambda';
22
import { BatchProcessorSync } from './BatchProcessorSync.js';
3-
import { SqsFifo } from './SqsFifo.js';
43
import { EventType } from './constants.js';
5-
import { SqsFifoMessageGroupShortCircuitError } from './errors.js';
6-
import type { FailureResponse, SuccessResponse } from './types.js';
4+
import {
5+
type BatchProcessingError,
6+
SqsFifoMessageGroupShortCircuitError,
7+
SqsFifoShortCircuitError,
8+
} from './errors.js';
9+
import type {
10+
BaseRecord,
11+
EventSourceDataClassTypes,
12+
FailureResponse,
13+
SuccessResponse,
14+
} from './types.js';
715

816
/**
917
* Batch processor for SQS FIFO queues
@@ -36,10 +44,39 @@ import type { FailureResponse, SuccessResponse } from './types.js';
3644
* });
3745
* ```
3846
*/
39-
class SqsFifoPartialProcessor extends SqsFifo(BatchProcessorSync) {
47+
class SqsFifoPartialProcessor extends BatchProcessorSync {
48+
/**
49+
* The ID of the current message group being processed.
50+
*/
51+
#currentGroupId?: string;
52+
/**
53+
* A set of group IDs that have already encountered failures.
54+
*/
55+
#failedGroupIds: Set<string>;
56+
4057
public constructor() {
4158
super(EventType.SQS);
59+
this.#failedGroupIds = new Set<string>();
4260
}
61+
62+
/**
63+
* Handles a failure for a given record.
64+
* Adds the current group ID to the set of failed group IDs if `skipGroupOnError` is true.
65+
* @param record - The record that failed.
66+
* @param exception - The error that occurred.
67+
* @returns The failure response.
68+
*/
69+
public failureHandler(
70+
record: EventSourceDataClassTypes,
71+
exception: Error
72+
): FailureResponse {
73+
if (this.options?.skipGroupOnError && this.#currentGroupId) {
74+
this.#addToFailedGroup(this.#currentGroupId);
75+
}
76+
77+
return super.failureHandler(record, exception);
78+
}
79+
4380
/**
4481
* Process a record with a synchronous handler
4582
*
@@ -64,14 +101,25 @@ class SqsFifoPartialProcessor extends SqsFifo(BatchProcessorSync) {
64101
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
65102
let currentIndex = 0;
66103
for (const record of this.records) {
67-
this._setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId);
104+
this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId);
68105

69-
if (this._shouldShortCircuit()) {
70-
return this._shortCircuitProcessing(currentIndex, processedRecords);
106+
// If we have any failed messages, we should then short circuit the process and
107+
// fail remaining messages unless `skipGroupOnError` is true
108+
const shouldShortCircuit =
109+
!this.options?.skipGroupOnError && this.failureMessages.length !== 0;
110+
if (shouldShortCircuit) {
111+
return this.shortCircuitProcessing(currentIndex, processedRecords);
71112
}
72113

73-
const result = this._shouldSkipCurrentGroup()
74-
? this._processFailRecord(
114+
// If `skipGroupOnError` is true and the current group has previously failed,
115+
// then we should skip processing the current group.
116+
const shouldSkipCurrentGroup =
117+
this.options?.skipGroupOnError &&
118+
this.#currentGroupId &&
119+
this.#failedGroupIds.has(this.#currentGroupId);
120+
121+
const result = shouldSkipCurrentGroup
122+
? this.#processFailRecord(
75123
record,
76124
new SqsFifoMessageGroupShortCircuitError()
77125
)
@@ -85,6 +133,66 @@ class SqsFifoPartialProcessor extends SqsFifo(BatchProcessorSync) {
85133

86134
return processedRecords;
87135
}
136+
137+
/**
138+
* Starting from the first failure index, fail all remaining messages regardless
139+
* of their group ID.
140+
*
141+
* This short circuit mechanism is used when we detect a failed message in the batch.
142+
*
143+
* Since messages in a FIFO queue are processed in order, we must stop processing any
144+
* remaining messages in the batch to prevent out-of-order processing.
145+
*
146+
* @param firstFailureIndex Index of first message that failed
147+
* @param processedRecords Array of response items that have been processed both successfully and unsuccessfully
148+
*/
149+
protected shortCircuitProcessing(
150+
firstFailureIndex: number,
151+
processedRecords: (SuccessResponse | FailureResponse)[]
152+
): (SuccessResponse | FailureResponse)[] {
153+
const remainingRecords = this.records.slice(firstFailureIndex);
154+
155+
for (const record of remainingRecords) {
156+
this.#processFailRecord(record, new SqsFifoShortCircuitError());
157+
}
158+
159+
this.clean();
160+
161+
return processedRecords;
162+
}
163+
164+
/**
165+
* Adds the specified group ID to the set of failed group IDs.
166+
*
167+
* @param group - The group ID to be added to the set of failed group IDs.
168+
*/
169+
#addToFailedGroup(group: string): void {
170+
this.#failedGroupIds.add(group);
171+
}
172+
173+
/**
174+
* Processes a fail record.
175+
*
176+
* @param record - The record that failed.
177+
* @param exception - The error that occurred.
178+
*/
179+
#processFailRecord(
180+
record: BaseRecord,
181+
exception: BatchProcessingError
182+
): FailureResponse {
183+
const data = this.toBatchType(record, this.eventType);
184+
185+
return this.failureHandler(data, exception);
186+
}
187+
188+
/**
189+
* Sets the current group ID for the message being processed.
190+
*
191+
* @param group - The group ID of the current message being processed.
192+
*/
193+
#setCurrentGroup(group?: string): void {
194+
this.#currentGroupId = group;
195+
}
88196
}
89197

90198
export { SqsFifoPartialProcessor };

0 commit comments

Comments
 (0)