diff --git a/modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexFromRemoteWithAuthTests.java b/modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexFromRemoteWithAuthTests.java index 51aa02c55070c..8cb63b91c44de 100644 --- a/modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexFromRemoteWithAuthTests.java +++ b/modules/reindex/src/test/java/org/opensearch/index/reindex/ReindexFromRemoteWithAuthTests.java @@ -39,6 +39,7 @@ import org.opensearch.action.search.SearchAction; import org.opensearch.action.support.ActionFilter; import org.opensearch.action.support.ActionFilterChain; +import org.opensearch.action.support.ActionRequestMetadata; import org.opensearch.action.support.WriteRequest.RefreshPolicy; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.service.ClusterService; @@ -233,6 +234,7 @@ public void app Task task, String action, Request request, + ActionRequestMetadata actionRequestMetadata, ActionListener listener, ActionFilterChain chain ) { diff --git a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/AutoTaggingActionFilter.java b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/AutoTaggingActionFilter.java index 1268f0f69b5eb..15838fc903498 100644 --- a/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/AutoTaggingActionFilter.java +++ b/plugins/workload-management/src/main/java/org/opensearch/plugin/wlm/AutoTaggingActionFilter.java @@ -13,6 +13,7 @@ import org.opensearch.action.search.SearchRequest; import org.opensearch.action.support.ActionFilter; import org.opensearch.action.support.ActionFilterChain; +import org.opensearch.action.support.ActionRequestMetadata; import org.opensearch.core.action.ActionListener; import org.opensearch.core.action.ActionResponse; import org.opensearch.plugin.wlm.rule.attribute_extractor.IndicesExtractor; @@ -51,6 +52,7 @@ public void app Task task, String action, Request request, + ActionRequestMetadata actionRequestMetadata, ActionListener listener, ActionFilterChain chain ) { diff --git a/server/src/main/java/org/opensearch/action/admin/indices/alias/IndicesAliasesRequest.java b/server/src/main/java/org/opensearch/action/admin/indices/alias/IndicesAliasesRequest.java index dd915f8857162..32fc713c93047 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/alias/IndicesAliasesRequest.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/alias/IndicesAliasesRequest.java @@ -35,6 +35,7 @@ import org.opensearch.OpenSearchGenerationException; import org.opensearch.action.ActionRequestValidationException; import org.opensearch.action.AliasesRequest; +import org.opensearch.action.CompositeIndicesRequest; import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.clustermanager.AcknowledgedRequest; import org.opensearch.cluster.metadata.AliasAction; @@ -76,7 +77,7 @@ * @opensearch.api */ @PublicApi(since = "1.0.0") -public class IndicesAliasesRequest extends AcknowledgedRequest implements ToXContentObject { +public class IndicesAliasesRequest extends AcknowledgedRequest implements ToXContentObject, CompositeIndicesRequest { private List allAliasActions = new ArrayList<>(); private String origin = ""; diff --git a/server/src/main/java/org/opensearch/action/admin/indices/alias/TransportIndicesAliasesAction.java b/server/src/main/java/org/opensearch/action/admin/indices/alias/TransportIndicesAliasesAction.java index a5e87482f7645..0ab19d8b1c63b 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/alias/TransportIndicesAliasesAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/alias/TransportIndicesAliasesAction.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.RequestValidators; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.AcknowledgedResponse; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; @@ -48,6 +49,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; import org.opensearch.cluster.metadata.MetadataIndexAliasesService; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.regex.Regex; @@ -68,6 +70,7 @@ import java.util.Objects; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; import static java.util.Collections.unmodifiableList; @@ -76,7 +79,9 @@ * * @opensearch.internal */ -public class TransportIndicesAliasesAction extends TransportClusterManagerNodeAction { +public class TransportIndicesAliasesAction extends TransportClusterManagerNodeAction + implements + TransportIndicesResolvingAction { private static final Logger logger = LogManager.getLogger(TransportIndicesAliasesAction.class); @@ -131,45 +136,97 @@ protected void clusterManagerOperation( final IndicesAliasesRequest request, final ClusterState state, final ActionListener listener - ) { + ) throws Exception { // Expand the indices names List actions = request.aliasActions(); - List finalActions = new ArrayList<>(); + List finalActions = resolvedAliasActions(request, state, true); + if (finalActions.isEmpty() && false == actions.isEmpty()) { + throw new AliasesNotFoundException( + actions.stream().flatMap(a -> Arrays.stream(a.getOriginalAliases())).collect(Collectors.toSet()).toArray(new String[0]) + ); + } + request.aliasActions().clear(); + IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest(unmodifiableList(finalActions)) + .ackTimeout(request.timeout()) + .clusterManagerNodeTimeout(request.clusterManagerNodeTimeout()); + + indexAliasesService.indicesAliases(updateRequest, new ActionListener() { + @Override + public void onResponse(ClusterStateUpdateResponse response) { + listener.onResponse(new AcknowledgedResponse(response.isAcknowledged())); + } + + @Override + public void onFailure(Exception t) { + logger.debug("failed to perform aliases", t); + listener.onFailure(t); + } + }); + } + + @Override + public ResolvedIndices resolveIndices(IndicesAliasesRequest request) { + try { + Set indices = new HashSet<>(); + + for (AliasAction aliasAction : resolvedAliasActions(request, clusterService.state(), false)) { + if (aliasAction instanceof AliasAction.Add addAliasAction) { + indices.add(addAliasAction.getIndex()); + indices.add(addAliasAction.getAlias()); + } else if (aliasAction instanceof AliasAction.Remove removeAliasAction) { + indices.add(removeAliasAction.getIndex()); + indices.add(removeAliasAction.getAlias()); + } else if (aliasAction instanceof AliasAction.RemoveIndex removeIndexAction) { + // TODO special action + indices.add(removeIndexAction.getIndex()); + } + } + + return ResolvedIndices.of(indices); + } catch (RuntimeException e) { + throw e; + } catch (Exception e) { + // This should not happen if validate=false is passed to resolvedAliasActions() + throw new RuntimeException(e); + } + } + + private List resolvedAliasActions(IndicesAliasesRequest request, ClusterState state, boolean validate) throws Exception { + List result = new ArrayList<>(); // Resolve all the AliasActions into AliasAction instances and gather all the aliases - Set aliases = new HashSet<>(); - for (IndicesAliasesRequest.AliasActions action : actions) { + for (IndicesAliasesRequest.AliasActions action : request.aliasActions()) { final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices( state, request.indicesOptions(), false, action.indices() ); - for (Index concreteIndex : concreteIndices) { - IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(concreteIndex.getName()); - assert indexAbstraction != null : "invalid cluster metadata. index [" + concreteIndex.getName() + "] was not found"; - if (indexAbstraction.getParentDataStream() != null) { - throw new IllegalArgumentException( - "The provided expressions [" - + String.join(",", action.indices()) - + "] match a backing index belonging to data stream [" - + indexAbstraction.getParentDataStream().getName() - + "]. Data streams and their backing indices don't support aliases." - ); + if (validate) { + for (Index concreteIndex : concreteIndices) { + IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(concreteIndex.getName()); + assert indexAbstraction != null : "invalid cluster metadata. index [" + concreteIndex.getName() + "] was not found"; + if (indexAbstraction.getParentDataStream() != null) { + throw new IllegalArgumentException( + "The provided expressions [" + + String.join(",", action.indices()) + + "] match a backing index belonging to data stream [" + + indexAbstraction.getParentDataStream().getName() + + "]. Data streams and their backing indices don't support aliases." + ); + } + } + final Optional maybeException = requestValidators.validateRequest(request, state, concreteIndices); + if (maybeException.isPresent()) { + throw maybeException.get(); } - } - final Optional maybeException = requestValidators.validateRequest(request, state, concreteIndices); - if (maybeException.isPresent()) { - listener.onFailure(maybeException.get()); - return; } - Collections.addAll(aliases, action.getOriginalAliases()); for (final Index index : concreteIndices) { switch (action.actionType()) { case ADD: for (String alias : concreteAliases(action, state.metadata(), index.getName())) { - finalActions.add( + result.add( new AliasAction.Add( index.getName(), alias, @@ -184,37 +241,19 @@ protected void clusterManagerOperation( break; case REMOVE: for (String alias : concreteAliases(action, state.metadata(), index.getName())) { - finalActions.add(new AliasAction.Remove(index.getName(), alias, action.mustExist())); + result.add(new AliasAction.Remove(index.getName(), alias, action.mustExist())); } break; case REMOVE_INDEX: - finalActions.add(new AliasAction.RemoveIndex(index.getName())); + result.add(new AliasAction.RemoveIndex(index.getName())); break; default: throw new IllegalArgumentException("Unsupported action [" + action.actionType() + "]"); } } } - if (finalActions.isEmpty() && false == actions.isEmpty()) { - throw new AliasesNotFoundException(aliases.toArray(new String[0])); - } - request.aliasActions().clear(); - IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest(unmodifiableList(finalActions)) - .ackTimeout(request.timeout()) - .clusterManagerNodeTimeout(request.clusterManagerNodeTimeout()); - indexAliasesService.indicesAliases(updateRequest, new ActionListener() { - @Override - public void onResponse(ClusterStateUpdateResponse response) { - listener.onResponse(new AcknowledgedResponse(response.isAcknowledged())); - } - - @Override - public void onFailure(Exception t) { - logger.debug("failed to perform aliases", t); - listener.onFailure(t); - } - }); + return result; } private static String[] concreteAliases(IndicesAliasesRequest.AliasActions action, Metadata metadata, String concreteIndex) { @@ -255,4 +294,5 @@ private static String[] concreteAliases(IndicesAliasesRequest.AliasActions actio return action.aliases(); } } + } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/create/TransportCreateIndexAction.java b/server/src/main/java/org/opensearch/action/admin/indices/create/TransportCreateIndexAction.java index c1f7c6ef87e8a..31d3c1373b3c4 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/create/TransportCreateIndexAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/create/TransportCreateIndexAction.java @@ -33,12 +33,14 @@ package org.opensearch.action.admin.indices.create; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MetadataCreateIndexService; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; @@ -54,7 +56,9 @@ * * @opensearch.internal */ -public class TransportCreateIndexAction extends TransportClusterManagerNodeAction { +public class TransportCreateIndexAction extends TransportClusterManagerNodeAction + implements + TransportIndicesResolvingAction { private final MetadataCreateIndexService createIndexService; private final MappingTransformerRegistry mappingTransformerRegistry; @@ -115,7 +119,7 @@ protected void clusterManagerOperation( cause = "api"; } - final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index()); + final String indexName = resolveIndexName(request); final String finalCause = cause; final ActionListener mappingTransformListener = ActionListener.wrap(transformedMappings -> { @@ -143,4 +147,12 @@ protected void clusterManagerOperation( mappingTransformerRegistry.applyTransformers(request.mappings(), null, mappingTransformListener); } + @Override + public ResolvedIndices resolveIndices(CreateIndexRequest request) { + return ResolvedIndices.of(resolveIndexName(request)); + } + + private String resolveIndexName(CreateIndexRequest request) { + return indexNameExpressionResolver.resolveDateMathExpression(request.index()); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/datastream/CreateDataStreamAction.java b/server/src/main/java/org/opensearch/action/admin/indices/datastream/CreateDataStreamAction.java index 6b5091e0b2ab5..70bcf2d015b98 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/datastream/CreateDataStreamAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/datastream/CreateDataStreamAction.java @@ -37,6 +37,7 @@ import org.opensearch.action.ValidateActions; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.AcknowledgedRequest; import org.opensearch.action.support.clustermanager.AcknowledgedResponse; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; @@ -46,6 +47,7 @@ import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MetadataCreateDataStreamService; import org.opensearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.annotation.PublicApi; import org.opensearch.common.inject.Inject; @@ -137,7 +139,9 @@ public IndicesOptions indicesOptions() { * * @opensearch.internal */ - public static class TransportAction extends TransportClusterManagerNodeAction { + public static class TransportAction extends TransportClusterManagerNodeAction + implements + TransportIndicesResolvingAction { private final MetadataCreateDataStreamService metadataCreateDataStreamService; @@ -179,6 +183,11 @@ protected void clusterManagerOperation(Request request, ClusterState state, Acti protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } + + @Override + public ResolvedIndices resolveIndices(Request request) { + return ResolvedIndices.of(request.name); + } } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/datastream/GetDataStreamAction.java b/server/src/main/java/org/opensearch/action/admin/indices/datastream/GetDataStreamAction.java index 1db4e85887c23..edefdfa2b3759 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/datastream/GetDataStreamAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/datastream/GetDataStreamAction.java @@ -38,6 +38,7 @@ import org.opensearch.action.IndicesRequest; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction; import org.opensearch.cluster.AbstractDiffable; @@ -49,6 +50,7 @@ import org.opensearch.cluster.metadata.DataStream; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MetadataIndexTemplateService; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; import org.opensearch.common.annotation.PublicApi; @@ -292,7 +294,9 @@ public int hashCode() { * * @opensearch.internal */ - public static class TransportAction extends TransportClusterManagerNodeReadAction { + public static class TransportAction extends TransportClusterManagerNodeReadAction + implements + TransportIndicesResolvingAction { private static final Logger logger = LogManager.getLogger(TransportAction.class); @@ -354,6 +358,15 @@ static List getDataStreams(ClusterState clusterState, IndexNameExpre protected ClusterBlockException checkBlock(Request request, ClusterState state) { return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE); } + + @Override + public ResolvedIndices resolveIndices(Request request) { + return ResolvedIndices.of( + getDataStreams(clusterService.state(), indexNameExpressionResolver, request).stream() + .map(DataStream::getName) + .collect(Collectors.toList()) + ); + } } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/delete/TransportDeleteIndexAction.java b/server/src/main/java/org/opensearch/action/admin/indices/delete/TransportDeleteIndexAction.java index d6fc5386aed92..3641c6b06f34f 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/delete/TransportDeleteIndexAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/delete/TransportDeleteIndexAction.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.DestructiveOperations; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.AcknowledgedResponse; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; @@ -44,6 +45,7 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MetadataDeleteIndexService; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; @@ -55,15 +57,15 @@ import java.io.IOException; import java.util.Arrays; -import java.util.HashSet; -import java.util.Set; /** * Delete index action. * * @opensearch.internal */ -public class TransportDeleteIndexAction extends TransportClusterManagerNodeAction { +public class TransportDeleteIndexAction extends TransportClusterManagerNodeAction + implements + TransportIndicesResolvingAction { private static final Logger logger = LogManager.getLogger(TransportDeleteIndexAction.class); @@ -120,15 +122,15 @@ protected void clusterManagerOperation( final ClusterState state, final ActionListener listener ) { - final Set concreteIndices = new HashSet<>(Arrays.asList(indexNameExpressionResolver.concreteIndices(state, request))); - if (concreteIndices.isEmpty()) { + Index[] concreteIndices = resolveIndicesAsArray(request, state); + if (concreteIndices.length == 0) { listener.onResponse(new AcknowledgedResponse(true)); return; } DeleteIndexClusterStateUpdateRequest deleteRequest = new DeleteIndexClusterStateUpdateRequest().ackTimeout(request.timeout()) .clusterManagerNodeTimeout(request.clusterManagerNodeTimeout()) - .indices(concreteIndices.toArray(new Index[0])); + .indices(concreteIndices); deleteIndexService.deleteIndices(deleteRequest, new ActionListener() { @@ -139,9 +141,18 @@ public void onResponse(ClusterStateUpdateResponse response) { @Override public void onFailure(Exception t) { - logger.debug(() -> new ParameterizedMessage("failed to delete indices [{}]", concreteIndices), t); + logger.debug(() -> new ParameterizedMessage("failed to delete indices [{}]", Arrays.asList(concreteIndices)), t); listener.onFailure(t); } }); } + + @Override + public ResolvedIndices resolveIndices(DeleteIndexRequest request) { + return ResolvedIndices.of(resolveIndicesAsArray(request, clusterService.state())); + } + + private Index[] resolveIndicesAsArray(DeleteIndexRequest request, ClusterState clusterState) { + return indexNameExpressionResolver.concreteIndices(clusterState, request); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java index a298eae1aa865..7dc26641780f3 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/exists/indices/TransportIndicesExistsAction.java @@ -34,11 +34,13 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; @@ -54,7 +56,9 @@ * * @opensearch.internal */ -public class TransportIndicesExistsAction extends TransportClusterManagerNodeReadAction { +public class TransportIndicesExistsAction extends TransportClusterManagerNodeReadAction + implements + TransportIndicesResolvingAction { @Inject public TransportIndicesExistsAction( @@ -119,4 +123,10 @@ protected void clusterManagerOperation( } listener.onResponse(new IndicesExistsResponse(exists)); } + + @Override + public ResolvedIndices resolveIndices(IndicesExistsRequest request) { + // TODO this is likely not correct + return ResolvedIndices.of(indexNameExpressionResolver.resolveExpressions(clusterService.state(), request.indices())); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/mapping/put/TransportPutMappingAction.java b/server/src/main/java/org/opensearch/action/admin/indices/mapping/put/TransportPutMappingAction.java index 4c37592714185..a156aeeca8e1f 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/mapping/put/TransportPutMappingAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/mapping/put/TransportPutMappingAction.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.RequestValidators; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.AcknowledgedResponse; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; @@ -45,6 +46,7 @@ import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MetadataMappingService; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; @@ -68,7 +70,9 @@ * * @opensearch.internal */ -public class TransportPutMappingAction extends TransportClusterManagerNodeAction { +public class TransportPutMappingAction extends TransportClusterManagerNodeAction + implements + TransportIndicesResolvingAction { private static final Logger logger = LogManager.getLogger(TransportPutMappingAction.class); @@ -150,6 +154,11 @@ protected void clusterManagerOperation( } } + @Override + public ResolvedIndices resolveIndices(PutMappingRequest request) { + return ResolvedIndices.of(resolveIndices(clusterService.state(), request, indexNameExpressionResolver)); + } + static Index[] resolveIndices(final ClusterState state, PutMappingRequest request, final IndexNameExpressionResolver iner) { if (request.getConcreteIndex() == null) { if (request.writeIndexOnly()) { diff --git a/server/src/main/java/org/opensearch/action/admin/indices/open/TransportOpenIndexAction.java b/server/src/main/java/org/opensearch/action/admin/indices/open/TransportOpenIndexAction.java index 7bd21925eb11d..a8ecc62fa112c 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/open/TransportOpenIndexAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/open/TransportOpenIndexAction.java @@ -37,6 +37,7 @@ import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.DestructiveOperations; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ack.OpenIndexClusterStateUpdateResponse; @@ -44,6 +45,7 @@ import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MetadataIndexStateService; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; @@ -60,7 +62,9 @@ * * @opensearch.internal */ -public class TransportOpenIndexAction extends TransportClusterManagerNodeAction { +public class TransportOpenIndexAction extends TransportClusterManagerNodeAction + implements + TransportIndicesResolvingAction { private static final Logger logger = LogManager.getLogger(TransportOpenIndexAction.class); @@ -119,7 +123,7 @@ protected void clusterManagerOperation( final ClusterState state, final ActionListener listener ) { - final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + final Index[] concreteIndices = resolveIndicesAsArray(request, state); if (concreteIndices == null || concreteIndices.length == 0) { listener.onResponse(new OpenIndexResponse(true, true)); return; @@ -143,4 +147,13 @@ public void onFailure(Exception t) { } }); } + + @Override + public ResolvedIndices resolveIndices(OpenIndexRequest request) { + return ResolvedIndices.of(resolveIndicesAsArray(request, clusterService.state())); + } + + private Index[] resolveIndicesAsArray(OpenIndexRequest request, ClusterState clusterState) { + return indexNameExpressionResolver.concreteIndices(clusterState, request); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java index 3124484716706..8168c9aab98df 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/rollover/TransportRolloverAction.java @@ -39,6 +39,7 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ActiveShardsObserver; import org.opensearch.action.support.IndicesOptions; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateUpdateTask; @@ -47,6 +48,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.Metadata; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterManagerTaskThrottler; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.Nullable; @@ -77,7 +79,9 @@ * * @opensearch.internal */ -public class TransportRolloverAction extends TransportClusterManagerNodeAction { +public class TransportRolloverAction extends TransportClusterManagerNodeAction + implements + TransportIndicesResolvingAction { private final MetadataRolloverService rolloverService; private final ActiveShardsObserver activeShardsObserver; @@ -260,6 +264,29 @@ public void onFailure(Exception e) { }); } + @Override + public ResolvedIndices resolveIndices(RolloverRequest rolloverRequest) { + try { + MetadataRolloverService.RolloverResult preResult = rolloverService.rolloverClusterState( + clusterService.state(), + rolloverRequest.getRolloverTarget(), + rolloverRequest.getNewIndexName(), + rolloverRequest.getCreateIndexRequest(), + Collections.emptyList(), + true, + true + ); + + return ResolvedIndices.of(preResult.sourceIndexName, preResult.rolloverIndexName); + } catch (Exception e) { + // Exceptions are mostly occurring due to validation errors (e.g. non-existing indices). + // These are not propagated to the caller because it should be still + // the clusterManagerOperation() that should report these failures. + // Instead, we return a basic result which still allows privilege evaluation. + return ResolvedIndices.ofNonNull(rolloverRequest.getNewIndexName(), rolloverRequest.getRolloverTarget()); + } + } + static Map evaluateConditions( final Collection> conditions, @Nullable final DocsStats docsStats, @@ -291,4 +318,5 @@ static Map evaluateConditions( return evaluateConditions(conditions, docsStats, metadata); } } + } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/settings/get/TransportGetSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/settings/get/TransportGetSettingsAction.java index d8f2180208b18..ecf6ad610c4f5 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/settings/get/TransportGetSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/settings/get/TransportGetSettingsAction.java @@ -33,12 +33,14 @@ package org.opensearch.action.admin.indices.settings.get; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.regex.Regex; @@ -61,7 +63,9 @@ * * @opensearch.internal */ -public class TransportGetSettingsAction extends TransportClusterManagerNodeReadAction { +public class TransportGetSettingsAction extends TransportClusterManagerNodeReadAction + implements + TransportIndicesResolvingAction { private final SettingsFilter settingsFilter; private final IndexScopedSettings indexScopedSettings; @@ -112,7 +116,7 @@ private static boolean isFilteredRequest(GetSettingsRequest request) { @Override protected void clusterManagerOperation(GetSettingsRequest request, ClusterState state, ActionListener listener) { - Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + Index[] concreteIndices = resolveIndicesAsArray(request, state); final Map indexToSettingsBuilder = new HashMap<>(); final Map indexToDefaultSettingsBuilder = new HashMap<>(); for (Index concreteIndex : concreteIndices) { @@ -141,4 +145,13 @@ protected void clusterManagerOperation(GetSettingsRequest request, ClusterState } listener.onResponse(new GetSettingsResponse(indexToSettingsBuilder, indexToDefaultSettingsBuilder)); } + + @Override + public ResolvedIndices resolveIndices(GetSettingsRequest request) { + return ResolvedIndices.of(resolveIndicesAsArray(request, clusterService.state())); + } + + private Index[] resolveIndicesAsArray(GetSettingsRequest request, ClusterState clusterState) { + return indexNameExpressionResolver.concreteIndices(clusterState, request); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java b/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java index fe1b139358b30..91670ac1d15c9 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/settings/put/TransportUpdateSettingsAction.java @@ -36,6 +36,7 @@ import org.apache.logging.log4j.Logger; import org.apache.logging.log4j.message.ParameterizedMessage; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.AcknowledgedResponse; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; @@ -45,6 +46,7 @@ import org.opensearch.cluster.block.ClusterBlocks; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MetadataUpdateSettingsService; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; @@ -63,7 +65,9 @@ * * @opensearch.internal */ -public class TransportUpdateSettingsAction extends TransportClusterManagerNodeAction { +public class TransportUpdateSettingsAction extends TransportClusterManagerNodeAction + implements + TransportIndicesResolvingAction { private static final Logger logger = LogManager.getLogger(TransportUpdateSettingsAction.class); @@ -158,7 +162,7 @@ protected void clusterManagerOperation( final ClusterState state, final ActionListener listener ) { - final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + final Index[] concreteIndices = resolveIndicesAsArray(request, state); UpdateSettingsClusterStateUpdateRequest clusterStateUpdateRequest = new UpdateSettingsClusterStateUpdateRequest().indices( concreteIndices ) @@ -180,4 +184,13 @@ public void onFailure(Exception t) { } }); } + + @Override + public ResolvedIndices resolveIndices(UpdateSettingsRequest request) { + return ResolvedIndices.of(resolveIndicesAsArray(request, clusterService.state())); + } + + private Index[] resolveIndicesAsArray(UpdateSettingsRequest request, ClusterState clusterState) { + return indexNameExpressionResolver.concreteIndices(clusterState, request); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java b/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java index 41b6df062ab42..a47917d0cdcbf 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/shrink/TransportResizeAction.java @@ -37,6 +37,7 @@ import org.opensearch.action.admin.indices.create.CreateIndexRequest; import org.opensearch.action.admin.indices.stats.IndexShardStats; import org.opensearch.action.support.ActionFilters; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; @@ -44,6 +45,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MetadataCreateIndexService; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; @@ -78,7 +80,9 @@ * * @opensearch.internal */ -public class TransportResizeAction extends TransportClusterManagerNodeAction { +public class TransportResizeAction extends TransportClusterManagerNodeAction + implements + TransportIndicesResolvingAction { private final MetadataCreateIndexService createIndexService; private final Client client; @@ -143,8 +147,8 @@ protected void clusterManagerOperation( ) { // there is no need to fetch docs stats for split but we keep it simple and do it anyway for simplicity of the code - final String sourceIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex()); - final String targetIndex = indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index()); + final String sourceIndex = resolveSourceIndex(resizeRequest); + final String targetIndex = resolveTargetIndex(resizeRequest); IndexMetadata indexMetadata = state.metadata().index(sourceIndex); ClusterSettings clusterSettings = clusterService.getClusterSettings(); if (resizeRequest.getResizeType().equals(ResizeType.SHRINK) @@ -222,6 +226,11 @@ protected void clusterManagerOperation( } + @Override + public ResolvedIndices resolveIndices(ResizeRequest resizeRequest) { + return ResolvedIndices.of(resolveSourceIndex(resizeRequest), resolveTargetIndex(resizeRequest)); + } + // static for unittesting this method static CreateIndexClusterStateUpdateRequest prepareCreateIndexRequest( final ResizeRequest resizeRequest, @@ -410,4 +419,12 @@ private static void validateRemoteMigrationModeSettings( } } } + + private String resolveSourceIndex(ResizeRequest resizeRequest) { + return indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getSourceIndex()); + } + + private String resolveTargetIndex(ResizeRequest resizeRequest) { + return indexNameExpressionResolver.resolveDateMathExpression(resizeRequest.getTargetIndexRequest().index()); + } } diff --git a/server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/pause/TransportPauseIngestionAction.java b/server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/pause/TransportPauseIngestionAction.java index aacf67974e5fa..6f2a2577c84a3 100644 --- a/server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/pause/TransportPauseIngestionAction.java +++ b/server/src/main/java/org/opensearch/action/admin/indices/streamingingestion/pause/TransportPauseIngestionAction.java @@ -15,12 +15,14 @@ import org.opensearch.action.admin.indices.streamingingestion.state.UpdateIngestionStateResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.DestructiveOperations; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.cluster.metadata.MetadataStreamingIngestionStateService; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.core.action.ActionListener; @@ -38,7 +40,9 @@ * * @opensearch.experimental */ -public class TransportPauseIngestionAction extends TransportClusterManagerNodeAction { +public class TransportPauseIngestionAction extends TransportClusterManagerNodeAction + implements + TransportIndicesResolvingAction { private static final Logger logger = LogManager.getLogger(TransportPauseIngestionAction.class); @@ -106,7 +110,7 @@ protected void clusterManagerOperation( final ClusterState state, final ActionListener listener ) throws Exception { - final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(state, request); + final Index[] concreteIndices = resolveIndicesAsArray(request, state); if (concreteIndices == null || concreteIndices.length == 0) { listener.onResponse(new PauseIngestionResponse(true, false, new IngestionStateShardFailure[0], "")); return; @@ -144,4 +148,13 @@ public void onFailure(Exception e) { } ); } + + @Override + public ResolvedIndices resolveIndices(PauseIngestionRequest request) { + return ResolvedIndices.of(resolveIndicesAsArray(request, clusterService.state())); + } + + private Index[] resolveIndicesAsArray(PauseIngestionRequest request, ClusterState clusterState) { + return indexNameExpressionResolver.concreteIndices(clusterState, request); + } } diff --git a/server/src/main/java/org/opensearch/action/bulk/BulkShardRequest.java b/server/src/main/java/org/opensearch/action/bulk/BulkShardRequest.java index c86f8c87eee15..453d98ea6255f 100644 --- a/server/src/main/java/org/opensearch/action/bulk/BulkShardRequest.java +++ b/server/src/main/java/org/opensearch/action/bulk/BulkShardRequest.java @@ -74,6 +74,10 @@ public BulkItemRequest[] items() { @Override public String[] indices() { + // TODO: The following comment was possibly true on Elasticsearch, but it is not + // true on OpenSearch. Authorization also works with index names if an alias + // grants privileges for that particular index name. + // Thus, question: Shall we change this? // A bulk shard request encapsulates items targeted at a specific shard of an index. // However, items could be targeting aliases of the index, so the bulk request although // targeting a single concrete index shard might do so using several alias names. diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java index d4454fc39bb57..f0304eee9844b 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportBulkAction.java @@ -865,7 +865,7 @@ void executeBulk( * * @opensearch.internal */ - private static class ConcreteIndices { + static class ConcreteIndices { private final ClusterState state; private final IndexNameExpressionResolver indexNameExpressionResolver; private final Map indices = new HashMap<>(); diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java index efe8df735d769..f74f5d4fa2f2b 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportShardBulkAction.java @@ -47,6 +47,7 @@ import org.opensearch.action.index.IndexResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.ChannelActionListener; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.replication.ReplicationMode; import org.opensearch.action.support.replication.ReplicationOperation; import org.opensearch.action.support.replication.ReplicationTask; @@ -61,6 +62,7 @@ import org.opensearch.cluster.action.shard.ShardStateAction; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.MappingMetadata; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.AllocationId; import org.opensearch.cluster.routing.ShardRouting; @@ -122,7 +124,9 @@ * * @opensearch.internal */ -public class TransportShardBulkAction extends TransportWriteAction { +public class TransportShardBulkAction extends TransportWriteAction + implements + TransportIndicesResolvingAction { public static final String ACTION_NAME = BulkAction.NAME + "[s]"; @@ -217,6 +221,11 @@ protected void handlePrimaryTermValidationRequest( } } + @Override + public ResolvedIndices resolveIndices(BulkShardRequest request) { + return ResolvedIndices.of(request.shardId().getIndexName()); + } + /** * This action is the primary term validation action which is used for doing primary term validation with replicas. * This is only applicable for TransportShardBulkAction because all writes (delete/update/single write/bulk) diff --git a/server/src/main/java/org/opensearch/action/bulk/TransportSingleItemBulkWriteAction.java b/server/src/main/java/org/opensearch/action/bulk/TransportSingleItemBulkWriteAction.java index 9b901dda24c2b..5b80b507b90e7 100644 --- a/server/src/main/java/org/opensearch/action/bulk/TransportSingleItemBulkWriteAction.java +++ b/server/src/main/java/org/opensearch/action/bulk/TransportSingleItemBulkWriteAction.java @@ -36,10 +36,13 @@ import org.opensearch.action.DocWriteResponse; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.WriteRequest; import org.opensearch.action.support.WriteResponse; import org.opensearch.action.support.replication.ReplicatedWriteRequest; import org.opensearch.action.support.replication.ReplicationResponse; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.core.action.ActionListener; import org.opensearch.core.common.io.stream.Writeable; import org.opensearch.tasks.Task; @@ -53,19 +56,24 @@ @Deprecated public abstract class TransportSingleItemBulkWriteAction< Request extends ReplicatedWriteRequest, - Response extends ReplicationResponse & WriteResponse> extends HandledTransportAction { + Response extends ReplicationResponse & WriteResponse> extends HandledTransportAction + implements + TransportIndicesResolvingAction { private final TransportBulkAction bulkAction; + private final IndexNameExpressionResolver indexNameExpressionResolver; protected TransportSingleItemBulkWriteAction( String actionName, TransportService transportService, ActionFilters actionFilters, Writeable.Reader requestReader, - TransportBulkAction bulkAction + TransportBulkAction bulkAction, + IndexNameExpressionResolver indexNameExpressionResolver ) { super(actionName, transportService, actionFilters, requestReader); this.bulkAction = bulkAction; + this.indexNameExpressionResolver = indexNameExpressionResolver; } @Override @@ -73,6 +81,11 @@ protected void doExecute(Task task, final Request request, final ActionListener< bulkAction.execute(task, toSingleItemBulkRequest(request), wrapBulkResponse(listener)); } + @Override + public ResolvedIndices resolveIndices(Request request) { + return ResolvedIndices.of(indexNameExpressionResolver.resolveDateMathExpression(request.index())); + } + public static ActionListener wrapBulkResponse( ActionListener listener ) { @@ -97,4 +110,5 @@ public static BulkRequest toSingleItemBulkRequest(ReplicatedWriteRequest requ request.setRefreshPolicy(WriteRequest.RefreshPolicy.NONE); return bulkRequest; } + } diff --git a/server/src/main/java/org/opensearch/action/delete/TransportDeleteAction.java b/server/src/main/java/org/opensearch/action/delete/TransportDeleteAction.java index 6cbabfec6d763..96f6fe5df94f1 100644 --- a/server/src/main/java/org/opensearch/action/delete/TransportDeleteAction.java +++ b/server/src/main/java/org/opensearch/action/delete/TransportDeleteAction.java @@ -35,6 +35,7 @@ import org.opensearch.action.bulk.TransportBulkAction; import org.opensearch.action.bulk.TransportSingleItemBulkWriteAction; import org.opensearch.action.support.ActionFilters; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.common.inject.Inject; import org.opensearch.transport.TransportService; @@ -49,7 +50,12 @@ public class TransportDeleteAction extends TransportSingleItemBulkWriteAction { @Inject - public TransportDeleteAction(TransportService transportService, ActionFilters actionFilters, TransportBulkAction bulkAction) { - super(DeleteAction.NAME, transportService, actionFilters, DeleteRequest::new, bulkAction); + public TransportDeleteAction( + TransportService transportService, + ActionFilters actionFilters, + TransportBulkAction bulkAction, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super(DeleteAction.NAME, transportService, actionFilters, DeleteRequest::new, bulkAction, indexNameExpressionResolver); } } diff --git a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesAction.java b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesAction.java index 28328e4cfc415..527f44233b50f 100644 --- a/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesAction.java +++ b/server/src/main/java/org/opensearch/action/fieldcaps/TransportFieldCapabilitiesAction.java @@ -35,8 +35,10 @@ import org.opensearch.action.OriginalIndices; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.inject.Inject; import org.opensearch.common.util.concurrent.CountDown; @@ -63,7 +65,9 @@ * * @opensearch.internal */ -public class TransportFieldCapabilitiesAction extends HandledTransportAction { +public class TransportFieldCapabilitiesAction extends HandledTransportAction + implements + TransportIndicesResolvingAction { private final ThreadPool threadPool; private final ClusterService clusterService; private final TransportFieldCapabilitiesIndexAction shardAction; @@ -91,21 +95,11 @@ public TransportFieldCapabilitiesAction( protected void doExecute(Task task, FieldCapabilitiesRequest request, final ActionListener listener) { // retrieve the initial timestamp in case the action is a cross cluster search long nowInMillis = request.nowInMillis() == null ? System.currentTimeMillis() : request.nowInMillis(); - final ClusterState clusterState = clusterService.state(); - final Map remoteClusterIndices = remoteClusterService.groupIndices( - request.indicesOptions(), - request.indices(), - idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState) - ); - final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); - final String[] concreteIndices; - if (localIndices == null) { - // in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices - concreteIndices = Strings.EMPTY_ARRAY; - } else { - concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices); - } - final int totalNumRequest = concreteIndices.length + remoteClusterIndices.size(); + ResolvedIndices allResolvedIndices = resolveIndices(request, clusterService.state()); + Set concreteIndices = allResolvedIndices.local().names(); + Map remoteClusterIndices = allResolvedIndices.remote(); + OriginalIndices localIndices = allResolvedIndices.local().originalIndices(); + final int totalNumRequest = concreteIndices.size() + remoteClusterIndices.size(); final CountDown completionCounter = new CountDown(totalNumRequest); final List indexResponses = Collections.synchronizedList(new ArrayList<>()); final Runnable onResponse = () -> { @@ -171,6 +165,29 @@ public void onFailure(Exception e) { } } + @Override + public ResolvedIndices resolveIndices(FieldCapabilitiesRequest request) { + return resolveIndices(request, clusterService.state()); + } + + private ResolvedIndices resolveIndices(FieldCapabilitiesRequest request, ClusterState clusterState) { + final Map remoteClusterIndices = remoteClusterService.groupIndices( + request.indicesOptions(), + request.indices(), + idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState) + ); + final OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + final String[] concreteIndices; + if (localIndices == null) { + // in the case we have one or more remote indices but no local we don't expand to all local indices and just do remote indices + concreteIndices = Strings.EMPTY_ARRAY; + } else { + concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, localIndices); + } + + return ResolvedIndices.of(concreteIndices).withLocalOriginalIndices(localIndices).withRemoteIndices(remoteClusterIndices); + } + private FieldCapabilitiesResponse merge(List indexResponses, boolean includeUnmapped) { String[] indices = indexResponses.stream().map(FieldCapabilitiesIndexResponse::getIndexName).sorted().toArray(String[]::new); final Map> responseMapBuilder = new HashMap<>(); diff --git a/server/src/main/java/org/opensearch/action/index/TransportIndexAction.java b/server/src/main/java/org/opensearch/action/index/TransportIndexAction.java index ce32840f6751b..dccf2f720934a 100644 --- a/server/src/main/java/org/opensearch/action/index/TransportIndexAction.java +++ b/server/src/main/java/org/opensearch/action/index/TransportIndexAction.java @@ -35,6 +35,7 @@ import org.opensearch.action.bulk.TransportBulkAction; import org.opensearch.action.bulk.TransportSingleItemBulkWriteAction; import org.opensearch.action.support.ActionFilters; +import org.opensearch.cluster.metadata.IndexNameExpressionResolver; import org.opensearch.common.inject.Inject; import org.opensearch.transport.TransportService; @@ -56,7 +57,12 @@ public class TransportIndexAction extends TransportSingleItemBulkWriteAction { @Inject - public TransportIndexAction(ActionFilters actionFilters, TransportService transportService, TransportBulkAction bulkAction) { - super(IndexAction.NAME, transportService, actionFilters, IndexRequest::new, bulkAction); + public TransportIndexAction( + ActionFilters actionFilters, + TransportService transportService, + TransportBulkAction bulkAction, + IndexNameExpressionResolver indexNameExpressionResolver + ) { + super(IndexAction.NAME, transportService, actionFilters, IndexRequest::new, bulkAction, indexNameExpressionResolver); } } diff --git a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java index 1da080e5bd302..b831ab670d404 100644 --- a/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/opensearch/action/search/TransportSearchAction.java @@ -41,11 +41,13 @@ import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.IndicesOptions; import org.opensearch.action.support.TimeoutTaskCancellationUtility; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.GroupShardsIterator; @@ -134,7 +136,9 @@ * * @opensearch.internal */ -public class TransportSearchAction extends HandledTransportAction { +public class TransportSearchAction extends HandledTransportAction + implements + TransportIndicesResolvingAction { /** The maximum number of shards for a single search request. */ public static final Setting SHARD_COUNT_LIMIT_SETTING = Setting.longSetting( @@ -500,20 +504,10 @@ private ActionListener buildRewriteListener( searchRequest.source(source); } final ClusterState clusterState = clusterService.state(); - final SearchContextId searchContext; - final Map remoteClusterIndices; - if (searchRequest.pointInTimeBuilder() != null) { - searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId()); - remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions()); - } else { - searchContext = null; - remoteClusterIndices = remoteClusterService.groupIndices( - searchRequest.indicesOptions(), - searchRequest.indices(), - idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState) - ); - } - OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + final OriginalIndicesAndSearchContextId requestedIndices = extractRequestedIndices(searchRequest, clusterState); + final SearchContextId searchContext = requestedIndices.searchContextId; + final Map remoteClusterIndices = requestedIndices.remoteClusterIndices; + OriginalIndices localIndices = requestedIndices.localOriginalIndices; if (remoteClusterIndices.isEmpty()) { executeLocalSearch( task, @@ -991,6 +985,37 @@ static List getRemoteShardsIteratorFromPointInTime( return remoteShardIterators; } + @Override + public ResolvedIndices resolveIndices(SearchRequest searchRequest) { + ClusterState clusterState = clusterService.state(); + OriginalIndicesAndSearchContextId requestedIndices = extractRequestedIndices(searchRequest, clusterState); + Index[] localConcreteIndices = resolveLocalIndices( + requestedIndices.localOriginalIndices, + clusterState, + new SearchTimeProvider(searchRequest.getOrCreateAbsoluteStartMillis(), System.nanoTime(), System::nanoTime) + ); + + return ResolvedIndices.of(localConcreteIndices).withRemoteIndices(requestedIndices.remoteClusterIndices); + } + + private OriginalIndicesAndSearchContextId extractRequestedIndices(SearchRequest searchRequest, ClusterState clusterState) { + final SearchContextId searchContext; + final Map remoteClusterIndices; + if (searchRequest.pointInTimeBuilder() != null) { + searchContext = SearchContextId.decode(namedWriteableRegistry, searchRequest.pointInTimeBuilder().getId()); + remoteClusterIndices = getIndicesFromSearchContexts(searchContext, searchRequest.indicesOptions()); + } else { + searchContext = null; + remoteClusterIndices = remoteClusterService.groupIndices( + searchRequest.indicesOptions(), + searchRequest.indices(), + idx -> indexNameExpressionResolver.hasIndexAbstraction(idx, clusterState) + ); + } + OriginalIndices localIndices = remoteClusterIndices.remove(RemoteClusterAware.LOCAL_CLUSTER_GROUP_KEY); + return new OriginalIndicesAndSearchContextId(localIndices, remoteClusterIndices, searchContext); + } + private Index[] resolveLocalIndices(OriginalIndices localIndices, ClusterState clusterState, SearchTimeProvider timeProvider) { if (localIndices == null) { return Index.EMPTY_ARRAY; // don't search on any local index (happens when only remote indices were specified) @@ -1467,4 +1492,9 @@ static List getLocalLocalShardsIteratorFromPointInTime( } return iterators; } + + record OriginalIndicesAndSearchContextId(OriginalIndices localOriginalIndices, Map remoteClusterIndices, + SearchContextId searchContextId) { + + } } diff --git a/server/src/main/java/org/opensearch/action/support/ActionFilter.java b/server/src/main/java/org/opensearch/action/support/ActionFilter.java index e936512004fd2..0c7f5a6e220a0 100644 --- a/server/src/main/java/org/opensearch/action/support/ActionFilter.java +++ b/server/src/main/java/org/opensearch/action/support/ActionFilter.java @@ -57,6 +57,7 @@ void apply( Task task, String action, Request request, + ActionRequestMetadata actionRequestMetadata, ActionListener listener, ActionFilterChain chain ); @@ -72,6 +73,7 @@ public final vo Task task, String action, Request request, + ActionRequestMetadata actionRequestMetadata, ActionListener listener, ActionFilterChain chain ) { diff --git a/server/src/main/java/org/opensearch/action/support/ActionRequestMetadata.java b/server/src/main/java/org/opensearch/action/support/ActionRequestMetadata.java new file mode 100644 index 0000000000000..20b183309a09d --- /dev/null +++ b/server/src/main/java/org/opensearch/action/support/ActionRequestMetadata.java @@ -0,0 +1,78 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support; + +import org.opensearch.action.ActionRequest; +import org.opensearch.cluster.metadata.ResolvedIndices; +import org.opensearch.core.action.ActionResponse; + +import java.util.Optional; + +/** + * This class can be used to provide metadata about action requests to ActionFilter implementations. + * At the moment, this class provides information about the requested indices of a request, but it can be + * extended to transport further metadata. + */ +public class ActionRequestMetadata { + + /** + * Returns an empty meta data object which will just report unknown results. + */ + public static ActionRequestMetadata empty() { + @SuppressWarnings("unchecked") + ActionRequestMetadata result = (ActionRequestMetadata) EMPTY; + return result; + } + + private static final ActionRequestMetadata EMPTY = new ActionRequestMetadata<>(null, null); + + private final TransportAction transportAction; + private final Request request; + + private ResolvedIndices resolvedIndices; + private boolean resolvedIndicesInitialized; + + ActionRequestMetadata(TransportAction transportAction, Request request) { + this.transportAction = transportAction; + this.request = request; + } + + /** + * If the current action request references indices, this method actually referenced indices. That means that any + * expressions or patterns will be resolved. + *

+ * If the request cannot reference indices OR if the respective action does not support resolving of requests, + * this returns an empty Optional. + */ + public Optional resolvedIndices() { + if (!(transportAction instanceof TransportIndicesResolvingAction)) { + return Optional.empty(); + } + + if (this.resolvedIndicesInitialized) { + return Optional.of(this.resolvedIndices); + } else { + return resolveIndices(); + } + } + + /** + * Performs the actual index resolution. Index resolution can be relatively costly on big clusters, so we + * perform it lazily only when requested. + */ + private Optional resolveIndices() { + @SuppressWarnings("unchecked") + TransportIndicesResolvingAction indicesResolvingAction = (TransportIndicesResolvingAction) this.transportAction; + ResolvedIndices result = indicesResolvingAction.resolveIndices(request); + + this.resolvedIndices = result; + this.resolvedIndicesInitialized = true; + return Optional.of(result); + } +} diff --git a/server/src/main/java/org/opensearch/action/support/TransportAction.java b/server/src/main/java/org/opensearch/action/support/TransportAction.java index f71347f6f1d07..3cc79a407bd29 100644 --- a/server/src/main/java/org/opensearch/action/support/TransportAction.java +++ b/server/src/main/java/org/opensearch/action/support/TransportAction.java @@ -215,7 +215,7 @@ public void proceed(Task task, String actionName, Request request, ActionListene int i = index.getAndIncrement(); try { if (i < this.action.filters.length) { - this.action.filters[i].apply(task, actionName, request, listener, this); + this.action.filters[i].apply(task, actionName, request, new ActionRequestMetadata<>(action, request), listener, this); } else if (i == this.action.filters.length) { this.action.doExecute(task, request, listener); } else { diff --git a/server/src/main/java/org/opensearch/action/support/TransportIndicesResolvingAction.java b/server/src/main/java/org/opensearch/action/support/TransportIndicesResolvingAction.java new file mode 100644 index 0000000000000..120117ade6466 --- /dev/null +++ b/server/src/main/java/org/opensearch/action/support/TransportIndicesResolvingAction.java @@ -0,0 +1,30 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.action.support; + +import org.opensearch.action.ActionRequest; +import org.opensearch.cluster.metadata.ResolvedIndices; + +/** + * An additional interface that should be implemented by TransportAction implementations which need to resolve + * IndicesRequests or other action requests which specify indices. This interface allows other components to retrieve + * precise information about the indices an action is going to operate on. This is particularly useful for access + * control implementations, but can be also used for other purposes, such as monitoring, audit logging, etc. + *

+ * Classes implementing this interface should make sure that the reported indices are also actually the indices + * the action will operate on. The best way to achieve this, is to move the index extraction code from the execute + * methods into reusable methods and to depend on these both for execution and reporting. + */ +public interface TransportIndicesResolvingAction { + + /** + * Returns the actual indices the action will operate on, given the specified request and cluster state. + */ + ResolvedIndices resolveIndices(Request request); +} diff --git a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java index 8bf8555194976..06c137d0fb477 100644 --- a/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java +++ b/server/src/main/java/org/opensearch/action/support/broadcast/TransportBroadcastAction.java @@ -38,9 +38,11 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.TransportActions; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.FailAwareWeightedRouting; @@ -73,7 +75,9 @@ public abstract class TransportBroadcastAction< Request extends BroadcastRequest, Response extends BroadcastResponse, ShardRequest extends BroadcastShardRequest, - ShardResponse extends BroadcastShardResponse> extends HandledTransportAction { + ShardResponse extends BroadcastShardResponse> extends HandledTransportAction + implements + TransportIndicesResolvingAction { protected final ClusterService clusterService; protected final TransportService transportService; @@ -107,6 +111,15 @@ protected void doExecute(Task task, Request request, ActionListener li new AsyncBroadcastAction(task, request, listener).start(); } + @Override + public ResolvedIndices resolveIndices(Request request) { + return ResolvedIndices.of(indexNameExpressionResolver.concreteIndexNames(clusterService.state(), request)); + } + + protected ResolvedIndices resolveIndices(Request request, ClusterState clusterState) { + return ResolvedIndices.of(indexNameExpressionResolver.concreteIndexNames(clusterState, request)); + } + protected abstract Response newResponse(Request request, AtomicReferenceArray shardsResponses, ClusterState clusterState); protected abstract ShardRequest newShardRequest(int numShards, ShardRouting shard, Request request); @@ -154,7 +167,7 @@ protected AsyncBroadcastAction(Task task, Request request, ActionListener, Response extends BroadcastResponse, - ShardOperationResult extends Writeable> extends HandledTransportAction { + ShardOperationResult extends Writeable> extends HandledTransportAction + implements + TransportIndicesResolvingAction { private final ClusterService clusterService; private final TransportService transportService; @@ -273,6 +277,11 @@ protected void doExecute(Task task, Request request, ActionListener li new AsyncAction(task, request, listener).start(); } + @Override + public ResolvedIndices resolveIndices(Request request) { + return ResolvedIndices.of(resolveConcreteIndexNames(clusterService.state(), request)); + } + /** * Asynchronous action * diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportBroadcastReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportBroadcastReplicationAction.java index e235adbc162fc..c13cffe598456 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportBroadcastReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportBroadcastReplicationAction.java @@ -36,12 +36,14 @@ import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; import org.opensearch.action.support.TransportActions; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.broadcast.BroadcastRequest; import org.opensearch.action.support.broadcast.BroadcastResponse; import org.opensearch.action.support.broadcast.BroadcastShardOperationFailedException; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.routing.IndexShardRoutingTable; import org.opensearch.cluster.service.ClusterService; import org.opensearch.common.util.concurrent.CountDown; @@ -55,6 +57,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.Set; import java.util.concurrent.CopyOnWriteArrayList; /** @@ -67,7 +70,9 @@ public abstract class TransportBroadcastReplicationAction< Request extends BroadcastRequest, Response extends BroadcastResponse, ShardRequest extends ReplicationRequest, - ShardResponse extends ReplicationResponse> extends HandledTransportAction { + ShardResponse extends ReplicationResponse> extends HandledTransportAction + implements + TransportIndicesResolvingAction { private final TransportReplicationAction replicatedBroadcastShardAction; private final ClusterService clusterService; @@ -138,6 +143,15 @@ public void onFailure(Exception e) { } } + @Override + public ResolvedIndices resolveIndices(Request request) { + return resolveIndices(request, clusterService.state()); + } + + private ResolvedIndices resolveIndices(Request request, ClusterState clusterState) { + return ResolvedIndices.of(indexNameExpressionResolver.concreteIndexNames(clusterState, request)); + } + protected void shardExecute(Task task, Request request, ShardId shardId, ActionListener shardActionListener) { ShardRequest shardRequest = newShardRequest(request, shardId); shardRequest.setParentTask(clusterService.localNode().getId(), task.getId()); @@ -149,7 +163,7 @@ protected void shardExecute(Task task, Request request, ShardId shardId, ActionL */ protected List shards(Request request, ClusterState clusterState) { List shardIds = new ArrayList<>(); - String[] concreteIndices = indexNameExpressionResolver.concreteIndexNames(clusterState, request); + Set concreteIndices = resolveIndices(request, clusterState).local().names(); for (String index : concreteIndices) { IndexMetadata indexMetadata = clusterState.metadata().getIndices().get(index); if (indexMetadata != null) { diff --git a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java index c81754b33fa62..1164cd5accfb7 100644 --- a/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java +++ b/server/src/main/java/org/opensearch/action/support/replication/TransportReplicationAction.java @@ -43,6 +43,7 @@ import org.opensearch.action.support.ChannelActionListener; import org.opensearch.action.support.TransportAction; import org.opensearch.action.support.TransportActions; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.action.support.replication.ReplicationOperation.Replicas; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; @@ -50,6 +51,7 @@ import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.AllocationId; import org.opensearch.cluster.routing.ShardRouting; @@ -111,7 +113,7 @@ public abstract class TransportReplicationAction< Request extends ReplicationRequest, ReplicaRequest extends ReplicationRequest, - Response extends ReplicationResponse> extends TransportAction { + Response extends ReplicationResponse> extends TransportAction implements TransportIndicesResolvingAction { /** * The timeout for retrying replication requests. @@ -318,6 +320,11 @@ protected void doExecute(Task task, Request request, ActionListener li runReroutePhase(task, request, listener, true); } + @Override + public ResolvedIndices resolveIndices(Request request) { + return ResolvedIndices.of(request.index); + } + private void runReroutePhase(Task task, Request request, ActionListener listener, boolean initiatedByNodeClient) { try { new ReroutePhase((ReplicationTask) task, request, listener, initiatedByNodeClient).run(); diff --git a/server/src/main/java/org/opensearch/action/support/single/instance/TransportInstanceSingleOperationAction.java b/server/src/main/java/org/opensearch/action/support/single/instance/TransportInstanceSingleOperationAction.java index 21d4ba726e86f..f9ef23c711180 100644 --- a/server/src/main/java/org/opensearch/action/support/single/instance/TransportInstanceSingleOperationAction.java +++ b/server/src/main/java/org/opensearch/action/support/single/instance/TransportInstanceSingleOperationAction.java @@ -35,11 +35,13 @@ import org.opensearch.action.UnavailableShardsException; import org.opensearch.action.support.ActionFilters; import org.opensearch.action.support.HandledTransportAction; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.ClusterStateObserver; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.routing.ShardIterator; import org.opensearch.cluster.routing.ShardRouting; @@ -76,7 +78,7 @@ */ public abstract class TransportInstanceSingleOperationAction< Request extends InstanceShardOperationRequest, - Response extends ActionResponse> extends HandledTransportAction { + Response extends ActionResponse> extends HandledTransportAction implements TransportIndicesResolvingAction { protected final ThreadPool threadPool; protected final ClusterService clusterService; @@ -108,6 +110,22 @@ protected void doExecute(Task task, Request request, ActionListener li new AsyncSingleAction(request, listener).start(); } + @Override + public ResolvedIndices resolveIndices(Request request) { + if (request.concreteIndex() != null) { + return ResolvedIndices.of(request.concreteIndex()); + } else { + try { + // TODO shall we possibly also set request.concreteIndex here? + return ResolvedIndices.of(indexNameExpressionResolver.concreteWriteIndex(clusterService.state(), request).getName()); + } catch (IndexNotFoundException e) { + // We just return the original unresolved expression. The error we encountered here will + // be encountered again in the doStart() method + return ResolvedIndices.of(request.index()); + } + } + } + protected abstract String executor(ShardId shardId); protected abstract void shardOperation(Request request, ActionListener listener); diff --git a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java index df91559a2f8cb..4a934002e9f81 100644 --- a/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java +++ b/server/src/main/java/org/opensearch/action/support/single/shard/TransportSingleShardAction.java @@ -39,10 +39,12 @@ import org.opensearch.action.support.ChannelActionListener; import org.opensearch.action.support.TransportAction; import org.opensearch.action.support.TransportActions; +import org.opensearch.action.support.TransportIndicesResolvingAction; import org.opensearch.cluster.ClusterState; import org.opensearch.cluster.block.ClusterBlockException; import org.opensearch.cluster.block.ClusterBlockLevel; import org.opensearch.cluster.metadata.IndexNameExpressionResolver; +import org.opensearch.cluster.metadata.ResolvedIndices; import org.opensearch.cluster.node.DiscoveryNode; import org.opensearch.cluster.node.DiscoveryNodes; import org.opensearch.cluster.routing.FailAwareWeightedRouting; @@ -76,7 +78,9 @@ * @opensearch.internal */ public abstract class TransportSingleShardAction, Response extends ActionResponse> extends - TransportAction { + TransportAction + implements + TransportIndicesResolvingAction { protected final ThreadPool threadPool; protected final ClusterService clusterService; @@ -154,6 +158,19 @@ protected void resolveRequest(ClusterState state, InternalRequest request) { @Nullable protected abstract ShardsIterator shards(ClusterState state, InternalRequest request); + @Override + public ResolvedIndices resolveIndices(Request request) { + return ResolvedIndices.of(resolveToConcreteSingleIndex(request, clusterService.state())); + } + + private String resolveToConcreteSingleIndex(Request request, ClusterState clusterState) { + if (resolveIndex(request)) { + return indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName(); + } else { + return request.index(); + } + } + /** * Asynchronous single action * @@ -180,12 +197,7 @@ private AsyncSingleAction(Request request, ActionListener listener) { throw blockException; } - String concreteSingleIndex; - if (resolveIndex(request)) { - concreteSingleIndex = indexNameExpressionResolver.concreteSingleIndex(clusterState, request).getName(); - } else { - concreteSingleIndex = request.index(); - } + String concreteSingleIndex = resolveToConcreteSingleIndex(request, clusterState); this.internalRequest = new InternalRequest(request, concreteSingleIndex); resolveRequest(clusterState, internalRequest); diff --git a/server/src/main/java/org/opensearch/cluster/metadata/ResolvedIndices.java b/server/src/main/java/org/opensearch/cluster/metadata/ResolvedIndices.java new file mode 100644 index 0000000000000..b505b3e63bf65 --- /dev/null +++ b/server/src/main/java/org/opensearch/cluster/metadata/ResolvedIndices.java @@ -0,0 +1,154 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.cluster.metadata; + +import org.opensearch.action.OriginalIndices; +import org.opensearch.core.index.Index; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +/** + * A class that encapsulates resolved indices. Resolved indices do not any wildcards or date math expressions. + * However, in contrast to the concept of "concrete indices", resolved indices might not exist yet, or might + * refer to aliases or data streams. + *

+ * ResolvedIndices classes are primarily created by the resolveIndices() method in TransportIndicesResolvingAction. + *

+ * How resolved indices are obtained depends on the respective action and the associated requests: + *

    + *
  • If a request carries an index expression (i.e, might contain patterns or date math expressions), the index + * expression must be resolved using the appropriate index options; these might be request-specific or action-specific.
  • + *
  • Some requests already carry concrete indices; in these cases, the names of the concrete indices can be + * just taken without further evaluation
  • + *
+ */ +public class ResolvedIndices { + public static ResolvedIndices of(String... indices) { + return new ResolvedIndices( + new Local(Collections.unmodifiableSet(new HashSet<>(Arrays.asList(indices))), null, false), + Collections.emptyMap() + ); + } + + public static ResolvedIndices of(Index... indices) { + return new ResolvedIndices( + new Local(Stream.of(indices).map(Index::getName).collect(Collectors.toUnmodifiableSet()), null, false), + Collections.emptyMap() + ); + } + + public static ResolvedIndices of(Collection indices) { + return new ResolvedIndices(new Local(Collections.unmodifiableSet(new HashSet<>(indices)), null, false), Collections.emptyMap()); + } + + public static ResolvedIndices all() { + return ALL; + } + + public static ResolvedIndices ofNonNull(String... indices) { + Set indexSet = new HashSet<>(indices.length); + + for (String index : indices) { + if (index != null) { + indexSet.add(index); + } + } + + return new ResolvedIndices(new Local(Collections.unmodifiableSet(indexSet), null, false), Collections.emptyMap()); + } + + private static final ResolvedIndices ALL = new ResolvedIndices(new Local(Set.of(Metadata.ALL), null, true), Collections.emptyMap()); + + private final Local local; + private final Map remote; + + private ResolvedIndices(Local local, Map remote) { + this.local = local; + this.remote = remote; + } + + public Local local() { + return this.local; + } + + public Map remote() { + return this.remote; + } + + public ResolvedIndices withRemoteIndices(Map remoteIndices) { + if (remoteIndices.isEmpty()) { + return this; + } + + Map newRemoteIndices = new HashMap<>(remoteIndices); + newRemoteIndices.putAll(this.remote); + + return new ResolvedIndices(this.local, Collections.unmodifiableMap(newRemoteIndices)); + } + + public ResolvedIndices withLocalOriginalIndices(OriginalIndices originalIndices) { + return new ResolvedIndices(new Local(this.local.names, originalIndices, this.local.isAll), this.remote); + } + + public boolean isEmpty() { + return this.local.isEmpty() && this.remote.isEmpty(); + } + + /** + * Encapsulates the local (i.e., non-remote) indices referenced by the respective request. + */ + public static class Local { + private final Set names; + private final OriginalIndices originalIndices; + private final boolean isAll; + + private Local(Set names, OriginalIndices originalIndices, boolean isAll) { + this.names = names; + this.originalIndices = originalIndices; + this.isAll = isAll; + } + + public Set names() { + return this.names; + } + + public String[] namesAsArray() { + return this.names.toArray(new String[0]); + } + + public OriginalIndices originalIndices() { + return this.originalIndices; + } + + public boolean isEmpty() { + if (this.isAll) { + return false; + } else { + return this.names.isEmpty(); + } + } + + public boolean contains(String index) { + if (this.isAll) { + return true; + } else { + return this.names.contains(index); + } + } + } + +} diff --git a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java index a1b41117ee13b..6af6770bc80e5 100644 --- a/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java +++ b/server/src/test/java/org/opensearch/action/bulk/TransportBulkActionIngestTests.java @@ -224,7 +224,8 @@ class TestSingleItemBulkWriteAction extends TransportSingleItemBulkWriteAction void app Task task, String action, Request request, + ActionRequestMetadata actionRequestMetadata, ActionListener listener, ActionFilterChain chain ) {