Skip to content

Commit b8581e2

Browse files
cancel graph authorization tasks that are pending too long
1 parent 5d9c358 commit b8581e2

File tree

6 files changed

+133
-101
lines changed

6 files changed

+133
-101
lines changed

plugins/microsoft-graph-authz/src/main/java/module-info.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
requires com.google.gson;
2525
requires okhttp3;
2626
requires com.azure.core.http.okhttp;
27+
requires org.apache.logging.log4j;
2728

2829
provides org.elasticsearch.xpack.core.security.SecurityExtension with MicrosoftGraphAuthzPlugin;
2930
}

plugins/microsoft-graph-authz/src/main/java/org/elasticsearch/xpack/security/authz/microsoft/MicrosoftGraphAuthzRealm.java

Lines changed: 40 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -21,18 +21,20 @@
2121
import com.microsoft.kiota.authentication.AzureIdentityAuthenticationProvider;
2222
import com.microsoft.kiota.http.middleware.RetryHandler;
2323

24+
import org.apache.logging.log4j.LogManager;
25+
import org.apache.logging.log4j.Logger;
2426
import org.elasticsearch.action.ActionListener;
2527
import org.elasticsearch.common.Strings;
2628
import org.elasticsearch.common.settings.Setting;
2729
import org.elasticsearch.common.settings.SettingsException;
30+
import org.elasticsearch.common.util.concurrent.EsExecutors;
2831
import org.elasticsearch.common.util.concurrent.ThreadContext;
32+
import org.elasticsearch.core.TimeValue;
2933
import org.elasticsearch.core.Tuple;
3034
import org.elasticsearch.license.License;
3135
import org.elasticsearch.license.LicenseUtils;
3236
import org.elasticsearch.license.LicensedFeature;
3337
import org.elasticsearch.license.XPackLicenseState;
34-
import org.elasticsearch.logging.LogManager;
35-
import org.elasticsearch.logging.Logger;
3638
import org.elasticsearch.threadpool.ThreadPool;
3739
import org.elasticsearch.xpack.core.XPackPlugin;
3840
import org.elasticsearch.xpack.core.security.authc.AuthenticationResult;
@@ -41,6 +43,7 @@
4143
import org.elasticsearch.xpack.core.security.authc.RealmConfig;
4244
import org.elasticsearch.xpack.core.security.authc.RealmSettings;
4345
import org.elasticsearch.xpack.core.security.authc.support.UserRoleMapper;
46+
import org.elasticsearch.xpack.core.security.support.CancellableRunnable;
4447
import org.elasticsearch.xpack.core.security.user.User;
4548

4649
import java.time.Duration;
@@ -69,6 +72,7 @@ public class MicrosoftGraphAuthzRealm extends Realm {
6972
private final GraphServiceClient client;
7073
private final XPackLicenseState licenseState;
7174
private final ThreadPool threadPool;
75+
private final TimeValue executionTimeout;
7276

7377
public MicrosoftGraphAuthzRealm(UserRoleMapper roleMapper, RealmConfig config, ThreadPool threadPool) {
7478
this(roleMapper, config, buildClient(config), XPackPlugin.getSharedLicenseState(), threadPool);
@@ -90,6 +94,7 @@ public MicrosoftGraphAuthzRealm(UserRoleMapper roleMapper, RealmConfig config, T
9094
this.client = client;
9195
this.licenseState = licenseState;
9296
this.threadPool = threadPool;
97+
this.executionTimeout = config.getSetting(MicrosoftGraphAuthzRealmSettings.EXECUTION_TIMEOUT);
9398
}
9499

95100
private static void validate(RealmConfig config) {
@@ -127,30 +132,39 @@ public void lookupUser(String principal, ActionListener<User> listener) {
127132
return;
128133
}
129134

130-
threadPool.generic().execute(() -> {
131-
try {
132-
final var userProperties = fetchUserProperties(client, principal);
133-
final var groups = fetchGroupMembership(client, principal);
134-
135-
final var userData = new UserRoleMapper.UserData(principal, null, groups, Map.of(), config);
136-
137-
roleMapper.resolveRoles(userData, listener.delegateFailureAndWrap((l, roles) -> {
138-
final var user = new User(
139-
principal,
140-
roles.toArray(Strings.EMPTY_ARRAY),
141-
userProperties.v1(),
142-
userProperties.v2(),
143-
Map.of(),
144-
true
145-
);
146-
logger.trace("Authorized user from Microsoft Graph {}", user);
147-
l.onResponse(user);
148-
}));
149-
} catch (Exception e) {
150-
logger.error(Strings.format("Failed to authorize [{}] with MS Graph realm", principal), e);
151-
listener.onFailure(e);
152-
}
153-
});
135+
final var runnable = new CancellableRunnable<>(
136+
listener,
137+
ex -> null,
138+
() -> doLookupUser(principal, listener),
139+
logger
140+
);
141+
threadPool.generic().execute(runnable);
142+
threadPool.schedule(runnable::maybeTimeout, executionTimeout, EsExecutors.DIRECT_EXECUTOR_SERVICE);
143+
}
144+
145+
private void doLookupUser(String principal, ActionListener<User> listener) {
146+
try {
147+
final var userProperties = fetchUserProperties(client, principal);
148+
final var groups = fetchGroupMembership(client, principal);
149+
150+
final var userData = new UserRoleMapper.UserData(principal, null, groups, Map.of(), config);
151+
152+
roleMapper.resolveRoles(userData, listener.delegateFailureAndWrap((l, roles) -> {
153+
final var user = new User(
154+
principal,
155+
roles.toArray(Strings.EMPTY_ARRAY),
156+
userProperties.v1(),
157+
userProperties.v2(),
158+
Map.of(),
159+
true
160+
);
161+
logger.trace("Authorized user from Microsoft Graph {}", user);
162+
l.onResponse(user);
163+
}));
164+
} catch (Exception e) {
165+
logger.error(Strings.format("Failed to authorize [{}] with MS Graph realm", principal), e);
166+
listener.onFailure(e);
167+
}
154168
}
155169

156170
private static GraphServiceClient buildClient(RealmConfig config) {

plugins/microsoft-graph-authz/src/main/java/org/elasticsearch/xpack/security/authz/microsoft/MicrosoftGraphAuthzRealmSettings.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,12 @@ public class MicrosoftGraphAuthzRealmSettings {
5252
key -> Setting.timeSetting(key, TimeValue.timeValueSeconds(10), Setting.Property.NodeScope)
5353
);
5454

55+
public static final Setting.AffixSetting<TimeValue> EXECUTION_TIMEOUT = Setting.affixKeySetting(
56+
RealmSettings.realmSettingPrefix(REALM_TYPE),
57+
"execution_timeout",
58+
key -> Setting.timeSetting(key, TimeValue.timeValueSeconds(30), Setting.Property.NodeScope)
59+
);
60+
5561
public static List<Setting<?>> getSettings() {
5662
var settings = new ArrayList<Setting<?>>(RealmSettings.getStandardSettings(REALM_TYPE));
5763
settings.add(CLIENT_ID);
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.security.support;
9+
10+
import org.apache.logging.log4j.Logger;
11+
import org.elasticsearch.ElasticsearchTimeoutException;
12+
import org.elasticsearch.action.ActionListener;
13+
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
14+
15+
import java.util.Objects;
16+
import java.util.concurrent.atomic.AtomicReference;
17+
import java.util.function.Function;
18+
19+
/**
20+
* A runnable that allows us to terminate and call the listener. We use this as a runnable can
21+
* be queued and not executed for a long time or ever and this causes user requests to appear
22+
* to hang. In these cases at least we can provide a response.
23+
*/
24+
public class CancellableRunnable<T> extends AbstractRunnable {
25+
26+
private final Runnable in;
27+
private final ActionListener<T> listener;
28+
private final Function<Exception, T> defaultValue;
29+
private final Logger logger;
30+
private final AtomicReference<RunnableState> state = new AtomicReference<>(RunnableState.AWAITING_EXECUTION);
31+
32+
public CancellableRunnable(ActionListener<T> listener, Function<Exception, T> defaultValue, Runnable in, Logger logger) {
33+
this.listener = listener;
34+
this.defaultValue = Objects.requireNonNull(defaultValue);
35+
this.in = in;
36+
this.logger = logger;
37+
}
38+
39+
@Override
40+
public void onFailure(Exception e) {
41+
logger.error("execution of cancellable runnable failed", e);
42+
final T result = defaultValue.apply(e);
43+
listener.onResponse(result);
44+
}
45+
46+
@Override
47+
protected void doRun() throws Exception {
48+
if (state.compareAndSet(RunnableState.AWAITING_EXECUTION, RunnableState.EXECUTING)) {
49+
in.run();
50+
} else {
51+
logger.trace("skipping execution of cancellable runnable as the current state is [{}]", state.get());
52+
}
53+
}
54+
55+
@Override
56+
public void onRejection(Exception e) {
57+
listener.onFailure(e);
58+
}
59+
60+
/**
61+
* If the execution of this runnable has not already started, the runnable is cancelled and we pass an exception to the user
62+
* listener
63+
*/
64+
public void maybeTimeout() {
65+
if (state.compareAndSet(RunnableState.AWAITING_EXECUTION, RunnableState.TIMED_OUT)) {
66+
logger.warn("skipping execution of cancellable runnable as it has been waiting for execution too long");
67+
listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for execution of cancellable runnable"));
68+
}
69+
}
70+
71+
private enum RunnableState {
72+
AWAITING_EXECUTION,
73+
EXECUTING,
74+
TIMED_OUT
75+
}
76+
}
Lines changed: 7 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,13 @@
44
* 2.0; you may not use this file except in compliance with the Elastic License
55
* 2.0.
66
*/
7-
package org.elasticsearch.xpack.security.authc.ldap;
7+
package org.elasticsearch.xpack.core.security.support;
88

99
import org.elasticsearch.ElasticsearchTimeoutException;
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.support.ActionTestUtils;
1212
import org.elasticsearch.test.ESTestCase;
1313
import org.elasticsearch.xpack.core.security.user.User;
14-
import org.elasticsearch.xpack.security.authc.ldap.LdapRealm.CancellableLdapRunnable;
1514

1615
import java.util.concurrent.CountDownLatch;
1716
import java.util.concurrent.atomic.AtomicBoolean;
@@ -22,11 +21,11 @@
2221
import static org.hamcrest.Matchers.instanceOf;
2322
import static org.hamcrest.Matchers.sameInstance;
2423

25-
public class CancellableLdapRunnableTests extends ESTestCase {
24+
public class CancellableRunnableTests extends ESTestCase {
2625

2726
public void testTimingOutARunnable() {
2827
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
29-
final CancellableLdapRunnable<Object> runnable = new CancellableLdapRunnable<>(ActionListener.wrap(user -> {
28+
final CancellableRunnable<Object> runnable = new CancellableRunnable<>(ActionListener.wrap(user -> {
3029
throw new AssertionError("onResponse should not be called");
3130
}, exceptionAtomicReference::set), e -> null, () -> { throw new AssertionError("runnable should not be executed"); }, logger);
3231

@@ -40,7 +39,7 @@ public void testTimingOutARunnable() {
4039
public void testCallTimeOutAfterRunning() {
4140
final AtomicBoolean ran = new AtomicBoolean(false);
4241
final AtomicBoolean listenerCalled = new AtomicBoolean(false);
43-
final CancellableLdapRunnable<Object> runnable = new CancellableLdapRunnable<>(ActionListener.wrap(user -> {
42+
final CancellableRunnable<Object> runnable = new CancellableRunnable<>(ActionListener.wrap(user -> {
4443
listenerCalled.set(true);
4544
throw new AssertionError("onResponse should not be called");
4645
}, e -> {
@@ -59,7 +58,7 @@ public void testCallTimeOutAfterRunning() {
5958

6059
public void testRejectingExecution() {
6160
AtomicReference<Exception> exceptionAtomicReference = new AtomicReference<>();
62-
final CancellableLdapRunnable<Object> runnable = new CancellableLdapRunnable<>(ActionListener.wrap(user -> {
61+
final CancellableRunnable<Object> runnable = new CancellableRunnable<>(ActionListener.wrap(user -> {
6362
throw new AssertionError("onResponse should not be called");
6463
}, exceptionAtomicReference::set), e -> null, () -> { throw new AssertionError("runnable should not be executed"); }, logger);
6564

@@ -75,7 +74,7 @@ public void testTimeoutDuringExecution() throws InterruptedException {
7574
final CountDownLatch timeoutCalledLatch = new CountDownLatch(1);
7675
final CountDownLatch runningLatch = new CountDownLatch(1);
7776
final ActionListener<User> listener = ActionTestUtils.assertNoFailureListener(user -> listenerCalledLatch.countDown());
78-
final CancellableLdapRunnable<User> runnable = new CancellableLdapRunnable<>(listener, e -> null, () -> {
77+
final CancellableRunnable<User> runnable = new CancellableRunnable<>(listener, e -> null, () -> {
7978
runningLatch.countDown();
8079
try {
8180
timeoutCalledLatch.await();
@@ -98,7 +97,7 @@ public void testExceptionInRunnable() {
9897
AtomicReference<String> resultRef = new AtomicReference<>();
9998
final ActionListener<String> listener = ActionTestUtils.assertNoFailureListener(resultRef::set);
10099
String defaultValue = randomAlphaOfLengthBetween(2, 10);
101-
final CancellableLdapRunnable<String> runnable = new CancellableLdapRunnable<>(listener, e -> defaultValue, () -> {
100+
final CancellableRunnable<String> runnable = new CancellableRunnable<>(listener, e -> defaultValue, () -> {
102101
throw new RuntimeException("runnable intentionally failed");
103102
}, logger);
104103

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ldap/LdapRealm.java

Lines changed: 3 additions & 67 deletions
Original file line numberDiff line numberDiff line change
@@ -8,13 +8,10 @@
88

99
import com.unboundid.ldap.sdk.LDAPException;
1010

11-
import org.apache.logging.log4j.Logger;
12-
import org.elasticsearch.ElasticsearchTimeoutException;
1311
import org.elasticsearch.action.ActionListener;
1412
import org.elasticsearch.action.support.ContextPreservingActionListener;
1513
import org.elasticsearch.common.settings.Setting;
1614
import org.elasticsearch.common.settings.Settings;
17-
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
1815
import org.elasticsearch.common.util.concurrent.EsExecutors;
1916
import org.elasticsearch.common.util.concurrent.ThreadContext;
2017
import org.elasticsearch.core.IOUtils;
@@ -41,15 +38,14 @@
4138
import org.elasticsearch.xpack.security.authc.support.DelegatedAuthorizationSupport;
4239
import org.elasticsearch.xpack.security.authc.support.DnRoleMapper;
4340
import org.elasticsearch.xpack.security.authc.support.mapper.CompositeRoleMapper;
41+
import org.elasticsearch.xpack.core.security.support.CancellableRunnable;
4442
import org.elasticsearch.xpack.security.support.ReloadableSecurityComponent;
4543

4644
import java.util.HashMap;
4745
import java.util.List;
4846
import java.util.Map;
49-
import java.util.Objects;
5047
import java.util.concurrent.atomic.AtomicReference;
5148
import java.util.function.Consumer;
52-
import java.util.function.Function;
5349
import java.util.function.Supplier;
5450
import java.util.stream.Collectors;
5551

@@ -150,7 +146,7 @@ protected void doAuthenticate(UsernamePasswordToken token, ActionListener<Authen
150146
assert delegatedRealms != null : "Realm has not been initialized correctly";
151147
// we submit to the threadpool because authentication using LDAP will execute blocking I/O for a bind request and we don't want
152148
// network threads stuck waiting for a socket to connect. After the bind, then all interaction with LDAP should be async
153-
final CancellableLdapRunnable<AuthenticationResult<User>> cancellableLdapRunnable = new CancellableLdapRunnable<>(
149+
final CancellableRunnable<AuthenticationResult<User>> cancellableLdapRunnable = new CancellableRunnable<>(
154150
listener,
155151
ex -> AuthenticationResult.unsuccessful("Authentication against realm [" + this.toString() + "] failed", ex),
156152
() -> sessionFactory.session(
@@ -173,7 +169,7 @@ protected void doLookupUser(String username, ActionListener<User> userActionList
173169
result -> userActionListener.onResponse(result.getValue()),
174170
userActionListener::onFailure
175171
);
176-
final CancellableLdapRunnable<User> cancellableLdapRunnable = new CancellableLdapRunnable<>(
172+
final CancellableRunnable<User> cancellableLdapRunnable = new CancellableRunnable<>(
177173
userActionListener,
178174
e -> null,
179175
() -> sessionFactory.unauthenticatedSession(
@@ -323,65 +319,5 @@ public void onFailure(Exception e) {
323319
}
324320
resultListener.onResponse(AuthenticationResult.unsuccessful(action + " failed", e));
325321
}
326-
327-
}
328-
329-
/**
330-
* A runnable that allows us to terminate and call the listener. We use this as a runnable can
331-
* be queued and not executed for a long time or ever and this causes user requests to appear
332-
* to hang. In these cases at least we can provide a response.
333-
*/
334-
static class CancellableLdapRunnable<T> extends AbstractRunnable {
335-
336-
private final Runnable in;
337-
private final ActionListener<T> listener;
338-
private final Function<Exception, T> defaultValue;
339-
private final Logger logger;
340-
private final AtomicReference<LdapRunnableState> state = new AtomicReference<>(LdapRunnableState.AWAITING_EXECUTION);
341-
342-
CancellableLdapRunnable(ActionListener<T> listener, Function<Exception, T> defaultValue, Runnable in, Logger logger) {
343-
this.listener = listener;
344-
this.defaultValue = Objects.requireNonNull(defaultValue);
345-
this.in = in;
346-
this.logger = logger;
347-
}
348-
349-
@Override
350-
public void onFailure(Exception e) {
351-
logger.error("execution of ldap runnable failed", e);
352-
final T result = defaultValue.apply(e);
353-
listener.onResponse(result);
354-
}
355-
356-
@Override
357-
protected void doRun() throws Exception {
358-
if (state.compareAndSet(LdapRunnableState.AWAITING_EXECUTION, LdapRunnableState.EXECUTING)) {
359-
in.run();
360-
} else {
361-
logger.trace("skipping execution of ldap runnable as the current state is [{}]", state.get());
362-
}
363-
}
364-
365-
@Override
366-
public void onRejection(Exception e) {
367-
listener.onFailure(e);
368-
}
369-
370-
/**
371-
* If the execution of this runnable has not already started, the runnable is cancelled and we pass an exception to the user
372-
* listener
373-
*/
374-
void maybeTimeout() {
375-
if (state.compareAndSet(LdapRunnableState.AWAITING_EXECUTION, LdapRunnableState.TIMED_OUT)) {
376-
logger.warn("skipping execution of ldap runnable as it has been waiting for " + "execution too long");
377-
listener.onFailure(new ElasticsearchTimeoutException("timed out waiting for " + "execution of ldap runnable"));
378-
}
379-
}
380-
381-
enum LdapRunnableState {
382-
AWAITING_EXECUTION,
383-
EXECUTING,
384-
TIMED_OUT
385-
}
386322
}
387323
}

0 commit comments

Comments
 (0)