Skip to content

Commit a2399f0

Browse files
Merge branch 'trunk' into null-pinned-script
2 parents 284a84a + 0338677 commit a2399f0

File tree

19 files changed

+276
-98
lines changed

19 files changed

+276
-98
lines changed

java/src/org/openqa/selenium/concurrent/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ java_library(
44
name = "concurrent",
55
srcs = glob(["*.java"]),
66
visibility = [
7+
"//java/src/org/openqa/selenium/events:__subpackages__",
78
"//java/src/org/openqa/selenium/grid:__subpackages__",
89
"//java/src/org/openqa/selenium/remote:__subpackages__",
910
],

java/src/org/openqa/selenium/events/zeromq/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ java_library(
1212
],
1313
deps = [
1414
"//java/src/org/openqa/selenium:core",
15+
"//java/src/org/openqa/selenium/concurrent",
1516
"//java/src/org/openqa/selenium/events",
1617
"//java/src/org/openqa/selenium/grid/config",
1718
"//java/src/org/openqa/selenium/grid/security",

java/src/org/openqa/selenium/events/zeromq/UnboundZmqEventBus.java

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import java.util.function.Consumer;
4343
import java.util.logging.Level;
4444
import java.util.logging.Logger;
45+
import org.openqa.selenium.concurrent.ExecutorServices;
4546
import org.openqa.selenium.events.Event;
4647
import org.openqa.selenium.events.EventBus;
4748
import org.openqa.selenium.events.EventListener;
@@ -61,6 +62,7 @@ class UnboundZmqEventBus implements EventBus {
6162
private static final Logger LOG = Logger.getLogger(EventBus.class.getName());
6263
private static final Json JSON = new Json();
6364
private final AtomicBoolean pollingStarted = new AtomicBoolean(false);
65+
private final PollingRunnable socketPolling;
6466
private final ExecutorService socketPollingExecutor;
6567
private final ExecutorService socketPublishingExecutor;
6668
private final ExecutorService listenerNotificationExecutor;
@@ -147,7 +149,8 @@ class UnboundZmqEventBus implements EventBus {
147149

148150
LOG.info("Sockets created");
149151

150-
socketPollingExecutor.submit(new PollingRunnable(secret));
152+
socketPolling = new PollingRunnable(secret);
153+
socketPollingExecutor.submit(socketPolling);
151154

152155
// Give ourselves up to a second to connect, using The World's Worst heuristic. If we don't
153156
// manage to connect, it's not the end of the world, as the socket we're connecting to may not
@@ -211,9 +214,11 @@ public void fire(Event event) {
211214

212215
@Override
213216
public void close() {
214-
socketPollingExecutor.shutdownNow();
215-
socketPublishingExecutor.shutdownNow();
216-
listenerNotificationExecutor.shutdownNow();
217+
socketPolling.shutdown = true;
218+
ExecutorServices.shutdownGracefully("Event Bus Poller", socketPollingExecutor);
219+
ExecutorServices.shutdownGracefully("Event Bus Publisher", socketPublishingExecutor);
220+
ExecutorServices.shutdownGracefully(
221+
"Event Bus Listener Notifier", listenerNotificationExecutor);
217222
poller.close();
218223

219224
if (sub != null) {
@@ -226,14 +231,15 @@ public void close() {
226231

227232
private class PollingRunnable implements Runnable {
228233
private final Secret secret;
234+
private volatile boolean shutdown;
229235

230236
public PollingRunnable(Secret secret) {
231237
this.secret = secret;
232238
}
233239

234240
@Override
235241
public void run() {
236-
while (!Thread.currentThread().isInterrupted()) {
242+
while (!Thread.currentThread().isInterrupted() && !shutdown) {
237243
try {
238244
int count = poller.poll(150);
239245

java/src/org/openqa/selenium/grid/TemplateGridServerCommand.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
package org.openqa.selenium.grid;
1919

20+
import java.io.Closeable;
2021
import java.util.Arrays;
2122
import java.util.Collections;
2223
import java.util.List;
@@ -47,7 +48,17 @@ public Server<?> asServer(Config initialConfig) {
4748
Handlers handler = createHandlers(config);
4849

4950
return new NettyServer(
50-
new BaseServerOptions(config), handler.httpHandler, handler.websocketHandler);
51+
new BaseServerOptions(config), handler.httpHandler, handler.websocketHandler) {
52+
53+
@Override
54+
public void stop() {
55+
try {
56+
handler.close();
57+
} finally {
58+
super.stop();
59+
}
60+
}
61+
};
5162
}
5263

5364
private static final String GRAPHQL = "/graphql";
@@ -77,7 +88,7 @@ protected static Routable baseRoute(String prefix, Route route) {
7788

7889
protected abstract Handlers createHandlers(Config config);
7990

80-
public static class Handlers {
91+
public abstract static class Handlers implements Closeable {
8192
public final HttpHandler httpHandler;
8293
public final BiFunction<String, Consumer<Message>, Optional<Consumer<Message>>>
8394
websocketHandler;
@@ -89,5 +100,8 @@ public Handlers(
89100
this.websocketHandler =
90101
websocketHandler == null ? (str, sink) -> Optional.empty() : websocketHandler;
91102
}
103+
104+
@Override
105+
public abstract void close();
92106
}
93107
}

java/src/org/openqa/selenium/grid/commands/EventBusCommand.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,17 @@ public Server<?> asServer(Config initialConfig) {
135135
return httpResponse(false, "Status checking was interrupted");
136136
}
137137
}),
138-
Route.get("/readyz").to(() -> req -> new HttpResponse().setStatus(HTTP_NO_CONTENT))));
138+
Route.get("/readyz").to(() -> req -> new HttpResponse().setStatus(HTTP_NO_CONTENT)))) {
139+
140+
@Override
141+
public void stop() {
142+
try {
143+
bus.close();
144+
} finally {
145+
super.stop();
146+
}
147+
}
148+
};
139149
}
140150

141151
@Override

java/src/org/openqa/selenium/grid/commands/Hub.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,6 @@
4242
import org.openqa.selenium.grid.TemplateGridServerCommand;
4343
import org.openqa.selenium.grid.config.Config;
4444
import org.openqa.selenium.grid.config.Role;
45-
import org.openqa.selenium.grid.distributor.Distributor;
4645
import org.openqa.selenium.grid.distributor.config.DistributorOptions;
4746
import org.openqa.selenium.grid.distributor.local.LocalDistributor;
4847
import org.openqa.selenium.grid.graphql.GraphqlHandler;
@@ -57,9 +56,7 @@
5756
import org.openqa.selenium.grid.server.EventBusOptions;
5857
import org.openqa.selenium.grid.server.NetworkOptions;
5958
import org.openqa.selenium.grid.server.Server;
60-
import org.openqa.selenium.grid.sessionmap.SessionMap;
6159
import org.openqa.selenium.grid.sessionmap.local.LocalSessionMap;
62-
import org.openqa.selenium.grid.sessionqueue.NewSessionQueue;
6360
import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions;
6461
import org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue;
6562
import org.openqa.selenium.grid.web.CombinedHandler;
@@ -120,7 +117,7 @@ protected Handlers createHandlers(Config config) {
120117

121118
CombinedHandler handler = new CombinedHandler();
122119

123-
SessionMap sessions = new LocalSessionMap(tracer, bus);
120+
LocalSessionMap sessions = new LocalSessionMap(tracer, bus);
124121
handler.addHandler(sessions);
125122

126123
BaseServerOptions serverOptions = new BaseServerOptions(config);
@@ -141,7 +138,7 @@ protected Handlers createHandlers(Config config) {
141138

142139
DistributorOptions distributorOptions = new DistributorOptions(config);
143140
NewSessionQueueOptions newSessionRequestOptions = new NewSessionQueueOptions(config);
144-
NewSessionQueue queue =
141+
LocalNewSessionQueue queue =
145142
new LocalNewSessionQueue(
146143
tracer,
147144
distributorOptions.getSlotMatcher(),
@@ -151,7 +148,7 @@ protected Handlers createHandlers(Config config) {
151148
newSessionRequestOptions.getBatchSize());
152149
handler.addHandler(queue);
153150

154-
Distributor distributor =
151+
LocalDistributor distributor =
155152
new LocalDistributor(
156153
tracer,
157154
bus,
@@ -212,7 +209,15 @@ protected Handlers createHandlers(Config config) {
212209
// these checks
213210
httpHandler = combine(httpHandler, Route.get("/readyz").to(() -> readinessCheck));
214211

215-
return new Handlers(httpHandler, new ProxyWebsocketsIntoGrid(clientFactory, sessions));
212+
return new Handlers(httpHandler, new ProxyWebsocketsIntoGrid(clientFactory, sessions)) {
213+
@Override
214+
public void close() {
215+
router.close();
216+
distributor.close();
217+
queue.close();
218+
bus.close();
219+
}
220+
};
216221
}
217222

218223
@Override

java/src/org/openqa/selenium/grid/commands/Standalone.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -63,7 +63,6 @@
6363
import org.openqa.selenium.grid.server.Server;
6464
import org.openqa.selenium.grid.sessionmap.SessionMap;
6565
import org.openqa.selenium.grid.sessionmap.local.LocalSessionMap;
66-
import org.openqa.selenium.grid.sessionqueue.NewSessionQueue;
6766
import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions;
6867
import org.openqa.selenium.grid.sessionqueue.local.LocalNewSessionQueue;
6968
import org.openqa.selenium.grid.web.CombinedHandler;
@@ -145,7 +144,7 @@ protected Handlers createHandlers(Config config) {
145144

146145
DistributorOptions distributorOptions = new DistributorOptions(config);
147146
NewSessionQueueOptions newSessionRequestOptions = new NewSessionQueueOptions(config);
148-
NewSessionQueue queue =
147+
LocalNewSessionQueue queue =
149148
new LocalNewSessionQueue(
150149
tracer,
151150
distributorOptions.getSlotMatcher(),
@@ -155,7 +154,7 @@ protected Handlers createHandlers(Config config) {
155154
newSessionRequestOptions.getBatchSize());
156155
combinedHandler.addHandler(queue);
157156

158-
Distributor distributor =
157+
LocalDistributor distributor =
159158
new LocalDistributor(
160159
tracer,
161160
bus,
@@ -171,9 +170,8 @@ protected Handlers createHandlers(Config config) {
171170
distributorOptions.getSlotMatcher());
172171
combinedHandler.addHandler(distributor);
173172

174-
Routable router =
175-
new Router(tracer, clientFactory, sessions, queue, distributor)
176-
.with(networkOptions.getSpecComplianceChecks());
173+
Router router = new Router(tracer, clientFactory, sessions, queue, distributor);
174+
Routable routerWithSpecChecks = router.with(networkOptions.getSpecComplianceChecks());
177175

178176
HttpHandler readinessCheck =
179177
req -> {
@@ -192,8 +190,8 @@ protected Handlers createHandlers(Config config) {
192190

193191
Routable appendRoute =
194192
Stream.of(
195-
baseRoute(subPath, combine(router)),
196-
hubRoute(subPath, combine(router)),
193+
baseRoute(subPath, combine(routerWithSpecChecks)),
194+
hubRoute(subPath, combine(routerWithSpecChecks)),
197195
graphqlRoute(subPath, () -> graphqlHandler))
198196
.reduce(Route::combine)
199197
.get();
@@ -218,7 +216,14 @@ protected Handlers createHandlers(Config config) {
218216
httpHandler = combine(httpHandler, Route.get("/readyz").to(() -> readinessCheck));
219217
Node node = createNode(config, bus, distributor, combinedHandler);
220218

221-
return new Handlers(httpHandler, new ProxyNodeWebsockets(clientFactory, node, subPath));
219+
return new Handlers(httpHandler, new ProxyNodeWebsockets(clientFactory, node, subPath)) {
220+
@Override
221+
public void close() {
222+
router.close();
223+
distributor.close();
224+
queue.close();
225+
}
226+
};
222227
}
223228

224229
@Override

java/src/org/openqa/selenium/grid/distributor/httpd/DistributorServer.java

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,9 @@
3131
import com.google.common.collect.ImmutableMap;
3232
import com.google.common.collect.ImmutableSet;
3333
import com.google.common.net.MediaType;
34+
import java.io.Closeable;
35+
import java.io.IOException;
36+
import java.io.UncheckedIOException;
3437
import java.util.Collections;
3538
import java.util.Set;
3639
import java.util.logging.Level;
@@ -118,7 +121,18 @@ protected Handlers createHandlers(Config config) {
118121
"message",
119122
"Distributor is ready"))))),
120123
get("/readyz").to(() -> readinessCheck)),
121-
null);
124+
null) {
125+
@Override
126+
public void close() {
127+
if (distributor instanceof Closeable) {
128+
try {
129+
((Closeable) distributor).close();
130+
} catch (IOException e) {
131+
throw new UncheckedIOException(e);
132+
}
133+
}
134+
}
135+
};
122136
}
123137

124138
@Override

java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.openqa.selenium.grid.distributor.local;
1919

2020
import static com.google.common.collect.ImmutableSet.toImmutableSet;
21+
import static org.openqa.selenium.concurrent.ExecutorServices.shutdownGracefully;
2122
import static org.openqa.selenium.grid.data.Availability.DOWN;
2223
import static org.openqa.selenium.grid.data.Availability.DRAINING;
2324
import static org.openqa.selenium.grid.data.Availability.UP;
@@ -34,6 +35,7 @@
3435
import com.google.common.collect.ImmutableSet;
3536
import dev.failsafe.Failsafe;
3637
import dev.failsafe.RetryPolicy;
38+
import java.io.Closeable;
3739
import java.io.UncheckedIOException;
3840
import java.net.URI;
3941
import java.time.Duration;
@@ -45,7 +47,6 @@
4547
import java.util.Optional;
4648
import java.util.Set;
4749
import java.util.concurrent.ConcurrentHashMap;
48-
import java.util.concurrent.Executor;
4950
import java.util.concurrent.ExecutorService;
5051
import java.util.concurrent.Executors;
5152
import java.util.concurrent.ScheduledExecutorService;
@@ -118,7 +119,7 @@
118119
@ManagedService(
119120
objectName = "org.seleniumhq.grid:type=Distributor,name=LocalDistributor",
120121
description = "Grid 4 node distributor")
121-
public class LocalDistributor extends Distributor implements AutoCloseable {
122+
public class LocalDistributor extends Distributor implements Closeable {
122123

123124
private static final Logger LOG = Logger.getLogger(LocalDistributor.class.getName());
124125

@@ -165,7 +166,7 @@ public class LocalDistributor extends Distributor implements AutoCloseable {
165166
return thread;
166167
});
167168

168-
private final Executor sessionCreatorExecutor;
169+
private final ExecutorService sessionCreatorExecutor;
169170

170171
private final NewSessionQueue sessionQueue;
171172

@@ -752,9 +753,10 @@ public int getIdleSlots() {
752753
@Override
753754
public void close() {
754755
LOG.info("Shutting down Distributor executor service");
755-
purgeDeadNodesService.shutdown();
756-
nodeHealthCheckService.shutdown();
757-
newSessionService.shutdown();
756+
shutdownGracefully("Local Distributor - Purge Dead Nodes", purgeDeadNodesService);
757+
shutdownGracefully("Local Distributor - Node Health Check", nodeHealthCheckService);
758+
shutdownGracefully("Local Distributor - New Session Queue", newSessionService);
759+
shutdownGracefully("Local Distributor - Session Creation", sessionCreatorExecutor);
758760
}
759761

760762
private class NewSessionRunnable implements Runnable {

java/src/org/openqa/selenium/grid/node/httpd/NodeServer.java

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
import com.google.common.net.MediaType;
3131
import dev.failsafe.Failsafe;
3232
import dev.failsafe.RetryPolicy;
33+
import java.io.Closeable;
34+
import java.io.IOException;
35+
import java.io.UncheckedIOException;
3336
import java.util.Collections;
3437
import java.util.Set;
3538
import java.util.concurrent.ExecutorService;
@@ -172,7 +175,18 @@ protected Handlers createHandlers(Config config) {
172175
Route httpHandler = Route.combine(node, get("/readyz").to(() -> readinessCheck));
173176

174177
return new Handlers(
175-
httpHandler, new ProxyNodeWebsockets(clientFactory, node, nodeOptions.getGridSubPath()));
178+
httpHandler, new ProxyNodeWebsockets(clientFactory, node, nodeOptions.getGridSubPath())) {
179+
@Override
180+
public void close() {
181+
if (node instanceof Closeable) {
182+
try {
183+
((Closeable) node).close();
184+
} catch (IOException e) {
185+
throw new UncheckedIOException(e);
186+
}
187+
}
188+
}
189+
};
176190
}
177191

178192
@Override
@@ -225,6 +239,15 @@ public NettyServer start() {
225239

226240
return this;
227241
}
242+
243+
@Override
244+
public void stop() {
245+
try {
246+
handler.close();
247+
} finally {
248+
super.stop();
249+
}
250+
}
228251
};
229252
}
230253

0 commit comments

Comments
 (0)