diff --git a/java/src/org/openqa/selenium/grid/router/BUILD.bazel b/java/src/org/openqa/selenium/grid/router/BUILD.bazel index 70065bd707e4e..ee5aa11941209 100644 --- a/java/src/org/openqa/selenium/grid/router/BUILD.bazel +++ b/java/src/org/openqa/selenium/grid/router/BUILD.bazel @@ -24,5 +24,6 @@ java_library( "//java/src/org/openqa/selenium/remote", "//java/src/org/openqa/selenium/status", artifact("com.google.guava:guava"), + artifact("com.github.ben-manes.caffeine:caffeine"), ], ) diff --git a/java/src/org/openqa/selenium/grid/router/HandleSession.java b/java/src/org/openqa/selenium/grid/router/HandleSession.java index b199f0a51f356..c3211425b43fe 100644 --- a/java/src/org/openqa/selenium/grid/router/HandleSession.java +++ b/java/src/org/openqa/selenium/grid/router/HandleSession.java @@ -21,19 +21,25 @@ import static org.openqa.selenium.remote.RemoteTags.SESSION_ID; import static org.openqa.selenium.remote.RemoteTags.SESSION_ID_EVENT; import static org.openqa.selenium.remote.http.Contents.asJson; +import static org.openqa.selenium.remote.http.Contents.reader; +import static org.openqa.selenium.remote.http.HttpMethod.GET; import static org.openqa.selenium.remote.tracing.Tags.EXCEPTION; import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST; import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST_EVENT; import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE; +import com.github.benmanes.caffeine.cache.Cache; +import com.github.benmanes.caffeine.cache.Caffeine; +import com.github.benmanes.caffeine.cache.Expiry; +import com.github.benmanes.caffeine.cache.RemovalCause; +import com.github.benmanes.caffeine.cache.RemovalListener; +import com.github.benmanes.caffeine.cache.Weigher; import java.io.Closeable; +import java.io.Reader; import java.net.URI; +import java.time.Duration; import java.time.Instant; -import java.time.temporal.ChronoUnit; -import java.util.Iterator; import java.util.concurrent.Callable; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; @@ -41,11 +47,12 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.openqa.selenium.NoSuchSessionException; -import org.openqa.selenium.concurrent.ExecutorServices; -import org.openqa.selenium.concurrent.GuardedRunnable; +import org.openqa.selenium.grid.data.NodeStatus; import org.openqa.selenium.grid.sessionmap.SessionMap; import org.openqa.selenium.grid.web.ReverseProxyHandler; import org.openqa.selenium.internal.Require; +import org.openqa.selenium.json.Json; +import org.openqa.selenium.json.JsonInput; import org.openqa.selenium.remote.ErrorCodec; import org.openqa.selenium.remote.SessionId; import org.openqa.selenium.remote.http.ClientConfig; @@ -64,17 +71,103 @@ class HandleSession implements HttpHandler, Closeable { private static final Logger LOG = Logger.getLogger(HandleSession.class.getName()); + /** + * Cache entry that tracks HttpClient usage and timing for connection reuse. Connection reuse + * criteria: - inUse must be 0 (no active requests) - lastUse must be older than readTimeout + * duration + */ private static class CacheEntry { private final HttpClient httpClient; private final AtomicLong inUse; - // volatile as the ConcurrentMap will not take care of synchronization + private final Duration readTimeout; + // volatile as the cache will access this from multiple threads private volatile Instant lastUse; - public CacheEntry(HttpClient httpClient, long initialUsage) { + public CacheEntry(HttpClient httpClient, Duration readTimeout, long initialUsage) { this.httpClient = httpClient; + this.readTimeout = readTimeout; this.inUse = new AtomicLong(initialUsage); this.lastUse = Instant.now(); } + + /** + * Checks if this connection can be reused based on usage and timeout criteria. + * + * @return true if connection is idle (inUse=0) and over readTimeout + */ + public boolean canBeReused() { + return inUse.get() == 0 && lastUse.isBefore(Instant.now().minus(readTimeout)); + } + + /** + * Checks if this connection should be expired based on usage and timeout criteria. + * + * @return true if connection is idle (inUse=0) and over readTimeout + */ + public boolean shouldExpire() { + return canBeReused(); // Same criteria for now + } + + /** + * Attempts to reuse this connection if criteria are met. + * + * @return true if connection was successfully reused, false otherwise + */ + public boolean tryReuse() { + // Double-check criteria under potential race conditions + if (canBeReused()) { + // Try to increment usage atomically - if successful, connection is reused + long currentUsage = inUse.get(); + if (currentUsage == 0 && inUse.compareAndSet(0, 1)) { + LOG.fine("Reusing idle connection: " + httpClient); + return true; + } + } + return false; + } + + /** Updates the last use time - called when connection is accessed */ + public void updateLastUse() { + this.lastUse = Instant.now(); + } + } + + /** + * Custom expiry policy that considers inUse count and lastUse time with readTimeout. This ensures + * entries are expired when they meet our reuse criteria. + */ + private static class ConnectionExpiry implements Expiry { + + @Override + public long expireAfterCreate(URI key, CacheEntry value, long currentTime) { + // Initial expiration time based on readTimeout + return value.readTimeout.toNanos(); + } + + @Override + public long expireAfterUpdate( + URI key, CacheEntry value, long currentTime, long currentDuration) { + // Reset expiration time when entry is updated (connection reused) + return value.readTimeout.toNanos(); + } + + @Override + public long expireAfterRead(URI key, CacheEntry value, long currentTime, long currentDuration) { + // Check if connection should expire based on our criteria + if (value.shouldExpire()) { + return 0; // Expire immediately + } + + // Calculate remaining time until expiration + Instant expireTime = value.lastUse.plus(value.readTimeout); + Duration remaining = Duration.between(Instant.now(), expireTime); + + if (remaining.isNegative() || remaining.isZero()) { + return 0; // Expire immediately + } + + return remaining.toNanos(); + } } private static class UsageCountingReverseProxyHandler extends ReverseProxyHandler @@ -84,68 +177,144 @@ private static class UsageCountingReverseProxyHandler extends ReverseProxyHandle public UsageCountingReverseProxyHandler( Tracer tracer, HttpClient httpClient, CacheEntry entry) { super(tracer, httpClient); - this.entry = entry; } @Override public void close() { - // set the last use here, to ensure we have to calculate the real inactivity of the client + // Update last use time and decrement usage count entry.lastUse = Instant.now(); entry.inUse.decrementAndGet(); } } + /** + * Custom weigher that implements "pinning" by assigning very high weight to entries with inUse > + * 0. This prevents Caffeine from evicting active connections during size-based eviction. + * + *

Weight Strategy: - inUse == 0: Weight = 1 (normal, can be evicted) - inUse > 0: Weight = + * Integer.MAX_VALUE (effectively pinned, won't be evicted) + */ + private static class InUseWeigher implements Weigher { + @Override + public int weigh(URI key, CacheEntry value) { + long inUse = value.inUse.get(); + int weight = inUse == 0 ? 1 : Integer.MAX_VALUE; + + LOG.finest( + "Weighing cache entry: " + + key + + ", inUse: " + + inUse + + ", weight: " + + (weight == Integer.MAX_VALUE ? "PINNED" : weight)); + + return weight; + } + } + + /** + * Enhanced removal listener that provides detailed information about eviction causes and tracks + * when pinned entries are removed. + */ + private static class DetailedRemovalListener implements RemovalListener { + @Override + public void onRemoval(URI key, CacheEntry entry, RemovalCause cause) { + if (entry != null) { + boolean wasPinned = entry.inUse.get() > 0; + + LOG.info( + "Removing HttpClient from cache: " + + key + + ", cause: " + + cause + + ", inUse: " + + entry.inUse.get() + + ", lastUse: " + + entry.lastUse + + ", wasPinned: " + + wasPinned + + (wasPinned && cause == RemovalCause.SIZE + ? " [WARNING: Pinned entry evicted!]" + : "")); + + try { + entry.httpClient.close(); + } catch (Exception ex) { + LOG.log(Level.WARNING, "Failed to close HttpClient during cache removal", ex); + } + } + } + } + private final Tracer tracer; private final HttpClient.Factory httpClientFactory; private final SessionMap sessions; - private final ConcurrentMap httpClients; - private final ScheduledExecutorService cleanUpHttpClientsCacheService; + private final Cache httpClientsCache; + private final ScheduledExecutorService cleanupExecutor; HandleSession(Tracer tracer, HttpClient.Factory httpClientFactory, SessionMap sessions) { this.tracer = Require.nonNull("Tracer", tracer); this.httpClientFactory = Require.nonNull("HTTP client factory", httpClientFactory); this.sessions = Require.nonNull("Sessions", sessions); - this.httpClients = new ConcurrentHashMap<>(); - - Runnable cleanUpHttpClients = - () -> { - Instant staleBefore = Instant.now().minus(2, ChronoUnit.MINUTES); - Iterator iterator = httpClients.values().iterator(); - - while (iterator.hasNext()) { - CacheEntry entry = iterator.next(); - - if (entry.inUse.get() != 0) { - // the client is currently in use - return; - } else if (!entry.lastUse.isBefore(staleBefore)) { - // the client was recently used - return; - } else { - // the client has not been used for a while, remove it from the cache - iterator.remove(); - - try { - entry.httpClient.close(); - } catch (Exception ex) { - LOG.log(Level.WARNING, "failed to close a stale httpclient", ex); - } - } - } - }; - - this.cleanUpHttpClientsCacheService = + // Configure Caffeine cache with custom expiry and connection reuse support + this.httpClientsCache = + Caffeine.newBuilder() + .maximumWeight(1000) // Maximum weight to prevent eviction of in-use entries + .weigher(new InUseWeigher()) // Weigher to prevent eviction of in-use entries + .expireAfter(new ConnectionExpiry()) // Custom expiry based on inUse and readTimeout + .removalListener(new DetailedRemovalListener()) // Detailed removal listener + .build(); + + // Schedule periodic cleanup to actively check for expired entries + this.cleanupExecutor = Executors.newSingleThreadScheduledExecutor( r -> { Thread thread = new Thread(r); thread.setDaemon(true); - thread.setName("HandleSession - Clean up http clients cache"); + thread.setName("HandleSession - Connection Cleanup"); return thread; }); - cleanUpHttpClientsCacheService.scheduleAtFixedRate( - GuardedRunnable.guard(cleanUpHttpClients), 1, 1, TimeUnit.MINUTES); + + // Run cleanup every 30 seconds to actively expire stale connections + cleanupExecutor.scheduleAtFixedRate( + () -> { + try { + LOG.fine("Running periodic connection cleanup"); + + // Force cache maintenance - this will trigger expiry checks + httpClientsCache.cleanUp(); + + // Additional manual check for entries that should be expired + httpClientsCache + .asMap() + .entrySet() + .removeIf( + entry -> { + CacheEntry cacheEntry = entry.getValue(); + if (cacheEntry.shouldExpire()) { + LOG.fine( + "Manually expiring connection: " + + entry.getKey() + + ", inUse: " + + cacheEntry.inUse.get() + + ", lastUse: " + + cacheEntry.lastUse); + return true; + } + return false; + }); + + LOG.fine( + "Connection cleanup completed. Cache size: " + httpClientsCache.estimatedSize()); + } catch (Exception e) { + LOG.log(Level.WARNING, "Error during connection cleanup", e); + } + }, + 120, + 60, + TimeUnit.SECONDS); } @Override @@ -220,42 +389,104 @@ private Callable loadSessionId( Tracer tracer, Span span, SessionId id) { return span.wrap( () -> { - CacheEntry cacheEntry = - httpClients.compute( - sessions.getUri(id), - (sessionUri, entry) -> { - if (entry != null) { - entry.inUse.incrementAndGet(); - return entry; - } - - ClientConfig config = - ClientConfig.defaultConfig().baseUri(sessionUri).withRetries(); - HttpClient httpClient = httpClientFactory.createClient(config); - - return new CacheEntry(httpClient, 1); - }); + URI sessionUri = sessions.getUri(id); - try { + // Try to get existing entry and reuse connection if possible + CacheEntry cacheEntry = httpClientsCache.getIfPresent(sessionUri); + + if (cacheEntry != null && cacheEntry.tryReuse()) { + // Successfully reused existing idle connection - update last use + cacheEntry.updateLastUse(); + LOG.fine("Reusing idle connection for session: " + id); return new UsageCountingReverseProxyHandler(tracer, cacheEntry.httpClient, cacheEntry); + } + + // Need to create new connection or existing one couldn't be reused + ClientConfig config = fetchNodeSessionTimeout(sessionUri); + HttpClient httpClient = httpClientFactory.createClient(config); + + // Create new cache entry with usage count of 1 + CacheEntry newEntry = new CacheEntry(httpClient, config.readTimeout(), 1); + + // Put in cache (this might evict old entries) + httpClientsCache.put(sessionUri, newEntry); + + LOG.fine( + "Created new HttpClient for session: " + + id + + ", readTimeout: " + + config.readTimeout().toSeconds() + + "s"); + + try { + return new UsageCountingReverseProxyHandler(tracer, newEntry.httpClient, newEntry); } catch (Throwable t) { - // ensure we do not keep the http client when an unexpected throwable is raised - cacheEntry.inUse.decrementAndGet(); + // Ensure we don't keep the http client when an unexpected throwable is raised + newEntry.inUse.decrementAndGet(); throw t; } }); } + private ClientConfig fetchNodeSessionTimeout(URI uri) { + ClientConfig config = ClientConfig.defaultConfig().baseUri(uri).withRetries(); + Duration sessionTimeout = config.readTimeout(); + HttpClient httpClient = httpClientFactory.createClient(config); + HttpRequest statusRequest = new HttpRequest(GET, "/status"); + try { + HttpResponse res = httpClient.execute(statusRequest); + Reader reader = reader(res); + Json JSON = new Json(); + JsonInput in = JSON.newInput(reader); + in.beginObject(); + // Skip everything until we find "value" + while (in.hasNext()) { + if ("value".equals(in.nextName())) { + in.beginObject(); + while (in.hasNext()) { + if ("node".equals(in.nextName())) { + NodeStatus nodeStatus = in.read(NodeStatus.class); + sessionTimeout = nodeStatus.getSessionTimeout(); + LOG.fine( + "Detected session timeout from node status (read timeout: " + + sessionTimeout.toSeconds() + + " seconds)"); + } else { + in.skipValue(); + } + } + in.endObject(); + } else { + in.skipValue(); + } + } + } catch (Exception e) { + LOG.fine( + "Use default from ClientConfig (read timeout: " + + config.readTimeout().toSeconds() + + " seconds)"); + } + config = config.readTimeout(sessionTimeout); + return config; + } + @Override public void close() { - ExecutorServices.shutdownGracefully( - "HandleSession - Clean up http clients cache", cleanUpHttpClientsCacheService); - httpClients - .values() - .removeIf( - (entry) -> { - entry.httpClient.close(); - return true; - }); + // Shutdown cleanup executor + if (cleanupExecutor != null && !cleanupExecutor.isShutdown()) { + cleanupExecutor.shutdown(); + try { + if (!cleanupExecutor.awaitTermination(5, TimeUnit.SECONDS)) { + cleanupExecutor.shutdownNow(); + } + } catch (InterruptedException e) { + cleanupExecutor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + // Clean shutdown of the cache - this will trigger removal listeners + httpClientsCache.invalidateAll(); + httpClientsCache.cleanUp(); } }