Skip to content

Commit 8b70c20

Browse files
garyrussellartembilan
authored andcommitted
GH-3410: Add UDP SocketCustomizer
Resolves #3410 **cherry-pick to 5.3.x** * Doc polishing. # Conflicts: # spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java # spring-integration-ip/src/test/java/org/springframework/integration/ip/dsl/IpIntegrationTests.java # src/reference/asciidoc/whats-new.adoc
1 parent 9e3b5d9 commit 8b70c20

File tree

15 files changed

+190
-45
lines changed

15 files changed

+190
-45
lines changed

spring-integration-ip/src/main/java/org/springframework/integration/ip/config/IpAdapterParserUtils.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -133,6 +133,8 @@ public abstract class IpAdapterParserUtils {
133133

134134
public static final String CONNECT_TIMEOUT = "connect-timeout";
135135

136+
public static final String UDP_SOCKET_CUSTOMIZER = "socket-customizer";
137+
136138
private IpAdapterParserUtils() {
137139
}
138140

spring-integration-ip/src/main/java/org/springframework/integration/ip/config/UdpInboundChannelAdapterParser.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2002-2019 the original author or authors.
2+
* Copyright 2002-2020 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -51,6 +51,8 @@ protected AbstractBeanDefinition doParse(Element element, ParserContext parserCo
5151
IpAdapterParserUtils.TASK_EXECUTOR);
5252
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
5353
IpAdapterParserUtils.LOOKUP_HOST);
54+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element,
55+
IpAdapterParserUtils.UDP_SOCKET_CUSTOMIZER);
5456
return builder.getBeanDefinition();
5557
}
5658

spring-integration-ip/src/main/java/org/springframework/integration/ip/config/UdpOutboundChannelAdapterParser.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
*/
3636
public class UdpOutboundChannelAdapterParser extends AbstractOutboundChannelAdapterParser {
3737

38+
@Override
3839
protected AbstractBeanDefinition parseConsumer(Element element, ParserContext parserContext) {
3940
BeanDefinitionBuilder builder = parseUdp(element, parserContext);
4041
IpAdapterParserUtils.addCommonSocketOptions(builder, element);
@@ -84,6 +85,8 @@ private BeanDefinitionBuilder parseUdp(Element element, ParserContext parserCont
8485
IpAdapterParserUtils.TASK_EXECUTOR);
8586
IntegrationNamespaceUtils.setValueIfAttributeDefined(builder, element,
8687
"socket-expression", "socketExpressionString");
88+
IntegrationNamespaceUtils.setReferenceIfAttributeDefined(builder, element,
89+
IpAdapterParserUtils.UDP_SOCKET_CUSTOMIZER);
8790
return builder;
8891
}
8992

spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/AbstractUdpOutboundChannelAdapterSpec.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.springframework.integration.dsl.MessageHandlerSpec;
2323
import org.springframework.integration.expression.FunctionExpression;
24+
import org.springframework.integration.ip.udp.SocketCustomizer;
2425
import org.springframework.integration.ip.udp.UnicastSendingMessageHandler;
2526
import org.springframework.messaging.Message;
2627

@@ -133,4 +134,15 @@ public S socketExpression(String socketExpression) {
133134
return _this();
134135
}
135136

137+
/**
138+
* Configure the socket.
139+
* @param customizer the customizer.
140+
* @return the spec.
141+
* @since 5.3.3
142+
*/
143+
public S configureSocket(SocketCustomizer customizer) {
144+
this.target.setSocketCustomizer(customizer);
145+
return _this();
146+
}
147+
136148
}

spring-integration-ip/src/main/java/org/springframework/integration/ip/dsl/UdpInboundChannelAdapterSpec.java

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import org.springframework.integration.dsl.MessageProducerSpec;
2323
import org.springframework.integration.ip.udp.MulticastReceivingChannelAdapter;
24+
import org.springframework.integration.ip.udp.SocketCustomizer;
2425
import org.springframework.integration.ip.udp.UnicastReceivingChannelAdapter;
2526
import org.springframework.scheduling.TaskScheduler;
2627

@@ -50,7 +51,7 @@ protected UdpInboundChannelAdapterSpec(int port, String multicastGroup) {
5051
*/
5152
public UdpInboundChannelAdapterSpec soTimeout(int soTimeout) {
5253
this.target.setSoTimeout(soTimeout);
53-
return _this();
54+
return this;
5455
}
5556

5657
/**
@@ -60,7 +61,7 @@ public UdpInboundChannelAdapterSpec soTimeout(int soTimeout) {
6061
*/
6162
public UdpInboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) {
6263
this.target.setTaskScheduler(taskScheduler);
63-
return _this();
64+
return this;
6465
}
6566

6667
/**
@@ -70,7 +71,7 @@ public UdpInboundChannelAdapterSpec taskScheduler(TaskScheduler taskScheduler) {
7071
*/
7172
public UdpInboundChannelAdapterSpec soReceiveBufferSize(int soReceiveBufferSize) {
7273
this.target.setSoReceiveBufferSize(soReceiveBufferSize);
73-
return _this();
74+
return this;
7475
}
7576

7677
/**
@@ -80,7 +81,7 @@ public UdpInboundChannelAdapterSpec soReceiveBufferSize(int soReceiveBufferSize)
8081
*/
8182
public UdpInboundChannelAdapterSpec receiveBufferSize(int receiveBufferSize) {
8283
this.target.setReceiveBufferSize(receiveBufferSize);
83-
return _this();
84+
return this;
8485
}
8586

8687
/**
@@ -90,7 +91,7 @@ public UdpInboundChannelAdapterSpec receiveBufferSize(int receiveBufferSize) {
9091
*/
9192
public UdpInboundChannelAdapterSpec lengthCheck(boolean lengthCheck) {
9293
this.target.setLengthCheck(lengthCheck);
93-
return _this();
94+
return this;
9495
}
9596

9697
/**
@@ -100,7 +101,7 @@ public UdpInboundChannelAdapterSpec lengthCheck(boolean lengthCheck) {
100101
*/
101102
public UdpInboundChannelAdapterSpec localAddress(String localAddress) {
102103
this.target.setLocalAddress(localAddress);
103-
return _this();
104+
return this;
104105
}
105106

106107
/**
@@ -110,7 +111,7 @@ public UdpInboundChannelAdapterSpec localAddress(String localAddress) {
110111
*/
111112
public UdpInboundChannelAdapterSpec poolSize(int poolSize) {
112113
this.target.setPoolSize(poolSize);
113-
return _this();
114+
return this;
114115
}
115116

116117
/**
@@ -120,7 +121,7 @@ public UdpInboundChannelAdapterSpec poolSize(int poolSize) {
120121
*/
121122
public UdpInboundChannelAdapterSpec taskExecutor(Executor taskExecutor) {
122123
this.target.setTaskExecutor(taskExecutor);
123-
return _this();
124+
return this;
124125
}
125126

126127
/**
@@ -130,7 +131,7 @@ public UdpInboundChannelAdapterSpec taskExecutor(Executor taskExecutor) {
130131
*/
131132
public UdpInboundChannelAdapterSpec socket(DatagramSocket socket) {
132133
this.target.setSocket(socket);
133-
return _this();
134+
return this;
134135
}
135136

136137
/**
@@ -140,7 +141,7 @@ public UdpInboundChannelAdapterSpec socket(DatagramSocket socket) {
140141
*/
141142
public UdpInboundChannelAdapterSpec soSendBufferSize(int soSendBufferSize) {
142143
this.target.setSoSendBufferSize(soSendBufferSize);
143-
return _this();
144+
return this;
144145
}
145146

146147
/**
@@ -150,7 +151,18 @@ public UdpInboundChannelAdapterSpec soSendBufferSize(int soSendBufferSize) {
150151
*/
151152
public UdpInboundChannelAdapterSpec lookupHost(boolean lookupHost) {
152153
this.target.setLookupHost(lookupHost);
153-
return _this();
154+
return this;
155+
}
156+
157+
/**
158+
* Configure the socket.
159+
* @param customizer the customizer.
160+
* @return the spec.
161+
* @since 5.3.3
162+
*/
163+
public UdpInboundChannelAdapterSpec configureSocket(SocketCustomizer customizer) {
164+
this.target.setSocketCustomizer(customizer);
165+
return this;
154166
}
155167

156168
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
/**
22
* Provides TCP/UDP Component support for the Java DSL.
33
*/
4+
@org.springframework.lang.NonNullApi
45
package org.springframework.integration.ip.dsl;
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.ip.udp;
18+
19+
import java.net.DatagramSocket;
20+
import java.net.SocketException;
21+
22+
/**
23+
* Configures a socket.
24+
*
25+
* @author Gary Russell
26+
* @since 5.3.3
27+
*/
28+
@FunctionalInterface
29+
public interface SocketCustomizer {
30+
31+
/**
32+
* Configure the socket ({code setTrafficClass()}, etc).
33+
* @param socket the socket.
34+
* @throws SocketException a socket exception.
35+
*/
36+
void configure(DatagramSocket socket) throws SocketException;
37+
38+
}

spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastReceivingChannelAdapter.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import org.springframework.messaging.Message;
3636
import org.springframework.messaging.MessageHeaders;
3737
import org.springframework.messaging.MessagingException;
38+
import org.springframework.util.Assert;
3839

3940
/**
4041
* A channel adapter to receive incoming UDP packets. Packets can optionally be preceded by a
@@ -54,6 +55,7 @@ public class UnicastReceivingChannelAdapter extends AbstractInternetProtocolRece
5455

5556
private static Pattern addressPattern = Pattern.compile("([^:]*):([0-9]*)");
5657

58+
private SocketCustomizer socketCustomizer = socket -> { };
5759

5860
/**
5961
* Constructs a UnicastReceivingChannelAdapter that listens on the specified port.
@@ -85,6 +87,16 @@ public void setLengthCheck(boolean lengthCheck) {
8587
this.mapper.setLengthCheck(lengthCheck);
8688
}
8789

90+
/**
91+
* Set a customizer to further configure the socket after creation.
92+
* @param socketCustomizer the customizer.
93+
* @since 5.3.3
94+
*/
95+
public void setSocketCustomizer(SocketCustomizer socketCustomizer) {
96+
Assert.notNull(socketCustomizer, "'socketCustomizer' cannot be null");
97+
this.socketCustomizer = socketCustomizer;
98+
}
99+
88100
@Override
89101
public boolean isLongLived() {
90102
return true;
@@ -169,6 +181,7 @@ protected void sendAck(Message<byte[]> message) {
169181
if (this.soSendBufferSize > 0) {
170182
out.setSendBufferSize(this.soSendBufferSize);
171183
}
184+
this.socketCustomizer.configure(out);
172185
out.send(ackPack);
173186
out.close();
174187
}
@@ -260,7 +273,7 @@ public synchronized DatagramSocket getSocket() {
260273
}
261274

262275
/**
263-
* Sets timeout and receive buffer size
276+
* Sets timeout and receive buffer size; calls the socket customizer.
264277
*
265278
* @param socket The socket.
266279
* @throws SocketException Any socket exception.
@@ -272,6 +285,7 @@ protected void setSocketAttributes(DatagramSocket socket)
272285
if (soReceiveBufferSize > 0) {
273286
socket.setReceiveBufferSize(soReceiveBufferSize);
274287
}
288+
this.socketCustomizer.configure(socket);
275289
}
276290

277291
@Override

spring-integration-ip/src/main/java/org/springframework/integration/ip/udp/UnicastSendingMessageHandler.java

Lines changed: 23 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,9 @@ public class UnicastSendingMessageHandler extends
105105

106106
private EvaluationContext evaluationContext;
107107

108+
private SocketCustomizer socketCustomizer = socket -> { };
109+
110+
108111
/**
109112
* Basic constructor; no reliability; no acknowledgment.
110113
* @param host Destination host.
@@ -204,6 +207,25 @@ public UnicastSendingMessageHandler(String host,
204207
ackTimeout);
205208
}
206209

210+
/**
211+
* @param lengthCheck if true, a four byte binary length header is added to the
212+
* packet, allowing the receiver to check for data truncation.
213+
* @since 5.0
214+
*/
215+
public void setLengthCheck(boolean lengthCheck) {
216+
this.mapper.setLengthCheck(lengthCheck);
217+
}
218+
219+
/**
220+
* Set a customizer to further configure the socket after creation.
221+
* @param socketCustomizer the customizer.
222+
* @since 5.3.3
223+
*/
224+
public void setSocketCustomizer(SocketCustomizer socketCustomizer) {
225+
Assert.notNull(socketCustomizer, "'socketCustomizer' cannot be null");
226+
this.socketCustomizer = socketCustomizer;
227+
}
228+
207229
protected final void setReliabilityAttributes(boolean lengthCheck,
208230
boolean acknowledge, String ackHost, int ackPort, int ackTimeout) {
209231
this.mapper.setLengthCheck(lengthCheck);
@@ -221,15 +243,6 @@ protected final void setReliabilityAttributes(boolean lengthCheck,
221243
}
222244
}
223245

224-
/**
225-
* @param lengthCheck if true, a four byte binary length header is added to the
226-
* packet, allowing the receiver to check for data truncation.
227-
* @since 5.0
228-
*/
229-
public void setLengthCheck(boolean lengthCheck) {
230-
this.mapper.setLengthCheck(lengthCheck);
231-
}
232-
233246
@Override
234247
public void doStart() {
235248
if (this.acknowledge) {
@@ -496,6 +509,7 @@ protected void setSocketAttributes(DatagramSocket socket) throws SocketException
496509
if (this.getSoSendBufferSize() > 0) {
497510
socket.setSendBufferSize(this.getSoSendBufferSize());
498511
}
512+
this.socketCustomizer.configure(socket);
499513
}
500514

501515
/**

spring-integration-ip/src/main/resources/org/springframework/integration/ip/config/spring-integration-ip.xsd

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -873,6 +873,18 @@
873873
<xsd:extension base="common-attributes">
874874
<xsd:attribute name="check-length" type="xsd:string"/>
875875
<xsd:attribute name="multicast" type="xsd:string"/>
876+
<xsd:attribute name="socket-customizer" type="xsd:string">
877+
<xsd:annotation>
878+
<xsd:documentation>
879+
A SocketCustomizer bean.
880+
</xsd:documentation>
881+
<xsd:appinfo>
882+
<tool:annotation kind="ref">
883+
<tool:expected-type type="org.springframework.integration.ip.udp.SocketCustomizer"/>
884+
</tool:annotation>
885+
</xsd:appinfo>
886+
</xsd:annotation>
887+
</xsd:attribute>
876888
</xsd:extension>
877889
</xsd:complexContent>
878890
</xsd:complexType>

0 commit comments

Comments
 (0)