
Commit 3d308f8

Adir111 authored and RonShvarz committed
Incorrect resources allocation (#2224)

* refactored names and added JSDoc
* fixed bug where worker stats under a node were not calculated correctly
* fixed bug - bad calculation: as long as free is negative, it causes reserved to always be negative
* added a new method to get default values for resources, via LimitRange
* added stub data for test
* fixed bug - workerResourceRequests should be applied only if settings.applyResources is true; also fixed the merge method to not include fields that are empty or null, since null will fail the k8s job and an empty field would just be odd in the job spec, so better off without it
* refactored a bit, and changed behavior to take the max of requests and limits if useResourceLimits is on (some pods have no limit, which can cause limits to be lower than requests, which is impossible); see the sketch after this list
* added JSDoc and a small refactor
* now taking worker resources into account when calculating resources per algorithm
* extracted default container resources out of the reconcile, now done in data preparation
* fixed all graph calculation errors
* now takes the maximum between request and limit when the useResourceLimits flag is true, since many pods have no limit, which causes the limit to be lower than the request (which is impossible)
* passing default resources, so when calculating a worker without any special resource definition it takes its default resources instead, to be more precise when calculating requirements
* now always sending the actual request to "other", while keeping the requests used for calculations dependent on the requests, or on limits & requests (for a missing limit)
* fixed free & other calculation after the changes in task-executor
* added unit tests to cover the method after its changes
* updated k8s client version
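The "max of requests and limits" rule mentioned in the bullets above is implemented in files not shown on this page; as a minimal sketch only (hypothetical helper name and inputs, not the committed code), the described behavior amounts to:

// Hedged sketch of the described normalization, not the committed implementation.
// When useResourceLimits is on, a pod with no limit (or a limit below its request)
// would otherwise make the effective limit smaller than the request, which is impossible.
const effectiveMetric = (request, limit, useResourceLimits) => {
    if (!useResourceLimits || limit == null) {
        return request;                // no limit defined: fall back to the request
    }
    return Math.max(request, limit);   // never report a limit lower than the request
};

console.log(effectiveMetric(2, 1, true));    // 2  (misconfigured limit is ignored)
console.log(effectiveMetric(2, null, true)); // 2  (missing limit falls back to request)
console.log(effectiveMetric(2, 4, true));    // 4  (a real, larger limit is used)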
1 parent 45edc71 commit 3d308f8

File tree

16 files changed: +697 additions, -166 deletions


core/api-server/api/graphql/queries/statistics-querier.js

Lines changed: 16 additions & 11 deletions

@@ -76,26 +76,32 @@ class NodesStatistics {
         return results;
     }

-    _buildAlgorithmResult(node, algorithms, metric, resourcePressure) {
+    _buildAlgorithmResult(node, algorithms, metric, resourcePressure, defaultWorker) {
         let otherAmount = 0;
+        let algorithmTotalSize = 0;
         const algorithmsData = [];

         const getMetric = (mtr, algorithm) => {
-            const rawMetric = algorithm[mtr] ? algorithm[mtr] : 0;
+            const algoRawMetric = algorithm[mtr] ? algorithm[mtr] : 0;
+            const workerRawMetric = algorithm?.workerCustomResources?.requests[mtr] || defaultWorker || 0;
             if (mtr === 'mem') {
-                return parse.getMemoryInMi(rawMetric);
+                return parse.getMemoryInMi(algoRawMetric) + (workerRawMetric ? parse.getMemoryInMi(workerRawMetric) : 0);
             }
-            return rawMetric;
+            if (mtr === 'cpu') {
+                return algoRawMetric + (workerRawMetric ? parse.getCpuInCore(workerRawMetric) : 0);
+            }
+            return algoRawMetric; // for gpu, worker doesnt need gpu
         };
         node.workers.stats.forEach(algorithm => {
             const requestedAlgorithm = algorithms.find(alg => alg.name === algorithm.algorithmName);
-
+            const size = +(algorithm.count * getMetric(metric, requestedAlgorithm)).toFixed(1);
             if (requestedAlgorithm) {
                 algorithmsData.push({
                     name: algorithm.algorithmName,
                     amount: algorithm.count,
-                    size: +(algorithm.count * getMetric(metric, requestedAlgorithm)).toFixed(1),
+                    size
                 });
+                algorithmTotalSize += size;
             }
             else {
                 otherAmount += algorithm.count;
@@ -104,10 +110,9 @@ class NodesStatistics {
         algorithmsData.push({
             name: 'other',
             amount: otherAmount,
-            // size: +(node.total[metric] * resourcePressure -(nodeFree + (algorithmsData.reduce((sum, alg) => sum + alg.size, 0)))).toFixed(1),
-            size: +(node.other[metric].toFixed(1)),
+            size: +(node.other[metric].toFixed(1))
         });
-        const free = node.total[metric] * resourcePressure - node.requests[metric];
+        const free = node.total[metric] * resourcePressure - node.other[metric] - algorithmTotalSize;
         algorithmsData.push({
             name: 'free',
             amount: -1,
@@ -116,7 +121,7 @@
         algorithmsData.push({
             name: 'reserved',
             amount: otherAmount,
-            size: +(node.total[metric] * (1 - resourcePressure) + (free < 0 ? free : 0)).toFixed(1),
+            size: Math.max(+(node.total[metric] * (1 - resourcePressure) + (free < 0 ? free : 0)).toFixed(1), 0)
         });
         algorithmsData.push({
             name: 'total',
@@ -135,7 +140,7 @@
             'reserved'
         ];
         const results = taskExecutor.length ? taskExecutor[0].nodes.map(node => {
-            const algorithmsData = this._buildAlgorithmResult(node, algorithms, metric, taskExecutor[0].resourcePressure[metric]);
+            const algorithmsData = this._buildAlgorithmResult(node, algorithms, metric, taskExecutor[0].resourcePressure[metric], taskExecutor[0].defaultWorkerResources[metric]);
             return {
                 name: node.name,
                 algorithmsData
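The key change above is how free and reserved are derived per node. A small worked example with invented numbers (not from the repo) shows the corrected formulas:

// Invented values, only to illustrate the formulas in the diff above.
const total = 10;              // node.total[metric], e.g. 10 CPU cores
const resourcePressure = 0.8;  // pressure threshold for this metric
const other = 1.5;             // node.other[metric]: non-algorithm pods
const algorithmTotalSize = 6;  // sum of size over the known algorithms

// free: what remains under the pressure threshold after "other" pods
// and the known algorithms are both accounted for.
const free = total * resourcePressure - other - algorithmTotalSize;   // 0.5

// reserved: the slice above the pressure threshold, shrunk when free is
// negative and clamped at zero so it can never be reported as negative.
const reserved = Math.max(
    +(total * (1 - resourcePressure) + (free < 0 ? free : 0)).toFixed(1),
    0
);
console.log({ free, reserved }); // { free: 0.5, reserved: 2 }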

core/task-executor/lib/executor.js

Lines changed: 5 additions & 3 deletions

@@ -64,14 +64,16 @@ class Executor {

        const { pods } = resources;
        const normResources = normalizeResources(resources);
+       const containerDefaults = await kubernetes.getContainerDefaultResources();
        const data = {
            versions,
            normResources,
            options,
            registry,
            clusterOptions,
            pods,
-           workerResources: options.resources.worker
+           workerResources: options.resources.worker,
+           containerDefaults
        };

        await Promise.all([
@@ -86,7 +88,7 @@ class Executor {
        }
    }

-   async _algorithmsHandle({ versions, normResources, registry, options, clusterOptions, pods, workerResources }) {
+   async _algorithmsHandle(data) {
        const [algorithmTemplates, algorithmRequests, workers, jobs] = await Promise.all([
            etcd.getAlgorithmTemplate(),
            etcd.getAlgorithmRequests({}),
@@ -95,7 +97,7 @@
        ]);

        const reconcilerResults = await reconciler.reconcile({
-           algorithmTemplates, algorithmRequests, workers, jobs, pods, versions, normResources, registry, options, clusterOptions, workerResources
+           algorithmTemplates, algorithmRequests, workers, jobs, ...data
        });
        Object.entries(reconcilerResults).forEach(([algorithmName, res]) => {
            this[metricsNames.TASK_EXECUTOR_JOB_REQUESTS].set({ value: res.required || 0, labelValues: { algorithmName } });
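The refactor above collapses the explicit parameter list of _algorithmsHandle into a single data object that is spread into the reconcile payload. A self-contained sketch (dummy values standing in for the real inputs) shows why the spread is equivalent to naming every field:

// Dummy stand-ins for the values assembled before reconciliation; shapes are illustrative only.
const versions = {};
const normResources = {};
const options = { resources: { worker: {} } };
const registry = {};
const clusterOptions = {};
const pods = [];
const containerDefaults = { cpu: { defaultRequest: '100m' }, memory: { defaultRequest: '128Mi' } };

const data = {
    versions, normResources, options, registry, clusterOptions, pods,
    workerResources: options.resources.worker,
    containerDefaults // new field, fetched once during data preparation
};

// Spreading data is the same as listing each field by name, so new fields such
// as containerDefaults reach the reconcile call without widening the signature.
const payload = { algorithmTemplates: {}, algorithmRequests: [], workers: [], jobs: [], ...data };
console.log('containerDefaults' in payload); // true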

core/task-executor/lib/helpers/kubernetes.js

Lines changed: 55 additions & 5 deletions

@@ -13,7 +13,7 @@ let log;
 class KubernetesApi {
     async init(options = {}) {
         log = Logger.GetLogFromContainer();
-        this._crdMissingWarnLogged = false;
+        this._warnWasLogged = { crdMissing: false, noLimitRange: false, moreThanOneLimit: false }; // To avoid spamming the logs
         this._client = new KubernetesClient();
         await this._client.init(options.kubernetes);
         this._isNamespaced = options.kubernetes.isNamespaced;
@@ -208,13 +208,13 @@ class KubernetesApi {
         const exists = crdList?.body?.items?.some(cr => cr.metadata.name === kaiValues.KUBERNETES.QUEUES_CRD_NAME);

         if (!exists) {
-            if (!this._crdMissingWarnLogged) {
-                log.info(`Kai Queues CRD (${kaiValues.KUBERNETES.QUEUES_CRD_NAME}) not found. Assuming Kai is not installed.`, { component });
-                this._crdMissingWarnLogged = true;
+            if (!this._warnWasLogged.crdMissing) {
+                log.warning(`Kai Queues CRD (${kaiValues.KUBERNETES.QUEUES_CRD_NAME}) not found. Assuming Kai is not installed.`, { component });
+                this._warnWasLogged.crdMissing = true;
             }
             return [];
         }
-        this._crdMissingWarnLogged = false;
+        this._warnWasLogged.crdMissing = false;

         const crd = crdList.body.items.find(cr => cr.metadata.name === kaiValues.KUBERNETES.QUEUES_CRD_NAME);
         const version = crd?.spec?.versions?.find(v => v.served)?.name;
@@ -232,6 +232,56 @@ class KubernetesApi {
             return [];
         }
     }
+
+    /**
+     * Get default CPU and memory requests/limits for containers
+     * from LimitRange resources in the namespace.
+     */
+    async getContainerDefaultResources() {
+        try {
+            const res = await this._client.limitRanges.all();
+            const items = res.body?.items || [];
+
+            const containerLimits = items
+                .flatMap(item => item.spec.limits.map(limit => ({
+                    ...limit,
+                    source: item.metadata?.name,
+                })))
+                .filter(limit => limit.type === 'Container');
+
+            if (containerLimits.length === 0) {
+                if (!this._warnWasLogged.noLimitRange) {
+                    log.warning('No LimitRange with type=Container found.', { component });
+                    this._warnWasLogged.noLimitRange = true;
+                }
+                return {};
+            }
+            this._warnWasLogged.noLimitRange = false; // Reset warning flag if situation is resolved
+
+            if (containerLimits.length > 1 && !this._warnWasLogged.moreThanOneLimit) {
+                log.warning(`Multiple LimitRanges with type=Container found: ${containerLimits.map(l => l.source)}. Taking the first one.`, { component });
+                this._warnWasLogged.moreThanOneLimit = true;
+            }
+            else this._warnWasLogged.moreThanOneLimit = false; // Reset warning flag if situation is resolved
+
+            const selected = containerLimits[0];
+
+            return {
+                cpu: {
+                    defaultRequest: selected.defaultRequest?.cpu,
+                    defaultLimits: selected.default?.cpu
+                },
+                memory: {
+                    defaultRequest: selected.defaultRequest?.memory,
+                    defaultLimits: selected.default?.memory
+                }
+            };
+        }
+        catch (error) {
+            log.error(`Failed to fetch container default resources ${error.message}`, { component }, error);
+            return {};
+        }
+    }
 }

 module.exports = new KubernetesApi();
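As a usage illustration, the LimitRange below is invented, but a namespace containing it would make getContainerDefaultResources() return defaults in the shape shown (the extraction mirrors the committed code):

// Invented LimitRange item, shaped like the k8s API response consumed above.
const limitRangeItem = {
    metadata: { name: 'default-limits' },
    spec: {
        limits: [{
            type: 'Container',
            defaultRequest: { cpu: '100m', memory: '128Mi' },
            default: { cpu: '500m', memory: '512Mi' }
        }]
    }
};

// Same extraction as getContainerDefaultResources(): take the first
// type=Container entry and keep its default request/limit per resource.
const selected = limitRangeItem.spec.limits.find(l => l.type === 'Container');
const containerDefaults = {
    cpu: { defaultRequest: selected.defaultRequest?.cpu, defaultLimits: selected.default?.cpu },
    memory: { defaultRequest: selected.defaultRequest?.memory, defaultLimits: selected.default?.memory }
};
console.log(containerDefaults);
// { cpu: { defaultRequest: '100m', defaultLimits: '500m' },
//   memory: { defaultRequest: '128Mi', defaultLimits: '512Mi' } }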

core/task-executor/lib/jobs/jobCreator.js

Lines changed: 15 additions & 8 deletions

@@ -327,13 +327,19 @@ const applyAnnotations = (spec, keyVal) => {
 };

 const mergeResourceRequest = (defaultResource, customResource) => {
-    const mergedRequest = { requests: {}, limits: {} };
+    const mergedRequest = {};

     for (const key of ['requests', 'limits']) {
-        mergedRequest[key].memory = customResource[key]?.memory || defaultResource[key]?.memory || null;
-        mergedRequest[key].cpu = customResource[key]?.cpu || defaultResource[key]?.cpu || null;
+        const cpu = customResource?.[key]?.cpu || defaultResource?.[key]?.cpu;
+        const memory = customResource?.[key]?.memory || defaultResource?.[key]?.memory;
+
+        if (cpu || memory) {
+            mergedRequest[key] = {};
+            if (cpu) mergedRequest[key].cpu = cpu;
+            if (memory) mergedRequest[key].memory = memory;
+        }
     }
-    return mergedRequest;
+    return Object.keys(mergedRequest).length > 0 ? mergedRequest : undefined;
 };

 const _applyDefaultResourcesSideCar = (container) => {
@@ -453,12 +459,13 @@ const createJobSpec = ({ kind, algorithmName, resourceRequests, workerImage, alg
     spec = applyEnvToContainer(spec, CONTAINERS.WORKER, { ALGORITHM_VERSION: algorithmVersion });
     spec = applyEnvToContainer(spec, CONTAINERS.WORKER, { WORKER_IMAGE: workerImage });
     spec = applyAlgorithmResourceRequests(spec, resourceRequests, node);
-    if (settings.applyResources || workerCustomResources) {
-        if (workerCustomResources) {
-            workerResourceRequests = mergeResourceRequest(workerResourceRequests, workerCustomResources);
-        }
+    if (settings.applyResources) {
+        workerResourceRequests = mergeResourceRequest(workerResourceRequests, workerCustomResources);
         spec = applyWorkerResourceRequests(spec, workerResourceRequests);
     }
+    else if (workerCustomResources) {
+        spec = applyWorkerResourceRequests(spec, workerCustomResources);
+    }
     spec = applyNodeSelector(spec, nodeSelector);
     spec = applyHotWorker(spec, hotWorker);
     spec = applyEntryPoint(spec, entryPoint);
core/task-executor/lib/reconcile/managers/jobs.js

Lines changed: 7 additions & 6 deletions

@@ -52,16 +52,17 @@ class JobsHandler {
     * @param {Object} clusterOptions - Cluster-wide configuration.
     * @param {Object} workerResources - Default worker resource requests.
     * @param {Object} options - Confguration containing additional job creation options.
+    * @param {Object} containerDefaults - Default container resources from Kubernetes.
     * @param {Object} reconcileResult - Scheduling reconcile stats by algorithm.
     */
-    async schedule(allAllocatedJobs, algorithmTemplates, normResources, versions, requests, registry, clusterOptions, workerResources, options, reconcileResult) {
+    async schedule(allAllocatedJobs, algorithmTemplates, normResources, versions, requests, registry, clusterOptions, workerResources, options, containerDefaults, reconcileResult) {
        // 1. Assign requests to workers or prepare job creation details
        const { createDetails, toResume, scheduledRequests } = this._processAllRequests(allAllocatedJobs, algorithmTemplates,
            versions, requests, registry, clusterOptions, workerResources, reconcileResult);

        // 2. Match jobs to resources, and skip those that doesn't have the required resources.
        const extraResources = await this._getExtraResources();
-       const { jobsToRequest, skipped } = matchJobsToResources(createDetails, normResources, scheduledRequests, extraResources);
+       const { jobsToRequest, skipped } = matchJobsToResources(createDetails, normResources, scheduledRequests, extraResources, containerDefaults);

        // 3. Find workers to stop if resources insufficient
        const stopDetails = this._findWorkersToStop(skipped, allAllocatedJobs, algorithmTemplates);
@@ -183,14 +184,14 @@

        // No existing worker found — prepare job creation request
        const algorithmTemplate = algorithmTemplates[algorithmName];
-       const { workerCustomResources } = algorithmTemplates[algorithmName];
        const algorithmImage = setAlgorithmImage(algorithmTemplate, versions, registry);
        const workerImage = setWorkerImage(algorithmTemplate, versions, registry);
        const resourceRequests = createContainerResource(algorithmTemplate);
        const workerResourceRequests = createContainerResource(workerResources);

-       const { kind, workerEnv, algorithmEnv, labels, annotations, version: algorithmVersion, nodeSelector, stateType: algorithmStateType = 'batch',
-           entryPoint, options: algorithmOptions, reservedMemory, mounts, env, sideCars, volumes, volumeMounts, kaiObject } = algorithmTemplate;
+       const { kind, workerEnv, algorithmEnv, labels, annotations, version: algorithmVersion, nodeSelector,
+           stateType: algorithmStateType = 'batch', entryPoint, options: algorithmOptions, reservedMemory,
+           mounts, env, sideCars, volumes, volumeMounts, workerCustomResources, kaiObject } = algorithmTemplate;

        createDetails.push({
            numberOfNewJobs: 1,
@@ -451,7 +452,7 @@
     *
     * Logic:
     * 1. Add skipped algorithms to the `unScheduledAlgorithms` map if not already present.
-    * 2. Check if any of these algorithms have been created, requested, or removed from templates.
+    * 2. Check if any of these algorithms have been created, not requested anymore, or removed from templates.
     * 3. Remove such algorithms from the map and move them to `ignoredUnScheduledAlgorithms`.
     *
     * @private

0 commit comments
