Skip to content

Commit 46586ac

Browse files
davidkyle and jan-elastic
authored and committed
[ML] Add Auditor reset internal action (#136363)
Adds auditor reset action for deterministic resets in tests.
1 parent 03b3a1a commit 46586ac

File tree

5 files changed

+235
-48
lines changed

5 files changed

+235
-48
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ml.action;
9+
10+
import org.elasticsearch.action.ActionType;
11+
import org.elasticsearch.action.FailedNodeException;
12+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
13+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
14+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
15+
import org.elasticsearch.cluster.ClusterName;
16+
import org.elasticsearch.cluster.node.DiscoveryNode;
17+
import org.elasticsearch.common.io.stream.StreamInput;
18+
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.transport.AbstractTransportRequest;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
import java.util.Objects;
24+
25+
public class ResetAuditorAction extends ActionType<ResetAuditorAction.Response> {
26+
27+
public static final ResetAuditorAction INSTANCE = new ResetAuditorAction();
28+
public static final String NAME = "cluster:internal/xpack/ml/auditor/reset";
29+
30+
private ResetAuditorAction() {
31+
super(NAME);
32+
}
33+
34+
public static class Request extends BaseNodesRequest {
35+
36+
public static Request RESET_AUDITOR_REQUEST = new Request();
37+
38+
private Request() {
39+
super(new String[] { "ml:true" }); // Only ml nodes. See DiscoveryNodes::resolveNodes
40+
}
41+
}
42+
43+
public static class NodeRequest extends AbstractTransportRequest {
44+
45+
public NodeRequest(StreamInput in) throws IOException {
46+
super(in);
47+
}
48+
49+
public NodeRequest() {}
50+
51+
@Override
52+
public boolean equals(Object o) {
53+
if (o == null || getClass() != o.getClass()) {
54+
return false;
55+
}
56+
return true;
57+
}
58+
59+
@Override
60+
public int hashCode() {
61+
return Objects.hash();
62+
}
63+
}
64+
65+
public static class Response extends BaseNodesResponse<Response.ResetResponse> {
66+
67+
public Response(ClusterName clusterName, List<ResetResponse> nodes, List<FailedNodeException> failures) {
68+
super(clusterName, nodes, failures);
69+
}
70+
71+
protected Response(StreamInput in) throws IOException {
72+
super(in);
73+
}
74+
75+
public static class ResetResponse extends BaseNodeResponse {
76+
private final boolean acknowledged;
77+
78+
public ResetResponse(DiscoveryNode node, boolean acknowledged) {
79+
super(node);
80+
this.acknowledged = acknowledged;
81+
}
82+
83+
public ResetResponse(StreamInput in) throws IOException {
84+
super(in, null);
85+
acknowledged = in.readBoolean();
86+
}
87+
88+
public ResetResponse(StreamInput in, DiscoveryNode node) throws IOException {
89+
super(in, node);
90+
acknowledged = in.readBoolean();
91+
}
92+
93+
public boolean isAcknowledged() {
94+
return acknowledged;
95+
}
96+
97+
@Override
98+
public void writeTo(StreamOutput out) throws IOException {
99+
super.writeTo(out);
100+
out.writeBoolean(acknowledged);
101+
}
102+
103+
@Override
104+
public boolean equals(Object o) {
105+
if (o == null || getClass() != o.getClass()) return false;
106+
ResetResponse that = (ResetResponse) o;
107+
return acknowledged == that.acknowledged;
108+
}
109+
110+
@Override
111+
public int hashCode() {
112+
return Objects.hashCode(acknowledged);
113+
}
114+
}
115+
116+
@Override
117+
protected List<Response.ResetResponse> readNodesFrom(StreamInput in) throws IOException {
118+
return in.readCollectionAsList(ResetResponse::new);
119+
}
120+
121+
@Override
122+
protected void writeNodesTo(StreamOutput out, List<Response.ResetResponse> nodes) throws IOException {
123+
out.writeCollection(nodes);
124+
}
125+
}
126+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,6 @@
3131
import org.elasticsearch.cluster.service.ClusterService;
3232
import org.elasticsearch.common.breaker.CircuitBreaker;
3333
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
34-
import org.elasticsearch.common.logging.DeprecationLogger;
3534
import org.elasticsearch.common.settings.ClusterSettings;
3635
import org.elasticsearch.common.settings.IndexScopedSettings;
3736
import org.elasticsearch.common.settings.Setting;
@@ -160,6 +159,7 @@
160159
import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAliasAction;
161160
import org.elasticsearch.xpack.core.ml.action.PutTrainedModelDefinitionPartAction;
162161
import org.elasticsearch.xpack.core.ml.action.PutTrainedModelVocabularyAction;
162+
import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction;
163163
import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
164164
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
165165
import org.elasticsearch.xpack.core.ml.action.SetResetModeAction;
@@ -270,6 +270,7 @@
270270
import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAliasAction;
271271
import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelDefinitionPartAction;
272272
import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelVocabularyAction;
273+
import org.elasticsearch.xpack.ml.action.TransportResetAuditorAction;
273274
import org.elasticsearch.xpack.ml.action.TransportResetJobAction;
274275
import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction;
275276
import org.elasticsearch.xpack.ml.action.TransportSetResetModeAction;
@@ -784,7 +785,6 @@ public void loadExtensions(ExtensionLoader loader) {
784785
public static final int MAX_LOW_PRIORITY_MODELS_PER_NODE = 100;
785786

786787
private static final Logger logger = LogManager.getLogger(MachineLearning.class);
787-
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(MachineLearning.class);
788788

789789
private final Settings settings;
790790
private final boolean enabled;
@@ -1563,6 +1563,7 @@ public List<ActionHandler> getActions() {
15631563
actionHandlers.add(new ActionHandler(MlMemoryAction.INSTANCE, TransportMlMemoryAction.class));
15641564
actionHandlers.add(new ActionHandler(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class));
15651565
actionHandlers.add(new ActionHandler(SetResetModeAction.INSTANCE, TransportSetResetModeAction.class));
1566+
actionHandlers.add(new ActionHandler(ResetAuditorAction.INSTANCE, TransportResetAuditorAction.class));
15661567
// Included in this section as it's used by MlMemoryAction
15671568
actionHandlers.add(new ActionHandler(TrainedModelCacheInfoAction.INSTANCE, TransportTrainedModelCacheInfoAction.class));
15681569
actionHandlers.add(new ActionHandler(GetMlAutoscalingStats.INSTANCE, TransportGetMlAutoscalingStats.class));
@@ -2153,8 +2154,6 @@ public void cleanUpFeature(
21532154
final Map<String, Boolean> results = new ConcurrentHashMap<>();
21542155

21552156
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> unsetResetModeListener = ActionListener.wrap(success -> {
2156-
// reset the auditors as aliases used may be removed
2157-
resetAuditors();
21582157

21592158
client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap(resetSuccess -> {
21602159
finalListener.onResponse(success);
@@ -2180,8 +2179,24 @@ public void cleanUpFeature(
21802179
);
21812180
});
21822181

2182+
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> resetAuditors = ActionListener.wrap(success -> {
2183+
// reset the auditors as aliases used may be removed
2184+
client.execute(
2185+
ResetAuditorAction.INSTANCE,
2186+
ResetAuditorAction.Request.RESET_AUDITOR_REQUEST,
2187+
ActionListener.wrap(ignored -> unsetResetModeListener.onResponse(success), unsetResetModeListener::onFailure)
2188+
);
2189+
}, failure -> {
2190+
logger.error("failed to reset machine learning", failure);
2191+
client.execute(
2192+
ResetAuditorAction.INSTANCE,
2193+
ResetAuditorAction.Request.RESET_AUDITOR_REQUEST,
2194+
ActionListener.wrap(ignored -> unsetResetModeListener.onFailure(failure), unsetResetModeListener::onFailure)
2195+
);
2196+
});
2197+
21832198
// Stop all model deployments
2184-
ActionListener<AcknowledgedResponse> pipelineValidation = unsetResetModeListener.<ListTasksResponse>delegateFailureAndWrap(
2199+
ActionListener<AcknowledgedResponse> pipelineValidation = resetAuditors.<ListTasksResponse>delegateFailureAndWrap(
21852200
(delegate, listTasksResponse) -> {
21862201
listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices");
21872202
if (results.values().stream().allMatch(b -> b)) {
@@ -2336,18 +2351,6 @@ public void cleanUpFeature(
23362351
client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.enabled(), afterResetModeSet);
23372352
}
23382353

2339-
private void resetAuditors() {
2340-
if (anomalyDetectionAuditor.get() != null) {
2341-
anomalyDetectionAuditor.get().reset();
2342-
}
2343-
if (dataFrameAnalyticsAuditor.get() != null) {
2344-
dataFrameAnalyticsAuditor.get().reset();
2345-
}
2346-
if (inferenceAuditor.get() != null) {
2347-
inferenceAuditor.get().reset();
2348-
}
2349-
}
2350-
23512354
@Override
23522355
public BreakerSettings getCircuitBreaker(Settings settingsToUse) {
23532356
return BreakerSettings.updateFromSettings(
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.action;
9+
10+
import org.elasticsearch.action.FailedNodeException;
11+
import org.elasticsearch.action.support.ActionFilters;
12+
import org.elasticsearch.action.support.nodes.TransportNodesAction;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.cluster.service.ClusterService;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.injection.guice.Inject;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.threadpool.ThreadPool;
19+
import org.elasticsearch.transport.TransportService;
20+
import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction;
21+
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
22+
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
23+
import org.elasticsearch.xpack.ml.notifications.InferenceAuditor;
24+
25+
import java.io.IOException;
26+
import java.util.List;
27+
28+
public class TransportResetAuditorAction extends TransportNodesAction<
29+
ResetAuditorAction.Request,
30+
ResetAuditorAction.Response,
31+
ResetAuditorAction.NodeRequest,
32+
ResetAuditorAction.Response.ResetResponse,
33+
Void> {
34+
35+
private final AnomalyDetectionAuditor anomalyDetectionAuditor;
36+
private final DataFrameAnalyticsAuditor dfaAuditor;
37+
private final InferenceAuditor inferenceAuditor;
38+
39+
@Inject
40+
public TransportResetAuditorAction(
41+
ThreadPool threadPool,
42+
ClusterService clusterService,
43+
TransportService transportService,
44+
ActionFilters actionFilters,
45+
AnomalyDetectionAuditor anomalyDetectionAuditor,
46+
DataFrameAnalyticsAuditor dfaAuditor,
47+
InferenceAuditor inferenceAuditor
48+
) {
49+
super(
50+
ResetAuditorAction.NAME,
51+
clusterService,
52+
transportService,
53+
actionFilters,
54+
ResetAuditorAction.NodeRequest::new,
55+
threadPool.executor(ThreadPool.Names.MANAGEMENT)
56+
);
57+
this.anomalyDetectionAuditor = anomalyDetectionAuditor;
58+
this.dfaAuditor = dfaAuditor;
59+
this.inferenceAuditor = inferenceAuditor;
60+
}
61+
62+
@Override
63+
protected ResetAuditorAction.Response newResponse(
64+
ResetAuditorAction.Request request,
65+
List<ResetAuditorAction.Response.ResetResponse> resetResponses,
66+
List<FailedNodeException> failures
67+
) {
68+
return new ResetAuditorAction.Response(clusterService.getClusterName(), resetResponses, failures);
69+
}
70+
71+
@Override
72+
protected ResetAuditorAction.NodeRequest newNodeRequest(ResetAuditorAction.Request request) {
73+
return new ResetAuditorAction.NodeRequest();
74+
}
75+
76+
@Override
77+
protected ResetAuditorAction.Response.ResetResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
78+
return new ResetAuditorAction.Response.ResetResponse(in);
79+
}
80+
81+
@Override
82+
protected ResetAuditorAction.Response.ResetResponse nodeOperation(ResetAuditorAction.NodeRequest request, Task task) {
83+
anomalyDetectionAuditor.reset();
84+
dfaAuditor.reset();
85+
inferenceAuditor.reset();
86+
return new ResetAuditorAction.Response.ResetResponse(clusterService.localNode(), true);
87+
}
88+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1818
import org.elasticsearch.cluster.service.ClusterService;
19-
import org.elasticsearch.xcontent.ToXContent;
2019
import org.elasticsearch.xcontent.XContentParserConfiguration;
2120
import org.elasticsearch.xcontent.json.JsonXContent;
2221
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
2322
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessageFactory;
2423
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
25-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2624
import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
2725
import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry;
2826

@@ -33,7 +31,6 @@
3331
abstract class AbstractMlAuditor<T extends AbstractAuditMessage> extends AbstractAuditor<T> {
3432

3533
private static final Logger logger = LogManager.getLogger(AbstractMlAuditor.class);
36-
private volatile boolean isResetMode;
3734

3835
protected AbstractMlAuditor(
3936
Client client,
@@ -50,34 +47,6 @@ protected AbstractMlAuditor(
5047
indexNameExpressionResolver,
5148
clusterService.threadPool().generic()
5249
);
53-
clusterService.addListener(event -> {
54-
if (event.metadataChanged()) {
55-
setResetMode(MlMetadata.getMlMetadata(event.state()).isResetMode());
56-
}
57-
});
58-
}
59-
60-
private void setResetMode(boolean value) {
61-
isResetMode = value;
62-
}
63-
64-
@Override
65-
protected void indexDoc(ToXContent toXContent) {
66-
if (isResetMode) {
67-
logger.trace("Skipped writing the audit message backlog as reset_mode is enabled");
68-
} else {
69-
super.indexDoc(toXContent);
70-
}
71-
}
72-
73-
@Override
74-
protected void writeBacklog() {
75-
if (isResetMode) {
76-
logger.trace("Skipped writing the audit message backlog as reset_mode is enabled");
77-
clearBacklog();
78-
} else {
79-
super.writeBacklog();
80-
}
8150
}
8251

8352
@Override

x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -327,6 +327,7 @@ public class Constants {
327327
"cluster:internal/remote_cluster/nodes",
328328
"cluster:internal/xpack/inference",
329329
"cluster:internal/xpack/inference/unified",
330+
"cluster:internal/xpack/ml/auditor/reset",
330331
"cluster:internal/xpack/ml/coordinatedinference",
331332
"cluster:internal/xpack/ml/datafeed/isolate",
332333
"cluster:internal/xpack/ml/datafeed/running_state",

0 commit comments

Comments
 (0)