Skip to content

Commit 6e7e643

Browse files
feat: add support for batch execution in parallel with custom Executor
1 parent 8a040ac commit 6e7e643

File tree

11 files changed

+378
-47
lines changed

11 files changed

+378
-47
lines changed
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.demo.batch.dynamo;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.RequestHandler;
5+
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
6+
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
7+
import org.slf4j.Logger;
8+
import org.slf4j.LoggerFactory;
9+
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
10+
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
11+
12+
import java.util.concurrent.ExecutorService;
13+
import java.util.concurrent.Executors;
14+
15+
public class DynamoDBStreamBatchHandlerParallel implements RequestHandler<DynamodbEvent, StreamsEventResponse> {
16+
17+
private static final Logger LOGGER = LoggerFactory.getLogger(DynamoDBStreamBatchHandlerParallel.class);
18+
private final BatchMessageHandler<DynamodbEvent, StreamsEventResponse> handler;
19+
private final ExecutorService executor;
20+
21+
public DynamoDBStreamBatchHandlerParallel() {
22+
handler = new BatchMessageHandlerBuilder()
23+
.withDynamoDbBatchHandler()
24+
.buildWithRawMessageHandler(this::processMessage);
25+
executor = Executors.newFixedThreadPool(2);
26+
}
27+
28+
@Override
29+
public StreamsEventResponse handleRequest(DynamodbEvent ddbEvent, Context context) {
30+
return handler.processBatchInParallel(ddbEvent, context, executor);
31+
}
32+
33+
private void processMessage(DynamodbEvent.DynamodbStreamRecord dynamodbStreamRecord, Context context) {
34+
LOGGER.info("Processing DynamoDB Stream Record" + dynamodbStreamRecord);
35+
}
36+
37+
}
Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,39 @@
1+
package org.demo.batch.kinesis;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.RequestHandler;
5+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
6+
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
7+
import org.demo.batch.model.Product;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
11+
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
12+
13+
import java.util.concurrent.ExecutorService;
14+
import java.util.concurrent.Executors;
15+
16+
public class KinesisBatchHandlerParallel implements RequestHandler<KinesisEvent, StreamsEventResponse> {
17+
18+
private static final Logger LOGGER = LoggerFactory.getLogger(KinesisBatchHandlerParallel.class);
19+
private final BatchMessageHandler<KinesisEvent, StreamsEventResponse> handler;
20+
private final ExecutorService executor;
21+
22+
23+
public KinesisBatchHandlerParallel() {
24+
handler = new BatchMessageHandlerBuilder()
25+
.withKinesisBatchHandler()
26+
.buildWithMessageHandler(this::processMessage, Product.class);
27+
executor = Executors.newFixedThreadPool(2);
28+
}
29+
30+
@Override
31+
public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
32+
return handler.processBatchInParallel(kinesisEvent, context, executor);
33+
}
34+
35+
private void processMessage(Product p, Context c) {
36+
LOGGER.info("Processing product " + p);
37+
}
38+
39+
}
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
package org.demo.batch.sqs;
2+
3+
import com.amazonaws.services.lambda.runtime.Context;
4+
import com.amazonaws.services.lambda.runtime.RequestHandler;
5+
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
6+
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
7+
import org.demo.batch.model.Product;
8+
import org.slf4j.Logger;
9+
import org.slf4j.LoggerFactory;
10+
import software.amazon.lambda.powertools.batch.BatchMessageHandlerBuilder;
11+
import software.amazon.lambda.powertools.batch.handler.BatchMessageHandler;
12+
import software.amazon.lambda.powertools.logging.Logging;
13+
import software.amazon.lambda.powertools.tracing.Tracing;
14+
15+
import java.util.concurrent.ExecutorService;
16+
import java.util.concurrent.Executors;
17+
18+
public class SqsBatchHandlerParallel extends AbstractSqsBatchHandler implements RequestHandler<SQSEvent, SQSBatchResponse> {
19+
private static final Logger LOGGER = LoggerFactory.getLogger(SqsBatchHandlerParallel.class);
20+
private final BatchMessageHandler<SQSEvent, SQSBatchResponse> handler;
21+
private final ExecutorService executor;
22+
23+
public SqsBatchHandlerParallel() {
24+
handler = new BatchMessageHandlerBuilder()
25+
.withSqsBatchHandler()
26+
.buildWithMessageHandler(this::processMessage, Product.class);
27+
executor = Executors.newFixedThreadPool(2);
28+
}
29+
30+
@Logging
31+
@Tracing
32+
@Override
33+
public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
34+
LOGGER.info("Processing batch of {} messages", sqsEvent.getRecords().size());
35+
return handler.processBatchInParallel(sqsEvent, context, executor);
36+
}
37+
}

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/BatchMessageHandler.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,9 @@
1616

1717
import com.amazonaws.services.lambda.runtime.Context;
1818

19+
import java.util.concurrent.Executor;
20+
import java.util.concurrent.ExecutorService;
21+
1922
/**
2023
* The basic interface a batch message handler must meet.
2124
*
@@ -50,4 +53,14 @@ public interface BatchMessageHandler<E, R> {
5053
* @return A partial batch response
5154
*/
5255
R processBatchInParallel(E event, Context context);
56+
57+
58+
/**
59+
* Same as {@link #processBatchInParallel(Object, Context)} but with an option to provide custom {@link Executor}
60+
* @param event The Lambda event containing the batch to process
61+
* @param context The lambda context
62+
* @param executor Custom executor to use for parallel processing
63+
* @return A partial batch response
64+
*/
65+
R processBatchInParallel(E event, Context context, Executor executor);
5366
}

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/DynamoDbBatchMessageHandler.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,13 @@
1717
import com.amazonaws.services.lambda.runtime.Context;
1818
import com.amazonaws.services.lambda.runtime.events.DynamodbEvent;
1919
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
20+
21+
import java.util.ArrayList;
2022
import java.util.List;
2123
import java.util.Optional;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.Executor;
26+
import java.util.concurrent.ExecutorService;
2227
import java.util.function.BiConsumer;
2328
import java.util.function.Consumer;
2429
import java.util.stream.Collectors;
@@ -66,7 +71,9 @@ public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context
6671
.parallelStream() // Parallel processing
6772
.map(eventRecord -> {
6873
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
69-
return processBatchItem(eventRecord, context);
74+
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
75+
multiThreadMDC.removeThread(Thread.currentThread().getName());
76+
return failureOpt;
7077
})
7178
.filter(Optional::isPresent)
7279
.map(Optional::get)
@@ -75,6 +82,23 @@ public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context
7582
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
7683
}
7784

85+
@Override
86+
public StreamsEventResponse processBatchInParallel(DynamodbEvent event, Context context, Executor executor) {
87+
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
88+
89+
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
90+
List<CompletableFuture<Void>> futures = event.getRecords().stream()
91+
.map(eventRecord -> CompletableFuture.runAsync(() -> {
92+
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
93+
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
94+
failureOpt.ifPresent(batchItemFailures::add);
95+
multiThreadMDC.removeThread(Thread.currentThread().getName());
96+
}, executor))
97+
.collect(Collectors.toList());
98+
futures.forEach(CompletableFuture::join);
99+
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
100+
}
101+
78102
private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(DynamodbEvent.DynamodbStreamRecord streamRecord, Context context) {
79103
try {
80104
LOGGER.debug("Processing item {}", streamRecord.getEventID());

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/KinesisStreamsBatchMessageHandler.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,12 @@
1818
import com.amazonaws.services.lambda.runtime.Context;
1919
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
2020
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
21+
22+
import java.util.ArrayList;
2123
import java.util.List;
2224
import java.util.Optional;
25+
import java.util.concurrent.CompletableFuture;
26+
import java.util.concurrent.Executor;
2327
import java.util.function.BiConsumer;
2428
import java.util.function.Consumer;
2529
import java.util.stream.Collectors;
@@ -77,7 +81,9 @@ public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context c
7781
.parallelStream() // Parallel processing
7882
.map(eventRecord -> {
7983
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
80-
return processBatchItem(eventRecord, context);
84+
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
85+
multiThreadMDC.removeThread(Thread.currentThread().getName());
86+
return failureOpt;
8187
})
8288
.filter(Optional::isPresent)
8389
.map(Optional::get)
@@ -86,6 +92,23 @@ public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context c
8692
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
8793
}
8894

95+
@Override
96+
public StreamsEventResponse processBatchInParallel(KinesisEvent event, Context context, Executor executor) {
97+
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
98+
99+
List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
100+
List<CompletableFuture<Void>> futures = event.getRecords().stream()
101+
.map(eventRecord -> CompletableFuture.runAsync(() -> {
102+
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
103+
Optional<StreamsEventResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
104+
failureOpt.ifPresent(batchItemFailures::add);
105+
multiThreadMDC.removeThread(Thread.currentThread().getName());
106+
}, executor))
107+
.collect(Collectors.toList());
108+
futures.forEach(CompletableFuture::join);
109+
return StreamsEventResponse.builder().withBatchItemFailures(batchItemFailures).build();
110+
}
111+
89112
private Optional<StreamsEventResponse.BatchItemFailure> processBatchItem(KinesisEvent.KinesisEventRecord eventRecord, Context context) {
90113
try {
91114
LOGGER.debug("Processing item {}", eventRecord.getEventID());

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/handler/SqsBatchMessageHandler.java

Lines changed: 34 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,20 @@
1515
package software.amazon.lambda.powertools.batch.handler;
1616

1717
import com.amazonaws.services.lambda.runtime.Context;
18+
import com.amazonaws.services.lambda.runtime.events.KinesisEvent;
1819
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
1920
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
2021
import java.util.ArrayList;
2122
import java.util.List;
2223
import java.util.Optional;
24+
import java.util.concurrent.CompletableFuture;
25+
import java.util.concurrent.Executor;
2326
import java.util.concurrent.atomic.AtomicBoolean;
2427
import java.util.function.BiConsumer;
2528
import java.util.function.Consumer;
2629
import java.util.stream.Collectors;
30+
31+
import com.amazonaws.services.lambda.runtime.events.StreamsEventResponse;
2732
import org.slf4j.Logger;
2833
import org.slf4j.LoggerFactory;
2934
import software.amazon.lambda.powertools.batch.internal.MultiThreadMDC;
@@ -99,7 +104,7 @@ public SQSBatchResponse processBatch(SQSEvent event, Context context) {
99104

100105
@Override
101106
public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context) {
102-
if (!event.getRecords().isEmpty() && event.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null) {
107+
if (isFIFOEnabled(event)) {
103108
throw new UnsupportedOperationException("FIFO queues are not supported in parallel mode, use the processBatch method instead");
104109
}
105110

@@ -109,7 +114,9 @@ public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context)
109114
.map(sqsMessage -> {
110115

111116
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
112-
return processBatchItem(sqsMessage, context);
117+
Optional<SQSBatchResponse.BatchItemFailure> failureOpt = processBatchItem(sqsMessage, context);
118+
multiThreadMDC.removeThread(Thread.currentThread().getName());
119+
return failureOpt;
113120
})
114121
.filter(Optional::isPresent)
115122
.map(Optional::get)
@@ -118,6 +125,27 @@ public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context)
118125
return SQSBatchResponse.builder().withBatchItemFailures(batchItemFailures).build();
119126
}
120127

128+
@Override
129+
public SQSBatchResponse processBatchInParallel(SQSEvent event, Context context, Executor executor) {
130+
if (isFIFOEnabled(event)) {
131+
throw new UnsupportedOperationException("FIFO queues are not supported in parallel mode, use the processBatch method instead");
132+
}
133+
134+
MultiThreadMDC multiThreadMDC = new MultiThreadMDC();
135+
List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
136+
List<CompletableFuture<Void>> futures = event.getRecords().stream()
137+
.map(eventRecord -> CompletableFuture.runAsync(() -> {
138+
multiThreadMDC.copyMDCToThread(Thread.currentThread().getName());
139+
Optional<SQSBatchResponse.BatchItemFailure> failureOpt = processBatchItem(eventRecord, context);
140+
failureOpt.ifPresent(batchItemFailures::add);
141+
multiThreadMDC.removeThread(Thread.currentThread().getName());
142+
}, executor))
143+
.collect(Collectors.toList());
144+
futures.forEach(CompletableFuture::join);
145+
146+
return SQSBatchResponse.builder().withBatchItemFailures(batchItemFailures).build();
147+
}
148+
121149
private Optional<SQSBatchResponse.BatchItemFailure> processBatchItem(SQSEvent.SQSMessage message, Context context) {
122150
try {
123151
LOGGER.debug("Processing message {}", message.getMessageId());
@@ -152,4 +180,8 @@ private Optional<SQSBatchResponse.BatchItemFailure> processBatchItem(SQSEvent.SQ
152180
.build());
153181
}
154182
}
183+
184+
private boolean isFIFOEnabled(SQSEvent sqsEvent) {
185+
return !sqsEvent.getRecords().isEmpty() && sqsEvent.getRecords().get(0).getAttributes().get(MESSAGE_GROUP_ID_KEY) != null;
186+
}
155187
}

powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/internal/MultiThreadMDC.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,4 +44,11 @@ public void copyMDCToThread(String thread) {
4444
mdcAwareThreads.add(thread);
4545
}
4646
}
47+
48+
public void removeThread(String thread) {
49+
if (mdcAwareThreads.contains(thread)) {
50+
LOGGER.debug("Removing thread {}", thread);
51+
mdcAwareThreads.remove(thread);
52+
}
53+
}
4754
}

0 commit comments

Comments
 (0)