Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import org.opensearch.action.search.SearchAction;
import org.opensearch.action.support.ActionFilter;
import org.opensearch.action.support.ActionFilterChain;
import org.opensearch.action.support.ActionRequestMetadata;
import org.opensearch.action.support.WriteRequest.RefreshPolicy;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.service.ClusterService;
Expand Down Expand Up @@ -233,6 +234,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
Task task,
String action,
Request request,
ActionRequestMetadata<Request, Response> actionRequestMetadata,
ActionListener<Response> listener,
ActionFilterChain<Request, Response> chain
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.support.ActionFilter;
import org.opensearch.action.support.ActionFilterChain;
import org.opensearch.action.support.ActionRequestMetadata;
import org.opensearch.core.action.ActionListener;
import org.opensearch.core.action.ActionResponse;
import org.opensearch.plugin.wlm.rule.attribute_extractor.IndicesExtractor;
Expand Down Expand Up @@ -51,6 +52,7 @@ public <Request extends ActionRequest, Response extends ActionResponse> void app
Task task,
String action,
Request request,
ActionRequestMetadata<Request, Response> actionRequestMetadata,
ActionListener<Response> listener,
ActionFilterChain<Request, Response> chain
) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.opensearch.OpenSearchGenerationException;
import org.opensearch.action.ActionRequestValidationException;
import org.opensearch.action.AliasesRequest;
import org.opensearch.action.CompositeIndicesRequest;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.clustermanager.AcknowledgedRequest;
import org.opensearch.cluster.metadata.AliasAction;
Expand Down Expand Up @@ -76,7 +77,7 @@
* @opensearch.api
*/
@PublicApi(since = "1.0.0")
public class IndicesAliasesRequest extends AcknowledgedRequest<IndicesAliasesRequest> implements ToXContentObject {
public class IndicesAliasesRequest extends AcknowledgedRequest<IndicesAliasesRequest> implements ToXContentObject, CompositeIndicesRequest {

private List<AliasActions> allAliasActions = new ArrayList<>();
private String origin = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.RequestValidators;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.TransportIndicesResolvingAction;
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
Expand All @@ -48,6 +49,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.Metadata;
import org.opensearch.cluster.metadata.MetadataIndexAliasesService;
import org.opensearch.cluster.metadata.ResolvedIndices;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.common.regex.Regex;
Expand All @@ -68,6 +70,7 @@
import java.util.Objects;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

import static java.util.Collections.unmodifiableList;

Expand All @@ -76,7 +79,9 @@
*
* @opensearch.internal
*/
public class TransportIndicesAliasesAction extends TransportClusterManagerNodeAction<IndicesAliasesRequest, AcknowledgedResponse> {
public class TransportIndicesAliasesAction extends TransportClusterManagerNodeAction<IndicesAliasesRequest, AcknowledgedResponse>
implements
TransportIndicesResolvingAction<IndicesAliasesRequest> {

private static final Logger logger = LogManager.getLogger(TransportIndicesAliasesAction.class);

Expand Down Expand Up @@ -131,45 +136,97 @@ protected void clusterManagerOperation(
final IndicesAliasesRequest request,
final ClusterState state,
final ActionListener<AcknowledgedResponse> listener
) {
) throws Exception {

// Expand the indices names
List<IndicesAliasesRequest.AliasActions> actions = request.aliasActions();
List<AliasAction> finalActions = new ArrayList<>();
List<AliasAction> finalActions = resolvedAliasActions(request, state, true);
if (finalActions.isEmpty() && false == actions.isEmpty()) {
throw new AliasesNotFoundException(
actions.stream().flatMap(a -> Arrays.stream(a.getOriginalAliases())).collect(Collectors.toSet()).toArray(new String[0])
);
}
request.aliasActions().clear();
IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest(unmodifiableList(finalActions))
.ackTimeout(request.timeout())
.clusterManagerNodeTimeout(request.clusterManagerNodeTimeout());

indexAliasesService.indicesAliases(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}

@Override
public void onFailure(Exception t) {
logger.debug("failed to perform aliases", t);
listener.onFailure(t);
}
});
}

@Override
public ResolvedIndices resolveIndices(IndicesAliasesRequest request) {
try {
Set<String> indices = new HashSet<>();

for (AliasAction aliasAction : resolvedAliasActions(request, clusterService.state(), false)) {
if (aliasAction instanceof AliasAction.Add addAliasAction) {
indices.add(addAliasAction.getIndex());
indices.add(addAliasAction.getAlias());
} else if (aliasAction instanceof AliasAction.Remove removeAliasAction) {
indices.add(removeAliasAction.getIndex());
indices.add(removeAliasAction.getAlias());
} else if (aliasAction instanceof AliasAction.RemoveIndex removeIndexAction) {
// TODO special action
indices.add(removeIndexAction.getIndex());
}
}

return ResolvedIndices.of(indices);
} catch (RuntimeException e) {
throw e;
} catch (Exception e) {
// This should not happen if validate=false is passed to resolvedAliasActions()
throw new RuntimeException(e);
}
}

private List<AliasAction> resolvedAliasActions(IndicesAliasesRequest request, ClusterState state, boolean validate) throws Exception {
List<AliasAction> result = new ArrayList<>();
// Resolve all the AliasActions into AliasAction instances and gather all the aliases
Set<String> aliases = new HashSet<>();
for (IndicesAliasesRequest.AliasActions action : actions) {
for (IndicesAliasesRequest.AliasActions action : request.aliasActions()) {
final Index[] concreteIndices = indexNameExpressionResolver.concreteIndices(
state,
request.indicesOptions(),
false,
action.indices()
);
for (Index concreteIndex : concreteIndices) {
IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(concreteIndex.getName());
assert indexAbstraction != null : "invalid cluster metadata. index [" + concreteIndex.getName() + "] was not found";
if (indexAbstraction.getParentDataStream() != null) {
throw new IllegalArgumentException(
"The provided expressions ["
+ String.join(",", action.indices())
+ "] match a backing index belonging to data stream ["
+ indexAbstraction.getParentDataStream().getName()
+ "]. Data streams and their backing indices don't support aliases."
);
if (validate) {
for (Index concreteIndex : concreteIndices) {
IndexAbstraction indexAbstraction = state.metadata().getIndicesLookup().get(concreteIndex.getName());
assert indexAbstraction != null : "invalid cluster metadata. index [" + concreteIndex.getName() + "] was not found";
if (indexAbstraction.getParentDataStream() != null) {
throw new IllegalArgumentException(
"The provided expressions ["
+ String.join(",", action.indices())
+ "] match a backing index belonging to data stream ["
+ indexAbstraction.getParentDataStream().getName()
+ "]. Data streams and their backing indices don't support aliases."
);
}
}
final Optional<Exception> maybeException = requestValidators.validateRequest(request, state, concreteIndices);
if (maybeException.isPresent()) {
throw maybeException.get();
}
}
final Optional<Exception> maybeException = requestValidators.validateRequest(request, state, concreteIndices);
if (maybeException.isPresent()) {
listener.onFailure(maybeException.get());
return;
}

Collections.addAll(aliases, action.getOriginalAliases());
for (final Index index : concreteIndices) {
switch (action.actionType()) {
case ADD:
for (String alias : concreteAliases(action, state.metadata(), index.getName())) {
finalActions.add(
result.add(
new AliasAction.Add(
index.getName(),
alias,
Expand All @@ -184,37 +241,19 @@ protected void clusterManagerOperation(
break;
case REMOVE:
for (String alias : concreteAliases(action, state.metadata(), index.getName())) {
finalActions.add(new AliasAction.Remove(index.getName(), alias, action.mustExist()));
result.add(new AliasAction.Remove(index.getName(), alias, action.mustExist()));
}
break;
case REMOVE_INDEX:
finalActions.add(new AliasAction.RemoveIndex(index.getName()));
result.add(new AliasAction.RemoveIndex(index.getName()));
break;
default:
throw new IllegalArgumentException("Unsupported action [" + action.actionType() + "]");
}
}
}
if (finalActions.isEmpty() && false == actions.isEmpty()) {
throw new AliasesNotFoundException(aliases.toArray(new String[0]));
}
request.aliasActions().clear();
IndicesAliasesClusterStateUpdateRequest updateRequest = new IndicesAliasesClusterStateUpdateRequest(unmodifiableList(finalActions))
.ackTimeout(request.timeout())
.clusterManagerNodeTimeout(request.clusterManagerNodeTimeout());

indexAliasesService.indicesAliases(updateRequest, new ActionListener<ClusterStateUpdateResponse>() {
@Override
public void onResponse(ClusterStateUpdateResponse response) {
listener.onResponse(new AcknowledgedResponse(response.isAcknowledged()));
}

@Override
public void onFailure(Exception t) {
logger.debug("failed to perform aliases", t);
listener.onFailure(t);
}
});
return result;
}

private static String[] concreteAliases(IndicesAliasesRequest.AliasActions action, Metadata metadata, String concreteIndex) {
Expand Down Expand Up @@ -255,4 +294,5 @@ private static String[] concreteAliases(IndicesAliasesRequest.AliasActions actio
return action.aliases();
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -33,12 +33,14 @@
package org.opensearch.action.admin.indices.create;

import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.TransportIndicesResolvingAction;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
import org.opensearch.cluster.ClusterState;
import org.opensearch.cluster.block.ClusterBlockException;
import org.opensearch.cluster.block.ClusterBlockLevel;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MetadataCreateIndexService;
import org.opensearch.cluster.metadata.ResolvedIndices;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.inject.Inject;
import org.opensearch.core.action.ActionListener;
Expand All @@ -54,7 +56,9 @@
*
* @opensearch.internal
*/
public class TransportCreateIndexAction extends TransportClusterManagerNodeAction<CreateIndexRequest, CreateIndexResponse> {
public class TransportCreateIndexAction extends TransportClusterManagerNodeAction<CreateIndexRequest, CreateIndexResponse>
implements
TransportIndicesResolvingAction<CreateIndexRequest> {

private final MetadataCreateIndexService createIndexService;
private final MappingTransformerRegistry mappingTransformerRegistry;
Expand Down Expand Up @@ -115,7 +119,7 @@ protected void clusterManagerOperation(
cause = "api";
}

final String indexName = indexNameExpressionResolver.resolveDateMathExpression(request.index());
final String indexName = resolveIndexName(request);

final String finalCause = cause;
final ActionListener<String> mappingTransformListener = ActionListener.wrap(transformedMappings -> {
Expand Down Expand Up @@ -143,4 +147,12 @@ protected void clusterManagerOperation(
mappingTransformerRegistry.applyTransformers(request.mappings(), null, mappingTransformListener);
}

@Override
public ResolvedIndices resolveIndices(CreateIndexRequest request) {
return ResolvedIndices.of(resolveIndexName(request));
}

private String resolveIndexName(CreateIndexRequest request) {
return indexNameExpressionResolver.resolveDateMathExpression(request.index());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.opensearch.action.ValidateActions;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.TransportIndicesResolvingAction;
import org.opensearch.action.support.clustermanager.AcknowledgedRequest;
import org.opensearch.action.support.clustermanager.AcknowledgedResponse;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeAction;
Expand All @@ -46,6 +47,7 @@
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MetadataCreateDataStreamService;
import org.opensearch.cluster.metadata.MetadataCreateDataStreamService.CreateDataStreamClusterStateUpdateRequest;
import org.opensearch.cluster.metadata.ResolvedIndices;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.inject.Inject;
Expand Down Expand Up @@ -137,7 +139,9 @@ public IndicesOptions indicesOptions() {
*
* @opensearch.internal
*/
public static class TransportAction extends TransportClusterManagerNodeAction<Request, AcknowledgedResponse> {
public static class TransportAction extends TransportClusterManagerNodeAction<Request, AcknowledgedResponse>
implements
TransportIndicesResolvingAction<Request> {

private final MetadataCreateDataStreamService metadataCreateDataStreamService;

Expand Down Expand Up @@ -179,6 +183,11 @@ protected void clusterManagerOperation(Request request, ClusterState state, Acti
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
public ResolvedIndices resolveIndices(Request request) {
return ResolvedIndices.of(request.name);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.action.IndicesRequest;
import org.opensearch.action.support.ActionFilters;
import org.opensearch.action.support.IndicesOptions;
import org.opensearch.action.support.TransportIndicesResolvingAction;
import org.opensearch.action.support.clustermanager.ClusterManagerNodeReadRequest;
import org.opensearch.action.support.clustermanager.TransportClusterManagerNodeReadAction;
import org.opensearch.cluster.AbstractDiffable;
Expand All @@ -49,6 +50,7 @@
import org.opensearch.cluster.metadata.DataStream;
import org.opensearch.cluster.metadata.IndexNameExpressionResolver;
import org.opensearch.cluster.metadata.MetadataIndexTemplateService;
import org.opensearch.cluster.metadata.ResolvedIndices;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.common.Nullable;
import org.opensearch.common.annotation.PublicApi;
Expand Down Expand Up @@ -292,7 +294,9 @@ public int hashCode() {
*
* @opensearch.internal
*/
public static class TransportAction extends TransportClusterManagerNodeReadAction<Request, Response> {
public static class TransportAction extends TransportClusterManagerNodeReadAction<Request, Response>
implements
TransportIndicesResolvingAction<Request> {

private static final Logger logger = LogManager.getLogger(TransportAction.class);

Expand Down Expand Up @@ -354,6 +358,15 @@ static List<DataStream> getDataStreams(ClusterState clusterState, IndexNameExpre
protected ClusterBlockException checkBlock(Request request, ClusterState state) {
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
}

@Override
public ResolvedIndices resolveIndices(Request request) {
return ResolvedIndices.of(
getDataStreams(clusterService.state(), indexNameExpressionResolver, request).stream()
.map(DataStream::getName)
.collect(Collectors.toList())
);
}
}

}
Loading