1111import  org .apache .logging .log4j .Logger ;
1212import  org .elasticsearch .ElasticsearchStatusException ;
1313import  org .elasticsearch .ResourceNotFoundException ;
14- import  org .elasticsearch .TransportVersion ;
15- import  org .elasticsearch .TransportVersions ;
1614import  org .elasticsearch .action .ActionListener ;
1715import  org .elasticsearch .action .ActionRequestValidationException ;
1816import  org .elasticsearch .action .support .master .AcknowledgedResponse ;
@@ -79,9 +77,6 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
7977
8078    private  static  final  Logger  logger  = LogManager .getLogger (TrainedModelAssignmentClusterService .class );
8179
82-     private  static  final  TransportVersion  RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION  = TransportVersions .V_8_3_0 ;
83-     public  static  final  TransportVersion  DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION  = TransportVersions .V_8_4_0 ;
84- 
8580    private  final  ClusterService  clusterService ;
8681    private  final  ThreadPool  threadPool ;
8782    private  final  NodeLoadDetector  nodeLoadDetector ;
@@ -169,14 +164,6 @@ public void clusterChanged(ClusterChangedEvent event) {
169164            return ;
170165        }
171166
172-         if  (eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion (event )) {
173-             // we should not try to rebalance assignments while there may be nodes running on a version 
174-             // prior to introducing distributed model allocation. 
175-             // But we should remove routing to removed or shutting down nodes. 
176-             removeRoutingToRemovedOrShuttingDownNodes (event );
177-             return ;
178-         }
179- 
180167        if  (event .nodesAdded ()) {
181168            logMlNodeHeterogeneity ();
182169        }
@@ -203,10 +190,6 @@ public void clusterChanged(ClusterChangedEvent event) {
203190        }
204191    }
205192
206-     boolean  eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion (ClusterChangedEvent  event ) {
207-         return  event .state ().getMinTransportVersion ().before (DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION );
208-     }
209- 
210193    boolean  eventStateHasGlobalBlockStateNotRecoveredBlock (ClusterChangedEvent  event ) {
211194        return  event .state ().blocks ().hasGlobalBlock (GatewayService .STATE_NOT_RECOVERED_BLOCK );
212195    }
@@ -400,18 +383,6 @@ public void createNewModelAssignment(
400383        CreateTrainedModelAssignmentAction .Request  request ,
401384        ActionListener <TrainedModelAssignment > listener 
402385    ) {
403-         if  (clusterService .state ().getMinTransportVersion ().before (DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION )) {
404-             listener .onFailure (
405-                 new  ElasticsearchStatusException (
406-                     "cannot create new assignment [{}] for model [{}] while cluster upgrade is in progress" ,
407-                     RestStatus .CONFLICT ,
408-                     request .getTaskParams ().getDeploymentId (),
409-                     request .getTaskParams ().getModelId ()
410-                 )
411-             );
412-             return ;
413-         }
414- 
415386        if  (MlMetadata .getMlMetadata (clusterService .state ()).isResetMode ()) {
416387            listener .onFailure (
417388                new  ElasticsearchStatusException (
@@ -522,13 +493,11 @@ private static ClusterState update(ClusterState currentState, TrainedModelAssign
522493
523494    private  static  ClusterState  forceUpdate (ClusterState  currentState , TrainedModelAssignmentMetadata .Builder  modelAssignments ) {
524495        logger .debug (() -> format ("updated assignments: %s" , modelAssignments .build ()));
496+ 
525497        ProjectMetadata .Builder  builder  = ProjectMetadata .builder (currentState .metadata ().getProject ());
526-         if  (currentState .getMinTransportVersion ().onOrAfter (RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION )) {
527-             builder .putCustom (TrainedModelAssignmentMetadata .NAME , modelAssignments .build ())
528-                 .removeCustom (TrainedModelAssignmentMetadata .DEPRECATED_NAME );
529-         } else  {
530-             builder .putCustom (TrainedModelAssignmentMetadata .DEPRECATED_NAME , modelAssignments .buildOld ());
531-         }
498+         builder .putCustom (TrainedModelAssignmentMetadata .NAME , modelAssignments .build ())
499+             .removeCustom (TrainedModelAssignmentMetadata .DEPRECATED_NAME );
500+ 
532501        return  ClusterState .builder (currentState ).putProjectMetadata (builder ).build ();
533502    }
534503
@@ -844,7 +813,7 @@ private void updateDeployment(
844813        }
845814        boolean  hasUpdates  = hasUpdates (numberOfAllocations , adaptiveAllocationsSettingsUpdates , existingAssignment );
846815        if  (hasUpdates  == false ) {
847-             logger .info ("no updates"  );
816+             logger .debug ("no updates to be made for deployment [{}]"  ,  deploymentId );
848817            listener .onResponse (existingAssignment );
849818            return ;
850819        }
@@ -858,27 +827,17 @@ private void updateDeployment(
858827            );
859828            return ;
860829        }
861-         if  (clusterState .getMinTransportVersion ().before (DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION )) {
862-             listener .onFailure (
863-                 new  ElasticsearchStatusException (
864-                     "cannot update deployment with model id [{}] while cluster upgrade is in progress." ,
865-                     RestStatus .CONFLICT ,
866-                     deploymentId 
867-                 )
868-             );
869-             return ;
870-         }
871830
872-         ActionListener <ClusterState >  updatedStateListener  = ActionListener .wrap (
873-             updatedState  -> submitUnbatchedTask ("update model deployment" , new  ClusterStateUpdateTask () {
831+         ActionListener <TrainedModelAssignmentMetadata . Builder >  updatedAssignmentListener  = ActionListener .wrap (
832+             updatedAssignment  -> submitUnbatchedTask ("update model deployment" , new  ClusterStateUpdateTask () {
874833
875834                private  volatile  boolean  isUpdated ;
876835
877836                @ Override 
878837                public  ClusterState  execute (ClusterState  currentState ) {
879838                    if  (areClusterStatesCompatibleForRebalance (clusterState , currentState )) {
880839                        isUpdated  = true ;
881-                         return  updatedState ;
840+                         return  update ( currentState ,  updatedAssignment ) ;
882841                    }
883842                    logger .debug (() -> format ("[%s] Retrying update as cluster state has been modified" , deploymentId ));
884843                    updateDeployment (currentState , deploymentId , numberOfAllocations , adaptiveAllocationsSettings , isInternal , listener );
@@ -910,7 +869,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
910869            listener ::onFailure 
911870        );
912871
913-         updateAssignment (clusterState , existingAssignment , numberOfAllocations , adaptiveAllocationsSettings , updatedStateListener );
872+         updateAssignment (clusterState , existingAssignment , numberOfAllocations , adaptiveAllocationsSettings , updatedAssignmentListener );
914873    }
915874
916875    static  boolean  hasUpdates (
@@ -944,7 +903,7 @@ private void updateAssignment(
944903        TrainedModelAssignment  assignment ,
945904        Integer  numberOfAllocations ,
946905        AdaptiveAllocationsSettings  adaptiveAllocationsSettings ,
947-         ActionListener <ClusterState > listener 
906+         ActionListener <TrainedModelAssignmentMetadata . Builder > listener 
948907    ) {
949908        threadPool .executor (MachineLearning .UTILITY_THREAD_POOL_NAME ).execute (() -> {
950909            if  (numberOfAllocations  == null  || numberOfAllocations  == assignment .getTaskParams ().getNumberOfAllocations ()) {
@@ -961,21 +920,21 @@ private void updateAndKeepNumberOfAllocations(
961920        ClusterState  clusterState ,
962921        TrainedModelAssignment  assignment ,
963922        AdaptiveAllocationsSettings  adaptiveAllocationsSettings ,
964-         ActionListener <ClusterState > listener 
923+         ActionListener <TrainedModelAssignmentMetadata . Builder > listener 
965924    ) {
966925        TrainedModelAssignment .Builder  updatedAssignment  = TrainedModelAssignment .Builder .fromAssignment (assignment )
967926            .setAdaptiveAllocationsSettings (adaptiveAllocationsSettings );
968927        TrainedModelAssignmentMetadata .Builder  builder  = TrainedModelAssignmentMetadata .builder (clusterState );
969928        builder .updateAssignment (assignment .getDeploymentId (), updatedAssignment );
970-         listener .onResponse (update ( clusterState ,  builder ) );
929+         listener .onResponse (builder );
971930    }
972931
973932    private  void  increaseNumberOfAllocations (
974933        ClusterState  clusterState ,
975934        TrainedModelAssignment  assignment ,
976935        int  numberOfAllocations ,
977936        AdaptiveAllocationsSettings  adaptiveAllocationsSettings ,
978-         ActionListener <ClusterState > listener 
937+         ActionListener <TrainedModelAssignmentMetadata . Builder > listener 
979938    ) {
980939        try  {
981940            TrainedModelAssignment .Builder  updatedAssignment  = TrainedModelAssignment .Builder .fromAssignment (assignment )
@@ -995,7 +954,7 @@ private void increaseNumberOfAllocations(
995954                    )
996955                );
997956            } else  {
998-                 listener .onResponse (update ( clusterState ,  rebalancedMetadata ) );
957+                 listener .onResponse (rebalancedMetadata );
999958            }
1000959        } catch  (Exception  e ) {
1001960            listener .onFailure (e );
@@ -1007,7 +966,7 @@ private void decreaseNumberOfAllocations(
1007966        TrainedModelAssignment  assignment ,
1008967        int  numberOfAllocations ,
1009968        AdaptiveAllocationsSettings  adaptiveAllocationsSettings ,
1010-         ActionListener <ClusterState > listener 
969+         ActionListener <TrainedModelAssignmentMetadata . Builder > listener 
1011970    ) {
1012971        TrainedModelAssignment .Builder  updatedAssignment  = numberOfAllocations  < assignment .totalTargetAllocations ()
1013972            ? new  AllocationReducer (assignment , nodeAvailabilityZoneMapper .buildMlNodesByAvailabilityZone (clusterState )).reduceTo (
@@ -1022,7 +981,7 @@ private void decreaseNumberOfAllocations(
1022981        }
1023982        TrainedModelAssignmentMetadata .Builder  builder  = TrainedModelAssignmentMetadata .builder (clusterState );
1024983        builder .updateAssignment (assignment .getDeploymentId (), updatedAssignment );
1025-         listener .onResponse (update ( clusterState ,  builder ) );
984+         listener .onResponse (builder );
1026985    }
1027986
1028987    static  ClusterState  setToStopping (ClusterState  clusterState , String  deploymentId , String  reason ) {
0 commit comments