Skip to content

Commit 24acc73

Browse files
authored
Merge branch 'main' into bugfix/handle-fork-to-transport-worker
2 parents 0a3a0a4 + cf85f04 commit 24acc73

File tree

9 files changed

+104
-18
lines changed

9 files changed

+104
-18
lines changed

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,7 @@ static TransportVersion def(int id) {
6060
public static final TransportVersion V_7_8_1 = def(7_08_01_99);
6161
public static final TransportVersion V_7_9_0 = def(7_09_00_99);
6262
public static final TransportVersion V_7_10_0 = def(7_10_00_99);
63+
public static final TransportVersion V_8_0_0 = def(8_00_00_99);
6364
public static final TransportVersion V_8_8_0 = def(8_08_00_99);
6465
public static final TransportVersion V_8_8_1 = def(8_08_01_99);
6566
/*

server/src/main/java/org/elasticsearch/action/admin/indices/flush/TransportShardFlushAction.java

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99

1010
package org.elasticsearch.action.admin.indices.flush;
1111

12+
import org.elasticsearch.TransportVersions;
1213
import org.elasticsearch.action.ActionListener;
1314
import org.elasticsearch.action.ActionType;
1415
import org.elasticsearch.action.support.ActionFilters;
@@ -17,11 +18,17 @@
1718
import org.elasticsearch.cluster.action.shard.ShardStateAction;
1819
import org.elasticsearch.cluster.service.ClusterService;
1920
import org.elasticsearch.common.io.stream.StreamInput;
21+
import org.elasticsearch.common.io.stream.StreamOutput;
2022
import org.elasticsearch.common.settings.Settings;
2123
import org.elasticsearch.index.shard.IndexShard;
24+
import org.elasticsearch.index.shard.ShardId;
2225
import org.elasticsearch.indices.IndicesService;
2326
import org.elasticsearch.injection.guice.Inject;
27+
import org.elasticsearch.tasks.Task;
2428
import org.elasticsearch.threadpool.ThreadPool;
29+
import org.elasticsearch.transport.AbstractTransportRequest;
30+
import org.elasticsearch.transport.TransportChannel;
31+
import org.elasticsearch.transport.TransportRequestHandler;
2532
import org.elasticsearch.transport.TransportService;
2633

2734
import java.io.IOException;
@@ -57,6 +64,12 @@ public TransportShardFlushAction(
5764
PrimaryActionExecution.RejectOnOverload,
5865
ReplicaActionExecution.SubjectToCircuitBreaker
5966
);
67+
transportService.registerRequestHandler(
68+
PRE_SYNCED_FLUSH_ACTION_NAME,
69+
threadPool.executor(ThreadPool.Names.FLUSH),
70+
PreShardSyncedFlushRequest::new,
71+
new PreSyncedFlushTransportHandler(indicesService)
72+
);
6073
}
6174

6275
@Override
@@ -83,4 +96,43 @@ protected void shardOperationOnReplica(ShardFlushRequest request, IndexShard rep
8396
return new ReplicaResult();
8497
}));
8598
}
99+
100+
// TODO: Remove this transition in 9.0
101+
private static final String PRE_SYNCED_FLUSH_ACTION_NAME = "internal:indices/flush/synced/pre";
102+
103+
private static class PreShardSyncedFlushRequest extends AbstractTransportRequest {
104+
private final ShardId shardId;
105+
106+
private PreShardSyncedFlushRequest(StreamInput in) throws IOException {
107+
super(in);
108+
assert in.getTransportVersion().before(TransportVersions.V_8_0_0) : "received pre_sync request from a new node";
109+
this.shardId = new ShardId(in);
110+
}
111+
112+
@Override
113+
public String toString() {
114+
return "PreShardSyncedFlushRequest{" + "shardId=" + shardId + '}';
115+
}
116+
117+
@Override
118+
public void writeTo(StreamOutput out) throws IOException {
119+
assert false : "must not send pre_sync request from a new node";
120+
throw new UnsupportedOperationException("");
121+
}
122+
}
123+
124+
private static final class PreSyncedFlushTransportHandler implements TransportRequestHandler<PreShardSyncedFlushRequest> {
125+
private final IndicesService indicesService;
126+
127+
PreSyncedFlushTransportHandler(IndicesService indicesService) {
128+
this.indicesService = indicesService;
129+
}
130+
131+
@Override
132+
public void messageReceived(PreShardSyncedFlushRequest request, TransportChannel channel, Task task) {
133+
IndexShard indexShard = indicesService.indexServiceSafe(request.shardId.getIndex()).getShard(request.shardId.id());
134+
indexShard.flush(new FlushRequest().force(false).waitIfOngoing(true));
135+
throw new UnsupportedOperationException("Synced flush was removed and a normal flush was performed instead.");
136+
}
137+
}
86138
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
9195000
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1 @@
1-
esql_reduce_late_materialization,9194000
1+
inference_update_trained_model_deployment_request_source,9195000

x-pack/plugin/core/src/main/java/org/elasticsearch/xpack/core/ml/action/UpdateTrainedModelDeploymentAction.java

Lines changed: 39 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77

88
package org.elasticsearch.xpack.core.ml.action;
99

10+
import org.elasticsearch.TransportVersion;
1011
import org.elasticsearch.TransportVersions;
1112
import org.elasticsearch.action.ActionRequestValidationException;
1213
import org.elasticsearch.action.ActionType;
@@ -46,6 +47,10 @@ public static class Request extends AcknowledgedRequest<Request> implements ToXC
4647

4748
public static final ParseField TIMEOUT = new ParseField("timeout");
4849

50+
private static final TransportVersion UPDATE_TRAINED_MODEL_DEPLOYMENT_REQUEST_SOURCE = TransportVersion.fromName(
51+
"inference_update_trained_model_deployment_request_source"
52+
);
53+
4954
static {
5055
PARSER.declareString(Request::setDeploymentId, MODEL_ID);
5156
PARSER.declareInt(Request::setNumberOfAllocations, NUMBER_OF_ALLOCATIONS);
@@ -73,7 +78,7 @@ public static Request parseRequest(String deploymentId, XContentParser parser) {
7378
private String deploymentId;
7479
private Integer numberOfAllocations;
7580
private AdaptiveAllocationsSettings adaptiveAllocationsSettings;
76-
private boolean isInternal;
81+
private Source source = Source.API;
7782

7883
private Request() {
7984
super(TRAPPY_IMPLICIT_DEFAULT_MASTER_NODE_TIMEOUT, DEFAULT_ACK_TIMEOUT);
@@ -90,11 +95,17 @@ public Request(StreamInput in) throws IOException {
9095
if (in.getTransportVersion().before(TransportVersions.V_8_16_0)) {
9196
numberOfAllocations = in.readVInt();
9297
adaptiveAllocationsSettings = null;
93-
isInternal = false;
98+
source = Source.API;
9499
} else {
95100
numberOfAllocations = in.readOptionalVInt();
96101
adaptiveAllocationsSettings = in.readOptionalWriteable(AdaptiveAllocationsSettings::new);
97-
isInternal = in.readBoolean();
102+
if (in.getTransportVersion().supports(UPDATE_TRAINED_MODEL_DEPLOYMENT_REQUEST_SOURCE)) {
103+
source = in.readEnum(Source.class);
104+
} else {
105+
// we changed over from a boolean to an enum
106+
// when it was a boolean, true came from adaptive allocations and false came from the rest api
107+
source = in.readBoolean() ? Source.ADAPTIVE_ALLOCATIONS : Source.API;
108+
}
98109
}
99110
}
100111

@@ -119,11 +130,15 @@ public void setAdaptiveAllocationsSettings(AdaptiveAllocationsSettings adaptiveA
119130
}
120131

121132
public boolean isInternal() {
122-
return isInternal;
133+
return source == Source.INFERENCE_API || source == Source.ADAPTIVE_ALLOCATIONS;
134+
}
135+
136+
public void setSource(Source source) {
137+
this.source = source != null ? source : this.source;
123138
}
124139

125-
public void setIsInternal(boolean isInternal) {
126-
this.isInternal = isInternal;
140+
public Source getSource() {
141+
return source;
127142
}
128143

129144
public AdaptiveAllocationsSettings getAdaptiveAllocationsSettings() {
@@ -139,7 +154,14 @@ public void writeTo(StreamOutput out) throws IOException {
139154
} else {
140155
out.writeOptionalVInt(numberOfAllocations);
141156
out.writeOptionalWriteable(adaptiveAllocationsSettings);
142-
out.writeBoolean(isInternal);
157+
if (out.getTransportVersion().supports(UPDATE_TRAINED_MODEL_DEPLOYMENT_REQUEST_SOURCE)) {
158+
out.writeEnum(source);
159+
} else {
160+
// we changed over from a boolean to an enum
161+
// when it was a boolean, true came from adaptive allocations and false came from the rest api
162+
// treat "inference" as if it came from the api
163+
out.writeBoolean(isInternal());
164+
}
143165
}
144166
}
145167

@@ -161,10 +183,10 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
161183
public ActionRequestValidationException validate() {
162184
ActionRequestValidationException validationException = new ActionRequestValidationException();
163185
if (numberOfAllocations != null) {
164-
if (numberOfAllocations < 0 || (isInternal == false && numberOfAllocations == 0)) {
186+
if (numberOfAllocations < 0 || (isInternal() == false && numberOfAllocations == 0)) {
165187
validationException.addValidationError("[" + NUMBER_OF_ALLOCATIONS + "] must be a positive integer");
166188
}
167-
if (isInternal == false
189+
if (isInternal() == false
168190
&& adaptiveAllocationsSettings != null
169191
&& adaptiveAllocationsSettings.getEnabled() == Boolean.TRUE) {
170192
validationException.addValidationError(
@@ -183,7 +205,7 @@ public ActionRequestValidationException validate() {
183205

184206
@Override
185207
public int hashCode() {
186-
return Objects.hash(deploymentId, numberOfAllocations, adaptiveAllocationsSettings, isInternal);
208+
return Objects.hash(deploymentId, numberOfAllocations, adaptiveAllocationsSettings, source);
187209
}
188210

189211
@Override
@@ -198,12 +220,18 @@ public boolean equals(Object obj) {
198220
return Objects.equals(deploymentId, other.deploymentId)
199221
&& Objects.equals(numberOfAllocations, other.numberOfAllocations)
200222
&& Objects.equals(adaptiveAllocationsSettings, other.adaptiveAllocationsSettings)
201-
&& isInternal == other.isInternal;
223+
&& source == other.source;
202224
}
203225

204226
@Override
205227
public String toString() {
206228
return Strings.toString(this);
207229
}
230+
231+
public enum Source {
232+
API,
233+
ADAPTIVE_ALLOCATIONS,
234+
INFERENCE_API
235+
}
208236
}
209237
}

x-pack/plugin/inference/src/main/java/org/elasticsearch/xpack/inference/action/TransportUpdateInferenceModelAction.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ private void updateInClusterEndpoint(
294294
var updateRequest = new UpdateTrainedModelDeploymentAction.Request(deploymentId);
295295
updateRequest.setNumberOfAllocations(elasticServiceSettings.getNumAllocations());
296296
updateRequest.setAdaptiveAllocationsSettings(elasticServiceSettings.getAdaptiveAllocationsSettings());
297-
updateRequest.setIsInternal(true);
297+
updateRequest.setSource(UpdateTrainedModelDeploymentAction.Request.Source.INFERENCE_API);
298298

299299
var delegate = listener.<CreateTrainedModelAssignmentAction.Response>delegateFailure((l2, response) -> {
300300
modelRegistry.updateModelTransaction(newModel, existingParsedModel, l2);

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -520,7 +520,7 @@ private void updateNumberOfAllocations(
520520
) {
521521
UpdateTrainedModelDeploymentAction.Request updateRequest = new UpdateTrainedModelDeploymentAction.Request(deploymentId);
522522
updateRequest.setNumberOfAllocations(numberOfAllocations);
523-
updateRequest.setIsInternal(true);
523+
updateRequest.setSource(UpdateTrainedModelDeploymentAction.Request.Source.ADAPTIVE_ALLOCATIONS);
524524
ClientHelper.executeAsyncWithOrigin(
525525
client,
526526
ClientHelper.ML_ORIGIN,

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/adaptiveallocations/AdaptiveAllocationsScalerServiceTests.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -229,7 +229,7 @@ public void test_scaleUp() {
229229
verify(client, times(1)).execute(eq(GetDeploymentStatsAction.INSTANCE), any(), any());
230230
var updateRequest = new UpdateTrainedModelDeploymentAction.Request("test-deployment");
231231
updateRequest.setNumberOfAllocations(2);
232-
updateRequest.setIsInternal(true);
232+
updateRequest.setSource(UpdateTrainedModelDeploymentAction.Request.Source.ADAPTIVE_ALLOCATIONS);
233233
verify(client, times(1)).execute(eq(UpdateTrainedModelDeploymentAction.INSTANCE), eq(updateRequest), any());
234234
verifyNoMoreInteractions(client, clusterService);
235235
reset(client, clusterService);
@@ -323,7 +323,7 @@ public void test_scaleDownToZero_whenNoRequests() {
323323
verify(client, times(1)).execute(eq(GetDeploymentStatsAction.INSTANCE), any(), any());
324324
var updateRequest = new UpdateTrainedModelDeploymentAction.Request("test-deployment");
325325
updateRequest.setNumberOfAllocations(0);
326-
updateRequest.setIsInternal(true);
326+
updateRequest.setSource(UpdateTrainedModelDeploymentAction.Request.Source.ADAPTIVE_ALLOCATIONS);
327327
verify(client, times(1)).execute(eq(UpdateTrainedModelDeploymentAction.INSTANCE), eq(updateRequest), any());
328328
verifyNoMoreInteractions(client, clusterService);
329329

@@ -468,7 +468,7 @@ public void test_noScaleDownToZero_whenRecentlyScaledUpByOtherNode() {
468468
verify(client, times(1)).execute(eq(GetDeploymentStatsAction.INSTANCE), any(), any());
469469
var updateRequest = new UpdateTrainedModelDeploymentAction.Request("test-deployment");
470470
updateRequest.setNumberOfAllocations(0);
471-
updateRequest.setIsInternal(true);
471+
updateRequest.setSource(UpdateTrainedModelDeploymentAction.Request.Source.ADAPTIVE_ALLOCATIONS);
472472
verify(client, times(1)).execute(eq(UpdateTrainedModelDeploymentAction.INSTANCE), eq(updateRequest), any());
473473
verifyNoMoreInteractions(client, clusterService);
474474

x-pack/plugin/src/yamlRestTest/resources/rest-api-spec/test/ml/update_trained_model_deployment.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,9 @@
66
model_id: "missing-model"
77
body: >
88
{
9-
"number_of_allocations": 4
9+
"adaptive_allocations": {
10+
"enabled": true,
11+
"min_number_of_allocations": 0,
12+
"max_number_of_allocations": 1
13+
}
1014
}

0 commit comments

Comments
 (0)