diff --git a/java/src/org/openqa/selenium/grid/distributor/GridModel.java b/java/src/org/openqa/selenium/grid/distributor/GridModel.java index 49892f2680a4c..b975a16082d94 100644 --- a/java/src/org/openqa/selenium/grid/distributor/GridModel.java +++ b/java/src/org/openqa/selenium/grid/distributor/GridModel.java @@ -17,516 +17,97 @@ package org.openqa.selenium.grid.distributor; -import static org.openqa.selenium.grid.data.Availability.DOWN; -import static org.openqa.selenium.grid.data.Availability.DRAINING; -import static org.openqa.selenium.grid.data.Availability.UP; - -import com.google.common.collect.ImmutableSet; -import java.time.Instant; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.Map; -import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; -import java.util.logging.Logger; -import org.openqa.selenium.events.EventBus; -import org.openqa.selenium.grid.config.Config; import org.openqa.selenium.grid.data.Availability; -import org.openqa.selenium.grid.data.NodeDrainStarted; import org.openqa.selenium.grid.data.NodeId; -import org.openqa.selenium.grid.data.NodeRemovedEvent; -import org.openqa.selenium.grid.data.NodeRestartedEvent; import org.openqa.selenium.grid.data.NodeStatus; import org.openqa.selenium.grid.data.Session; -import org.openqa.selenium.grid.data.SessionClosedEvent; -import org.openqa.selenium.grid.data.Slot; import org.openqa.selenium.grid.data.SlotId; -import org.openqa.selenium.grid.server.EventBusOptions; -import org.openqa.selenium.internal.Debug; -import org.openqa.selenium.internal.Require; import org.openqa.selenium.remote.SessionId; -public class GridModel { - - private static final SessionId RESERVED = new SessionId("reserved"); - private static final Logger LOG = Logger.getLogger(GridModel.class.getName()); - // How many times a node's heartbeat duration needs to be exceeded before the node is considered - // purgeable. - private static final int PURGE_TIMEOUT_MULTIPLIER = 4; - private static final int UNHEALTHY_THRESHOLD = 4; - private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true); - private final Set nodes = Collections.newSetFromMap(new ConcurrentHashMap<>()); - private final Map nodePurgeTimes = new ConcurrentHashMap<>(); - private final Map nodeHealthCount = new ConcurrentHashMap<>(); - private final EventBus events; - - public GridModel(EventBus events) { - this.events = Require.nonNull("Event bus", events); - - this.events.addListener(NodeDrainStarted.listener(nodeId -> setAvailability(nodeId, DRAINING))); - this.events.addListener(SessionClosedEvent.listener(this::release)); - } - - public static GridModel create(Config config) { - EventBus bus = new EventBusOptions(config).getEventBus(); - - return new GridModel(bus); - } - - public void add(NodeStatus node) { - Require.nonNull("Node", node); - - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - // If we've already added the node, remove it. - Iterator iterator = nodes.iterator(); - while (iterator.hasNext()) { - NodeStatus next = iterator.next(); - - // If the ID and the URI are the same, use the same - // availability as the version we have now: we're just refreshing - // an existing node. 
- if (next.getNodeId().equals(node.getNodeId()) - && next.getExternalUri().equals(node.getExternalUri())) { - iterator.remove(); - - LOG.log(Debug.getDebugLogLevel(), "Refreshing node with id {0}", node.getNodeId()); - NodeStatus refreshed = rewrite(node, next.getAvailability()); - nodes.add(refreshed); - nodePurgeTimes.put(refreshed.getNodeId(), Instant.now()); - updateHealthCheckCount(refreshed.getNodeId(), refreshed.getAvailability()); - - return; - } - - // If the URI is the same but NodeId is different, then the Node has restarted - if (!next.getNodeId().equals(node.getNodeId()) - && next.getExternalUri().equals(node.getExternalUri())) { - LOG.info( - String.format( - "Re-adding node with id %s and URI %s.", - node.getNodeId(), node.getExternalUri())); - - // Send the previous state to allow cleaning up the old node related resources. - // Nodes are initially added in the "down" state, so the new state must be ignored. - events.fire(new NodeRestartedEvent(next)); - iterator.remove(); - break; - } - - // If the URI has changed, then assume this is a new node and fall - // out of the loop: we want to add it as `DOWN` until something - // changes our mind. - if (next.getNodeId().equals(node.getNodeId())) { - LOG.info( - String.format( - "Re-adding node with id %s and URI %s.", - node.getNodeId(), node.getExternalUri())); - iterator.remove(); - break; - } - } - - // Nodes are initially added in the "down" state until something changes their availability - LOG.log( - Debug.getDebugLogLevel(), - "Adding node with id {0} and URI {1}", - new Object[] {node.getNodeId(), node.getExternalUri()}); - NodeStatus refreshed = rewrite(node, DOWN); - nodes.add(refreshed); - nodePurgeTimes.put(refreshed.getNodeId(), Instant.now()); - updateHealthCheckCount(refreshed.getNodeId(), refreshed.getAvailability()); - } finally { - writeLock.unlock(); - } - } - - public void refresh(NodeStatus status) { - Require.nonNull("Node status", status); - - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - Iterator iterator = nodes.iterator(); - while (iterator.hasNext()) { - NodeStatus node = iterator.next(); - - if (node.getNodeId().equals(status.getNodeId())) { - iterator.remove(); - - // if the node was marked as "down", keep it down until a healthcheck passes: - // just because the node can hit the event bus doesn't mean it's reachable - if (node.getAvailability() == DOWN) { - nodes.add(rewrite(status, DOWN)); - } else { - // Otherwise, trust what it tells us. - nodes.add(status); - } - - nodePurgeTimes.put(status.getNodeId(), Instant.now()); - - return; - } - } - } finally { - writeLock.unlock(); - } - } - - public void touch(NodeStatus nodeStatus) { - Require.nonNull("Node ID", nodeStatus); - - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - NodeStatus node = getNode(nodeStatus.getNodeId()); - if (node != null) { - nodePurgeTimes.put(node.getNodeId(), Instant.now()); - // Covers the case where the Node might be DOWN in the Grid model (e.g. Node lost - // connectivity for a while). The Node reports itself back as UP. 
- if (node.getAvailability() != nodeStatus.getAvailability() - && nodeStatus.getAvailability() == UP) { - nodes.remove(node); - nodes.add(nodeStatus); - } - } - } finally { - writeLock.unlock(); - } - } - - public void remove(NodeId id) { - Require.nonNull("Node ID", id); - - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - nodes.removeIf(n -> n.getNodeId().equals(id)); - nodePurgeTimes.remove(id); - nodeHealthCount.remove(id); - } finally { - writeLock.unlock(); - } - } - - public void purgeDeadNodes() { - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - Map replacements = new HashMap<>(); - Set toRemove = new HashSet<>(); - - for (NodeStatus node : nodes) { - NodeId id = node.getNodeId(); - if (nodeHealthCount.getOrDefault(id, 0) > UNHEALTHY_THRESHOLD) { - LOG.info( - String.format( - "Removing Node %s (uri: %s), unhealthy threshold has been reached", - node.getNodeId(), node.getExternalUri())); - toRemove.add(node); - break; - } - - Instant now = Instant.now(); - Instant lastTouched = nodePurgeTimes.getOrDefault(id, Instant.now()); - Instant lostTime = - lastTouched.plus(node.getHeartbeatPeriod().multipliedBy(PURGE_TIMEOUT_MULTIPLIER / 2)); - Instant deadTime = - lastTouched.plus(node.getHeartbeatPeriod().multipliedBy(PURGE_TIMEOUT_MULTIPLIER)); - - if (node.getAvailability() == UP && lostTime.isBefore(now)) { - LOG.info( - String.format( - "Switching Node %s (uri: %s) from UP to DOWN", - node.getNodeId(), node.getExternalUri())); - replacements.put(node, rewrite(node, DOWN)); - nodePurgeTimes.put(id, Instant.now()); - } else if (node.getAvailability() == DOWN && deadTime.isBefore(now)) { - LOG.info( - String.format( - "Removing Node %s (uri: %s), DOWN for too long", - node.getNodeId(), node.getExternalUri())); - toRemove.add(node); - } - } - - replacements.forEach( - (before, after) -> { - nodes.remove(before); - nodes.add(after); - }); - toRemove.forEach( - node -> { - nodes.remove(node); - nodePurgeTimes.remove(node.getNodeId()); - nodeHealthCount.remove(node.getNodeId()); - events.fire(new NodeRemovedEvent(node)); - }); - } finally { - writeLock.unlock(); - } - } - - public void setAvailability(NodeId id, Availability availability) { - Require.nonNull("Node ID", id); - Require.nonNull("Availability", availability); - - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - NodeStatus node = getNode(id); - - if (node == null) { - return; - } - - if (availability.equals(node.getAvailability())) { - if (node.getAvailability() == UP) { - nodePurgeTimes.put(node.getNodeId(), Instant.now()); - } - } else { - LOG.info( - String.format( - "Switching Node %s (uri: %s) from %s to %s", - id, node.getExternalUri(), node.getAvailability(), availability)); - - NodeStatus refreshed = rewrite(node, availability); - nodes.remove(node); - nodes.add(refreshed); - nodePurgeTimes.put(node.getNodeId(), Instant.now()); - } - } finally { - writeLock.unlock(); - } - } - - public boolean reserve(SlotId slotId) { - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - NodeStatus node = getNode(slotId.getOwningNodeId()); - if (node == null) { - LOG.warning( - String.format( - "Asked to reserve slot on node %s, but unable to find node", - slotId.getOwningNodeId())); - return false; - } - - if (!UP.equals(node.getAvailability())) { - LOG.warning( - String.format( - "Asked to reserve a slot on node %s, but node is %s", - slotId.getOwningNodeId(), node.getAvailability())); - return false; - } - - Optional maybeSlot = - node.getSlots().stream().filter(slot -> 
slotId.equals(slot.getId())).findFirst(); - - if (!maybeSlot.isPresent()) { - LOG.warning( - String.format( - "Asked to reserve slot on node %s, but no slot with id %s found", - node.getNodeId(), slotId)); - return false; - } - - reserve(node, maybeSlot.get()); - return true; - } finally { - writeLock.unlock(); - } - } - - public Set getSnapshot() { - Lock readLock = this.lock.readLock(); - readLock.lock(); - try { - return ImmutableSet.copyOf(nodes); - } finally { - readLock.unlock(); - } - } - - private NodeStatus getNode(NodeId id) { - Require.nonNull("Node ID", id); - - Lock readLock = lock.readLock(); - readLock.lock(); - try { - return nodes.stream().filter(n -> n.getNodeId().equals(id)).findFirst().orElse(null); - } finally { - readLock.unlock(); - } - } - - private NodeStatus rewrite(NodeStatus status, Availability availability) { - return new NodeStatus( - status.getNodeId(), - status.getExternalUri(), - status.getMaxSessionCount(), - status.getSlots(), - availability, - status.getHeartbeatPeriod(), - status.getSessionTimeout(), - status.getVersion(), - status.getOsInfo()); - } - - public void release(SessionId id) { - if (id == null) { - return; - } - - LOG.info("Releasing slot for session id " + id); - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - for (NodeStatus node : nodes) { - for (Slot slot : node.getSlots()) { - if (slot.getSession() == null) { - continue; - } - - if (id.equals(slot.getSession().getId())) { - Slot released = - new Slot(slot.getId(), slot.getStereotype(), slot.getLastStarted(), null); - amend(node.getAvailability(), node, released); - return; - } - } - } - } finally { - writeLock.unlock(); - } - } +/** + * An abstract representation of the Grid's node state model. This abstraction allows for different + * implementations that can store state either locally or in an external datastore for high + * availability. + */ +public abstract class GridModel { /** - * A helper to reserve a slot of a node. The writeLock must be acquired outside to ensure the view - * of the NodeStatus is the current state, otherwise concurrent calls to amend will work with an - * outdated view of slots. + * Adds a node to the grid model, typically starting with DOWN availability until health checks + * pass. + * + * @param node The node status to add */ - private void reserve(NodeStatus status, Slot slot) { - Instant now = Instant.now(); - - Slot reserved = - new Slot( - slot.getId(), - slot.getStereotype(), - now, - new Session( - RESERVED, - status.getExternalUri(), - slot.getStereotype(), - slot.getStereotype(), - now)); - - amend(UP, status, reserved); - } - - public void setSession(SlotId slotId, Session session) { - Require.nonNull("Slot ID", slotId); - - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - NodeStatus node = getNode(slotId.getOwningNodeId()); - if (node == null) { - LOG.warning( - "Grid model and reality have diverged. Unable to find node " - + slotId.getOwningNodeId()); - return; - } - - Optional maybeSlot = - node.getSlots().stream().filter(slot -> slotId.equals(slot.getId())).findFirst(); - - if (!maybeSlot.isPresent()) { - LOG.warning("Grid model and reality have diverged. Unable to find slot " + slotId); - return; - } + public abstract void add(NodeStatus node); - Slot slot = maybeSlot.get(); - Session maybeSession = slot.getSession(); - if (maybeSession == null) { - LOG.warning("Grid model and reality have diverged. Slot is not reserved. 
" + slotId); - return; - } - - if (!RESERVED.equals(maybeSession.getId())) { - LOG.warning( - "Grid model and reality have diverged. Slot has session and is not reserved. " - + slotId); - return; - } + /** + * Refreshes a node's status in the grid model. + * + * @param status The updated node status + */ + public abstract void refresh(NodeStatus status); - Slot updated = - new Slot( - slot.getId(), - slot.getStereotype(), - session == null ? slot.getLastStarted() : session.getStartTime(), - session); + /** + * Updates the timestamp for a node to prevent it from being considered stale. May also update the + * node's availability if reported differently. + * + * @param nodeStatus The node status to update + */ + public abstract void touch(NodeStatus nodeStatus); - amend(node.getAvailability(), node, updated); - } finally { - writeLock.unlock(); - } - } + /** + * Removes a node from the grid model. + * + * @param id The ID of the node to remove + */ + public abstract void remove(NodeId id); - public void updateHealthCheckCount(NodeId id, Availability availability) { - Require.nonNull("Node ID", id); - Require.nonNull("Availability", availability); + /** Removes nodes that have been unresponsive for too long. */ + public abstract void purgeDeadNodes(); - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - int unhealthyCount = nodeHealthCount.getOrDefault(id, 0); + /** + * Sets the availability status for a node. + * + * @param id The ID of the node + * @param availability The new availability status + */ + public abstract void setAvailability(NodeId id, Availability availability); - // Keep track of consecutive number of times the Node health check fails - if (availability.equals(DOWN)) { - nodeHealthCount.put(id, unhealthyCount + 1); - } + /** + * Attempts to reserve a specific slot on a node. + * + * @param slotId The ID of the slot to reserve + * @return true if the reservation was successful, false otherwise + */ + public abstract boolean reserve(SlotId slotId); - // If the Node is healthy again before crossing the threshold, then reset the count. - if (unhealthyCount <= UNHEALTHY_THRESHOLD && availability.equals(UP)) { - nodeHealthCount.put(id, 0); - } - } finally { - writeLock.unlock(); - } - } + /** + * Gets a snapshot of all node statuses currently in the grid model. + * + * @return A set of node statuses + */ + public abstract Set getSnapshot(); /** - * A helper to replace the availability and a slot of a node. The writeLock must be acquired - * outside to ensure the view of the NodeStatus is the current state, otherwise concurrent calls - * to amend will work with an outdated view of slots. + * Releases a session, making its slot available again. + * + * @param id The ID of the session to release */ - private void amend(Availability availability, NodeStatus status, Slot slot) { - Set newSlots = new HashSet<>(status.getSlots()); - newSlots.removeIf(s -> s.getId().equals(slot.getId())); - newSlots.add(slot); + public abstract void release(SessionId id); - NodeStatus node = getNode(status.getNodeId()); + /** + * Updates a reserved slot to contain an actual session. 
+ * + * @param slotId The ID of the slot to update + * @param session The session to associate with the slot, or null to clear + */ + public abstract void setSession(SlotId slotId, Session session); - nodes.remove(node); - nodes.add( - new NodeStatus( - status.getNodeId(), - status.getExternalUri(), - status.getMaxSessionCount(), - newSlots, - availability, - status.getHeartbeatPeriod(), - status.getSessionTimeout(), - status.getVersion(), - status.getOsInfo())); - } + /** + * Updates the health check count for a node based on its availability. + * + * @param id The ID of the node + * @param availability The current availability status + */ + public abstract void updateHealthCheckCount(NodeId id, Availability availability); } diff --git a/java/src/org/openqa/selenium/grid/distributor/NodeRegistry.java b/java/src/org/openqa/selenium/grid/distributor/NodeRegistry.java new file mode 100644 index 0000000000000..d8b0789b792bd --- /dev/null +++ b/java/src/org/openqa/selenium/grid/distributor/NodeRegistry.java @@ -0,0 +1,148 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.openqa.selenium.grid.distributor; + +import java.io.Closeable; +import java.net.URI; +import java.util.Set; +import org.openqa.selenium.grid.data.Availability; +import org.openqa.selenium.grid.data.DistributorStatus; +import org.openqa.selenium.grid.data.NodeId; +import org.openqa.selenium.grid.data.NodeStatus; +import org.openqa.selenium.grid.data.Session; +import org.openqa.selenium.grid.data.SlotId; +import org.openqa.selenium.grid.node.Node; +import org.openqa.selenium.status.HasReadyState; + +/** + * Maintains a registry of the nodes available for a {@link + * org.openqa.selenium.grid.distributor.Distributor}. Implementations may store nodes in memory or + * in an external data store to allow for high availability configurations. + */ +public interface NodeRegistry extends HasReadyState, Closeable { + + /** + * Register a node status received from an event. + * + * @param status The node status to register. + */ + void register(NodeStatus status); + + /** + * Add a node to this registry. + * + * @param node The node to add. + */ + void add(Node node); + + /** + * Removes a node from this registry. + * + * @param nodeId The id of the node to remove. + */ + void remove(NodeId nodeId); + + /** + * Set a node to draining state. + * + * @param nodeId The id of the node to drain. + * @return true if the node was set to draining, false otherwise. + */ + boolean drain(NodeId nodeId); + + /** + * Updates a node's availability status. + * + * @param nodeUri The URI of the node. + * @param id The id of the node. + * @param availability The new availability status. 
+ */ + void updateNodeAvailability(URI nodeUri, NodeId id, Availability availability); + + /** Refreshes all nodes by running a health check on each one. */ + void refresh(); + + /** + * Gets a snapshot of all registered nodes. + * + * @return The current status of the distributor. + */ + DistributorStatus getStatus(); + + /** + * Gets all available nodes that are not DOWN or DRAINING. + * + * @return Set of available node statuses. + */ + Set getAvailableNodes(); + + /** + * Gets a node by its ID. + * + * @param id The node ID to look up. + * @return The node, or null if not found. + */ + Node getNode(NodeId id); + + /** + * Gets the total number of nodes that are UP. + * + * @return The number of UP nodes. + */ + long getUpNodeCount(); + + /** + * Gets the total number of nodes that are DOWN. + * + * @return The number of DOWN nodes. + */ + long getDownNodeCount(); + + /** Run health checks on all nodes. */ + void runHealthChecks(); + + /** + * Reserve a slot for a session. + * + * @param slotId The slot ID to reserve. + * @return Whether the reservation was successful. + */ + boolean reserve(SlotId slotId); + + /** + * Set a session for a particular slot. + * + * @param slotId The slot ID. + * @param session The session to associate with the slot, or null to clear. + */ + void setSession(SlotId slotId, Session session); + + /** Get the number of active slots. */ + int getActiveSlots(); + + /** Get the number of idle slots. */ + int getIdleSlots(); + + /** + * Get node by URI. + * + * @param uri The node URI to look up. + * @return The node if found, null otherwise. + */ + Node getNode(URI uri); +} diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java index 835150eb94f01..bc448ae1ca1b1 100644 --- a/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java +++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalDistributor.java @@ -17,11 +17,7 @@ package org.openqa.selenium.grid.distributor.local; -import static com.google.common.collect.ImmutableSet.toImmutableSet; import static org.openqa.selenium.concurrent.ExecutorServices.shutdownGracefully; -import static org.openqa.selenium.grid.data.Availability.DOWN; -import static org.openqa.selenium.grid.data.Availability.DRAINING; -import static org.openqa.selenium.grid.data.Availability.UP; import static org.openqa.selenium.internal.Debug.getDebugLogLevel; import static org.openqa.selenium.remote.RemoteTags.CAPABILITIES; import static org.openqa.selenium.remote.RemoteTags.CAPABILITIES_EVENT; @@ -32,20 +28,15 @@ import static org.openqa.selenium.remote.tracing.Tags.EXCEPTION; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import java.io.Closeable; import java.io.UncheckedIOException; import java.net.URI; import java.time.Duration; -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -58,7 +49,6 @@ import java.util.stream.Collectors; import org.openqa.selenium.Beta; import org.openqa.selenium.Capabilities; -import org.openqa.selenium.HealthCheckFailedException; import 
org.openqa.selenium.ImmutableCapabilities; import org.openqa.selenium.RetrySessionRequestException; import org.openqa.selenium.SessionNotCreatedException; @@ -66,18 +56,11 @@ import org.openqa.selenium.concurrent.GuardedRunnable; import org.openqa.selenium.events.EventBus; import org.openqa.selenium.grid.config.Config; -import org.openqa.selenium.grid.data.Availability; import org.openqa.selenium.grid.data.CreateSessionRequest; import org.openqa.selenium.grid.data.CreateSessionResponse; import org.openqa.selenium.grid.data.DistributorStatus; -import org.openqa.selenium.grid.data.NodeAddedEvent; -import org.openqa.selenium.grid.data.NodeDrainComplete; -import org.openqa.selenium.grid.data.NodeHeartBeatEvent; import org.openqa.selenium.grid.data.NodeId; -import org.openqa.selenium.grid.data.NodeRemovedEvent; -import org.openqa.selenium.grid.data.NodeRestartedEvent; import org.openqa.selenium.grid.data.NodeStatus; -import org.openqa.selenium.grid.data.NodeStatusEvent; import org.openqa.selenium.grid.data.RequestId; import org.openqa.selenium.grid.data.Session; import org.openqa.selenium.grid.data.SessionRequest; @@ -87,16 +70,14 @@ import org.openqa.selenium.grid.data.SlotMatcher; import org.openqa.selenium.grid.data.TraceSessionRequest; import org.openqa.selenium.grid.distributor.Distributor; -import org.openqa.selenium.grid.distributor.GridModel; +import org.openqa.selenium.grid.distributor.NodeRegistry; import org.openqa.selenium.grid.distributor.config.DistributorOptions; import org.openqa.selenium.grid.distributor.selector.SlotSelector; import org.openqa.selenium.grid.jmx.JMXHelper; import org.openqa.selenium.grid.jmx.ManagedAttribute; import org.openqa.selenium.grid.jmx.ManagedService; import org.openqa.selenium.grid.log.LoggingOptions; -import org.openqa.selenium.grid.node.HealthCheck; import org.openqa.selenium.grid.node.Node; -import org.openqa.selenium.grid.node.remote.RemoteNode; import org.openqa.selenium.grid.security.Secret; import org.openqa.selenium.grid.security.SecretOptions; import org.openqa.selenium.grid.server.EventBusOptions; @@ -105,7 +86,6 @@ import org.openqa.selenium.grid.sessionmap.config.SessionMapOptions; import org.openqa.selenium.grid.sessionqueue.NewSessionQueue; import org.openqa.selenium.grid.sessionqueue.config.NewSessionQueueOptions; -import org.openqa.selenium.internal.Debug; import org.openqa.selenium.internal.Either; import org.openqa.selenium.internal.Require; import org.openqa.selenium.remote.SessionId; @@ -125,20 +105,16 @@ public class LocalDistributor extends Distributor implements Closeable { private static final Logger LOG = Logger.getLogger(LocalDistributor.class.getName()); - private static final SessionId RESERVED = new SessionId("reserved"); - private final Tracer tracer; private final EventBus bus; private final HttpClient.Factory clientFactory; private final SessionMap sessions; private final SlotSelector slotSelector; private final Secret registrationSecret; - private final Map allChecks = new HashMap<>(); private final Duration healthcheckInterval; + private final NodeRegistry nodeRegistry; private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true); - private final GridModel model; - private final Map nodes; private final SlotMatcher slotMatcher; private final Duration purgeNodesInterval; @@ -198,27 +174,22 @@ public LocalDistributor( this.slotSelector = Require.nonNull("Slot selector", slotSelector); this.registrationSecret = Require.nonNull("Registration secret", registrationSecret); this.healthcheckInterval = 
Require.nonNull("Health check interval", healthcheckInterval); - this.model = new GridModel(bus); - this.nodes = new ConcurrentHashMap<>(); this.rejectUnsupportedCaps = rejectUnsupportedCaps; this.slotMatcher = slotMatcher; this.purgeNodesInterval = purgeNodesInterval; Require.nonNull("Session request interval", sessionRequestRetryInterval); - bus.addListener(NodeStatusEvent.listener(this::register)); - bus.addListener(NodeStatusEvent.listener(model::refresh)); - bus.addListener( - NodeRestartedEvent.listener(previousNodeStatus -> remove(previousNodeStatus.getNodeId()))); - bus.addListener(NodeRemovedEvent.listener(nodeStatus -> remove(nodeStatus.getNodeId()))); - bus.addListener( - NodeHeartBeatEvent.listener( - nodeStatus -> { - if (nodes.containsKey(nodeStatus.getNodeId())) { - model.touch(nodeStatus); - } else { - register(nodeStatus); - } - })); + this.nodeRegistry = + new LocalNodeRegistry( + tracer, + bus, + newSessionThreadPoolSize, + this.clientFactory, + this.registrationSecret, + this.healthcheckInterval, + this.nodeHealthCheckService, + this.purgeNodesInterval, + this.purgeDeadNodesService); sessionCreatorExecutor = Executors.newFixedThreadPool( @@ -231,22 +202,6 @@ public LocalDistributor( }); NewSessionRunnable newSessionRunnable = new NewSessionRunnable(); - bus.addListener(NodeDrainComplete.listener(this::remove)); - - // Disable purge dead nodes service if interval is set to zero - if (!this.purgeNodesInterval.isZero()) { - purgeDeadNodesService.scheduleAtFixedRate( - GuardedRunnable.guard(model::purgeDeadNodes), - this.purgeNodesInterval.getSeconds(), - this.purgeNodesInterval.getSeconds(), - TimeUnit.SECONDS); - } - - nodeHealthCheckService.scheduleAtFixedRate( - runNodeHealthChecks(), - this.healthcheckInterval.toMillis(), - this.healthcheckInterval.toMillis(), - TimeUnit.MILLISECONDS); // if sessionRequestRetryInterval is 0, we will schedule session creation every 10 millis long period = @@ -298,226 +253,33 @@ public boolean isReady() { } } - private void register(NodeStatus status) { - Require.nonNull("Node", status); - - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - if (nodes.containsKey(status.getNodeId())) { - return; - } - - if (status.getAvailability() != UP) { - // A Node might be draining or down (in the case of Relay nodes) - // but the heartbeat is still running. - // We do not need to add this Node for now. - return; - } - - Set capabilities = - status.getSlots().stream() - .map(Slot::getStereotype) - .map(ImmutableCapabilities::copyOf) - .collect(toImmutableSet()); - - // A new node! Add this as a remote node, since we've not called add - RemoteNode remoteNode = - new RemoteNode( - tracer, - clientFactory, - status.getNodeId(), - status.getExternalUri(), - registrationSecret, - status.getSessionTimeout(), - capabilities); - - add(remoteNode); - } finally { - writeLock.unlock(); - } - } - @Override public LocalDistributor add(Node node) { - Require.nonNull("Node", node); - - // An exception occurs if Node heartbeat has started but the server is not ready. - // Unhandled exception blocks the event-bus thread from processing any event henceforth. - NodeStatus initialNodeStatus; - Runnable healthCheck; - try { - initialNodeStatus = node.getStatus(); - if (initialNodeStatus.getAvailability() != UP) { - // A Node might be draining or down (in the case of Relay nodes) - // but the heartbeat is still running. - // We do not need to add this Node for now. 
- return this; - } - // Extract the health check - healthCheck = asRunnableHealthCheck(node); - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - nodes.put(node.getId(), node); - model.add(initialNodeStatus); - allChecks.put(node.getId(), healthCheck); - } finally { - writeLock.unlock(); - } - } catch (Exception e) { - LOG.log( - Debug.getDebugLogLevel(), - String.format("Exception while adding Node %s", node.getUri()), - e); - return this; - } - - updateNodeAvailability( - initialNodeStatus.getExternalUri(), - initialNodeStatus.getNodeId(), - initialNodeStatus.getAvailability()); - - LOG.info( - String.format( - "Added node %s at %s. Health check every %ss", - node.getId(), node.getUri(), healthcheckInterval.toMillis() / 1000)); - - bus.fire(new NodeAddedEvent(node.getId())); - + nodeRegistry.add(node); return this; } - private Runnable runNodeHealthChecks() { - return () -> { - ImmutableMap nodeHealthChecks; - Lock readLock = this.lock.readLock(); - readLock.lock(); - try { - nodeHealthChecks = ImmutableMap.copyOf(allChecks); - } finally { - readLock.unlock(); - } - - for (Runnable nodeHealthCheck : nodeHealthChecks.values()) { - GuardedRunnable.guard(nodeHealthCheck).run(); - } - }; - } - - private Runnable asRunnableHealthCheck(Node node) { - HealthCheck healthCheck = node.getHealthCheck(); - NodeId id = node.getId(); - return () -> { - boolean checkFailed = false; - Exception failedCheckException = null; - LOG.log(getDebugLogLevel(), "Running healthcheck for Node " + node.getUri()); - - HealthCheck.Result result; - try { - result = healthCheck.check(); - } catch (Exception e) { - LOG.log(Level.WARNING, "Unable to process Node healthcheck " + id, e); - result = new HealthCheck.Result(DOWN, "Unable to run healthcheck. Assuming down"); - checkFailed = true; - failedCheckException = e; - } - - updateNodeAvailability(node.getUri(), id, result.getAvailability()); - if (checkFailed) { - throw new HealthCheckFailedException("Node " + id, failedCheckException); - } - }; - } - - private void updateNodeAvailability(URI nodeUri, NodeId id, Availability availability) { - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - LOG.log( - getDebugLogLevel(), - String.format("Health check result for %s was %s", nodeUri, availability)); - model.setAvailability(id, availability); - model.updateHealthCheckCount(id, availability); - } finally { - writeLock.unlock(); - } - } - @Override public boolean drain(NodeId nodeId) { - Node node = nodes.get(nodeId); - if (node == null) { - LOG.info("Asked to drain unregistered node " + nodeId); - return false; - } - - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - node.drain(); - model.setAvailability(nodeId, DRAINING); - } finally { - writeLock.unlock(); - } - - return node.isDraining(); + return nodeRegistry.drain(nodeId); } public void remove(NodeId nodeId) { - Lock writeLock = lock.writeLock(); - writeLock.lock(); - try { - Node node = nodes.remove(nodeId); - model.remove(nodeId); - allChecks.remove(nodeId); - - if (node instanceof RemoteNode) { - ((RemoteNode) node).close(); - } - } finally { - writeLock.unlock(); - } + nodeRegistry.remove(nodeId); } @Override public DistributorStatus getStatus() { - Lock readLock = this.lock.readLock(); - readLock.lock(); - try { - return new DistributorStatus(model.getSnapshot()); - } finally { - readLock.unlock(); - } + return nodeRegistry.getStatus(); } @Beta public void refresh() { - List allHealthChecks = new ArrayList<>(); - - Lock readLock = this.lock.readLock(); - 
readLock.lock(); - try { - allHealthChecks.addAll(allChecks.values()); - } finally { - readLock.unlock(); - } - - allHealthChecks.parallelStream().forEach(Runnable::run); + nodeRegistry.refresh(); } protected Set getAvailableNodes() { - Lock readLock = this.lock.readLock(); - readLock.lock(); - try { - return model.getSnapshot().stream() - .filter( - node -> - !DOWN.equals(node.getAvailability()) && !DRAINING.equals(node.getAvailability())) - .collect(toImmutableSet()); - } finally { - readLock.unlock(); - } + return nodeRegistry.getAvailableNodes(); } @Override @@ -583,7 +345,7 @@ public Either newSession( try { CreateSessionResponse response = startSession(selectedSlot, singleRequest); sessions.add(response.getSession()); - model.setSession(selectedSlot, response.getSession()); + nodeRegistry.setSession(selectedSlot, response.getSession()); SessionId sessionId = response.getSession().getId(); Capabilities sessionCaps = response.getSession().getCapabilities(); @@ -603,7 +365,7 @@ public Either newSession( return Either.right(response); } catch (SessionNotCreatedException e) { - model.setSession(selectedSlot, null); + nodeRegistry.setSession(selectedSlot, null); lastFailure = e; } } @@ -655,7 +417,7 @@ public Either newSession( private CreateSessionResponse startSession( SlotId selectedSlot, CreateSessionRequest singleRequest) { - Node node = nodes.get(selectedSlot.getOwningNodeId()); + Node node = nodeRegistry.getNode(selectedSlot.getOwningNodeId()); if (node == null) { throw new SessionNotCreatedException("Unable to find owning node for slot"); } @@ -704,8 +466,7 @@ private SlotId reserveSlot(RequestId requestId, Capabilities caps) { } private boolean isNotSupported(Capabilities caps) { - return getAvailableNodes().stream() - .noneMatch(node -> node.hasCapability(caps, slotMatcher) && node.getAvailability() == UP); + return getAvailableNodes().stream().noneMatch(node -> node.hasCapability(caps, slotMatcher)); } private boolean reserve(SlotId id) { @@ -714,13 +475,7 @@ private boolean reserve(SlotId id) { Lock writeLock = this.lock.writeLock(); writeLock.lock(); try { - Node node = nodes.get(id.getOwningNodeId()); - if (node == null) { - LOG.log(getDebugLogLevel(), String.format("Unable to find node with id %s", id)); - return false; - } - - return model.reserve(id); + return nodeRegistry.reserve(id); } finally { writeLock.unlock(); } @@ -729,36 +484,25 @@ private boolean reserve(SlotId id) { @VisibleForTesting @ManagedAttribute(name = "NodeUpCount") public long getUpNodeCount() { - return model.getSnapshot().stream() - .filter(nodeStatus -> nodeStatus.getAvailability().equals(UP)) - .count(); + return nodeRegistry.getUpNodeCount(); } @VisibleForTesting @ManagedAttribute(name = "NodeDownCount") public long getDownNodeCount() { - return model.getSnapshot().stream() - .filter(nodeStatus -> !nodeStatus.getAvailability().equals(UP)) - .count(); + return nodeRegistry.getDownNodeCount(); } @VisibleForTesting @ManagedAttribute(name = "ActiveSlots") public int getActiveSlots() { - return model.getSnapshot().stream() - .map(NodeStatus::getSlots) - .flatMap(Collection::stream) - .filter(slot -> slot.getSession() != null) - .filter(slot -> !slot.getSession().getId().equals(RESERVED)) - .mapToInt(slot -> 1) - .sum(); + return nodeRegistry.getActiveSlots(); } @VisibleForTesting @ManagedAttribute(name = "IdleSlots") public int getIdleSlots() { - return (int) - (model.getSnapshot().stream().map(NodeStatus::getSlots).count() - getActiveSlots()); + return nodeRegistry.getIdleSlots(); } @Override @@ -795,7 
+539,7 @@ public void run() { // up starving a session request. Map stereotypes = getAvailableNodes().stream() - .filter(node -> node.hasCapacity() && node.getAvailability() == UP) + .filter(node -> node.hasCapacity()) .flatMap(node -> node.getSlots().stream().map(Slot::getStereotype)) .collect( Collectors.groupingBy(ImmutableCapabilities::copyOf, Collectors.counting())); @@ -850,8 +594,9 @@ private void handleNewSessionRequest(SessionRequest sessionRequest) { if (response.isLeft() && response.left() instanceof RetrySessionRequestException) { try (Span childSpan = span.createSpan("distributor.retry")) { - LOG.log( - Debug.getDebugLogLevel(), "Retrying {0}", sessionRequest.getDesiredCapabilities()); + if (LOG.isLoggable(getDebugLogLevel())) { + LOG.log(getDebugLogLevel(), "Retrying {0}", sessionRequest.getDesiredCapabilities()); + } boolean retried = sessionQueue.retryAddToQueue(sessionRequest); attributeMap.put("request.retry_add", retried); @@ -873,7 +618,7 @@ private void handleNewSessionRequest(SessionRequest sessionRequest) { + " dropped, stopping it to avoid stalled browser", reqId.toString()); Session session = response.right().getSession(); - Node node = getNodeFromURI(session.getUri()); + Node node = nodeRegistry.getNode(session.getUri()); if (node != null) { boolean deleted; try { @@ -902,11 +647,10 @@ protected Node getNodeFromURI(URI uri) { Lock readLock = this.lock.readLock(); readLock.lock(); try { + Set nodes = nodeRegistry.getAvailableNodes(); Optional nodeStatus = - model.getSnapshot().stream() - .filter(node -> node.getExternalUri().equals(uri)) - .findFirst(); - return nodeStatus.map(status -> nodes.get(status.getNodeId())).orElse(null); + nodes.stream().filter(node -> node.getExternalUri().equals(uri)).findFirst(); + return nodeStatus.map(status -> nodeRegistry.getNode(status.getNodeId())).orElse(null); } finally { readLock.unlock(); } diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalGridModel.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalGridModel.java new file mode 100644 index 0000000000000..1d6483faf6633 --- /dev/null +++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalGridModel.java @@ -0,0 +1,545 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
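The abstract GridModel introduced earlier in this diff is the seam the refactor opens up: any class that covers its eleven operations can stand in for the in-memory model that the next file provides. As a purely illustrative sketch (the class name and the logging behaviour are assumptions, not part of this patch), the smallest alternative implementation is a wrapper that delegates to an existing model; a datastore-backed model for high availability would have to cover exactly the same surface.

```java
package org.openqa.selenium.grid.distributor;

import java.util.Set;
import java.util.logging.Logger;
import org.openqa.selenium.grid.data.Availability;
import org.openqa.selenium.grid.data.NodeId;
import org.openqa.selenium.grid.data.NodeStatus;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.data.SlotId;
import org.openqa.selenium.remote.SessionId;

/** Illustrative only: wraps another GridModel and logs slot reservations. */
public class DelegatingGridModel extends GridModel {

  private static final Logger LOG = Logger.getLogger(DelegatingGridModel.class.getName());

  private final GridModel delegate;

  public DelegatingGridModel(GridModel delegate) {
    this.delegate = delegate;
  }

  @Override
  public boolean reserve(SlotId slotId) {
    boolean reserved = delegate.reserve(slotId);
    LOG.fine("reserve(" + slotId + ") -> " + reserved);
    return reserved;
  }

  // The remaining operations simply pass through to the wrapped model.
  @Override
  public void add(NodeStatus node) {
    delegate.add(node);
  }

  @Override
  public void refresh(NodeStatus status) {
    delegate.refresh(status);
  }

  @Override
  public void touch(NodeStatus nodeStatus) {
    delegate.touch(nodeStatus);
  }

  @Override
  public void remove(NodeId id) {
    delegate.remove(id);
  }

  @Override
  public void purgeDeadNodes() {
    delegate.purgeDeadNodes();
  }

  @Override
  public void setAvailability(NodeId id, Availability availability) {
    delegate.setAvailability(id, availability);
  }

  @Override
  public Set<NodeStatus> getSnapshot() {
    return delegate.getSnapshot();
  }

  @Override
  public void release(SessionId id) {
    delegate.release(id);
  }

  @Override
  public void setSession(SlotId slotId, Session session) {
    delegate.setSession(slotId, session);
  }

  @Override
  public void updateHealthCheckCount(NodeId id, Availability availability) {
    delegate.updateHealthCheckCount(id, availability);
  }
}
```

Wrapping `new LocalGridModel(bus)` this way leaves the event wiring untouched while making it clear which calls a remote or replicated implementation would need to intercept.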
+ +package org.openqa.selenium.grid.distributor.local; + +import static org.openqa.selenium.grid.data.Availability.DOWN; +import static org.openqa.selenium.grid.data.Availability.DRAINING; +import static org.openqa.selenium.grid.data.Availability.UP; + +import com.google.common.collect.ImmutableSet; +import java.time.Instant; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import java.util.logging.Logger; +import org.openqa.selenium.events.EventBus; +import org.openqa.selenium.grid.config.Config; +import org.openqa.selenium.grid.data.Availability; +import org.openqa.selenium.grid.data.NodeDrainStarted; +import org.openqa.selenium.grid.data.NodeId; +import org.openqa.selenium.grid.data.NodeRemovedEvent; +import org.openqa.selenium.grid.data.NodeRestartedEvent; +import org.openqa.selenium.grid.data.NodeStatus; +import org.openqa.selenium.grid.data.Session; +import org.openqa.selenium.grid.data.SessionClosedEvent; +import org.openqa.selenium.grid.data.Slot; +import org.openqa.selenium.grid.data.SlotId; +import org.openqa.selenium.grid.distributor.GridModel; +import org.openqa.selenium.grid.server.EventBusOptions; +import org.openqa.selenium.internal.Debug; +import org.openqa.selenium.internal.Require; +import org.openqa.selenium.remote.SessionId; + +public class LocalGridModel extends GridModel { + + private static final SessionId RESERVED = new SessionId("reserved"); + private static final Logger LOG = Logger.getLogger(LocalGridModel.class.getName()); + // How many times a node's heartbeat duration needs to be exceeded before the node is considered + // purgeable. + private static final int PURGE_TIMEOUT_MULTIPLIER = 4; + private static final int UNHEALTHY_THRESHOLD = 4; + private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true); + private final Set nodes = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private final Map nodePurgeTimes = new ConcurrentHashMap<>(); + private final Map nodeHealthCount = new ConcurrentHashMap<>(); + private final EventBus events; + + public LocalGridModel(EventBus events) { + this.events = Require.nonNull("Event bus", events); + + this.events.addListener(NodeDrainStarted.listener(nodeId -> setAvailability(nodeId, DRAINING))); + this.events.addListener(SessionClosedEvent.listener(this::release)); + } + + public static LocalGridModel create(Config config) { + EventBus bus = new EventBusOptions(config).getEventBus(); + + return new LocalGridModel(bus); + } + + @Override + public void add(NodeStatus node) { + Require.nonNull("Node", node); + + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + // If we've already added the node, remove it. + Iterator iterator = nodes.iterator(); + while (iterator.hasNext()) { + NodeStatus next = iterator.next(); + + // If the ID and the URI are the same, use the same + // availability as the version we have now: we're just refreshing + // an existing node. 
+ if (next.getNodeId().equals(node.getNodeId()) + && next.getExternalUri().equals(node.getExternalUri())) { + iterator.remove(); + + LOG.log(Debug.getDebugLogLevel(), "Refreshing node with id {0}", node.getNodeId()); + NodeStatus refreshed = rewrite(node, next.getAvailability()); + nodes.add(refreshed); + nodePurgeTimes.put(refreshed.getNodeId(), Instant.now()); + updateHealthCheckCount(refreshed.getNodeId(), refreshed.getAvailability()); + + return; + } + + // If the URI is the same but NodeId is different, then the Node has restarted + if (!next.getNodeId().equals(node.getNodeId()) + && next.getExternalUri().equals(node.getExternalUri())) { + LOG.info( + String.format( + "Re-adding node with id %s and URI %s.", + node.getNodeId(), node.getExternalUri())); + + // Send the previous state to allow cleaning up the old node related resources. + // Nodes are initially added in the "down" state, so the new state must be ignored. + events.fire(new NodeRestartedEvent(next)); + iterator.remove(); + break; + } + + // If the URI has changed, then assume this is a new node and fall + // out of the loop: we want to add it as `DOWN` until something + // changes our mind. + if (next.getNodeId().equals(node.getNodeId())) { + LOG.info( + String.format( + "Re-adding node with id %s and URI %s.", + node.getNodeId(), node.getExternalUri())); + iterator.remove(); + break; + } + } + + // Nodes are initially added in the "down" state until something changes their availability + LOG.log( + Debug.getDebugLogLevel(), + "Adding node with id {0} and URI {1}", + new Object[] {node.getNodeId(), node.getExternalUri()}); + NodeStatus refreshed = rewrite(node, DOWN); + nodes.add(refreshed); + nodePurgeTimes.put(refreshed.getNodeId(), Instant.now()); + updateHealthCheckCount(refreshed.getNodeId(), refreshed.getAvailability()); + } finally { + writeLock.unlock(); + } + } + + @Override + public void refresh(NodeStatus status) { + Require.nonNull("Node status", status); + + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + Iterator iterator = nodes.iterator(); + while (iterator.hasNext()) { + NodeStatus node = iterator.next(); + + if (node.getNodeId().equals(status.getNodeId())) { + iterator.remove(); + + // if the node was marked as "down", keep it down until a healthcheck passes: + // just because the node can hit the event bus doesn't mean it's reachable + if (node.getAvailability() == DOWN) { + nodes.add(rewrite(status, DOWN)); + } else { + // Otherwise, trust what it tells us. + nodes.add(status); + } + + nodePurgeTimes.put(status.getNodeId(), Instant.now()); + + return; + } + } + } finally { + writeLock.unlock(); + } + } + + @Override + public void touch(NodeStatus nodeStatus) { + Require.nonNull("Node ID", nodeStatus); + + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + NodeStatus node = getNode(nodeStatus.getNodeId()); + if (node != null) { + nodePurgeTimes.put(node.getNodeId(), Instant.now()); + // Covers the case where the Node might be DOWN in the Grid model (e.g. Node lost + // connectivity for a while). The Node reports itself back as UP. 
+ if (node.getAvailability() != nodeStatus.getAvailability() + && nodeStatus.getAvailability() == UP) { + nodes.remove(node); + nodes.add(nodeStatus); + } + } + } finally { + writeLock.unlock(); + } + } + + @Override + public void remove(NodeId id) { + Require.nonNull("Node ID", id); + + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + nodes.removeIf(n -> n.getNodeId().equals(id)); + nodePurgeTimes.remove(id); + nodeHealthCount.remove(id); + } finally { + writeLock.unlock(); + } + } + + @Override + public void purgeDeadNodes() { + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + Map replacements = new HashMap<>(); + Set toRemove = new HashSet<>(); + + for (NodeStatus node : nodes) { + NodeId id = node.getNodeId(); + if (nodeHealthCount.getOrDefault(id, 0) > UNHEALTHY_THRESHOLD) { + LOG.info( + String.format( + "Removing Node %s (uri: %s), unhealthy threshold has been reached", + node.getNodeId(), node.getExternalUri())); + toRemove.add(node); + break; + } + + Instant now = Instant.now(); + Instant lastTouched = nodePurgeTimes.getOrDefault(id, Instant.now()); + Instant lostTime = + lastTouched.plus( + node.getHeartbeatPeriod().multipliedBy(PURGE_TIMEOUT_MULTIPLIER).dividedBy(2)); + Instant deadTime = + lastTouched.plus(node.getHeartbeatPeriod().multipliedBy(PURGE_TIMEOUT_MULTIPLIER)); + + if (node.getAvailability() == UP && lostTime.isBefore(now)) { + LOG.info( + String.format( + "Switching Node %s (uri: %s) from UP to DOWN", + node.getNodeId(), node.getExternalUri())); + replacements.put(node, rewrite(node, DOWN)); + nodePurgeTimes.put(id, Instant.now()); + } else if (node.getAvailability() == DOWN && deadTime.isBefore(now)) { + LOG.info( + String.format( + "Removing Node %s (uri: %s), DOWN for too long", + node.getNodeId(), node.getExternalUri())); + toRemove.add(node); + } + } + + replacements.forEach( + (before, after) -> { + nodes.remove(before); + nodes.add(after); + }); + toRemove.forEach( + node -> { + nodes.remove(node); + nodePurgeTimes.remove(node.getNodeId()); + nodeHealthCount.remove(node.getNodeId()); + events.fire(new NodeRemovedEvent(node)); + }); + } finally { + writeLock.unlock(); + } + } + + @Override + public void setAvailability(NodeId id, Availability availability) { + Require.nonNull("Node ID", id); + Require.nonNull("Availability", availability); + + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + NodeStatus node = getNode(id); + + if (node == null) { + return; + } + + if (availability.equals(node.getAvailability())) { + if (node.getAvailability() == UP) { + nodePurgeTimes.put(node.getNodeId(), Instant.now()); + } + } else { + LOG.info( + String.format( + "Switching Node %s (uri: %s) from %s to %s", + id, node.getExternalUri(), node.getAvailability(), availability)); + + NodeStatus refreshed = rewrite(node, availability); + nodes.remove(node); + nodes.add(refreshed); + nodePurgeTimes.put(node.getNodeId(), Instant.now()); + } + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean reserve(SlotId slotId) { + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + NodeStatus node = getNode(slotId.getOwningNodeId()); + if (node == null) { + LOG.warning( + String.format( + "Asked to reserve slot on node %s, but unable to find node", + slotId.getOwningNodeId())); + return false; + } + + if (!UP.equals(node.getAvailability())) { + LOG.warning( + String.format( + "Asked to reserve a slot on node %s, but node is %s", + slotId.getOwningNodeId(), node.getAvailability())); + return false; + } + + 
Optional maybeSlot = + node.getSlots().stream().filter(slot -> slotId.equals(slot.getId())).findFirst(); + + if (!maybeSlot.isPresent()) { + LOG.warning( + String.format( + "Asked to reserve slot on node %s, but no slot with id %s found", + node.getNodeId(), slotId)); + return false; + } + + reserve(node, maybeSlot.get()); + return true; + } finally { + writeLock.unlock(); + } + } + + @Override + public Set getSnapshot() { + Lock readLock = this.lock.readLock(); + readLock.lock(); + try { + return ImmutableSet.copyOf(nodes); + } finally { + readLock.unlock(); + } + } + + private NodeStatus getNode(NodeId id) { + Require.nonNull("Node ID", id); + + Lock readLock = lock.readLock(); + readLock.lock(); + try { + return nodes.stream().filter(n -> n.getNodeId().equals(id)).findFirst().orElse(null); + } finally { + readLock.unlock(); + } + } + + private NodeStatus rewrite(NodeStatus status, Availability availability) { + return new NodeStatus( + status.getNodeId(), + status.getExternalUri(), + status.getMaxSessionCount(), + status.getSlots(), + availability, + status.getHeartbeatPeriod(), + status.getSessionTimeout(), + status.getVersion(), + status.getOsInfo()); + } + + @Override + public void release(SessionId id) { + if (id == null) { + return; + } + + LOG.info("Releasing slot for session id " + id); + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + for (NodeStatus node : nodes) { + for (Slot slot : node.getSlots()) { + if (slot.getSession() == null) { + continue; + } + + if (id.equals(slot.getSession().getId())) { + Slot released = + new Slot(slot.getId(), slot.getStereotype(), slot.getLastStarted(), null); + amend(node.getAvailability(), node, released); + return; + } + } + } + } finally { + writeLock.unlock(); + } + } + + @Override + public void setSession(SlotId slotId, Session session) { + Require.nonNull("Slot ID", slotId); + + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + NodeStatus node = getNode(slotId.getOwningNodeId()); + if (node == null) { + LOG.warning( + "Grid model and reality have diverged. Unable to find node " + + slotId.getOwningNodeId()); + return; + } + + Optional maybeSlot = + node.getSlots().stream().filter(slot -> slotId.equals(slot.getId())).findFirst(); + + if (!maybeSlot.isPresent()) { + LOG.warning("Grid model and reality have diverged. Unable to find slot " + slotId); + return; + } + + Slot slot = maybeSlot.get(); + Session maybeSession = slot.getSession(); + if (maybeSession == null) { + LOG.warning("Grid model and reality have diverged. Slot is not reserved. " + slotId); + return; + } + + if (!RESERVED.equals(maybeSession.getId())) { + LOG.warning( + "Grid model and reality have diverged. Slot has session and is not reserved. " + + slotId); + return; + } + + Slot updated = + new Slot( + slot.getId(), + slot.getStereotype(), + session == null ? 
slot.getLastStarted() : session.getStartTime(), + session); + + amend(node.getAvailability(), node, updated); + } finally { + writeLock.unlock(); + } + } + + @Override + public void updateHealthCheckCount(NodeId id, Availability availability) { + Require.nonNull("Node ID", id); + Require.nonNull("Availability", availability); + + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + int unhealthyCount = nodeHealthCount.getOrDefault(id, 0); + + // Keep track of consecutive number of times the Node health check fails + if (availability.equals(DOWN)) { + nodeHealthCount.put(id, unhealthyCount + 1); + } + + // If the Node is healthy again before crossing the threshold, then reset the count. + if (unhealthyCount <= UNHEALTHY_THRESHOLD && availability.equals(UP)) { + nodeHealthCount.put(id, 0); + } + } finally { + writeLock.unlock(); + } + } + + /** + * A helper to reserve a slot of a node. The writeLock must be acquired outside to ensure the view + * of the NodeStatus is the current state, otherwise concurrent calls to amend will work with an + * outdated view of slots. + */ + private void reserve(NodeStatus status, Slot slot) { + Instant now = Instant.now(); + + Slot reserved = + new Slot( + slot.getId(), + slot.getStereotype(), + now, + new Session( + RESERVED, + status.getExternalUri(), + slot.getStereotype(), + slot.getStereotype(), + now)); + + amend(UP, status, reserved); + } + + /** + * A helper to replace the availability and a slot of a node. The writeLock must be acquired + * outside to ensure the view of the NodeStatus is the current state, otherwise concurrent calls + * to amend will work with an outdated view of slots. + */ + private void amend(Availability availability, NodeStatus status, Slot slot) { + Set newSlots = new HashSet<>(status.getSlots()); + newSlots.removeIf(s -> s.getId().equals(slot.getId())); + newSlots.add(slot); + + NodeStatus node = getNode(status.getNodeId()); + + nodes.remove(node); + nodes.add( + new NodeStatus( + status.getNodeId(), + status.getExternalUri(), + status.getMaxSessionCount(), + newSlots, + availability, + status.getHeartbeatPeriod(), + status.getSessionTimeout(), + status.getVersion(), + status.getOsInfo())); + } +} diff --git a/java/src/org/openqa/selenium/grid/distributor/local/LocalNodeRegistry.java b/java/src/org/openqa/selenium/grid/distributor/local/LocalNodeRegistry.java new file mode 100644 index 0000000000000..54ef10b657a9a --- /dev/null +++ b/java/src/org/openqa/selenium/grid/distributor/local/LocalNodeRegistry.java @@ -0,0 +1,602 @@ +// Licensed to the Software Freedom Conservancy (SFC) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The SFC licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
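Before the default in-memory implementation, it helps to see the interface from the caller's side. LocalDistributor, as changed above, drives a NodeRegistry through reserve, getNode and setSession when it places a new session. The helper below is a hypothetical sketch (the class and method names are not part of this patch) that strings those calls together in the order the distributor uses them.

```java
package org.openqa.selenium.grid.distributor;

import org.openqa.selenium.grid.data.NodeStatus;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.data.SlotId;
import org.openqa.selenium.grid.node.Node;

/** Illustrative only: mirrors how LocalDistributor drives a NodeRegistry in this patch. */
final class NodeRegistrySketch {

  private NodeRegistrySketch() {}

  /** Reserve a slot and attach a session to it, or give the slot back on failure. */
  static boolean attachSession(NodeRegistry registry, SlotId slotId, Session session) {
    if (!registry.reserve(slotId)) {
      // Slot already taken, or the owning node is not UP.
      return false;
    }

    Node owner = registry.getNode(slotId.getOwningNodeId());
    if (owner == null || session == null) {
      // No node to run the session on (or nothing was started): free the reservation,
      // just as LocalDistributor does with nodeRegistry.setSession(selectedSlot, null).
      registry.setSession(slotId, null);
      return false;
    }

    // Swap the "reserved" placeholder for the real session.
    registry.setSession(slotId, session);
    return true;
  }

  /** How many registered nodes can currently take on new work. */
  static long schedulableNodeCount(NodeRegistry registry) {
    // getAvailableNodes() already filters out DOWN and DRAINING nodes.
    return registry.getAvailableNodes().stream().filter(NodeStatus::hasCapacity).count();
  }
}
```

The split between reserve and setSession is what lets the distributor start the session on the node outside the model's write lock without another request claiming the same slot in the meantime.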
+
+package org.openqa.selenium.grid.distributor.local;
+
+import static org.openqa.selenium.grid.data.Availability.DOWN;
+import static org.openqa.selenium.grid.data.Availability.DRAINING;
+import static org.openqa.selenium.grid.data.Availability.UP;
+import static org.openqa.selenium.internal.Debug.getDebugLogLevel;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import java.net.URI;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+import java.util.stream.Collectors;
+import org.openqa.selenium.HealthCheckFailedException;
+import org.openqa.selenium.concurrent.GuardedRunnable;
+import org.openqa.selenium.events.EventBus;
+import org.openqa.selenium.grid.data.Availability;
+import org.openqa.selenium.grid.data.DistributorStatus;
+import org.openqa.selenium.grid.data.NodeAddedEvent;
+import org.openqa.selenium.grid.data.NodeDrainComplete;
+import org.openqa.selenium.grid.data.NodeHeartBeatEvent;
+import org.openqa.selenium.grid.data.NodeId;
+import org.openqa.selenium.grid.data.NodeRemovedEvent;
+import org.openqa.selenium.grid.data.NodeRestartedEvent;
+import org.openqa.selenium.grid.data.NodeStatus;
+import org.openqa.selenium.grid.data.NodeStatusEvent;
+import org.openqa.selenium.grid.data.Session;
+import org.openqa.selenium.grid.data.SlotId;
+import org.openqa.selenium.grid.distributor.GridModel;
+import org.openqa.selenium.grid.distributor.NodeRegistry;
+import org.openqa.selenium.grid.node.HealthCheck;
+import org.openqa.selenium.grid.node.Node;
+import org.openqa.selenium.grid.node.remote.RemoteNode;
+import org.openqa.selenium.grid.security.Secret;
+import org.openqa.selenium.internal.Require;
+import org.openqa.selenium.remote.SessionId;
+import org.openqa.selenium.remote.http.HttpClient;
+import org.openqa.selenium.remote.tracing.Tracer;
+import org.openqa.selenium.status.HasReadyState;
+
+/** Default implementation of {@link NodeRegistry} that keeps nodes in memory. */
+public class LocalNodeRegistry implements NodeRegistry {
+
+  private static final Logger LOG = Logger.getLogger(LocalNodeRegistry.class.getName());
+  private static final SessionId RESERVED = new SessionId("reserved");
+
+  private final Tracer tracer;
+  private final EventBus bus;
+  private final HttpClient.Factory clientFactory;
+  private final Secret registrationSecret;
+  private final Duration healthcheckInterval;
+  private final GridModel model;
+  private final Map<NodeId, Node> nodes;
+  private final Map<NodeId, Runnable> allChecks = new ConcurrentHashMap<>();
+  private final ReadWriteLock lock = new ReentrantReadWriteLock(/* fair */ true);
+  private final ScheduledExecutorService nodeHealthCheckService;
+  private final ExecutorService nodeHealthCheckExecutor;
+  private final Duration purgeNodesInterval;
+  private final ScheduledExecutorService purgeDeadNodesService;
+  private final int newSessionThreadPoolSize;
+
+  public LocalNodeRegistry(
+      Tracer tracer,
+      EventBus bus,
+      int newSessionThreadPoolSize,
+      HttpClient.Factory clientFactory,
+      Secret registrationSecret,
+      Duration healthcheckInterval,
+      ScheduledExecutorService nodeHealthCheckService,
+      Duration purgeNodesInterval,
+      ScheduledExecutorService purgeDeadNodesService) {
+    this.tracer = Require.nonNull("Tracer", tracer);
+    this.bus = Require.nonNull("Event bus", bus);
+    this.clientFactory = Require.nonNull("HTTP client factory", clientFactory);
+    this.registrationSecret = Require.nonNull("Registration secret", registrationSecret);
+    this.healthcheckInterval = Require.nonNull("Health check interval", healthcheckInterval);
+    this.nodeHealthCheckService =
+        Require.nonNull("Node health check service", nodeHealthCheckService);
+    this.purgeNodesInterval = Require.nonNull("Purge nodes interval", purgeNodesInterval);
+    this.purgeDeadNodesService = Require.nonNull("Purge dead nodes service", purgeDeadNodesService);
+    this.newSessionThreadPoolSize = newSessionThreadPoolSize;
+
+    this.model = new LocalGridModel(bus);
+    this.nodes = new ConcurrentHashMap<>();
+
+    // Register listeners for node events
+    this.bus.addListener(NodeStatusEvent.listener(this::register));
+    this.bus.addListener(NodeStatusEvent.listener(model::refresh));
+    this.bus.addListener(
+        NodeRestartedEvent.listener(previousNodeStatus -> remove(previousNodeStatus.getNodeId())));
+    this.bus.addListener(NodeRemovedEvent.listener(nodeStatus -> remove(nodeStatus.getNodeId())));
+    this.bus.addListener(NodeDrainComplete.listener(this::remove));
+    this.bus.addListener(
+        NodeHeartBeatEvent.listener(
+            nodeStatus -> {
+              if (nodes.containsKey(nodeStatus.getNodeId())) {
+                model.touch(nodeStatus);
+              } else {
+                register(nodeStatus);
+              }
+            }));
+
+    // Schedule regular health checks
+    this.nodeHealthCheckService.scheduleAtFixedRate(
+        GuardedRunnable.guard(this::runHealthChecks),
+        healthcheckInterval.toMillis(),
+        healthcheckInterval.toMillis(),
+        TimeUnit.MILLISECONDS);
+
+    this.nodeHealthCheckExecutor =
+        Executors.newFixedThreadPool(
+            this.newSessionThreadPoolSize,
+            r -> {
+              Thread t = new Thread(r);
+              t.setName("node-health-check-" + t.getId());
+              t.setDaemon(true);
+              return t;
+            });
+
+    // Schedule node purging if interval is non-zero
+    if (!this.purgeNodesInterval.isZero()) {
+      this.purgeDeadNodesService.scheduleAtFixedRate(
+          GuardedRunnable.guard(model::purgeDeadNodes),
+          this.purgeNodesInterval.getSeconds(),
+          this.purgeNodesInterval.getSeconds(),
+          TimeUnit.SECONDS);
+    }
+  }
+
+  @Override
+  public void register(NodeStatus status) {
+    Require.nonNull("Node", status);
+
+    Lock writeLock = lock.writeLock();
+    writeLock.lock();
+    try {
+      if (nodes.containsKey(status.getNodeId())) {
+        return;
+      }
+
+      if (status.getAvailability() != UP) {
+        // A Node might be draining or down (in the case of Relay nodes)
+        // but the heartbeat is still running.
+        // We do not need to add this Node for now.
+        return;
+      }
+
+      // A new node! Add this as a remote node, since we've not called add
+      RemoteNode remoteNode =
+          new RemoteNode(
+              tracer,
+              clientFactory,
+              status.getNodeId(),
+              status.getExternalUri(),
+              registrationSecret,
+              status.getSessionTimeout(),
+              status.getSlots().stream()
+                  .map(slot -> slot.getStereotype())
+                  .collect(Collectors.toSet()));
+
+      add(remoteNode);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void add(Node node) {
+    Require.nonNull("Node", node);
+
+    // An exception occurs if Node heartbeat has started but the server is not ready.
+    // Unhandled exception blocks the event-bus thread from processing any event henceforth.
+    NodeStatus initialNodeStatus;
+    Runnable healthCheck;
+    try {
+      initialNodeStatus = node.getStatus();
+      if (initialNodeStatus.getAvailability() != UP) {
+        // A Node might be draining or down (in the case of Relay nodes)
+        // but the heartbeat is still running.
+        // We do not need to add this Node for now.
+        return;
+      }
+      // Extract the health check
+      healthCheck = asRunnableHealthCheck(node);
+      Lock writeLock = lock.writeLock();
+      writeLock.lock();
+      try {
+        nodes.put(node.getId(), node);
+        model.add(initialNodeStatus);
+        allChecks.put(node.getId(), healthCheck);
+      } finally {
+        writeLock.unlock();
+      }
+    } catch (Exception e) {
+      LOG.log(
+          getDebugLogLevel(), String.format("Exception while adding Node %s", node.getUri()), e);
+      return;
+    }
+
+    updateNodeAvailability(
+        initialNodeStatus.getExternalUri(),
+        initialNodeStatus.getNodeId(),
+        initialNodeStatus.getAvailability());
+
+    LOG.info(
+        String.format(
+            "Added node %s at %s. Health check every %ss",
+            node.getId(), node.getUri(), healthcheckInterval.toMillis() / 1000));
Health check every %ss", + node.getId(), node.getUri(), healthcheckInterval.toMillis() / 1000)); + + bus.fire(new NodeAddedEvent(node.getId())); + } + + @Override + public void remove(NodeId nodeId) { + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + Node node = nodes.remove(nodeId); + model.remove(nodeId); + + allChecks.remove(nodeId); + + if (node instanceof RemoteNode) { + try { + ((RemoteNode) node).close(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Unable to close node properly: " + e.getMessage()); + } + } + + LOG.info(String.format("Node %s removed and all resources cleaned up", nodeId)); + } finally { + writeLock.unlock(); + } + } + + @Override + public boolean drain(NodeId nodeId) { + Node node = nodes.get(nodeId); + if (node == null) { + LOG.info("Asked to drain unregistered node " + nodeId); + return false; + } + + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + node.drain(); + model.setAvailability(nodeId, DRAINING); + } finally { + writeLock.unlock(); + } + + return node.isDraining(); + } + + @Override + public void updateNodeAvailability(URI nodeUri, NodeId id, Availability availability) { + Require.nonNull("Node URI", nodeUri); + Require.nonNull("Node ID", id); + Require.nonNull("Availability", availability); + Lock writeLock = lock.writeLock(); + writeLock.lock(); + try { + LOG.log( + getDebugLogLevel(), + String.format("Health check result for %s was %s", nodeUri, availability)); + model.setAvailability(id, availability); + model.updateHealthCheckCount(id, availability); + } finally { + writeLock.unlock(); + } + } + + @Override + public void runHealthChecks() { + ImmutableMap nodeHealthChecks; + Lock readLock = this.lock.readLock(); + readLock.lock(); + try { + nodeHealthChecks = ImmutableMap.copyOf(allChecks); + } finally { + readLock.unlock(); + } + + if (nodeHealthChecks.isEmpty()) { + return; + } + + List checks = new ArrayList<>(nodeHealthChecks.values()); + int total = checks.size(); + + // Large deployments: process in parallel batches with controlled concurrency + int batchSize = Math.max(10, total / 10); + + List> batches = partition(checks, batchSize); + processBatchesInParallel(batches); + } + + @Override + public void refresh() { + List allHealthChecks = new ArrayList<>(); + + Lock readLock = this.lock.readLock(); + readLock.lock(); + try { + allHealthChecks.addAll(allChecks.values()); + } finally { + readLock.unlock(); + } + + allHealthChecks.parallelStream().forEach(Runnable::run); + } + + @Override + public DistributorStatus getStatus() { + Lock readLock = this.lock.readLock(); + readLock.lock(); + try { + return new DistributorStatus(model.getSnapshot()); + } finally { + readLock.unlock(); + } + } + + @Override + public Set getAvailableNodes() { + Lock readLock = this.lock.readLock(); + readLock.lock(); + try { + return model.getSnapshot().stream() + .filter( + node -> + !DOWN.equals(node.getAvailability()) && !DRAINING.equals(node.getAvailability())) + .collect(ImmutableSet.toImmutableSet()); + } finally { + readLock.unlock(); + } + } + + @Override + public Node getNode(NodeId id) { + return nodes.get(id); + } + + @Override + public long getUpNodeCount() { + Lock readLock = this.lock.readLock(); + readLock.lock(); + try { + return model.getSnapshot().stream() + .filter(node -> UP.equals(node.getAvailability())) + .collect(Collectors.toSet()) + .size(); + } finally { + readLock.unlock(); + } + } + + @Override + public long getDownNodeCount() { + Lock readLock = this.lock.readLock(); + readLock.lock(); + try { + 
return model.getSnapshot().stream() + .filter(node -> DOWN.equals(node.getAvailability())) + .collect(Collectors.toSet()) + .size(); + } finally { + readLock.unlock(); + } + } + + @Override + public boolean isReady() { + try { + return ImmutableSet.of(bus).parallelStream() + .map(HasReadyState::isReady) + .reduce(true, Boolean::logicalAnd); + } catch (RuntimeException e) { + return false; + } + } + + private void processBatchesInParallel(List> batches) { + if (batches.isEmpty()) { + return; + } + + // Process all batches with controlled parallelism + batches.forEach( + batch -> + nodeHealthCheckExecutor.submit( + () -> + batch.parallelStream() + .forEach( + r -> { + try { + r.run(); + } catch (Throwable t) { + LOG.log( + getDebugLogLevel(), + "Health check execution failed in batch", + t); + } + }))); + } + + private static List> partition(List list, int size) { + List> batches = new ArrayList<>(); + if (list.isEmpty() || size <= 0) { + return batches; + } + for (int i = 0; i < list.size(); i += size) { + int end = Math.min(i + size, list.size()); + batches.add(new ArrayList<>(list.subList(i, end))); + } + return batches; + } + + private Runnable asRunnableHealthCheck(Node node) { + HealthCheck healthCheck = node.getHealthCheck(); + NodeId id = node.getId(); + return () -> { + boolean checkFailed = false; + Exception failedCheckException = null; + LOG.log(getDebugLogLevel(), "Running healthcheck for Node " + node.getUri()); + + HealthCheck.Result result; + try { + result = healthCheck.check(); + } catch (Exception e) { + LOG.log(Level.WARNING, "Unable to process Node healthcheck " + id, e); + result = new HealthCheck.Result(DOWN, "Unable to run healthcheck. Assuming down"); + checkFailed = true; + failedCheckException = e; + } + + updateNodeAvailability(node.getUri(), id, result.getAvailability()); + if (checkFailed) { + throw new HealthCheckFailedException("Node " + id, failedCheckException); + } + }; + } + + /** + * Get the GridModel used by this registry. This is primarily for use by the LocalDistributor. 
+   *
+   * @return The GridModel instance
+   */
+  public GridModel getModel() {
+    return model;
+  }
+
+  @Override
+  public boolean reserve(SlotId slotId) {
+    Require.nonNull("Slot ID", slotId);
+
+    Lock writeLock = this.lock.writeLock();
+    writeLock.lock();
+    try {
+      NodeId nodeId = slotId.getOwningNodeId();
+      Node node = nodes.get(nodeId);
+      if (node == null) {
+        LOG.log(getDebugLogLevel(), String.format("Unable to find node with id %s", slotId));
+        return false;
+      }
+
+      // Try to reserve the slot in the model
+      try {
+        return model.reserve(slotId);
+      } catch (Exception e) {
+        LOG.log(
+            Level.WARNING,
+            String.format("Unable to reserve slot %s: %s", slotId, e.getMessage()),
+            e);
+        return false;
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public void setSession(SlotId slotId, Session session) {
+    Lock writeLock = lock.writeLock();
+    writeLock.lock();
+    try {
+      model.setSession(slotId, session);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  @Override
+  public int getActiveSlots() {
+    Lock readLock = this.lock.readLock();
+    readLock.lock();
+    try {
+      return model.getSnapshot().stream()
+          .map(NodeStatus::getSlots)
+          .flatMap(Collection::stream)
+          .filter(slot -> slot.getSession() != null)
+          .filter(slot -> !slot.getSession().getId().equals(RESERVED))
+          .mapToInt(slot -> 1)
+          .sum();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public int getIdleSlots() {
+    Lock readLock = this.lock.readLock();
+    readLock.lock();
+    try {
+      return (int)
+          (model.getSnapshot().stream().flatMap(status -> status.getSlots().stream()).count()
+              - getActiveSlots());
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get a node by its URI.
+   *
+   * @param uri The URI of the node to find
+   * @return The node if found, null otherwise
+   */
+  public Node getNode(URI uri) {
+    Lock readLock = this.lock.readLock();
+    readLock.lock();
+    try {
+      Optional<NodeStatus> nodeStatus =
+          model.getSnapshot().stream()
+              .filter(node -> node.getExternalUri().equals(uri))
+              .findFirst();
+
+      return nodeStatus.map(status -> nodes.get(status.getNodeId())).orElse(null);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @Override
+  public void close() {
+    LOG.info("Shutting down LocalNodeRegistry");
+    Lock writeLock = lock.writeLock();
+    writeLock.lock();
+    try {
+      allChecks.clear();
+      nodes
+          .values()
+          .forEach(
+              n -> {
+                if (n instanceof RemoteNode) {
+                  try {
+                    ((RemoteNode) n).close();
+                  } catch (Exception e) {
+                    LOG.log(Level.WARNING, "Unable to close node properly: " + e.getMessage());
+                  }
+                }
+              });
+      nodes.clear();
+    } finally {
+      writeLock.unlock();
+    }
+  }
+}
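
The model methods in this patch treat a reserved-but-not-yet-started slot as a slot whose session carries the RESERVED sentinel id: reserving installs that placeholder, setSession swaps in the real session, and release clears the slot again once the session ends. The following is a minimal sketch of that lifecycle against the model API as used above; the class and method names are illustrative only, and it assumes the GridModel interface exposes getSnapshot, reserve, setSession and release as the overridden implementation suggests, with at least one node holding a free slot already registered.

import java.time.Instant;
import java.util.Optional;
import org.openqa.selenium.grid.data.NodeStatus;
import org.openqa.selenium.grid.data.Session;
import org.openqa.selenium.grid.data.Slot;
import org.openqa.selenium.grid.distributor.GridModel;
import org.openqa.selenium.remote.SessionId;

class SlotLifecycleSketch {

  // Walks a single slot through reserve -> setSession -> release.
  static void runOnce(GridModel model, SessionId newSessionId) {
    Optional<NodeStatus> maybeNode = model.getSnapshot().stream().findFirst();
    if (!maybeNode.isPresent()) {
      return; // no nodes registered yet
    }
    NodeStatus node = maybeNode.get();

    Optional<Slot> freeSlot =
        node.getSlots().stream().filter(slot -> slot.getSession() == null).findFirst();
    if (!freeSlot.isPresent()) {
      return; // node is fully occupied
    }
    Slot slot = freeSlot.get();

    // 1. Reserve: the slot now holds the RESERVED placeholder session.
    if (!model.reserve(slot.getId())) {
      return;
    }

    // 2. Record the real session against the reserved slot.
    Session session =
        new Session(
            newSessionId,
            node.getExternalUri(),
            slot.getStereotype(),
            slot.getStereotype(), // the stereotype stands in for the negotiated capabilities here
            Instant.now());
    model.setSession(slot.getId(), session);

    // 3. When the session finishes, the slot becomes free again.
    model.release(newSessionId);
  }
}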
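
updateHealthCheckCount only counts consecutive failed health checks: a DOWN result increments the per-node counter, and an UP result resets it as long as the count has not yet passed UNHEALTHY_THRESHOLD; the method itself never removes a node. A stand-alone illustration of that counting rule follows; the local enum, the String node ids and the threshold value simply mirror the fields above and are not part of the patch.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class HealthCountSketch {

  enum Availability { UP, DOWN }

  private static final int UNHEALTHY_THRESHOLD = 4;
  private final Map<String, Integer> nodeHealthCount = new ConcurrentHashMap<>();

  // Consecutive DOWN results accumulate; an UP result below the threshold resets the counter.
  void updateHealthCheckCount(String nodeId, Availability availability) {
    int unhealthyCount = nodeHealthCount.getOrDefault(nodeId, 0);

    if (availability == Availability.DOWN) {
      nodeHealthCount.put(nodeId, unhealthyCount + 1);
    }

    if (unhealthyCount <= UNHEALTHY_THRESHOLD && availability == Availability.UP) {
      nodeHealthCount.put(nodeId, 0);
    }
  }

  public static void main(String[] args) {
    HealthCountSketch sketch = new HealthCountSketch();
    sketch.updateHealthCheckCount("node-1", Availability.DOWN);
    sketch.updateHealthCheckCount("node-1", Availability.DOWN);
    System.out.println(sketch.nodeHealthCount); // {node-1=2}
    sketch.updateHealthCheckCount("node-1", Availability.UP);
    System.out.println(sketch.nodeHealthCount); // {node-1=0}
  }
}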
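
runHealthChecks sizes each batch as Math.max(10, total / 10), so a deployment produces at most roughly ten batches, and each batch is handed to the fixed-size health-check executor as a single task whose checks then fan out via parallelStream. A self-contained sketch of that scheduling shape, with plain Runnables standing in for node health checks and a throwaway executor; the numbers and names here are illustrative only.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class HealthCheckBatchingSketch {

  public static void main(String[] args) throws InterruptedException {
    // 95 fake "health checks": batchSize = max(10, 95 / 10) = 10, so ten batches of at most 10.
    List<Runnable> checks = new ArrayList<>();
    for (int i = 0; i < 95; i++) {
      int id = i;
      checks.add(() -> System.out.println("check " + id + " on " + Thread.currentThread().getName()));
    }

    int batchSize = Math.max(10, checks.size() / 10);
    List<List<Runnable>> batches = partition(checks, batchSize);

    ExecutorService executor = Executors.newFixedThreadPool(4);
    // One task per batch; the checks inside a batch run in parallel on the common ForkJoin pool.
    batches.forEach(batch -> executor.submit(() -> batch.parallelStream().forEach(Runnable::run)));

    executor.shutdown();
    executor.awaitTermination(10, TimeUnit.SECONDS);
  }

  // Same splitting rule as the patch: consecutive sublists of at most `size` elements.
  private static <T> List<List<T>> partition(List<T> list, int size) {
    List<List<T>> batches = new ArrayList<>();
    for (int i = 0; i < list.size(); i += size) {
      batches.add(new ArrayList<>(list.subList(i, Math.min(i + size, list.size()))));
    }
    return batches;
  }
}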