Skip to content

Commit cc71cd0

Browse files
committed
Fix review comment
Signed-off-by: Viet Nguyen Duc <[email protected]>
1 parent 4d808cf commit cc71cd0

File tree

4 files changed

+74
-40
lines changed

4 files changed

+74
-40
lines changed

java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -58,13 +58,13 @@ class BoundZmqEventBus implements EventBus {
5858
xpub = context.createSocket(SocketType.XPUB);
5959
xpub.setIPv6(xpubAddr.isIPv6);
6060
xpub.setImmediate(true);
61-
configureHeartbeat(xpub, heartbeatPeriod, "XPUB");
61+
ZmqUtils.configureHeartbeat(xpub, heartbeatPeriod, "XPUB");
6262
xpub.bind(xpubAddr.bindTo);
6363

6464
xsub = context.createSocket(SocketType.XSUB);
6565
xsub.setIPv6(xsubAddr.isIPv6);
6666
xsub.setImmediate(true);
67-
configureHeartbeat(xsub, heartbeatPeriod, "XSUB");
67+
ZmqUtils.configureHeartbeat(xsub, heartbeatPeriod, "XSUB");
6868
xsub.bind(xsubAddr.bindTo);
6969

7070
executor =
@@ -75,27 +75,11 @@ class BoundZmqEventBus implements EventBus {
7575
return thread;
7676
});
7777
executor.submit(() -> ZMQ.proxy(xsub, xpub, null));
78-
7978
delegate =
8079
new UnboundZmqEventBus(
8180
context, xpubAddr.advertise, xsubAddr.advertise, secret, heartbeatPeriod);
8281
}
8382

84-
private void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) {
85-
if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) {
86-
int heartbeatIvl = (int) heartbeatPeriod.toMillis();
87-
socket.setHeartbeatIvl(heartbeatIvl);
88-
// Set heartbeat timeout to 3x the interval
89-
socket.setHeartbeatTimeout(heartbeatIvl * 3);
90-
// Set heartbeat TTL to 6x the interval
91-
socket.setHeartbeatTtl(heartbeatIvl * 6);
92-
LOG.info(
93-
String.format(
94-
"Event bus %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms",
95-
socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6));
96-
}
97-
}
98-
9983
@Override
10084
public boolean isReady() {
10185
return !executor.isShutdown();

java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java

Lines changed: 3 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -141,13 +141,13 @@ class UnboundZmqEventBus implements EventBus {
141141
() -> {
142142
sub = context.createSocket(SocketType.SUB);
143143
sub.setIPv6(isSubAddressIPv6(publishConnection));
144-
configureHeartbeat(sub, heartbeatPeriod, "SUB");
144+
ZmqUtils.configureHeartbeat(sub, heartbeatPeriod, "SUB");
145145
sub.connect(publishConnection);
146146
sub.subscribe(new byte[0]);
147147

148148
pub = context.createSocket(SocketType.PUB);
149149
pub.setIPv6(isSubAddressIPv6(subscribeConnection));
150-
configureHeartbeat(pub, heartbeatPeriod, "PUB");
150+
ZmqUtils.configureHeartbeat(pub, heartbeatPeriod, "PUB");
151151
pub.connect(subscribeConnection);
152152
});
153153
// Connections are already established
@@ -176,22 +176,7 @@ class UnboundZmqEventBus implements EventBus {
176176

177177
@Override
178178
public boolean isReady() {
179-
return !socketPollingExecutor.isShutdown();
180-
}
181-
182-
private void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) {
183-
if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) {
184-
int heartbeatIvl = (int) heartbeatPeriod.toMillis();
185-
socket.setHeartbeatIvl(heartbeatIvl);
186-
// Set heartbeat timeout to 3x the interval
187-
socket.setHeartbeatTimeout(heartbeatIvl * 3);
188-
// Set heartbeat TTL to 6x the interval
189-
socket.setHeartbeatTtl(heartbeatIvl * 6);
190-
LOG.info(
191-
String.format(
192-
"Event bus %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms",
193-
socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6));
194-
}
179+
return !socketPollingExecutor.isShutdown() && pollingStarted.get();
195180
}
196181

197182
private boolean isSubAddressIPv6(String connection) {

java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,21 @@
3939
public class ZeroMqEventBus {
4040

4141
private static final String EVENTS_SECTION = "events";
42+
private static final int DEFAULT_HEARTBEAT_PERIOD_SECONDS = 60;
4243

4344
private ZeroMqEventBus() {
4445
// Use the create method.
4546
}
4647

4748
public static EventBus create(
4849
ZContext context, String publish, String subscribe, boolean bind, Secret secret) {
49-
return create(context, publish, subscribe, bind, secret, Duration.ofSeconds(60));
50+
return create(
51+
context,
52+
publish,
53+
subscribe,
54+
bind,
55+
secret,
56+
Duration.ofSeconds(DEFAULT_HEARTBEAT_PERIOD_SECONDS));
5057
}
5158

5259
public static EventBus create(
@@ -96,9 +103,7 @@ public static EventBus create(Config config) {
96103
});
97104

98105
boolean bind = config.getBool(EVENTS_SECTION, "bind").orElse(false);
99-
int heartbeatPeriodSeconds =
100-
config.getInt(EVENTS_SECTION, "eventbus-heartbeat-period").orElse(60);
101-
Duration heartbeatPeriod = Duration.ofSeconds(heartbeatPeriodSeconds);
106+
Duration heartbeatPeriod = getHeartbeatPeriod(config);
102107

103108
SecretOptions secretOptions = new SecretOptions(config);
104109

@@ -111,6 +116,14 @@ public static EventBus create(Config config) {
111116
heartbeatPeriod);
112117
}
113118

119+
private static Duration getHeartbeatPeriod(Config config) {
120+
int periodSeconds =
121+
config
122+
.getInt(EVENTS_SECTION, "eventbus-heartbeat-period")
123+
.orElse(DEFAULT_HEARTBEAT_PERIOD_SECONDS);
124+
return Duration.ofSeconds(periodSeconds);
125+
}
126+
114127
private static String mungeUri(URI base, String scheme, int port) {
115128
try {
116129
return new URI(scheme, null, base.getHost(), port, null, null, null).toString();
Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Licensed to the Software Freedom Conservancy (SFC) under one
2+
// or more contributor license agreements. See the NOTICE file
3+
// distributed with this work for additional information
4+
// regarding copyright ownership. The SFC licenses this file
5+
// to you under the Apache License, Version 2.0 (the
6+
// "License"); you may not use this file except in compliance
7+
// with the License. You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package org.openqa.selenium.events.zeromq;
19+
20+
import java.time.Duration;
21+
import java.util.logging.Logger;
22+
import org.zeromq.ZMQ;
23+
24+
/** Utility methods for ZeroMQ socket configuration. */
25+
class ZmqUtils {
26+
27+
private static final Logger LOG = Logger.getLogger(ZmqUtils.class.getName());
28+
29+
private ZmqUtils() {
30+
// Utility class
31+
}
32+
33+
/**
34+
* Configures ZeroMQ heartbeat settings on a socket to prevent stale connections.
35+
*
36+
* @param socket The ZMQ socket to configure
37+
* @param heartbeatPeriod The heartbeat interval duration
38+
* @param socketType The socket type name for logging (e.g., "SUB", "PUB", "XPUB", "XSUB")
39+
*/
40+
static void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) {
41+
if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) {
42+
int heartbeatIvl = (int) heartbeatPeriod.toMillis();
43+
socket.setHeartbeatIvl(heartbeatIvl);
44+
socket.setHeartbeatTimeout(heartbeatIvl * 3);
45+
socket.setHeartbeatTtl(heartbeatIvl * 6);
46+
LOG.info(
47+
String.format(
48+
"ZMQ %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms",
49+
socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6));
50+
}
51+
}
52+
}

0 commit comments

Comments
 (0)