diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java index bc448ae1ca1b1..609a8a27a55fb 100644 --- a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java +++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java @@ -539,7 +539,7 @@ public void run() { // up starving a session request. Map stereotypes = getAvailableNodes().stream() - .filter(node -> node.hasCapacity()) + .filter(NodeStatus::hasCapacity) .flatMap(node -> node.getSlots().stream().map(Slot::getStereotype)) .collect( Collectors.groupingBy(ImmutableCapabilities::copyOf, Collectors.counting())); diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalNodeRegistry.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalNodeRegistry.java index 54ef10b657a9a..097657a3cc384 100644 --- a/java/src/org/openqa/selenium/grid/distributor/local/LocalNodeRegistry.java +++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalNodeRegistry.java @@ -361,9 +361,8 @@ public Set getAvailableNodes() { readLock.lock(); try { return model.getSnapshot().stream() - .filter( - node -> - !DOWN.equals(node.getAvailability()) && !DRAINING.equals(node.getAvailability())) + // Filter nodes are UP and have capacity (available slots) + .filter(node -> UP.equals(node.getAvailability()) && node.hasCapacity()) .collect(ImmutableSet.toImmutableSet()); } finally { readLock.unlock(); diff --git a/java/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java b/java/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java index 2b88bdce35c30..66be16397885e 100644 --- a/java/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java +++ b/java/src/org/openqa/selenium/grid/sessionqueue/local/LocalNewSessionQueue.java @@ -55,6 +55,7 @@ import org.openqa.selenium.grid.data.TraceSessionRequest; import org.openqa.selenium.grid.distributor.config.DistributorOptions; import org.openqa.selenium.grid.jmx.JMXHelper; +import org.openqa.selenium.grid.jmx.MBean; import org.openqa.selenium.grid.jmx.ManagedAttribute; import org.openqa.selenium.grid.jmx.ManagedService; import org.openqa.selenium.grid.log.LoggingOptions; @@ -110,6 +111,7 @@ public class LocalNewSessionQueue extends NewSessionQueue implements Closeable { thread.setName(NAME); return thread; }); + private final MBean jmxBean; public LocalNewSessionQueue( Tracer tracer, @@ -139,7 +141,8 @@ public LocalNewSessionQueue( requestTimeoutCheck.toMillis(), MILLISECONDS); - new JMXHelper().register(this); + // Manage JMX and unregister on close() + this.jmxBean = new JMXHelper().register(this); } public static NewSessionQueue create(Config config) { @@ -502,6 +505,10 @@ public boolean isReady() { @Override public void close() { shutdownGracefully(NAME, service); + + if (jmxBean != null) { + new JMXHelper().unregister(jmxBean.getObjectName()); + } } private void failDueToTimeout(RequestId reqId) { diff --git a/java/test/org/openqa/selenium/grid/distributor/local/LocalDistributorTest.java b/java/test/org/openqa/selenium/grid/distributor/local/LocalDistributorTest.java index a0092c9d64da5..eae8de5d87f69 100644 --- a/java/test/org/openqa/selenium/grid/distributor/local/LocalDistributorTest.java +++ b/java/test/org/openqa/selenium/grid/distributor/local/LocalDistributorTest.java @@ -494,6 +494,351 @@ void slowStartingNodesShouldNotCauseReservationsToBeSerialized() { assertThat(System.currentTimeMillis() - start).isLessThan(delay * 2); } + @Test + void shouldOnlyReturnNodesWithFreeSlots() throws URISyntaxException { + // Create a distributor + NewSessionQueue queue = + new LocalNewSessionQueue( + tracer, + new DefaultSlotMatcher(), + Duration.ofSeconds(2), + Duration.ofSeconds(2), + Duration.ofSeconds(1), + registrationSecret, + 5); + LocalDistributor distributor = + new LocalDistributor( + tracer, + bus, + new PassthroughHttpClient.Factory(localNode), + new LocalSessionMap(tracer, bus), + queue, + new DefaultSlotSelector(), + registrationSecret, + Duration.ofMinutes(5), + false, + Duration.ofSeconds(5), + newSessionThreadPoolSize, + new DefaultSlotMatcher(), + Duration.ofSeconds(30)); + + // Create two nodes - both initially have free slots + URI nodeUri1 = new URI("http://example:1234"); + URI nodeUri2 = new URI("http://example:5678"); + + // Node 1: Has free slots + Node node1 = + LocalNode.builder(tracer, bus, nodeUri1, nodeUri1, registrationSecret) + .add( + new ImmutableCapabilities("browserName", "cheese"), + new TestSessionFactory( + (id, c) -> + new Session(id, nodeUri1, new ImmutableCapabilities(), c, Instant.now()))) + .build(); + + // Node 2: Will be fully occupied + Node node2 = + LocalNode.builder(tracer, bus, nodeUri2, nodeUri2, registrationSecret) + .add( + new ImmutableCapabilities("browserName", "cheese"), + new TestSessionFactory( + (id, c) -> + new Session(id, nodeUri2, new ImmutableCapabilities(), c, Instant.now()))) + .build(); + + // Add both nodes to distributor + distributor.add(node1); + distributor.add(node2); + + // Initially both nodes should be available + Set initialAvailableFreeNodes = distributor.getAvailableNodes(); + assertThat(initialAvailableFreeNodes).hasSize(2); + + // Create a session to occupy one slot + SessionRequest sessionRequest = + new SessionRequest( + new RequestId(UUID.randomUUID()), + Instant.now(), + Set.of(W3C), + Set.of(new ImmutableCapabilities("browserName", "cheese")), + Map.of(), + Map.of()); + + // Create session - this will occupy one slot on one of the nodes + distributor.newSession(sessionRequest); + + // Now test getAvailableNodes - should return nodes that still have free slots + Set availableFreeNodes = distributor.getAvailableNodes(); + + // Both nodes should still be available since each has only 1 slot and we created 1 session + // But let's verify the logic by checking that all returned nodes have free slots + for (NodeStatus nodeStatus : availableFreeNodes) { + assertThat(nodeStatus.getAvailability()).isEqualTo(UP); + + // Verify node has at least one free slot + boolean hasFreeSlot = + nodeStatus.getSlots().stream().anyMatch(slot -> slot.getSession() == null); + assertThat(hasFreeSlot).isTrue(); + } + + // Create another session to fully occupy both nodes + SessionRequest sessionRequest2 = + new SessionRequest( + new RequestId(UUID.randomUUID()), + Instant.now(), + Set.of(W3C), + Set.of(new ImmutableCapabilities("browserName", "cheese")), + Map.of(), + Map.of()); + + distributor.newSession(sessionRequest2); + + // Now both nodes should be fully occupied, so getAvailableNodes should return empty + Set fullyOccupiedNodes = distributor.getAvailableNodes(); + assertThat(fullyOccupiedNodes).isEmpty(); + } + + @Test + void shouldNotReturnDrainingNodes() throws URISyntaxException { + // Create a distributor + NewSessionQueue queue = + new LocalNewSessionQueue( + tracer, + new DefaultSlotMatcher(), + Duration.ofSeconds(2), + Duration.ofSeconds(2), + Duration.ofSeconds(1), + registrationSecret, + 5); + LocalDistributor distributor = + new LocalDistributor( + tracer, + bus, + new PassthroughHttpClient.Factory(localNode), + new LocalSessionMap(tracer, bus), + queue, + new DefaultSlotSelector(), + registrationSecret, + Duration.ofMinutes(5), + false, + Duration.ofSeconds(5), + newSessionThreadPoolSize, + new DefaultSlotMatcher(), + Duration.ofSeconds(30)); + + // Create a node + URI nodeUri = new URI("http://example:1234"); + Node node = + LocalNode.builder(tracer, bus, nodeUri, nodeUri, registrationSecret) + .add( + new ImmutableCapabilities("browserName", "cheese"), + new TestSessionFactory( + (id, c) -> + new Session(id, nodeUri, new ImmutableCapabilities(), c, Instant.now()))) + .build(); + + // Add node to distributor + distributor.add(node); + + // Initially, node should be available + Set availableFreeNodes = distributor.getAvailableNodes(); + assertThat(availableFreeNodes).hasSize(1); + assertThat(availableFreeNodes.iterator().next().getAvailability()).isEqualTo(UP); + + // Drain the node + distributor.drain(node.getId()); + + // After draining, node should not be returned by getAvailableNodes + availableFreeNodes = distributor.getAvailableNodes(); + assertThat(availableFreeNodes).isEmpty(); + } + + @Test + void shouldNotReturnDownNodes() throws URISyntaxException { + // Create a distributor + NewSessionQueue queue = + new LocalNewSessionQueue( + tracer, + new DefaultSlotMatcher(), + Duration.ofSeconds(2), + Duration.ofSeconds(2), + Duration.ofSeconds(1), + registrationSecret, + 5); + LocalDistributor distributor = + new LocalDistributor( + tracer, + bus, + new PassthroughHttpClient.Factory(localNode), + new LocalSessionMap(tracer, bus), + queue, + new DefaultSlotSelector(), + registrationSecret, + Duration.ofMinutes(5), + false, + Duration.ofSeconds(5), + newSessionThreadPoolSize, + new DefaultSlotMatcher(), + Duration.ofSeconds(30)); + + // Create a node + URI nodeUri = new URI("http://example:1234"); + Node node = + LocalNode.builder(tracer, bus, nodeUri, nodeUri, registrationSecret) + .add( + new ImmutableCapabilities("browserName", "cheese"), + new TestSessionFactory( + (id, c) -> + new Session(id, nodeUri, new ImmutableCapabilities(), c, Instant.now()))) + .build(); + + // Add node to distributor + distributor.add(node); + + // Initially, node should be available + Set availableFreeNodes = distributor.getAvailableNodes(); + assertThat(availableFreeNodes).hasSize(1); + + // Remove the node (simulates DOWN state) + distributor.remove(node.getId()); + + // After removal, node should not be returned by getAvailableNodes + availableFreeNodes = distributor.getAvailableNodes(); + assertThat(availableFreeNodes).isEmpty(); + } + + @Test + void shouldReduceRedundantSlotChecks() throws URISyntaxException { + // Create a distributor + NewSessionQueue queue = + new LocalNewSessionQueue( + tracer, + new DefaultSlotMatcher(), + Duration.ofSeconds(2), + Duration.ofSeconds(2), + Duration.ofSeconds(1), + registrationSecret, + 5); + LocalDistributor distributor = + new LocalDistributor( + tracer, + bus, + new PassthroughHttpClient.Factory(localNode), + new LocalSessionMap(tracer, bus), + queue, + new DefaultSlotSelector(), + registrationSecret, + Duration.ofMinutes(5), + false, + Duration.ofSeconds(5), + newSessionThreadPoolSize, + new DefaultSlotMatcher(), + Duration.ofSeconds(30)); + + // Create multiple nodes, some with free slots, some fully occupied + List nodes = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + URI nodeUri = new URI("http://example:" + (1234 + i)); + Node node = + LocalNode.builder(tracer, bus, nodeUri, nodeUri, registrationSecret) + .add( + new ImmutableCapabilities("browserName", "cheese"), + new TestSessionFactory( + (id, c) -> + new Session(id, nodeUri, new ImmutableCapabilities(), c, Instant.now()))) + .build(); + nodes.add(node); + distributor.add(node); + } + + // Occupy slots on first 3 nodes + for (int i = 0; i < 3; i++) { + SessionRequest sessionRequest = + new SessionRequest( + new RequestId(UUID.randomUUID()), + Instant.now(), + Set.of(W3C), + Set.of(new ImmutableCapabilities("browserName", "cheese")), + Map.of(), + Map.of()); + distributor.newSession(sessionRequest); + } + + // getAvailableNodes should only return the 2 nodes with free slots + Set availableFreeNodes = distributor.getAvailableNodes(); + assertThat(availableFreeNodes).hasSize(2); + + // Verify all returned nodes have free slots + for (NodeStatus nodeStatus : availableFreeNodes) { + boolean hasFreeSlot = + nodeStatus.getSlots().stream().anyMatch(slot -> slot.getSession() == null); + assertThat(hasFreeSlot).isTrue(); + assertThat(nodeStatus.getAvailability()).isEqualTo(UP); + } + } + + @Test + void shouldHandleAllNodesFullyOccupied() throws URISyntaxException { + // Create a distributor + NewSessionQueue queue = + new LocalNewSessionQueue( + tracer, + new DefaultSlotMatcher(), + Duration.ofSeconds(2), + Duration.ofSeconds(2), + Duration.ofSeconds(1), + registrationSecret, + 5); + LocalDistributor distributor = + new LocalDistributor( + tracer, + bus, + new PassthroughHttpClient.Factory(localNode), + new LocalSessionMap(tracer, bus), + queue, + new DefaultSlotSelector(), + registrationSecret, + Duration.ofMinutes(5), + false, + Duration.ofSeconds(5), + newSessionThreadPoolSize, + new DefaultSlotMatcher(), + Duration.ofSeconds(30)); + + // Create nodes with single slot each + List nodes = new ArrayList<>(); + for (int i = 0; i < 3; i++) { + URI nodeUri = new URI("http://example:" + (1234 + i)); + Node node = + LocalNode.builder(tracer, bus, nodeUri, nodeUri, registrationSecret) + .add( + new ImmutableCapabilities("browserName", "cheese"), + new TestSessionFactory( + (id, c) -> + new Session(id, nodeUri, new ImmutableCapabilities(), c, Instant.now()))) + .build(); + nodes.add(node); + distributor.add(node); + } + + // Occupy all slots + for (int i = 0; i < 3; i++) { + SessionRequest sessionRequest = + new SessionRequest( + new RequestId(UUID.randomUUID()), + Instant.now(), + Set.of(W3C), + Set.of(new ImmutableCapabilities("browserName", "cheese")), + Map.of(), + Map.of()); + distributor.newSession(sessionRequest); + } + + // getAvailableNodes should return empty set when all nodes are fully occupied + Set availableFreeNodes = distributor.getAvailableNodes(); + assertThat(availableFreeNodes).isEmpty(); + } + private class Handler extends Session implements HttpHandler { private Handler(Capabilities capabilities) {