1111import org .apache .logging .log4j .Logger ;
1212import org .elasticsearch .ElasticsearchStatusException ;
1313import org .elasticsearch .ResourceNotFoundException ;
14- import org .elasticsearch .TransportVersion ;
15- import org .elasticsearch .TransportVersions ;
1614import org .elasticsearch .action .ActionListener ;
1715import org .elasticsearch .action .ActionRequestValidationException ;
1816import org .elasticsearch .action .support .master .AcknowledgedResponse ;
@@ -79,9 +77,6 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
7977
8078 private static final Logger logger = LogManager .getLogger (TrainedModelAssignmentClusterService .class );
8179
82- private static final TransportVersion RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION = TransportVersions .V_8_3_0 ;
83- public static final TransportVersion DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION = TransportVersions .V_8_4_0 ;
84-
8580 private final ClusterService clusterService ;
8681 private final ThreadPool threadPool ;
8782 private final NodeLoadDetector nodeLoadDetector ;
@@ -169,14 +164,6 @@ public void clusterChanged(ClusterChangedEvent event) {
169164 return ;
170165 }
171166
172- if (eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion (event )) {
173- // we should not try to rebalance assignments while there may be nodes running on a version
174- // prior to introducing distributed model allocation.
175- // But we should remove routing to removed or shutting down nodes.
176- removeRoutingToRemovedOrShuttingDownNodes (event );
177- return ;
178- }
179-
180167 if (event .nodesAdded ()) {
181168 logMlNodeHeterogeneity ();
182169 }
@@ -203,10 +190,6 @@ public void clusterChanged(ClusterChangedEvent event) {
203190 }
204191 }
205192
206- boolean eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion (ClusterChangedEvent event ) {
207- return event .state ().getMinTransportVersion ().before (DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION );
208- }
209-
210193 boolean eventStateHasGlobalBlockStateNotRecoveredBlock (ClusterChangedEvent event ) {
211194 return event .state ().blocks ().hasGlobalBlock (GatewayService .STATE_NOT_RECOVERED_BLOCK );
212195 }
@@ -400,18 +383,6 @@ public void createNewModelAssignment(
400383 CreateTrainedModelAssignmentAction .Request request ,
401384 ActionListener <TrainedModelAssignment > listener
402385 ) {
403- if (clusterService .state ().getMinTransportVersion ().before (DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION )) {
404- listener .onFailure (
405- new ElasticsearchStatusException (
406- "cannot create new assignment [{}] for model [{}] while cluster upgrade is in progress" ,
407- RestStatus .CONFLICT ,
408- request .getTaskParams ().getDeploymentId (),
409- request .getTaskParams ().getModelId ()
410- )
411- );
412- return ;
413- }
414-
415386 if (MlMetadata .getMlMetadata (clusterService .state ()).isResetMode ()) {
416387 listener .onFailure (
417388 new ElasticsearchStatusException (
@@ -522,13 +493,11 @@ private static ClusterState update(ClusterState currentState, TrainedModelAssign
522493
523494 private static ClusterState forceUpdate (ClusterState currentState , TrainedModelAssignmentMetadata .Builder modelAssignments ) {
524495 logger .debug (() -> format ("updated assignments: %s" , modelAssignments .build ()));
496+
525497 ProjectMetadata .Builder builder = ProjectMetadata .builder (currentState .metadata ().getProject ());
526- if (currentState .getMinTransportVersion ().onOrAfter (RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION )) {
527- builder .putCustom (TrainedModelAssignmentMetadata .NAME , modelAssignments .build ())
528- .removeCustom (TrainedModelAssignmentMetadata .DEPRECATED_NAME );
529- } else {
530- builder .putCustom (TrainedModelAssignmentMetadata .DEPRECATED_NAME , modelAssignments .buildOld ());
531- }
498+ builder .putCustom (TrainedModelAssignmentMetadata .NAME , modelAssignments .build ())
499+ .removeCustom (TrainedModelAssignmentMetadata .DEPRECATED_NAME );
500+
532501 return ClusterState .builder (currentState ).putProjectMetadata (builder ).build ();
533502 }
534503
@@ -844,7 +813,7 @@ private void updateDeployment(
844813 }
845814 boolean hasUpdates = hasUpdates (numberOfAllocations , adaptiveAllocationsSettingsUpdates , existingAssignment );
846815 if (hasUpdates == false ) {
847- logger .info ("no updates" );
816+ logger .debug ("no updates to be made for deployment [{}]" , deploymentId );
848817 listener .onResponse (existingAssignment );
849818 return ;
850819 }
@@ -858,27 +827,17 @@ private void updateDeployment(
858827 );
859828 return ;
860829 }
861- if (clusterState .getMinTransportVersion ().before (DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION )) {
862- listener .onFailure (
863- new ElasticsearchStatusException (
864- "cannot update deployment with model id [{}] while cluster upgrade is in progress." ,
865- RestStatus .CONFLICT ,
866- deploymentId
867- )
868- );
869- return ;
870- }
871830
872- ActionListener <ClusterState > updatedStateListener = ActionListener .wrap (
873- updatedState -> submitUnbatchedTask ("update model deployment" , new ClusterStateUpdateTask () {
831+ ActionListener <TrainedModelAssignmentMetadata . Builder > updatedAssignmentListener = ActionListener .wrap (
832+ updatedAssignment -> submitUnbatchedTask ("update model deployment" , new ClusterStateUpdateTask () {
874833
875834 private volatile boolean isUpdated ;
876835
877836 @ Override
878837 public ClusterState execute (ClusterState currentState ) {
879838 if (areClusterStatesCompatibleForRebalance (clusterState , currentState )) {
880839 isUpdated = true ;
881- return updatedState ;
840+ return update ( currentState , updatedAssignment ) ;
882841 }
883842 logger .debug (() -> format ("[%s] Retrying update as cluster state has been modified" , deploymentId ));
884843 updateDeployment (currentState , deploymentId , numberOfAllocations , adaptiveAllocationsSettings , isInternal , listener );
@@ -910,7 +869,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
910869 listener ::onFailure
911870 );
912871
913- updateAssignment (clusterState , existingAssignment , numberOfAllocations , adaptiveAllocationsSettings , updatedStateListener );
872+ updateAssignment (clusterState , existingAssignment , numberOfAllocations , adaptiveAllocationsSettings , updatedAssignmentListener );
914873 }
915874
916875 static boolean hasUpdates (
@@ -944,7 +903,7 @@ private void updateAssignment(
944903 TrainedModelAssignment assignment ,
945904 Integer numberOfAllocations ,
946905 AdaptiveAllocationsSettings adaptiveAllocationsSettings ,
947- ActionListener <ClusterState > listener
906+ ActionListener <TrainedModelAssignmentMetadata . Builder > listener
948907 ) {
949908 threadPool .executor (MachineLearning .UTILITY_THREAD_POOL_NAME ).execute (() -> {
950909 if (numberOfAllocations == null || numberOfAllocations == assignment .getTaskParams ().getNumberOfAllocations ()) {
@@ -961,21 +920,21 @@ private void updateAndKeepNumberOfAllocations(
961920 ClusterState clusterState ,
962921 TrainedModelAssignment assignment ,
963922 AdaptiveAllocationsSettings adaptiveAllocationsSettings ,
964- ActionListener <ClusterState > listener
923+ ActionListener <TrainedModelAssignmentMetadata . Builder > listener
965924 ) {
966925 TrainedModelAssignment .Builder updatedAssignment = TrainedModelAssignment .Builder .fromAssignment (assignment )
967926 .setAdaptiveAllocationsSettings (adaptiveAllocationsSettings );
968927 TrainedModelAssignmentMetadata .Builder builder = TrainedModelAssignmentMetadata .builder (clusterState );
969928 builder .updateAssignment (assignment .getDeploymentId (), updatedAssignment );
970- listener .onResponse (update ( clusterState , builder ) );
929+ listener .onResponse (builder );
971930 }
972931
973932 private void increaseNumberOfAllocations (
974933 ClusterState clusterState ,
975934 TrainedModelAssignment assignment ,
976935 int numberOfAllocations ,
977936 AdaptiveAllocationsSettings adaptiveAllocationsSettings ,
978- ActionListener <ClusterState > listener
937+ ActionListener <TrainedModelAssignmentMetadata . Builder > listener
979938 ) {
980939 try {
981940 TrainedModelAssignment .Builder updatedAssignment = TrainedModelAssignment .Builder .fromAssignment (assignment )
@@ -995,7 +954,7 @@ private void increaseNumberOfAllocations(
995954 )
996955 );
997956 } else {
998- listener .onResponse (update ( clusterState , rebalancedMetadata ) );
957+ listener .onResponse (rebalancedMetadata );
999958 }
1000959 } catch (Exception e ) {
1001960 listener .onFailure (e );
@@ -1007,7 +966,7 @@ private void decreaseNumberOfAllocations(
1007966 TrainedModelAssignment assignment ,
1008967 int numberOfAllocations ,
1009968 AdaptiveAllocationsSettings adaptiveAllocationsSettings ,
1010- ActionListener <ClusterState > listener
969+ ActionListener <TrainedModelAssignmentMetadata . Builder > listener
1011970 ) {
1012971 TrainedModelAssignment .Builder updatedAssignment = numberOfAllocations < assignment .totalTargetAllocations ()
1013972 ? new AllocationReducer (assignment , nodeAvailabilityZoneMapper .buildMlNodesByAvailabilityZone (clusterState )).reduceTo (
@@ -1022,7 +981,7 @@ private void decreaseNumberOfAllocations(
1022981 }
1023982 TrainedModelAssignmentMetadata .Builder builder = TrainedModelAssignmentMetadata .builder (clusterState );
1024983 builder .updateAssignment (assignment .getDeploymentId (), updatedAssignment );
1025- listener .onResponse (update ( clusterState , builder ) );
984+ listener .onResponse (builder );
1026985 }
1027986
1028987 static ClusterState setToStopping (ClusterState clusterState , String deploymentId , String reason ) {
0 commit comments