Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 11 additions & 3 deletions java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.net.Inet6Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.time.Duration;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.logging.Level;
Expand All @@ -43,7 +44,11 @@ class BoundZmqEventBus implements EventBus {
private final ExecutorService executor;

BoundZmqEventBus(
ZContext context, String publishConnection, String subscribeConnection, Secret secret) {
ZContext context,
String publishConnection,
String subscribeConnection,
Secret secret,
Duration heartbeatPeriod) {
String address = new NetworkUtils().getHostAddress();
Addresses xpubAddr = deriveAddresses(address, publishConnection);
Addresses xsubAddr = deriveAddresses(address, subscribeConnection);
Expand All @@ -53,11 +58,13 @@ class BoundZmqEventBus implements EventBus {
xpub = context.createSocket(SocketType.XPUB);
xpub.setIPv6(xpubAddr.isIPv6);
xpub.setImmediate(true);
ZmqUtils.configureHeartbeat(xpub, heartbeatPeriod, "XPUB");
xpub.bind(xpubAddr.bindTo);

xsub = context.createSocket(SocketType.XSUB);
xsub.setIPv6(xsubAddr.isIPv6);
xsub.setImmediate(true);
ZmqUtils.configureHeartbeat(xsub, heartbeatPeriod, "XSUB");
xsub.bind(xsubAddr.bindTo);

executor =
Expand All @@ -68,8 +75,9 @@ class BoundZmqEventBus implements EventBus {
return thread;
});
executor.submit(() -> ZMQ.proxy(xsub, xpub, null));

delegate = new UnboundZmqEventBus(context, xpubAddr.advertise, xsubAddr.advertise, secret);
delegate =
new UnboundZmqEventBus(
context, xpubAddr.advertise, xsubAddr.advertise, secret, heartbeatPeriod);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.ArrayList;
import java.util.LinkedList;
Expand Down Expand Up @@ -76,7 +77,11 @@ class UnboundZmqEventBus implements EventBus {
private ZMQ.Socket sub;

UnboundZmqEventBus(
ZContext context, String publishConnection, String subscribeConnection, Secret secret) {
ZContext context,
String publishConnection,
String subscribeConnection,
Secret secret,
Duration heartbeatPeriod) {
Require.nonNull("Secret", secret);
StringBuilder builder = new StringBuilder();
try (JsonOutput out = JSON.newOutput(builder)) {
Expand Down Expand Up @@ -136,11 +141,13 @@ class UnboundZmqEventBus implements EventBus {
() -> {
sub = context.createSocket(SocketType.SUB);
sub.setIPv6(isSubAddressIPv6(publishConnection));
ZmqUtils.configureHeartbeat(sub, heartbeatPeriod, "SUB");
sub.connect(publishConnection);
sub.subscribe(new byte[0]);

pub = context.createSocket(SocketType.PUB);
pub.setIPv6(isSubAddressIPv6(subscribeConnection));
ZmqUtils.configureHeartbeat(pub, heartbeatPeriod, "PUB");
pub.connect(subscribeConnection);
});
// Connections are already established
Expand Down
39 changes: 36 additions & 3 deletions java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.net.URI;
import java.net.URISyntaxException;
import java.time.Duration;
import java.util.function.Consumer;
import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.events.EventListener;
Expand All @@ -38,17 +39,34 @@
public class ZeroMqEventBus {

private static final String EVENTS_SECTION = "events";
private static final int DEFAULT_HEARTBEAT_PERIOD_SECONDS = 60;

private ZeroMqEventBus() {
// Use the create method.
}

public static EventBus create(
ZContext context, String publish, String subscribe, boolean bind, Secret secret) {
return create(
context,
publish,
subscribe,
bind,
secret,
Duration.ofSeconds(DEFAULT_HEARTBEAT_PERIOD_SECONDS));
}

public static EventBus create(
ZContext context,
String publish,
String subscribe,
boolean bind,
Secret secret,
Duration heartbeatPeriod) {
if (bind) {
return new BoundZmqEventBus(context, publish, subscribe, secret);
return new BoundZmqEventBus(context, publish, subscribe, secret, heartbeatPeriod);
}
return new UnboundZmqEventBus(context, publish, subscribe, secret);
return new UnboundZmqEventBus(context, publish, subscribe, secret, heartbeatPeriod);
}

public static EventBus create(Config config) {
Expand Down Expand Up @@ -85,10 +103,25 @@ public static EventBus create(Config config) {
});

boolean bind = config.getBool(EVENTS_SECTION, "bind").orElse(false);
Duration heartbeatPeriod = getHeartbeatPeriod(config);

SecretOptions secretOptions = new SecretOptions(config);

return create(new ZContext(), publish, subscribe, bind, secretOptions.getRegistrationSecret());
return create(
new ZContext(),
publish,
subscribe,
bind,
secretOptions.getRegistrationSecret(),
heartbeatPeriod);
}

private static Duration getHeartbeatPeriod(Config config) {
int periodSeconds =
config
.getInt(EVENTS_SECTION, "eventbus-heartbeat-period")
.orElse(DEFAULT_HEARTBEAT_PERIOD_SECONDS);
return Duration.ofSeconds(periodSeconds);
}

private static String mungeUri(URI base, String scheme, int port) {
Expand Down
103 changes: 103 additions & 0 deletions java/src/org/openqa/selenium/events/zeromq/ZmqUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Licensed to the Software Freedom Conservancy (SFC) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The SFC licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

package org.openqa.selenium.events.zeromq;

import java.time.Duration;
import java.util.logging.Level;
import java.util.logging.Logger;
import org.zeromq.ZMQ;

class ZmqUtils {

private static final Logger LOG = Logger.getLogger(ZmqUtils.class.getName());

// Minimum heartbeat interval: 1 second
private static final long MIN_HEARTBEAT_MS = 1_000L;
// Maximum heartbeat interval: ~24 days (to prevent overflow when multiplied by 6)
private static final long MAX_HEARTBEAT_MS = Integer.MAX_VALUE / 6;

private ZmqUtils() {}

/**
* Configures ZeroMQ heartbeat settings on a socket to prevent stale connections.
*
* <p>The heartbeat interval is clamped between 1 second and ~24 days to prevent integer overflow
* and ensure reasonable values. If the provided duration is outside this range, it will be
* adjusted and a warning will be logged.
*
* @param socket The ZMQ socket to configure
* @param heartbeatPeriod The heartbeat interval duration
* @param socketType The socket type name for logging (e.g., "SUB", "PUB", "XPUB", "XSUB")
*/
static void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) {
if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) {
long heartbeatMs = heartbeatPeriod.toMillis();
long clampedHeartbeatMs = clampHeartbeatInterval(heartbeatMs, socketType);

// Safe to cast to int now
int heartbeatIvl = (int) clampedHeartbeatMs;
int heartbeatTimeout = heartbeatIvl * 3;
int heartbeatTtl = heartbeatIvl * 6;

socket.setHeartbeatIvl(heartbeatIvl);
socket.setHeartbeatTimeout(heartbeatTimeout);
socket.setHeartbeatTtl(heartbeatTtl);

LOG.info(
String.format(
"ZMQ %s socket heartbeat configured: interval=%ds, timeout=%ds, ttl=%ds",
socketType, heartbeatIvl / 1000, heartbeatTimeout / 1000, heartbeatTtl / 1000));
}
}

/**
* Clamps the heartbeat interval to safe bounds and logs warnings if adjustments are made.
*
* @param heartbeatMs The heartbeat interval in milliseconds
* @param socketType The socket type for logging
* @return The clamped heartbeat interval
*/
private static long clampHeartbeatInterval(long heartbeatMs, String socketType) {
if (heartbeatMs < MIN_HEARTBEAT_MS) {
logHeartbeatClampWarning(socketType, heartbeatMs, MIN_HEARTBEAT_MS, "below minimum");
return MIN_HEARTBEAT_MS;
}
if (heartbeatMs > MAX_HEARTBEAT_MS) {
logHeartbeatClampWarning(socketType, heartbeatMs, MAX_HEARTBEAT_MS, "exceeds maximum");
return MAX_HEARTBEAT_MS;
}
return heartbeatMs;
}

/**
* Logs a warning when the heartbeat interval is clamped.
*
* @param socketType The socket type
* @param originalMs The original interval value in milliseconds
* @param clampedMs The clamped interval value in milliseconds
* @param reason The reason for clamping
*/
private static void logHeartbeatClampWarning(
String socketType, long originalMs, long clampedMs, String reason) {
LOG.log(
Level.WARNING,
String.format(
"ZMQ %s socket heartbeat interval %ds is %s, clamping to %ds",
socketType, originalMs / 1000, reason, clampedMs / 1000));
}
}
6 changes: 6 additions & 0 deletions java/src/org/openqa/selenium/grid/server/EventBusFlags.java
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,12 @@ public class EventBusFlags implements HasRoles {
example = "\"org.openqa.selenium.events.zeromq.ZeroMqEventBus\"")
private String implementation;

@Parameter(
names = {"--eventbus-heartbeat-period"},
description = "How often, in seconds, will the EventBus socket send heartbeats")
@ConfigValue(section = EVENTS_SECTION, name = "eventbus-heartbeat-period", example = "30")
private int eventbusHeartbeatPeriod;

@Override
public Set<Role> getRoles() {
return ImmutableSet.of(EVENT_BUS_ROLE);
Expand Down
8 changes: 8 additions & 0 deletions java/src/org/openqa/selenium/grid/server/EventBusOptions.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

package org.openqa.selenium.grid.server;

import java.time.Duration;
import org.openqa.selenium.events.EventBus;
import org.openqa.selenium.grid.config.Config;
import org.openqa.selenium.internal.Require;
Expand All @@ -25,6 +26,7 @@ public class EventBusOptions {

static final String EVENTS_SECTION = "events";
private static final String DEFAULT_CLASS = "org.openqa.selenium.events.zeromq.ZeroMqEventBus";
private static final int DEFAULT_HEARTBEAT_PERIOD = 60;
private final Config config;
private volatile EventBus bus;

Expand All @@ -47,6 +49,12 @@ public EventBus getEventBus() {
return localBus;
}

public Duration getHeartbeatPeriod() {
int period =
config.getInt(EVENTS_SECTION, "eventbus-heartbeat-period").orElse(DEFAULT_HEARTBEAT_PERIOD);
return Duration.ofSeconds(period);
}

private EventBus createBus() {
return config.getClass(EVENTS_SECTION, "implementation", EventBus.class, DEFAULT_CLASS);
}
Expand Down
Loading