Skip to content

Commit f9beef9

Browse files
committed
refactor: use SqsFifoProcessingUtils inside SqsFifoPartialProcessorAsync
1 parent ae5c3ce commit f9beef9

File tree

2 files changed

+96
-55
lines changed

2 files changed

+96
-55
lines changed

packages/batch/src/SqsFifoPartialProcessorAsync.ts

Lines changed: 10 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { SQSRecord } from 'aws-lambda';
22
import { BatchProcessor } from './BatchProcessor.js';
3+
import { SqsFifoProcessingUtils } from './SqsFifoProcessingUtils.js';
34
import { EventType } from './constants.js';
45
import {
56
type BatchProcessingError,
@@ -46,17 +47,13 @@ import type {
4647
*/
4748
class SqsFifoPartialProcessorAsync extends BatchProcessor {
4849
/**
49-
* The ID of the current message group being processed.
50+
* Utility class for processing SQS FIFO queues
5051
*/
51-
#currentGroupId?: string;
52-
/**
53-
* A set of group IDs that have already encountered failures.
54-
*/
55-
readonly #failedGroupIds: Set<string>;
52+
readonly #utils: SqsFifoProcessingUtils;
5653

5754
public constructor() {
5855
super(EventType.SQS);
59-
this.#failedGroupIds = new Set<string>();
56+
this.#utils = new SqsFifoProcessingUtils();
6057
}
6158

6259
/**
@@ -70,9 +67,7 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor {
7067
record: EventSourceDataClassTypes,
7168
exception: Error
7269
): FailureResponse {
73-
if (this.options?.skipGroupOnError && this.#currentGroupId) {
74-
this.#addToFailedGroup(this.#currentGroupId);
75-
}
70+
this.#utils.processFailureForCurrentGroup(this.options);
7671

7772
return super.failureHandler(record, exception);
7873
}
@@ -101,13 +96,15 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor {
10196
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
10297
let currentIndex = 0;
10398
for (const record of this.records) {
104-
this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId);
99+
this.#utils.setCurrentGroup(
100+
(record as SQSRecord).attributes?.MessageGroupId
101+
);
105102

106-
if (this.#shouldShortCircuit()) {
103+
if (this.#utils.shouldShortCircuit(this.failureMessages, this.options)) {
107104
return this.shortCircuitProcessing(currentIndex, processedRecords);
108105
}
109106

110-
const result = this.#shouldSkipCurrentGroup()
107+
const result = this.#utils.shouldSkipCurrentGroup(this.options)
111108
? this.#processFailRecord(
112109
record,
113110
new SqsFifoMessageGroupShortCircuitError()
@@ -150,15 +147,6 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor {
150147
return processedRecords;
151148
}
152149

153-
/**
154-
* Adds the specified group ID to the set of failed group IDs.
155-
*
156-
* @param group - The group ID to be added to the set of failed group IDs.
157-
*/
158-
#addToFailedGroup(group: string): void {
159-
this.#failedGroupIds.add(group);
160-
}
161-
162150
/**
163151
* Processes a fail record.
164152
*
@@ -173,39 +161,6 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor {
173161

174162
return this.failureHandler(data, exception);
175163
}
176-
177-
/**
178-
* Sets the current group ID for the message being processed.
179-
*
180-
* @param group - The group ID of the current message being processed.
181-
*/
182-
#setCurrentGroup(group?: string): void {
183-
this.#currentGroupId = group;
184-
}
185-
186-
/**
187-
* Determines whether the current group should be short-circuited.
188-
*
189-
* If we have any failed messages, we should then short circuit the process and
190-
* fail remaining messages unless `skipGroupOnError` is true
191-
*/
192-
#shouldShortCircuit(): boolean {
193-
return !this.options?.skipGroupOnError && this.failureMessages.length !== 0;
194-
}
195-
196-
/**
197-
* Determines whether the current group should be skipped.
198-
*
199-
* If `skipGroupOnError` is true and the current group has previously failed,
200-
* then we should skip processing the current group.
201-
*/
202-
#shouldSkipCurrentGroup(): boolean {
203-
return (
204-
(this.options?.skipGroupOnError ?? false) &&
205-
this.#currentGroupId &&
206-
this.#failedGroupIds.has(this.#currentGroupId)
207-
);
208-
}
209164
}
210165

211166
export { SqsFifoPartialProcessorAsync };
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
import type {
2+
BatchProcessingOptions,
3+
EventSourceDataClassTypes,
4+
} from './types.js';
5+
6+
/**
7+
* Utility class to handle processing of SQS FIFO messages.
8+
*/
9+
class SqsFifoProcessingUtils {
10+
/**
11+
* The ID of the current message group being processed.
12+
*/
13+
#currentGroupId?: string;
14+
15+
/**
16+
* A set of group IDs that have already encountered failures.
17+
*/
18+
readonly #failedGroupIds: Set<string>;
19+
20+
public constructor() {
21+
this.#failedGroupIds = new Set<string>();
22+
}
23+
24+
/**
25+
* Adds the specified group ID to the set of failed group IDs.
26+
*
27+
* @param group - The group ID to be added to the set of failed group IDs.
28+
*/
29+
public addToFailedGroup(group: string): void {
30+
this.#failedGroupIds.add(group);
31+
}
32+
33+
/**
34+
* Sets the current group ID for the message being processed.
35+
*
36+
* @param group - The group ID of the current message being processed.
37+
*/
38+
public setCurrentGroup(group?: string): void {
39+
this.#currentGroupId = group;
40+
}
41+
42+
/**
43+
* Determines whether the current group should be short-circuited.
44+
*
45+
* If we have any failed messages, we should then short circuit the process and
46+
* fail remaining messages unless `skipGroupOnError` is true
47+
*
48+
* @param failureMessages - The list of failure messages.
49+
* @param options - The options for the batch processing.
50+
*/
51+
public shouldShortCircuit(
52+
failureMessages: EventSourceDataClassTypes[],
53+
options?: BatchProcessingOptions
54+
): boolean {
55+
return !options?.skipGroupOnError && failureMessages.length !== 0;
56+
}
57+
58+
/**
59+
* Determines whether the current group should be skipped.
60+
*
61+
* If `skipGroupOnError` is true and the current group has previously failed,
62+
* then we should skip processing the current group.
63+
*
64+
* @param options - The options for the batch processing.
65+
*/
66+
public shouldSkipCurrentGroup(options?: BatchProcessingOptions): boolean {
67+
return (
68+
(options?.skipGroupOnError ?? false) &&
69+
this.#currentGroupId &&
70+
this.#failedGroupIds.has(this.#currentGroupId)
71+
);
72+
}
73+
74+
/**
75+
* Handles failure for current group
76+
*
77+
* @param options - The options for the batch processing.
78+
*/
79+
public processFailureForCurrentGroup(options?: BatchProcessingOptions) {
80+
if (options?.skipGroupOnError && this.#currentGroupId) {
81+
this.addToFailedGroup(this.#currentGroupId);
82+
}
83+
}
84+
}
85+
86+
export { SqsFifoProcessingUtils };

0 commit comments

Comments
 (0)