Skip to content

Commit 827ec0e

Browse files
committed
Add ProjectScopedCache and update ReloadablePlugin
1 parent 2c0fb18 commit 827ec0e

File tree

6 files changed

+282
-21
lines changed

6 files changed

+282
-21
lines changed

server/src/main/java/org/elasticsearch/node/NodeConstruction.java

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,7 @@
5252
import org.elasticsearch.cluster.metadata.MetadataDataStreamsService;
5353
import org.elasticsearch.cluster.metadata.MetadataIndexTemplateService;
5454
import org.elasticsearch.cluster.metadata.MetadataUpdateSettingsService;
55+
import org.elasticsearch.cluster.metadata.ProjectId;
5556
import org.elasticsearch.cluster.metadata.ProjectMetadata;
5657
import org.elasticsearch.cluster.metadata.SystemIndexMetadataUpgradeService;
5758
import org.elasticsearch.cluster.metadata.TemplateUpgradeService;
@@ -1517,12 +1518,26 @@ private CircuitBreakerService createCircuitBreakerService(
15171518
* @return A single ReloadablePlugin that, upon reload, reloads the plugins it wraps
15181519
*/
15191520
private static ReloadablePlugin wrapPlugins(List<ReloadablePlugin> reloadablePlugins) {
1520-
return settings -> {
1521-
for (ReloadablePlugin plugin : reloadablePlugins) {
1522-
try {
1523-
plugin.reload(settings);
1524-
} catch (IOException e) {
1525-
throw new UncheckedIOException(e);
1521+
return new ReloadablePlugin() {
1522+
@Override
1523+
public void reload(Settings settings) throws Exception {
1524+
for (ReloadablePlugin plugin : reloadablePlugins) {
1525+
try {
1526+
plugin.reload(settings);
1527+
} catch (IOException e) {
1528+
throw new UncheckedIOException(e);
1529+
}
1530+
}
1531+
}
1532+
1533+
@Override
1534+
public void reload(ProjectId projectId, Settings settings) throws Exception {
1535+
for (ReloadablePlugin plugin : reloadablePlugins) {
1536+
try {
1537+
plugin.reload(projectId, settings);
1538+
} catch (IOException e) {
1539+
throw new UncheckedIOException(e);
1540+
}
15261541
}
15271542
}
15281543
};

server/src/main/java/org/elasticsearch/plugins/ReloadablePlugin.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.plugins;
1111

12+
import org.elasticsearch.cluster.metadata.ProjectId;
1213
import org.elasticsearch.common.settings.Settings;
1314

1415
/**
@@ -41,4 +42,6 @@ public interface ReloadablePlugin {
4142
* if the offending call didn't happen.
4243
*/
4344
void reload(Settings settings) throws Exception;
45+
46+
default void reload(ProjectId projectId, Settings settings) throws Exception {}
4447
}

test/framework/src/main/java/org/elasticsearch/test/rest/ESRestTestCase.java

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,7 @@
6161
import org.elasticsearch.core.CharArrays;
6262
import org.elasticsearch.core.CheckedFunction;
6363
import org.elasticsearch.core.CheckedRunnable;
64+
import org.elasticsearch.core.FixForMultiProject;
6465
import org.elasticsearch.core.IOUtils;
6566
import org.elasticsearch.core.Nullable;
6667
import org.elasticsearch.core.PathUtils;
@@ -418,7 +419,7 @@ public void initClient() throws IOException {
418419
.collect(Collectors.toSet());
419420
assert semanticNodeVersions.isEmpty() == false || serverless;
420421

421-
testFeatureService = createTestFeatureService(getClusterStateFeatures(adminClient), semanticNodeVersions);
422+
testFeatureService = createTestFeatureService(getClusterStateFeatures(adminClient, multiProjectTest()), semanticNodeVersions);
422423
}
423424

424425
assert testFeatureServiceInitialized();
@@ -1818,6 +1819,11 @@ public final void ensureGreen(RestClient client, String index) throws IOExceptio
18181819
});
18191820
}
18201821

1822+
@FixForMultiProject // Remove when cluster state API can be called in multi project mode without the query param
1823+
protected boolean multiProjectTest() {
1824+
return false;
1825+
}
1826+
18211827
protected static void ensureHealth(Consumer<Request> requestConsumer) throws IOException {
18221828
ensureHealth("", requestConsumer);
18231829
}
@@ -2374,7 +2380,11 @@ public void ensurePeerRecoveryRetentionLeasesRenewedAndSynced(String index) thro
23742380
}
23752381

23762382
protected static Map<String, Set<String>> getClusterStateFeatures(RestClient adminClient) throws IOException {
2377-
final Request request = new Request("GET", "_cluster/state");
2383+
return getClusterStateFeatures(adminClient, false);
2384+
}
2385+
2386+
protected static Map<String, Set<String>> getClusterStateFeatures(RestClient adminClient, boolean multiProject) throws IOException {
2387+
final Request request = new Request("GET", "_cluster/state" + (multiProject ? "?multi_project=true" : ""));
23782388
request.addParameter("filter_path", "nodes_features");
23792389

23802390
final Response response = adminClient.performRequest(request);

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/support/CachingUsernamePasswordRealm.java

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
import org.elasticsearch.action.ActionListener;
1010
import org.elasticsearch.common.cache.Cache;
1111
import org.elasticsearch.common.cache.CacheBuilder;
12+
import org.elasticsearch.common.cache.CacheLoader;
1213
import org.elasticsearch.common.settings.SecureString;
1314
import org.elasticsearch.common.util.concurrent.ListenableFuture;
1415
import org.elasticsearch.common.util.concurrent.ThreadContext;
@@ -32,25 +33,62 @@
3233

3334
public abstract class CachingUsernamePasswordRealm extends UsernamePasswordRealm implements CachingRealm {
3435

35-
private final Cache<String, ListenableFuture<CachedResult>> cache;
36+
private final UserAuthenticationCache cache;
3637
private final ThreadPool threadPool;
3738
private final boolean authenticationEnabled;
38-
final Hasher cacheHasher;
39+
protected final Hasher credentialHasher;
3940

4041
protected CachingUsernamePasswordRealm(RealmConfig config, ThreadPool threadPool) {
42+
this(config, threadPool, buildDefaultCache(config));
43+
}
44+
45+
protected CachingUsernamePasswordRealm(RealmConfig config, ThreadPool threadPool, UserAuthenticationCache cache) {
4146
super(config);
42-
cacheHasher = Hasher.resolve(this.config.getSetting(CachingUsernamePasswordRealmSettings.CACHE_HASH_ALGO_SETTING));
47+
credentialHasher = Hasher.resolve(this.config.getSetting(CachingUsernamePasswordRealmSettings.CACHE_HASH_ALGO_SETTING));
4348
this.threadPool = threadPool;
44-
final TimeValue ttl = this.config.getSetting(CachingUsernamePasswordRealmSettings.CACHE_TTL_SETTING);
49+
this.authenticationEnabled = config.getSetting(CachingUsernamePasswordRealmSettings.AUTHC_ENABLED_SETTING);
50+
this.cache = cache;
51+
}
52+
53+
private static UserAuthenticationCache buildDefaultCache(RealmConfig config) {
54+
final TimeValue ttl = config.getSetting(CachingUsernamePasswordRealmSettings.CACHE_TTL_SETTING);
4555
if (ttl.getNanos() > 0) {
46-
cache = CacheBuilder.<String, ListenableFuture<CachedResult>>builder()
56+
final Cache<String, ListenableFuture<CachedResult>> cache = CacheBuilder.<String, ListenableFuture<CachedResult>>builder()
4757
.setExpireAfterWrite(ttl)
48-
.setMaximumWeight(this.config.getSetting(CachingUsernamePasswordRealmSettings.CACHE_MAX_USERS_SETTING))
58+
.setMaximumWeight(config.getSetting(CachingUsernamePasswordRealmSettings.CACHE_MAX_USERS_SETTING))
4959
.build();
50-
} else {
51-
cache = null;
60+
return new UserAuthenticationCache() {
61+
@Override
62+
public void invalidate(String key) {
63+
cache.invalidate(key);
64+
}
65+
66+
@Override
67+
public void invalidate(String key, ListenableFuture<CachedResult> value) {
68+
cache.invalidate(key, value);
69+
}
70+
71+
@Override
72+
public void invalidateAll() {
73+
cache.invalidateAll();
74+
}
75+
76+
@Override
77+
public int count() {
78+
return cache.count();
79+
}
80+
81+
@Override
82+
public ListenableFuture<CachedResult> computeIfAbsent(
83+
String key,
84+
CacheLoader<String, ListenableFuture<CachedResult>> loader
85+
) throws ExecutionException {
86+
return cache.computeIfAbsent(key, loader);
87+
}
88+
};
5289
}
53-
this.authenticationEnabled = config.getSetting(CachingUsernamePasswordRealmSettings.AUTHC_ENABLED_SETTING);
90+
91+
return null;
5492
}
5593

5694
@Override
@@ -122,6 +160,7 @@ public final void authenticate(AuthenticationToken authToken, ActionListener<Aut
122160
* @param listener to be called at completion
123161
*/
124162
private void authenticateWithCache(UsernamePasswordToken token, ActionListener<AuthenticationResult<User>> listener) {
163+
logger.info("AUTH WITH CACHE");
125164
assert cache != null;
126165
try {
127166
final AtomicBoolean authenticationInCache = new AtomicBoolean(true);
@@ -130,6 +169,7 @@ private void authenticateWithCache(UsernamePasswordToken token, ActionListener<A
130169
return new ListenableFuture<>();
131170
});
132171
if (authenticationInCache.get()) {
172+
logger.info("IN CACHE");
133173
// there is a cached or an inflight authenticate request
134174
listenableCacheEntry.addListener(ActionListener.wrap(cachedResult -> {
135175
final boolean credsMatch = cachedResult.verify(token.credentials());
@@ -196,8 +236,10 @@ private void authenticateWithCache(UsernamePasswordToken token, ActionListener<A
196236
name(),
197237
token.principal()
198238
);
239+
logger.info("ATTEMPTING AUTH");
199240
// attempt authentication against the authentication source
200241
doAuthenticate(token, ActionListener.wrap(authResult -> {
242+
logger.info("GOT AUTH RESULT: " + authResult);
201243
if (authResult.isAuthenticated() == false) {
202244
logger.trace("realm [{}] did not authenticate user [{}] ([{}])", name(), token.principal(), authResult);
203245
// a new request should trigger a new authentication
@@ -217,7 +259,9 @@ private void authenticateWithCache(UsernamePasswordToken token, ActionListener<A
217259
// notify any forestalled request listeners; they will not reach to the
218260
// authentication request and instead will use this result if they contain
219261
// the same credentials
220-
listenableCacheEntry.onResponse(new CachedResult(authResult, cacheHasher, authResult.getValue(), token.credentials()));
262+
listenableCacheEntry.onResponse(
263+
new CachedResult(authResult, credentialHasher, authResult.getValue(), token.credentials())
264+
);
221265
listener.onResponse(authResult);
222266
}, e -> {
223267
cache.invalidate(token.principal(), listenableCacheEntry);
@@ -282,7 +326,7 @@ private void lookupWithCache(String username, ActionListener<User> listener) {
282326
if (false == lookupInCache.get()) {
283327
// attempt lookup against the user directory
284328
doLookupUser(username, ActionListener.wrap(user -> {
285-
final CachedResult result = new CachedResult(AuthenticationResult.notHandled(), cacheHasher, user, null);
329+
final CachedResult result = new CachedResult(AuthenticationResult.notHandled(), credentialHasher, user, null);
286330
if (user == null) {
287331
// user not found, invalidate cache so that subsequent requests are forwarded to
288332
// the user directory
@@ -311,7 +355,20 @@ private void lookupWithCache(String username, ActionListener<User> listener) {
311355

312356
protected abstract void doLookupUser(String username, ActionListener<User> listener);
313357

314-
private static class CachedResult {
358+
protected interface UserAuthenticationCache {
359+
void invalidate(String key);
360+
361+
void invalidate(String key, ListenableFuture<CachedResult> value);
362+
363+
void invalidateAll();
364+
365+
int count();
366+
367+
ListenableFuture<CachedResult> computeIfAbsent(String key, CacheLoader<String, ListenableFuture<CachedResult>> loader)
368+
throws ExecutionException;
369+
}
370+
371+
protected static class CachedResult {
315372
private final AuthenticationResult<User> authenticationResult;
316373
private final User user;
317374
private final char[] hash;

0 commit comments

Comments
 (0)