Skip to content

Commit f6c16d9

Browse files
committed
refactor capacity calculation
Added capacity.calculate.workers config to control the number threads that can be used to calculate capacities. Signed-off-by: Abhishek Kumar <[email protected]>
1 parent 16a541c commit f6c16d9

File tree

9 files changed

+94
-121
lines changed

9 files changed

+94
-121
lines changed

engine/components-api/src/main/java/com/cloud/capacity/CapacityManager.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,10 @@ public interface CapacityManager {
127127
true,
128128
ConfigKey.Scope.Zone);
129129

130+
ConfigKey<Integer> CapacityCalculateWorkers = new ConfigKey<>(ConfigKey.CATEGORY_ADVANCED, Integer.class,
131+
"capacity.calculate.workers", "1",
132+
"Number of worker threads to be used for capacities calculation", true);
133+
130134
public boolean releaseVmCapacity(VirtualMachine vm, boolean moveFromReserved, boolean moveToReservered, Long hostId);
131135

132136
void allocateVmCapacity(VirtualMachine vm, boolean fromLastHost);

engine/components-api/src/main/java/com/cloud/storage/StorageManager.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,8 +214,8 @@ public interface StorageManager extends StorageService {
214214
true, ConfigKey.Scope.Zone);
215215

216216
ConfigKey<Integer> PrimaryStorageHostConnectWorkers = new ConfigKey<>("Storage", Integer.class,
217-
"primary.storage.host.connect.workers", "3",
218-
"Number of worker threads to be used to connect primary a storage to hosts", false);
217+
"primary.storage.host.connect.workers", "1",
218+
"Number of worker threads to be used to connect hosts to a primary storage", true);
219219

220220
/**
221221
* should we execute in sequence not involving any storages?

engine/schema/src/main/java/com/cloud/host/dao/HostDao.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,8 @@ public interface HostDao extends GenericDao<HostVO, Long>, StateDao<Status, Stat
102102

103103
List<Long> listIdsForUpRouting(Long zoneId, Long podId, Long clusterId);
104104

105+
List<Long> listIdsByType(Type type);
106+
105107
List<Long> listIdsForUpEnabledByZoneAndHypervisor(Long zoneId, HypervisorType hypervisorType);
106108

107109
List<HostVO> findByClusterIdAndEncryptionSupport(Long clusterId);

engine/schema/src/main/java/com/cloud/host/dao/HostDaoImpl.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1245,6 +1245,11 @@ public List<Long> listIdsForUpRouting(Long zoneId, Long podId, Long clusterId) {
12451245
return listIdsBy(Type.Routing, Status.Up, null, null, zoneId, podId, clusterId);
12461246
}
12471247

1248+
@Override
1249+
public List<Long> listIdsByType(Type type) {
1250+
return listIdsBy(type, null, null, null, null, null, null);
1251+
}
1252+
12481253
@Override
12491254
public List<Long> listIdsForUpEnabledByZoneAndHypervisor(Long zoneId, HypervisorType hypervisorType) {
12501255
return listIdsBy(null, Status.Up, ResourceState.Enabled, hypervisorType, zoneId, null, null);

engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDao.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -142,4 +142,6 @@ Pair<List<Long>, Integer> searchForIdsAndCount(Long storagePoolId, String storag
142142
String keyword, Filter searchFilter);
143143

144144
List<StoragePoolVO> listByIds(List<Long> ids);
145+
146+
List<Long> listAllIds();
145147
}

engine/schema/src/main/java/org/apache/cloudstack/storage/datastore/db/PrimaryDataStoreDaoImpl.java

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,6 @@
2828
import javax.inject.Inject;
2929
import javax.naming.ConfigurationException;
3030

31-
import com.cloud.utils.Pair;
32-
import com.cloud.utils.db.Filter;
3331
import org.apache.commons.collections.CollectionUtils;
3432

3533
import com.cloud.host.Status;
@@ -41,7 +39,9 @@
4139
import com.cloud.storage.StoragePoolTagVO;
4240
import com.cloud.storage.dao.StoragePoolHostDao;
4341
import com.cloud.storage.dao.StoragePoolTagsDao;
42+
import com.cloud.utils.Pair;
4443
import com.cloud.utils.db.DB;
44+
import com.cloud.utils.db.Filter;
4545
import com.cloud.utils.db.GenericDaoBase;
4646
import com.cloud.utils.db.GenericSearchBuilder;
4747
import com.cloud.utils.db.JoinBuilder;
@@ -714,4 +714,12 @@ private SearchCriteria<StoragePoolVO> createStoragePoolSearchCriteria(Long stora
714714
sc.setParameters("parent", 0);
715715
return sc;
716716
}
717+
718+
@Override
719+
public List<Long> listAllIds() {
720+
GenericSearchBuilder<StoragePoolVO, Long> sb = createSearchBuilder(Long.class);
721+
sb.selectFields(sb.entity().getId());
722+
sb.done();
723+
return customSearch(sb.create(), null);
724+
}
717725
}

server/src/main/java/com/cloud/alert/AlertManagerImpl.java

Lines changed: 66 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -26,8 +26,11 @@
2626
import java.util.Map;
2727
import java.util.Set;
2828
import java.util.Timer;
29+
import java.util.concurrent.ConcurrentHashMap;
30+
import java.util.concurrent.ExecutionException;
2931
import java.util.concurrent.ExecutorService;
3032
import java.util.concurrent.Executors;
33+
import java.util.concurrent.Future;
3134

3235
import javax.inject.Inject;
3336
import javax.mail.MessagingException;
@@ -71,6 +74,7 @@
7174
import com.cloud.event.EventTypes;
7275
import com.cloud.host.Host;
7376
import com.cloud.host.HostVO;
77+
import com.cloud.host.dao.HostDao;
7478
import com.cloud.network.Ipv6Service;
7579
import com.cloud.network.dao.IPAddressDao;
7680
import com.cloud.org.Grouping.AllocationState;
@@ -119,6 +123,8 @@ public class AlertManagerImpl extends ManagerBase implements AlertManager, Confi
119123
protected ConfigDepot _configDepot;
120124
@Inject
121125
Ipv6Service ipv6Service;
126+
@Inject
127+
HostDao hostDao;
122128

123129
private Timer _timer = null;
124130
private long _capacityCheckPeriod = 60L * 60L * 1000L; // One hour by default.
@@ -252,6 +258,64 @@ public void sendAlert(AlertType alertType, long dataCenterId, Long podId, String
252258
}
253259
}
254260

261+
protected void recalculateHostCapacities() {
262+
// Calculate CPU and RAM capacities
263+
List<Long> hostIds = hostDao.listIdsByType(Host.Type.Routing);
264+
if (hostIds.isEmpty()) {
265+
return;
266+
}
267+
ConcurrentHashMap<Long, Future<Void>> futures = new ConcurrentHashMap<>();
268+
ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1,
269+
Math.min(CapacityManager.CapacityCalculateWorkers.value(), hostIds.size())));
270+
for (Long hostId : hostIds) {
271+
futures.put(hostId, executorService.submit(() -> {
272+
final HostVO host = hostDao.findById(hostId);
273+
_capacityMgr.updateCapacityForHost(host);
274+
return null;
275+
}));
276+
}
277+
for (Map.Entry<Long, Future<Void>> entry: futures.entrySet()) {
278+
try {
279+
entry.getValue().get();
280+
} catch (InterruptedException | ExecutionException e) {
281+
logger.error(String.format("Error during capacity calculation for host: %d due to : %s",
282+
entry.getKey(), e.getMessage()), e);
283+
}
284+
}
285+
executorService.shutdown();
286+
}
287+
288+
protected void recalculateStorageCapacities() {
289+
List<Long> storagePoolIds = _storagePoolDao.listAllIds();
290+
if (storagePoolIds.isEmpty()) {
291+
return;
292+
}
293+
ConcurrentHashMap<Long, Future<Void>> futures = new ConcurrentHashMap<>();
294+
ExecutorService executorService = Executors.newFixedThreadPool(Math.max(1,
295+
Math.min(CapacityManager.CapacityCalculateWorkers.value(), storagePoolIds.size())));
296+
for (Long poolId: storagePoolIds) {
297+
futures.put(poolId, executorService.submit(() -> {
298+
final StoragePoolVO pool = _storagePoolDao.findById(poolId);
299+
long disk = _capacityMgr.getAllocatedPoolCapacity(pool, null);
300+
if (pool.isShared()) {
301+
_storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED, disk);
302+
} else {
303+
_storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_LOCAL_STORAGE, disk);
304+
}
305+
return null;
306+
}));
307+
}
308+
for (Map.Entry<Long, Future<Void>> entry: futures.entrySet()) {
309+
try {
310+
entry.getValue().get();
311+
} catch (InterruptedException | ExecutionException e) {
312+
logger.error(String.format("Error during capacity calculation for storage pool: %d due to : %s",
313+
entry.getKey(), e.getMessage()), e);
314+
}
315+
}
316+
executorService.shutdown();
317+
}
318+
255319
@Override
256320
public void recalculateCapacity() {
257321
// FIXME: the right way to do this is to register a listener (see RouterStatsListener, VMSyncListener)
@@ -267,30 +331,14 @@ public void recalculateCapacity() {
267331
logger.debug("recalculating system capacity");
268332
logger.debug("Executing cpu/ram capacity update");
269333
}
270-
271334
// Calculate CPU and RAM capacities
272-
// get all hosts...even if they are not in 'UP' state
273-
List<HostVO> hosts = _resourceMgr.listAllNotInMaintenanceHostsInOneZone(Host.Type.Routing, null);
274-
if (hosts != null) {
275-
for (HostVO host : hosts) {
276-
_capacityMgr.updateCapacityForHost(host);
277-
}
278-
}
335+
recalculateHostCapacities();
279336
if (logger.isDebugEnabled()) {
280337
logger.debug("Done executing cpu/ram capacity update");
281338
logger.debug("Executing storage capacity update");
282339
}
283340
// Calculate storage pool capacity
284-
List<StoragePoolVO> storagePools = _storagePoolDao.listAll();
285-
for (StoragePoolVO pool : storagePools) {
286-
long disk = _capacityMgr.getAllocatedPoolCapacity(pool, null);
287-
if (pool.isShared()) {
288-
_storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_STORAGE_ALLOCATED, disk);
289-
} else {
290-
_storageMgr.createCapacityEntry(pool, Capacity.CAPACITY_TYPE_LOCAL_STORAGE, disk);
291-
}
292-
}
293-
341+
recalculateStorageCapacities();
294342
if (logger.isDebugEnabled()) {
295343
logger.debug("Done executing storage capacity update");
296344
logger.debug("Executing capacity updates for public ip and Vlans");

server/src/main/java/com/cloud/capacity/CapacityManagerImpl.java

Lines changed: 3 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@
4040
import org.apache.cloudstack.storage.datastore.db.StoragePoolVO;
4141
import org.apache.cloudstack.utils.cache.LazyCache;
4242
import org.apache.cloudstack.utils.cache.SingleCache;
43-
import org.apache.cloudstack.utils.executor.QueuedExecutor;
4443
import org.apache.commons.collections.CollectionUtils;
4544
import org.apache.commons.lang3.ObjectUtils;
4645
import org.apache.log4j.Logger;
@@ -150,7 +149,6 @@ public class CapacityManagerImpl extends ManagerBase implements CapacityManager,
150149

151150
private LazyCache<Long, Pair<String, String>> clusterValuesCache;
152151
private SingleCache<Map<Long, ServiceOfferingVO>> serviceOfferingsCache;
153-
private QueuedExecutor<Host> hostCapacityUpdateExecutor;
154152

155153
@Override
156154
public boolean configure(String name, Map<String, Object> params) throws ConfigurationException {
@@ -160,9 +158,6 @@ public boolean configure(String name, Map<String, Object> params) throws Configu
160158
_agentManager.registerForHostEvents(new StorageCapacityListener(_capacityDao, _storageMgr), true, false, false);
161159
_agentManager.registerForHostEvents(new ComputeCapacityListener(_capacityDao, this), true, false, false);
162160

163-
hostCapacityUpdateExecutor = new QueuedExecutor<>("HostCapacityUpdateExecutor", 10, 10,
164-
1, s_logger, this::updateCapacityForHostInternal);
165-
166161
return true;
167162
}
168163

@@ -172,13 +167,11 @@ public boolean start() {
172167
_resourceMgr.registerResourceEvent(ResourceListener.EVENT_CANCEL_MAINTENANCE_AFTER, this);
173168
clusterValuesCache = new LazyCache<>(128, 60, this::getClusterValues);
174169
serviceOfferingsCache = new SingleCache<>(60, this::getServiceOfferingsMap);
175-
hostCapacityUpdateExecutor.startProcessing();
176170
return true;
177171
}
178172

179173
@Override
180174
public boolean stop() {
181-
hostCapacityUpdateExecutor.shutdown();
182175
return true;
183176
}
184177

@@ -646,12 +639,6 @@ public long getAllocatedPoolCapacity(StoragePoolVO pool, VMTemplateVO templateFo
646639
return totalAllocatedSize;
647640
}
648641

649-
@DB
650-
@Override
651-
public void updateCapacityForHost(final Host host) {
652-
hostCapacityUpdateExecutor.queueRequest(host);
653-
}
654-
655642
protected Pair<String, String> getClusterValues(long clusterId) {
656643
Map<String, String> map = _clusterDetailsDao.findDetails(clusterId,
657644
List.of(VmDetailConstants.CPU_OVER_COMMIT_RATIO, VmDetailConstants.CPU_OVER_COMMIT_RATIO));
@@ -694,7 +681,8 @@ protected Map<String, String> getVmDetailsForCapacityCalculation(long vmId) {
694681
}
695682

696683
@DB
697-
protected void updateCapacityForHostInternal(final Host host) {
684+
@Override
685+
public void updateCapacityForHost(final Host host) {
698686
long usedCpuCore = 0;
699687
long reservedCpuCore = 0;
700688
long usedCpu = 0;
@@ -1324,6 +1312,6 @@ public String getConfigComponentName() {
13241312
public ConfigKey<?>[] getConfigKeys() {
13251313
return new ConfigKey<?>[] {CpuOverprovisioningFactor, MemOverprovisioningFactor, StorageCapacityDisableThreshold, StorageOverprovisioningFactor,
13261314
StorageAllocatedCapacityDisableThreshold, StorageOperationsExcludeCluster, ImageStoreNFSVersion, SecondaryStorageCapacityThreshold,
1327-
StorageAllocatedCapacityDisableThresholdForVolumeSize };
1315+
StorageAllocatedCapacityDisableThresholdForVolumeSize, CapacityCalculateWorkers };
13281316
}
13291317
}

utils/src/main/java/org/apache/cloudstack/utils/executor/QueuedExecutor.java

Lines changed: 0 additions & 84 deletions
This file was deleted.

0 commit comments

Comments
 (0)