diff --git a/muted-tests.yml b/muted-tests.yml index b0268436ddb6f..2fc7158a90f77 100644 --- a/muted-tests.yml +++ b/muted-tests.yml @@ -143,8 +143,6 @@ tests: - class: org.elasticsearch.backwards.MixedClusterClientYamlTestSuiteIT method: test {p0=nodes.stats/11_indices_metrics/indices mappings exact count test for indices level} issue: https://github.com/elastic/elasticsearch/issues/120950 -- class: org.elasticsearch.xpack.ml.integration.PyTorchModelIT - issue: https://github.com/elastic/elasticsearch/issues/121165 - class: org.elasticsearch.test.rest.yaml.CcsCommonYamlTestSuiteIT issue: https://github.com/elastic/elasticsearch/issues/121407 - class: org.elasticsearch.analysis.common.CommonAnalysisClientYamlTestSuiteIT diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java index b359daebd5368..9d516fb3e1a74 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterService.java @@ -11,8 +11,6 @@ import org.apache.logging.log4j.Logger; import org.elasticsearch.ElasticsearchStatusException; import org.elasticsearch.ResourceNotFoundException; -import org.elasticsearch.TransportVersion; -import org.elasticsearch.TransportVersions; import org.elasticsearch.action.ActionListener; import org.elasticsearch.action.ActionRequestValidationException; import org.elasticsearch.action.support.master.AcknowledgedResponse; @@ -79,9 +77,6 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene private static final Logger logger = LogManager.getLogger(TrainedModelAssignmentClusterService.class); - private static final TransportVersion RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION = TransportVersions.V_8_3_0; - public static final TransportVersion DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION = TransportVersions.V_8_4_0; - private final ClusterService clusterService; private final ThreadPool threadPool; private final NodeLoadDetector nodeLoadDetector; @@ -169,14 +164,6 @@ public void clusterChanged(ClusterChangedEvent event) { return; } - if (eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion(event)) { - // we should not try to rebalance assignments while there may be nodes running on a version - // prior to introducing distributed model allocation. - // But we should remove routing to removed or shutting down nodes. - removeRoutingToRemovedOrShuttingDownNodes(event); - return; - } - if (event.nodesAdded()) { logMlNodeHeterogeneity(); } @@ -203,10 +190,6 @@ public void clusterChanged(ClusterChangedEvent event) { } } - boolean eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion(ClusterChangedEvent event) { - return event.state().getMinTransportVersion().before(DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION); - } - boolean eventStateHasGlobalBlockStateNotRecoveredBlock(ClusterChangedEvent event) { return event.state().blocks().hasGlobalBlock(GatewayService.STATE_NOT_RECOVERED_BLOCK); } @@ -400,18 +383,6 @@ public void createNewModelAssignment( CreateTrainedModelAssignmentAction.Request request, ActionListener listener ) { - if (clusterService.state().getMinTransportVersion().before(DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION)) { - listener.onFailure( - new ElasticsearchStatusException( - "cannot create new assignment [{}] for model [{}] while cluster upgrade is in progress", - RestStatus.CONFLICT, - request.getTaskParams().getDeploymentId(), - request.getTaskParams().getModelId() - ) - ); - return; - } - if (MlMetadata.getMlMetadata(clusterService.state()).isResetMode()) { listener.onFailure( new ElasticsearchStatusException( @@ -522,13 +493,11 @@ private static ClusterState update(ClusterState currentState, TrainedModelAssign private static ClusterState forceUpdate(ClusterState currentState, TrainedModelAssignmentMetadata.Builder modelAssignments) { logger.debug(() -> format("updated assignments: %s", modelAssignments.build())); + ProjectMetadata.Builder builder = ProjectMetadata.builder(currentState.metadata().getProject()); - if (currentState.getMinTransportVersion().onOrAfter(RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION)) { - builder.putCustom(TrainedModelAssignmentMetadata.NAME, modelAssignments.build()) - .removeCustom(TrainedModelAssignmentMetadata.DEPRECATED_NAME); - } else { - builder.putCustom(TrainedModelAssignmentMetadata.DEPRECATED_NAME, modelAssignments.buildOld()); - } + builder.putCustom(TrainedModelAssignmentMetadata.NAME, modelAssignments.build()) + .removeCustom(TrainedModelAssignmentMetadata.DEPRECATED_NAME); + return ClusterState.builder(currentState).putProjectMetadata(builder).build(); } @@ -844,7 +813,7 @@ private void updateDeployment( } boolean hasUpdates = hasUpdates(numberOfAllocations, adaptiveAllocationsSettingsUpdates, existingAssignment); if (hasUpdates == false) { - logger.info("no updates"); + logger.debug("no updates to be made for deployment [{}]", deploymentId); listener.onResponse(existingAssignment); return; } @@ -858,19 +827,9 @@ private void updateDeployment( ); return; } - if (clusterState.getMinTransportVersion().before(DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION)) { - listener.onFailure( - new ElasticsearchStatusException( - "cannot update deployment with model id [{}] while cluster upgrade is in progress.", - RestStatus.CONFLICT, - deploymentId - ) - ); - return; - } - ActionListener updatedStateListener = ActionListener.wrap( - updatedState -> submitUnbatchedTask("update model deployment", new ClusterStateUpdateTask() { + ActionListener updatedAssignmentListener = ActionListener.wrap( + updatedAssignment -> submitUnbatchedTask("update model deployment", new ClusterStateUpdateTask() { private volatile boolean isUpdated; @@ -878,7 +837,7 @@ private void updateDeployment( public ClusterState execute(ClusterState currentState) { if (areClusterStatesCompatibleForRebalance(clusterState, currentState)) { isUpdated = true; - return updatedState; + return update(currentState, updatedAssignment); } logger.debug(() -> format("[%s] Retrying update as cluster state has been modified", deploymentId)); updateDeployment(currentState, deploymentId, numberOfAllocations, adaptiveAllocationsSettings, isInternal, listener); @@ -910,7 +869,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState) listener::onFailure ); - updateAssignment(clusterState, existingAssignment, numberOfAllocations, adaptiveAllocationsSettings, updatedStateListener); + updateAssignment(clusterState, existingAssignment, numberOfAllocations, adaptiveAllocationsSettings, updatedAssignmentListener); } static boolean hasUpdates( @@ -944,7 +903,7 @@ private void updateAssignment( TrainedModelAssignment assignment, Integer numberOfAllocations, AdaptiveAllocationsSettings adaptiveAllocationsSettings, - ActionListener listener + ActionListener listener ) { threadPool.executor(MachineLearning.UTILITY_THREAD_POOL_NAME).execute(() -> { if (numberOfAllocations == null || numberOfAllocations == assignment.getTaskParams().getNumberOfAllocations()) { @@ -961,13 +920,13 @@ private void updateAndKeepNumberOfAllocations( ClusterState clusterState, TrainedModelAssignment assignment, AdaptiveAllocationsSettings adaptiveAllocationsSettings, - ActionListener listener + ActionListener listener ) { TrainedModelAssignment.Builder updatedAssignment = TrainedModelAssignment.Builder.fromAssignment(assignment) .setAdaptiveAllocationsSettings(adaptiveAllocationsSettings); TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.builder(clusterState); builder.updateAssignment(assignment.getDeploymentId(), updatedAssignment); - listener.onResponse(update(clusterState, builder)); + listener.onResponse(builder); } private void increaseNumberOfAllocations( @@ -975,7 +934,7 @@ private void increaseNumberOfAllocations( TrainedModelAssignment assignment, int numberOfAllocations, AdaptiveAllocationsSettings adaptiveAllocationsSettings, - ActionListener listener + ActionListener listener ) { try { TrainedModelAssignment.Builder updatedAssignment = TrainedModelAssignment.Builder.fromAssignment(assignment) @@ -995,7 +954,7 @@ private void increaseNumberOfAllocations( ) ); } else { - listener.onResponse(update(clusterState, rebalancedMetadata)); + listener.onResponse(rebalancedMetadata); } } catch (Exception e) { listener.onFailure(e); @@ -1007,7 +966,7 @@ private void decreaseNumberOfAllocations( TrainedModelAssignment assignment, int numberOfAllocations, AdaptiveAllocationsSettings adaptiveAllocationsSettings, - ActionListener listener + ActionListener listener ) { TrainedModelAssignment.Builder updatedAssignment = numberOfAllocations < assignment.totalTargetAllocations() ? new AllocationReducer(assignment, nodeAvailabilityZoneMapper.buildMlNodesByAvailabilityZone(clusterState)).reduceTo( @@ -1022,7 +981,7 @@ private void decreaseNumberOfAllocations( } TrainedModelAssignmentMetadata.Builder builder = TrainedModelAssignmentMetadata.builder(clusterState); builder.updateAssignment(assignment.getDeploymentId(), updatedAssignment); - listener.onResponse(update(clusterState, builder)); + listener.onResponse(builder); } static ClusterState setToStopping(ClusterState clusterState, String deploymentId, String reason) { diff --git a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentNodeService.java b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentNodeService.java index c86b3e710a736..83db62e2da4de 100644 --- a/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentNodeService.java +++ b/x-pack/plugin/ml/src/main/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentNodeService.java @@ -375,12 +375,9 @@ public void clusterChanged(ClusterChangedEvent event) { final boolean isResetMode = MlMetadata.getMlMetadata(event.state()).isResetMode(); TrainedModelAssignmentMetadata modelAssignmentMetadata = TrainedModelAssignmentMetadata.fromState(event.state()); final String currentNode = event.state().nodes().getLocalNodeId(); - final boolean isNewAllocationSupported = event.state() - .getMinTransportVersion() - .onOrAfter(TrainedModelAssignmentClusterService.DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION); final Set shuttingDownNodes = Collections.unmodifiableSet(event.state().metadata().nodeShutdowns().getAllNodeIds()); - if (isResetMode == false && isNewAllocationSupported) { + if (isResetMode == false) { updateNumberOfAllocations(modelAssignmentMetadata); } @@ -388,7 +385,7 @@ public void clusterChanged(ClusterChangedEvent event) { RoutingInfo routingInfo = trainedModelAssignment.getNodeRoutingTable().get(currentNode); if (routingInfo != null) { // Add new models to start loading if the assignment is not stopping - if (isNewAllocationSupported && trainedModelAssignment.getAssignmentState() != AssignmentState.STOPPING) { + if (trainedModelAssignment.getAssignmentState() != AssignmentState.STOPPING) { if (shouldAssignmentBeRestarted(routingInfo, trainedModelAssignment.getDeploymentId())) { prepareAssignmentForRestart(trainedModelAssignment); } diff --git a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java index 31260403e5d92..059a8c573e039 100644 --- a/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java +++ b/x-pack/plugin/ml/src/test/java/org/elasticsearch/xpack/ml/inference/assignment/TrainedModelAssignmentClusterServiceTests.java @@ -208,7 +208,6 @@ public void testClusterChanged_GivenNodesAdded_ThenLogMlNodeHeterogeneityCalled( TrainedModelAssignmentClusterService serviceSpy = spy(createClusterService(randomInt(5))); doNothing().when(serviceSpy).logMlNodeHeterogeneity(); doReturn(false).when(serviceSpy).eventStateHasGlobalBlockStateNotRecoveredBlock(any()); - doReturn(false).when(serviceSpy).eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion(any()); ClusterChangedEvent mockNodesAddedEvent = mock(ClusterChangedEvent.class); ClusterState mockState = mock(ClusterState.class);