Skip to content

Commit 5c8c576

Browse files
davidkylejan-elastic
authored and committed
[ML] Add Auditor reset internal action (elastic#136363)
Adds auditor reset action for deterministic resets in tests.
1 parent b19d2e4 commit 5c8c576

File tree

5 files changed

+235
-48
lines changed

5 files changed

+235
-48
lines changed
Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.ml.action;
9+
10+
import org.elasticsearch.action.ActionType;
11+
import org.elasticsearch.action.FailedNodeException;
12+
import org.elasticsearch.action.support.nodes.BaseNodeResponse;
13+
import org.elasticsearch.action.support.nodes.BaseNodesRequest;
14+
import org.elasticsearch.action.support.nodes.BaseNodesResponse;
15+
import org.elasticsearch.cluster.ClusterName;
16+
import org.elasticsearch.cluster.node.DiscoveryNode;
17+
import org.elasticsearch.common.io.stream.StreamInput;
18+
import org.elasticsearch.common.io.stream.StreamOutput;
19+
import org.elasticsearch.transport.AbstractTransportRequest;
20+
21+
import java.io.IOException;
22+
import java.util.List;
23+
import java.util.Objects;
24+
25+
public class ResetAuditorAction extends ActionType<ResetAuditorAction.Response> {
26+
27+
public static final ResetAuditorAction INSTANCE = new ResetAuditorAction();
28+
public static final String NAME = "cluster:internal/xpack/ml/auditor/reset";
29+
30+
private ResetAuditorAction() {
31+
super(NAME);
32+
}
33+
34+
public static class Request extends BaseNodesRequest {
35+
36+
public static Request RESET_AUDITOR_REQUEST = new Request();
37+
38+
private Request() {
39+
super(new String[] { "ml:true" }); // Only ml nodes. See DiscoveryNodes::resolveNodes
40+
}
41+
}
42+
43+
public static class NodeRequest extends AbstractTransportRequest {
44+
45+
public NodeRequest(StreamInput in) throws IOException {
46+
super(in);
47+
}
48+
49+
public NodeRequest() {}
50+
51+
@Override
52+
public boolean equals(Object o) {
53+
if (o == null || getClass() != o.getClass()) {
54+
return false;
55+
}
56+
return true;
57+
}
58+
59+
@Override
60+
public int hashCode() {
61+
return Objects.hash();
62+
}
63+
}
64+
65+
public static class Response extends BaseNodesResponse<Response.ResetResponse> {
66+
67+
public Response(ClusterName clusterName, List<ResetResponse> nodes, List<FailedNodeException> failures) {
68+
super(clusterName, nodes, failures);
69+
}
70+
71+
protected Response(StreamInput in) throws IOException {
72+
super(in);
73+
}
74+
75+
public static class ResetResponse extends BaseNodeResponse {
76+
private final boolean acknowledged;
77+
78+
public ResetResponse(DiscoveryNode node, boolean acknowledged) {
79+
super(node);
80+
this.acknowledged = acknowledged;
81+
}
82+
83+
public ResetResponse(StreamInput in) throws IOException {
84+
super(in, null);
85+
acknowledged = in.readBoolean();
86+
}
87+
88+
public ResetResponse(StreamInput in, DiscoveryNode node) throws IOException {
89+
super(in, node);
90+
acknowledged = in.readBoolean();
91+
}
92+
93+
public boolean isAcknowledged() {
94+
return acknowledged;
95+
}
96+
97+
@Override
98+
public void writeTo(StreamOutput out) throws IOException {
99+
super.writeTo(out);
100+
out.writeBoolean(acknowledged);
101+
}
102+
103+
@Override
104+
public boolean equals(Object o) {
105+
if (o == null || getClass() != o.getClass()) return false;
106+
ResetResponse that = (ResetResponse) o;
107+
return acknowledged == that.acknowledged;
108+
}
109+
110+
@Override
111+
public int hashCode() {
112+
return Objects.hashCode(acknowledged);
113+
}
114+
}
115+
116+
@Override
117+
protected List<Response.ResetResponse> readNodesFrom(StreamInput in) throws IOException {
118+
return in.readCollectionAsList(ResetResponse::new);
119+
}
120+
121+
@Override
122+
protected void writeNodesTo(StreamOutput out, List<Response.ResetResponse> nodes) throws IOException {
123+
out.writeCollection(nodes);
124+
}
125+
}
126+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/MachineLearning.java

Lines changed: 20 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.elasticsearch.cluster.service.ClusterService;
3333
import org.elasticsearch.common.breaker.CircuitBreaker;
3434
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
35-
import org.elasticsearch.common.logging.DeprecationLogger;
3635
import org.elasticsearch.common.settings.ClusterSettings;
3736
import org.elasticsearch.common.settings.IndexScopedSettings;
3837
import org.elasticsearch.common.settings.Setting;
@@ -161,6 +160,7 @@
161160
import org.elasticsearch.xpack.core.ml.action.PutTrainedModelAliasAction;
162161
import org.elasticsearch.xpack.core.ml.action.PutTrainedModelDefinitionPartAction;
163162
import org.elasticsearch.xpack.core.ml.action.PutTrainedModelVocabularyAction;
163+
import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction;
164164
import org.elasticsearch.xpack.core.ml.action.ResetJobAction;
165165
import org.elasticsearch.xpack.core.ml.action.RevertModelSnapshotAction;
166166
import org.elasticsearch.xpack.core.ml.action.SetResetModeAction;
@@ -271,6 +271,7 @@
271271
import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelAliasAction;
272272
import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelDefinitionPartAction;
273273
import org.elasticsearch.xpack.ml.action.TransportPutTrainedModelVocabularyAction;
274+
import org.elasticsearch.xpack.ml.action.TransportResetAuditorAction;
274275
import org.elasticsearch.xpack.ml.action.TransportResetJobAction;
275276
import org.elasticsearch.xpack.ml.action.TransportRevertModelSnapshotAction;
276277
import org.elasticsearch.xpack.ml.action.TransportSetResetModeAction;
@@ -785,7 +786,6 @@ public void loadExtensions(ExtensionLoader loader) {
785786
public static final int MAX_LOW_PRIORITY_MODELS_PER_NODE = 100;
786787

787788
private static final Logger logger = LogManager.getLogger(MachineLearning.class);
788-
private static final DeprecationLogger deprecationLogger = DeprecationLogger.getLogger(MachineLearning.class);
789789

790790
private final Settings settings;
791791
private final boolean enabled;
@@ -1564,6 +1564,7 @@ public List<ActionHandler> getActions() {
15641564
actionHandlers.add(new ActionHandler(MlMemoryAction.INSTANCE, TransportMlMemoryAction.class));
15651565
actionHandlers.add(new ActionHandler(SetUpgradeModeAction.INSTANCE, TransportSetUpgradeModeAction.class));
15661566
actionHandlers.add(new ActionHandler(SetResetModeAction.INSTANCE, TransportSetResetModeAction.class));
1567+
actionHandlers.add(new ActionHandler(ResetAuditorAction.INSTANCE, TransportResetAuditorAction.class));
15671568
// Included in this section as it's used by MlMemoryAction
15681569
actionHandlers.add(new ActionHandler(TrainedModelCacheInfoAction.INSTANCE, TransportTrainedModelCacheInfoAction.class));
15691570
actionHandlers.add(new ActionHandler(GetMlAutoscalingStats.INSTANCE, TransportGetMlAutoscalingStats.class));
@@ -2149,8 +2150,6 @@ public void cleanUpFeature(
21492150
final Map<String, Boolean> results = new ConcurrentHashMap<>();
21502151

21512152
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> unsetResetModeListener = ActionListener.wrap(success -> {
2152-
// reset the auditors as aliases used may be removed
2153-
resetAuditors();
21542153

21552154
client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.disabled(true), ActionListener.wrap(resetSuccess -> {
21562155
finalListener.onResponse(success);
@@ -2176,8 +2175,24 @@ public void cleanUpFeature(
21762175
);
21772176
});
21782177

2178+
ActionListener<ResetFeatureStateResponse.ResetFeatureStateStatus> resetAuditors = ActionListener.wrap(success -> {
2179+
// reset the auditors as aliases used may be removed
2180+
client.execute(
2181+
ResetAuditorAction.INSTANCE,
2182+
ResetAuditorAction.Request.RESET_AUDITOR_REQUEST,
2183+
ActionListener.wrap(ignored -> unsetResetModeListener.onResponse(success), unsetResetModeListener::onFailure)
2184+
);
2185+
}, failure -> {
2186+
logger.error("failed to reset machine learning", failure);
2187+
client.execute(
2188+
ResetAuditorAction.INSTANCE,
2189+
ResetAuditorAction.Request.RESET_AUDITOR_REQUEST,
2190+
ActionListener.wrap(ignored -> unsetResetModeListener.onFailure(failure), unsetResetModeListener::onFailure)
2191+
);
2192+
});
2193+
21792194
// Stop all model deployments
2180-
ActionListener<AcknowledgedResponse> pipelineValidation = unsetResetModeListener.<ListTasksResponse>delegateFailureAndWrap(
2195+
ActionListener<AcknowledgedResponse> pipelineValidation = resetAuditors.<ListTasksResponse>delegateFailureAndWrap(
21812196
(delegate, listTasksResponse) -> {
21822197
listTasksResponse.rethrowFailures("Waiting for indexing requests for .ml-* indices");
21832198
if (results.values().stream().allMatch(b -> b)) {
@@ -2332,18 +2347,6 @@ public void cleanUpFeature(
23322347
client.execute(SetResetModeAction.INSTANCE, SetResetModeActionRequest.enabled(), afterResetModeSet);
23332348
}
23342349

2335-
private void resetAuditors() {
2336-
if (anomalyDetectionAuditor.get() != null) {
2337-
anomalyDetectionAuditor.get().reset();
2338-
}
2339-
if (dataFrameAnalyticsAuditor.get() != null) {
2340-
dataFrameAnalyticsAuditor.get().reset();
2341-
}
2342-
if (inferenceAuditor.get() != null) {
2343-
inferenceAuditor.get().reset();
2344-
}
2345-
}
2346-
23472350
@Override
23482351
public BreakerSettings getCircuitBreaker(Settings settingsToUse) {
23492352
return BreakerSettings.updateFromSettings(
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.ml.action;
9+
10+
import org.elasticsearch.action.FailedNodeException;
11+
import org.elasticsearch.action.support.ActionFilters;
12+
import org.elasticsearch.action.support.nodes.TransportNodesAction;
13+
import org.elasticsearch.cluster.node.DiscoveryNode;
14+
import org.elasticsearch.cluster.service.ClusterService;
15+
import org.elasticsearch.common.io.stream.StreamInput;
16+
import org.elasticsearch.injection.guice.Inject;
17+
import org.elasticsearch.tasks.Task;
18+
import org.elasticsearch.threadpool.ThreadPool;
19+
import org.elasticsearch.transport.TransportService;
20+
import org.elasticsearch.xpack.core.ml.action.ResetAuditorAction;
21+
import org.elasticsearch.xpack.ml.notifications.AnomalyDetectionAuditor;
22+
import org.elasticsearch.xpack.ml.notifications.DataFrameAnalyticsAuditor;
23+
import org.elasticsearch.xpack.ml.notifications.InferenceAuditor;
24+
25+
import java.io.IOException;
26+
import java.util.List;
27+
28+
public class TransportResetAuditorAction extends TransportNodesAction<
29+
ResetAuditorAction.Request,
30+
ResetAuditorAction.Response,
31+
ResetAuditorAction.NodeRequest,
32+
ResetAuditorAction.Response.ResetResponse,
33+
Void> {
34+
35+
private final AnomalyDetectionAuditor anomalyDetectionAuditor;
36+
private final DataFrameAnalyticsAuditor dfaAuditor;
37+
private final InferenceAuditor inferenceAuditor;
38+
39+
@Inject
40+
public TransportResetAuditorAction(
41+
ThreadPool threadPool,
42+
ClusterService clusterService,
43+
TransportService transportService,
44+
ActionFilters actionFilters,
45+
AnomalyDetectionAuditor anomalyDetectionAuditor,
46+
DataFrameAnalyticsAuditor dfaAuditor,
47+
InferenceAuditor inferenceAuditor
48+
) {
49+
super(
50+
ResetAuditorAction.NAME,
51+
clusterService,
52+
transportService,
53+
actionFilters,
54+
ResetAuditorAction.NodeRequest::new,
55+
threadPool.executor(ThreadPool.Names.MANAGEMENT)
56+
);
57+
this.anomalyDetectionAuditor = anomalyDetectionAuditor;
58+
this.dfaAuditor = dfaAuditor;
59+
this.inferenceAuditor = inferenceAuditor;
60+
}
61+
62+
@Override
63+
protected ResetAuditorAction.Response newResponse(
64+
ResetAuditorAction.Request request,
65+
List<ResetAuditorAction.Response.ResetResponse> resetResponses,
66+
List<FailedNodeException> failures
67+
) {
68+
return new ResetAuditorAction.Response(clusterService.getClusterName(), resetResponses, failures);
69+
}
70+
71+
@Override
72+
protected ResetAuditorAction.NodeRequest newNodeRequest(ResetAuditorAction.Request request) {
73+
return new ResetAuditorAction.NodeRequest();
74+
}
75+
76+
@Override
77+
protected ResetAuditorAction.Response.ResetResponse newNodeResponse(StreamInput in, DiscoveryNode node) throws IOException {
78+
return new ResetAuditorAction.Response.ResetResponse(in);
79+
}
80+
81+
@Override
82+
protected ResetAuditorAction.Response.ResetResponse nodeOperation(ResetAuditorAction.NodeRequest request, Task task) {
83+
anomalyDetectionAuditor.reset();
84+
dfaAuditor.reset();
85+
inferenceAuditor.reset();
86+
return new ResetAuditorAction.Response.ResetResponse(clusterService.localNode(), true);
87+
}
88+
}

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/notifications/AbstractMlAuditor.java

Lines changed: 0 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -16,13 +16,11 @@
1616
import org.elasticsearch.cluster.metadata.ComposableIndexTemplate;
1717
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
1818
import org.elasticsearch.cluster.service.ClusterService;
19-
import org.elasticsearch.xcontent.ToXContent;
2019
import org.elasticsearch.xcontent.XContentParserConfiguration;
2120
import org.elasticsearch.xcontent.json.JsonXContent;
2221
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessage;
2322
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditMessageFactory;
2423
import org.elasticsearch.xpack.core.common.notifications.AbstractAuditor;
25-
import org.elasticsearch.xpack.core.ml.MlMetadata;
2624
import org.elasticsearch.xpack.core.ml.notifications.NotificationsIndex;
2725
import org.elasticsearch.xpack.ml.MlIndexTemplateRegistry;
2826

@@ -33,7 +31,6 @@
3331
abstract class AbstractMlAuditor<T extends AbstractAuditMessage> extends AbstractAuditor<T> {
3432

3533
private static final Logger logger = LogManager.getLogger(AbstractMlAuditor.class);
36-
private volatile boolean isResetMode;
3734

3835
protected AbstractMlAuditor(
3936
Client client,
@@ -50,34 +47,6 @@ protected AbstractMlAuditor(
5047
indexNameExpressionResolver,
5148
clusterService.threadPool().generic()
5249
);
53-
clusterService.addListener(event -> {
54-
if (event.metadataChanged()) {
55-
setResetMode(MlMetadata.getMlMetadata(event.state()).isResetMode());
56-
}
57-
});
58-
}
59-
60-
private void setResetMode(boolean value) {
61-
isResetMode = value;
62-
}
63-
64-
@Override
65-
protected void indexDoc(ToXContent toXContent) {
66-
if (isResetMode) {
67-
logger.trace("Skipped writing the audit message backlog as reset_mode is enabled");
68-
} else {
69-
super.indexDoc(toXContent);
70-
}
71-
}
72-
73-
@Override
74-
protected void writeBacklog() {
75-
if (isResetMode) {
76-
logger.trace("Skipped writing the audit message backlog as reset_mode is enabled");
77-
clearBacklog();
78-
} else {
79-
super.writeBacklog();
80-
}
8150
}
8251

8352
@Override

x-pack/plugin/security/qa/operator-privileges-tests/src/javaRestTest/java/org/elasticsearch/xpack/security/operator/Constants.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -329,6 +329,7 @@ public class Constants {
329329
"cluster:internal/xpack/inference",
330330
"cluster:internal/xpack/inference/rerankwindowsize/get",
331331
"cluster:internal/xpack/inference/unified",
332+
"cluster:internal/xpack/ml/auditor/reset",
332333
"cluster:internal/xpack/ml/coordinatedinference",
333334
"cluster:internal/xpack/ml/datafeed/isolate",
334335
"cluster:internal/xpack/ml/datafeed/running_state",

0 commit comments

Comments
 (0)