Skip to content

Commit 89ce721

Browse files
committed
[grid] Improve readTimeout and cache expiration of HttpClient between Router and Nodes
Signed-off-by: Viet Nguyen Duc <[email protected]>
1 parent 8b45f02 commit 89ce721

File tree

2 files changed

+197
-101
lines changed

2 files changed

+197
-101
lines changed

java/src/org/openqa/selenium/grid/router/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,5 +24,6 @@ java_library(
2424
"//java/src/org/openqa/selenium/remote",
2525
"//java/src/org/openqa/selenium/status",
2626
artifact("com.google.guava:guava"),
27+
artifact("com.github.ben-manes.caffeine:caffeine"),
2728
],
2829
)

java/src/org/openqa/selenium/grid/router/HandleSession.java

Lines changed: 196 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -21,31 +21,31 @@
2121
import static org.openqa.selenium.remote.RemoteTags.SESSION_ID;
2222
import static org.openqa.selenium.remote.RemoteTags.SESSION_ID_EVENT;
2323
import static org.openqa.selenium.remote.http.Contents.asJson;
24+
import static org.openqa.selenium.remote.http.Contents.reader;
25+
import static org.openqa.selenium.remote.http.HttpMethod.GET;
2426
import static org.openqa.selenium.remote.tracing.Tags.EXCEPTION;
2527
import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST;
2628
import static org.openqa.selenium.remote.tracing.Tags.HTTP_REQUEST_EVENT;
2729
import static org.openqa.selenium.remote.tracing.Tags.HTTP_RESPONSE;
2830

31+
import com.github.benmanes.caffeine.cache.Cache;
32+
import com.github.benmanes.caffeine.cache.Caffeine;
33+
import com.github.benmanes.caffeine.cache.Expiry;
34+
import com.github.benmanes.caffeine.cache.RemovalListener;
2935
import java.io.Closeable;
36+
import java.io.Reader;
3037
import java.net.URI;
31-
import java.time.Instant;
32-
import java.time.temporal.ChronoUnit;
33-
import java.util.Iterator;
38+
import java.time.Duration;
3439
import java.util.concurrent.Callable;
35-
import java.util.concurrent.ConcurrentHashMap;
36-
import java.util.concurrent.ConcurrentMap;
37-
import java.util.concurrent.Executors;
38-
import java.util.concurrent.ScheduledExecutorService;
39-
import java.util.concurrent.TimeUnit;
40-
import java.util.concurrent.atomic.AtomicLong;
4140
import java.util.logging.Level;
4241
import java.util.logging.Logger;
4342
import org.openqa.selenium.NoSuchSessionException;
44-
import org.openqa.selenium.concurrent.ExecutorServices;
45-
import org.openqa.selenium.concurrent.GuardedRunnable;
43+
import org.openqa.selenium.grid.data.NodeStatus;
4644
import org.openqa.selenium.grid.sessionmap.SessionMap;
4745
import org.openqa.selenium.grid.web.ReverseProxyHandler;
4846
import org.openqa.selenium.internal.Require;
47+
import org.openqa.selenium.json.Json;
48+
import org.openqa.selenium.json.JsonInput;
4949
import org.openqa.selenium.remote.ErrorCodec;
5050
import org.openqa.selenium.remote.SessionId;
5151
import org.openqa.selenium.remote.http.ClientConfig;
@@ -60,92 +60,150 @@
6060
import org.openqa.selenium.remote.tracing.Status;
6161
import org.openqa.selenium.remote.tracing.Tracer;
6262

63-
class HandleSession implements HttpHandler, Closeable {
63+
public class HandleSession implements HttpHandler, Closeable {
6464

6565
private static final Logger LOG = Logger.getLogger(HandleSession.class.getName());
6666

67-
private static class CacheEntry {
68-
private final HttpClient httpClient;
69-
private final AtomicLong inUse;
70-
// volatile as the ConcurrentMap will not take care of synchronization
71-
private volatile Instant lastUse;
72-
73-
public CacheEntry(HttpClient httpClient, long initialUsage) {
74-
this.httpClient = httpClient;
75-
this.inUse = new AtomicLong(initialUsage);
76-
this.lastUse = Instant.now();
77-
}
78-
}
79-
80-
private static class UsageCountingReverseProxyHandler extends ReverseProxyHandler
67+
private static class ReverseProxyHandlerCloseable extends ReverseProxyHandler
8168
implements Closeable {
82-
private final CacheEntry entry;
8369

84-
public UsageCountingReverseProxyHandler(
85-
Tracer tracer, HttpClient httpClient, CacheEntry entry) {
70+
public ReverseProxyHandlerCloseable(Tracer tracer, HttpClient httpClient) {
8671
super(tracer, httpClient);
87-
88-
this.entry = entry;
8972
}
9073

9174
@Override
9275
public void close() {
93-
// set the last use here, to ensure we have to calculate the real inactivity of the client
94-
entry.lastUse = Instant.now();
95-
entry.inUse.decrementAndGet();
76+
// No operation needed - cache management is handled by Cache builder
9677
}
9778
}
9879

9980
private final Tracer tracer;
10081
private final HttpClient.Factory httpClientFactory;
10182
private final SessionMap sessions;
102-
private final ConcurrentMap<URI, CacheEntry> httpClients;
103-
private final ScheduledExecutorService cleanUpHttpClientsCacheService;
83+
private final Cache<URI, CacheEntry> httpClientCache;
84+
85+
/**
86+
* Wrapper class to store HttpClient along with its configuration for dynamic cache expiration
87+
* based on HttpClient's read timeout.
88+
*/
89+
private static class CacheEntry {
90+
private final HttpClient httpClient;
91+
private final ClientConfig config;
92+
private final long creationTime;
93+
94+
CacheEntry(HttpClient httpClient, ClientConfig config) {
95+
this.httpClient = Require.nonNull("HttpClient", httpClient);
96+
this.config = Require.nonNull("ClientConfig", config);
97+
this.creationTime = System.currentTimeMillis();
98+
}
99+
100+
HttpClient getHttpClient() {
101+
return httpClient;
102+
}
103+
104+
ClientConfig getConfig() {
105+
return config;
106+
}
107+
108+
/**
109+
* Check if this cache entry has expired based on the HttpClient's read timeout. Method is used
110+
* by Cache builder to determine if an entry should be evicted.
111+
*
112+
* @param lastAccessTime the last time this entry was accessed
113+
* @return true if the entry should be evicted
114+
*/
115+
boolean isExpired(long lastAccessTime) {
116+
long readTimeoutMs = config.readTimeout().toMillis();
117+
long timeSinceLastAccess = System.currentTimeMillis() - lastAccessTime;
118+
boolean expired = timeSinceLastAccess > readTimeoutMs;
119+
if (expired) {
120+
LOG.fine(
121+
String.format(
122+
"Connection for %s has expired (read timeout: %d seconds)",
123+
config.baseUri(), config.readTimeout().toSeconds()));
124+
}
125+
return expired;
126+
}
127+
128+
/**
129+
* Close the HTTP client associated with this cache entry. Method is used by Cache builder to
130+
* close expired entries.
131+
*/
132+
void close() {
133+
try {
134+
httpClient.close();
135+
LOG.fine(String.format("Closed expired connection for %s", config.baseUri()));
136+
} catch (Exception ex) {
137+
LOG.warning(String.format("Failed to close expired connection for %s", config.baseUri()));
138+
}
139+
}
140+
}
104141

105142
HandleSession(Tracer tracer, HttpClient.Factory httpClientFactory, SessionMap sessions) {
106143
this.tracer = Require.nonNull("Tracer", tracer);
107144
this.httpClientFactory = Require.nonNull("HTTP client factory", httpClientFactory);
108145
this.sessions = Require.nonNull("Sessions", sessions);
109146

110-
this.httpClients = new ConcurrentHashMap<>();
111-
112-
Runnable cleanUpHttpClients =
113-
() -> {
114-
Instant staleBefore = Instant.now().minus(2, ChronoUnit.MINUTES);
115-
Iterator<CacheEntry> iterator = httpClients.values().iterator();
116-
117-
while (iterator.hasNext()) {
118-
CacheEntry entry = iterator.next();
119-
120-
if (entry.inUse.get() != 0) {
121-
// the client is currently in use
122-
return;
123-
} else if (!entry.lastUse.isBefore(staleBefore)) {
124-
// the client was recently used
125-
return;
126-
} else {
127-
// the client has not been used for a while, remove it from the cache
128-
iterator.remove();
129-
130-
try {
131-
entry.httpClient.close();
132-
} catch (Exception ex) {
133-
LOG.log(Level.WARNING, "failed to close a stale httpclient", ex);
134-
}
135-
}
136-
}
137-
};
138-
139-
this.cleanUpHttpClientsCacheService =
140-
Executors.newSingleThreadScheduledExecutor(
141-
r -> {
142-
Thread thread = new Thread(r);
143-
thread.setDaemon(true);
144-
thread.setName("HandleSession - Clean up http clients cache");
145-
return thread;
146-
});
147-
cleanUpHttpClientsCacheService.scheduleAtFixedRate(
148-
GuardedRunnable.guard(cleanUpHttpClients), 1, 1, TimeUnit.MINUTES);
147+
// Create Cache with dynamic expiry based on HttpClient read timeout
148+
// and a removal listener to close HTTP clients
149+
this.httpClientCache =
150+
Caffeine.newBuilder()
151+
.expireAfter(
152+
new Expiry<URI, CacheEntry>() {
153+
@Override
154+
public long expireAfterCreate(URI uri, CacheEntry cacheEntry, long currentTime) {
155+
// Use the HttpClient's read timeout for initial expiration
156+
LOG.fine(
157+
String.format(
158+
"Set (read timeout: %d seconds) as initial expiration for %s in cache",
159+
cacheEntry.getConfig().readTimeout().toSeconds(), uri));
160+
return cacheEntry.getConfig().readTimeout().toNanos();
161+
}
162+
163+
@Override
164+
public long expireAfterUpdate(
165+
URI uri, CacheEntry cacheEntry, long currentTime, long currentDuration) {
166+
// Use the HttpClient's read timeout for expiration after update
167+
LOG.fine(
168+
String.format(
169+
"Set (read timeout: %d seconds) as expiration after update for %s in"
170+
+ " cache",
171+
cacheEntry.getConfig().readTimeout().toSeconds(), uri));
172+
return cacheEntry.getConfig().readTimeout().toNanos();
173+
}
174+
175+
@Override
176+
public long expireAfterRead(
177+
URI uri, CacheEntry cacheEntry, long currentTime, long currentDuration) {
178+
// Use the HttpClient's read timeout for expiration after read
179+
LOG.fine(
180+
String.format(
181+
"Set (read timeout: %d seconds) as expiration after read for %s in"
182+
+ " cache",
183+
cacheEntry.getConfig().readTimeout().toSeconds(), uri));
184+
return cacheEntry.getConfig().readTimeout().toNanos();
185+
}
186+
})
187+
.removalListener(
188+
(RemovalListener<URI, CacheEntry>)
189+
(uri, cacheEntry, cause) -> {
190+
if (cacheEntry != null) {
191+
try {
192+
Duration readTimeout = cacheEntry.getConfig().readTimeout();
193+
LOG.fine(
194+
"Closing HTTP client for "
195+
+ uri
196+
+ " (read timeout: "
197+
+ readTimeout.toSeconds()
198+
+ " seconds), removal cause: "
199+
+ cause);
200+
cacheEntry.close();
201+
} catch (Exception ex) {
202+
LOG.log(Level.WARNING, "Failed to close HTTP client for " + uri, ex);
203+
}
204+
}
205+
})
206+
.build();
149207
}
150208

151209
@Override
@@ -179,7 +237,7 @@ public HttpResponse execute(HttpRequest req) {
179237
try {
180238
HttpTracing.inject(tracer, span, req);
181239
HttpResponse res;
182-
try (UsageCountingReverseProxyHandler handler = loadSessionId(tracer, span, id).call()) {
240+
try (ReverseProxyHandlerCloseable handler = loadSessionId(tracer, span, id).call()) {
183241
res = handler.execute(req);
184242
}
185243

@@ -216,46 +274,83 @@ public HttpResponse execute(HttpRequest req) {
216274
}
217275
}
218276

219-
private Callable<UsageCountingReverseProxyHandler> loadSessionId(
277+
private Callable<ReverseProxyHandlerCloseable> loadSessionId(
220278
Tracer tracer, Span span, SessionId id) {
221279
return span.wrap(
222280
() -> {
281+
URI sessionUri = sessions.getUri(id);
282+
283+
// Get or create the HTTP client from cache (this also updates the "last access" time)
223284
CacheEntry cacheEntry =
224-
httpClients.compute(
225-
sessions.getUri(id),
226-
(sessionUri, entry) -> {
227-
if (entry != null) {
228-
entry.inUse.incrementAndGet();
229-
return entry;
230-
}
231-
232-
ClientConfig config =
233-
ClientConfig.defaultConfig().baseUri(sessionUri).withRetries();
285+
httpClientCache.get(
286+
sessionUri,
287+
uri -> {
288+
LOG.fine("Creating new HTTP client for " + uri);
289+
ClientConfig config = fetchNodeSessionTimeout(uri);
234290
HttpClient httpClient = httpClientFactory.createClient(config);
235-
236-
return new CacheEntry(httpClient, 1);
291+
LOG.fine(
292+
"Created connection for "
293+
+ uri
294+
+ " (read timeout: "
295+
+ config.readTimeout().toSeconds()
296+
+ " seconds)");
297+
return new CacheEntry(httpClient, config);
237298
});
238299

239300
try {
240-
return new UsageCountingReverseProxyHandler(tracer, cacheEntry.httpClient, cacheEntry);
301+
return new ReverseProxyHandlerCloseable(tracer, cacheEntry.getHttpClient());
241302
} catch (Throwable t) {
242-
// ensure we do not keep the http client when an unexpected throwable is raised
243-
cacheEntry.inUse.decrementAndGet();
244303
throw t;
245304
}
246305
});
247306
}
248307

308+
private ClientConfig fetchNodeSessionTimeout(URI uri) {
309+
ClientConfig config = ClientConfig.defaultConfig().baseUri(uri).withRetries();
310+
Duration sessionTimeout = config.readTimeout();
311+
HttpClient httpClient = httpClientFactory.createClient(config);
312+
HttpRequest statusRequest = new HttpRequest(GET, "/status");
313+
try {
314+
HttpResponse res = httpClient.execute(statusRequest);
315+
Reader reader = reader(res);
316+
Json JSON = new Json();
317+
JsonInput in = JSON.newInput(reader);
318+
in.beginObject();
319+
// Skip everything until we find "value"
320+
while (in.hasNext()) {
321+
if ("value".equals(in.nextName())) {
322+
in.beginObject();
323+
while (in.hasNext()) {
324+
if ("node".equals(in.nextName())) {
325+
NodeStatus nodeStatus = in.read(NodeStatus.class);
326+
sessionTimeout = nodeStatus.getSessionTimeout();
327+
LOG.fine(
328+
"Detected session timeout from node status (read timeout: "
329+
+ sessionTimeout.toSeconds()
330+
+ " seconds)");
331+
} else {
332+
in.skipValue();
333+
}
334+
}
335+
in.endObject();
336+
} else {
337+
in.skipValue();
338+
}
339+
}
340+
} catch (Exception e) {
341+
LOG.fine(
342+
"Use default from ClientConfig (read timeout: "
343+
+ config.readTimeout().toSeconds()
344+
+ " seconds)");
345+
}
346+
config = config.readTimeout(sessionTimeout);
347+
return config;
348+
}
349+
249350
@Override
250351
public void close() {
251-
ExecutorServices.shutdownGracefully(
252-
"HandleSession - Clean up http clients cache", cleanUpHttpClientsCacheService);
253-
httpClients
254-
.values()
255-
.removeIf(
256-
(entry) -> {
257-
entry.httpClient.close();
258-
return true;
259-
});
352+
// This will trigger the removal listener for all entries, which will close all HTTP clients
353+
httpClientCache.invalidateAll();
354+
httpClientCache.cleanUp();
260355
}
261356
}

0 commit comments

Comments
 (0)