-
-
Notifications
You must be signed in to change notification settings - Fork 8.6k
[grid] Improve readTimeout and cache expiration of HttpClient between Router and Nodes #16154
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: trunk
Are you sure you want to change the base?
Changes from 1 commit
89ce721
0f2192a
05f2d38
5d3f2b7
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,31 +21,31 @@ | |
| import static org.openqa.selenium.remote.RemoteTags.SESSION_ID; | ||
| import static org.openqa.selenium.remote.RemoteTags.SESSION_ID_EVENT; | ||
| import static org.openqa.selenium.remote.http.Contents.asJson; | ||
| import static org.openqa.selenium.remote.http.Contents.reader; | ||
| import static org.openqa.selenium.remote.http.HttpMethod.GET; | ||
| import static org.openqa.selenium.remote.tracing.Tags.EXCEPTION; | ||
| import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST; | ||
| import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST_EVENT; | ||
| import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE; | ||
|
|
||
| import com.github.benmanes.caffeine.cache.Cache; | ||
| import com.github.benmanes.caffeine.cache.Caffeine; | ||
| import com.github.benmanes.caffeine.cache.Expiry; | ||
| import com.github.benmanes.caffeine.cache.RemovalListener; | ||
| import java.io.Closeable; | ||
| import java.io.Reader; | ||
| import java.net.URI; | ||
| import java.time.Instant; | ||
| import java.time.temporal.ChronoUnit; | ||
| import java.util.Iterator; | ||
| import java.time.Duration; | ||
| import java.util.concurrent.Callable; | ||
| import java.util.concurrent.ConcurrentHashMap; | ||
| import java.util.concurrent.ConcurrentMap; | ||
| import java.util.concurrent.Executors; | ||
| import java.util.concurrent.ScheduledExecutorService; | ||
| import java.util.concurrent.TimeUnit; | ||
| import java.util.concurrent.atomic.AtomicLong; | ||
| import java.util.logging.Level; | ||
| import java.util.logging.Logger; | ||
| import org.openqa.selenium.NoSuchSessionException; | ||
| import org.openqa.selenium.concurrent.ExecutorServices; | ||
| import org.openqa.selenium.concurrent.GuardedRunnable; | ||
| import org.openqa.selenium.grid.data.NodeStatus; | ||
| import org.openqa.selenium.grid.sessionmap.SessionMap; | ||
| import org.openqa.selenium.grid.web.ReverseProxyHandler; | ||
| import org.openqa.selenium.internal.Require; | ||
| import org.openqa.selenium.json.Json; | ||
| import org.openqa.selenium.json.JsonInput; | ||
| import org.openqa.selenium.remote.ErrorCodec; | ||
| import org.openqa.selenium.remote.SessionId; | ||
| import org.openqa.selenium.remote.http.ClientConfig; | ||
|
|
@@ -60,92 +60,150 @@ | |
| import org.openqa.selenium.remote.tracing.Status; | ||
| import org.openqa.selenium.remote.tracing.Tracer; | ||
|
|
||
| class HandleSession implements HttpHandler, Closeable { | ||
| public class HandleSession implements HttpHandler, Closeable { | ||
|
|
||
| private static final Logger LOG = Logger.getLogger(HandleSession.class.getName()); | ||
|
|
||
| private static class CacheEntry { | ||
| private final HttpClient httpClient; | ||
| private final AtomicLong inUse; | ||
| // volatile as the ConcurrentMap will not take care of synchronization | ||
| private volatile Instant lastUse; | ||
|
|
||
| public CacheEntry(HttpClient httpClient, long initialUsage) { | ||
| this.httpClient = httpClient; | ||
| this.inUse = new AtomicLong(initialUsage); | ||
| this.lastUse = Instant.now(); | ||
| } | ||
| } | ||
|
|
||
| private static class UsageCountingReverseProxyHandler extends ReverseProxyHandler | ||
| private static class ReverseProxyHandlerCloseable extends ReverseProxyHandler | ||
| implements Closeable { | ||
| private final CacheEntry entry; | ||
|
|
||
| public UsageCountingReverseProxyHandler( | ||
| Tracer tracer, HttpClient httpClient, CacheEntry entry) { | ||
| public ReverseProxyHandlerCloseable(Tracer tracer, HttpClient httpClient) { | ||
| super(tracer, httpClient); | ||
|
|
||
| this.entry = entry; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| // set the last use here, to ensure we have to calculate the real inactivity of the client | ||
| entry.lastUse = Instant.now(); | ||
| entry.inUse.decrementAndGet(); | ||
| // No operation needed - cache management is handled by Cache builder | ||
| } | ||
| } | ||
|
|
||
| private final Tracer tracer; | ||
| private final HttpClient.Factory httpClientFactory; | ||
| private final SessionMap sessions; | ||
| private final ConcurrentMap<URI, CacheEntry> httpClients; | ||
| private final ScheduledExecutorService cleanUpHttpClientsCacheService; | ||
| private final Cache<URI, CacheEntry> httpClientCache; | ||
|
|
||
| /** | ||
| * Wrapper class to store HttpClient along with its configuration for dynamic cache expiration | ||
| * based on HttpClient's read timeout. | ||
| */ | ||
| private static class CacheEntry { | ||
| private final HttpClient httpClient; | ||
| private final ClientConfig config; | ||
| private final long creationTime; | ||
|
|
||
| CacheEntry(HttpClient httpClient, ClientConfig config) { | ||
| this.httpClient = Require.nonNull("HttpClient", httpClient); | ||
| this.config = Require.nonNull("ClientConfig", config); | ||
| this.creationTime = System.currentTimeMillis(); | ||
| } | ||
|
|
||
| HttpClient getHttpClient() { | ||
| return httpClient; | ||
| } | ||
|
|
||
| ClientConfig getConfig() { | ||
| return config; | ||
| } | ||
|
|
||
| /** | ||
| * Check if this cache entry has expired based on the HttpClient's read timeout. Method is used | ||
| * by Cache builder to determine if an entry should be evicted. | ||
| * | ||
| * @param lastAccessTime the last time this entry was accessed | ||
| * @return true if the entry should be evicted | ||
| */ | ||
| boolean isExpired(long lastAccessTime) { | ||
| long readTimeoutMs = config.readTimeout().toMillis(); | ||
| long timeSinceLastAccess = System.currentTimeMillis() - lastAccessTime; | ||
| boolean expired = timeSinceLastAccess > readTimeoutMs; | ||
| if (expired) { | ||
| LOG.fine( | ||
| String.format( | ||
| "Connection for %s has expired (read timeout: %d seconds)", | ||
| config.baseUri(), config.readTimeout().toSeconds())); | ||
| } | ||
| return expired; | ||
| } | ||
|
|
||
| /** | ||
| * Close the HTTP client associated with this cache entry. Method is used by Cache builder to | ||
| * close expired entries. | ||
| */ | ||
| void close() { | ||
| try { | ||
| httpClient.close(); | ||
| LOG.fine(String.format("Closed expired connection for %s", config.baseUri())); | ||
| } catch (Exception ex) { | ||
| LOG.warning(String.format("Failed to close expired connection for %s", config.baseUri())); | ||
| } | ||
| } | ||
| } | ||
|
|
||
| HandleSession(Tracer tracer, HttpClient.Factory httpClientFactory, SessionMap sessions) { | ||
| this.tracer = Require.nonNull("Tracer", tracer); | ||
| this.httpClientFactory = Require.nonNull("HTTP client factory", httpClientFactory); | ||
| this.sessions = Require.nonNull("Sessions", sessions); | ||
|
|
||
| this.httpClients = new ConcurrentHashMap<>(); | ||
|
|
||
| Runnable cleanUpHttpClients = | ||
| () -> { | ||
| Instant staleBefore = Instant.now().minus(2, ChronoUnit.MINUTES); | ||
| Iterator<CacheEntry> iterator = httpClients.values().iterator(); | ||
|
|
||
| while (iterator.hasNext()) { | ||
| CacheEntry entry = iterator.next(); | ||
|
|
||
| if (entry.inUse.get() != 0) { | ||
| // the client is currently in use | ||
| return; | ||
| } else if (!entry.lastUse.isBefore(staleBefore)) { | ||
| // the client was recently used | ||
| return; | ||
| } else { | ||
| // the client has not been used for a while, remove it from the cache | ||
| iterator.remove(); | ||
|
|
||
| try { | ||
| entry.httpClient.close(); | ||
| } catch (Exception ex) { | ||
| LOG.log(Level.WARNING, "failed to close a stale httpclient", ex); | ||
| } | ||
| } | ||
| } | ||
| }; | ||
|
|
||
| this.cleanUpHttpClientsCacheService = | ||
| Executors.newSingleThreadScheduledExecutor( | ||
| r -> { | ||
| Thread thread = new Thread(r); | ||
| thread.setDaemon(true); | ||
| thread.setName("HandleSession - Clean up http clients cache"); | ||
| return thread; | ||
| }); | ||
| cleanUpHttpClientsCacheService.scheduleAtFixedRate( | ||
| GuardedRunnable.guard(cleanUpHttpClients), 1, 1, TimeUnit.MINUTES); | ||
| // Create Cache with dynamic expiry based on HttpClient read timeout | ||
| // and a removal listener to close HTTP clients | ||
| this.httpClientCache = | ||
| Caffeine.newBuilder() | ||
| .expireAfter( | ||
|
||
| new Expiry<URI, CacheEntry>() { | ||
| @Override | ||
| public long expireAfterCreate(URI uri, CacheEntry cacheEntry, long currentTime) { | ||
| // Use the HttpClient's read timeout for initial expiration | ||
| LOG.fine( | ||
| String.format( | ||
| "Set (read timeout: %d seconds) as initial expiration for %s in cache", | ||
| cacheEntry.getConfig().readTimeout().toSeconds(), uri)); | ||
| return cacheEntry.getConfig().readTimeout().toNanos(); | ||
| } | ||
|
|
||
| @Override | ||
| public long expireAfterUpdate( | ||
| URI uri, CacheEntry cacheEntry, long currentTime, long currentDuration) { | ||
| // Use the HttpClient's read timeout for expiration after update | ||
| LOG.fine( | ||
| String.format( | ||
| "Set (read timeout: %d seconds) as expiration after update for %s in" | ||
| + " cache", | ||
| cacheEntry.getConfig().readTimeout().toSeconds(), uri)); | ||
| return cacheEntry.getConfig().readTimeout().toNanos(); | ||
| } | ||
|
|
||
| @Override | ||
| public long expireAfterRead( | ||
| URI uri, CacheEntry cacheEntry, long currentTime, long currentDuration) { | ||
| // Use the HttpClient's read timeout for expiration after read | ||
| LOG.fine( | ||
| String.format( | ||
| "Set (read timeout: %d seconds) as expiration after read for %s in" | ||
| + " cache", | ||
| cacheEntry.getConfig().readTimeout().toSeconds(), uri)); | ||
| return cacheEntry.getConfig().readTimeout().toNanos(); | ||
| } | ||
| }) | ||
| .removalListener( | ||
| (RemovalListener<URI, CacheEntry>) | ||
| (uri, cacheEntry, cause) -> { | ||
| if (cacheEntry != null) { | ||
| try { | ||
| Duration readTimeout = cacheEntry.getConfig().readTimeout(); | ||
| LOG.fine( | ||
| "Closing HTTP client for " | ||
| + uri | ||
| + " (read timeout: " | ||
| + readTimeout.toSeconds() | ||
| + " seconds), removal cause: " | ||
| + cause); | ||
| cacheEntry.close(); | ||
| } catch (Exception ex) { | ||
| LOG.log(Level.WARNING, "Failed to close HTTP client for " + uri, ex); | ||
| } | ||
| } | ||
| }) | ||
| .build(); | ||
| } | ||
|
|
||
| @Override | ||
|
|
@@ -179,7 +237,7 @@ public HttpResponse execute(HttpRequest req) { | |
| try { | ||
| HttpTracing.inject(tracer, span, req); | ||
| HttpResponse res; | ||
| try (UsageCountingReverseProxyHandler handler = loadSessionId(tracer, span, id).call()) { | ||
| try (ReverseProxyHandlerCloseable handler = loadSessionId(tracer, span, id).call()) { | ||
| res = handler.execute(req); | ||
| } | ||
|
|
||
|
|
@@ -216,46 +274,83 @@ public HttpResponse execute(HttpRequest req) { | |
| } | ||
| } | ||
|
|
||
| private Callable<UsageCountingReverseProxyHandler> loadSessionId( | ||
| private Callable<ReverseProxyHandlerCloseable> loadSessionId( | ||
| Tracer tracer, Span span, SessionId id) { | ||
| return span.wrap( | ||
| () -> { | ||
| URI sessionUri = sessions.getUri(id); | ||
|
|
||
| // Get or create the HTTP client from cache (this also updates the "last access" time) | ||
| CacheEntry cacheEntry = | ||
| httpClients.compute( | ||
| sessions.getUri(id), | ||
| (sessionUri, entry) -> { | ||
| if (entry != null) { | ||
| entry.inUse.incrementAndGet(); | ||
| return entry; | ||
| } | ||
|
|
||
| ClientConfig config = | ||
| ClientConfig.defaultConfig().baseUri(sessionUri).withRetries(); | ||
| httpClientCache.get( | ||
| sessionUri, | ||
| uri -> { | ||
| LOG.fine("Creating new HTTP client for " + uri); | ||
| ClientConfig config = fetchNodeSessionTimeout(uri); | ||
| HttpClient httpClient = httpClientFactory.createClient(config); | ||
|
|
||
| return new CacheEntry(httpClient, 1); | ||
| LOG.fine( | ||
| "Created connection for " | ||
| + uri | ||
| + " (read timeout: " | ||
| + config.readTimeout().toSeconds() | ||
| + " seconds)"); | ||
| return new CacheEntry(httpClient, config); | ||
| }); | ||
|
|
||
| try { | ||
| return new UsageCountingReverseProxyHandler(tracer, cacheEntry.httpClient, cacheEntry); | ||
| return new ReverseProxyHandlerCloseable(tracer, cacheEntry.getHttpClient()); | ||
| } catch (Throwable t) { | ||
| // ensure we do not keep the http client when an unexpected throwable is raised | ||
| cacheEntry.inUse.decrementAndGet(); | ||
| throw t; | ||
| } | ||
| }); | ||
| } | ||
|
|
||
| private ClientConfig fetchNodeSessionTimeout(URI uri) { | ||
| ClientConfig config = ClientConfig.defaultConfig().baseUri(uri).withRetries(); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I would remove withRetries and the exception handling: if we are unable to fetch the status, the request should fail with a matching error (502 Bad Gateway). |
||
| Duration sessionTimeout = config.readTimeout(); | ||
| HttpClient httpClient = httpClientFactory.createClient(config); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Close the httpClient after use. |
||
| HttpRequest statusRequest = new HttpRequest(GET, "/status"); | ||
| try { | ||
| HttpResponse res = httpClient.execute(statusRequest); | ||
| Reader reader = reader(res); | ||
| Json JSON = new Json(); | ||
| JsonInput in = JSON.newInput(reader); | ||
| in.beginObject(); | ||
| // Skip everything until we find "value" | ||
| while (in.hasNext()) { | ||
| if ("value".equals(in.nextName())) { | ||
| in.beginObject(); | ||
| while (in.hasNext()) { | ||
| if ("node".equals(in.nextName())) { | ||
| NodeStatus nodeStatus = in.read(NodeStatus.class); | ||
| sessionTimeout = nodeStatus.getSessionTimeout(); | ||
| LOG.fine( | ||
| "Detected session timeout from node status (read timeout: " | ||
| + sessionTimeout.toSeconds() | ||
| + " seconds)"); | ||
| } else { | ||
| in.skipValue(); | ||
| } | ||
| } | ||
| in.endObject(); | ||
| } else { | ||
| in.skipValue(); | ||
| } | ||
|
Comment on lines
+435
to
+461
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We should call /se/grid/node/status to get only the status; this will allow us to use org.openqa.selenium.grid.web.Values.get(...) to read the NodeStatus response. |
||
| } | ||
| } catch (Exception e) { | ||
| LOG.fine( | ||
| "Use default from ClientConfig (read timeout: " | ||
| + config.readTimeout().toSeconds() | ||
| + " seconds)"); | ||
| } | ||
| config = config.readTimeout(sessionTimeout); | ||
|
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Add the default connection timeout to the sessionTimeout, as the node might need to connect to e.g. a relay / docker / whatever node. |
||
| return config; | ||
| } | ||
|
|
||
| @Override | ||
| public void close() { | ||
| ExecutorServices.shutdownGracefully( | ||
| "HandleSession - Clean up http clients cache", cleanUpHttpClientsCacheService); | ||
| httpClients | ||
| .values() | ||
| .removeIf( | ||
| (entry) -> { | ||
| entry.httpClient.close(); | ||
| return true; | ||
| }); | ||
| // This will trigger the removal listener for all entries, which will close all HTTP clients | ||
| httpClientCache.invalidateAll(); | ||
| httpClientCache.cleanUp(); | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I could not find any call to this method; I guess this can be removed or called 😀
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I am reworking this based on your feedback. Actually, when using Expiry (on create, read, and update of the cache), we cannot override the expiration logic, so this isExpired is redundant and useless.
You're correct: checking that the client is not in use is still a safe protocol. I am thinking about how to implement the cache with this pattern.