Skip to content

Commit cc6139d

Browse files
authored
[ML] Abstract upgrade mode into core logic (#117512) (#117866)
Transform is adding an identical upgrade mode for 9.x migration. The logic to set the metadata is roughly the same, but the follow-up actions once the upgrade mode is changed will be different.
1 parent 2611915 commit cc6139d

File tree

5 files changed

+535
-177
lines changed

5 files changed

+535
-177
lines changed
Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.action;
9+
10+
import org.apache.logging.log4j.LogManager;
11+
import org.apache.logging.log4j.Logger;
12+
import org.elasticsearch.ElasticsearchStatusException;
13+
import org.elasticsearch.ElasticsearchTimeoutException;
14+
import org.elasticsearch.action.ActionListener;
15+
import org.elasticsearch.action.support.ActionFilters;
16+
import org.elasticsearch.action.support.master.AcknowledgedResponse;
17+
import org.elasticsearch.action.support.master.AcknowledgedTransportMasterNodeAction;
18+
import org.elasticsearch.cluster.ClusterState;
19+
import org.elasticsearch.cluster.ClusterStateTaskListener;
20+
import org.elasticsearch.cluster.SimpleBatchedExecutor;
21+
import org.elasticsearch.cluster.block.ClusterBlockException;
22+
import org.elasticsearch.cluster.block.ClusterBlockLevel;
23+
import org.elasticsearch.cluster.metadata.IndexNameExpressionResolver;
24+
import org.elasticsearch.cluster.service.ClusterService;
25+
import org.elasticsearch.cluster.service.MasterServiceTaskQueue;
26+
import org.elasticsearch.common.Priority;
27+
import org.elasticsearch.common.util.concurrent.EsExecutors;
28+
import org.elasticsearch.core.Strings;
29+
import org.elasticsearch.core.Tuple;
30+
import org.elasticsearch.rest.RestStatus;
31+
import org.elasticsearch.tasks.Task;
32+
import org.elasticsearch.threadpool.ThreadPool;
33+
import org.elasticsearch.transport.TransportService;
34+
35+
import java.util.concurrent.atomic.AtomicBoolean;
36+
37+
public abstract class AbstractTransportSetUpgradeModeAction extends AcknowledgedTransportMasterNodeAction<SetUpgradeModeActionRequest> {
38+
39+
private static final Logger logger = LogManager.getLogger(AbstractTransportSetUpgradeModeAction.class);
40+
private final AtomicBoolean isRunning = new AtomicBoolean(false);
41+
private final MasterServiceTaskQueue<UpdateModeStateListener> taskQueue;
42+
43+
public AbstractTransportSetUpgradeModeAction(
44+
String actionName,
45+
String taskQueuePrefix,
46+
TransportService transportService,
47+
ClusterService clusterService,
48+
ThreadPool threadPool,
49+
ActionFilters actionFilters,
50+
IndexNameExpressionResolver indexNameExpressionResolver
51+
) {
52+
super(
53+
actionName,
54+
transportService,
55+
clusterService,
56+
threadPool,
57+
actionFilters,
58+
SetUpgradeModeActionRequest::new,
59+
indexNameExpressionResolver,
60+
EsExecutors.DIRECT_EXECUTOR_SERVICE
61+
);
62+
63+
this.taskQueue = clusterService.createTaskQueue(taskQueuePrefix + " upgrade mode", Priority.NORMAL, new UpdateModeExecutor());
64+
}
65+
66+
@Override
67+
protected void masterOperation(
68+
Task task,
69+
SetUpgradeModeActionRequest request,
70+
ClusterState state,
71+
ActionListener<AcknowledgedResponse> listener
72+
) throws Exception {
73+
// Don't want folks spamming this endpoint while it is in progress, only allow one request to be handled at a time
74+
if (isRunning.compareAndSet(false, true) == false) {
75+
String msg = Strings.format(
76+
"Attempted to set [upgrade_mode] for feature name [%s] to [%s] from [%s] while previous request was processing.",
77+
featureName(),
78+
request.enabled(),
79+
upgradeMode(state)
80+
);
81+
logger.info(msg);
82+
Exception detail = new IllegalStateException(msg);
83+
listener.onFailure(
84+
new ElasticsearchStatusException(
85+
"Cannot change [upgrade_mode] for feature name [{}]. Previous request is still being processed.",
86+
RestStatus.TOO_MANY_REQUESTS,
87+
detail,
88+
featureName()
89+
)
90+
);
91+
return;
92+
}
93+
94+
// Noop, nothing for us to do, simply return fast to the caller
95+
var upgradeMode = upgradeMode(state);
96+
if (request.enabled() == upgradeMode) {
97+
logger.info("Upgrade mode noop");
98+
isRunning.set(false);
99+
listener.onResponse(AcknowledgedResponse.TRUE);
100+
return;
101+
}
102+
103+
logger.info(
104+
"Starting to set [upgrade_mode] for feature name [{}] to [{}] from [{}]",
105+
featureName(),
106+
request.enabled(),
107+
upgradeMode
108+
);
109+
110+
ActionListener<AcknowledgedResponse> wrappedListener = ActionListener.wrap(r -> {
111+
logger.info("Finished setting [upgrade_mode] for feature name [{}]", featureName());
112+
isRunning.set(false);
113+
listener.onResponse(r);
114+
}, e -> {
115+
logger.info("Failed to set [upgrade_mode] for feature name [{}]", featureName());
116+
isRunning.set(false);
117+
listener.onFailure(e);
118+
});
119+
120+
ActionListener<AcknowledgedResponse> setUpgradeModeListener = wrappedListener.delegateFailure((delegate, ack) -> {
121+
if (ack.isAcknowledged()) {
122+
upgradeModeSuccessfullyChanged(task, request, state, delegate);
123+
} else {
124+
logger.info("Cluster state update is NOT acknowledged");
125+
wrappedListener.onFailure(new ElasticsearchTimeoutException("Unknown error occurred while updating cluster state"));
126+
}
127+
});
128+
129+
taskQueue.submitTask(featureName(), new UpdateModeStateListener(request, setUpgradeModeListener), request.ackTimeout());
130+
}
131+
132+
/**
133+
* Define the feature name, used in log messages and naming the task on the task queue.
134+
*/
135+
protected abstract String featureName();
136+
137+
/**
138+
* Parse the ClusterState for the implementation's {@link org.elasticsearch.cluster.metadata.Metadata.Custom} and find the upgradeMode
139+
* boolean stored there. We will compare this boolean with the request's desired state to determine if we should change the metadata.
140+
*/
141+
protected abstract boolean upgradeMode(ClusterState state);
142+
143+
/**
144+
* This is called from the ClusterState updater and is expected to return quickly.
145+
*/
146+
protected abstract ClusterState createUpdatedState(SetUpgradeModeActionRequest request, ClusterState state);
147+
148+
/**
149+
* This method is only called when the cluster state was successfully changed.
150+
* If we failed to update for any reason, this will not be called.
151+
* The ClusterState param is the previous ClusterState before we called update.
152+
*/
153+
protected abstract void upgradeModeSuccessfullyChanged(
154+
Task task,
155+
SetUpgradeModeActionRequest request,
156+
ClusterState state,
157+
ActionListener<AcknowledgedResponse> listener
158+
);
159+
160+
@Override
161+
protected ClusterBlockException checkBlock(SetUpgradeModeActionRequest request, ClusterState state) {
162+
return state.blocks().globalBlockedException(ClusterBlockLevel.METADATA_WRITE);
163+
}
164+
165+
private record UpdateModeStateListener(SetUpgradeModeActionRequest request, ActionListener<AcknowledgedResponse> listener)
166+
implements
167+
ClusterStateTaskListener {
168+
169+
@Override
170+
public void onFailure(Exception e) {
171+
listener.onFailure(e);
172+
}
173+
}
174+
175+
private class UpdateModeExecutor extends SimpleBatchedExecutor<UpdateModeStateListener, Void> {
176+
@Override
177+
public Tuple<ClusterState, Void> executeTask(UpdateModeStateListener clusterStateListener, ClusterState clusterState) {
178+
return Tuple.tuple(createUpdatedState(clusterStateListener.request(), clusterState), null);
179+
}
180+
181+
@Override
182+
public void taskSucceeded(UpdateModeStateListener clusterStateListener, Void unused) {
183+
clusterStateListener.listener().onResponse(AcknowledgedResponse.TRUE);
184+
}
185+
}
186+
}
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
/*
2+
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
3+
* or more contributor license agreements. Licensed under the Elastic License
4+
* 2.0; you may not use this file except in compliance with the Elastic License
5+
* 2.0.
6+
*/
7+
8+
package org.elasticsearch.xpack.core.action;
9+
10+
import org.elasticsearch.action.support.master.AcknowledgedRequest;
11+
import org.elasticsearch.common.io.stream.StreamInput;
12+
import org.elasticsearch.common.io.stream.StreamOutput;
13+
import org.elasticsearch.xcontent.ConstructingObjectParser;
14+
import org.elasticsearch.xcontent.ParseField;
15+
import org.elasticsearch.xcontent.ToXContentObject;
16+
import org.elasticsearch.xcontent.XContentBuilder;
17+
18+
import java.io.IOException;
19+
import java.util.Objects;
20+
21+
public class SetUpgradeModeActionRequest extends AcknowledgedRequest<SetUpgradeModeActionRequest> implements ToXContentObject {
22+
23+
private final boolean enabled;
24+
25+
private static final ParseField ENABLED = new ParseField("enabled");
26+
public static final ConstructingObjectParser<SetUpgradeModeActionRequest, Void> PARSER = new ConstructingObjectParser<>(
27+
"set_upgrade_mode_action_request",
28+
a -> new SetUpgradeModeActionRequest((Boolean) a[0])
29+
);
30+
31+
static {
32+
PARSER.declareBoolean(ConstructingObjectParser.constructorArg(), ENABLED);
33+
}
34+
35+
public SetUpgradeModeActionRequest(boolean enabled) {
36+
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
37+
this.enabled = enabled;
38+
}
39+
40+
public SetUpgradeModeActionRequest(StreamInput in) throws IOException {
41+
super(in);
42+
this.enabled = in.readBoolean();
43+
}
44+
45+
public boolean enabled() {
46+
return enabled;
47+
}
48+
49+
@Override
50+
public void writeTo(StreamOutput out) throws IOException {
51+
super.writeTo(out);
52+
out.writeBoolean(enabled);
53+
}
54+
55+
@Override
56+
public int hashCode() {
57+
return Objects.hash(enabled);
58+
}
59+
60+
@Override
61+
public boolean equals(Object obj) {
62+
if (this == obj) {
63+
return true;
64+
}
65+
if (obj == null || obj.getClass() != getClass()) {
66+
return false;
67+
}
68+
SetUpgradeModeActionRequest other = (SetUpgradeModeActionRequest) obj;
69+
return enabled == other.enabled();
70+
}
71+
72+
@Override
73+
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
74+
builder.startObject();
75+
builder.field(ENABLED.getPreferredName(), enabled);
76+
builder.endObject();
77+
return builder;
78+
}
79+
}

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/SetUpgradeModeAction.java

Lines changed: 3 additions & 46 deletions
Original file line numberDiff line numberDiff line change
@@ -7,17 +7,13 @@
77
package org.elasticsearch.xpack.core.ml.action;
88

99
import org.elasticsearch.action.ActionType;
10-
import org.elasticsearch.action.support.master.AcknowledgedRequest;
1110
import org.elasticsearch.action.support.master.AcknowledgedResponse;
1211
import org.elasticsearch.common.io.stream.StreamInput;
13-
import org.elasticsearch.common.io.stream.StreamOutput;
1412
import org.elasticsearch.xcontent.ConstructingObjectParser;
1513
import org.elasticsearch.xcontent.ParseField;
16-
import org.elasticsearch.xcontent.ToXContentObject;
17-
import org.elasticsearch.xcontent.XContentBuilder;
14+
import org.elasticsearch.xpack.core.action.SetUpgradeModeActionRequest;
1815

1916
import java.io.IOException;
20-
import java.util.Objects;
2117

2218
public class SetUpgradeModeAction extends ActionType<AcknowledgedResponse> {
2319

@@ -28,9 +24,7 @@ private SetUpgradeModeAction() {
2824
super(NAME);
2925
}
3026

31-
public static class Request extends AcknowledgedRequest<Request> implements ToXContentObject {
32-
33-
private final boolean enabled;
27+
public static class Request extends SetUpgradeModeActionRequest {
3428

3529
private static final ParseField ENABLED = new ParseField("enabled");
3630
public static final ConstructingObjectParser<Request, Void> PARSER = new ConstructingObjectParser<>(
@@ -43,48 +37,11 @@ public static class Request extends AcknowledgedRequest<Request> implements ToXC
4337
}
4438

4539
public Request(boolean enabled) {
46-
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
47-
this.enabled = enabled;
40+
super(enabled);
4841
}
4942

5043
public Request(StreamInput in) throws IOException {
5144
super(in);
52-
this.enabled = in.readBoolean();
53-
}
54-
55-
public boolean isEnabled() {
56-
return enabled;
57-
}
58-
59-
@Override
60-
public void writeTo(StreamOutput out) throws IOException {
61-
super.writeTo(out);
62-
out.writeBoolean(enabled);
63-
}
64-
65-
@Override
66-
public int hashCode() {
67-
return Objects.hash(enabled);
68-
}
69-
70-
@Override
71-
public boolean equals(Object obj) {
72-
if (this == obj) {
73-
return true;
74-
}
75-
if (obj == null || obj.getClass() != getClass()) {
76-
return false;
77-
}
78-
Request other = (Request) obj;
79-
return Objects.equals(enabled, other.enabled);
80-
}
81-
82-
@Override
83-
public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException {
84-
builder.startObject();
85-
builder.field(ENABLED.getPreferredName(), enabled);
86-
builder.endObject();
87-
return builder;
8845
}
8946
}
9047
}

0 commit comments

Comments
 (0)