From 7a60828b850b57de624cdc5373e7764a8fa0fcba Mon Sep 17 00:00:00 2001 From: Viet Nguyen Duc Date: Thu, 16 Oct 2025 02:11:01 +0700 Subject: [PATCH 1/4] [grid] Add event bus heartbeat to prevent steal connection Signed-off-by: Viet Nguyen Duc --- .../events/zeromq/BoundZmqEventBus.java | 28 +++++++++++++++++-- .../events/zeromq/UnboundZmqEventBus.java | 24 +++++++++++++++- .../events/zeromq/ZeroMqEventBus.java | 26 +++++++++++++++-- .../selenium/grid/server/EventBusFlags.java | 6 ++++ .../selenium/grid/server/EventBusOptions.java | 8 ++++++ 5 files changed, 86 insertions(+), 6 deletions(-) diff --git a/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java index 661be8e5c5a52..752d8b0eb6e96 100644 --- a/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java @@ -20,6 +20,7 @@ import java.net.Inet6Address; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.Duration; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Level; @@ -43,7 +44,11 @@ class BoundZmqEventBus implements EventBus { private final ExecutorService executor; BoundZmqEventBus( - ZContext context, String publishConnection, String subscribeConnection, Secret secret) { + ZContext context, + String publishConnection, + String subscribeConnection, + Secret secret, + Duration heartbeatPeriod) { String address = new NetworkUtils().getHostAddress(); Addresses xpubAddr = deriveAddresses(address, publishConnection); Addresses xsubAddr = deriveAddresses(address, subscribeConnection); @@ -53,11 +58,13 @@ class BoundZmqEventBus implements EventBus { xpub = context.createSocket(SocketType.XPUB); xpub.setIPv6(xpubAddr.isIPv6); xpub.setImmediate(true); + configureHeartbeat(xpub, heartbeatPeriod, "XPUB"); xpub.bind(xpubAddr.bindTo); xsub = context.createSocket(SocketType.XSUB); xsub.setIPv6(xsubAddr.isIPv6); xsub.setImmediate(true); + configureHeartbeat(xsub, heartbeatPeriod, "XSUB"); xsub.bind(xsubAddr.bindTo); executor = @@ -69,7 +76,24 @@ class BoundZmqEventBus implements EventBus { }); executor.submit(() -> ZMQ.proxy(xsub, xpub, null)); - delegate = new UnboundZmqEventBus(context, xpubAddr.advertise, xsubAddr.advertise, secret); + delegate = + new UnboundZmqEventBus( + context, xpubAddr.advertise, xsubAddr.advertise, secret, heartbeatPeriod); + } + + private void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) { + if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) { + int heartbeatIvl = (int) heartbeatPeriod.toMillis(); + socket.setHeartbeatIvl(heartbeatIvl); + // Set heartbeat timeout to 3x the interval + socket.setHeartbeatTimeout(heartbeatIvl * 3); + // Set heartbeat TTL to 6x the interval + socket.setHeartbeatTtl(heartbeatIvl * 6); + LOG.info( + String.format( + "Event bus %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms", + socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6)); + } } @Override diff --git a/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java index 4f14782159491..cca4f56ca0d4b 100644 --- a/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java @@ -27,6 +27,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.LinkedList; @@ -76,7 +77,11 @@ class UnboundZmqEventBus implements EventBus { private ZMQ.Socket sub; UnboundZmqEventBus( - ZContext context, String publishConnection, String subscribeConnection, Secret secret) { + ZContext context, + String publishConnection, + String subscribeConnection, + Secret secret, + Duration heartbeatPeriod) { Require.nonNull("Secret", secret); StringBuilder builder = new StringBuilder(); try (JsonOutput out = JSON.newOutput(builder)) { @@ -136,11 +141,13 @@ class UnboundZmqEventBus implements EventBus { () -> { sub = context.createSocket(SocketType.SUB); sub.setIPv6(isSubAddressIPv6(publishConnection)); + configureHeartbeat(sub, heartbeatPeriod, "SUB"); sub.connect(publishConnection); sub.subscribe(new byte[0]); pub = context.createSocket(SocketType.PUB); pub.setIPv6(isSubAddressIPv6(subscribeConnection)); + configureHeartbeat(pub, heartbeatPeriod, "PUB"); pub.connect(subscribeConnection); }); // Connections are already established @@ -172,6 +179,21 @@ public boolean isReady() { return !socketPollingExecutor.isShutdown(); } + private void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) { + if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) { + int heartbeatIvl = (int) heartbeatPeriod.toMillis(); + socket.setHeartbeatIvl(heartbeatIvl); + // Set heartbeat timeout to 3x the interval + socket.setHeartbeatTimeout(heartbeatIvl * 3); + // Set heartbeat TTL to 6x the interval + socket.setHeartbeatTtl(heartbeatIvl * 6); + LOG.info( + String.format( + "Event bus %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms", + socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6)); + } + } + private boolean isSubAddressIPv6(String connection) { try { URI uri = new URI(connection); diff --git a/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java index 318846690b9bc..2681e6a54f2f0 100644 --- a/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java @@ -21,6 +21,7 @@ import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import java.util.function.Consumer; import org.openqa.selenium.events.EventBus; import org.openqa.selenium.events.EventListener; @@ -45,10 +46,20 @@ private ZeroMqEventBus() { public static EventBus create( ZContext context, String publish, String subscribe, boolean bind, Secret secret) { + return create(context, publish, subscribe, bind, secret, Duration.ofSeconds(60)); + } + + public static EventBus create( + ZContext context, + String publish, + String subscribe, + boolean bind, + Secret secret, + Duration heartbeatPeriod) { if (bind) { - return new BoundZmqEventBus(context, publish, subscribe, secret); + return new BoundZmqEventBus(context, publish, subscribe, secret, heartbeatPeriod); } - return new UnboundZmqEventBus(context, publish, subscribe, secret); + return new UnboundZmqEventBus(context, publish, subscribe, secret, heartbeatPeriod); } public static EventBus create(Config config) { @@ -85,10 +96,19 @@ public static EventBus create(Config config) { }); boolean bind = config.getBool(EVENTS_SECTION, "bind").orElse(false); + int heartbeatPeriodSeconds = + config.getInt(EVENTS_SECTION, "eventbus-heartbeat-period").orElse(60); + Duration heartbeatPeriod = Duration.ofSeconds(heartbeatPeriodSeconds); SecretOptions secretOptions = new SecretOptions(config); - return create(new ZContext(), publish, subscribe, bind, secretOptions.getRegistrationSecret()); + return create( + new ZContext(), + publish, + subscribe, + bind, + secretOptions.getRegistrationSecret(), + heartbeatPeriod); } private static String mungeUri(URI base, String scheme, int port) { diff --git a/java/src/org/openqa/selenium/grid/server/EventBusFlags.java b/java/src/org/openqa/selenium/grid/server/EventBusFlags.java index cd7a11ff08cb0..ab88dd96062f7 100644 --- a/java/src/org/openqa/selenium/grid/server/EventBusFlags.java +++ b/java/src/org/openqa/selenium/grid/server/EventBusFlags.java @@ -65,6 +65,12 @@ public class EventBusFlags implements HasRoles { example = "\"org.openqa.selenium.events.zeromq.ZeroMqEventBus\"") private String implementation; + @Parameter( + names = {"--eventbus-heartbeat-period"}, + description = "How often, in seconds, will the EventBus socket send heartbeats") + @ConfigValue(section = EVENTS_SECTION, name = "eventbus-heartbeat-period", example = "30") + private int eventbusHeartbeatPeriod; + @Override public Set getRoles() { return ImmutableSet.of(EVENT_BUS_ROLE); diff --git a/java/src/org/openqa/selenium/grid/server/EventBusOptions.java b/java/src/org/openqa/selenium/grid/server/EventBusOptions.java index 45f130fb2eace..a78e833186465 100644 --- a/java/src/org/openqa/selenium/grid/server/EventBusOptions.java +++ b/java/src/org/openqa/selenium/grid/server/EventBusOptions.java @@ -17,6 +17,7 @@ package org.openqa.selenium.grid.server; +import java.time.Duration; import org.openqa.selenium.events.EventBus; import org.openqa.selenium.grid.config.Config; import org.openqa.selenium.internal.Require; @@ -25,6 +26,7 @@ public class EventBusOptions { static final String EVENTS_SECTION = "events"; private static final String DEFAULT_CLASS = "org.openqa.selenium.events.zeromq.ZeroMqEventBus"; + private static final int DEFAULT_HEARTBEAT_PERIOD = 60; private final Config config; private volatile EventBus bus; @@ -47,6 +49,12 @@ public EventBus getEventBus() { return localBus; } + public Duration getHeartbeatPeriod() { + int period = + config.getInt(EVENTS_SECTION, "eventbus-heartbeat-period").orElse(DEFAULT_HEARTBEAT_PERIOD); + return Duration.ofSeconds(period); + } + private EventBus createBus() { return config.getClass(EVENTS_SECTION, "implementation", EventBus.class, DEFAULT_CLASS); } From cc71cd0385b5341c931289288542142351cf607b Mon Sep 17 00:00:00 2001 From: Viet Nguyen Duc Date: Thu, 16 Oct 2025 06:01:46 +0700 Subject: [PATCH 2/4] Fix review comment Signed-off-by: Viet Nguyen Duc --- .../events/zeromq/BoundZmqEventBus.java | 20 +------ .../events/zeromq/UnboundZmqEventBus.java | 21 ++------ .../events/zeromq/ZeroMqEventBus.java | 21 ++++++-- .../selenium/events/zeromq/ZmqUtils.java | 52 +++++++++++++++++++ 4 files changed, 74 insertions(+), 40 deletions(-) create mode 100644 java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java diff --git a/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java index 752d8b0eb6e96..988987a3f1c4a 100644 --- a/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java @@ -58,13 +58,13 @@ class BoundZmqEventBus implements EventBus { xpub = context.createSocket(SocketType.XPUB); xpub.setIPv6(xpubAddr.isIPv6); xpub.setImmediate(true); - configureHeartbeat(xpub, heartbeatPeriod, "XPUB"); + ZmqUtils.configureHeartbeat(xpub, heartbeatPeriod, "XPUB"); xpub.bind(xpubAddr.bindTo); xsub = context.createSocket(SocketType.XSUB); xsub.setIPv6(xsubAddr.isIPv6); xsub.setImmediate(true); - configureHeartbeat(xsub, heartbeatPeriod, "XSUB"); + ZmqUtils.configureHeartbeat(xsub, heartbeatPeriod, "XSUB"); xsub.bind(xsubAddr.bindTo); executor = @@ -75,27 +75,11 @@ class BoundZmqEventBus implements EventBus { return thread; }); executor.submit(() -> ZMQ.proxy(xsub, xpub, null)); - delegate = new UnboundZmqEventBus( context, xpubAddr.advertise, xsubAddr.advertise, secret, heartbeatPeriod); } - private void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) { - if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) { - int heartbeatIvl = (int) heartbeatPeriod.toMillis(); - socket.setHeartbeatIvl(heartbeatIvl); - // Set heartbeat timeout to 3x the interval - socket.setHeartbeatTimeout(heartbeatIvl * 3); - // Set heartbeat TTL to 6x the interval - socket.setHeartbeatTtl(heartbeatIvl * 6); - LOG.info( - String.format( - "Event bus %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms", - socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6)); - } - } - @Override public boolean isReady() { return !executor.isShutdown(); diff --git a/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java index cca4f56ca0d4b..e6efc09529244 100644 --- a/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java @@ -141,13 +141,13 @@ class UnboundZmqEventBus implements EventBus { () -> { sub = context.createSocket(SocketType.SUB); sub.setIPv6(isSubAddressIPv6(publishConnection)); - configureHeartbeat(sub, heartbeatPeriod, "SUB"); + ZmqUtils.configureHeartbeat(sub, heartbeatPeriod, "SUB"); sub.connect(publishConnection); sub.subscribe(new byte[0]); pub = context.createSocket(SocketType.PUB); pub.setIPv6(isSubAddressIPv6(subscribeConnection)); - configureHeartbeat(pub, heartbeatPeriod, "PUB"); + ZmqUtils.configureHeartbeat(pub, heartbeatPeriod, "PUB"); pub.connect(subscribeConnection); }); // Connections are already established @@ -176,22 +176,7 @@ class UnboundZmqEventBus implements EventBus { @Override public boolean isReady() { - return !socketPollingExecutor.isShutdown(); - } - - private void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) { - if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) { - int heartbeatIvl = (int) heartbeatPeriod.toMillis(); - socket.setHeartbeatIvl(heartbeatIvl); - // Set heartbeat timeout to 3x the interval - socket.setHeartbeatTimeout(heartbeatIvl * 3); - // Set heartbeat TTL to 6x the interval - socket.setHeartbeatTtl(heartbeatIvl * 6); - LOG.info( - String.format( - "Event bus %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms", - socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6)); - } + return !socketPollingExecutor.isShutdown() && pollingStarted.get(); } private boolean isSubAddressIPv6(String connection) { diff --git a/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java index 2681e6a54f2f0..79502bd733ee6 100644 --- a/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java @@ -39,6 +39,7 @@ public class ZeroMqEventBus { private static final String EVENTS_SECTION = "events"; + private static final int DEFAULT_HEARTBEAT_PERIOD_SECONDS = 60; private ZeroMqEventBus() { // Use the create method. @@ -46,7 +47,13 @@ private ZeroMqEventBus() { public static EventBus create( ZContext context, String publish, String subscribe, boolean bind, Secret secret) { - return create(context, publish, subscribe, bind, secret, Duration.ofSeconds(60)); + return create( + context, + publish, + subscribe, + bind, + secret, + Duration.ofSeconds(DEFAULT_HEARTBEAT_PERIOD_SECONDS)); } public static EventBus create( @@ -96,9 +103,7 @@ public static EventBus create(Config config) { }); boolean bind = config.getBool(EVENTS_SECTION, "bind").orElse(false); - int heartbeatPeriodSeconds = - config.getInt(EVENTS_SECTION, "eventbus-heartbeat-period").orElse(60); - Duration heartbeatPeriod = Duration.ofSeconds(heartbeatPeriodSeconds); + Duration heartbeatPeriod = getHeartbeatPeriod(config); SecretOptions secretOptions = new SecretOptions(config); @@ -111,6 +116,14 @@ public static EventBus create(Config config) { heartbeatPeriod); } + private static Duration getHeartbeatPeriod(Config config) { + int periodSeconds = + config + .getInt(EVENTS_SECTION, "eventbus-heartbeat-period") + .orElse(DEFAULT_HEARTBEAT_PERIOD_SECONDS); + return Duration.ofSeconds(periodSeconds); + } + private static String mungeUri(URI base, String scheme, int port) { try { return new URI(scheme, null, base.getHost(), port, null, null, null).toString(); diff --git a/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java b/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java new file mode 100644 index 0000000000000..720dd56b5e91f --- /dev/null +++ b/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java @@ -0,0 +1,52 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.events.zeromq; + +import java.time.Duration; +import java.util.logging.Logger; +import org.zeromq.ZMQ; + +/** Utility methods for ZeroMQ socket configuration. */ +class ZmqUtils { + + private static final Logger LOG = Logger.getLogger(ZmqUtils.class.getName()); + + private ZmqUtils() { + // Utility class + } + + /** + * Configures ZeroMQ heartbeat settings on a socket to prevent stale connections. + * + * @param socket The ZMQ socket to configure + * @param heartbeatPeriod The heartbeat interval duration + * @param socketType The socket type name for logging (e.g., "SUB", "PUB", "XPUB", "XSUB") + */ + static void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) { + if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) { + int heartbeatIvl = (int) heartbeatPeriod.toMillis(); + socket.setHeartbeatIvl(heartbeatIvl); + socket.setHeartbeatTimeout(heartbeatIvl * 3); + socket.setHeartbeatTtl(heartbeatIvl * 6); + LOG.info( + String.format( + "ZMQ %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms", + socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6)); + } + } +} From b12c8f8b6b4a76d394e4c77aeae0729ba29a7141 Mon Sep 17 00:00:00 2001 From: Viet Nguyen Duc Date: Thu, 16 Oct 2025 06:38:15 +0700 Subject: [PATCH 3/4] Fix review to prevent heartbeat interval overflow Signed-off-by: Viet Nguyen Duc --- .../selenium/events/zeromq/ZmqUtils.java | 69 ++++++++++++++++--- 1 file changed, 60 insertions(+), 9 deletions(-) diff --git a/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java b/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java index 720dd56b5e91f..9adbb69560b0c 100644 --- a/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java +++ b/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java @@ -18,35 +18,86 @@ package org.openqa.selenium.events.zeromq; import java.time.Duration; +import java.util.logging.Level; import java.util.logging.Logger; import org.zeromq.ZMQ; -/** Utility methods for ZeroMQ socket configuration. */ class ZmqUtils { private static final Logger LOG = Logger.getLogger(ZmqUtils.class.getName()); - private ZmqUtils() { - // Utility class - } + // Minimum heartbeat interval: 1 second + private static final long MIN_HEARTBEAT_MS = 1_000L; + // Maximum heartbeat interval: ~24 days (to prevent overflow when multiplied by 6) + private static final long MAX_HEARTBEAT_MS = Integer.MAX_VALUE / 6; + + private ZmqUtils() {} /** * Configures ZeroMQ heartbeat settings on a socket to prevent stale connections. * + *

The heartbeat interval is clamped between 1 second and ~24 days to prevent integer overflow + * and ensure reasonable values. If the provided duration is outside this range, it will be + * adjusted and a warning will be logged. + * * @param socket The ZMQ socket to configure * @param heartbeatPeriod The heartbeat interval duration * @param socketType The socket type name for logging (e.g., "SUB", "PUB", "XPUB", "XSUB") */ static void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) { if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) { - int heartbeatIvl = (int) heartbeatPeriod.toMillis(); + long heartbeatMs = heartbeatPeriod.toMillis(); + long clampedHeartbeatMs = clampHeartbeatInterval(heartbeatMs, socketType); + + // Safe to cast to int now + int heartbeatIvl = (int) clampedHeartbeatMs; + int heartbeatTimeout = heartbeatIvl * 3; + int heartbeatTtl = heartbeatIvl * 6; + socket.setHeartbeatIvl(heartbeatIvl); - socket.setHeartbeatTimeout(heartbeatIvl * 3); - socket.setHeartbeatTtl(heartbeatIvl * 6); + socket.setHeartbeatTimeout(heartbeatTimeout); + socket.setHeartbeatTtl(heartbeatTtl); + LOG.info( String.format( - "ZMQ %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms", - socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6)); + "ZMQ %s socket heartbeat configured: interval=%ds, timeout=%ds, ttl=%ds", + socketType, heartbeatIvl / 1000, heartbeatTimeout / 1000, heartbeatTtl / 1000)); + } + } + + /** + * Clamps the heartbeat interval to safe bounds and logs warnings if adjustments are made. + * + * @param heartbeatMs The heartbeat interval in milliseconds + * @param socketType The socket type for logging + * @return The clamped heartbeat interval + */ + private static long clampHeartbeatInterval(long heartbeatMs, String socketType) { + if (heartbeatMs < MIN_HEARTBEAT_MS) { + logHeartbeatClampWarning(socketType, heartbeatMs, MIN_HEARTBEAT_MS, "below minimum"); + return MIN_HEARTBEAT_MS; } + if (heartbeatMs > MAX_HEARTBEAT_MS) { + logHeartbeatClampWarning(socketType, heartbeatMs, MAX_HEARTBEAT_MS, "exceeds maximum"); + return MAX_HEARTBEAT_MS; + } + return heartbeatMs; + } + + /** + * Logs a warning when the heartbeat interval is clamped. + * + * @param socketType The socket type + * @param originalMs The original interval value in milliseconds + * @param clampedMs The clamped interval value in milliseconds + * @param reason The reason for clamping + */ + private static void logHeartbeatClampWarning( + String socketType, long originalMs, long clampedMs, String reason) { + LOG.log( + Level.WARNING, + String.format( + "ZMQ %s socket heartbeat interval %ds %s %ds, clamping to %ds", + socketType, originalMs / 1000, reason, clampedMs / 1000, clampedMs / 1000)); } } From 831b8a4c13d87a5e3289d46ff215ac3e973818a8 Mon Sep 17 00:00:00 2001 From: Viet Nguyen Duc Date: Thu, 16 Oct 2025 09:01:35 +0700 Subject: [PATCH 4/4] Fix review comment Signed-off-by: Viet Nguyen Duc --- .../org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java | 2 +- java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java index e6efc09529244..e3d255bde576e 100644 --- a/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java @@ -176,7 +176,7 @@ class UnboundZmqEventBus implements EventBus { @Override public boolean isReady() { - return !socketPollingExecutor.isShutdown() && pollingStarted.get(); + return !socketPollingExecutor.isShutdown(); } private boolean isSubAddressIPv6(String connection) { diff --git a/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java b/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java index 9adbb69560b0c..1ad19d54725a3 100644 --- a/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java +++ b/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java @@ -97,7 +97,7 @@ private static void logHeartbeatClampWarning( LOG.log( Level.WARNING, String.format( - "ZMQ %s socket heartbeat interval %ds %s %ds, clamping to %ds", - socketType, originalMs / 1000, reason, clampedMs / 1000, clampedMs / 1000)); + "ZMQ %s socket heartbeat interval %ds is %s, clamping to %ds", + socketType, originalMs / 1000, reason, clampedMs / 1000)); } }