diff --git a/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java index 661be8e5c5a52..988987a3f1c4a 100644 --- a/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java @@ -20,6 +20,7 @@ import java.net.Inet6Address; import java.net.InetAddress; import java.net.UnknownHostException; +import java.time.Duration; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.logging.Level; @@ -43,7 +44,11 @@ class BoundZmqEventBus implements EventBus { private final ExecutorService executor; BoundZmqEventBus( - ZContext context, String publishConnection, String subscribeConnection, Secret secret) { + ZContext context, + String publishConnection, + String subscribeConnection, + Secret secret, + Duration heartbeatPeriod) { String address = new NetworkUtils().getHostAddress(); Addresses xpubAddr = deriveAddresses(address, publishConnection); Addresses xsubAddr = deriveAddresses(address, subscribeConnection); @@ -53,11 +58,13 @@ class BoundZmqEventBus implements EventBus { xpub = context.createSocket(SocketType.XPUB); xpub.setIPv6(xpubAddr.isIPv6); xpub.setImmediate(true); + ZmqUtils.configureHeartbeat(xpub, heartbeatPeriod, "XPUB"); xpub.bind(xpubAddr.bindTo); xsub = context.createSocket(SocketType.XSUB); xsub.setIPv6(xsubAddr.isIPv6); xsub.setImmediate(true); + ZmqUtils.configureHeartbeat(xsub, heartbeatPeriod, "XSUB"); xsub.bind(xsubAddr.bindTo); executor = @@ -68,8 +75,9 @@ class BoundZmqEventBus implements EventBus { return thread; }); executor.submit(() -> ZMQ.proxy(xsub, xpub, null)); - - delegate = new UnboundZmqEventBus(context, xpubAddr.advertise, xsubAddr.advertise, secret); + delegate = + new UnboundZmqEventBus( + context, xpubAddr.advertise, xsubAddr.advertise, secret, heartbeatPeriod); } @Override diff --git 
a/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java index 4f14782159491..e3d255bde576e 100644 --- a/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java @@ -27,6 +27,7 @@ import java.net.URI; import java.net.URISyntaxException; import java.net.UnknownHostException; +import java.time.Duration; import java.time.temporal.ChronoUnit; import java.util.ArrayList; import java.util.LinkedList; @@ -76,7 +77,11 @@ class UnboundZmqEventBus implements EventBus { private ZMQ.Socket sub; UnboundZmqEventBus( - ZContext context, String publishConnection, String subscribeConnection, Secret secret) { + ZContext context, + String publishConnection, + String subscribeConnection, + Secret secret, + Duration heartbeatPeriod) { Require.nonNull("Secret", secret); StringBuilder builder = new StringBuilder(); try (JsonOutput out = JSON.newOutput(builder)) { @@ -136,11 +141,13 @@ class UnboundZmqEventBus implements EventBus { () -> { sub = context.createSocket(SocketType.SUB); sub.setIPv6(isSubAddressIPv6(publishConnection)); + ZmqUtils.configureHeartbeat(sub, heartbeatPeriod, "SUB"); sub.connect(publishConnection); sub.subscribe(new byte[0]); pub = context.createSocket(SocketType.PUB); pub.setIPv6(isSubAddressIPv6(subscribeConnection)); + ZmqUtils.configureHeartbeat(pub, heartbeatPeriod, "PUB"); pub.connect(subscribeConnection); }); // Connections are already established diff --git a/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java b/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java index 318846690b9bc..79502bd733ee6 100644 --- a/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java +++ b/java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java @@ -21,6 +21,7 @@ import java.net.URI; import java.net.URISyntaxException; +import java.time.Duration; import 
java.util.function.Consumer; import org.openqa.selenium.events.EventBus; import org.openqa.selenium.events.EventListener; @@ -38,6 +39,7 @@ public class ZeroMqEventBus { private static final String EVENTS_SECTION = "events"; + private static final int DEFAULT_HEARTBEAT_PERIOD_SECONDS = 60; private ZeroMqEventBus() { // Use the create method. @@ -45,10 +47,26 @@ private ZeroMqEventBus() { public static EventBus create( ZContext context, String publish, String subscribe, boolean bind, Secret secret) { + return create( + context, + publish, + subscribe, + bind, + secret, + Duration.ofSeconds(DEFAULT_HEARTBEAT_PERIOD_SECONDS)); + } + + public static EventBus create( + ZContext context, + String publish, + String subscribe, + boolean bind, + Secret secret, + Duration heartbeatPeriod) { if (bind) { - return new BoundZmqEventBus(context, publish, subscribe, secret); + return new BoundZmqEventBus(context, publish, subscribe, secret, heartbeatPeriod); } - return new UnboundZmqEventBus(context, publish, subscribe, secret); + return new UnboundZmqEventBus(context, publish, subscribe, secret, heartbeatPeriod); } public static EventBus create(Config config) { @@ -85,10 +103,25 @@ public static EventBus create(Config config) { }); boolean bind = config.getBool(EVENTS_SECTION, "bind").orElse(false); + Duration heartbeatPeriod = getHeartbeatPeriod(config); SecretOptions secretOptions = new SecretOptions(config); - return create(new ZContext(), publish, subscribe, bind, secretOptions.getRegistrationSecret()); + return create( + new ZContext(), + publish, + subscribe, + bind, + secretOptions.getRegistrationSecret(), + heartbeatPeriod); + } + + private static Duration getHeartbeatPeriod(Config config) { + int periodSeconds = + config + .getInt(EVENTS_SECTION, "eventbus-heartbeat-period") + .orElse(DEFAULT_HEARTBEAT_PERIOD_SECONDS); + return Duration.ofSeconds(periodSeconds); } private static String mungeUri(URI base, String scheme, int port) { diff --git 
a/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java b/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java
new file mode 100644
index 0000000000000..1ad19d54725a3
--- /dev/null
+++ b/java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java
@@ -0,0 +1,130 @@
+// Licensed to the Software Freedom Conservancy (SFC) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The SFC licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.openqa.selenium.events.zeromq;
+
+import java.time.Duration;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import org.zeromq.ZMQ;
+
+/** Package-private helpers for configuring the ZeroMQ sockets used by the event bus. */
+class ZmqUtils {
+
+  private static final Logger LOG = Logger.getLogger(ZmqUtils.class.getName());
+
+  // The heartbeat timeout and TTL are derived from the interval with these multipliers.
+  private static final int TIMEOUT_MULTIPLIER = 3;
+  private static final int TTL_MULTIPLIER = 6;
+
+  // Minimum heartbeat interval: 1 second
+  private static final long MIN_HEARTBEAT_MS = 1_000L;
+  // Largest TTL (ms) the ZMQ_HEARTBEAT_TTL option is expected to accept. The TTL travels as
+  // deciseconds, and JeroMQ reports "heartbeatTtl is out of range [0..655399]" above this.
+  // NOTE(review): confirm the bound against the JeroMQ version on the classpath.
+  private static final long MAX_HEARTBEAT_TTL_MS = 655_399L;
+  // Maximum heartbeat interval (~109 seconds): keeps interval * TTL_MULTIPLIER within the TTL
+  // limit, which also rules out int overflow in the derived values.
+  private static final long MAX_HEARTBEAT_MS = MAX_HEARTBEAT_TTL_MS / TTL_MULTIPLIER;
+
+  private ZmqUtils() {}
+
+  /**
+   * Configures ZeroMQ heartbeat settings on a socket to prevent stale connections.
+   *
+   * <p>The heartbeat interval is clamped between 1 second and ~109 seconds so that the derived
+   * TTL (six times the interval) stays within the range ZeroMQ accepts. If the provided duration
+   * is outside this range, it will be adjusted and a warning will be logged. A {@code null},
+   * zero or negative duration leaves the socket untouched.
+   *
+   * @param socket The ZMQ socket to configure
+   * @param heartbeatPeriod The heartbeat interval duration
+   * @param socketType The socket type name for logging (e.g., "SUB", "PUB", "XPUB", "XSUB")
+   */
+  static void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) {
+    if (heartbeatPeriod == null || heartbeatPeriod.isZero() || heartbeatPeriod.isNegative()) {
+      return;
+    }
+
+    long heartbeatMs = toMillisSaturated(heartbeatPeriod);
+    long clampedHeartbeatMs = clampHeartbeatInterval(heartbeatMs, socketType);
+
+    // Safe to cast and multiply: the clamped value is at most MAX_HEARTBEAT_MS.
+    int heartbeatIvl = (int) clampedHeartbeatMs;
+    int heartbeatTimeout = heartbeatIvl * TIMEOUT_MULTIPLIER;
+    int heartbeatTtl = heartbeatIvl * TTL_MULTIPLIER;
+
+    socket.setHeartbeatIvl(heartbeatIvl);
+    socket.setHeartbeatTimeout(heartbeatTimeout);
+    socket.setHeartbeatTtl(heartbeatTtl);
+
+    LOG.info(
+        String.format(
+            "ZMQ %s socket heartbeat configured: interval=%ds, timeout=%ds, ttl=%ds",
+            socketType, heartbeatIvl / 1000, heartbeatTimeout / 1000, heartbeatTtl / 1000));
+  }
+
+  /**
+   * Converts a positive duration to milliseconds without throwing on overflow.
+   *
+   * @param period A positive duration
+   * @return The duration in milliseconds, or {@link Long#MAX_VALUE} if it does not fit in a long
+   */
+  private static long toMillisSaturated(Duration period) {
+    // Duration.toMillis() throws ArithmeticException once seconds * 1000 no longer fits.
+    if (period.getSeconds() >= Long.MAX_VALUE / 1000) {
+      return Long.MAX_VALUE;
+    }
+    return period.toMillis();
+  }
+
+  /**
+   * Clamps the heartbeat interval to safe bounds and logs warnings if adjustments are made.
+   *
+   * @param heartbeatMs The heartbeat interval in milliseconds
+   * @param socketType The socket type for logging
+   * @return The clamped heartbeat interval
+   */
+  private static long clampHeartbeatInterval(long heartbeatMs, String socketType) {
+    if (heartbeatMs < MIN_HEARTBEAT_MS) {
+      logHeartbeatClampWarning(socketType, heartbeatMs, MIN_HEARTBEAT_MS, "below minimum");
+      return MIN_HEARTBEAT_MS;
+    }
+    if (heartbeatMs > MAX_HEARTBEAT_MS) {
+      logHeartbeatClampWarning(socketType, heartbeatMs, MAX_HEARTBEAT_MS, "exceeds maximum");
+      return MAX_HEARTBEAT_MS;
+    }
+    return heartbeatMs;
+  }
+
+  /**
+   * Logs a warning when the heartbeat interval is clamped.
+   *
+   * @param socketType The socket type
+   * @param originalMs The original interval value in milliseconds
+   * @param clampedMs The clamped interval value in milliseconds
+   * @param reason The reason for clamping
+   */
+  private static void logHeartbeatClampWarning(
+      String socketType, long originalMs, long clampedMs, String reason) {
+    LOG.log(
+        Level.WARNING,
+        String.format(
+            "ZMQ %s socket heartbeat interval %ds is %s, clamping to %ds",
+            socketType, originalMs / 1000, reason, clampedMs / 1000));
+  }
+}
diff --git a/java/src/org/openqa/selenium/grid/server/EventBusFlags.java b/java/src/org/openqa/selenium/grid/server/EventBusFlags.java index cd7a11ff08cb0..ab88dd96062f7 100644 --- a/java/src/org/openqa/selenium/grid/server/EventBusFlags.java +++ b/java/src/org/openqa/selenium/grid/server/EventBusFlags.java @@ -65,6 +65,12 @@ public class EventBusFlags implements HasRoles { example = "\"org.openqa.selenium.events.zeromq.ZeroMqEventBus\"") private String implementation; + @Parameter( + names = {"--eventbus-heartbeat-period"}, + description = "How often, in seconds, will the EventBus socket send heartbeats") + @ConfigValue(section = EVENTS_SECTION, name = "eventbus-heartbeat-period", example = "30") + private int eventbusHeartbeatPeriod; + @Override public Set getRoles() { return ImmutableSet.of(EVENT_BUS_ROLE);
diff --git a/java/src/org/openqa/selenium/grid/server/EventBusOptions.java b/java/src/org/openqa/selenium/grid/server/EventBusOptions.java index 45f130fb2eace..a78e833186465 100644 --- a/java/src/org/openqa/selenium/grid/server/EventBusOptions.java +++ b/java/src/org/openqa/selenium/grid/server/EventBusOptions.java @@ -17,6 +17,7 @@ package org.openqa.selenium.grid.server; +import java.time.Duration; import org.openqa.selenium.events.EventBus; import org.openqa.selenium.grid.config.Config; import org.openqa.selenium.internal.Require; @@ -25,6 +26,7 @@ public class EventBusOptions { static final String EVENTS_SECTION = "events"; private static final String DEFAULT_CLASS = "org.openqa.selenium.events.zeromq.ZeroMqEventBus"; + private static final int DEFAULT_HEARTBEAT_PERIOD = 60; private final Config config; private volatile EventBus bus; @@ -47,6 +49,12 @@ public EventBus getEventBus() { return localBus; } + public Duration getHeartbeatPeriod() { + int period = + config.getInt(EVENTS_SECTION, "eventbus-heartbeat-period").orElse(DEFAULT_HEARTBEAT_PERIOD); + return Duration.ofSeconds(period); + } + private EventBus createBus() { return config.getClass(EVENTS_SECTION, "implementation", EventBus.class, DEFAULT_CLASS); }