|
7 | 7 |
|
8 | 8 | package org.elasticsearch.xpack.ml.autoscaling;
|
9 | 9 |
|
| 10 | +import org.elasticsearch.ElasticsearchException; |
10 | 11 | import org.elasticsearch.Version;
|
11 | 12 | import org.elasticsearch.cluster.ClusterInfo;
|
12 | 13 | import org.elasticsearch.cluster.ClusterName;
|
|
21 | 22 | import org.elasticsearch.common.settings.Settings;
|
22 | 23 | import org.elasticsearch.common.unit.ByteSizeValue;
|
23 | 24 | import org.elasticsearch.common.util.set.Sets;
|
| 25 | +import org.elasticsearch.core.Tuple; |
24 | 26 | import org.elasticsearch.persistent.PersistentTasksCustomMetadata;
|
25 | 27 | import org.elasticsearch.snapshots.SnapshotShardSizeInfo;
|
26 | 28 | import org.elasticsearch.test.ESTestCase;
|
|
33 | 35 | import org.elasticsearch.xpack.core.ml.action.StartDatafeedAction;
|
34 | 36 | import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsState;
|
35 | 37 | import org.elasticsearch.xpack.core.ml.dataframe.DataFrameAnalyticsTaskState;
|
| 38 | +import org.elasticsearch.xpack.core.ml.job.config.Job; |
36 | 39 | import org.elasticsearch.xpack.core.ml.job.config.JobState;
|
37 | 40 | import org.elasticsearch.xpack.core.ml.job.config.JobTaskState;
|
38 | 41 | import org.elasticsearch.xpack.ml.MachineLearning;
|
|
44 | 47 | import org.junit.Before;
|
45 | 48 |
|
46 | 49 | import java.time.Duration;
|
| 50 | +import java.util.ArrayList; |
47 | 51 | import java.util.Arrays;
|
48 | 52 | import java.util.Collections;
|
49 | 53 | import java.util.Date;
|
|
54 | 58 | import java.util.function.LongSupplier;
|
55 | 59 | import java.util.stream.Collectors;
|
56 | 60 |
|
| 61 | +import static org.elasticsearch.xpack.ml.MachineLearning.MACHINE_MEMORY_NODE_ATTR; |
| 62 | +import static org.elasticsearch.xpack.ml.MachineLearning.MAX_JVM_SIZE_NODE_ATTR; |
57 | 63 | import static org.elasticsearch.xpack.ml.job.JobNodeSelector.AWAITING_LAZY_ASSIGNMENT;
|
| 64 | +import static org.hamcrest.Matchers.allOf; |
58 | 65 | import static org.hamcrest.Matchers.containsString;
|
59 | 66 | import static org.hamcrest.Matchers.equalTo;
|
| 67 | +import static org.hamcrest.Matchers.greaterThan; |
60 | 68 | import static org.hamcrest.Matchers.greaterThanOrEqualTo;
|
61 | 69 | import static org.hamcrest.Matchers.is;
|
| 70 | +import static org.hamcrest.Matchers.lessThan; |
62 | 71 | import static org.hamcrest.Matchers.lessThanOrEqualTo;
|
63 | 72 | import static org.mockito.ArgumentMatchers.any;
|
64 | 73 | import static org.mockito.ArgumentMatchers.anyBoolean;
|
65 | 74 | import static org.mockito.ArgumentMatchers.anyInt;
|
| 75 | +import static org.mockito.ArgumentMatchers.eq; |
66 | 76 | import static org.mockito.Mockito.mock;
|
67 | 77 | import static org.mockito.Mockito.when;
|
68 | 78 |
|
69 | 79 | public class MlAutoscalingDeciderServiceTests extends ESTestCase {
|
70 | 80 |
|
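| | + // Node sizes in bytes (1, 2, 4, 8, 15, 16, 30, 32, 60 and 64 GB), kept in ascending order so that |
| | + // Arrays.binarySearch can map a required capacity onto a node tier in testScaleStability. |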
| 81 | + private static final long[] NODE_TIERS = new long[] {
| 82 | + 1073741824L, // 1GB |
| 83 | + 2147483648L, // 2GB |
| 84 | + 4294967296L, // 4GB |
| 85 | + 8589934592L, // 8GB |
| 86 | + 16106127360L, // 15GB |
| 87 | + 17179869184L, // 16GB |
| 88 | + 32212254720L, // 30GB |
| 89 | + 34359738368L, // 32GB |
| 90 | + 64424509440L, // 60GB |
| 91 | + 68719476736L }; // 64GB |
| 92 | + |
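| | + // (node size, JVM heap size) pairs for the same tiers, using the JVM heap that automatic JVM sizing assigns to each node size |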
| 93 | + public static final List<Tuple<Long, Long>> AUTO_NODE_TIERS = org.elasticsearch.core.List.of(
| 94 | + Tuple.tuple(1073741824L, 432013312L), // 1GB and true JVM size |
| 95 | + Tuple.tuple(2147483648L, 536870912L), // 2GB ... |
| 96 | + Tuple.tuple(4294967296L, 1073741824L), // 4GB ... |
| 97 | + Tuple.tuple(8589934592L, 2147483648L), // 8GB ... |
| 98 | + Tuple.tuple(16106127360L, 2147483648L), // 15GB ... |
| 99 | + Tuple.tuple(17179869184L, 2147483648L), // 16GB ... |
| 100 | + Tuple.tuple(32212254720L, 2147483648L), // 30GB ... |
| 101 | + Tuple.tuple(34359738368L, 2147483648L), // 32GB ... |
| 102 | + Tuple.tuple(64424509440L, 2147483648L), // 60GB ... |
| 103 | + Tuple.tuple(68719476736L, 2147483648L) // 64GB ... |
| 104 | + ); |
| 105 | + |
71 | 106 | private static final long DEFAULT_NODE_SIZE = ByteSizeValue.ofGb(20).getBytes();
|
72 | 107 | private static final long DEFAULT_JVM_SIZE = ByteSizeValue.ofMb((long) (DEFAULT_NODE_SIZE * 0.25)).getBytes();
|
73 | 108 | private static final long DEFAULT_JOB_SIZE = ByteSizeValue.ofMb(200).getBytes();
|
@@ -104,6 +139,168 @@ public void setup() {
|
104 | 139 | when(clusterService.getClusterSettings()).thenReturn(cSettings);
|
105 | 140 | }
|
106 | 141 |
|
| 142 | + public void testScalingEdgeCase() { |
| 143 | + // This scale up should push above 1GB, but stay under 2GB. |
| 144 | + // The unassigned job just barely fails to fit at the current scale (by a handful of MB). |
| 145 | + when(mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(any())).thenReturn( |
| 146 | + ByteSizeValue.ofMb(128).getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes() |
| 147 | + ); |
| 148 | + List<String> jobTasks = org.elasticsearch.core.List.of("waiting_job"); |
| 149 | + List<NodeLoad> nodesForScaleup = org.elasticsearch.core.List.of( |
| 150 | + NodeLoad.builder("any") |
| 151 | + .setMaxMemory(432013312) |
| 152 | + .setUseMemory(true) |
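| | + // three assigned jobs: two tiny jobs (~168.7KB and ~1.4MB) and one 256MB job, each with process overhead |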
| 153 | + .incAssignedJobMemory( |
| 154 | + (long) (168.7 * 1024 + 0.5) + (long) (1.4 * 1024 * 1024 + 0.5) + ByteSizeValue.ofMb(256).getBytes() |
| 155 | + + Job.PROCESS_MEMORY_OVERHEAD.getBytes() * 3 |
| 156 | + ) |
| 157 | + .incNumAssignedJobs() |
| 158 | + .incNumAssignedJobs() |
| 159 | + .incNumAssignedJobs() |
| 160 | + .build() |
| 161 | + ); |
| 162 | + MlScalingReason.Builder reasonBuilder = new MlScalingReason.Builder().setPassedConfiguration(Settings.EMPTY) |
| 163 | + .setCurrentMlCapacity( |
| 164 | + AutoscalingCapacity.builder().node(null, AUTO_NODE_TIERS.get(0).v1()).total(null, AUTO_NODE_TIERS.get(0).v1()).build() |
| 165 | + ); |
| 166 | + MlAutoscalingDeciderService service = buildService(); |
| 167 | + service.setUseAuto(true); |
| 168 | + AutoscalingDeciderResult scaleUpResult = service.checkForScaleUp( |
| 169 | + 0, |
| 170 | + 0, |
| 171 | + nodesForScaleup, |
| 172 | + jobTasks, |
| 173 | + org.elasticsearch.core.List.of(), |
| 174 | + null, |
| 175 | + new NativeMemoryCapacity(432013312, 432013312, 432013312L), |
| 176 | + reasonBuilder |
| 177 | + ).orElseThrow(() -> new ElasticsearchException("unexpected empty result for scale up")); |
| 178 | + |
| 179 | + assertThat( |
| 180 | + scaleUpResult.requiredCapacity().total().memory().getBytes(), |
| 181 | + allOf(greaterThan(ByteSizeValue.ofGb(1).getBytes()), lessThan(ByteSizeValue.ofGb(2).getBytes())) |
| 182 | + ); |
| 183 | + |
| 184 | + // Assume a scale up to 2GB nodes. |
| 185 | + // Given the same jobs on a 2GB node, we should NOT scale down to 1GB or below. |
| 186 | + long bytesForML = autoBytesForMl(AUTO_NODE_TIERS.get(1).v1(), AUTO_NODE_TIERS.get(1).v2()); |
| 187 | + List<NodeLoad> nodeForScaleDown = org.elasticsearch.core.List.of( |
| 188 | + NodeLoad.builder("any") |
| 189 | + .setMaxMemory(bytesForML) |
| 190 | + .setUseMemory(true) |
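| | + // the same three jobs plus the newly assigned 128MB job, each with process overhead |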
| 191 | + .incAssignedJobMemory( |
| 192 | + (long) (168.7 * 1024 + 0.5) + (long) (1.4 * 1024 * 1024 + 0.5) + ByteSizeValue.ofMb(256).getBytes() + ByteSizeValue |
| 193 | + .ofMb(128) |
| 194 | + .getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes() * 4 |
| 195 | + ) |
| 196 | + .incNumAssignedJobs() |
| 197 | + .incNumAssignedJobs() |
| 198 | + .incNumAssignedJobs() |
| 199 | + .incNumAssignedJobs() |
| 200 | + .build() |
| 201 | + ); |
| 202 | + reasonBuilder = new MlScalingReason.Builder().setPassedConfiguration(Settings.EMPTY) |
| 203 | + .setCurrentMlCapacity(AutoscalingCapacity.builder().node(null, 2147483648L).total(null, 2147483648L).build()); |
| 204 | + AutoscalingDeciderResult result = service.checkForScaleDown( |
| 205 | + nodeForScaleDown, |
| 206 | + ByteSizeValue.ofMb(256).getBytes() + Job.PROCESS_MEMORY_OVERHEAD.getBytes(), |
| 207 | + new NativeMemoryCapacity(bytesForML, bytesForML, 536870912L), |
| 208 | + reasonBuilder |
| 209 | + ).orElseThrow(() -> new ElasticsearchException("unexpected empty result for scale down")); |
| 210 | + assertThat( |
| 211 | + result.requiredCapacity().total().memory().getBytes(), |
| 212 | + allOf(greaterThan(ByteSizeValue.ofGb(1).getBytes()), lessThan(ByteSizeValue.ofGb(2).getBytes())) |
| 213 | + ); |
| 214 | + } |
| 215 | + |
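| | + // For every adjacent pair of node tiers: fill the lower tier with jobs, add waiting jobs that only fit the upper tier, |
| | + // then verify that the capacity requested by scale up is not immediately undone by scale down (i.e. no flapping). |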
| 216 | + public void testScaleStability() { |
| 217 | + for (int i = 0; i < 10; i++) { |
| 218 | + for (int tier = 0; tier < AUTO_NODE_TIERS.size() - 1; tier++) { |
| 219 | + Tuple<Long, Long> lowerTier = AUTO_NODE_TIERS.get(tier); |
| 220 | + final long memoryForMl = autoBytesForMl(lowerTier.v1(), lowerTier.v2()); |
| 221 | + Tuple<Long, Long> upperTier = AUTO_NODE_TIERS.get(tier + 1); |
| 222 | + // The jobs that currently exist, to use in the scaleUp call |
| 223 | + NodeLoad.Builder forScaleUp = new NodeLoad.Builder("any").setMaxMemory(memoryForMl) |
| 224 | + .setMaxJobs(Integer.MAX_VALUE) |
| 225 | + .setUseMemory(true); |
| 226 | + // The node load covering all jobs (including those assigned after the scale up), used in the scaleDown call |
| 227 | + NodeLoad.Builder forScaleDown = new NodeLoad.Builder("any").setMaxMemory(autoBytesForMl(upperTier.v1(), upperTier.v2())) |
| 228 | + .setMaxJobs(Integer.MAX_VALUE) |
| 229 | + .setUseMemory(true); |
| 230 | + long maxJob = 0; |
| 231 | + // Fill with existing tier jobs |
| 232 | + while (forScaleUp.getFreeMemory() > Job.PROCESS_MEMORY_OVERHEAD.getBytes()) { |
| 233 | + long jobSize = randomLongBetween(Job.PROCESS_MEMORY_OVERHEAD.getBytes(), forScaleUp.getFreeMemory()); |
| 234 | + maxJob = Math.max(jobSize, maxJob); |
| 235 | + forScaleUp.incNumAssignedJobs().incAssignedJobMemory(jobSize); |
| 236 | + forScaleDown.incNumAssignedJobs().incAssignedJobMemory(jobSize); |
| 237 | + } |
| 238 | + // Create jobs for scale up |
| 239 | + NodeLoad nodeLoadForScaleUp = forScaleUp.build(); |
| 240 | + List<String> waitingJobs = new ArrayList<>(); |
| 241 | + while (forScaleDown.getFreeMemory() > Job.PROCESS_MEMORY_OVERHEAD.getBytes()) { |
| 242 | + long jobSize = randomLongBetween(Job.PROCESS_MEMORY_OVERHEAD.getBytes(), forScaleDown.getFreeMemory()); |
| 243 | + if (forScaleDown.getFreeMemory() - jobSize <= 0) { |
| 244 | + break; |
| 245 | + } |
| 246 | + maxJob = Math.max(jobSize, maxJob); |
| 247 | + forScaleDown.incNumAssignedJobs().incAssignedJobMemory(jobSize); |
| 248 | + String waitingJob = randomAlphaOfLength(10); |
| 249 | + when(mlMemoryTracker.getAnomalyDetectorJobMemoryRequirement(eq(waitingJob))).thenReturn(jobSize); |
| 250 | + waitingJobs.add(waitingJob); |
| 251 | + } |
| 252 | + MlAutoscalingDeciderService service = buildService(); |
| 253 | + service.setUseAuto(true); |
| 254 | + |
| 255 | + AutoscalingDeciderResult scaleUpResult = service.checkForScaleUp( |
| 256 | + 0, |
| 257 | + 0, |
| 258 | + org.elasticsearch.core.List.of(nodeLoadForScaleUp), |
| 259 | + waitingJobs, |
| 260 | + org.elasticsearch.core.List.of(), |
| 261 | + null, |
| 262 | + new NativeMemoryCapacity(memoryForMl, memoryForMl, lowerTier.v2()), |
| 263 | + new MlScalingReason.Builder().setPassedConfiguration(Settings.EMPTY) |
| 264 | + .setCurrentMlCapacity(AutoscalingCapacity.builder().node(null, lowerTier.v1()).total(null, lowerTier.v1()).build()) |
| 265 | + ).orElseThrow(() -> new ElasticsearchException("unexpected empty result for scale up")); |
| 266 | + |
| 267 | + assertThat(scaleUpResult.requiredCapacity().total().memory().getBytes(), greaterThan(lowerTier.v1())); |
| 268 | + assertThat(scaleUpResult.requiredCapacity().node().memory().getBytes(), greaterThanOrEqualTo(lowerTier.v1())); |
| 269 | + AutoscalingCapacity requiredScaleUp = scaleUpResult.requiredCapacity(); |
| 270 | + // It's possible that the next tier is above what we consider "upperTier". |
| 271 | + // That is fine for this test, as long as scale_down does not drop below this tier. |
| 272 | + int nextTier = Arrays.binarySearch(NODE_TIERS, requiredScaleUp.total().memory().getBytes()); |
| 273 | + if (nextTier < 0) { |
| 274 | + nextTier = -nextTier - 1; |
| 275 | + } |
| 276 | + // It's possible we requested a huge scale up. This is OK; we just don't have validation numbers past a certain |
| 277 | + // point. |
| 278 | + if (nextTier >= NODE_TIERS.length) { |
| 279 | + return; |
| 280 | + } |
| 281 | + long size = NODE_TIERS[nextTier]; |
| 282 | + long scaledBytesForMl = autoBytesForMl(size, AUTO_NODE_TIERS.get(nextTier).v2()); |
| 283 | + // Scale down may not occur at all; that is fine, as it means we are "perfectly scaled" |
| 284 | + Optional<AutoscalingDeciderResult> result = service.checkForScaleDown( |
| 285 | + org.elasticsearch.core.List.of(forScaleDown.build()), |
| 286 | + maxJob, |
| 287 | + new NativeMemoryCapacity(scaledBytesForMl, scaledBytesForMl, AUTO_NODE_TIERS.get(nextTier).v2()), |
| 288 | + new MlScalingReason.Builder().setPassedConfiguration(Settings.EMPTY) |
| 289 | + .setCurrentMlCapacity(AutoscalingCapacity.builder().node(null, size).total(null, size).build()) |
| 290 | + ); |
| 291 | + // If a scale down is suggested, it must not drop below our current tier. |
| 292 | + // If it does, we calculated something incorrectly for the very jobs we just scaled up for. |
| 293 | + if (result.isPresent()) { |
| 294 | + int afterScaleDownTier = Arrays.binarySearch(NODE_TIERS, result.get().requiredCapacity().total().memory().getBytes()); |
| 295 | + if (afterScaleDownTier < 0) { |
| 296 | + afterScaleDownTier = -afterScaleDownTier - 1; |
| 297 | + } |
| 298 | + assertThat(afterScaleDownTier, equalTo(nextTier)); |
| 299 | + } |
| 300 | + } |
| 301 | + } |
| 302 | + } |
| 303 | + |
107 | 304 | public void testScale_whenNotOnMaster() {
|
108 | 305 | MlAutoscalingDeciderService service = buildService();
|
109 | 306 | service.offMaster();
|
@@ -758,4 +955,21 @@ public SnapshotShardSizeInfo snapshotShardSizeInfo() {
|
758 | 955 | }
|
759 | 956 | }
|
760 | 957 |
|
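| | + // Computes the bytes usable for ML on a node with the given total memory and JVM heap size, by asking |
| | + // NativeMemoryCalculator with auto mode enabled (the 30 is the configured max ML memory percent) |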
| 958 | + private static long autoBytesForMl(Long nodeSize, Long jvmSize) { |
| 959 | + return NativeMemoryCalculator.allowedBytesForMl( |
| 960 | + new DiscoveryNode( |
| 961 | + "node", |
| 962 | + ESTestCase.buildNewFakeTransportAddress(), |
| 963 | + MapBuilder.<String, String>newMapBuilder() |
| 964 | + .put(MAX_JVM_SIZE_NODE_ATTR, jvmSize.toString()) |
| 965 | + .put(MACHINE_MEMORY_NODE_ATTR, nodeSize.toString()) |
| 966 | + .map(), |
| 967 | + DiscoveryNodeRole.BUILT_IN_ROLES, |
| 968 | + Version.CURRENT |
| 969 | + ), |
| 970 | + 30, |
| 971 | + true |
| 972 | + ).orElseThrow(() -> new ElasticsearchException("Unexpected null for calculating bytes for ML")); |
| 973 | + } |
| 974 | + |
761 | 975 | }
|