Skip to content

Commit 043ddc3

Browse files
authored
[ML] Fix test failure updating model deployment with stale cluster state (#128667) (#132160)
(cherry picked from commit 989f72b)
1 parent 774b022 commit 043ddc3

File tree

4 files changed

+18
-65
lines changed

4 files changed

+18
-65
lines changed

muted-tests.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,6 @@ tests:
143143
- class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT
144144
method: test {p0=nodes.stats/11_indices_metrics/indices mappings exact count test for indices level}
145145
issue: https://github.com/elastic/elasticsearch/issues/120950
146-
- class: org.elasticsearch.xpack.ml.integration.PyTorchModelIT
147-
issue: https://github.com/elastic/elasticsearch/issues/121165
148146
- class: org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT
149147
issue: https://github.com/elastic/elasticsearch/issues/121407
150148
- class: org.elasticsearch.analysis.common.CommonAnalysisClientYamlTestSuiteIT

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java

Lines changed: 16 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,6 @@
1111
import org.apache.logging.log4j.Logger;
1212
import org.elasticsearch.ElasticsearchStatusException;
1313
import org.elasticsearch.ResourceNotFoundException;
14-
import org.elasticsearch.TransportVersion;
15-
import org.elasticsearch.TransportVersions;
1614
import org.elasticsearch.action.ActionListener;
1715
import org.elasticsearch.action.ActionRequestValidationException;
1816
import org.elasticsearch.action.support.master.AcknowledgedResponse;
@@ -79,9 +77,6 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
7977

8078
private static final Logger logger = LogManager.getLogger(TrainedModelAssignmentClusterService.class);
8179

82-
private static final TransportVersion RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION = TransportVersions.V_8_3_0;
83-
public static final TransportVersion DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION = TransportVersions.V_8_4_0;
84-
8580
private final ClusterService clusterService;
8681
private final ThreadPool threadPool;
8782
private final NodeLoadDetector nodeLoadDetector;
@@ -169,14 +164,6 @@ public void clusterChanged(ClusterChangedEvent event) {
169164
return;
170165
}
171166

172-
if (eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion(event)) {
173-
// we should not try to rebalance assignments while there may be nodes running on a version
174-
// prior to introducing distributed model allocation.
175-
// But we should remove routing to removed or shutting down nodes.
176-
removeRoutingToRemovedOrShuttingDownNodes(event);
177-
return;
178-
}
179-
180167
if (event.nodesAdded()) {
181168
logMlNodeHeterogeneity();
182169
}
@@ -203,10 +190,6 @@ public void clusterChanged(ClusterChangedEvent event) {
203190
}
204191
}
205192

206-
boolean eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion(ClusterChangedEvent event) {
207-
return event.state().getMinTransportVersion().before(DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION);
208-
}
209-
210193
boolean eventStateHasGlobalBlockStateNotRecoveredBlock(ClusterChangedEvent event) {
211194
return event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK);
212195
}
@@ -400,18 +383,6 @@ public void createNewModelAssignment(
400383
CreateTrainedModelAssignmentAction.Request request,
401384
ActionListener<TrainedModelAssignment> listener
402385
) {
403-
if (clusterService.state().getMinTransportVersion().before(DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION)) {
404-
listener.onFailure(
405-
new ElasticsearchStatusException(
406-
"cannot create new assignment [{}] for model [{}] while cluster upgrade is in progress",
407-
RestStatus.CONFLICT,
408-
request.getTaskParams().getDeploymentId(),
409-
request.getTaskParams().getModelId()
410-
)
411-
);
412-
return;
413-
}
414-
415386
if (MlMetadata.getMlMetadata(clusterService.state()).isResetMode()) {
416387
listener.onFailure(
417388
new ElasticsearchStatusException(
@@ -522,13 +493,11 @@ private static ClusterState update(ClusterState currentState, TrainedModelAssign
522493

523494
private static ClusterState forceUpdate(ClusterState currentState, TrainedModelAssignmentMetadata.Builder modelAssignments) {
524495
logger.debug(() -> format("updated assignments: %s", modelAssignments.build()));
496+
525497
ProjectMetadata.Builder builder = ProjectMetadata.builder(currentState.metadata().getProject());
526-
if (currentState.getMinTransportVersion().onOrAfter(RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION)) {
527-
builder.putCustom(TrainedModelAssignmentMetadata.NAME, modelAssignments.build())
528-
.removeCustom(TrainedModelAssignmentMetadata.DEPRECATED_NAME);
529-
} else {
530-
builder.putCustom(TrainedModelAssignmentMetadata.DEPRECATED_NAME, modelAssignments.buildOld());
531-
}
498+
builder.putCustom(TrainedModelAssignmentMetadata.NAME, modelAssignments.build())
499+
.removeCustom(TrainedModelAssignmentMetadata.DEPRECATED_NAME);
500+
532501
return ClusterState.builder(currentState).putProjectMetadata(builder).build();
533502
}
534503

@@ -844,7 +813,7 @@ private void updateDeployment(
844813
}
845814
boolean hasUpdates = hasUpdates(numberOfAllocations, adaptiveAllocationsSettingsUpdates, existingAssignment);
846815
if (hasUpdates == false) {
847-
logger.info("no updates");
816+
logger.debug("no updates to be made for deployment [{}]", deploymentId);
848817
listener.onResponse(existingAssignment);
849818
return;
850819
}
@@ -858,27 +827,17 @@ private void updateDeployment(
858827
);
859828
return;
860829
}
861-
if (clusterState.getMinTransportVersion().before(DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION)) {
862-
listener.onFailure(
863-
new ElasticsearchStatusException(
864-
"cannot update deployment with model id [{}] while cluster upgrade is in progress.",
865-
RestStatus.CONFLICT,
866-
deploymentId
867-
)
868-
);
869-
return;
870-
}
871830

872-
ActionListener<ClusterState> updatedStateListener = ActionListener.wrap(
873-
updatedState -> submitUnbatchedTask("update model deployment", new ClusterStateUpdateTask() {
831+
ActionListener<TrainedModelAssignmentMetadata.Builder> updatedAssignmentListener = ActionListener.wrap(
832+
updatedAssignment -> submitUnbatchedTask("update model deployment", new ClusterStateUpdateTask() {
874833

875834
private volatile boolean isUpdated;
876835

877836
@Override
878837
public ClusterState execute(ClusterState currentState) {
879838
if (areClusterStatesCompatibleForRebalance(clusterState, currentState)) {
880839
isUpdated = true;
881-
return updatedState;
840+
return update(currentState, updatedAssignment);
882841
}
883842
logger.debug(() -> format("[%s] Retrying update as cluster state has been modified", deploymentId));
884843
updateDeployment(currentState, deploymentId, numberOfAllocations, adaptiveAllocationsSettings, isInternal, listener);
@@ -910,7 +869,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
910869
listener::onFailure
911870
);
912871

913-
updateAssignment(clusterState, existingAssignment, numberOfAllocations, adaptiveAllocationsSettings, updatedStateListener);
872+
updateAssignment(clusterState, existingAssignment, numberOfAllocations, adaptiveAllocationsSettings, updatedAssignmentListener);
914873
}
915874

916875
static boolean hasUpdates(
@@ -944,7 +903,7 @@ private void updateAssignment(
944903
TrainedModelAssignment assignment,
945904
Integer numberOfAllocations,
946905
AdaptiveAllocationsSettings adaptiveAllocationsSettings,
947-
ActionListener<ClusterState> listener
906+
ActionListener<TrainedModelAssignmentMetadata.Builder> listener
948907
) {
949908
threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> {
950909
if (numberOfAllocations == null || numberOfAllocations == assignment.getTaskParams().getNumberOfAllocations()) {
@@ -961,21 +920,21 @@ private void updateAndKeepNumberOfAllocations(
961920
ClusterState clusterState,
962921
TrainedModelAssignment assignment,
963922
AdaptiveAllocationsSettings adaptiveAllocationsSettings,
964-
ActionListener<ClusterState> listener
923+
ActionListener<TrainedModelAssignmentMetadata.Builder> listener
965924
) {
966925
TrainedModelAssignment.Builder updatedAssignment = TrainedModelAssignment.Builder.fromAssignment(assignment)
967926
.setAdaptiveAllocationsSettings(adaptiveAllocationsSettings);
968927
TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.builder(clusterState);
969928
builder.updateAssignment(assignment.getDeploymentId(), updatedAssignment);
970-
listener.onResponse(update(clusterState, builder));
929+
listener.onResponse(builder);
971930
}
972931

973932
private void increaseNumberOfAllocations(
974933
ClusterState clusterState,
975934
TrainedModelAssignment assignment,
976935
int numberOfAllocations,
977936
AdaptiveAllocationsSettings adaptiveAllocationsSettings,
978-
ActionListener<ClusterState> listener
937+
ActionListener<TrainedModelAssignmentMetadata.Builder> listener
979938
) {
980939
try {
981940
TrainedModelAssignment.Builder updatedAssignment = TrainedModelAssignment.Builder.fromAssignment(assignment)
@@ -995,7 +954,7 @@ private void increaseNumberOfAllocations(
995954
)
996955
);
997956
} else {
998-
listener.onResponse(update(clusterState, rebalancedMetadata));
957+
listener.onResponse(rebalancedMetadata);
999958
}
1000959
} catch (Exception e) {
1001960
listener.onFailure(e);
@@ -1007,7 +966,7 @@ private void decreaseNumberOfAllocations(
1007966
TrainedModelAssignment assignment,
1008967
int numberOfAllocations,
1009968
AdaptiveAllocationsSettings adaptiveAllocationsSettings,
1010-
ActionListener<ClusterState> listener
969+
ActionListener<TrainedModelAssignmentMetadata.Builder> listener
1011970
) {
1012971
TrainedModelAssignment.Builder updatedAssignment = numberOfAllocations < assignment.totalTargetAllocations()
1013972
? new AllocationReducer(assignment, nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(clusterState)).reduceTo(
@@ -1022,7 +981,7 @@ private void decreaseNumberOfAllocations(
1022981
}
1023982
TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.builder(clusterState);
1024983
builder.updateAssignment(assignment.getDeploymentId(), updatedAssignment);
1025-
listener.onResponse(update(clusterState, builder));
984+
listener.onResponse(builder);
1026985
}
1027986

1028987
static ClusterState setToStopping(ClusterState clusterState, String deploymentId, String reason) {

x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentNodeService.java

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -375,20 +375,17 @@ public void clusterChanged(ClusterChangedEvent event) {
375375
final boolean isResetMode = MlMetadata.getMlMetadata(event.state()).isResetMode();
376376
TrainedModelAssignmentMetadata modelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(event.state());
377377
final String currentNode = event.state().nodes().getLocalNodeId();
378-
final boolean isNewAllocationSupported = event.state()
379-
.getMinTransportVersion()
380-
.onOrAfter(TrainedModelAssignmentClusterService.DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION);
381378
final Set<String> shuttingDownNodes = Collections.unmodifiableSet(event.state().metadata().nodeShutdowns().getAllNodeIds());
382379

383-
if (isResetMode == false && isNewAllocationSupported) {
380+
if (isResetMode == false) {
384381
updateNumberOfAllocations(modelAssignmentMetadata);
385382
}
386383

387384
for (TrainedModelAssignment trainedModelAssignment : modelAssignmentMetadata.allAssignments().values()) {
388385
RoutingInfo routingInfo = trainedModelAssignment.getNodeRoutingTable().get(currentNode);
389386
if (routingInfo != null) {
390387
// Add new models to start loading if the assignment is not stopping
391-
if (isNewAllocationSupported && trainedModelAssignment.getAssignmentState() != AssignmentState.STOPPING) {
388+
if (trainedModelAssignment.getAssignmentState() != AssignmentState.STOPPING) {
392389
if (shouldAssignmentBeRestarted(routingInfo, trainedModelAssignment.getDeploymentId())) {
393390
prepareAssignmentForRestart(trainedModelAssignment);
394391
}

x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -208,7 +208,6 @@ public void testClusterChanged_GivenNodesAdded_ThenLogMlNodeHeterogeneityCalled(
208208
TrainedModelAssignmentClusterService serviceSpy = spy(createClusterService(randomInt(5)));
209209
doNothing().when(serviceSpy).logMlNodeHeterogeneity();
210210
doReturn(false).when(serviceSpy).eventStateHasGlobalBlockStateNotRecoveredBlock(any());
211-
doReturn(false).when(serviceSpy).eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion(any());
212211

213212
ClusterChangedEvent mockNodesAddedEvent = mock(ClusterChangedEvent.class);
214213
ClusterState mockState = mock(ClusterState.class);

0 commit comments

Comments
 (0)