Skip to content

Commit 4f35761

Browse files
authored
Fix transformer for async mode (#10625)
The `AbstractMessageProducingHandler` has `async` mode, which is handled by the reply payload type. However, the `AbstractTransformer` produces the whole `Message` with that async reply payload. * Fix `AbstractMessageProducingHandler` to check for a `Message` type on a reply object. Extract its payload for the async logic. And recreate the final message for reply after async container fulfilment * Disable `WebFluxDslTests.testHttpReactivePostWithError()` as failing against the latest SF **Auto-cherry-pick to `6.5.x` & `6.4.x`**
1 parent c420cab commit 4f35761

File tree

2 files changed

+90
-12
lines changed

2 files changed

+90
-12
lines changed

spring-integration-core/src/main/java/org/springframework/integration/handler/AbstractMessageProducingHandler.java

Lines changed: 41 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -321,25 +321,42 @@ private void doProduceOutput(Message<?> requestMessage, MessageHeaders requestHe
321321
replyChannel = getOutputChannel();
322322
}
323323

324+
Object replyPayload = reply;
325+
Message<?> replyMessage = reply instanceof Message<?> message ? message : null;
326+
327+
if (replyMessage != null) {
328+
replyPayload = replyMessage.getPayload();
329+
}
330+
324331
if (this.async) {
325-
boolean isFutureReply = reply instanceof CompletableFuture<?>;
332+
boolean isFutureReply = replyPayload instanceof CompletableFuture<?>;
326333

327334
ReactiveAdapter reactiveAdapter = null;
328335
if (!isFutureReply) {
329-
reactiveAdapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, reply);
336+
reactiveAdapter = ReactiveAdapterRegistry.getSharedInstance().getAdapter(null, replyPayload);
330337
}
331338

332339
if (isFutureReply || reactiveAdapter != null) {
333340
if (replyChannel instanceof ReactiveStreamsSubscribableChannel reactiveStreamsSubscribableChannel) {
334-
Publisher<?> reactiveReply = toPublisherReply(reply, reactiveAdapter);
341+
Publisher<?> reactiveReply = toPublisherReply(replyPayload, reactiveAdapter);
335342
reactiveStreamsSubscribableChannel
336343
.subscribeTo(
337344
Flux.from(reactiveReply)
338345
.doOnError((ex) -> sendErrorMessage(requestMessage, ex))
339-
.map(result -> createOutputMessage(result, requestHeaders)));
346+
.map(result -> {
347+
if (replyMessage != null) {
348+
return getMessageBuilderFactory()
349+
.withPayload(result)
350+
.copyHeaders(replyMessage.getHeaders())
351+
.build();
352+
}
353+
else {
354+
return createOutputMessage(result, requestHeaders);
355+
}
356+
}));
340357
}
341358
else {
342-
CompletableFuture<?> futureReply = toFutureReply(reply, reactiveAdapter);
359+
CompletableFuture<?> futureReply = toFutureReply(replyPayload, replyMessage, reactiveAdapter);
343360
futureReply.whenComplete(new ReplyFutureCallback(requestMessage, replyChannel));
344361
}
345362

@@ -359,8 +376,12 @@ private Publisher<?> toPublisherReply(Object reply, @Nullable ReactiveAdapter re
359376
}
360377
}
361378

362-
@SuppressWarnings("try")
363-
private CompletableFuture<?> toFutureReply(Object reply, @Nullable ReactiveAdapter reactiveAdapter) {
379+
@SuppressWarnings({"try", "unchecked"})
380+
private CompletableFuture<?> toFutureReply(Object reply, @Nullable Message<?> replyMessage,
381+
@Nullable ReactiveAdapter reactiveAdapter) {
382+
383+
CompletableFuture<Object> replyFuture;
384+
364385
if (reactiveAdapter != null) {
365386
Mono<?> reactiveReply;
366387
Publisher<?> publisher = reactiveAdapter.toPublisher(reply);
@@ -371,15 +392,15 @@ private CompletableFuture<?> toFutureReply(Object reply, @Nullable ReactiveAdapt
371392
reactiveReply = Mono.from(publisher);
372393
}
373394

374-
CompletableFuture<Object> replyFuture = new CompletableFuture<>();
395+
replyFuture = new CompletableFuture<>();
375396

376397
reactiveReply
377398
/*
378399
The MonoToCompletableFuture in Project Reactor does not support context propagation,
379400
and it does not suppose to, since there is no guarantee how this Future is going to
380401
be handled downstream.
381402
However, in our case we process it directly in this class in the doProduceOutput()
382-
via whenComplete() callback. So, when value is set into the Future, it is available
403+
via whenComplete() callback. So, when the value is set into the Future, it is available
383404
in the callback in the same thread immediately.
384405
*/
385406
.doOnEach((signal) -> {
@@ -400,12 +421,20 @@ via whenComplete() callback. So, when value is set into the Future, it is availa
400421
})
401422
.contextCapture()
402423
.subscribe();
403-
404-
return replyFuture;
405424
}
406425
else {
407-
return (CompletableFuture<?>) reply;
426+
replyFuture = (CompletableFuture<Object>) reply;
408427
}
428+
429+
if (replyMessage == null) {
430+
return replyFuture;
431+
}
432+
433+
return replyFuture.thenApply(result ->
434+
getMessageBuilderFactory()
435+
.withPayload(result)
436+
.copyHeaders(replyMessage.getHeaders())
437+
.build());
409438
}
410439

411440
private AbstractIntegrationMessageBuilder<?> addRoutingSlipHeader(Object reply, List<?> routingSlip,

spring-integration-core/src/test/java/org/springframework/integration/dsl/transformers/TransformerTests.java

Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,15 @@
1818

1919
import java.io.InputStream;
2020
import java.io.OutputStream;
21+
import java.time.Duration;
2122
import java.util.Collections;
2223
import java.util.Date;
2324
import java.util.Map;
25+
import java.util.concurrent.CompletableFuture;
2426

2527
import org.junit.jupiter.api.Test;
28+
import reactor.core.publisher.Flux;
29+
import reactor.test.StepVerifier;
2630

2731
import org.springframework.beans.factory.annotation.Autowired;
2832
import org.springframework.beans.factory.annotation.Qualifier;
@@ -32,6 +36,7 @@
3236
import org.springframework.integration.annotation.Transformer;
3337
import org.springframework.integration.channel.DirectChannel;
3438
import org.springframework.integration.channel.FixedSubscriberChannel;
39+
import org.springframework.integration.channel.FluxMessageChannel;
3540
import org.springframework.integration.channel.QueueChannel;
3641
import org.springframework.integration.codec.Codec;
3742
import org.springframework.integration.config.EnableIntegration;
@@ -273,6 +278,41 @@ public void testFailedTransformWithRequestHeadersCopy() {
273278
.isEqualTo("transform failed");
274279
}
275280

281+
@Autowired
282+
@Qualifier("asyncTransformerFlow.input")
283+
MessageChannel asyncTransformerFlowInput;
284+
285+
@Test
286+
void asyncTransformerReplyIsProcessed() {
287+
QueueChannel replyChannel = new QueueChannel();
288+
this.asyncTransformerFlowInput.send(
289+
MessageBuilder.withPayload("test")
290+
.setReplyChannel(replyChannel)
291+
.build());
292+
293+
Message<?> receive = replyChannel.receive(10_000);
294+
295+
assertThat(receive).extracting(Message::getPayload).isEqualTo("test async");
296+
297+
}
298+
299+
@Test
300+
void reactiveTransformerReplyIsProcessed() {
301+
FluxMessageChannel replyChannel = new FluxMessageChannel();
302+
this.asyncTransformerFlowInput.send(
303+
MessageBuilder.withPayload("test")
304+
.setReplyChannel(replyChannel)
305+
.build());
306+
307+
StepVerifier.create(
308+
Flux.from(replyChannel)
309+
.map(Message::getPayload)
310+
.cast(String.class))
311+
.expectNext("test async")
312+
.thenCancel()
313+
.verify(Duration.ofSeconds(10));
314+
}
315+
276316
@Configuration
277317
@EnableIntegration
278318
public static class ContextConfiguration {
@@ -465,6 +505,15 @@ public IntegrationFlow transformFlowWithError() {
465505
.log();
466506
}
467507

508+
@Bean
509+
public IntegrationFlow asyncTransformerFlow() {
510+
return f -> f
511+
.transformWith(endpoint -> endpoint
512+
.<String, CompletableFuture<String>>transformer(payload ->
513+
CompletableFuture.completedFuture(payload + " async"))
514+
.async(true));
515+
}
516+
468517
}
469518

470519
private static final class TestPojo {

0 commit comments

Comments
 (0)