@@ -138,7 +138,7 @@ static void copyAssignments(AssignmentPlan source, AssignmentPlan.Builder dest,
             Map<AssignmentPlan.Node, Integer> sourceNodeAssignments = source.assignments(deployment).orElse(Map.of());
             for (Map.Entry<AssignmentPlan.Node, Integer> sourceAssignment : sourceNodeAssignments.entrySet()) {
                 AssignmentPlan.Node node = originalNodeById.get(sourceAssignment.getKey().id());
-                if (dest.canAssign(deployment, node, sourceAssignment.getValue())) {
+                if (dest.canAssign(deployment, node, sourceAssignment.getValue())) {
                     dest.assignModelToNode(deployment, node, sourceAssignment.getValue());
                 }
             }
@@ -320,8 +320,10 @@ private Map<List<String>, List<AssignmentPlan.Node>> createNodesByZoneMap() {
     }
 
     private static long getNodeFreeMemoryExcludingPerNodeOverheadAndNativeInference(NodeLoad load) {
-        // load.getFreeMemoryExcludingPerNodeOverhead() = maxMemory - assignedJobMemoryExcludingPerNodeOverhead - 30MB native executable code overhead
-        // assignedJobMemoryExcludingPerNodeOverhead = assignedAnomalyDetectorMemory + assignedDataFrameAnalyticsMemory + assignedNativeInferenceMemory
+        // load.getFreeMemoryExcludingPerNodeOverhead() = maxMemory - assignedJobMemoryExcludingPerNodeOverhead - 30MB native executable
+        // code overhead
+        // assignedJobMemoryExcludingPerNodeOverhead = assignedAnomalyDetectorMemory + assignedDataFrameAnalyticsMemory +
+        // assignedNativeInferenceMemory
         // load.getAssignedNativeInferenceMemory() = assignedNativeInferenceMemory
         // TODO: (valeriy) assignedNativeInferenceMemory is double counted in the current calculation.
         return load.getFreeMemoryExcludingPerNodeOverhead()/* - load.getAssignedNativeInferenceMemory()*/;
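Not part of the diff: a worked example of the two memory formulas documented in the comments above, using made-up figures. All values are in MB and every number is hypothetical; only the formula structure comes from the comments.

// Illustration only: hypothetical figures plugged into the formulas from the comments above (all values in MB).
public class NodeFreeMemoryExample {
    public static void main(String[] args) {
        long maxMemory = 4096;
        long assignedAnomalyDetectorMemory = 512;
        long assignedDataFrameAnalyticsMemory = 256;
        long assignedNativeInferenceMemory = 1024;
        long nativeExecutableCodeOverhead = 30;

        // assignedJobMemoryExcludingPerNodeOverhead = sum of the three assigned-memory figures
        long assignedJobMemoryExcludingPerNodeOverhead = assignedAnomalyDetectorMemory
            + assignedDataFrameAnalyticsMemory
            + assignedNativeInferenceMemory; // 1792

        // what load.getFreeMemoryExcludingPerNodeOverhead() is documented to return
        long freeMemoryExcludingPerNodeOverhead = maxMemory - assignedJobMemoryExcludingPerNodeOverhead
            - nativeExecutableCodeOverhead; // 4096 - 1792 - 30 = 2274

        System.out.println(freeMemoryExcludingPerNodeOverhead); // prints 2274
    }
}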
@@ -412,17 +414,20 @@ private Optional<String> explainAssignment(
         if (Strings.isNullOrEmpty(load.getError()) == false) {
             return Optional.of(load.getError());
         }
-        // TODO (valeriy): this test should be actually true, but it is false, because we use the "naked" deployment footprint
-        // Get existing allocations for this node to avoid double counting
         int existingAllocationsOnNode = assignmentPlan.assignments(deployment)
-            .flatMap(assignments -> assignments.entrySet().stream()
-                .filter(entry -> entry.getKey().id().equals(node.getId()))
-                .findFirst()
-                .map(Map.Entry::getValue))
+            .flatMap(
+                assignments -> assignments.entrySet()
+                    .stream()
+                    .filter(entry -> entry.getKey().id().equals(node.getId()))
+                    .findFirst()
+                    .map(Map.Entry::getValue)
+            )
             .orElse(0);
         int notYetAssignedAllocations = deployment.allocations() - assignmentPlan.totalAllocations(deployment);
-        // if (deployment.estimateMemoryUsageBytes(deployment.allocations() - existingAllocationsOnNode) > assignmentPlan.getRemainingNodeMemory(node.getId())) {
-        if (deployment.estimateAdditionalMemoryUsageBytes(existingAllocationsOnNode, existingAllocationsOnNode + notYetAssignedAllocations) > assignmentPlan.getRemainingNodeMemory(node.getId())) {
+        if (deployment.estimateAdditionalMemoryUsageBytes(
+            existingAllocationsOnNode,
+            existingAllocationsOnNode + notYetAssignedAllocations
+        ) > assignmentPlan.getRemainingNodeMemory(node.getId())) {
             // If any ML processes are running on a node we require some space to load the shared libraries.
             // So if none are currently running then this per-node overhead must be added to the requirement.
             // From node load we know if we had any jobs or models assigned before the rebalance.
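Not part of the diff: a minimal, self-contained sketch of the lookup pattern the reformatted stream above uses, i.e. find the allocation count recorded for one node id and default to 0 when the deployment has no assignment on that node. It substitutes a plain Map<String, Integer> keyed by node id for the AssignmentPlan.Node map; all names and numbers here are hypothetical.

// Illustration only: same Optional/stream lookup shape as the new code in the hunk above.
import java.util.Map;
import java.util.Optional;

public class ExistingAllocationsExample {
    // Returns the allocations recorded for nodeId, or 0 if the node is absent or there are no assignments at all.
    static int existingAllocationsOnNode(Optional<Map<String, Integer>> assignments, String nodeId) {
        return assignments.flatMap(
            byNode -> byNode.entrySet()
                .stream()
                .filter(entry -> entry.getKey().equals(nodeId))
                .findFirst()
                .map(Map.Entry::getValue)
        ).orElse(0);
    }

    public static void main(String[] args) {
        Optional<Map<String, Integer>> assignments = Optional.of(Map.of("node-1", 2, "node-2", 3));
        System.out.println(existingAllocationsOnNode(assignments, "node-1")); // 2
        System.out.println(existingAllocationsOnNode(assignments, "node-3")); // 0
        System.out.println(existingAllocationsOnNode(Optional.empty(), "node-1")); // 0
    }
}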