1111import org .apache .logging .log4j .Logger ;
1212import org .elasticsearch .ElasticsearchStatusException ;
1313import org .elasticsearch .ResourceNotFoundException ;
14- import org .elasticsearch .TransportVersion ;
15- import org .elasticsearch .TransportVersions ;
1614import org .elasticsearch .action .ActionListener ;
1715import org .elasticsearch .action .ActionRequestValidationException ;
1816import org .elasticsearch .action .support .master .AcknowledgedResponse ;
@@ -80,9 +78,6 @@ public class TrainedModelAssignmentClusterService implements ClusterStateListene
8078
8179 private static final Logger logger = LogManager .getLogger (TrainedModelAssignmentClusterService .class );
8280
83- private static final TransportVersion RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION = TransportVersions .V_8_3_0 ;
84- public static final TransportVersion DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION = TransportVersions .V_8_4_0 ;
85-
8681 private final ClusterService clusterService ;
8782 private final ThreadPool threadPool ;
8883 private final NodeLoadDetector nodeLoadDetector ;
@@ -170,14 +165,6 @@ public void clusterChanged(ClusterChangedEvent event) {
170165 return ;
171166 }
172167
173- if (eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion (event )) {
174- // we should not try to rebalance assignments while there may be nodes running on a version
175- // prior to introducing distributed model allocation.
176- // But we should remove routing to removed or shutting down nodes.
177- removeRoutingToRemovedOrShuttingDownNodes (event );
178- return ;
179- }
180-
181168 if (event .nodesAdded ()) {
182169 logMlNodeHeterogeneity ();
183170 }
@@ -204,10 +191,6 @@ public void clusterChanged(ClusterChangedEvent event) {
204191 }
205192 }
206193
207- boolean eventStateMinTransportVersionIsBeforeDistributedModelAllocationTransportVersion (ClusterChangedEvent event ) {
208- return event .state ().getMinTransportVersion ().before (DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION );
209- }
210-
211194 boolean eventStateHasGlobalBlockStateNotRecoveredBlock (ClusterChangedEvent event ) {
212195 return event .state ().blocks ().hasGlobalBlock (GatewayService .STATE_NOT_RECOVERED_BLOCK );
213196 }
@@ -401,18 +384,6 @@ public void createNewModelAssignment(
401384 CreateTrainedModelAssignmentAction .Request request ,
402385 ActionListener <TrainedModelAssignment > listener
403386 ) {
404- if (clusterService .state ().getMinTransportVersion ().before (DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION )) {
405- listener .onFailure (
406- new ElasticsearchStatusException (
407- "cannot create new assignment [{}] for model [{}] while cluster upgrade is in progress" ,
408- RestStatus .CONFLICT ,
409- request .getTaskParams ().getDeploymentId (),
410- request .getTaskParams ().getModelId ()
411- )
412- );
413- return ;
414- }
415-
416387 if (MlMetadata .getMlMetadata (clusterService .state ()).isResetMode ()) {
417388 listener .onFailure (
418389 new ElasticsearchStatusException (
@@ -524,12 +495,8 @@ private static ClusterState update(ClusterState currentState, TrainedModelAssign
524495 private static ClusterState forceUpdate (ClusterState currentState , TrainedModelAssignmentMetadata .Builder modelAssignments ) {
525496 logger .debug (() -> format ("updated assignments: %s" , modelAssignments .build ()));
526497 Metadata .Builder metadata = Metadata .builder (currentState .metadata ());
527- if (currentState .getMinTransportVersion ().onOrAfter (RENAME_ALLOCATION_TO_ASSIGNMENT_TRANSPORT_VERSION )) {
528- metadata .putCustom (TrainedModelAssignmentMetadata .NAME , modelAssignments .build ())
529- .removeProjectCustom (TrainedModelAssignmentMetadata .DEPRECATED_NAME );
530- } else {
531- metadata .putCustom (TrainedModelAssignmentMetadata .DEPRECATED_NAME , modelAssignments .buildOld ());
532- }
498+ metadata .putCustom (TrainedModelAssignmentMetadata .NAME , modelAssignments .build ())
499+ .removeProjectCustom (TrainedModelAssignmentMetadata .DEPRECATED_NAME );
533500 return ClusterState .builder (currentState ).metadata (metadata ).build ();
534501 }
535502
@@ -847,7 +814,7 @@ private void updateDeployment(
847814 }
848815 boolean hasUpdates = hasUpdates (numberOfAllocations , adaptiveAllocationsSettingsUpdates , existingAssignment );
849816 if (hasUpdates == false ) {
850- logger .info ("no updates" );
817+ logger .debug ("no updates to be made for deployment [{}]" , deploymentId );
851818 listener .onResponse (existingAssignment );
852819 return ;
853820 }
@@ -861,27 +828,17 @@ private void updateDeployment(
861828 );
862829 return ;
863830 }
864- if (clusterState .getMinTransportVersion ().before (DISTRIBUTED_MODEL_ALLOCATION_TRANSPORT_VERSION )) {
865- listener .onFailure (
866- new ElasticsearchStatusException (
867- "cannot update deployment with model id [{}] while cluster upgrade is in progress." ,
868- RestStatus .CONFLICT ,
869- deploymentId
870- )
871- );
872- return ;
873- }
874831
875- ActionListener <ClusterState > updatedStateListener = ActionListener .wrap (
876- updatedState -> submitUnbatchedTask ("update model deployment" , new ClusterStateUpdateTask () {
832+ ActionListener <TrainedModelAssignmentMetadata . Builder > updatedAssignmentListener = ActionListener .wrap (
833+ updatedAssignment -> submitUnbatchedTask ("update model deployment" , new ClusterStateUpdateTask () {
877834
878835 private volatile boolean isUpdated ;
879836
880837 @ Override
881838 public ClusterState execute (ClusterState currentState ) {
882839 if (areClusterStatesCompatibleForRebalance (clusterState , currentState )) {
883840 isUpdated = true ;
884- return updatedState ;
841+ return update ( currentState , updatedAssignment ) ;
885842 }
886843 logger .debug (() -> format ("[%s] Retrying update as cluster state has been modified" , deploymentId ));
887844 updateDeployment (currentState , deploymentId , numberOfAllocations , adaptiveAllocationsSettings , isInternal , listener );
@@ -913,7 +870,7 @@ public void clusterStateProcessed(ClusterState oldState, ClusterState newState)
913870 listener ::onFailure
914871 );
915872
916- updateAssignment (clusterState , existingAssignment , numberOfAllocations , adaptiveAllocationsSettings , updatedStateListener );
873+ updateAssignment (clusterState , existingAssignment , numberOfAllocations , adaptiveAllocationsSettings , updatedAssignmentListener );
917874 }
918875
919876 static boolean hasUpdates (
@@ -947,7 +904,7 @@ private void updateAssignment(
947904 TrainedModelAssignment assignment ,
948905 Integer numberOfAllocations ,
949906 AdaptiveAllocationsSettings adaptiveAllocationsSettings ,
950- ActionListener <ClusterState > listener
907+ ActionListener <TrainedModelAssignmentMetadata . Builder > listener
951908 ) {
952909 threadPool .executor (MachineLearning .UTILITY_THREAD_POOL_NAME ).execute (() -> {
953910 if (numberOfAllocations == null || numberOfAllocations == assignment .getTaskParams ().getNumberOfAllocations ()) {
@@ -964,21 +921,21 @@ private void updateAndKeepNumberOfAllocations(
964921 ClusterState clusterState ,
965922 TrainedModelAssignment assignment ,
966923 AdaptiveAllocationsSettings adaptiveAllocationsSettings ,
967- ActionListener <ClusterState > listener
924+ ActionListener <TrainedModelAssignmentMetadata . Builder > listener
968925 ) {
969926 TrainedModelAssignment .Builder updatedAssignment = TrainedModelAssignment .Builder .fromAssignment (assignment )
970927 .setAdaptiveAllocationsSettings (adaptiveAllocationsSettings );
971928 TrainedModelAssignmentMetadata .Builder builder = TrainedModelAssignmentMetadata .builder (clusterState );
972929 builder .updateAssignment (assignment .getDeploymentId (), updatedAssignment );
973- listener .onResponse (update ( clusterState , builder ) );
930+ listener .onResponse (builder );
974931 }
975932
976933 private void increaseNumberOfAllocations (
977934 ClusterState clusterState ,
978935 TrainedModelAssignment assignment ,
979936 int numberOfAllocations ,
980937 AdaptiveAllocationsSettings adaptiveAllocationsSettings ,
981- ActionListener <ClusterState > listener
938+ ActionListener <TrainedModelAssignmentMetadata . Builder > listener
982939 ) {
983940 try {
984941 TrainedModelAssignment .Builder updatedAssignment = TrainedModelAssignment .Builder .fromAssignment (assignment )
@@ -998,7 +955,7 @@ private void increaseNumberOfAllocations(
998955 )
999956 );
1000957 } else {
1001- listener .onResponse (update ( clusterState , rebalancedMetadata ) );
958+ listener .onResponse (rebalancedMetadata );
1002959 }
1003960 } catch (Exception e ) {
1004961 listener .onFailure (e );
@@ -1010,7 +967,7 @@ private void decreaseNumberOfAllocations(
1010967 TrainedModelAssignment assignment ,
1011968 int numberOfAllocations ,
1012969 AdaptiveAllocationsSettings adaptiveAllocationsSettings ,
1013- ActionListener <ClusterState > listener
970+ ActionListener <TrainedModelAssignmentMetadata . Builder > listener
1014971 ) {
1015972 TrainedModelAssignment .Builder updatedAssignment = numberOfAllocations < assignment .totalTargetAllocations ()
1016973 ? new AllocationReducer (assignment , nodeAvailabilityZoneMapper .buildMlNodesByAvailabilityZone (clusterState )).reduceTo (
@@ -1025,7 +982,7 @@ private void decreaseNumberOfAllocations(
1025982 }
1026983 TrainedModelAssignmentMetadata .Builder builder = TrainedModelAssignmentMetadata .builder (clusterState );
1027984 builder .updateAssignment (assignment .getDeploymentId (), updatedAssignment );
1028- listener .onResponse (update ( clusterState , builder ) );
985+ listener .onResponse (builder );
1029986 }
1030987
1031988 static ClusterState setToStopping (ClusterState clusterState , String deploymentId , String reason ) {
0 commit comments