Skip to content

Commit 7a60828

Browse files
committed
[grid] Add event bus heartbeat to prevent steal connection
Signed-off-by: Viet Nguyen Duc <[email protected]>
1 parent 65bb55f commit 7a60828

File tree

5 files changed

+86
-6
lines changed

5 files changed

+86
-6
lines changed

java/src/org/openqa/selenium/events/zeromq/BoundZmqEventBus.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.net.Inet6Address;
2121
import java.net.InetAddress;
2222
import java.net.UnknownHostException;
23+
import java.time.Duration;
2324
import java.util.concurrent.ExecutorService;
2425
import java.util.concurrent.Executors;
2526
import java.util.logging.Level;
@@ -43,7 +44,11 @@ class BoundZmqEventBus implements EventBus {
4344
private final ExecutorService executor;
4445

4546
BoundZmqEventBus(
46-
ZContext context, String publishConnection, String subscribeConnection, Secret secret) {
47+
ZContext context,
48+
String publishConnection,
49+
String subscribeConnection,
50+
Secret secret,
51+
Duration heartbeatPeriod) {
4752
String address = new NetworkUtils().getHostAddress();
4853
Addresses xpubAddr = deriveAddresses(address, publishConnection);
4954
Addresses xsubAddr = deriveAddresses(address, subscribeConnection);
@@ -53,11 +58,13 @@ class BoundZmqEventBus implements EventBus {
5358
xpub = context.createSocket(SocketType.XPUB);
5459
xpub.setIPv6(xpubAddr.isIPv6);
5560
xpub.setImmediate(true);
61+
configureHeartbeat(xpub, heartbeatPeriod, "XPUB");
5662
xpub.bind(xpubAddr.bindTo);
5763

5864
xsub = context.createSocket(SocketType.XSUB);
5965
xsub.setIPv6(xsubAddr.isIPv6);
6066
xsub.setImmediate(true);
67+
configureHeartbeat(xsub, heartbeatPeriod, "XSUB");
6168
xsub.bind(xsubAddr.bindTo);
6269

6370
executor =
@@ -69,7 +76,24 @@ class BoundZmqEventBus implements EventBus {
6976
});
7077
executor.submit(() -> ZMQ.proxy(xsub, xpub, null));
7178

72-
delegate = new UnboundZmqEventBus(context, xpubAddr.advertise, xsubAddr.advertise, secret);
79+
delegate =
80+
new UnboundZmqEventBus(
81+
context, xpubAddr.advertise, xsubAddr.advertise, secret, heartbeatPeriod);
82+
}
83+
84+
private void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) {
85+
if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) {
86+
int heartbeatIvl = (int) heartbeatPeriod.toMillis();
87+
socket.setHeartbeatIvl(heartbeatIvl);
88+
// Set heartbeat timeout to 3x the interval
89+
socket.setHeartbeatTimeout(heartbeatIvl * 3);
90+
// Set heartbeat TTL to 6x the interval
91+
socket.setHeartbeatTtl(heartbeatIvl * 6);
92+
LOG.info(
93+
String.format(
94+
"Event bus %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms",
95+
socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6));
96+
}
7397
}
7498

7599
@Override

java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import java.net.URI;
2828
import java.net.URISyntaxException;
2929
import java.net.UnknownHostException;
30+
import java.time.Duration;
3031
import java.time.temporal.ChronoUnit;
3132
import java.util.ArrayList;
3233
import java.util.LinkedList;
@@ -76,7 +77,11 @@ class UnboundZmqEventBus implements EventBus {
7677
private ZMQ.Socket sub;
7778

7879
UnboundZmqEventBus(
79-
ZContext context, String publishConnection, String subscribeConnection, Secret secret) {
80+
ZContext context,
81+
String publishConnection,
82+
String subscribeConnection,
83+
Secret secret,
84+
Duration heartbeatPeriod) {
8085
Require.nonNull("Secret", secret);
8186
StringBuilder builder = new StringBuilder();
8287
try (JsonOutput out = JSON.newOutput(builder)) {
@@ -136,11 +141,13 @@ class UnboundZmqEventBus implements EventBus {
136141
() -> {
137142
sub = context.createSocket(SocketType.SUB);
138143
sub.setIPv6(isSubAddressIPv6(publishConnection));
144+
configureHeartbeat(sub, heartbeatPeriod, "SUB");
139145
sub.connect(publishConnection);
140146
sub.subscribe(new byte[0]);
141147

142148
pub = context.createSocket(SocketType.PUB);
143149
pub.setIPv6(isSubAddressIPv6(subscribeConnection));
150+
configureHeartbeat(pub, heartbeatPeriod, "PUB");
144151
pub.connect(subscribeConnection);
145152
});
146153
// Connections are already established
@@ -172,6 +179,21 @@ public boolean isReady() {
172179
return !socketPollingExecutor.isShutdown();
173180
}
174181

182+
private void configureHeartbeat(ZMQ.Socket socket, Duration heartbeatPeriod, String socketType) {
183+
if (heartbeatPeriod != null && !heartbeatPeriod.isZero() && !heartbeatPeriod.isNegative()) {
184+
int heartbeatIvl = (int) heartbeatPeriod.toMillis();
185+
socket.setHeartbeatIvl(heartbeatIvl);
186+
// Set heartbeat timeout to 3x the interval
187+
socket.setHeartbeatTimeout(heartbeatIvl * 3);
188+
// Set heartbeat TTL to 6x the interval
189+
socket.setHeartbeatTtl(heartbeatIvl * 6);
190+
LOG.info(
191+
String.format(
192+
"Event bus %s socket heartbeat configured: interval=%dms, timeout=%dms, ttl=%dms",
193+
socketType, heartbeatIvl, heartbeatIvl * 3, heartbeatIvl * 6));
194+
}
195+
}
196+
175197
private boolean isSubAddressIPv6(String connection) {
176198
try {
177199
URI uri = new URI(connection);

java/src/org/openqa/selenium/events/zeromq/ZeroMqEventBus.java

Lines changed: 23 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121

2222
import java.net.URI;
2323
import java.net.URISyntaxException;
24+
import java.time.Duration;
2425
import java.util.function.Consumer;
2526
import org.openqa.selenium.events.EventBus;
2627
import org.openqa.selenium.events.EventListener;
@@ -45,10 +46,20 @@ private ZeroMqEventBus() {
4546

4647
public static EventBus create(
4748
ZContext context, String publish, String subscribe, boolean bind, Secret secret) {
49+
return create(context, publish, subscribe, bind, secret, Duration.ofSeconds(60));
50+
}
51+
52+
public static EventBus create(
53+
ZContext context,
54+
String publish,
55+
String subscribe,
56+
boolean bind,
57+
Secret secret,
58+
Duration heartbeatPeriod) {
4859
if (bind) {
49-
return new BoundZmqEventBus(context, publish, subscribe, secret);
60+
return new BoundZmqEventBus(context, publish, subscribe, secret, heartbeatPeriod);
5061
}
51-
return new UnboundZmqEventBus(context, publish, subscribe, secret);
62+
return new UnboundZmqEventBus(context, publish, subscribe, secret, heartbeatPeriod);
5263
}
5364

5465
public static EventBus create(Config config) {
@@ -85,10 +96,19 @@ public static EventBus create(Config config) {
8596
});
8697

8798
boolean bind = config.getBool(EVENTS_SECTION, "bind").orElse(false);
99+
int heartbeatPeriodSeconds =
100+
config.getInt(EVENTS_SECTION, "eventbus-heartbeat-period").orElse(60);
101+
Duration heartbeatPeriod = Duration.ofSeconds(heartbeatPeriodSeconds);
88102

89103
SecretOptions secretOptions = new SecretOptions(config);
90104

91-
return create(new ZContext(), publish, subscribe, bind, secretOptions.getRegistrationSecret());
105+
return create(
106+
new ZContext(),
107+
publish,
108+
subscribe,
109+
bind,
110+
secretOptions.getRegistrationSecret(),
111+
heartbeatPeriod);
92112
}
93113

94114
private static String mungeUri(URI base, String scheme, int port) {

java/src/org/openqa/selenium/grid/server/EventBusFlags.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,12 @@ public class EventBusFlags implements HasRoles {
6565
example = "\"org.openqa.selenium.events.zeromq.ZeroMqEventBus\"")
6666
private String implementation;
6767

68+
@Parameter(
69+
names = {"--eventbus-heartbeat-period"},
70+
description = "How often, in seconds, will the EventBus socket send heartbeats")
71+
@ConfigValue(section = EVENTS_SECTION, name = "eventbus-heartbeat-period", example = "30")
72+
private int eventbusHeartbeatPeriod;
73+
6874
@Override
6975
public Set<Role> getRoles() {
7076
return ImmutableSet.of(EVENT_BUS_ROLE);

java/src/org/openqa/selenium/grid/server/EventBusOptions.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.openqa.selenium.grid.server;
1919

20+
import java.time.Duration;
2021
import org.openqa.selenium.events.EventBus;
2122
import org.openqa.selenium.grid.config.Config;
2223
import org.openqa.selenium.internal.Require;
@@ -25,6 +26,7 @@ public class EventBusOptions {
2526

2627
static final String EVENTS_SECTION = "events";
2728
private static final String DEFAULT_CLASS = "org.openqa.selenium.events.zeromq.ZeroMqEventBus";
29+
private static final int DEFAULT_HEARTBEAT_PERIOD = 60;
2830
private final Config config;
2931
private volatile EventBus bus;
3032

@@ -47,6 +49,12 @@ public EventBus getEventBus() {
4749
return localBus;
4850
}
4951

52+
public Duration getHeartbeatPeriod() {
53+
int period =
54+
config.getInt(EVENTS_SECTION, "eventbus-heartbeat-period").orElse(DEFAULT_HEARTBEAT_PERIOD);
55+
return Duration.ofSeconds(period);
56+
}
57+
5058
private EventBus createBus() {
5159
return config.getClass(EVENTS_SECTION, "implementation", EventBus.class, DEFAULT_CLASS);
5260
}

0 commit comments

Comments
 (0)