Skip to content

Commit e673c8b

Browse files
committed
Simplify logic for scaling CKS cluster
1 parent 8ab7d5c commit e673c8b

File tree

2 files changed

+38
-191
lines changed

2 files changed

+38
-191
lines changed

plugins/integrations/kubernetes-service/src/main/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterScaleWorker.java

Lines changed: 38 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -532,6 +532,8 @@ public boolean scaleCluster() throws CloudRuntimeException {
532532
}
533533
scaleTimeoutTime = System.currentTimeMillis() + KubernetesClusterService.KubernetesClusterScaleTimeout.value() * 1000;
534534
final long originalClusterSize = kubernetesCluster.getNodeCount();
535+
536+
// DEFAULT node type means only the global service offering has been set for the Kubernetes cluster
535537
boolean scaleClusterDefaultOffering = serviceOfferingNodeTypeMap.containsKey(DEFAULT.name());
536538
if (scaleClusterDefaultOffering) {
537539
final ServiceOffering existingServiceOffering = serviceOfferingDao.findById(kubernetesCluster.getServiceOfferingId());
@@ -542,44 +544,40 @@ public boolean scaleCluster() throws CloudRuntimeException {
542544
}
543545
}
544546

545-
checkScalingKubernetesClusterOfferingsPerNodeType(serviceOfferingNodeTypeMap, kubernetesCluster);
546-
547547
final boolean autoscalingChanged = isAutoscalingChanged();
548548
ServiceOffering defaultServiceOffering = serviceOfferingNodeTypeMap.getOrDefault(DEFAULT.name(), null);
549549

550550
for (KubernetesClusterNodeType nodeType : Arrays.asList(CONTROL, ETCD, WORKER)) {
551-
boolean isWorkerNodeOrAllNodes = WORKER == nodeType;
552-
final long newVMRequired = (!isWorkerNodeOrAllNodes || clusterSize == null) ? 0 : clusterSize - originalClusterSize;
551+
boolean isWorkerNode = WORKER == nodeType;
552+
final long newVMRequired = (!isWorkerNode || clusterSize == null) ? 0 : clusterSize - originalClusterSize;
553553
if (!scaleClusterDefaultOffering && !serviceOfferingNodeTypeMap.containsKey(nodeType.name()) && newVMRequired == 0) {
554554
continue;
555555
}
556556

557-
Long existingNodeTypeOfferingId = getKubernetesClusterNodeTypeOfferingId(kubernetesCluster, nodeType);
558-
boolean clusterHasExistingOfferingForNodeType = existingNodeTypeOfferingId != null;
559-
boolean serviceOfferingScalingNeeded = isServiceOfferingScalingNeededForNodeType(nodeType, serviceOfferingNodeTypeMap, kubernetesCluster);
560-
ServiceOffering serviceOffering = serviceOfferingNodeTypeMap.getOrDefault(nodeType.name(), defaultServiceOffering);
561-
boolean updateNodeOffering = serviceOfferingNodeTypeMap.containsKey(nodeType.name()) ||
562-
scaleClusterDefaultOffering && clusterHasExistingOfferingForNodeType;
563-
boolean updateClusterOffering = isWorkerNodeOrAllNodes && scaleClusterDefaultOffering;
564-
if (isWorkerNodeOrAllNodes && autoscalingChanged) {
557+
ServiceOffering existingServiceOffering = getExistingServiceOfferingForNodeType(nodeType, kubernetesCluster);
558+
ServiceOffering scalingServiceOffering = serviceOfferingNodeTypeMap.getOrDefault(nodeType.name(), defaultServiceOffering);
559+
boolean isNodeOfferingScalingNeeded = isServiceOfferingScalingNeededForNodeType(existingServiceOffering, scalingServiceOffering);
560+
561+
boolean updateClusterOffering = isWorkerNode && scaleClusterDefaultOffering;
562+
if (isWorkerNode && autoscalingChanged) {
565563
boolean autoScaled = autoscaleCluster(this.isAutoscalingEnabled, minSize, maxSize);
566-
if (autoScaled && serviceOfferingScalingNeeded) {
567-
scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering);
564+
if (autoScaled && isNodeOfferingScalingNeeded) {
565+
scaleKubernetesClusterOffering(nodeType, scalingServiceOffering, true, updateClusterOffering);
568566
}
569567
stateTransitTo(kubernetesCluster.getId(), KubernetesCluster.Event.OperationSucceeded);
570568
return autoScaled;
571569
}
572-
final boolean clusterSizeScalingNeeded = isWorkerNodeOrAllNodes && clusterSize != null && clusterSize != originalClusterSize;
573-
if (serviceOfferingScalingNeeded && clusterSizeScalingNeeded) {
570+
final boolean clusterSizeScalingNeeded = isWorkerNode && clusterSize != null && clusterSize != originalClusterSize;
571+
if (isNodeOfferingScalingNeeded && clusterSizeScalingNeeded) {
574572
if (newVMRequired > 0) {
575-
scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering);
573+
scaleKubernetesClusterOffering(nodeType, scalingServiceOffering, true, updateClusterOffering);
576574
scaleKubernetesClusterSize(nodeType);
577575
} else {
578576
scaleKubernetesClusterSize(nodeType);
579-
scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering);
577+
scaleKubernetesClusterOffering(nodeType, scalingServiceOffering, true, updateClusterOffering);
580578
}
581-
} else if (serviceOfferingScalingNeeded) {
582-
scaleKubernetesClusterOffering(nodeType, serviceOffering, updateNodeOffering, updateClusterOffering);
579+
} else if (isNodeOfferingScalingNeeded) {
580+
scaleKubernetesClusterOffering(nodeType, scalingServiceOffering, true, updateClusterOffering);
583581
} else if (clusterSizeScalingNeeded) {
584582
scaleKubernetesClusterSize(nodeType);
585583
}
@@ -589,6 +587,20 @@ public boolean scaleCluster() throws CloudRuntimeException {
589587
return true;
590588
}
591589

590+
private ServiceOffering getExistingServiceOfferingForNodeType(KubernetesClusterNodeType nodeType, KubernetesCluster kubernetesCluster) {
591+
Long existingOfferingId = getExistingOfferingIdForNodeType(nodeType, kubernetesCluster);
592+
if (existingOfferingId == null) {
593+
logAndThrow(Level.ERROR, String.format("The Kubernetes cluster %s does not have a service offering set for node type %s",
594+
kubernetesCluster.getName(), nodeType.name()));
595+
}
596+
ServiceOffering existingOffering = serviceOfferingDao.findById(existingOfferingId);
597+
if (existingOffering == null) {
598+
logAndThrow(Level.ERROR, String.format("Cannot find service offering with ID %s set on the Kubernetes cluster %s node type %s",
599+
existingOfferingId, kubernetesCluster.getName(), nodeType.name()));
600+
}
601+
return existingOffering;
602+
}
603+
592604
protected void compareExistingToScalingServiceOfferingForNodeType(Long existingOfferingId, Long scalingOfferingId,
593605
KubernetesClusterNodeType nodeType) {
594606
if (existingOfferingId.equals(scalingOfferingId)) {
@@ -599,65 +611,21 @@ protected void compareExistingToScalingServiceOfferingForNodeType(Long existingO
599611
}
600612
}
601613

602-
protected void checkScalingKubernetesClusterOfferingsPerNodeType(Map<String, ServiceOffering> scalingNodeTypeMap,
603-
KubernetesCluster kubernetesCluster) {
604-
for (KubernetesClusterNodeType nodeType : Arrays.asList(WORKER, CONTROL, ETCD)) {
605-
Long existingNodeTypeOfferingId = getKubernetesClusterNodeTypeOfferingId(kubernetesCluster, nodeType);
606-
if (existingNodeTypeOfferingId == null) {
607-
existingNodeTypeOfferingId = kubernetesCluster.getServiceOfferingId();
608-
}
609-
if (ETCD == nodeType && (kubernetesCluster.getEtcdNodeCount() == null || kubernetesCluster.getEtcdNodeCount() == 0)) {
610-
continue;
611-
}
612-
String scalingMapKey = scalingNodeTypeMap.containsKey(nodeType.name()) ? nodeType.name() : DEFAULT.name();
613-
ServiceOffering scalingServiceOffering = scalingNodeTypeMap.get(scalingMapKey);
614-
if (scalingServiceOffering == null) {
615-
String err = String.format("Cannot find a service offering to scale the nodes of type %s", nodeType.name());
616-
logger.error(err);
617-
throw new CloudRuntimeException(err);
618-
}
619-
compareExistingToScalingServiceOfferingForNodeType(existingNodeTypeOfferingId,
620-
scalingServiceOffering.getId(), nodeType);
621-
}
622-
}
623-
624-
private Long getKubernetesClusterNodeTypeOfferingId(KubernetesCluster kubernetesCluster, KubernetesClusterNodeType nodeType) {
625-
if (nodeType == WORKER) {
626-
return kubernetesCluster.getWorkerNodeServiceOfferingId();
627-
} else if (nodeType == ETCD) {
628-
return kubernetesCluster.getEtcdNodeServiceOfferingId();
629-
} else if (nodeType == CONTROL) {
630-
return kubernetesCluster.getControlNodeServiceOfferingId();
631-
}
632-
return kubernetesCluster.getServiceOfferingId();
633-
}
634-
635-
protected boolean isServiceOfferingScalingNeededForNodeType(KubernetesClusterNodeType nodeType,
636-
Map<String, ServiceOffering> map, KubernetesCluster kubernetesCluster) {
637-
// DEFAULT node type means only the global service offering has been set for the Kubernetes cluster
638-
Long existingOfferingId = map.containsKey(DEFAULT.name()) ?
639-
kubernetesCluster.getServiceOfferingId() :
640-
getExistingOfferingIdForNodeType(nodeType, kubernetesCluster);
641-
if (existingOfferingId == null) {
642-
logAndThrow(Level.ERROR, String.format("The Kubernetes cluster %s does not have a global service offering set", kubernetesCluster.getName()));
643-
}
644-
ServiceOffering existingOffering = serviceOfferingDao.findById(existingOfferingId);
645-
if (existingOffering == null) {
646-
logAndThrow(Level.ERROR, String.format("Cannot find the global service offering with ID %s set on the Kubernetes cluster %s", existingOfferingId, kubernetesCluster.getName()));
647-
}
648-
ServiceOffering newOffering = map.containsKey(DEFAULT.name()) ? map.get(DEFAULT.name()) : map.get(nodeType.name());
649-
return newOffering != null && newOffering.getId() != existingOffering.getId();
614+
protected boolean isServiceOfferingScalingNeededForNodeType(ServiceOffering existingServiceOffering,
615+
ServiceOffering scalingServiceOffering) {
616+
return scalingServiceOffering != null && existingServiceOffering != null &&
617+
scalingServiceOffering.getId() != existingServiceOffering.getId();
650618
}
651619

652620
protected Long getExistingOfferingIdForNodeType(KubernetesClusterNodeType nodeType, KubernetesCluster kubernetesCluster) {
653621
List<KubernetesClusterVmMapVO> clusterVms = kubernetesClusterVmMapDao.listByClusterIdAndVmType(kubernetesCluster.getId(), nodeType);
654622
if (CollectionUtils.isEmpty(clusterVms)) {
655-
return null;
623+
return kubernetesCluster.getServiceOfferingId();
656624
}
657625
KubernetesClusterVmMapVO clusterVm = clusterVms.get(0);
658626
UserVmVO clusterUserVm = userVmDao.findById(clusterVm.getVmId());
659627
if (clusterUserVm == null) {
660-
return null;
628+
return kubernetesCluster.getServiceOfferingId();
661629
}
662630
return clusterUserVm.getServiceOfferingId();
663631
}

plugins/integrations/kubernetes-service/src/test/java/com/cloud/kubernetes/cluster/actionworkers/KubernetesClusterScaleWorkerTest.java

Lines changed: 0 additions & 121 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import com.cloud.service.ServiceOfferingVO;
2525
import com.cloud.service.dao.ServiceOfferingDao;
2626
import com.cloud.utils.Pair;
27-
import com.cloud.utils.exception.CloudRuntimeException;
2827
import com.cloud.vm.UserVmVO;
2928
import com.cloud.vm.dao.UserVmDao;
3029
import org.junit.Assert;
@@ -35,14 +34,10 @@
3534
import org.mockito.Mockito;
3635
import org.mockito.junit.MockitoJUnitRunner;
3736

38-
import java.util.HashMap;
3937
import java.util.List;
40-
import java.util.Map;
4138

4239
import static com.cloud.kubernetes.cluster.KubernetesServiceHelper.KubernetesClusterNodeType.DEFAULT;
4340
import static com.cloud.kubernetes.cluster.KubernetesServiceHelper.KubernetesClusterNodeType.CONTROL;
44-
import static com.cloud.kubernetes.cluster.KubernetesServiceHelper.KubernetesClusterNodeType.ETCD;
45-
import static com.cloud.kubernetes.cluster.KubernetesServiceHelper.KubernetesClusterNodeType.WORKER;
4641

4742
@RunWith(MockitoJUnitRunner.class)
4843
public class KubernetesClusterScaleWorkerTest {
@@ -130,120 +125,4 @@ public void testCalculateNewClusterCountAndCapacityNodeTypeScaleControlOffering(
130125
Assert.assertEquals(expectedCores, newClusterCapacity.first().longValue());
131126
Assert.assertEquals(expectedMemory, newClusterCapacity.second().longValue());
132127
}
133-
134-
private KubernetesCluster createExistingKubernetesClusterForTesting(Long defaultOfferingId, Long workerOfferingId,
135-
Long controlOfferingId) {
136-
KubernetesCluster kubernetesClusterMock = Mockito.mock(KubernetesCluster.class);
137-
Mockito.when(kubernetesClusterMock.getServiceOfferingId()).thenReturn(defaultOfferingId);
138-
Mockito.when(kubernetesClusterMock.getWorkerNodeServiceOfferingId()).thenReturn(workerOfferingId);
139-
Mockito.when(kubernetesClusterMock.getControlNodeServiceOfferingId()).thenReturn(controlOfferingId);
140-
return kubernetesClusterMock;
141-
}
142-
143-
private ServiceOffering createServiceOfferingForTesting(Long offeringId) {
144-
ServiceOffering cksOffering = Mockito.mock(ServiceOffering.class);
145-
Mockito.when(cksOffering.getId()).thenReturn(offeringId);
146-
return cksOffering;
147-
}
148-
149-
@Test
150-
public void testCheckScalingKubernetesClusterOfferingsPerNodeTypeToNewDefaultOffering() {
151-
Long cksOfferingId = 20L;
152-
153-
ServiceOffering newCksOffering = createServiceOfferingForTesting(21L);
154-
155-
KubernetesCluster kubernetesClusterMock = createExistingKubernetesClusterForTesting(cksOfferingId,
156-
null, null);
157-
158-
Map<String, ServiceOffering> scalingMap = new HashMap<>();
159-
scalingMap.put(DEFAULT.name(), newCksOffering);
160-
161-
worker.checkScalingKubernetesClusterOfferingsPerNodeType(scalingMap, kubernetesClusterMock);
162-
}
163-
164-
@Test(expected = CloudRuntimeException.class)
165-
public void testCheckScalingKubernetesClusterOfferingsPerNodeTypeToSameDefaultOffering() {
166-
Long cksOfferingId = 20L;
167-
ServiceOffering cksOffering = createServiceOfferingForTesting(cksOfferingId);
168-
169-
KubernetesCluster kubernetesClusterMock = createExistingKubernetesClusterForTesting(cksOfferingId,
170-
null, null);
171-
172-
Map<String, ServiceOffering> scalingMap = new HashMap<>();
173-
scalingMap.put(DEFAULT.name(), cksOffering);
174-
175-
worker.checkScalingKubernetesClusterOfferingsPerNodeType(scalingMap, kubernetesClusterMock);
176-
}
177-
178-
@Test
179-
public void testCheckScalingKubernetesClusterOfferingsPerNodeTypeWorkerAndControlNodes() {
180-
Long cksOfferingId = 20L;
181-
Long workerOfferingId = 21L;
182-
Long controlOfferingId = 22L;
183-
ServiceOffering scalingWorkerOffering = createServiceOfferingForTesting(30L);
184-
ServiceOffering scalingControlOffering = createServiceOfferingForTesting(31L);
185-
186-
KubernetesCluster kubernetesClusterMock = createExistingKubernetesClusterForTesting(cksOfferingId,
187-
workerOfferingId, controlOfferingId);
188-
189-
Map<String, ServiceOffering> scalingMap = new HashMap<>();
190-
scalingMap.put(WORKER.name(), scalingWorkerOffering);
191-
scalingMap.put(CONTROL.name(), scalingControlOffering);
192-
193-
worker.checkScalingKubernetesClusterOfferingsPerNodeType(scalingMap, kubernetesClusterMock);
194-
}
195-
196-
@Test(expected = CloudRuntimeException.class)
197-
public void testCheckScalingKubernetesClusterOfferingsPerNodeTypeWorkerAndControlNodesUseSameOffering() {
198-
Long cksOfferingId = 20L;
199-
Long workerOfferingId = 21L;
200-
Long controlOfferingId = 22L;
201-
ServiceOffering scalingWorkerOffering = createServiceOfferingForTesting(30L);
202-
ServiceOffering controlOffering = createServiceOfferingForTesting(controlOfferingId);
203-
204-
KubernetesCluster kubernetesClusterMock = createExistingKubernetesClusterForTesting(cksOfferingId,
205-
workerOfferingId, controlOfferingId);
206-
207-
Map<String, ServiceOffering> scalingMap = new HashMap<>();
208-
scalingMap.put(WORKER.name(), scalingWorkerOffering);
209-
scalingMap.put(CONTROL.name(), controlOffering);
210-
211-
worker.checkScalingKubernetesClusterOfferingsPerNodeType(scalingMap, kubernetesClusterMock);
212-
}
213-
214-
@Test
215-
public void testCheckScalingKubernetesClusterOfferingsPerNodeTypeWorkerAndControlNodesByDefaultOffering() {
216-
Long cksOfferingId = 20L;
217-
Long workerOfferingId = 21L;
218-
Long controlOfferingId = 22L;
219-
ServiceOffering scalingOffering = createServiceOfferingForTesting(30L);
220-
221-
KubernetesCluster kubernetesClusterMock = createExistingKubernetesClusterForTesting(cksOfferingId,
222-
workerOfferingId, controlOfferingId);
223-
224-
Map<String, ServiceOffering> scalingMap = new HashMap<>();
225-
scalingMap.put(DEFAULT.name(), scalingOffering);
226-
227-
worker.checkScalingKubernetesClusterOfferingsPerNodeType(scalingMap, kubernetesClusterMock);
228-
}
229-
230-
@Test
231-
public void testCheckScalingKubernetesClusterOfferingsPerNodeTypeWorkerEtcdAndControlNodes() {
232-
Long cksOfferingId = 20L;
233-
Long workerOfferingId = 21L;
234-
Long controlOfferingId = 22L;
235-
Long etcdOfferingId = 23L;
236-
ServiceOffering scalingOffering = createServiceOfferingForTesting(30L);
237-
238-
KubernetesCluster kubernetesClusterMock = createExistingKubernetesClusterForTesting(cksOfferingId,
239-
workerOfferingId, controlOfferingId);
240-
Mockito.when(kubernetesClusterMock.getEtcdNodeServiceOfferingId()).thenReturn(etcdOfferingId);
241-
242-
Map<String, ServiceOffering> scalingMap = new HashMap<>();
243-
scalingMap.put(WORKER.name(), scalingOffering);
244-
scalingMap.put(CONTROL.name(), scalingOffering);
245-
scalingMap.put(ETCD.name(), scalingOffering);
246-
247-
worker.checkScalingKubernetesClusterOfferingsPerNodeType(scalingMap, kubernetesClusterMock);
248-
}
249128
}

0 commit comments

Comments
 (0)