diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java index 9bf2f880a56ad..b6db25c5dbfab 100644 --- a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java +++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java @@ -586,6 +586,11 @@ public Either newSession( new SessionNotCreatedException("Unable to create new session"); for (Capabilities caps : request.getDesiredCapabilities()) { if (isNotSupported(caps)) { + // e.g. the last node drained, we have to wait for a new to register + lastFailure = + new SessionNotCreatedException( + "Unable to find a node supporting the desired capabilities"); + retry = true; continue; } diff --git a/java/src/org/openqa/selenium/grid/node/httpd/NodeServer.java b/java/src/org/openqa/selenium/grid/node/httpd/NodeServer.java index f1ac54bb62ea7..17123e23cbbf0 100644 --- a/java/src/org/openqa/selenium/grid/node/httpd/NodeServer.java +++ b/java/src/org/openqa/selenium/grid/node/httpd/NodeServer.java @@ -17,7 +17,7 @@ package org.openqa.selenium.grid.node.httpd; -import static java.net.HttpURLConnection.HTTP_NO_CONTENT; +import static java.net.HttpURLConnection.HTTP_OK; import static java.net.HttpURLConnection.HTTP_UNAVAILABLE; import static org.openqa.selenium.grid.config.StandardGridRoles.EVENT_BUS_ROLE; import static org.openqa.selenium.grid.config.StandardGridRoles.HTTPD_ROLE; @@ -131,13 +131,16 @@ protected Handlers createHandlers(Config config) { HttpHandler readinessCheck = req -> { if (node.getStatus().hasCapacity()) { - return new HttpResponse().setStatus(HTTP_NO_CONTENT); + return new HttpResponse() + .setStatus(HTTP_OK) + .setHeader("Content-Type", MediaType.PLAIN_TEXT_UTF_8.toString()) + .setContent(Contents.utf8String("Node has capacity available")); } return new HttpResponse() .setStatus(HTTP_UNAVAILABLE) .setHeader("Content-Type", MediaType.PLAIN_TEXT_UTF_8.toString()) - 
.setContent(Contents.utf8String("No capacity available")); + .setContent(Contents.utf8String("Node has no capacity available")); }; bus.addListener( diff --git a/java/test/org/openqa/selenium/grid/distributor/BUILD.bazel b/java/test/org/openqa/selenium/grid/distributor/BUILD.bazel index fd9ac64e97005..33f21dd8c5ee5 100644 --- a/java/test/org/openqa/selenium/grid/distributor/BUILD.bazel +++ b/java/test/org/openqa/selenium/grid/distributor/BUILD.bazel @@ -1,11 +1,56 @@ load("@rules_jvm_external//:defs.bzl", "artifact") -load("//java:defs.bzl", "JUNIT5_DEPS", "java_test_suite") +load("//java:defs.bzl", "JUNIT5_DEPS", "java_selenium_test_suite", "java_test_suite") load("//java:version.bzl", "TOOLS_JAVA_VERSION") +LARGE_TESTS = [ + "DrainTest.java", +] + +java_selenium_test_suite( + name = "large-tests", + size = "large", + srcs = LARGE_TESTS, + browsers = [ + "chrome", + "firefox", + "edge", + ], + javacopts = [ + "--release", + TOOLS_JAVA_VERSION, + ], + tags = [ + "selenium-remote", + ], + deps = [ + "//java/src/org/openqa/selenium/chrome", + "//java/src/org/openqa/selenium/firefox", + "//java/src/org/openqa/selenium/grid", + "//java/src/org/openqa/selenium/grid/config", + "//java/src/org/openqa/selenium/grid/distributor", + "//java/src/org/openqa/selenium/json", + "//java/src/org/openqa/selenium/remote", + "//java/src/org/openqa/selenium/support", + "//java/test/org/openqa/selenium/environment", + "//java/test/org/openqa/selenium/grid/testing", + "//java/test/org/openqa/selenium/remote/tracing:tracing-support", + "//java/test/org/openqa/selenium/testing:annotations", + "//java/test/org/openqa/selenium/testing:test-base", + artifact("org.junit.jupiter:junit-jupiter-api"), + artifact("org.junit.jupiter:junit-jupiter-params"), + artifact("org.assertj:assertj-core"), + "//java/src/org/openqa/selenium:core", + "//java/src/org/openqa/selenium/remote/http", + ] + JUNIT5_DEPS, +) + java_test_suite( name = "medium-tests", size = "medium", - srcs = glob(["*.java"]), + srcs = 
glob( + ["*.java"], + exclude = LARGE_TESTS, + ), javacopts = [ "--release", TOOLS_JAVA_VERSION, diff --git a/java/test/org/openqa/selenium/grid/distributor/DrainTest.java b/java/test/org/openqa/selenium/grid/distributor/DrainTest.java new file mode 100644 index 0000000000000..fb8f13d01ce1d --- /dev/null +++ b/java/test/org/openqa/selenium/grid/distributor/DrainTest.java @@ -0,0 +1,248 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+
+package org.openqa.selenium.grid.distributor;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.StringReader;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.Disabled;
+import org.junit.jupiter.api.Test;
+import org.openqa.selenium.WebDriver;
+import org.openqa.selenium.grid.commands.Hub;
+import org.openqa.selenium.grid.config.CompoundConfig;
+import org.openqa.selenium.grid.config.Config;
+import org.openqa.selenium.grid.config.MapConfig;
+import org.openqa.selenium.grid.config.MemoizedConfig;
+import org.openqa.selenium.grid.config.TomlConfig;
+import org.openqa.selenium.grid.node.httpd.NodeServer;
+import org.openqa.selenium.grid.server.Server;
+import org.openqa.selenium.net.PortProber;
+import org.openqa.selenium.net.UrlChecker;
+import org.openqa.selenium.remote.RemoteWebDriver;
+import org.openqa.selenium.testing.Safely;
+import org.openqa.selenium.testing.drivers.Browser;
+
+/** Integration tests for Nodes that drain after {@code drain-after-session-count} sessions. */
+class DrainTest {
+
+  private final Browser browser = Objects.requireNonNull(Browser.detect());
+
+  /** A Node that drains after 3 sessions must start exactly 3 of the queued sessions. */
+  @Disabled("the Node is terminated calling System.exit, this should be reworked in the future")
+  @Test
+  void nodeDoesNotTakeTooManySessions() throws Exception {
+    Config baseConfig = createBaseConfig();
+
+    Server hub = startHub(baseConfig);
+    try (AutoCloseable stopHub = () -> Safely.safelyCall(hub::stop); ) {
+      UrlChecker urlChecker = new UrlChecker();
+      urlChecker.waitUntilAvailable(
+          5, TimeUnit.SECONDS, hub.getUrl().toURI().resolve("readyz").toURL());
+
+      // the CI does not have enough CPUs, so use a fixed number here
+      int nThreads = 4 * 3;
+      ExecutorService executor = Executors.newFixedThreadPool(nThreads);
+
+      try {
+        List<CompletableFuture<WebDriver>> pendingSessions = new ArrayList<>();
+        CountDownLatch allPending = new CountDownLatch(nThreads);
+
+        for (int i = 0; i < nThreads; i++) {
+          CompletableFuture<WebDriver> future =
+              CompletableFuture.supplyAsync(
+                  () -> {
+                    allPending.countDown();
+
+                    return RemoteWebDriver.builder()
+                        .oneOf(browser.getCapabilities())
+                        .address(hub.getUrl())
+                        .build();
+                  },
+                  executor);
+
+          pendingSessions.add(future);
+        }
+
+        // ensure all sessions are in the queue
+        Assertions.assertThat(allPending.await(8, TimeUnit.SECONDS)).isTrue();
+
+        for (int i = 0; i < nThreads; i += 3) {
+          // remove all completed futures
+          assertThat(pendingSessions.removeIf(CompletableFuture::isDone)).isEqualTo(i != 0);
+
+          // start a node draining after 3 sessions
+          Server node = startNode(baseConfig, hub, 6, 3);
+
+          urlChecker.waitUntilAvailable(
+              60, TimeUnit.SECONDS, node.getUrl().toURI().resolve("readyz").toURL());
+
+          // use nano time to avoid issues with a jumping clock e.g. on WSL2 or due to time-sync
+          long started = System.nanoTime();
+
+          // wait for the first to start
+          CompletableFuture.anyOf(pendingSessions.toArray(CompletableFuture[]::new))
+              .get(120, TimeUnit.SECONDS);
+
+          // we want to check not more than 3 are started, polling won't help here
+          Thread.sleep(Duration.ofNanos(System.nanoTime() - started).multipliedBy(2).toMillis());
+
+          int stopped = 0;
+
+          for (CompletableFuture<WebDriver> future : pendingSessions) {
+            if (future.isDone()) {
+              stopped++;
+              future.get().quit();
+            }
+          }
+
+          // the node should only pick 3 sessions to start, then starts to drain
+          Assertions.assertThat(stopped).isEqualTo(3);
+
+          // check the node stopped
+          urlChecker.waitUntilUnavailable(
+              40, TimeUnit.SECONDS, node.getUrl().toURI().resolve("readyz").toURL());
+        }
+      } finally {
+        executor.shutdownNow();
+      }
+    }
+  }
+
+  /** A queued request must not be rejected when the only Node drains, but wait for the next. */
+  @Test
+  void sessionIsNotRejectedWhenNodeDrains() throws Exception {
+    Config baseConfig = createBaseConfig();
+
+    Server hub = startHub(baseConfig);
+    try (AutoCloseable stopHub = () -> Safely.safelyCall(hub::stop); ) {
+      UrlChecker urlChecker = new UrlChecker();
+      urlChecker.waitUntilAvailable(
+          5, TimeUnit.SECONDS, hub.getUrl().toURI().resolve("readyz").toURL());
+
+      ExecutorService executor = Executors.newFixedThreadPool(2);
+
+      try {
+        Supplier<CompletableFuture<WebDriver>> newDriver =
+            () ->
+                CompletableFuture.supplyAsync(
+                    () ->
+                        RemoteWebDriver.builder()
+                            .oneOf(browser.getCapabilities())
+                            .address(hub.getUrl())
+                            .build(),
+                    executor);
+
+        CompletableFuture<WebDriver> pendingA = newDriver.get();
+        CompletableFuture<WebDriver> pendingB = newDriver.get();
+
+        for (int i = 0; i < 16; i++) {
+          // the node should drain automatically, covered by other tests
+          startNode(baseConfig, hub, 6, 1);
+
+          // wait for one to start
+          CompletableFuture.anyOf(pendingA, pendingB).get(80, TimeUnit.SECONDS);
+
+          if (pendingA.isDone() && pendingB.isDone()) {
+            // every node drains after a single session, two started sessions are one too many
+            pendingA.get().quit();
+            pendingB.get().quit();
+
+            throw new IllegalStateException("only one should be started");
+          } else if (pendingA.isDone()) {
+            pendingA.get().quit();
+            pendingA = newDriver.get();
+          } else if (pendingB.isDone()) {
+            pendingB.get().quit();
+            pendingB = newDriver.get();
+          }
+        }
+      } finally {
+        executor.shutdownNow();
+      }
+    }
+  }
+
+  /** Builds the event bus and registration secret settings shared by the Hub and its Nodes. */
+  private Config createBaseConfig() {
+    String[] rawConfig =
+        new String[] {
+          "[events]",
+          "publish = \"tcp://localhost:" + PortProber.findFreePort() + "\"",
+          "subscribe = \"tcp://localhost:" + PortProber.findFreePort() + "\"",
+          "",
+          "[server]",
+          "registration-secret = \"feta\""
+        };
+
+    return new MemoizedConfig(new TomlConfig(new StringReader(String.join("\n", rawConfig))));
+  }
+
+  /** Starts a Hub on a free port, binding the event bus, combined with {@code baseConfig}. */
+  Server startHub(Config baseConfig) {
+    Config hubConfig =
+        new MemoizedConfig(
+            new CompoundConfig(
+                new MapConfig(
+                    Map.of(
+                        "server",
+                        Map.of("port", PortProber.findFreePort()),
+                        "events",
+                        Map.of("bind", true),
+                        "distributor",
+                        Map.of("newsession-threadpool-size", "6"))),
+                baseConfig));
+
+    return new Hub().asServer(hubConfig).start();
+  }
+
+  /** Starts a Node for the detected browser that registers with {@code hub}. */
+  Server startNode(Config baseConfig, Server hub, int maxSessions, int drainAfter) {
+    MapConfig additionalNodeConfig =
+        new MapConfig(
+            Map.of(
+                "server", Map.of("port", PortProber.findFreePort()),
+                "node",
+                Map.of(
+                    "hub",
+                    hub.getUrl(),
+                    "driver-implementation",
+                    browser.displayName(),
+                    "override-max-sessions",
+                    "true",
+                    "max-sessions",
+                    Integer.toString(maxSessions),
+                    // the Node starts draining once it has received this many sessions
+                    "drain-after-session-count",
+                    drainAfter)));
+
+    Config nodeConfig = new MemoizedConfig(new CompoundConfig(additionalNodeConfig, baseConfig));
+    return new NodeServer().asServer(nodeConfig).start();
+  }
+}