Skip to content

Commit fe25477

Browse files
committed
refactor: SqsFifoProcessor for functionalities used in fifo processor classes
1 parent b27f651 commit fe25477

File tree

3 files changed

+27
-53
lines changed

3 files changed

+27
-53
lines changed

packages/batch/src/SqsFifoPartialProcessor.ts

Lines changed: 12 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import type { SQSRecord } from 'aws-lambda';
22
import { BatchProcessorSync } from './BatchProcessorSync.js';
3+
import { SqsFifoProcessor } from './SqsFifoProcessor.js';
34
import { EventType } from './constants.js';
45
import {
56
type BatchProcessingError,
@@ -46,17 +47,13 @@ import type {
4647
*/
4748
class SqsFifoPartialProcessor extends BatchProcessorSync {
4849
/**
49-
* The ID of the current message group being processed.
50+
* Processor for handling SQS FIFO message
5051
*/
51-
#currentGroupId?: string;
52-
/**
53-
* A set of group IDs that have already encountered failures.
54-
*/
55-
#failedGroupIds: Set<string>;
52+
readonly #processor: SqsFifoProcessor;
5653

5754
public constructor() {
5855
super(EventType.SQS);
59-
this.#failedGroupIds = new Set<string>();
56+
this.#processor = new SqsFifoProcessor();
6057
}
6158

6259
/**
@@ -70,9 +67,7 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
7067
record: EventSourceDataClassTypes,
7168
exception: Error
7269
): FailureResponse {
73-
if (this.options?.skipGroupOnError && this.#currentGroupId) {
74-
this.#addToFailedGroup(this.#currentGroupId);
75-
}
70+
this.#processor.processFailureForCurrentGroup(this.options);
7671

7772
return super.failureHandler(record, exception);
7873
}
@@ -101,24 +96,17 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
10196
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
10297
let currentIndex = 0;
10398
for (const record of this.records) {
104-
this.#setCurrentGroup((record as SQSRecord).attributes?.MessageGroupId);
99+
this.#processor.setCurrentGroup(
100+
(record as SQSRecord).attributes?.MessageGroupId
101+
);
105102

106-
// If we have any failed messages, we should then short circuit the process and
107-
// fail remaining messages unless `skipGroupOnError` is true
108-
const shouldShortCircuit =
109-
!this.options?.skipGroupOnError && this.failureMessages.length !== 0;
110-
if (shouldShortCircuit) {
103+
if (
104+
this.#processor.shouldShortCircuit(this.failureMessages, this.options)
105+
) {
111106
return this.shortCircuitProcessing(currentIndex, processedRecords);
112107
}
113108

114-
// If `skipGroupOnError` is true and the current group has previously failed,
115-
// then we should skip processing the current group.
116-
const shouldSkipCurrentGroup =
117-
this.options?.skipGroupOnError &&
118-
this.#currentGroupId &&
119-
this.#failedGroupIds.has(this.#currentGroupId);
120-
121-
const result = shouldSkipCurrentGroup
109+
const result = this.#processor.shouldSkipCurrentGroup(this.options)
122110
? this.#processFailRecord(
123111
record,
124112
new SqsFifoMessageGroupShortCircuitError()
@@ -161,15 +149,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
161149
return processedRecords;
162150
}
163151

164-
/**
165-
* Adds the specified group ID to the set of failed group IDs.
166-
*
167-
* @param group - The group ID to be added to the set of failed group IDs.
168-
*/
169-
#addToFailedGroup(group: string): void {
170-
this.#failedGroupIds.add(group);
171-
}
172-
173152
/**
174153
* Processes a fail record.
175154
*
@@ -184,15 +163,6 @@ class SqsFifoPartialProcessor extends BatchProcessorSync {
184163

185164
return this.failureHandler(data, exception);
186165
}
187-
188-
/**
189-
* Sets the current group ID for the message being processed.
190-
*
191-
* @param group - The group ID of the current message being processed.
192-
*/
193-
#setCurrentGroup(group?: string): void {
194-
this.#currentGroupId = group;
195-
}
196166
}
197167

198168
export { SqsFifoPartialProcessor };

packages/batch/src/SqsFifoPartialProcessorAsync.ts

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
import type { SQSRecord } from 'aws-lambda';
22
import { BatchProcessor } from './BatchProcessor.js';
3-
import { SqsFifoProcessingUtils } from './SqsFifoProcessingUtils.js';
3+
import { SqsFifoProcessor } from './SqsFifoProcessor.js';
44
import { EventType } from './constants.js';
55
import {
66
type BatchProcessingError,
@@ -47,13 +47,13 @@ import type {
4747
*/
4848
class SqsFifoPartialProcessorAsync extends BatchProcessor {
4949
/**
50-
* Utility class for processing SQS FIFO queues
50+
* Processor for handling SQS FIFO message
5151
*/
52-
readonly #utils: SqsFifoProcessingUtils;
52+
readonly #processor: SqsFifoProcessor;
5353

5454
public constructor() {
5555
super(EventType.SQS);
56-
this.#utils = new SqsFifoProcessingUtils();
56+
this.#processor = new SqsFifoProcessor();
5757
}
5858

5959
/**
@@ -66,7 +66,7 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor {
6666
record: EventSourceDataClassTypes,
6767
exception: Error
6868
): FailureResponse {
69-
this.#utils.processFailureForCurrentGroup(this.options);
69+
this.#processor.processFailureForCurrentGroup(this.options);
7070

7171
return super.failureHandler(record, exception);
7272
}
@@ -95,15 +95,17 @@ class SqsFifoPartialProcessorAsync extends BatchProcessor {
9595
const processedRecords: (SuccessResponse | FailureResponse)[] = [];
9696
let currentIndex = 0;
9797
for (const record of this.records) {
98-
this.#utils.setCurrentGroup(
98+
this.#processor.setCurrentGroup(
9999
(record as SQSRecord).attributes?.MessageGroupId
100100
);
101101

102-
if (this.#utils.shouldShortCircuit(this.failureMessages, this.options)) {
102+
if (
103+
this.#processor.shouldShortCircuit(this.failureMessages, this.options)
104+
) {
103105
return this.shortCircuitProcessing(currentIndex, processedRecords);
104106
}
105107

106-
const result = this.#utils.shouldSkipCurrentGroup(this.options)
108+
const result = this.#processor.shouldSkipCurrentGroup(this.options)
107109
? this.#processFailRecord(
108110
record,
109111
new SqsFifoMessageGroupShortCircuitError()

packages/batch/src/SqsFifoProcessingUtils.ts renamed to packages/batch/src/SqsFifoProcessor.ts

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,11 @@ import type {
44
} from './types.js';
55

66
/**
7-
* Utility class to handle processing of SQS FIFO messages.
7+
* Class representing a processor for SQS FIFO messages.
8+
* This class provides utilities for handling message groups, including tracking failed groups,
9+
* determining whether to short-circuit processing, and skipping groups based on processing options.
810
*/
9-
class SqsFifoProcessingUtils {
11+
class SqsFifoProcessor {
1012
/**
1113
* The ID of the current message group being processed.
1214
*/
@@ -84,4 +86,4 @@ class SqsFifoProcessingUtils {
8486
}
8587
}
8688

87-
export { SqsFifoProcessingUtils };
89+
export { SqsFifoProcessor };

0 commit comments

Comments
 (0)