diff --git a/java/src/org/openqa/selenium/grid/node/Node.java b/java/src/org/openqa/selenium/grid/node/Node.java index 68767fa0a26b9..98322648d201a 100644 --- a/java/src/org/openqa/selenium/grid/node/Node.java +++ b/java/src/org/openqa/selenium/grid/node/Node.java @@ -101,6 +101,12 @@ * by {@code sessionId}. This returns a boolean. * * + * DELETE + * /se/grid/node/connection/{sessionId} + * Notifies the node about closure of a websocket connection for the {@link Session} + * identified by {@code sessionId}. + * + * * POST * /se/grid/node/connection/{sessionId} * Allows the node to be ask about whether or not new websocket connections are allowed for the {@link Session} @@ -173,6 +179,9 @@ protected Node( get("/se/grid/node/owner/{sessionId}") .to(params -> new IsSessionOwner(this, sessionIdFrom(params))) .with(spanDecorator("node.is_session_owner").andThen(requiresSecret)), + delete("/se/grid/node/connection/{sessionId}") + .to(params -> new ReleaseConnection(this, sessionIdFrom(params))) + .with(spanDecorator("node.is_session_owner").andThen(requiresSecret)), post("/se/grid/node/connection/{sessionId}") .to(params -> new TryAcquireConnection(this, sessionIdFrom(params))) .with(spanDecorator("node.is_session_owner").andThen(requiresSecret)), @@ -250,6 +259,8 @@ public TemporaryFilesystem getDownloadsFilesystem(UUID uuid) throws IOException public abstract boolean tryAcquireConnection(SessionId id); + public abstract void releaseConnection(SessionId id); + public abstract boolean isSupporting(Capabilities capabilities); public abstract NodeStatus getStatus(); diff --git a/java/src/org/openqa/selenium/grid/node/ProxyNodeWebsockets.java b/java/src/org/openqa/selenium/grid/node/ProxyNodeWebsockets.java index eff13dc5a40f5..e9583ca8fa0e8 100644 --- a/java/src/org/openqa/selenium/grid/node/ProxyNodeWebsockets.java +++ b/java/src/org/openqa/selenium/grid/node/ProxyNodeWebsockets.java @@ -237,7 +237,7 @@ private Consumer createWsEndPoint( WebSocket upstream = client.openSocket( new HttpRequest(GET, uri.toString()), - new ForwardingListener(downstream, sessionConsumer, sessionId)); + new ForwardingListener(node, downstream, sessionConsumer, sessionId)); return (msg) -> { try { @@ -260,12 +260,17 @@ private Consumer createWsEndPoint( } private static class ForwardingListener implements WebSocket.Listener { + private final Node node; private final Consumer downstream; private final Consumer sessionConsumer; private final SessionId sessionId; public ForwardingListener( - Consumer downstream, Consumer sessionConsumer, SessionId sessionId) { + Node node, + Consumer downstream, + Consumer sessionConsumer, + SessionId sessionId) { + this.node = node; this.downstream = Objects.requireNonNull(downstream); this.sessionConsumer = Objects.requireNonNull(sessionConsumer); this.sessionId = Objects.requireNonNull(sessionId); @@ -280,7 +285,7 @@ public void onBinary(byte[] data) { @Override public void onClose(int code, String reason) { downstream.accept(new CloseMessage(code, reason)); - sessionConsumer.accept(sessionId); + node.releaseConnection(sessionId); } @Override diff --git a/java/src/org/openqa/selenium/grid/node/ReleaseConnection.java b/java/src/org/openqa/selenium/grid/node/ReleaseConnection.java new file mode 100644 index 0000000000000..a9a7e85b0f7aa --- /dev/null +++ b/java/src/org/openqa/selenium/grid/node/ReleaseConnection.java @@ -0,0 +1,43 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.node; + +import java.io.UncheckedIOException; +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.SessionId; +import org.openqa.selenium.remote.http.HttpHandler; +import org.openqa.selenium.remote.http.HttpRequest; +import org.openqa.selenium.remote.http.HttpResponse; + +class ReleaseConnection implements HttpHandler { + + private final Node node; + private final SessionId id; + + ReleaseConnection(Node node, SessionId id) { + this.node = Require.nonNull("Node", node); + this.id = Require.nonNull("Session id", id); + } + + @Override + public HttpResponse execute(HttpRequest req) throws UncheckedIOException { + node.releaseConnection(id); + + return new HttpResponse().setStatus(200); + } +} diff --git a/java/src/org/openqa/selenium/grid/node/TryAcquireConnection.java b/java/src/org/openqa/selenium/grid/node/TryAcquireConnection.java index 6c8822bea84cd..076e5f7d32037 100644 --- a/java/src/org/openqa/selenium/grid/node/TryAcquireConnection.java +++ b/java/src/org/openqa/selenium/grid/node/TryAcquireConnection.java @@ -19,8 +19,8 @@ import static org.openqa.selenium.remote.http.Contents.asJson; -import com.google.common.collect.ImmutableMap; import java.io.UncheckedIOException; +import java.util.Map; import org.openqa.selenium.internal.Require; import org.openqa.selenium.remote.SessionId; import org.openqa.selenium.remote.http.HttpHandler; @@ -39,7 +39,6 @@ class TryAcquireConnection implements HttpHandler { @Override public HttpResponse execute(HttpRequest req) throws UncheckedIOException { - return new HttpResponse() - .setContent(asJson(ImmutableMap.of("value", node.tryAcquireConnection(id)))); + return new HttpResponse().setContent(asJson(Map.of("value", node.tryAcquireConnection(id)))); } } diff --git a/java/src/org/openqa/selenium/grid/node/config/NodeFlags.java b/java/src/org/openqa/selenium/grid/node/config/NodeFlags.java index b56e57b3dcb97..e090fcd928b7a 100644 --- a/java/src/org/openqa/selenium/grid/node/config/NodeFlags.java +++ b/java/src/org/openqa/selenium/grid/node/config/NodeFlags.java @@ -81,8 +81,8 @@ public class NodeFlags implements HasRoles { @Parameter( names = {"--connection-limit-per-session"}, description = - "Let X be the maximum number of websocket connections per session.This will ensure one" - + " session is not able to exhaust the connection limit of the host") + "Let X be the maximum number of concurrent websocket connections per session. This will" + + " ensure one session is not able to exhaust the connection limit of the host") @ConfigValue(section = NODE_SECTION, name = "connection-limit-per-session", example = "8") public int connectionLimitPerSession = DEFAULT_CONNECTION_LIMIT; diff --git a/java/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java b/java/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java index af8c05cf7a7c1..45887df7da2e2 100644 --- a/java/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java +++ b/java/src/org/openqa/selenium/grid/node/k8s/OneShotNode.java @@ -365,7 +365,24 @@ public boolean isSessionOwner(SessionId id) { @Override public boolean tryAcquireConnection(SessionId id) { - return sessionId.equals(id) && connectionLimitPerSession > connectionCounter.getAndIncrement(); + if (!sessionId.equals(id)) { + return false; + } + + if (connectionLimitPerSession > connectionCounter.getAndIncrement()) { + return true; + } + + // ensure a rejected connection will not be counted + connectionCounter.getAndDecrement(); + return false; + } + + @Override + public void releaseConnection(SessionId id) { + if (sessionId.equals(id)) { + connectionCounter.getAndDecrement(); + } } @Override diff --git a/java/src/org/openqa/selenium/grid/node/local/LocalNode.java b/java/src/org/openqa/selenium/grid/node/local/LocalNode.java index bb45cd00579dc..665fce7ac567a 100644 --- a/java/src/org/openqa/selenium/grid/node/local/LocalNode.java +++ b/java/src/org/openqa/selenium/grid/node/local/LocalNode.java @@ -623,7 +623,31 @@ public boolean tryAcquireConnection(SessionId id) throws NoSuchSessionException AtomicLong counter = slot.getConnectionCounter(); - return connectionLimitPerSession > counter.getAndIncrement(); + if (connectionLimitPerSession > counter.getAndIncrement()) { + return true; + } + + // ensure a rejected connection will not be counted + counter.getAndDecrement(); + return false; + } + + @Override + public void releaseConnection(SessionId id) { + SessionSlot slot = currentSessions.getIfPresent(id); + + if (slot == null) { + return; + } + + if (connectionLimitPerSession == -1) { + // no limit + return; + } + + AtomicLong counter = slot.getConnectionCounter(); + + counter.decrementAndGet(); } @Override diff --git a/java/src/org/openqa/selenium/grid/node/remote/RemoteNode.java b/java/src/org/openqa/selenium/grid/node/remote/RemoteNode.java index d83d7bbbdd9ca..a40edb20afde3 100644 --- a/java/src/org/openqa/selenium/grid/node/remote/RemoteNode.java +++ b/java/src/org/openqa/selenium/grid/node/remote/RemoteNode.java @@ -196,6 +196,18 @@ public boolean tryAcquireConnection(SessionId id) { return Boolean.TRUE.equals(Values.get(res, Boolean.class)); } + @Override + public void releaseConnection(SessionId id) { + Require.nonNull("Session ID", id); + + HttpRequest req = new HttpRequest(DELETE, "/se/grid/node/connection/" + id); + HttpTracing.inject(tracer, tracer.getCurrentContext(), req); + + HttpResponse res = client.with(addSecret).execute(req); + + Values.get(res, Void.class); + } + @Override public Session getSession(SessionId id) throws NoSuchSessionException { Require.nonNull("Session ID", id); diff --git a/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java b/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java index 1485d04fca4c6..12647fe9c38b2 100644 --- a/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java +++ b/java/test/org/openqa/selenium/grid/distributor/AddingNodesTest.java @@ -450,6 +450,9 @@ public boolean tryAcquireConnection(SessionId id) { return false; } + @Override + public void releaseConnection(SessionId id) {} + @Override public boolean isSupporting(Capabilities capabilities) { return Objects.equals("cake", capabilities.getCapability("cheese")); diff --git a/java/test/org/openqa/selenium/grid/router/DistributedTest.java b/java/test/org/openqa/selenium/grid/router/DistributedTest.java index 821711b2c5be6..46eb746d48b77 100644 --- a/java/test/org/openqa/selenium/grid/router/DistributedTest.java +++ b/java/test/org/openqa/selenium/grid/router/DistributedTest.java @@ -30,8 +30,12 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.openqa.selenium.Capabilities; +import org.openqa.selenium.HasCapabilities; import org.openqa.selenium.SessionNotCreatedException; import org.openqa.selenium.WebDriver; +import org.openqa.selenium.bidi.BiDi; +import org.openqa.selenium.bidi.BiDiProvider; import org.openqa.selenium.grid.config.MapConfig; import org.openqa.selenium.grid.config.MemoizedConfig; import org.openqa.selenium.grid.config.TomlConfig; @@ -43,6 +47,7 @@ import org.openqa.selenium.netty.server.NettyServer; import org.openqa.selenium.remote.RemoteWebDriver; import org.openqa.selenium.remote.http.ClientConfig; +import org.openqa.selenium.remote.http.ConnectionFailedException; import org.openqa.selenium.remote.http.Contents; import org.openqa.selenium.remote.http.HttpClient; import org.openqa.selenium.remote.http.HttpMethod; @@ -76,7 +81,9 @@ public void setupServers() { + "\n" + "override-max-sessions = true" + "\n" - + "max-sessions = 2"))); + + "max-sessions = 2" + + "\n" + + "connection-limit-per-session = 3"))); tearDowns.add(deployment); server = deployment.getServer(); @@ -192,4 +199,46 @@ void clientTimeoutDoesNotLeakARunningBrowser() throws Exception { Safely.safelyCall(healthy::quit); } } + + @Test + void connectionLimitIsRespected() throws Exception { + assertThat(server.isStarted()).isTrue(); + + // don't use the RemoteWebDriver.builder here, using it does create an unknown number of + // connections + WebDriver driver = new RemoteWebDriver(server.getUrl(), browser.getCapabilities()); + + try { + Capabilities caps = ((HasCapabilities) driver).getCapabilities(); + BiDiProvider biDiProvider = new BiDiProvider(); + + BiDi cnn1 = biDiProvider.getImplementation(caps, null).getBiDi(); + BiDi cnn2 = biDiProvider.getImplementation(caps, null).getBiDi(); + BiDi cnn3 = biDiProvider.getImplementation(caps, null).getBiDi(); + + Assertions.assertThrows( + ConnectionFailedException.class, + () -> biDiProvider.getImplementation(caps, null).getBiDi()); + cnn1.close(); + BiDi cnn4 = biDiProvider.getImplementation(caps, null).getBiDi(); + + Assertions.assertThrows( + ConnectionFailedException.class, + () -> biDiProvider.getImplementation(caps, null).getBiDi()); + cnn2.close(); + cnn3.close(); + BiDi cnn5 = biDiProvider.getImplementation(caps, null).getBiDi(); + BiDi cnn6 = biDiProvider.getImplementation(caps, null).getBiDi(); + + Assertions.assertThrows( + ConnectionFailedException.class, + () -> biDiProvider.getImplementation(caps, null).getBiDi()); + + cnn4.close(); + cnn5.close(); + cnn6.close(); + } finally { + Safely.safelyCall(driver::quit); + } + } }