Skip to content

Commit a76bb24

Browse files
artembilangaryrusselloli-ver
authored
INT-3045: Add ZeroMqChannel support (#3355)
* INT-3045: Add `ZeroMqChannel` support JIRA: https://jira.spring.io/browse/INT-3045 Provide a `SubscribableChannel` implementation for ZeroMQ The general idea is to let to have a distributed channel implementation where every client can connect to a single server backed by the channel. The logic in the channel is fully transparent for end-user and there is just enough to send message to it and subscribe for receiving on the other side. If PUB/SUB model is used, all the subscribes (even over the network) going to receive the same published message. In case of PUSH/PULL only one subscriber in the whole cluster is going to get the published message * Use Reactor for better threading control * JeroMQ is not interruptible-friendly: use control sockets to stop proxy loop * Name Reactor's schedulers to avoid daemon threads * * Use try-catch-with-resource to close sockets automatically * Fix Checkstyle violations * Use `Mono.handle()` to receive data from the socket * * Optimize local for just a couple of PAIR sockets * Implement TCP binding * Add PUB/SUB tests * * Fix subscriber scheduler name * Optimize socket create logic * Add PUSH/PULL over TCP test * * Fix subscriber scheduler name * Optimize socket create logic * Add PUSH/PULL over TCP test * Implement PUB/SUB over TCP * * Introduce `ZeroMqProxy` - Spring-friendly component to configure and manage ZeroMq proxy * Use this `ZeroMqProxy` logic as an external component for `ZeroMqChannel` testing * * Fix Checkstyle * Apply docs polishing * Expose a capture socket on the proxy * Implement `DisposableBean` in the `ZeroMqProxy` to destroy an internal executor service * Add JavaDocs to `ZeroMqChannel` * Add one more `ZeroMqChannel` to TCP test to be sure that proxy distribution works well * * Add `hamcrest-core` dependency for Awatility * * Add more JavaDocs to `ZeroMqProxy` and `ZeroMqChannel` * Expose `ZeroMqChannel.setZeroMqProxy()` option for easier configuration within the same application context * Make `ZeroMqChannel` sockets configuration and connection dependant on provided `ZeroMqProxy` (if any) * Add `Consumer<ZMQ.Socket>` configuration callbacks to the `ZeroMqChannel` * Expose `ZeroMqChannel.consumeDelay` option * * Add docs for ZeroMQ * Some additions into a `reactive-streams.adoc` * Fix typo in the `xmpp.adoc` * * Add `optional` `jackson-databind` since `ZeroMqChannel` uses it by default * More words into docs * * Fix language in docs according review * Fix language in docs according review Co-authored-by: Gary Russell <[email protected]> * Apply suggestions from code review Co-authored-by: Oliver <[email protected]> * * Fix threading using a `publishOn()` for specific scheduler after `cache()` * * Remove unused import * * Change proxy port check from static `Mono.just()` to `Mono.fromCallable()` to really evaluate the current port state on every repeat * Add finite `100` repeat number to avoid infinite blocking when proxy is not started at all * Add `doOnError()` for proxy `Mono` to log `ERROR` when repeat is exhausted * * Fix Checkstyle violation Co-authored-by: Gary Russell <[email protected]> Co-authored-by: Oliver <[email protected]>
1 parent 217e43b commit a76bb24

File tree

12 files changed

+1040
-3
lines changed

12 files changed

+1040
-3
lines changed

build.gradle

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ ext {
7070
javaxAnnotationVersion= '1.3.2'
7171
javaxMailVersion = '1.6.2'
7272
jaxbVersion = '2.3.3'
73+
jeroMqVersion = '0.5.2'
7374
jmsApiVersion = '2.0.1'
7475
jpa21ApiVersion = '1.0.2.Final'
7576
jpaApiVersion = '2.2.1'
@@ -864,6 +865,18 @@ project('spring-integration-xmpp') {
864865
}
865866
}
866867

868+
project('spring-integration-zeromq') {
869+
description = 'Spring Integration ZeroMQ Support'
870+
dependencies {
871+
api project(':spring-integration-core')
872+
api "org.zeromq:jeromq:$jeroMqVersion"
873+
874+
optionalApi 'com.fasterxml.jackson.core:jackson-databind'
875+
876+
testImplementation "org.hamcrest:hamcrest-core:$hamcrestVersion"
877+
}
878+
}
879+
867880
project('spring-integration-zookeeper') {
868881
description = 'Spring Integration Zookeeper Support'
869882
dependencies {
Lines changed: 367 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,367 @@
1+
/*
2+
* Copyright 2020 the original author or authors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package org.springframework.integration.zeromq;
18+
19+
import java.util.concurrent.Executor;
20+
import java.util.concurrent.ExecutorService;
21+
import java.util.concurrent.Executors;
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
import java.util.function.Consumer;
25+
26+
import org.apache.commons.logging.Log;
27+
import org.apache.commons.logging.LogFactory;
28+
import org.zeromq.SocketType;
29+
import org.zeromq.ZContext;
30+
import org.zeromq.ZMQ;
31+
32+
import org.springframework.beans.factory.BeanNameAware;
33+
import org.springframework.beans.factory.DisposableBean;
34+
import org.springframework.beans.factory.InitializingBean;
35+
import org.springframework.context.SmartLifecycle;
36+
import org.springframework.lang.Nullable;
37+
import org.springframework.scheduling.concurrent.CustomizableThreadFactory;
38+
import org.springframework.util.Assert;
39+
40+
/**
41+
* This class encapsulates the logic to configure and manage a ZeroMQ proxy.
42+
* It binds frontend and backend sockets over TCP on all the available network interfaces
43+
* with either provided or randomly selected ports.
44+
* <p>
45+
* The {@link ZeroMqProxy.Type} dictates which pair of ZeroMQ sockets to bind with this proxy
46+
* to implement any possible patterns for ZeroMQ intermediary. Defaults to @link {@link ZeroMqProxy.Type#PULL_PUSH}.
47+
* <p>
48+
* The control socket is exposed as a {@link SocketType#PAIR} with an inter-thread transport
49+
* on the {@code "inproc://" + beanName + ".control"} address; it can be obtained via {@link #getControlAddress()}.
50+
* Should be used with the same application from {@link SocketType#PAIR} socket to send
51+
* {@link zmq.ZMQ#PROXY_TERMINATE}, {@link zmq.ZMQ#PROXY_PAUSE} and/or {@link zmq.ZMQ#PROXY_RESUME} commands.
52+
* <p>
53+
* If the proxy cannot be started for some reason, an error message is logged and this component is
54+
* left in the non-started state.
55+
* <p>
56+
* With an {@link #exposeCaptureSocket} option, an additional capture data socket is bound to inter-thread transport
57+
* as a {@link SocketType#PUB}. There is no specific topic selection, so all the subscribers to this socket
58+
* must subscribe with plain {@link ZMQ#SUBSCRIPTION_ALL}.
59+
* The address for this socket is {@code "inproc://" + beanName + ".capture"}.
60+
*
61+
* @author Artem Bilan
62+
*
63+
* @since 5.4
64+
*
65+
* @see ZMQ#proxy(ZMQ.Socket, ZMQ.Socket, ZMQ.Socket)
66+
*/
67+
public class ZeroMqProxy implements InitializingBean, SmartLifecycle, BeanNameAware, DisposableBean {
68+
69+
private static final Log LOG = LogFactory.getLog(ZeroMqProxy.class);
70+
71+
private final ZContext context;
72+
73+
private final Type type;
74+
75+
private final AtomicBoolean running = new AtomicBoolean();
76+
77+
private final AtomicInteger frontendPort = new AtomicInteger();
78+
79+
private final AtomicInteger backendPort = new AtomicInteger();
80+
81+
private String controlAddress;
82+
83+
private Executor proxyExecutor;
84+
85+
private boolean proxyExecutorExplicitlySet;
86+
87+
@Nullable
88+
private Consumer<ZMQ.Socket> frontendSocketConfigurer;
89+
90+
@Nullable
91+
private Consumer<ZMQ.Socket> backendSocketConfigurer;
92+
93+
private boolean exposeCaptureSocket;
94+
95+
@Nullable
96+
private String captureAddress;
97+
98+
private String beanName;
99+
100+
private boolean autoStartup;
101+
102+
private int phase;
103+
104+
/**
105+
* Create a {@link ZeroMqProxy} instance based on the provided {@link ZContext}
106+
* and {@link Type#PULL_PUSH} as default mode.
107+
* @param context the {@link ZContext} to use
108+
*/
109+
public ZeroMqProxy(ZContext context) {
110+
this(context, Type.PULL_PUSH);
111+
}
112+
113+
/**
114+
* Create a {@link ZeroMqProxy} instance based on the provided {@link ZContext}
115+
* and {@link Type}.
116+
* @param context the {@link ZContext} to use
117+
* @param type the {@link Type} to use.
118+
*/
119+
public ZeroMqProxy(ZContext context, Type type) {
120+
Assert.notNull(context, "'context' must not be null");
121+
Assert.notNull(type, "'type' must not be null");
122+
this.context = context;
123+
this.type = type;
124+
}
125+
126+
/**
127+
* Configure an executor to perform a ZeroMQ proxy loop.
128+
* The thread is held until ZeroMQ proxy loop is terminated.
129+
* By default an internal {@link Executors#newSingleThreadExecutor} instance is used.
130+
* @param proxyExecutor the {@link Executor} to use for ZeroMQ proxy loop
131+
*/
132+
public void setProxyExecutor(Executor proxyExecutor) {
133+
Assert.notNull(proxyExecutor, "'proxyExecutor' must not be null");
134+
this.proxyExecutor = proxyExecutor;
135+
this.proxyExecutorExplicitlySet = true;
136+
}
137+
138+
/**
139+
* Specify a fixed port for frontend socket of the proxy.
140+
* @param frontendPort the port to use; must be more than 0
141+
*/
142+
public void setFrontendPort(int frontendPort) {
143+
Assert.isTrue(frontendPort > 0, "'frontendPort' must not be zero or negative");
144+
this.frontendPort.set(frontendPort);
145+
}
146+
147+
/**
148+
* Specify a fixed port for backend socket of the proxy.
149+
* @param backendPort the port to use; must be more than 0
150+
*/
151+
public void setBackendPort(int backendPort) {
152+
Assert.isTrue(backendPort > 0, "'backendPort' must not be zero or negative");
153+
this.backendPort.set(backendPort);
154+
}
155+
156+
/**
157+
* Provide a {@link Consumer} to configure a proxy frontend socket with arbitrary options, like security.
158+
* @param frontendSocketConfigurer the configurer for frontend socket
159+
*/
160+
public void setFrontendSocketConfigurer(@Nullable Consumer<ZMQ.Socket> frontendSocketConfigurer) {
161+
this.frontendSocketConfigurer = frontendSocketConfigurer;
162+
}
163+
164+
/**
165+
* Provide a {@link Consumer} to configure a proxy backend socket with arbitrary options, like security.
166+
* @param backendSocketConfigurer the configurer for backend socket
167+
*/
168+
public void setBackendSocketConfigurer(@Nullable Consumer<ZMQ.Socket> backendSocketConfigurer) {
169+
this.backendSocketConfigurer = backendSocketConfigurer;
170+
}
171+
172+
/**
173+
* Whether to bind and expose a capture socket for the proxy data.
174+
* @param exposeCaptureSocket true to bind capture socket for proxy
175+
*/
176+
public void setExposeCaptureSocket(boolean exposeCaptureSocket) {
177+
this.exposeCaptureSocket = exposeCaptureSocket;
178+
}
179+
180+
@Override
181+
public void setBeanName(String beanName) {
182+
this.beanName = beanName;
183+
}
184+
185+
public void setAutoStartup(boolean autoStartup) {
186+
this.autoStartup = autoStartup;
187+
}
188+
189+
public void setPhase(int phase) {
190+
this.phase = phase;
191+
}
192+
193+
public Type getType() {
194+
return this.type;
195+
}
196+
197+
/**
198+
* Return the port a frontend socket is bound or 0 if this proxy has not been started yet.
199+
* @return the port for a frontend socket or 0
200+
*/
201+
public int getFrontendPort() {
202+
return this.frontendPort.get();
203+
}
204+
205+
/**
206+
* Return the port a backend socket is bound or null if this proxy has not been started yet.
207+
* @return the port for a backend socket or 0
208+
*/
209+
public int getBackendPort() {
210+
return this.backendPort.get();
211+
}
212+
213+
/**
214+
* Return the address an {@code inproc} control socket is bound or null if this proxy has not been started yet.
215+
* @return the the address for control socket or null
216+
*/
217+
@Nullable
218+
public String getControlAddress() {
219+
return this.controlAddress;
220+
}
221+
222+
/**
223+
* Return the address an {@code inproc} capture socket is bound or null if this proxy has not been started yet
224+
* or {@link #captureAddress} is false.
225+
* @return the the address for capture socket or null
226+
*/
227+
@Nullable
228+
public String getCaptureAddress() {
229+
return this.captureAddress;
230+
}
231+
232+
@Override
233+
public boolean isAutoStartup() {
234+
return this.autoStartup;
235+
}
236+
237+
@Override
238+
public int getPhase() {
239+
return this.phase;
240+
}
241+
242+
@Override
243+
public void afterPropertiesSet() {
244+
if (this.proxyExecutor == null) {
245+
this.proxyExecutor = Executors.newSingleThreadExecutor(new CustomizableThreadFactory(this.beanName));
246+
}
247+
this.controlAddress = "inproc://" + this.beanName + ".control";
248+
if (this.exposeCaptureSocket) {
249+
this.captureAddress = "inproc://" + this.beanName + ".capture";
250+
}
251+
}
252+
253+
@Override
254+
public synchronized void start() {
255+
if (!this.running.get()) {
256+
this.proxyExecutor
257+
.execute(() -> {
258+
ZMQ.Socket captureSocket = null;
259+
if (this.exposeCaptureSocket) {
260+
captureSocket = this.context.createSocket(SocketType.PUB);
261+
}
262+
try (
263+
ZMQ.Socket frontendSocket = this.context.createSocket(this.type.getFrontendSocketType());
264+
ZMQ.Socket backendSocket = this.context.createSocket(this.type.getBackendSocketType());
265+
ZMQ.Socket controlSocket = this.context.createSocket(SocketType.PAIR)
266+
) {
267+
268+
if (this.frontendSocketConfigurer != null) {
269+
this.frontendSocketConfigurer.accept(frontendSocket);
270+
}
271+
272+
if (this.backendSocketConfigurer != null) {
273+
this.backendSocketConfigurer.accept(backendSocket);
274+
}
275+
276+
this.frontendPort.set(bindSocket(frontendSocket, this.frontendPort.get()));
277+
this.backendPort.set(bindSocket(backendSocket, this.backendPort.get()));
278+
boolean bound = controlSocket.bind(this.controlAddress);
279+
if (!bound) {
280+
throw new IllegalArgumentException("Cannot bind ZeroMQ socket to address: "
281+
+ this.controlAddress);
282+
}
283+
if (captureSocket != null) {
284+
bound = captureSocket.bind(this.captureAddress);
285+
if (!bound) {
286+
throw new IllegalArgumentException("Cannot bind ZeroMQ socket to address: "
287+
+ this.captureAddress);
288+
}
289+
}
290+
this.running.set(true);
291+
ZMQ.proxy(frontendSocket, backendSocket, captureSocket, controlSocket);
292+
}
293+
catch (Exception ex) {
294+
LOG.error("Cannot start ZeroMQ proxy from bean: " + this.beanName, ex);
295+
}
296+
finally {
297+
if (captureSocket != null) {
298+
captureSocket.close();
299+
}
300+
}
301+
});
302+
}
303+
}
304+
305+
@Override
306+
public synchronized void stop() {
307+
if (this.running.getAndSet(false)) {
308+
try (ZMQ.Socket commandSocket = this.context.createSocket(SocketType.PAIR)) {
309+
commandSocket.connect(this.controlAddress);
310+
commandSocket.send(zmq.ZMQ.PROXY_TERMINATE);
311+
}
312+
}
313+
}
314+
315+
@Override
316+
public boolean isRunning() {
317+
return this.running.get();
318+
}
319+
320+
@Override
321+
public void destroy() {
322+
if (!this.proxyExecutorExplicitlySet) {
323+
((ExecutorService) this.proxyExecutor).shutdown();
324+
}
325+
}
326+
327+
private static int bindSocket(ZMQ.Socket socket, int port) {
328+
if (port == 0) {
329+
return socket.bindToRandomPort("tcp://*");
330+
}
331+
else {
332+
boolean bound = socket.bind("tcp://*:" + port);
333+
if (!bound) {
334+
throw new IllegalArgumentException("Cannot bind ZeroMQ socket to port: " + port);
335+
}
336+
return port;
337+
}
338+
}
339+
340+
public enum Type {
341+
342+
SUB_PUB(SocketType.XSUB, SocketType.XPUB),
343+
344+
PULL_PUSH(SocketType.PULL, SocketType.PUSH),
345+
346+
ROUTER_DEALER(SocketType.ROUTER, SocketType.DEALER);
347+
348+
private final SocketType frontendSocketType;
349+
350+
private final SocketType backendSocketType;
351+
352+
Type(SocketType frontendSocketType, SocketType backendSocketType) {
353+
this.frontendSocketType = frontendSocketType;
354+
this.backendSocketType = backendSocketType;
355+
}
356+
357+
public SocketType getFrontendSocketType() {
358+
return this.frontendSocketType;
359+
}
360+
361+
public SocketType getBackendSocketType() {
362+
return this.backendSocketType;
363+
}
364+
365+
}
366+
367+
}

0 commit comments

Comments
 (0)