Skip to content

Commit 79f6479

Browse files
garyrussellartembilan
authored andcommitted
GH-2736: Add DSL support for async TCP OB Gateway
1 parent 5cb3f21 commit 79f6479

File tree

2 files changed

+40
-4
lines changed

2 files changed

+40
-4
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/TcpOutboundGatewaySpec.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ public TcpOutboundGatewaySpec(TcpClientConnectionFactorySpec connectionFactorySp
6868
*/
6969
public TcpOutboundGatewaySpec remoteTimeout(long remoteTimeout) {
7070
this.target.setRemoteTimeout(remoteTimeout);
71-
return _this();
71+
return this;
7272
}
7373

7474
/**
@@ -86,7 +86,7 @@ public TcpOutboundGatewaySpec remoteTimeout(long remoteTimeout) {
8686
*/
8787
public <P> TcpOutboundGatewaySpec remoteTimeout(Function<Message<P>, ?> remoteTimeoutFunction) {
8888
this.target.setRemoteTimeoutExpression(new FunctionExpression<>(remoteTimeoutFunction));
89-
return _this();
89+
return this;
9090
}
9191

9292
/**
@@ -100,7 +100,18 @@ public <P> TcpOutboundGatewaySpec remoteTimeout(Function<Message<P>, ?> remoteTi
100100
*/
101101
public TcpOutboundGatewaySpec closeStreamAfterSend(boolean closeStreamAfterSend) {
102102
this.target.setCloseStreamAfterSend(closeStreamAfterSend);
103-
return _this();
103+
return this;
104+
}
105+
106+
/**
107+
* Set to true to release the sending thread and receive the reply asynchronously.
108+
* @param async true for asynchronous request/reply.
109+
* @return the spec.
110+
* @since 5.3
111+
*/
112+
public TcpOutboundGatewaySpec async(boolean async) {
113+
this.target.setAsync(async);
114+
return this;
104115
}
105116

106117
@Override

spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2016-2019 the original author or authors.
2+
* Copyright 2016-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -56,6 +56,7 @@
5656
import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter;
5757
import org.springframework.integration.ip.util.TestingUtilities;
5858
import org.springframework.integration.support.MessageBuilder;
59+
import org.springframework.integration.test.util.TestUtils;
5960
import org.springframework.messaging.Message;
6061
import org.springframework.messaging.MessageChannel;
6162
import org.springframework.messaging.support.GenericMessage;
@@ -100,6 +101,9 @@ public class IpIntegrationTests {
100101
@Autowired
101102
private QueueChannel udpIn;
102103

104+
@Autowired
105+
private TcpOutboundGateway tcpOutAsync;
106+
103107
@Autowired
104108
private Config config;
105109

@@ -210,6 +214,11 @@ public void onApplicationEvent(TcpConnectionServerListeningEvent event) {
210214
.convertSendAndReceive("foo", String.class)).isEqualTo("reply:FOO");
211215
}
212216

217+
@Test
218+
void async() {
219+
assertThat(TestUtils.getPropertyValue(this.tcpOutAsync, "async", Boolean.class)).isTrue();
220+
}
221+
213222
@Configuration
214223
@EnableIntegration
215224
public static class Config {
@@ -293,6 +302,22 @@ public TcpOutboundGateway tcpOut() {
293302
.get();
294303
}
295304

305+
@Bean
306+
public AbstractClientConnectionFactory client2() {
307+
return Tcp.netClient("localhost", server1().getPort())
308+
.serializer(TcpCodecs.crlf())
309+
.deserializer(TcpCodecs.lengthHeader1())
310+
.get();
311+
}
312+
313+
@Bean
314+
public TcpOutboundGateway tcpOutAsync() {
315+
return Tcp.outboundGateway(client2())
316+
.async(true)
317+
.remoteTimeout(m -> 5000)
318+
.get();
319+
}
320+
296321
@Bean
297322
public AtomicBoolean adviceCalled() {
298323
return new AtomicBoolean();

0 commit comments

Comments
 (0)