Skip to content

Commit 83f2ef8

Browse files
CR: comments
1 parent 62fb50e commit 83f2ef8

File tree

6 files changed

+34
-38
lines changed

6 files changed

+34
-38
lines changed

server/src/main/java/org/elasticsearch/action/support/SubscribableListener.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -266,14 +266,21 @@ public final boolean isDone() {
266266
return isDone(state);
267267
}
268268

269+
/**
270+
* @return return {@code true} if and only if this listener is done and has been completed successfully
271+
*/
272+
public final boolean isSuccess() {
273+
return state instanceof SuccessResult;
274+
}
275+
269276
/**
270277
* @return the result with which this listener completed successfully, or throw the exception with which it failed.
271278
*
272279
* @throws AssertionError if this listener is not complete yet and assertions are enabled.
273280
* @throws IllegalStateException if this listener is not complete yet and assertions are disabled.
274281
*/
275282
@SuppressWarnings({ "unchecked", "rawtypes" })
276-
public final T rawResult() throws Exception {
283+
protected final T rawResult() throws Exception {
277284
final Object currentState = state;
278285
if (currentState instanceof SuccessResult result) {
279286
return (T) result.result();

x-pack/plugin/security/src/internalClusterTest/java/org/elasticsearch/xpack/security/profile/ProfileCancellationIntegTests.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@
2222
import org.elasticsearch.cluster.metadata.ProjectMetadata;
2323
import org.elasticsearch.common.Strings;
2424
import org.elasticsearch.common.settings.Settings;
25-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
2625
import org.elasticsearch.index.IndexModule;
2726
import org.elasticsearch.index.shard.SearchOperationListener;
2827
import org.elasticsearch.plugins.ActionPlugin;
@@ -414,7 +413,7 @@ public SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
414413
AsyncSupplier<ResolvedIndices> indicesAsyncSupplier,
415414
ProjectMetadata metadata
416415
) {
417-
return ListenableFuture.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES);
416+
return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES);
418417
}
419418

420419
@Override

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/AuthorizationService.java

Lines changed: 9 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -486,13 +486,13 @@ private void authorizeAction(
486486
final AsyncSupplier<ResolvedIndices> resolvedIndicesAsyncSupplier = new CachingAsyncSupplier<>(() -> {
487487
if (request instanceof SearchRequest searchRequest && searchRequest.pointInTimeBuilder() != null) {
488488
var resolvedIndices = indicesAndAliasesResolver.resolvePITIndices(searchRequest);
489-
return ListenableFuture.newSucceeded(resolvedIndices);
489+
return SubscribableListener.newSucceeded(resolvedIndices);
490490
}
491491
final ResolvedIndices resolvedIndices = indicesAndAliasesResolver.tryResolveWithoutWildcards(action, request);
492492
if (resolvedIndices != null) {
493-
return ListenableFuture.newSucceeded(resolvedIndices);
493+
return SubscribableListener.newSucceeded(resolvedIndices);
494494
} else {
495-
final SubscribableListener<ResolvedIndices> resolvedIndicesListener = new SubscribableListener<ResolvedIndices>();
495+
final SubscribableListener<ResolvedIndices> resolvedIndicesListener = new SubscribableListener<>();
496496
authzEngine.loadAuthorizedIndices(
497497
requestInfo,
498498
authzInfo,
@@ -637,29 +637,21 @@ private void runRequestInterceptors(
637637
final Iterator<RequestInterceptor> requestInterceptorIterator = requestInterceptors.iterator();
638638
while (requestInterceptorIterator.hasNext()) {
639639
var res = requestInterceptorIterator.next().intercept(requestInfo, authorizationEngine, authorizationInfo);
640-
if (res.isDone() == false) {
640+
if (res.isSuccess() == false) {
641641
res.addListener(new DelegatingActionListener<>(listener) {
642642
@Override
643643
public void onResponse(Void unused) {
644644
if (requestInterceptorIterator.hasNext()) {
645-
var nestedRest = requestInterceptorIterator.next()
646-
.intercept(requestInfo, authorizationEngine, authorizationInfo);
647-
if (nestedRest.isDone() == false) {
648-
nestedRest.addListener(this);
649-
}
645+
requestInterceptorIterator.next()
646+
.intercept(requestInfo, authorizationEngine, authorizationInfo)
647+
.addListener(this);
650648
} else {
651-
listener.onResponse(null);
649+
delegate.onResponse(null);
652650
}
653651
}
654652
});
655653
return;
656654
}
657-
try {
658-
res.rawResult();
659-
} catch (Exception e) {
660-
listener.onFailure(e);
661-
return;
662-
}
663655
}
664656
listener.onResponse(null);
665657
}
@@ -887,7 +879,7 @@ private void authorizeBulkItems(
887879
authzEngine.authorizeIndexAction(
888880
bulkItemInfo,
889881
authzInfo,
890-
() -> ListenableFuture.newSucceeded(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())),
882+
() -> SubscribableListener.newSucceeded(new ResolvedIndices(new ArrayList<>(indices), Collections.emptyList())),
891883
projectMetadata
892884
)
893885
.addListener(

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/RBACEngine.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,6 @@
4141
import org.elasticsearch.common.regex.Regex;
4242
import org.elasticsearch.common.settings.Settings;
4343
import org.elasticsearch.common.util.CachedSupplier;
44-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
4544
import org.elasticsearch.common.util.set.Sets;
4645
import org.elasticsearch.index.Index;
4746
import org.elasticsearch.index.shard.ShardId;
@@ -328,12 +327,12 @@ public SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
328327
try {
329328
role = ensureRBAC(authorizationInfo).getRole();
330329
} catch (Exception e) {
331-
return ListenableFuture.newFailed(e);
330+
return SubscribableListener.newFailed(e);
332331
}
333332
if (TransportActionProxy.isProxyAction(action) || shouldAuthorizeIndexActionNameOnly(action, request)) {
334333
// we've already validated that the request is a proxy request so we can skip that but we still
335334
// need to validate that the action is allowed and then move on
336-
return ListenableFuture.newSucceeded(
335+
return SubscribableListener.newSucceeded(
337336
role.checkIndicesAction(action) ? IndexAuthorizationResult.EMPTY : IndexAuthorizationResult.DENIED
338337
);
339338
} else if (request instanceof IndicesRequest == false) {
@@ -353,7 +352,7 @@ public SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
353352
// index and if they cannot, we can fail the request early before we allow the execution of the action and in
354353
// turn the shard actions
355354
if (TransportSearchScrollAction.TYPE.name().equals(action)) {
356-
final ListenableFuture<IndexAuthorizationResult> listener = new ListenableFuture<>();
355+
final SubscribableListener<IndexAuthorizationResult> listener = new SubscribableListener<>();
357356
ActionRunnable.supply(listener.delegateFailureAndWrap((l, parsedScrollId) -> {
358357
if (parsedScrollId.hasLocalIndices()) {
359358
l.onResponse(
@@ -373,42 +372,42 @@ public SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
373372
// The DLS/FLS permissions are used inside the {@code DirectoryReader} that {@code SecurityIndexReaderWrapper}
374373
// built while handling the initial search request. In addition, for consistency, the DLS/FLS permissions from
375374
// the originating search request are attached to the thread context upon validating the scroll.
376-
return ListenableFuture.newSucceeded(IndexAuthorizationResult.EMPTY);
375+
return SubscribableListener.newSucceeded(IndexAuthorizationResult.EMPTY);
377376
}
378377
} else if (isAsyncRelatedAction(action)) {
379378
if (SubmitAsyncSearchAction.NAME.equals(action)) {
380379
// authorize submit async search but don't fill in the DLS/FLS permissions
381380
// the `null` IndicesAccessControl parameter indicates that this action has *not* determined
382381
// which DLS/FLS controls should be applied to this action
383-
return ListenableFuture.newSucceeded(IndexAuthorizationResult.EMPTY);
382+
return SubscribableListener.newSucceeded(IndexAuthorizationResult.EMPTY);
384383
} else {
385384
// async-search actions other than submit have a custom security layer that checks if the current user is
386385
// the same as the user that submitted the original request so no additional checks are needed here.
387-
return ListenableFuture.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES);
386+
return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES);
388387
}
389388
} else if (action.equals(TransportClosePointInTimeAction.TYPE.name())) {
390-
return ListenableFuture.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES);
389+
return SubscribableListener.newSucceeded(IndexAuthorizationResult.ALLOW_NO_INDICES);
391390
} else {
392391
assert false
393392
: "only scroll and async-search related requests are known indices api that don't "
394393
+ "support retrieving the indices they relate to";
395-
return ListenableFuture.newFailed(
394+
return SubscribableListener.newFailed(
396395
new IllegalStateException(
397396
"only scroll and async-search related requests are known indices "
398397
+ "api that don't support retrieving the indices they relate to"
399398
)
400399
);
401400
}
402401
} else if (isChildActionAuthorizedByParentOnLocalNode(requestInfo, authorizationInfo)) {
403-
return ListenableFuture.newSucceeded(
402+
return SubscribableListener.newSucceeded(
404403
new IndexAuthorizationResult(requestInfo.getOriginatingAuthorizationContext().getIndicesAccessControl())
405404
);
406405
} else if (PreAuthorizationUtils.shouldPreAuthorizeChildByParentAction(requestInfo, authorizationInfo)) {
407406
// We only pre-authorize child actions if DLS/FLS is not configured,
408407
// hence we can allow here access for all requested indices.
409-
return ListenableFuture.newSucceeded(new IndexAuthorizationResult(IndicesAccessControl.allowAll()));
408+
return SubscribableListener.newSucceeded(new IndexAuthorizationResult(IndicesAccessControl.allowAll()));
410409
} else if (allowsRemoteIndices(request) || role.checkIndicesAction(action)) {
411-
final ListenableFuture<IndexAuthorizationResult> listener = new ListenableFuture<>();
410+
final SubscribableListener<IndexAuthorizationResult> listener = new SubscribableListener<>();
412411
indicesAsyncSupplier.getAsync().addListener(listener.delegateFailureAndWrap((delegateListener, resolvedIndices) -> {
413412
assert resolvedIndices.isEmpty() == false
414413
: "every indices request needs to have its indices set thus the resolved indices must not be empty";
@@ -446,7 +445,7 @@ public SubscribableListener<IndexAuthorizationResult> authorizeIndexAction(
446445
}));
447446
return listener;
448447
} else {
449-
return ListenableFuture.newSucceeded(IndexAuthorizationResult.DENIED);
448+
return SubscribableListener.newSucceeded(IndexAuthorizationResult.DENIED);
450449
}
451450
}
452451

x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authz/interceptor/IndicesAliasesRequestInterceptor.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010
import org.elasticsearch.action.ActionListener;
1111
import org.elasticsearch.action.admin.indices.alias.IndicesAliasesRequest;
1212
import org.elasticsearch.action.support.SubscribableListener;
13-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
1413
import org.elasticsearch.common.util.concurrent.ThreadContext;
1514
import org.elasticsearch.core.Tuple;
1615
import org.elasticsearch.license.XPackLicenseState;
@@ -99,7 +98,7 @@ public SubscribableListener<Void> intercept(
9998
list.addAll(toMerge);
10099
return list;
101100
}));
102-
final SubscribableListener<Void> listener = new ListenableFuture<>();
101+
final SubscribableListener<Void> listener = new SubscribableListener<>();
103102
authorizationEngine.validateIndexPermissionsAreSubset(
104103
requestInfo,
105104
authorizationInfo,
@@ -126,7 +125,7 @@ public SubscribableListener<Void> intercept(
126125
);
127126
return listener;
128127
} else {
129-
return ListenableFuture.newSucceeded(null);
128+
return SubscribableListener.newSucceeded(null);
130129
}
131130
}
132131
}

x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authz/RBACEngineTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import org.elasticsearch.action.search.SearchRequest;
2121
import org.elasticsearch.action.search.TransportSearchAction;
2222
import org.elasticsearch.action.support.PlainActionFuture;
23+
import org.elasticsearch.action.support.SubscribableListener;
2324
import org.elasticsearch.client.internal.Client;
2425
import org.elasticsearch.client.internal.ElasticsearchClient;
2526
import org.elasticsearch.cluster.metadata.DataStream;
@@ -31,7 +32,6 @@
3132
import org.elasticsearch.common.bytes.BytesArray;
3233
import org.elasticsearch.common.bytes.BytesReference;
3334
import org.elasticsearch.common.settings.Settings;
34-
import org.elasticsearch.common.util.concurrent.ListenableFuture;
3535
import org.elasticsearch.common.util.set.Sets;
3636
import org.elasticsearch.core.Tuple;
3737
import org.elasticsearch.index.IndexVersion;
@@ -1971,7 +1971,7 @@ private void authorizeIndicesAction(
19711971
final ResolvedIndices resolvedIndices = new ResolvedIndices(List.of(indices), List.of());
19721972
final TransportRequest searchRequest = new SearchRequest(indices);
19731973
final RequestInfo requestInfo = createRequestInfo(searchRequest, action, parentAuthorization);
1974-
final AsyncSupplier<ResolvedIndices> indicesAsyncSupplier = () -> ListenableFuture.newSucceeded(resolvedIndices);
1974+
final AsyncSupplier<ResolvedIndices> indicesAsyncSupplier = () -> SubscribableListener.newSucceeded(resolvedIndices);
19751975

19761976
Metadata.Builder metadata = Metadata.builder();
19771977
Stream.of(indices)

0 commit comments

Comments
 (0)