Skip to content

Commit 7da58b3

Browse files
artembilangaryrussell
authored andcommitted
Fix ZeroMQ components initialization
SO: https://stackoverflow.com/questions/67214907/zeromq-with-spring-spring-integration-zeromq The `Mono` is created in several places in ZeroMQ components from their constructors. That leads to the reactive stream to be configured just after ctor, which will ignore any changes to the options which are used from that `Mono` definition. For example this code `.doOnNext(this.sendSocketConfigurer)` is done once during reactive stream definition. * Fix all the ZeroMQ components to defer usage of the options which could be changed after ctor initialization * Cover affected option changes in the tests **Cherry-pick to `5.4.x`**
1 parent f97e44f commit 7da58b3

File tree

5 files changed

+55
-8
lines changed

5 files changed

+55
-8
lines changed

spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/channel/ZeroMqChannel.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -168,7 +168,7 @@ private Mono<ZMQ.Socket> prepareSendSocketMono(Supplier<String> localPairConnect
168168
? SocketType.PAIR
169169
: (this.pubSub ? SocketType.PUB : SocketType.PUSH))
170170
))
171-
.doOnNext(this.sendSocketConfigurer)
171+
.doOnNext((socket) -> this.sendSocketConfigurer.accept(socket))
172172
.doOnNext((socket) ->
173173
socket.connect(this.connectSendUrl != null
174174
? this.connectSendUrl
@@ -184,7 +184,7 @@ private Mono<ZMQ.Socket> prepareSubscribeSocketMono(Supplier<String> localPairCo
184184
this.connectSubscribeUrl == null
185185
? SocketType.PAIR
186186
: (this.pubSub ? SocketType.SUB : SocketType.PULL))))
187-
.doOnNext(this.subscribeSocketConfigurer)
187+
.doOnNext((socket) -> this.subscribeSocketConfigurer.accept(socket))
188188
.doOnNext((socket) -> {
189189
if (this.connectSubscribeUrl != null) {
190190
if (this.pubSub) {
@@ -213,7 +213,7 @@ private Flux<? extends Message<?>> prepareSubscriberDataFlux() {
213213
return Mono.empty();
214214
})
215215
.publishOn(Schedulers.parallel())
216-
.map(this.messageMapper::toMessage)
216+
.map((data) -> this.messageMapper.toMessage(data))
217217
.doOnError((error) -> logger.error(error,
218218
() -> "Error processing ZeroMQ message in the " + this))
219219
.repeatWhenEmpty((repeat) ->

spring-integration-zeromq/src/main/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -104,7 +104,7 @@ public ZeroMqMessageHandler(ZContext context, String connectUrl, SocketType sock
104104
this.socketMono =
105105
Mono.just(context.createSocket(socketType))
106106
.publishOn(this.publisherScheduler)
107-
.doOnNext(this.socketConfigurer)
107+
.doOnNext((socket) -> this.socketConfigurer.accept(socket))
108108
.doOnNext((socket) -> socket.connect(connectUrl))
109109
.cache()
110110
.publishOn(this.publisherScheduler);

spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/channel/ZeroMqChannelTests.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -20,9 +20,11 @@
2020
import static org.awaitility.Awaitility.await;
2121

2222
import java.time.Duration;
23+
import java.util.Map;
2324
import java.util.concurrent.BlockingQueue;
2425
import java.util.concurrent.LinkedBlockingQueue;
2526
import java.util.concurrent.TimeUnit;
27+
import java.util.concurrent.atomic.AtomicBoolean;
2628

2729
import org.junit.jupiter.api.AfterAll;
2830
import org.junit.jupiter.api.Test;
@@ -31,10 +33,13 @@
3133
import org.zeromq.ZMQ;
3234

3335
import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper;
36+
import org.springframework.integration.test.util.TestUtils;
3437
import org.springframework.integration.zeromq.ZeroMqProxy;
3538
import org.springframework.messaging.Message;
3639
import org.springframework.messaging.support.GenericMessage;
3740

41+
import reactor.core.publisher.Mono;
42+
3843
/**
3944
* @author Artem Bilan
4045
*
@@ -54,8 +59,29 @@ void testSimpleSendAndReceive() throws InterruptedException {
5459
ZeroMqChannel channel = new ZeroMqChannel(CONTEXT);
5560
channel.setBeanName("testChannel1");
5661
channel.setConsumeDelay(Duration.ofMillis(10));
62+
channel.setSendSocketConfigurer(socket -> socket.setZapDomain("global"));
63+
channel.setSubscribeSocketConfigurer(socket -> socket.setZapDomain("local"));
64+
AtomicBoolean customMessageMapperCalled = new AtomicBoolean();
65+
channel.setMessageMapper(new EmbeddedJsonHeadersMessageMapper() {
66+
67+
@Override public Message<?> toMessage(byte[] bytes, Map<String, Object> headers) {
68+
customMessageMapperCalled.set(true);
69+
return super.toMessage(bytes, headers);
70+
}
71+
72+
});
5773
channel.afterPropertiesSet();
5874

75+
@SuppressWarnings("unchecked")
76+
Mono<ZMQ.Socket> sendSocketMono = TestUtils.getPropertyValue(channel, "sendSocket", Mono.class);
77+
ZMQ.Socket sendSocket = sendSocketMono.block(Duration.ofSeconds(10));
78+
assertThat(sendSocket.getZapDomain()).isEqualTo("global");
79+
80+
@SuppressWarnings("unchecked")
81+
Mono<ZMQ.Socket> subscribeSocketMono = TestUtils.getPropertyValue(channel, "subscribeSocket", Mono.class);
82+
ZMQ.Socket subscribeSocket = subscribeSocketMono.block(Duration.ofSeconds(10));
83+
assertThat(subscribeSocket.getZapDomain()).isEqualTo("local");
84+
5985
BlockingQueue<Message<?>> received = new LinkedBlockingQueue<>();
6086

6187
channel.subscribe(received::offer);
@@ -78,6 +104,8 @@ void testSimpleSendAndReceive() throws InterruptedException {
78104
assertThat(received.poll(100, TimeUnit.MILLISECONDS)).isNull();
79105

80106
channel.destroy();
107+
108+
assertThat(customMessageMapperCalled.get()).isTrue();
81109
}
82110

83111
@Test

spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/inbound/ZeroMqMessageProducerTests.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2020 the original author or authors.
2+
* Copyright 2020-2021 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -33,8 +33,10 @@
3333

3434
import org.springframework.beans.factory.BeanFactory;
3535
import org.springframework.integration.channel.FluxMessageChannel;
36+
import org.springframework.integration.test.util.TestUtils;
3637
import org.springframework.messaging.support.GenericMessage;
3738

39+
import reactor.core.publisher.Mono;
3840
import reactor.test.StepVerifier;
3941

4042
/**
@@ -67,9 +69,15 @@ void testMessageProducerForPair() {
6769
messageProducer.setMessageMapper((object, headers) -> new GenericMessage<>(new String(object)));
6870
messageProducer.setConsumeDelay(Duration.ofMillis(10));
6971
messageProducer.setBeanFactory(mock(BeanFactory.class));
72+
messageProducer.setSocketConfigurer(s -> s.setZapDomain("global"));
7073
messageProducer.afterPropertiesSet();
7174
messageProducer.start();
7275

76+
@SuppressWarnings("unchecked")
77+
Mono<ZMQ.Socket> socketMono = TestUtils.getPropertyValue(messageProducer, "socketMono", Mono.class);
78+
ZMQ.Socket socketInUse = socketMono.block(Duration.ofSeconds(10));
79+
assertThat(socketInUse.getZapDomain()).isEqualTo("global");
80+
7381
ZMQ.Socket socket = CONTEXT.createSocket(SocketType.PAIR);
7482

7583
await().until(() -> messageProducer.getBoundPort() > 0);

spring-integration-zeromq/src/test/java/org/springframework/integration/zeromq/outbound/ZeroMqMessageHandlerTests.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,8 @@
2121
import static org.awaitility.Awaitility.await;
2222
import static org.mockito.Mockito.mock;
2323

24+
import java.time.Duration;
25+
2426
import org.junit.jupiter.api.AfterAll;
2527
import org.junit.jupiter.api.Test;
2628
import org.zeromq.SocketType;
@@ -32,11 +34,14 @@
3234
import org.springframework.integration.expression.FunctionExpression;
3335
import org.springframework.integration.support.MessageBuilder;
3436
import org.springframework.integration.support.json.EmbeddedJsonHeadersMessageMapper;
37+
import org.springframework.integration.test.util.TestUtils;
3538
import org.springframework.integration.zeromq.ZeroMqProxy;
3639
import org.springframework.messaging.Message;
3740
import org.springframework.messaging.converter.ByteArrayMessageConverter;
3841
import org.springframework.messaging.support.GenericMessage;
3942

43+
import reactor.core.publisher.Mono;
44+
4045
/**
4146
* @author Artem Bilan
4247
*
@@ -59,8 +64,14 @@ void testMessageHandlerForPair() {
5964

6065
ZeroMqMessageHandler messageHandler = new ZeroMqMessageHandler(CONTEXT, socketAddress);
6166
messageHandler.setBeanFactory(mock(BeanFactory.class));
67+
messageHandler.setSocketConfigurer(s -> s.setZapDomain("global"));
6268
messageHandler.afterPropertiesSet();
6369

70+
@SuppressWarnings("unchecked")
71+
Mono<ZMQ.Socket> socketMono = TestUtils.getPropertyValue(messageHandler, "socketMono", Mono.class);
72+
ZMQ.Socket socketInUse = socketMono.block(Duration.ofSeconds(10));
73+
assertThat(socketInUse.getZapDomain()).isEqualTo("global");
74+
6475
Message<?> testMessage = new GenericMessage<>("test");
6576
messageHandler.handleMessage(testMessage).subscribe();
6677

0 commit comments

Comments
 (0)