Skip to content
Merged
Show file tree
Hide file tree
Changes from 19 commits
Commits
Show all changes
20 commits
Select commit Hold shift + click to select a range
a465c86
refactored names and added jsdoc
Adir111 Aug 24, 2025
4f9ae0c
fixed bug where workers stats under node were not calculated correctly
Adir111 Aug 24, 2025
2e21573
fixed bug - bad calculation, as long free is negative it causes reser…
Adir111 Aug 25, 2025
bb03d10
added new method to get default value for resources, via limitrange
Adir111 Aug 25, 2025
ccdbf5b
added stub data for test
Adir111 Aug 25, 2025
995d96c
fixed bug - should apply workerResourceRequests only if settings.appl…
Adir111 Aug 25, 2025
1ce519f
refactor a bit, and changed behavior to take max out of requests and …
Adir111 Aug 25, 2025
d820bad
jsdoc was added and small refactor
Adir111 Aug 25, 2025
811cd0e
now taking worker resources in account when calculating resources per…
Adir111 Aug 25, 2025
6fd74e3
extracted default container resources out of the reconcile, and now i…
Adir111 Aug 25, 2025
424aec0
fix all graph calculations errors
Adir111 Aug 27, 2025
94ab179
Now takes maximum between request and limit in case useResourceLimits…
Adir111 Aug 27, 2025
b37a317
Passing default resources, so when calculating a worker without any s…
Adir111 Aug 27, 2025
66f0978
Now sending actual request to other always, while keeping the request…
Adir111 Aug 27, 2025
41afc43
fixed free&other calculation after changes in task-executor
Adir111 Aug 27, 2025
b4e7cd6
.
Adir111 Aug 27, 2025
71df2e1
Merge branch 'master' into incorrect_resources_allocation
Adir111 Aug 27, 2025
6146e33
Merge branch 'incorrect_resources_allocation' of https://github.com/k…
Adir111 Aug 27, 2025
a0e1cce
added unit tests to test the method after it changes.
Adir111 Aug 27, 2025
9dcc9d0
update k8s client version
Adir111 Aug 31, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 16 additions & 11 deletions core/api-server/api/graphql/queries/statistics-querier.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,26 +76,32 @@ class NodesStatistics {
return results;
}

_buildAlgorithmResult(node, algorithms, metric, resourcePressure) {
_buildAlgorithmResult(node, algorithms, metric, resourcePressure, defaultWorker) {
let otherAmount = 0;
let algorithmTotalSize = 0;
const algorithmsData = [];

const getMetric = (mtr, algorithm) => {
const rawMetric = algorithm[mtr] ? algorithm[mtr] : 0;
const algoRawMetric = algorithm[mtr] ? algorithm[mtr] : 0;
const workerRawMetric = algorithm?.workerCustomResources?.requests[mtr] || defaultWorker || 0;
if (mtr === 'mem') {
return parse.getMemoryInMi(rawMetric);
return parse.getMemoryInMi(algoRawMetric) + (workerRawMetric ? parse.getMemoryInMi(workerRawMetric) : 0);
}
return rawMetric;
if (mtr === 'cpu') {
return algoRawMetric + (workerRawMetric ? parse.getCpuInCore(workerRawMetric) : 0);
}
return algoRawMetric; // for GPU, the worker doesn't need GPU
};
node.workers.stats.forEach(algorithm => {
const requestedAlgorithm = algorithms.find(alg => alg.name === algorithm.algorithmName);

const size = +(algorithm.count * getMetric(metric, requestedAlgorithm)).toFixed(1);
if (requestedAlgorithm) {
algorithmsData.push({
name: algorithm.algorithmName,
amount: algorithm.count,
size: +(algorithm.count * getMetric(metric, requestedAlgorithm)).toFixed(1),
size
});
algorithmTotalSize += size;
}
else {
otherAmount += algorithm.count;
Expand All @@ -104,10 +110,9 @@ class NodesStatistics {
algorithmsData.push({
name: 'other',
amount: otherAmount,
// size: +(node.total[metric] * resourcePressure -(nodeFree + (algorithmsData.reduce((sum, alg) => sum + alg.size, 0)))).toFixed(1),
size: +(node.other[metric].toFixed(1)),
size: +(node.other[metric].toFixed(1))
});
const free = node.total[metric] * resourcePressure - node.requests[metric];
const free = node.total[metric] * resourcePressure - node.other[metric] - algorithmTotalSize;
algorithmsData.push({
name: 'free',
amount: -1,
Expand All @@ -116,7 +121,7 @@ class NodesStatistics {
algorithmsData.push({
name: 'reserved',
amount: otherAmount,
size: +(node.total[metric] * (1 - resourcePressure) + (free < 0 ? free : 0)).toFixed(1),
size: Math.max(+(node.total[metric] * (1 - resourcePressure) + (free < 0 ? free : 0)).toFixed(1), 0)
});
algorithmsData.push({
name: 'total',
Expand All @@ -135,7 +140,7 @@ class NodesStatistics {
'reserved'
];
const results = taskExecutor.length ? taskExecutor[0].nodes.map(node => {
const algorithmsData = this._buildAlgorithmResult(node, algorithms, metric, taskExecutor[0].resourcePressure[metric]);
const algorithmsData = this._buildAlgorithmResult(node, algorithms, metric, taskExecutor[0].resourcePressure[metric], taskExecutor[0].defaultWorkerResources[metric]);
return {
name: node.name,
algorithmsData
Expand Down
8 changes: 5 additions & 3 deletions core/task-executor/lib/executor.js
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,16 @@ class Executor {

const { pods } = resources;
const normResources = normalizeResources(resources);
const containerDefaults = await kubernetes.getContainerDefaultResources();
const data = {
versions,
normResources,
options,
registry,
clusterOptions,
pods,
workerResources: options.resources.worker
workerResources: options.resources.worker,
containerDefaults
};

await Promise.all([
Expand All @@ -86,7 +88,7 @@ class Executor {
}
}

async _algorithmsHandle({ versions, normResources, registry, options, clusterOptions, pods, workerResources }) {
async _algorithmsHandle(data) {
const [algorithmTemplates, algorithmRequests, workers, jobs] = await Promise.all([
etcd.getAlgorithmTemplate(),
etcd.getAlgorithmRequests({}),
Expand All @@ -95,7 +97,7 @@ class Executor {
]);

const reconcilerResults = await reconciler.reconcile({
algorithmTemplates, algorithmRequests, workers, jobs, pods, versions, normResources, registry, options, clusterOptions, workerResources
algorithmTemplates, algorithmRequests, workers, jobs, ...data
});
Object.entries(reconcilerResults).forEach(([algorithmName, res]) => {
this[metricsNames.TASK_EXECUTOR_JOB_REQUESTS].set({ value: res.required || 0, labelValues: { algorithmName } });
Expand Down
60 changes: 55 additions & 5 deletions core/task-executor/lib/helpers/kubernetes.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ let log;
class KubernetesApi {
async init(options = {}) {
log = Logger.GetLogFromContainer();
this._crdMissingWarnLogged = false;
this._warnWasLogged = { crdMissing: false, noLimitRange: false, moreThanOneLimit: false }; // To avoid spamming the logs
this._client = new KubernetesClient();
await this._client.init(options.kubernetes);
this._isNamespaced = options.kubernetes.isNamespaced;
Expand Down Expand Up @@ -208,13 +208,13 @@ class KubernetesApi {
const exists = crdList?.body?.items?.some(cr => cr.metadata.name === kaiValues.KUBERNETES.QUEUES_CRD_NAME);

if (!exists) {
if (!this._crdMissingWarnLogged) {
log.info(`Kai Queues CRD (${kaiValues.KUBERNETES.QUEUES_CRD_NAME}) not found. Assuming Kai is not installed.`, { component });
this._crdMissingWarnLogged = true;
if (!this._warnWasLogged.crdMissing) {
log.warning(`Kai Queues CRD (${kaiValues.KUBERNETES.QUEUES_CRD_NAME}) not found. Assuming Kai is not installed.`, { component });
this._warnWasLogged.crdMissing = true;
}
return [];
}
this._crdMissingWarnLogged = false;
this._warnWasLogged.crdMissing = false;

const crd = crdList.body.items.find(cr => cr.metadata.name === kaiValues.KUBERNETES.QUEUES_CRD_NAME);
const version = crd?.spec?.versions?.find(v => v.served)?.name;
Expand All @@ -232,6 +232,56 @@ class KubernetesApi {
return [];
}
}

/**
* Get default CPU and memory requests/limits for containers
* from LimitRange resources in the namespace.
*/
async getContainerDefaultResources() {
try {
const res = await this._client.limitRanges.all();
const items = res.body?.items || [];

const containerLimits = items
.flatMap(item => item.spec.limits.map(limit => ({
...limit,
source: item.metadata?.name,
})))
.filter(limit => limit.type === 'Container');

if (containerLimits.length === 0) {
if (!this._warnWasLogged.noLimitRange) {
log.warning('No LimitRange with type=Container found.', { component });
this._warnWasLogged.noLimitRange = true;
}
return {};
}
this._warnWasLogged.noLimitRange = false; // Reset warning flag if situation is resolved

if (containerLimits.length > 1 && !this._warnWasLogged.moreThanOneLimit) {
log.warning(`Multiple LimitRanges with type=Container found: ${containerLimits.map(l => l.source)}. Taking the first one.`, { component });
this._warnWasLogged.moreThanOneLimit = true;
}
else this._warnWasLogged.moreThanOneLimit = false; // Reset warning flag if situation is resolved

const selected = containerLimits[0];

return {
cpu: {
defaultRequest: selected.defaultRequest?.cpu,
defaultLimits: selected.default?.cpu
},
memory: {
defaultRequest: selected.defaultRequest?.memory,
defaultLimits: selected.default?.memory
}
};
}
catch (error) {
log.error(`Failed to fetch container default resources ${error.message}`, { component }, error);
return {};
}
}
}

module.exports = new KubernetesApi();
Expand Down
23 changes: 15 additions & 8 deletions core/task-executor/lib/jobs/jobCreator.js
Original file line number Diff line number Diff line change
Expand Up @@ -327,13 +327,19 @@ const applyAnnotations = (spec, keyVal) => {
};

const mergeResourceRequest = (defaultResource, customResource) => {
const mergedRequest = { requests: {}, limits: {} };
const mergedRequest = {};

for (const key of ['requests', 'limits']) {
mergedRequest[key].memory = customResource[key]?.memory || defaultResource[key]?.memory || null;
mergedRequest[key].cpu = customResource[key]?.cpu || defaultResource[key]?.cpu || null;
const cpu = customResource?.[key]?.cpu || defaultResource?.[key]?.cpu;
const memory = customResource?.[key]?.memory || defaultResource?.[key]?.memory;

if (cpu || memory) {
mergedRequest[key] = {};
if (cpu) mergedRequest[key].cpu = cpu;
if (memory) mergedRequest[key].memory = memory;
}
}
return mergedRequest;
return Object.keys(mergedRequest).length > 0 ? mergedRequest : undefined;
};

const _applyDefaultResourcesSideCar = (container) => {
Expand Down Expand Up @@ -453,12 +459,13 @@ const createJobSpec = ({ kind, algorithmName, resourceRequests, workerImage, alg
spec = applyEnvToContainer(spec, CONTAINERS.WORKER, { ALGORITHM_VERSION: algorithmVersion });
spec = applyEnvToContainer(spec, CONTAINERS.WORKER, { WORKER_IMAGE: workerImage });
spec = applyAlgorithmResourceRequests(spec, resourceRequests, node);
if (settings.applyResources || workerCustomResources) {
if (workerCustomResources) {
workerResourceRequests = mergeResourceRequest(workerResourceRequests, workerCustomResources);
}
if (settings.applyResources) {
workerResourceRequests = mergeResourceRequest(workerResourceRequests, workerCustomResources);
spec = applyWorkerResourceRequests(spec, workerResourceRequests);
}
else if (workerCustomResources) {
spec = applyWorkerResourceRequests(spec, workerCustomResources);
}
spec = applyNodeSelector(spec, nodeSelector);
spec = applyHotWorker(spec, hotWorker);
spec = applyEntryPoint(spec, entryPoint);
Expand Down
13 changes: 7 additions & 6 deletions core/task-executor/lib/reconcile/managers/jobs.js
Original file line number Diff line number Diff line change
Expand Up @@ -52,16 +52,17 @@ class JobsHandler {
* @param {Object} clusterOptions - Cluster-wide configuration.
* @param {Object} workerResources - Default worker resource requests.
* @param {Object} options - Configuration containing additional job creation options.
* @param {Object} containerDefaults - Default container resources from Kubernetes.
* @param {Object} reconcileResult - Scheduling reconcile stats by algorithm.
*/
async schedule(allAllocatedJobs, algorithmTemplates, normResources, versions, requests, registry, clusterOptions, workerResources, options, reconcileResult) {
async schedule(allAllocatedJobs, algorithmTemplates, normResources, versions, requests, registry, clusterOptions, workerResources, options, containerDefaults, reconcileResult) {
// 1. Assign requests to workers or prepare job creation details
const { createDetails, toResume, scheduledRequests } = this._processAllRequests(allAllocatedJobs, algorithmTemplates,
versions, requests, registry, clusterOptions, workerResources, reconcileResult);

// 2. Match jobs to resources, and skip those that doesn't have the required resources.
const extraResources = await this._getExtraResources();
const { jobsToRequest, skipped } = matchJobsToResources(createDetails, normResources, scheduledRequests, extraResources);
const { jobsToRequest, skipped } = matchJobsToResources(createDetails, normResources, scheduledRequests, extraResources, containerDefaults);

// 3. Find workers to stop if resources insufficient
const stopDetails = this._findWorkersToStop(skipped, allAllocatedJobs, algorithmTemplates);
Expand Down Expand Up @@ -183,14 +184,14 @@ class JobsHandler {

// No existing worker found — prepare job creation request
const algorithmTemplate = algorithmTemplates[algorithmName];
const { workerCustomResources } = algorithmTemplates[algorithmName];
const algorithmImage = setAlgorithmImage(algorithmTemplate, versions, registry);
const workerImage = setWorkerImage(algorithmTemplate, versions, registry);
const resourceRequests = createContainerResource(algorithmTemplate);
const workerResourceRequests = createContainerResource(workerResources);

const { kind, workerEnv, algorithmEnv, labels, annotations, version: algorithmVersion, nodeSelector, stateType: algorithmStateType = 'batch',
entryPoint, options: algorithmOptions, reservedMemory, mounts, env, sideCars, volumes, volumeMounts, kaiObject } = algorithmTemplate;
const { kind, workerEnv, algorithmEnv, labels, annotations, version: algorithmVersion, nodeSelector,
stateType: algorithmStateType = 'batch', entryPoint, options: algorithmOptions, reservedMemory,
mounts, env, sideCars, volumes, volumeMounts, workerCustomResources, kaiObject } = algorithmTemplate;

createDetails.push({
numberOfNewJobs: 1,
Expand Down Expand Up @@ -451,7 +452,7 @@ class JobsHandler {
*
* Logic:
* 1. Add skipped algorithms to the `unScheduledAlgorithms` map if not already present.
* 2. Check if any of these algorithms have been created, requested, or removed from templates.
* 2. Check if any of these algorithms have been created, not requested anymore, or removed from templates.
* 3. Remove such algorithms from the map and move them to `ignoredUnScheduledAlgorithms`.
*
* @private
Expand Down
Loading
Loading