diff --git a/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java new file mode 100644 index 0000000000000..dd5cbaf29bc1f --- /dev/null +++ b/x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/ResetAuditorAction.java @@ -0,0 +1,126 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.core.ml.action; + +import org.elasticsearch.action.ActionType; +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.nodes.BaseNodeResponse; +import org.elasticsearch.action.support.nodes.BaseNodesRequest; +import org.elasticsearch.action.support.nodes.BaseNodesResponse; +import org.elasticsearch.cluster.ClusterName; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.common.io.stream.StreamOutput; +import org.elasticsearch.transport.AbstractTransportRequest; + +import java.io.IOException; +import java.util.List; +import java.util.Objects; + +public class ResetAuditorAction extends ActionType { + + public static final ResetAuditorAction INSTANCE = new ResetAuditorAction(); + public static final String NAME = "cluster:internal/xpack/ml/auditor/reset"; + + private ResetAuditorAction() { + super(NAME); + } + + public static class Request extends BaseNodesRequest { + + public static Request RESET_AUDITOR_REQUEST = new Request(); + + private Request() { + super(new String[] { "ml:true" }); // Only ml nodes. See DiscoveryNodes::resolveNodes + } + } + + public static class NodeRequest extends AbstractTransportRequest { + + public NodeRequest(StreamInput in) throws IOException { + super(in); + } + + public NodeRequest() {} + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) { + return false; + } + return true; + } + + @Override + public int hashCode() { + return Objects.hash(); + } + } + + public static class Response extends BaseNodesResponse { + + public Response(ClusterName clusterName, List nodes, List failures) { + super(clusterName, nodes, failures); + } + + protected Response(StreamInput in) throws IOException { + super(in); + } + + public static class ResetResponse extends BaseNodeResponse { + private final boolean acknowledged; + + public ResetResponse(DiscoveryNode node, boolean acknowledged) { + super(node); + this.acknowledged = acknowledged; + } + + public ResetResponse(StreamInput in) throws IOException { + super(in, null); + acknowledged = in.readBoolean(); + } + + public ResetResponse(StreamInput in, DiscoveryNode node) throws IOException { + super(in, node); + acknowledged = in.readBoolean(); + } + + public boolean isAcknowledged() { + return acknowledged; + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + super.writeTo(out); + out.writeBoolean(acknowledged); + } + + @Override + public boolean equals(Object o) { + if (o == null || getClass() != o.getClass()) return false; + ResetResponse that = (ResetResponse) o; + return acknowledged == that.acknowledged; + } + + @Override + public int hashCode() { + return Objects.hashCode(acknowledged); + } + } + + @Override + protected List readNodesFrom(StreamInput in) throws IOException { + return in.readCollectionAsList(ResetResponse::new); + } + + @Override + protected void writeNodesTo(StreamOutput out, List nodes) throws IOException { + out.writeCollection(nodes); + } + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java index 83adee27248be..704b0d7634db4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java @@ -32,7 +32,6 @@ import org.elasticsearch.cluster.service.ClusterService; import org.elasticsearch.common.breaker.CircuitBreaker; import org.elasticsearch.common.io.stream.NamedWriteableRegistry; -import org.elasticsearch.common.logging.DeprecationLogger; import org.elasticsearch.common.settings.ClusterSettings; import org.elasticsearch.common.settings.IndexScopedSettings; import org.elasticsearch.common.settings.Setting; @@ -161,6 +160,7 @@ import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAliasAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelDefinitionPartAction; import org.elasticsearch.xpack.core.ml.action.PutTrainedModelVocabularyAction; +import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction; import org.elasticsearch.xpack.core.ml.action.ResetJobAction; import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction; import org.elasticsearch.xpack.core.ml.action.SetResetModeAction; @@ -271,6 +271,7 @@ import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAliasAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelDefinitionPartAction; import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelVocabularyAction; +import org.elasticsearch.xpack.ml.action.TransportResetAuditorAction; import org.elasticsearch.xpack.ml.action.TransportResetJobAction; import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction; import org.elasticsearch.xpack.ml.action.TransportSetResetModeAction; @@ -785,7 +786,6 @@ public void loadExtensions(ExtensionLoader loader) { public static final int MAX_LOW_PRIORITY_MODELS_PER_NODE = 100; private static final Logger logger = LogManager.getLogger(MachineLearning.class); - private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(MachineLearning.class); private final Settings settings; private final boolean enabled; @@ -1564,6 +1564,7 @@ public List getActions() { actionHandlers.add(new ActionHandler(MlMemoryAction.INSTANCE, TransportMlMemoryAction.class)); actionHandlers.add(new ActionHandler(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class)); actionHandlers.add(new ActionHandler(SetResetModeAction.INSTANCE, TransportSetResetModeAction.class)); + actionHandlers.add(new ActionHandler(ResetAuditorAction.INSTANCE, TransportResetAuditorAction.class)); // Included in this section as it's used by MlMemoryAction actionHandlers.add(new ActionHandler(TrainedModelCacheInfoAction.INSTANCE, TransportTrainedModelCacheInfoAction.class)); actionHandlers.add(new ActionHandler(GetMlAutoscalingStats.INSTANCE, TransportGetMlAutoscalingStats.class)); @@ -2149,8 +2150,6 @@ public void cleanUpFeature( final Map results = new ConcurrentHashMap<>(); ActionListener unsetResetModeListener = ActionListener.wrap(success -> { - // reset the auditors as aliases used may be removed - resetAuditors(); client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap(resetSuccess -> { finalListener.onResponse(success); @@ -2176,8 +2175,24 @@ public void cleanUpFeature( ); }); + ActionListener resetAuditors = ActionListener.wrap(success -> { + // reset the auditors as aliases used may be removed + client.execute( + ResetAuditorAction.INSTANCE, + ResetAuditorAction.Request.RESET_AUDITOR_REQUEST, + ActionListener.wrap(ignored -> unsetResetModeListener.onResponse(success), unsetResetModeListener::onFailure) + ); + }, failure -> { + logger.error("failed to reset machine learning", failure); + client.execute( + ResetAuditorAction.INSTANCE, + ResetAuditorAction.Request.RESET_AUDITOR_REQUEST, + ActionListener.wrap(ignored -> unsetResetModeListener.onFailure(failure), unsetResetModeListener::onFailure) + ); + }); + // Stop all model deployments - ActionListener pipelineValidation = unsetResetModeListener.delegateFailureAndWrap( + ActionListener pipelineValidation = resetAuditors.delegateFailureAndWrap( (delegate, listTasksResponse) -> { listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices"); if (results.values().stream().allMatch(b -> b)) { @@ -2332,18 +2347,6 @@ public void cleanUpFeature( client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.enabled(), afterResetModeSet); } - private void resetAuditors() { - if (anomalyDetectionAuditor.get() != null) { - anomalyDetectionAuditor.get().reset(); - } - if (dataFrameAnalyticsAuditor.get() != null) { - dataFrameAnalyticsAuditor.get().reset(); - } - if (inferenceAuditor.get() != null) { - inferenceAuditor.get().reset(); - } - } - @Override public BreakerSettings getCircuitBreaker(Settings settingsToUse) { return BreakerSettings.updateFromSettings( diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java new file mode 100644 index 0000000000000..a8f0daca2274d --- /dev/null +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/action/TransportResetAuditorAction.java @@ -0,0 +1,88 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the Elastic License + * 2.0; you may not use this file except in compliance with the Elastic License + * 2.0. + */ + +package org.elasticsearch.xpack.ml.action; + +import org.elasticsearch.action.FailedNodeException; +import org.elasticsearch.action.support.ActionFilters; +import org.elasticsearch.action.support.nodes.TransportNodesAction; +import org.elasticsearch.cluster.node.DiscoveryNode; +import org.elasticsearch.cluster.service.ClusterService; +import org.elasticsearch.common.io.stream.StreamInput; +import org.elasticsearch.injection.guice.Inject; +import org.elasticsearch.tasks.Task; +import org.elasticsearch.threadpool.ThreadPool; +import org.elasticsearch.transport.TransportService; +import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction; +import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor; +import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor; +import org.elasticsearch.xpack.ml.notifications.InferenceAuditor; + +import java.io.IOException; +import java.util.List; + +public class TransportResetAuditorAction extends TransportNodesAction< + ResetAuditorAction.Request, + ResetAuditorAction.Response, + ResetAuditorAction.NodeRequest, + ResetAuditorAction.Response.ResetResponse, + Void> { + + private final AnomalyDetectionAuditor anomalyDetectionAuditor; + private final DataFrameAnalyticsAuditor dfaAuditor; + private final InferenceAuditor inferenceAuditor; + + @Inject + public TransportResetAuditorAction( + ThreadPool threadPool, + ClusterService clusterService, + TransportService transportService, + ActionFilters actionFilters, + AnomalyDetectionAuditor anomalyDetectionAuditor, + DataFrameAnalyticsAuditor dfaAuditor, + InferenceAuditor inferenceAuditor + ) { + super( + ResetAuditorAction.NAME, + clusterService, + transportService, + actionFilters, + ResetAuditorAction.NodeRequest::new, + threadPool.executor(ThreadPool.Names.MANAGEMENT) + ); + this.anomalyDetectionAuditor = anomalyDetectionAuditor; + this.dfaAuditor = dfaAuditor; + this.inferenceAuditor = inferenceAuditor; + } + + @Override + protected ResetAuditorAction.Response newResponse( + ResetAuditorAction.Request request, + List resetResponses, + List failures + ) { + return new ResetAuditorAction.Response(clusterService.getClusterName(), resetResponses, failures); + } + + @Override + protected ResetAuditorAction.NodeRequest newNodeRequest(ResetAuditorAction.Request request) { + return new ResetAuditorAction.NodeRequest(); + } + + @Override + protected ResetAuditorAction.Response.ResetResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException { + return new ResetAuditorAction.Response.ResetResponse(in); + } + + @Override + protected ResetAuditorAction.Response.ResetResponse nodeOperation(ResetAuditorAction.NodeRequest request, Task task) { + anomalyDetectionAuditor.reset(); + dfaAuditor.reset(); + inferenceAuditor.reset(); + return new ResetAuditorAction.Response.ResetResponse(clusterService.localNode(), true); + } +} diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java index 99b03c2725411..2c9949fac70d4 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java @@ -16,13 +16,11 @@ import org.elasticsearch.cluster.metadata.ComposableIndexTemplate; import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver; import org.elasticsearch.cluster.service.ClusterService; -import org.elasticsearch.xcontent.ToXContent; import org.elasticsearch.xcontent.XContentParserConfiguration; import org.elasticsearch.xcontent.json.JsonXContent; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessageFactory; import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor; -import org.elasticsearch.xpack.core.ml.MlMetadata; import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex; import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry; @@ -33,7 +31,6 @@ abstract class AbstractMlAuditor extends AbstractAuditor { private static final Logger logger = LogManager.getLogger(AbstractMlAuditor.class); - private volatile boolean isResetMode; protected AbstractMlAuditor( Client client, @@ -50,34 +47,6 @@ protected AbstractMlAuditor( indexNameExpressionResolver, clusterService.threadPool().generic() ); - clusterService.addListener(event -> { - if (event.metadataChanged()) { - setResetMode(MlMetadata.getMlMetadata(event.state()).isResetMode()); - } - }); - } - - private void setResetMode(boolean value) { - isResetMode = value; - } - - @Override - protected void indexDoc(ToXContent toXContent) { - if (isResetMode) { - logger.trace("Skipped writing the audit message backlog as reset_mode is enabled"); - } else { - super.indexDoc(toXContent); - } - } - - @Override - protected void writeBacklog() { - if (isResetMode) { - logger.trace("Skipped writing the audit message backlog as reset_mode is enabled"); - clearBacklog(); - } else { - super.writeBacklog(); - } } @Override diff --git a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java index a66f85e953e62..253aa0d1a60a8 100644 --- a/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java +++ b/x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java @@ -329,6 +329,7 @@ public class Constants { "cluster:internal/xpack/inference", "cluster:internal/xpack/inference/rerankwindowsize/get", "cluster:internal/xpack/inference/unified", + "cluster:internal/xpack/ml/auditor/reset", "cluster:internal/xpack/ml/coordinatedinference", "cluster:internal/xpack/ml/datafeed/isolate", "cluster:internal/xpack/ml/datafeed/running_state",