@@ -123,29 +123,10 @@ public ZeroMqChannel(ZContext context, boolean pubSub) {
123123
124124 Supplier <String > localPairConnection = () -> "inproc://" + getComponentName () + ".pair" ;
125125
126- Mono <?> proxyMono =
127- Mono .defer (() -> {
128- if (this .zeroMqProxy != null ) {
129- return Mono .fromCallable (() -> this .zeroMqProxy .getBackendPort ())
130- .filter ((port ) -> port > 0 )
131- .repeatWhenEmpty (100 , // NOSONAR
132- (repeat ) -> repeat .delayElements (Duration .ofMillis (100 ))) // NOSONAR
133- .doOnNext ((port ) ->
134- setConnectUrl ("tcp://localhost:" + this .zeroMqProxy .getFrontendPort () +
135- ':' + this .zeroMqProxy .getBackendPort ()))
136- .doOnError ((error ) ->
137- logger .error ("The provided '"
138- + this .zeroMqProxy + "' has not been started" , error ));
139- }
140- else {
141- return Mono .empty ();
142- }
143- })
144- .cache ();
126+ Mono <?> proxyMono = proxyMono ();
145127
146128 this .sendSocket =
147- proxyMono
148- .publishOn (this .publisherScheduler )
129+ proxyMono .publishOn (this .publisherScheduler )
149130 .then (Mono .fromCallable (() ->
150131 this .context .createSocket (
151132 this .connectSendUrl == null
@@ -165,8 +146,7 @@ public ZeroMqChannel(ZContext context, boolean pubSub) {
165146 .publishOn (this .publisherScheduler );
166147
167148 this .subscribeSocket =
168- proxyMono
169- .publishOn (this .subscriberScheduler )
149+ proxyMono .publishOn (this .subscriberScheduler )
170150 .then (Mono .fromCallable (() ->
171151 this .context .createSocket (
172152 this .connectSubscribeUrl == null
@@ -208,14 +188,32 @@ public ZeroMqChannel(ZContext context, boolean pubSub) {
208188 .repeat (() -> this .initialized );
209189
210190 if (this .pubSub ) {
211- receiveData = receiveData .publish ()
212- .autoConnect (1 , (disposable ) -> this .subscriberDataDisposable = disposable );
191+ receiveData =
192+ receiveData .publish ()
193+ .autoConnect (1 , (disposable ) -> this .subscriberDataDisposable = disposable );
213194 }
214195
215196 this .subscriberData = receiveData ;
216197
217198 }
218199
200+ private Mono <Integer > proxyMono () {
201+ if (this .zeroMqProxy != null ) {
202+ return Mono .fromCallable (() -> this .zeroMqProxy .getBackendPort ())
203+ .filter ((proxyPort ) -> proxyPort > 0 )
204+ .repeatWhenEmpty (100 , (repeat ) -> repeat .delayElements (Duration .ofMillis (100 ))) // NOSONAR
205+ .doOnNext ((proxyPort ) ->
206+ setConnectUrl ("tcp://localhost:" + this .zeroMqProxy .getFrontendPort () +
207+ ':' + this .zeroMqProxy .getBackendPort ()))
208+ .doOnError ((error ) ->
209+ logger .error ("The provided '" + this .zeroMqProxy + "' has not been started" , error ))
210+ .cache ();
211+ }
212+ else {
213+ return Mono .empty ();
214+ }
215+ }
216+
219217 /**
220218 * Configure a connection to the ZeroMQ proxy with the pair of ports over colon
221219 * for proxy frontend and backend sockets. Mutually exclusive with the {@link #setZeroMqProxy(ZeroMqProxy)}.
@@ -226,7 +224,7 @@ public void setConnectUrl(@Nullable String connectUrl) {
226224 if (connectUrl != null ) {
227225 this .connectSendUrl = connectUrl .substring (0 , connectUrl .lastIndexOf (':' ));
228226 this .connectSubscribeUrl =
229- this .connectSendUrl .substring (0 , this .connectSendUrl .lastIndexOf (':' ))
227+ this .connectSendUrl .substring (0 , this .connectSendUrl .lastIndexOf (':' )) // NOSONAR
230228 + connectUrl .substring (connectUrl .lastIndexOf (':' ));
231229 }
232230 }
@@ -314,7 +312,7 @@ public void destroy() {
314312 this .subscribeSocket .doOnNext (ZMQ .Socket ::close ).block ();
315313 this .subscriberScheduler .dispose ();
316314 if (this .subscriberDataDisposable != null ) {
317- this .subscriberDataDisposable .dispose ();
315+ this .subscriberDataDisposable .dispose (); // NOSONAR
318316 }
319317 }
320318
0 commit comments