Skip to content
Merged
192 changes: 144 additions & 48 deletions api/src/main/java/org/apache/cloudstack/cluster/ClusterDrsAlgorithm.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@
import java.util.List;
import java.util.Map;

import static org.apache.cloudstack.cluster.ClusterDrsService.ClusterDrsMetric;
import static org.apache.cloudstack.cluster.ClusterDrsService.ClusterDrsMetricType;
import static org.apache.cloudstack.cluster.ClusterDrsService.ClusterDrsMetricUseRatio;

public interface ClusterDrsAlgorithm extends Adapter {

/**
Expand All @@ -42,16 +46,17 @@ public interface ClusterDrsAlgorithm extends Adapter {
* @param clusterId
* the ID of the cluster to check
* @param cpuList
* a list of CPU allocated values for each host in the cluster
* a list of Ternary of used, reserved & total CPU for each host in the cluster
* @param memoryList
* a list of memory allocated values for each host in the cluster
* a list of Ternary of used, reserved & total memory values for each host in the cluster
*
* @return true if a DRS operation is needed, false otherwise
*
* @throws ConfigurationException
* if there is an error in the configuration
*/
boolean needsDrs(long clusterId, List<Long> cpuList, List<Long> memoryList) throws ConfigurationException;
boolean needsDrs(long clusterId, List<Ternary<Long, Long, Long>> cpuList,
List<Ternary<Long, Long, Long>> memoryList) throws ConfigurationException;


/**
Expand All @@ -65,18 +70,19 @@ public interface ClusterDrsAlgorithm extends Adapter {
* the service offering for the virtual machine
* @param destHost
* the destination host for the virtual machine
* @param hostCpuFreeMap
* a map of host IDs to the amount of CPU free on each host
* @param hostMemoryFreeMap
* a map of host IDs to the amount of memory free on each host
* @param hostCpuMap
* a map of host IDs to the Ternary of used, reserved and total CPU on each host
* @param hostMemoryMap
* a map of host IDs to the Ternary of used, reserved and total memory on each host
* @param requiresStorageMotion
* whether storage motion is required for the virtual machine
*
* @return a ternary containing improvement, cost, benefit
*/
Ternary<Double, Double, Double> getMetrics(long clusterId, VirtualMachine vm, ServiceOffering serviceOffering,
Host destHost, Map<Long, Long> hostCpuFreeMap,
Map<Long, Long> hostMemoryFreeMap, Boolean requiresStorageMotion);
Host destHost, Map<Long, Ternary<Long, Long, Long>> hostCpuMap,
Map<Long, Ternary<Long, Long, Long>> hostMemoryMap,
Boolean requiresStorageMotion) throws ConfigurationException;

/**
* Calculates the imbalance of the cluster after a virtual machine migration.
Expand All @@ -87,62 +93,101 @@ Ternary<Double, Double, Double> getMetrics(long clusterId, VirtualMachine vm, Se
* the virtual machine being migrated
* @param destHost
* the destination host for the virtual machine
* @param hostCpuFreeMap
* a map of host IDs to the amount of CPU free on each host
* @param hostMemoryFreeMap
* a map of host IDs to the amount of memory free on each host
* @param hostCpuMap
* a map of host IDs to the Ternary of used, reserved and total CPU on each host
* @param hostMemoryMap
* a map of host IDs to the Ternary of used, reserved and total memory on each host
*
* @return a pair containing the CPU and memory imbalance of the cluster after the migration
*/
default Pair<Double, Double> getImbalancePostMigration(ServiceOffering serviceOffering, VirtualMachine vm,
Host destHost, Map<Long, Long> hostCpuFreeMap,
Map<Long, Long> hostMemoryFreeMap) {
List<Long> postCpuList = new ArrayList<>();
List<Long> postMemoryList = new ArrayList<>();
final int vmCpu = serviceOffering.getCpu() * serviceOffering.getSpeed();
final long vmRam = serviceOffering.getRamSize() * 1024L * 1024L;

for (Long hostId : hostCpuFreeMap.keySet()) {
long cpu = hostCpuFreeMap.get(hostId);
long memory = hostMemoryFreeMap.get(hostId);
if (hostId == destHost.getId()) {
postCpuList.add(cpu - vmCpu);
postMemoryList.add(memory - vmRam);
} else if (hostId.equals(vm.getHostId())) {
postCpuList.add(cpu + vmCpu);
postMemoryList.add(memory + vmRam);
} else {
postCpuList.add(cpu);
postMemoryList.add(memory);
}
default Double getImbalancePostMigration(ServiceOffering serviceOffering, VirtualMachine vm,
Host destHost, Map<Long, Ternary<Long, Long, Long>> hostCpuMap,
Map<Long, Ternary<Long, Long, Long>> hostMemoryMap) throws ConfigurationException {
Pair<Long, Map<Long, Ternary<Long, Long, Long>>> pair = getHostMetricsMapAndType(destHost.getClusterId(), serviceOffering, hostCpuMap, hostMemoryMap);
long vmMetric = pair.first();
Map<Long, Ternary<Long, Long, Long>> hostMetricsMap = pair.second();

List<Double> list = new ArrayList<>();
for (Long hostId : hostMetricsMap.keySet()) {
list.add(getMetricValuePostMigration(destHost.getClusterId(), hostMetricsMap.get(hostId), vmMetric, hostId, destHost.getId(), vm.getHostId()));
}
return new Pair<>(getClusterImbalance(postCpuList), getClusterImbalance(postMemoryList));
return getImbalance(list);
}

/**
* The cluster imbalance is defined as the percentage deviation from the mean
* for a configured metric of the cluster. The standard deviation is used as a
* mathematical tool to normalize the metric data for all the resource and the
* percentage deviation provides an easy tool to compare a cluster’s current
* state against the defined imbalance threshold. Because this is essentially a
* percentage, the value is a number between 0.0 and 1.0.
* Cluster Imbalance, Ic = σc / mavg , where σc is the standard deviation and
* mavg is the mean metric value for the cluster.
*/
default Double getClusterImbalance(List<Long> metricList) {
private Pair<Long, Map<Long, Ternary<Long, Long, Long>>> getHostMetricsMapAndType(Long clusterId,
ServiceOffering serviceOffering, Map<Long, Ternary<Long, Long, Long>> hostCpuMap,
Map<Long, Ternary<Long, Long, Long>> hostMemoryMap) throws ConfigurationException {
String metric = getClusterDrsMetric(clusterId);
Pair<Long, Map<Long, Ternary<Long, Long, Long>>> pair;
switch (metric) {
case "cpu":
pair = new Pair<>((long) serviceOffering.getCpu() * serviceOffering.getSpeed(), hostCpuMap);
break;
case "memory":
pair = new Pair<>(serviceOffering.getRamSize() * 1024L * 1024L, hostMemoryMap);
break;
default:
throw new ConfigurationException(
String.format("Invalid metric: %s for cluster: %d", metric, clusterId));
}
return pair;
}

private Double getMetricValuePostMigration(Long clusterId, Ternary<Long, Long, Long> metrics, long vmMetric,
long hostId, long destHostId, long vmHostId) {
long used = metrics.first();
long actualTotal = metrics.third() - metrics.second();
long free = actualTotal - metrics.first();

if (hostId == destHostId) {
used += vmMetric;
free -= vmMetric;
} else if (hostId == vmHostId) {
used -= vmMetric;
free += vmMetric;
}
return getMetricValue(clusterId, used, free, actualTotal, null);
}

private static Double getImbalance(List<Double> metricList) {
Double clusterMeanMetric = getClusterMeanMetric(metricList);
Double clusterStandardDeviation = getClusterStandardDeviation(metricList, clusterMeanMetric);
return clusterStandardDeviation / clusterMeanMetric;
}

static String getClusterDrsMetric(long clusterId) {
return ClusterDrsMetric.valueIn(clusterId);
}

static Double getMetricValue(long clusterId, long used, long free, long total, Float skipThreshold) {
boolean useRatio = getDrsMetricUseRatio(clusterId);
switch (getDrsMetricType(clusterId)) {
case "free":
if (skipThreshold != null && free < skipThreshold * total) return null;
if (useRatio) {
return (double) free / total;
} else {
return (double) free;
}
case "used":
if (skipThreshold != null && used > skipThreshold * total) return null;
if (useRatio) {
return (double) used / total;
} else {
return (double) used;
}
}
return null;
}

/**
* Mean is the average of a collection or set of metrics. In context of a DRS
* cluster, the cluster metrics defined as the average metrics value for some
* metric (such as CPU, memory etc.) for every resource such as host.
* Cluster Mean Metric, mavg = (∑mi) / N, where mi is a measurable metric for a
* resource ‘i’ in a cluster with total N number of resources.
*/
default Double getClusterMeanMetric(List<Long> metricList) {
static Double getClusterMeanMetric(List<Double> metricList) {
return new Mean().evaluate(metricList.stream().mapToDouble(i -> i).toArray());
}

Expand All @@ -157,11 +202,62 @@ default Double getClusterMeanMetric(List<Long> metricList) {
* mean metric value and mi is a measurable metric for some resource ‘i’ in the
* cluster with total N number of resources.
*/
default Double getClusterStandardDeviation(List<Long> metricList, Double mean) {
static Double getClusterStandardDeviation(List<Double> metricList, Double mean) {
if (mean != null) {
return new StandardDeviation(false).evaluate(metricList.stream().mapToDouble(i -> i).toArray(), mean);
} else {
return new StandardDeviation(false).evaluate(metricList.stream().mapToDouble(i -> i).toArray());
}
}

static boolean getDrsMetricUseRatio(long clusterId) {
return ClusterDrsMetricUseRatio.valueIn(clusterId);
}

static String getDrsMetricType(long clusterId) {
return ClusterDrsMetricType.valueIn(clusterId);
}

/**
* The cluster imbalance is defined as the percentage deviation from the mean
* for a configured metric of the cluster. The standard deviation is used as a
* mathematical tool to normalize the metric data for all the resource and the
* percentage deviation provides an easy tool to compare a cluster’s current
* state against the defined imbalance threshold. Because this is essentially a
* percentage, the value is a number between 0.0 and 1.0.
* Cluster Imbalance, Ic = σc / mavg , where σc is the standard deviation and
* mavg is the mean metric value for the cluster.
*/
static Double getClusterImbalance(Long clusterId, List<Ternary<Long, Long, Long>> cpuList,
List<Ternary<Long, Long, Long>> memoryList, Float skipThreshold) throws ConfigurationException {
String metric = getClusterDrsMetric(clusterId);
List<Double> list;
switch (metric) {
case "cpu":
list = getMetricList(clusterId, cpuList, skipThreshold);
break;
case "memory":
list = getMetricList(clusterId, memoryList, skipThreshold);
break;
default:
throw new ConfigurationException(
String.format("Invalid metric: %s for cluster: %d", metric, clusterId));
}
return getImbalance(list);
}

static List<Double> getMetricList(Long clusterId, List<Ternary<Long, Long, Long>> hostMetricsList,
Float skipThreshold) {
List<Double> list = new ArrayList<>();
for (Ternary<Long, Long, Long> ternary : hostMetricsList) {
long used = ternary.first();
long actualTotal = ternary.third() - ternary.second();
long free = actualTotal - ternary.first();
Double metricValue = getMetricValue(clusterId, used, free, actualTotal, skipThreshold);
if (metricValue != null) {
list.add(metricValue);
}
}
return list;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,29 @@ public interface ClusterDrsService extends Manager, Configurable, Scheduler {
true, ConfigKey.Scope.Cluster, null, "DRS metric", null, null, null, ConfigKey.Kind.Select,
"memory,cpu");

ConfigKey<String> ClusterDrsMetricType = new ConfigKey<>(String.class, "drs.metric.type", ConfigKey.CATEGORY_ADVANCED,
"used",
"The metric type used to measure imbalance in a cluster. This can completely change the imbalance value. Possible values are free, used.",
true, ConfigKey.Scope.Cluster, null, "DRS metric type", null, null, null, ConfigKey.Kind.Select,
"free,used");

ConfigKey<Boolean> ClusterDrsMetricUseRatio = new ConfigKey<>(Boolean.class, "drs.metric.use.ratio", ConfigKey.CATEGORY_ADVANCED,
"true",
"Whether to use ratio of selected metric & total. Useful when the cluster has hosts with different capacities",
true, ConfigKey.Scope.Cluster, null, "DRS metric use ratio", null, null, null, ConfigKey.Kind.Select,
"true,false");

ConfigKey<Float> ClusterDrsImbalanceSkipThreshold = new ConfigKey<>(Float.class,
"drs.imbalance.condensed.skip.threshold", ConfigKey.CATEGORY_ADVANCED, "0.95",
"Threshold to ignore the metric for a host while calculating the imbalance to decide " +
"whether DRS is required for a cluster.This is to avoid cases when the calculated imbalance" +
" gets skewed due to a single host having a very high/low metric value resulting in imbalance" +
" being higher than 1. If " + ClusterDrsMetricType.key() + " is 'free', set a lower value and if it is 'used' " +
"set a higher value. The value should be between 0.0 and 1.0",
true, ConfigKey.Scope.Cluster, null, "DRS imbalance skip threshold for Condensed algorithm",
null, null, null);


/**
* Generate a DRS plan for a cluster and save it as per the parameters
*
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.cloudstack.cluster;

import com.cloud.utils.Ternary;
import junit.framework.TestCase;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.MockedStatic;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;

import java.util.List;

import static org.apache.cloudstack.cluster.ClusterDrsAlgorithm.getMetricValue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyFloat;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.Mockito.when;

@RunWith(MockitoJUnitRunner.class)
public class ClusterDrsAlgorithmTest extends TestCase {

@Test
public void testGetMetricValue() {
List<Ternary<Boolean, String, Double>> testData = List.of(
new Ternary<>(true, "free", 0.4),
new Ternary<>(false, "free", 40.0),
new Ternary<>(true, "used", 0.3),
new Ternary<>(false, "used", 30.0)
);

long used = 30;
long free = 40;
long total = 100;

for (Ternary<Boolean, String, Double> data : testData) {
boolean useRatio = data.first();
String metricType = data.second();
double expectedValue = data.third();

try (MockedStatic<ClusterDrsAlgorithm> ignored = Mockito.mockStatic(ClusterDrsAlgorithm.class)) {
when(ClusterDrsAlgorithm.getDrsMetricUseRatio(1L)).thenReturn(useRatio);
when(ClusterDrsAlgorithm.getDrsMetricType(1L)).thenReturn(metricType);
when(ClusterDrsAlgorithm.getMetricValue(anyLong(), anyLong(), anyLong(), anyLong(), any())).thenCallRealMethod();

assertEquals(expectedValue, getMetricValue(1, used, free, total, null));
}
}
}

@Test
public void testGetMetricValueWithSkipThreshold() {
List<Ternary<Boolean, String, Double>> testData = List.of(
new Ternary<>(true, "free", 0.15),
new Ternary<>(false, "free", 15.0),
new Ternary<>(true, "used", null),
new Ternary<>(false, "used", null)
);

long used = 80;
long free = 15;
long total = 100;

for (Ternary<Boolean, String, Double> data : testData) {
boolean useRatio = data.first();
String metricType = data.second();
Double expectedValue = data.third();
float skipThreshold = metricType.equals("free") ? 0.1f : 0.7f;

try (MockedStatic<ClusterDrsAlgorithm> ignored = Mockito.mockStatic(ClusterDrsAlgorithm.class)) {
when(ClusterDrsAlgorithm.getDrsMetricUseRatio(1L)).thenReturn(useRatio);
when(ClusterDrsAlgorithm.getDrsMetricType(1L)).thenReturn(metricType);
when(ClusterDrsAlgorithm.getMetricValue(anyLong(), anyLong(), anyLong(), anyLong(), anyFloat())).thenCallRealMethod();

assertEquals(expectedValue, ClusterDrsAlgorithm.getMetricValue(1L, used, free, total, skipThreshold));
}
}
}
}
Loading