diff --git a/core/api-server/api/graphql/queries/statistics-querier.js b/core/api-server/api/graphql/queries/statistics-querier.js index 9c55eb1e8..8d83a7a3f 100644 --- a/core/api-server/api/graphql/queries/statistics-querier.js +++ b/core/api-server/api/graphql/queries/statistics-querier.js @@ -76,26 +76,32 @@ class NodesStatistics { return results; } - _buildAlgorithmResult(node, algorithms, metric, resourcePressure) { + _buildAlgorithmResult(node, algorithms, metric, resourcePressure, defaultWorker) { let otherAmount = 0; + let algorithmTotalSize = 0; const algorithmsData = []; const getMetric = (mtr, algorithm) => { - const rawMetric = algorithm[mtr] ? algorithm[mtr] : 0; + const algoRawMetric = algorithm[mtr] ? algorithm[mtr] : 0; + const workerRawMetric = algorithm?.workerCustomResources?.requests[mtr] || defaultWorker || 0; if (mtr === 'mem') { - return parse.getMemoryInMi(rawMetric); + return parse.getMemoryInMi(algoRawMetric) + (workerRawMetric ? parse.getMemoryInMi(workerRawMetric) : 0); } - return rawMetric; + if (mtr === 'cpu') { + return algoRawMetric + (workerRawMetric ? parse.getCpuInCore(workerRawMetric) : 0); + } + return algoRawMetric; // for gpu, the worker doesn't need gpu }; node.workers.stats.forEach(algorithm => { const requestedAlgorithm = algorithms.find(alg => alg.name === algorithm.algorithmName); - + const size = +(algorithm.count * getMetric(metric, requestedAlgorithm)).toFixed(1); if (requestedAlgorithm) { algorithmsData.push({ name: algorithm.algorithmName, amount: algorithm.count, - size: +(algorithm.count * getMetric(metric, requestedAlgorithm)).toFixed(1), + size }); + algorithmTotalSize += size; } else { otherAmount += algorithm.count; @@ -104,10 +110,9 @@ class NodesStatistics { algorithmsData.push({ name: 'other', amount: otherAmount, - // size: +(node.total[metric] * resourcePressure -(nodeFree + (algorithmsData.reduce((sum, alg) => sum + alg.size, 0)))).toFixed(1), - size: +(node.other[metric].toFixed(1)), + size: +(node.other[metric].toFixed(1)) }); - const free = node.total[metric] * resourcePressure - node.requests[metric]; + const free = node.total[metric] * resourcePressure - node.other[metric] - algorithmTotalSize; algorithmsData.push({ name: 'free', amount: -1, @@ -116,7 +121,7 @@ class NodesStatistics { algorithmsData.push({ name: 'reserved', amount: otherAmount, - size: +(node.total[metric] * (1 - resourcePressure) + (free < 0 ? free : 0)).toFixed(1), + size: Math.max(+(node.total[metric] * (1 - resourcePressure) + (free < 0 ? free : 0)).toFixed(1), 0) }); algorithmsData.push({ name: 'total', @@ -135,7 +140,7 @@ class NodesStatistics { 'reserved' ]; const results = taskExecutor.length ?
taskExecutor[0].nodes.map(node => { - const algorithmsData = this._buildAlgorithmResult(node, algorithms, metric, taskExecutor[0].resourcePressure[metric]); + const algorithmsData = this._buildAlgorithmResult(node, algorithms, metric, taskExecutor[0].resourcePressure[metric], taskExecutor[0].defaultWorkerResources[metric]); return { name: node.name, algorithmsData diff --git a/core/task-executor/lib/executor.js b/core/task-executor/lib/executor.js index 8483562f1..da1b2fc68 100644 --- a/core/task-executor/lib/executor.js +++ b/core/task-executor/lib/executor.js @@ -64,6 +64,7 @@ class Executor { const { pods } = resources; const normResources = normalizeResources(resources); + const containerDefaults = await kubernetes.getContainerDefaultResources(); const data = { versions, normResources, @@ -71,7 +72,8 @@ class Executor { registry, clusterOptions, pods, - workerResources: options.resources.worker + workerResources: options.resources.worker, + containerDefaults }; await Promise.all([ @@ -86,7 +88,7 @@ class Executor { } } - async _algorithmsHandle({ versions, normResources, registry, options, clusterOptions, pods, workerResources }) { + async _algorithmsHandle(data) { const [algorithmTemplates, algorithmRequests, workers, jobs] = await Promise.all([ etcd.getAlgorithmTemplate(), etcd.getAlgorithmRequests({}), @@ -95,7 +97,7 @@ class Executor { ]); const reconcilerResults = await reconciler.reconcile({ - algorithmTemplates, algorithmRequests, workers, jobs, pods, versions, normResources, registry, options, clusterOptions, workerResources + algorithmTemplates, algorithmRequests, workers, jobs, ...data }); Object.entries(reconcilerResults).forEach(([algorithmName, res]) => { this[metricsNames.TASK_EXECUTOR_JOB_REQUESTS].set({ value: res.required || 0, labelValues: { algorithmName } }); diff --git a/core/task-executor/lib/helpers/kubernetes.js b/core/task-executor/lib/helpers/kubernetes.js index afa2fed95..469951ffe 100644 --- a/core/task-executor/lib/helpers/kubernetes.js +++ b/core/task-executor/lib/helpers/kubernetes.js @@ -13,7 +13,7 @@ let log; class KubernetesApi { async init(options = {}) { log = Logger.GetLogFromContainer(); - this._crdMissingWarnLogged = false; + this._warnWasLogged = { crdMissing: false, noLimitRange: false, moreThanOneLimit: false }; // To avoid spamming the logs this._client = new KubernetesClient(); await this._client.init(options.kubernetes); this._isNamespaced = options.kubernetes.isNamespaced; @@ -208,13 +208,13 @@ class KubernetesApi { const exists = crdList?.body?.items?.some(cr => cr.metadata.name === kaiValues.KUBERNETES.QUEUES_CRD_NAME); if (!exists) { - if (!this._crdMissingWarnLogged) { - log.info(`Kai Queues CRD (${kaiValues.KUBERNETES.QUEUES_CRD_NAME}) not found. Assuming Kai is not installed.`, { component }); - this._crdMissingWarnLogged = true; + if (!this._warnWasLogged.crdMissing) { + log.warning(`Kai Queues CRD (${kaiValues.KUBERNETES.QUEUES_CRD_NAME}) not found. Assuming Kai is not installed.`, { component }); + this._warnWasLogged.crdMissing = true; } return []; } - this._crdMissingWarnLogged = false; + this._warnWasLogged.crdMissing = false; const crd = crdList.body.items.find(cr => cr.metadata.name === kaiValues.KUBERNETES.QUEUES_CRD_NAME); const version = crd?.spec?.versions?.find(v => v.served)?.name; @@ -232,6 +232,56 @@ class KubernetesApi { return []; } } + + /** + * Get default CPU and memory requests/limits for containers + * from LimitRange resources in the namespace. 
+ */ + async getContainerDefaultResources() { + try { + const res = await this._client.limitRanges.all(); + const items = res.body?.items || []; + + const containerLimits = items + .flatMap(item => item.spec.limits.map(limit => ({ + ...limit, + source: item.metadata?.name, + }))) + .filter(limit => limit.type === 'Container'); + + if (containerLimits.length === 0) { + if (!this._warnWasLogged.noLimitRange) { + log.warning('No LimitRange with type=Container found.', { component }); + this._warnWasLogged.noLimitRange = true; + } + return {}; + } + this._warnWasLogged.noLimitRange = false; // Reset warning flag if situation is resolved + + if (containerLimits.length > 1 && !this._warnWasLogged.moreThanOneLimit) { + log.warning(`Multiple LimitRanges with type=Container found: ${containerLimits.map(l => l.source)}. Taking the first one.`, { component }); + this._warnWasLogged.moreThanOneLimit = true; + } + else this._warnWasLogged.moreThanOneLimit = false; // Reset warning flag if situation is resolved + + const selected = containerLimits[0]; + + return { + cpu: { + defaultRequest: selected.defaultRequest?.cpu, + defaultLimits: selected.default?.cpu + }, + memory: { + defaultRequest: selected.defaultRequest?.memory, + defaultLimits: selected.default?.memory + } + }; + } + catch (error) { + log.error(`Failed to fetch container default resources ${error.message}`, { component }, error); + return {}; + } + } } module.exports = new KubernetesApi(); diff --git a/core/task-executor/lib/jobs/jobCreator.js b/core/task-executor/lib/jobs/jobCreator.js index 879324d36..747187a24 100644 --- a/core/task-executor/lib/jobs/jobCreator.js +++ b/core/task-executor/lib/jobs/jobCreator.js @@ -327,13 +327,19 @@ const applyAnnotations = (spec, keyVal) => { }; const mergeResourceRequest = (defaultResource, customResource) => { - const mergedRequest = { requests: {}, limits: {} }; + const mergedRequest = {}; for (const key of ['requests', 'limits']) { - mergedRequest[key].memory = customResource[key]?.memory || defaultResource[key]?.memory || null; - mergedRequest[key].cpu = customResource[key]?.cpu || defaultResource[key]?.cpu || null; + const cpu = customResource?.[key]?.cpu || defaultResource?.[key]?.cpu; + const memory = customResource?.[key]?.memory || defaultResource?.[key]?.memory; + + if (cpu || memory) { + mergedRequest[key] = {}; + if (cpu) mergedRequest[key].cpu = cpu; + if (memory) mergedRequest[key].memory = memory; + } } - return mergedRequest; + return Object.keys(mergedRequest).length > 0 ? 
mergedRequest : undefined; }; const _applyDefaultResourcesSideCar = (container) => { @@ -453,12 +459,13 @@ const createJobSpec = ({ kind, algorithmName, resourceRequests, workerImage, alg spec = applyEnvToContainer(spec, CONTAINERS.WORKER, { ALGORITHM_VERSION: algorithmVersion }); spec = applyEnvToContainer(spec, CONTAINERS.WORKER, { WORKER_IMAGE: workerImage }); spec = applyAlgorithmResourceRequests(spec, resourceRequests, node); - if (settings.applyResources || workerCustomResources) { - if (workerCustomResources) { - workerResourceRequests = mergeResourceRequest(workerResourceRequests, workerCustomResources); - } + if (settings.applyResources) { + workerResourceRequests = mergeResourceRequest(workerResourceRequests, workerCustomResources); spec = applyWorkerResourceRequests(spec, workerResourceRequests); } + else if (workerCustomResources) { + spec = applyWorkerResourceRequests(spec, workerCustomResources); + } spec = applyNodeSelector(spec, nodeSelector); spec = applyHotWorker(spec, hotWorker); spec = applyEntryPoint(spec, entryPoint); diff --git a/core/task-executor/lib/reconcile/managers/jobs.js b/core/task-executor/lib/reconcile/managers/jobs.js index 15ec97035..40cf66a83 100644 --- a/core/task-executor/lib/reconcile/managers/jobs.js +++ b/core/task-executor/lib/reconcile/managers/jobs.js @@ -52,16 +52,17 @@ class JobsHandler { * @param {Object} clusterOptions - Cluster-wide configuration. * @param {Object} workerResources - Default worker resource requests. * @param {Object} options - Confguration containing additional job creation options. + * @param {Object} containerDefaults - Default container resources from Kubernetes. * @param {Object} reconcileResult - Scheduling reconcile stats by algorithm. */ - async schedule(allAllocatedJobs, algorithmTemplates, normResources, versions, requests, registry, clusterOptions, workerResources, options, reconcileResult) { + async schedule(allAllocatedJobs, algorithmTemplates, normResources, versions, requests, registry, clusterOptions, workerResources, options, containerDefaults, reconcileResult) { // 1. Assign requests to workers or prepare job creation details const { createDetails, toResume, scheduledRequests } = this._processAllRequests(allAllocatedJobs, algorithmTemplates, versions, requests, registry, clusterOptions, workerResources, reconcileResult); // 2. Match jobs to resources, and skip those that doesn't have the required resources. const extraResources = await this._getExtraResources(); - const { jobsToRequest, skipped } = matchJobsToResources(createDetails, normResources, scheduledRequests, extraResources); + const { jobsToRequest, skipped } = matchJobsToResources(createDetails, normResources, scheduledRequests, extraResources, containerDefaults); // 3. 
Find workers to stop if resources insufficient const stopDetails = this._findWorkersToStop(skipped, allAllocatedJobs, algorithmTemplates); @@ -183,14 +184,14 @@ class JobsHandler { // No existing worker found — prepare job creation request const algorithmTemplate = algorithmTemplates[algorithmName]; - const { workerCustomResources } = algorithmTemplates[algorithmName]; const algorithmImage = setAlgorithmImage(algorithmTemplate, versions, registry); const workerImage = setWorkerImage(algorithmTemplate, versions, registry); const resourceRequests = createContainerResource(algorithmTemplate); const workerResourceRequests = createContainerResource(workerResources); - const { kind, workerEnv, algorithmEnv, labels, annotations, version: algorithmVersion, nodeSelector, stateType: algorithmStateType = 'batch', - entryPoint, options: algorithmOptions, reservedMemory, mounts, env, sideCars, volumes, volumeMounts, kaiObject } = algorithmTemplate; + const { kind, workerEnv, algorithmEnv, labels, annotations, version: algorithmVersion, nodeSelector, + stateType: algorithmStateType = 'batch', entryPoint, options: algorithmOptions, reservedMemory, + mounts, env, sideCars, volumes, volumeMounts, workerCustomResources, kaiObject } = algorithmTemplate; createDetails.push({ numberOfNewJobs: 1, @@ -451,7 +452,7 @@ class JobsHandler { * * Logic: * 1. Add skipped algorithms to the `unScheduledAlgorithms` map if not already present. - * 2. Check if any of these algorithms have been created, requested, or removed from templates. + * 2. Check if any of these algorithms have been created, not requested anymore, or removed from templates. * 3. Remove such algorithms from the map and move them to `ignoredUnScheduledAlgorithms`. * * @private diff --git a/core/task-executor/lib/reconcile/normalize.js b/core/task-executor/lib/reconcile/normalize.js index 771a8389d..e153867fa 100644 --- a/core/task-executor/lib/reconcile/normalize.js +++ b/core/task-executor/lib/reconcile/normalize.js @@ -185,6 +185,11 @@ const normalizeColdWorkers = (normWorkers, algorithmTemplates) => { return coldWorkers; }; +/** + * Calculates ratio of used resources and remaining free capacity for a node. + * + * @param {Object} node - Node object with `total` and `requests` fields. + */ const _calcRatioFree = (node) => { node.ratio = { cpu: node.requests.cpu / node.total.cpu, @@ -198,129 +203,172 @@ const _calcRatioFree = (node) => { }; }; -const _nodeTaintsFilter = (node) => { - return !(node.spec && node.spec.taints && node.spec.taints.some(t => t.effect === 'NoSchedule')); +/** + * Filters out nodes that are unschedulable due to "NoSchedule" taints. + * + * @param {Object} node - Node resource object with spec/taints. + * @returns {boolean} True if node can accept pods, false otherwise. + */ +const _filterSchedulableNodes = (node) => { + return !(node.spec && node.spec.taints && node.spec.taints.some(taint => taint.effect === 'NoSchedule')); }; -const _parseGpu = (gpu) => { +/** + * Extracts numeric GPU allocation from a resource object. + * + * @param {Object} gpu - GPU resource object (e.g., from node allocatable or container limits). + * @returns {number} GPU amount, defaults to 0. + */ +const _extractGpuValue = (gpu) => { if (!gpu || !gpu[gpuVendors.NVIDIA]) { return 0; } return parseFloat(gpu[gpuVendors.NVIDIA]); }; -const _getGpuSpec = (pod) => { - let limitsGpu = sumBy(pod.spec.containers, c => _parseGpu(objectPath.get(c, 'resources.limits', 0))); +/** + * Extracts GPU requests and limits from a pod definition. 
+ * + * @param {Object} pod - Pod object with containers and metadata. + * @returns {Object} { limitsGpu, requestGpu } + */ +const _extractGpuResources = (pod) => { + let limitsGpu = sumBy(pod.spec.containers, c => _extractGpuValue(objectPath.get(c, 'resources.limits', 0))); if (!limitsGpu) { - limitsGpu = _parseGpu(objectPath.get(pod, 'metadata.annotations', null)); + limitsGpu = _extractGpuValue(objectPath.get(pod, 'metadata.annotations', null)); } const requestGpu = limitsGpu; - return { limitsGpu, requestGpu }; + return { requestGpu, limitsGpu }; }; -const _getRequestsAndLimits = (pod) => { - const { useResourceLimits } = globalSettings; +/** + * Extracts CPU, memory, and GPU requests/limits from a pod definition. + * + * @param {Object} pod - Pod object with containers and resources. + * @returns {Object} { requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu } + */ +const _extractPodResources = (pod) => { + // CPU + const requestCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.requests.cpu', '0m'))); const limitsCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.limits.cpu', '0m'))); - const { limitsGpu, requestGpu } = _getGpuSpec(pod); + + // Memory + const requestMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.requests.memory', 0), true)); const limitsMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.limits.memory', 0), true)); - const requestCpu = useResourceLimits && limitsCpu - ? limitsCpu - : sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.requests.cpu', '0m'))); - const requestMem = useResourceLimits && limitsMem - ? limitsMem - : sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.requests.memory', 0), true)); + + // GPU + const { requestGpu, limitsGpu } = _extractGpuResources(pod); + return { - requestCpu, requestGpu, requestMem, limitsCpu, limitsGpu, limitsMem + requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu }; }; +/** + * Builds cluster resource usage view by aggregating pod requests and node capacities. + * + * Produces a clear view of: + * - per-node usage and free resources + * - overall "allNodes" usage across the cluster + * + * @param {Object} resources - Cluster data from Kubernetes. + * @param {Object} resources.pods - Pod list API response. + * @param {Object} resources.nodes - Node list API response. 
+ * @returns {Object} { allNodes, nodeList } + */ const normalizeResources = ({ pods, nodes } = {}) => { if (!pods || !nodes) { return { allNodes: { - ratio: { - cpu: 0, - gpu: 0, - memory: 0, - }, - free: { - cpu: 0, - gpu: 0, - memory: 0 - } + ratio: { cpu: 0, gpu: 0, memory: 0 }, + free: { cpu: 0, gpu: 0, memory: 0 } }, nodeList: [] }; } - const initial = nodes.body.items.filter(_nodeTaintsFilter).reduce((acc, cur) => { - acc[cur.metadata.name] = { - labels: cur.metadata.labels, + + // Build initial node map + const nodeMap = nodes.body.items.filter(_filterSchedulableNodes).reduce((accumulator, node) => { + accumulator[node.metadata.name] = { + labels: node.metadata.labels, requests: { cpu: 0, gpu: 0, memory: 0 }, limits: { cpu: 0, gpu: 0, memory: 0 }, workersTotal: { cpu: 0, gpu: 0, memory: 0 }, workers: [], other: { cpu: 0, gpu: 0, memory: 0 }, total: { - cpu: parse.getCpuInCore(cur.status.allocatable.cpu), - gpu: _parseGpu(cur.status.allocatable) || 0, - memory: parse.getMemoryInMi(cur.status.allocatable.memory, true) + cpu: parse.getCpuInCore(node.status.allocatable.cpu), + gpu: _extractGpuValue(node.status.allocatable) || 0, + memory: parse.getMemoryInMi(node.status.allocatable.memory, true) } }; - return acc; + return accumulator; }, {}); - const allNodes = { - requests: { cpu: 0, gpu: 0, memory: 0 }, - limits: { cpu: 0, gpu: 0, memory: 0 }, - total: { - cpu: sumBy(Object.values(initial), 'total.cpu'), - gpu: sumBy(Object.values(initial), 'total.gpu'), - memory: sumBy(Object.values(initial), 'total.memory'), - } - }; - const stateFilter = p => p.status.phase === 'Running' || p.status.phase === 'Pending'; + + // Track pod usage per node + const stateFilter = pod => pod.status.phase === 'Running' || pod.status.phase === 'Pending'; const resourcesPerNode = pods.body.items.filter(stateFilter).reduce((accumulator, pod) => { const { nodeName } = pod.spec; if (!nodeName || !accumulator[nodeName]) { return accumulator; } - const { requestCpu, requestGpu, requestMem, limitsCpu, limitsGpu, limitsMem } = _getRequestsAndLimits(pod); + const { requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu } = _extractPodResources(pod); + const { useResourceLimits } = globalSettings; - accumulator[nodeName].requests.cpu += requestCpu; + accumulator[nodeName].requests.cpu += (useResourceLimits && limitsCpu) ? Math.max(requestCpu, limitsCpu) : requestCpu; + accumulator[nodeName].requests.memory += (useResourceLimits && limitsMem) ? 
Math.max(requestMem, limitsMem) : requestMem; accumulator[nodeName].requests.gpu += requestGpu; - accumulator[nodeName].requests.memory += requestMem; + accumulator[nodeName].limits.cpu += limitsCpu; - accumulator[nodeName].limits.gpu += limitsGpu; accumulator[nodeName].limits.memory += limitsMem; + accumulator[nodeName].limits.gpu += limitsGpu; + + // Use actual requests value for worker and other pods accounting if (objectPath.get(pod, 'metadata.labels.type') === 'worker') { accumulator[nodeName].workersTotal.cpu += requestCpu; - accumulator[nodeName].workersTotal.gpu += requestGpu; accumulator[nodeName].workersTotal.memory += requestMem; + accumulator[nodeName].workersTotal.gpu += requestGpu; accumulator[nodeName].workers.push({ algorithmName: objectPath.get(pod, 'metadata.labels.algorithm-name'), + podName: objectPath.get(pod, 'metadata.name'), nodeName }); } else { accumulator[nodeName].other.cpu += requestCpu; - accumulator[nodeName].other.gpu += requestGpu; accumulator[nodeName].other.memory += requestMem; + accumulator[nodeName].other.gpu += requestGpu; } return accumulator; - }, initial); + }, nodeMap); + // Build node list and aggregate totals const nodeList = []; - Object.entries(resourcesPerNode).forEach(([k, v]) => { - _calcRatioFree(v); - allNodes.requests.cpu += v.requests.cpu; - allNodes.requests.gpu += v.requests.gpu; - allNodes.requests.memory += v.requests.memory; - allNodes.limits.cpu += v.limits.cpu; - allNodes.limits.gpu += v.limits.gpu; - allNodes.limits.memory += v.limits.memory; - nodeList.push({ name: k, ...v }); + const allNodes = { + requests: { cpu: 0, gpu: 0, memory: 0 }, + limits: { cpu: 0, gpu: 0, memory: 0 }, + total: { + cpu: sumBy(Object.values(nodeMap), 'total.cpu'), + gpu: sumBy(Object.values(nodeMap), 'total.gpu'), + memory: sumBy(Object.values(nodeMap), 'total.memory'), + } + }; + Object.entries(resourcesPerNode).forEach(([nodeName, nodeData]) => { + _calcRatioFree(nodeData); + + allNodes.requests.cpu += nodeData.requests.cpu; + allNodes.requests.memory += nodeData.requests.memory; + allNodes.requests.gpu += nodeData.requests.gpu; + + allNodes.limits.cpu += nodeData.limits.cpu; + allNodes.limits.memory += nodeData.limits.memory; + allNodes.limits.gpu += nodeData.limits.gpu; + + nodeList.push({ name: nodeName, ...nodeData }); }); + _calcRatioFree(allNodes); return { allNodes, nodeList }; }; diff --git a/core/task-executor/lib/reconcile/reconciler.js b/core/task-executor/lib/reconcile/reconciler.js index e70a8e57d..3353e140f 100644 --- a/core/task-executor/lib/reconcile/reconciler.js +++ b/core/task-executor/lib/reconcile/reconciler.js @@ -1,6 +1,7 @@ const Logger = require('@hkube/logger'); const log = Logger.GetLogFromContainer(); const clonedeep = require('lodash.clonedeep'); +const { settings } = require('../helpers/settings'); const etcd = require('../helpers/etcd'); const { components, consts } = require('../consts'); const component = components.RECONCILER; @@ -8,11 +9,42 @@ const { WorkersManager, requestPreprocessor, jobsHandler } = require('./managers const { CPU_RATIO_PRESSURE, MEMORY_RATIO_PRESSURE } = consts; -const _calcStats = (data) => { - const stats = Object.values(data.reduce((acc, cur) => { - if (!acc[cur.algorithmName]) { - acc[cur.algorithmName] = { - algorithmName: cur.algorithmName, +/** + * Checks if there is resource pressure (CPU or Memory) based on the provided normalized resources. + * Logs the details of CPU and memory pressure if either exceeds the defined threshold. 
+ * + * @param {Object} normResources - Normalized resource usage data. + * @param {Object} normResources.allNodes - Resource data for all nodes. + * @param {Object} normResources.allNodes.ratio - Resource usage ratios. + * @param {number} normResources.allNodes.ratio.cpu - The CPU usage ratio (0-1). + * @param {number} normResources.allNodes.ratio.memory - The memory usage ratio (0-1). + */ +const _checkResourcePressure = (normResources) => { + const isCpuPressure = normResources.allNodes.ratio.cpu > CPU_RATIO_PRESSURE; + const isMemoryPressure = normResources.allNodes.ratio.memory > MEMORY_RATIO_PRESSURE; + const isResourcePressure = isCpuPressure || isMemoryPressure; + if (isResourcePressure) { + log.trace(`isCpuPressure: ${isCpuPressure}, isMemoryPressure: ${isMemoryPressure}`, { component }); + } +}; + +/** + * Calculates statistics for workers, grouped by algorithm name. + * For each algorithm, it counts the number of workers in various states (e.g., ready, working, etc.), + * and tracks redundant (workers with no status) and hot workers. + * + * @param {Object[]} workers - List of workers. + * @param {string} workers[].algorithmName - Algorithm name. + * @param {string} [workers[].workerStatus] - Worker status (init, ready, working, exit). + * @param {boolean} [workers[].hotWorker] - Whether worker is hot. + * @returns {{ stats: Object[], total: number }} Aggregated stats and total workers. + */ +const _aggregateWorkerStats = (workers) => { + const stats = Object.values(workers.reduce((acc, worker) => { + const { algorithmName, workerStatus, hotWorker } = worker; + if (!acc[algorithmName]) { + acc[algorithmName] = { + algorithmName, count: 0, init: 0, ready: 0, @@ -21,66 +53,119 @@ const _calcStats = (data) => { hot: 0 }; } - acc[cur.algorithmName].count += 1; - if (cur.workerStatus === undefined) { - acc[cur.algorithmName].redundant = (acc[cur.algorithmName].redundant || 0) + 1; + acc[algorithmName].count += 1; + if (workerStatus) { + acc[algorithmName][workerStatus] += 1; } - else acc[cur.algorithmName][cur.workerStatus] += 1; - if (cur.hotWorker) { - acc[cur.algorithmName].hot += 1; + else acc[algorithmName].redundant = (acc[algorithmName].redundant || 0) + 1; + if (hotWorker) { + acc[algorithmName].hot += 1; } return acc; }, {})); - return { stats, total: data.length }; + return { stats, total: workers.length }; }; -const _getNodeStats = (normResources) => { - const localResources = clonedeep(normResources); - const resourcesWithWorkers = localResources.nodeList; - const statsPerNode = resourcesWithWorkers.map(n => ({ - name: n.name, - total: { - cpu: n.total.cpu, - gpu: n.total.gpu, - mem: n.total.memory, +/** + * Builds resource and worker statistics per node. + * + * @param {Object} normResources - Normalized resources. + * @param {Object[]} normalizedWorkers - Normalized workers list. + * @returns {Object[]} Stats per node including resources and aggregated worker stats. 
+ */ +const _buildNodeStats = (normResources, normalizedWorkers) => { + const clonedResources = clonedeep(normResources); - }, - requests: { - cpu: n.requests.cpu, - gpu: n.requests.gpu, - mem: n.requests.memory, + const statsPerNode = clonedResources.nodeList.map(node => { + const nodeWorkers = normalizedWorkers.filter(w => node.workers.some(nw => nw.podName === w.podName)); - }, - other: { - cpu: n.other.cpu, - gpu: n.other.gpu, - mem: n.other.memory, - }, - workersTotal: { - cpu: n.workersTotal.cpu, - gpu: n.workersTotal.gpu, - mem: n.workersTotal.memory, - }, - labels: n.labels, - workers2: n.workers, - workers: _calcStats(n.workers) + return { + name: node.name, + total: { + cpu: node.total.cpu, + gpu: node.total.gpu, + mem: node.total.memory, - } - )); + }, + requests: { + cpu: node.requests.cpu, + gpu: node.requests.gpu, + mem: node.requests.memory, + + }, + other: { + cpu: node.other.cpu, + gpu: node.other.gpu, + mem: node.other.memory, + }, + workersTotal: { + cpu: node.workersTotal.cpu, + gpu: node.workersTotal.gpu, + mem: node.workersTotal.memory, + }, + labels: node.labels, + workers2: node.workers, + workers: _aggregateWorkerStats(nodeWorkers) + }; + }); return statsPerNode; }; -const _checkResourcePressure = (normResources) => { - const isCpuPressure = normResources.allNodes.ratio.cpu > CPU_RATIO_PRESSURE; - const isMemoryPressure = normResources.allNodes.ratio.memory > MEMORY_RATIO_PRESSURE; - const isResourcePressure = isCpuPressure || isMemoryPressure; - if (isResourcePressure) { - log.trace(`isCpuPressure: ${isCpuPressure}, isMemoryPressure: ${isMemoryPressure}`, { component }); +/** + * Retrieves default CPU and memory resources for workers. + * + * Priority: + * 1. Use explicit worker resources from options if `applyResources` is enabled. + * 2. Otherwise, fallback to Kubernetes container default requests. + * + * @async + * @param {Object} options - Configuration options. + * @param {Object} options.resources - Resource configurations. + * @param {Object} options.resources.worker - Worker resource configuration. + * @param {number|string} [options.resources.worker.cpu] - Worker CPU request. + * @param {number|string} [options.resources.worker.mem] - Worker memory request. + * @param {Object} containerDefaults - Default container resources from Kubernetes. + * @param {Object} [containerDefaults.cpu] - Default CPU resource from Kubernetes. + * @param {string|number} [containerDefaults.cpu.defaultRequest] - Default CPU request + * @param {Object} [containerDefaults.memory] - Default memory resource from Kubernetes. + * @param {string|number} [containerDefaults.memory.defaultRequest] - Default memory request + * @returns {Promise<{cpu: (number|string|undefined), mem: (number|string|undefined)}>} + * Default worker resources (CPU, memory), or `undefined` if not resolved. 
+ */ +const _resolveWorkerResourceDefaults = async (options, containerDefaults) => { + const defaults = {}; + + if (settings.applyResources) { + defaults.cpu = options.resources.worker.cpu; + defaults.mem = options.resources.worker.mem; } + + if (!defaults.cpu && containerDefaults?.cpu) defaults.cpu = containerDefaults.cpu.defaultRequest; + if (!defaults.mem && containerDefaults?.memory) defaults.mem = containerDefaults.memory?.defaultRequest; + + return defaults; }; -const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, ignoredUnScheduledAlgorithms, jobsInfo, workerStats, normResources }) => { +/** + * Updates reconciliation results with job and worker statistics, + * synchronizes the current cluster state with etcd, and logs summary information. + * + * @param {Object} params + * @param {Object} params.reconcileResult - Current reconciliation results per algorithm. + * @param {Object} params.unScheduledAlgorithms - Algorithms that could not be scheduled and were skipped. + * @param {Object} params.ignoredUnScheduledAlgorithms - Algorithms previously unscheduled but now ignored (if they were created, not requested anymore, or removed from templates). + * @param {Object} params.jobsInfo - Job scheduling information. + * @param {Object[]} params.jobsInfo.created - Jobs successfully created. + * @param {Object[]} params.jobsInfo.skipped - Jobs that were skipped for any reason (e.g., missing resources). + * @param {Object[]} params.jobsInfo.toStop - Jobs whose workers should be stopped. + * @param {Object[]} params.jobsInfo.toResume - Jobs whose workers should be resumed. + * @param {Object[]} params.normalizedWorkers - Normalized workers. + * @param {Object} params.normResources - Normalized cluster resources. + * @param {Object} params.options - Global configuration. + * @param {Object} params.containerDefaults - Default container resources from Kubernetes.
+ */ +const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, ignoredUnScheduledAlgorithms, jobsInfo, normalizedWorkers, normResources, options, containerDefaults }) => { const { created, skipped, toStop, toResume } = jobsInfo; Object.entries(reconcileResult).forEach(([algorithmName, res]) => { res.created = created.filter(c => c.algorithmName === algorithmName).length; @@ -89,6 +174,9 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, res.resumed = toResume.filter(c => c.algorithmName === algorithmName).length; }); + const workerStats = _aggregateWorkerStats(normalizedWorkers); + const defaultWorkerResources = await _resolveWorkerResourceDefaults(options, containerDefaults); + await etcd.updateDiscovery({ reconcileResult, unScheduledAlgorithms, @@ -99,7 +187,8 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, gpu: consts.GPU_RATIO_PRESSURE, mem: consts.MEMORY_RATIO_PRESSURE }, - nodes: _getNodeStats(normResources) + defaultWorkerResources, + nodes: _buildNodeStats(normResources, normalizedWorkers) }); workerStats.stats.forEach((ws) => { @@ -124,7 +213,7 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, }); }; -const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs, pods, versions, normResources, registry, options, clusterOptions, workerResources } = {}) => { +const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs, versions, normResources, options, registry, clusterOptions, pods, workerResources, containerDefaults } = {}) => { // Update the cache of jobs lately created by removing old jobs const reconcileResult = {}; @@ -158,7 +247,7 @@ const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs, // Schedule the requests const jobsInfo = await jobsHandler.schedule(allAllocatedJobs, algorithmTemplates, normResources, versions, - requests, registry, clusterOptions, workerResources, options, reconcileResult); + requests, registry, clusterOptions, workerResources, options, containerDefaults, reconcileResult); // Handle workers life-cycle & wait for the promises to resolve const { toResume, toStop } = jobsInfo; @@ -167,9 +256,9 @@ const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs, await Promise.all([...workersToStopPromises, ...workersToExitPromises, ...workersToWarmUpPromises, ...workersToCoolDownPromises, ...workersToResumePromises]); // Write in etcd the reconcile result - const workerStats = _calcStats(workersManager.normalizedWorkers); + const { normalizedWorkers } = workersManager; await _updateReconcileResult({ - reconcileResult, ...jobsHandler, jobsInfo, workerStats, normResources + reconcileResult, ...jobsHandler, jobsInfo, normalizedWorkers, normResources, options, containerDefaults }); return reconcileResult; diff --git a/core/task-executor/lib/reconcile/resources.js b/core/task-executor/lib/reconcile/resources.js index 6b3ea7a86..c5da865c6 100644 --- a/core/task-executor/lib/reconcile/resources.js +++ b/core/task-executor/lib/reconcile/resources.js @@ -183,14 +183,19 @@ const validateKaiQueue = ({ kaiObject, algorithmName }, existingQueuesNames) => * @param {Object} [params.sideCars] - The optional sidecar container. * @param {Object} [params.sideCars.container] - The container inside the sidecar. * @param {Object} [params.sideCars.container.resources] - The resource requests of the sidecar container. 
+ * @param {Object} containerDefaults - Default container resources from Kubernetes. + * @param {Object} [containerDefaults.cpu] - Default CPU resource from Kubernetes. + * @param {string|number} [containerDefaults.cpu.defaultRequest] - Default CPU request + * @param {Object} [containerDefaults.memory] - Default memory resource from Kubernetes. + * @param {string|number} [containerDefaults.memory.defaultRequest] - Default memory request * @returns {Object} An object containing the total requested CPU and memory. * @returns {number} return.requestedCpu - The total requested CPU in cores. * @returns {number} return.requestedMemory - The total requested memory in MiB. */ -const getAllRequested = ({ resourceRequests, workerResourceRequests, workerCustomResources, sideCars }) => { +const getAllRequested = ({ resourceRequests, workerResourceRequests, workerCustomResources, sideCars }, containerDefaults) => { const sideCarResources = sideCars?.map(sideCar => sideCar?.container?.resources) || []; - const workerRequestedCPU = settings.applyResources ? workerResourceRequests.requests.cpu : '0'; - const workerRequestedMemory = settings.applyResources ? workerResourceRequests.requests.memory : '0Mi'; + const workerRequestedCPU = settings.applyResources ? workerResourceRequests.requests.cpu : (containerDefaults?.cpu?.defaultRequest || '0'); + const workerRequestedMemory = settings.applyResources ? workerResourceRequests.requests.memory : (containerDefaults?.memory?.defaultRequest || '0Mi'); const requestedCpu = parse.getCpuInCore(resourceRequests?.requests?.cpu || '0') + parse.getCpuInCore(workerCustomResources?.requests?.cpu || workerRequestedCPU) @@ -213,22 +218,24 @@ const getAllRequested = ({ resourceRequests, workerResourceRequests, workerCusto * @param {Object} availableResources - Current cluster resource state. * @param {number} totalAdded - Number of jobs added so far this tick. * @param {Object} [extraResources] - Additional metadata such as volumes and queues. + * @param {Object} containerDefaults - Default container resources from Kubernetes. * @returns {Object} Scheduling decision with `shouldAdd`, optional `warning`, and updated resources. 
*/ -const shouldAddJob = (jobDetails, availableResources, totalAdded, extraResources) => { +const shouldAddJob = (jobDetails, availableResources, totalAdded, extraResources, containerDefaults) => { const { allVolumesNames, existingQueuesNames } = extraResources || {}; if (totalAdded >= MAX_JOBS_PER_TICK) { return { shouldAdd: false, newResources: { ...availableResources } }; } - const { requestedCpu, requestedMemory } = getAllRequested(jobDetails); + const { requestedCpu, requestedMemory } = getAllRequested(jobDetails, containerDefaults); const requestedGpu = jobDetails.resourceRequests.requests[gpuVendors.NVIDIA] || 0; - const nodesBySelector = availableResources.nodeList.filter(n => nodeSelectorFilter(n.labels, jobDetails.nodeSelector)); + const nodesBySelector = availableResources.nodeList.filter(node => nodeSelectorFilter(node.labels, jobDetails.nodeSelector)); const nodesForSchedule = nodesBySelector.map(r => findNodeForSchedule(r, requestedCpu, requestedGpu, requestedMemory)); const availableNode = nodesForSchedule.find(n => n.available); if (!availableNode) { // Number of total nodes that don't fit the attribute under nodeSelector const unMatchedNodesBySelector = availableResources.nodeList.length - nodesBySelector.length; + const warning = createWarning({ unMatchedNodesBySelector, jobDetails, @@ -456,9 +463,10 @@ const pauseAccordingToResources = (stopDetails, availableResources, skippedReque * @param {Object} availableResources - Current cluster resource state. * @param {Object[]} [scheduledRequests=[]] - Already scheduled requests. * @param {Object} [extraResources] - Additional metadata for scheduling checks. + * @param {Object} containerDefaults - Default container resources from Kubernetes. * @returns {Object} { requested: jobs scheduled, skipped: jobs not scheduled } */ -const matchJobsToResources = (createDetails, availableResources, scheduledRequests = [], extraResources) => { +const matchJobsToResources = (createDetails, availableResources, scheduledRequests = [], extraResources, containerDefaults) => { const jobsToRequest = []; const skipped = []; const localDetails = clone(createDetails); @@ -467,7 +475,7 @@ const matchJobsToResources = (createDetails, availableResources, scheduledReques // loop over all the job types one by one and assign until it can't fit in any node const cb = (j) => { if (j.numberOfNewJobs > 0) { - const { shouldAdd, warning, newResources, node } = shouldAddJob(j.jobDetails, availableResources, totalAdded, extraResources); + const { shouldAdd, warning, newResources, node } = shouldAddJob(j.jobDetails, availableResources, totalAdded, extraResources, containerDefaults); if (shouldAdd) { const toCreate = { ...j.jobDetails, createdTime: Date.now(), node }; jobsToRequest.push(toCreate); diff --git a/core/task-executor/package-lock.json b/core/task-executor/package-lock.json index 077e54dee..7dd004db5 100644 --- a/core/task-executor/package-lock.json +++ b/core/task-executor/package-lock.json @@ -14,7 +14,7 @@ "@hkube/db": "^2.0.16", "@hkube/etcd": "^5.1.2", "@hkube/healthchecks": "^1.0.1", - "@hkube/kubernetes-client": "^2.0.8", + "@hkube/kubernetes-client": "^2.0.9", "@hkube/logger": "^2.0.2", "@hkube/metrics": "^1.0.42", "@hkube/uid": "^1.0.4", @@ -464,9 +464,9 @@ } }, "node_modules/@hkube/kubernetes-client": { - "version": "2.0.8", - "resolved": "https://registry.npmjs.org/@hkube/kubernetes-client/-/kubernetes-client-2.0.8.tgz", - "integrity": "sha512-AS7fkHF6ROeJEgZtb2VRJSxYgSxoGyGoG1XuwO4CHjm/zf31yalypSWmqvMVvqVifNrlFwBl89DpZSEAzDWXPA==", + 
"version": "2.0.9", + "resolved": "https://registry.npmjs.org/@hkube/kubernetes-client/-/kubernetes-client-2.0.9.tgz", + "integrity": "sha512-g2SGpQn92Tmwba+uc8Ic7E5Hi15rlsJPMDLS/363JydKoDONuSZ1wGOuJIIHLXzOlHRNYbIGuDs5I7hAD/iJzA==", "dependencies": { "compare-versions": "^3.6.0", "js-yaml": "^4.1.0", diff --git a/core/task-executor/package.json b/core/task-executor/package.json index 3a8ff0d2c..bc3d3e410 100644 --- a/core/task-executor/package.json +++ b/core/task-executor/package.json @@ -28,7 +28,7 @@ "@hkube/db": "^2.0.16", "@hkube/etcd": "^5.1.2", "@hkube/healthchecks": "^1.0.1", - "@hkube/kubernetes-client": "^2.0.8", + "@hkube/kubernetes-client": "^2.0.9", "@hkube/logger": "^2.0.2", "@hkube/metrics": "^1.0.42", "@hkube/uid": "^1.0.4", diff --git a/core/task-executor/tests/kubernetesApiTests.js b/core/task-executor/tests/kubernetesApiTests.js index b1a50c589..2c446fa4c 100644 --- a/core/task-executor/tests/kubernetesApiTests.js +++ b/core/task-executor/tests/kubernetesApiTests.js @@ -44,49 +44,61 @@ describe('Kubernetes API', () => { before(async () => { await kubernetesServerMock.start({ port: 9001 }); }); + beforeEach(async () => { instance = new KubernetesApi(); await instance.init(options); + sinon.restore(); }); + it('should create job', async () => { const res = await instance.createJob({ spec: { metadata: { name: 'mySpec' } } }); expect(res.body.metadata.name).to.eql('mySpec'); }); + it('should get worker jobs', async () => { const res = await instance.getWorkerJobs(); expect(res.statusCode).to.eql(StatusCodes.OK); }); + it('should get worker for job', async () => { const res = await instance.getPodsForJob({ spec: { selector: { matchLabels: 'myLabel=mySelector' } } }); expect(res.statusCode).to.eql(StatusCodes.OK); }); + it('should fail to get worker for job if no selector', async () => { const res = await instance.getPodsForJob({ spec: { selector: { matchLabels: null } } }); expect(res).to.be.empty }); + it('should fail to get worker for job if no job spec', async () => { const res = await instance.getPodsForJob(); expect(res).to.be.empty }); + it('should get config map', async () => { const res = await instance.getVersionsConfigMap(); expect(res).to.have.property('versions'); expect(res).to.have.property('registry'); expect(res).to.have.property('clusterOptions'); }); + it('should throw', async () => { const res = instance.init(optionsDummy); expect(res).to.be.rejectedWith('Invalid URI "no.such.url/version"'); }); + it('should get nodes and pods', async () => { const res = await instance.getResourcesPerNode(); expect(res).to.have.property('pods') expect(res).to.have.property('nodes') }); + it('should get all PVC names', async () => { const res = await instance.getAllPVCNames(); expect(res).to.be.an('array').that.includes('pvc-1', 'pvc-2'); }); + it('should get all ConfigMap names', async () => { const res = await instance.getAllConfigMapNames(); expect(res).to.be.an('array').that.includes('config-map-1', 'config-map-2'); @@ -95,22 +107,24 @@ describe('Kubernetes API', () => { const res = await instance.getAllSecretNames(); expect(res).to.be.an('array').that.includes('secret-1', 'secret-2'); }); + it('should get all Kai Queues names', async () => { kubernetesServerMock.setKaiCRDEnabled(true); const res = await instance.getAllQueueNames(); expect(res).to.be.an('array').that.includes('test', 'default'); }); + it('should return empty list when Kai CRD is missing', async () => { kubernetesServerMock.setKaiCRDEnabled(false); const res = await instance.getAllQueueNames(); 
expect(res).to.be.an('array').that.is.empty; }); + it('should log missing CRD warning only once', async () => { kubernetesServerMock.setKaiCRDEnabled(false); - const Logger = require('@hkube/logger'); const log1 = Logger.GetLogFromContainer(); - const logSpy = sinon.spy(log1, 'info'); + const logSpy = sinon.spy(log1, 'warning'); await instance.getAllQueueNames(); await instance.getAllQueueNames(); @@ -120,4 +134,25 @@ describe('Kubernetes API', () => { expect(matchingLogs.length).to.equal(1); }); + + it('should get all limit range data for containers', async () => { + const res = await instance.getContainerDefaultResources(); + expect(res).to.be.an('object').to.have.keys('cpu', 'memory'); + expect(res.cpu).to.have.keys('defaultRequest', 'defaultLimits'); + expect(res.memory).to.have.keys('defaultRequest', 'defaultLimits'); + }); + + it('should log limitrange warning only once', async () => { + const log1 = Logger.GetLogFromContainer(); + + const logSpy = sinon.spy(log1, 'warning'); + + await instance.getContainerDefaultResources(); + await instance.getContainerDefaultResources(); + const matchingLogs = logSpy.getCalls().filter(call => + call.args[0].includes('Multiple LimitRanges with type=Container found') && call.args[0].includes('Taking the first one') + ); + + expect(matchingLogs.length).to.equal(1); + }); }); diff --git a/core/task-executor/tests/managers.js b/core/task-executor/tests/managers.js index e9a44d8f5..84cdec7c8 100644 --- a/core/task-executor/tests/managers.js +++ b/core/task-executor/tests/managers.js @@ -1033,7 +1033,7 @@ describe('Managers tests', () => { versions, requests, registry, clusterOptions, workerResources, options, - {} // reconcileResult + {}, {} // containerDefaults & reconcileResult ); expect(result).to.have.all.keys(['created', 'skipped', 'toResume', 'toStop']); @@ -1044,7 +1044,7 @@ describe('Managers tests', () => { }); it('should handle empty requests gracefully', async () => { - const result = await jobsHandler.schedule(allAllocatedJobs, {}, {}, {}, [], {}, {}, {}, {}, {}); + const result = await jobsHandler.schedule(allAllocatedJobs, {}, {}, {}, [], {}, {}, {}, {}, {}, {}); expect(result).to.deep.equal({ created: [], diff --git a/core/task-executor/tests/mocks/kubernetes-server.mock.js b/core/task-executor/tests/mocks/kubernetes-server.mock.js index 4591eb945..5cfc75005 100644 --- a/core/task-executor/tests/mocks/kubernetes-server.mock.js +++ b/core/task-executor/tests/mocks/kubernetes-server.mock.js @@ -10,7 +10,8 @@ const { secret, configMap, customResourceDefinition, - queues + queues, + limitRanges } = resources; const app = express(); @@ -79,6 +80,30 @@ class MockClient { res.json(queues); return; } + if (req.url === '/api/v1/limitranges') { + res.json(limitRanges); + return; + } + + if (req.url.startsWith('/api/v1/limitranges/')) { + // extract the name if needed + const name = req.url.split('/').pop(); + const item = limitRanges.items.find(lr => lr.metadata.name === name); + if (item) { + res.json(item); + } else { + res.status(404).json({ + kind: 'Status', + apiVersion: 'v1', + metadata: {}, + status: 'Failure', + message: `limitranges "${name}" not found`, + reason: 'NotFound', + code: 404 + }); + } + return; + } res.json(req.body); }); diff --git a/core/task-executor/tests/mocks/kubernetes.mock.js b/core/task-executor/tests/mocks/kubernetes.mock.js index 2ec2d1331..290c6011f 100644 --- a/core/task-executor/tests/mocks/kubernetes.mock.js +++ b/core/task-executor/tests/mocks/kubernetes.mock.js @@ -43,6 +43,7 @@ module.exports = { 
getAllSecretNames: async () => { return ['secret-1', 'secret-2']; }, getAllConfigMapNames: async () => { return ['config-map-1', 'config-map-2']; }, getAllQueueNames: async () => { return ['test', 'default']; }, + getContainerDefaultResources: async () => { return { cpu: { defaultRequest: 0.1, defaultLimits: 0.2 }, memory: { defaultRequest: '128Mi', defaultLimits: '256Mi' } }; }, }, callCount: (name) => { return callCount[name]; diff --git a/core/task-executor/tests/normalizeTests.js b/core/task-executor/tests/normalizeTests.js index 2e00c4f77..74026b6a3 100644 --- a/core/task-executor/tests/normalizeTests.js +++ b/core/task-executor/tests/normalizeTests.js @@ -448,6 +448,224 @@ describe('normalize', () => { expect(res.nodeList[1].free.cpu).to.eq(7.55); expect(res.nodeList[2].free.cpu).to.eq(7.8); }); + + it('should filter out nodes with NoSchedule taint', () => { + const nodesWithTaint = { + body: { + items: [ + { metadata: { name: 'node1', labels: {} }, spec: { taints: [{ effect: 'NoSchedule' }] }, status: { allocatable: { cpu: '4', memory: '8Gi' } } }, + { metadata: { name: 'node2', labels: {} }, status: { allocatable: { cpu: '2', memory: '4Gi' } } } + ] + } + }; + const podsEmpty = { body: { items: [] } }; + + const res = normalizeResources({ pods: podsEmpty, nodes: nodesWithTaint }); + expect(res.nodeList.length).to.eq(1); + expect(res.nodeList[0].name).to.eq('node2'); + }); + + it('should ignore pods not in Running or Pending state', () => { + const nodes = { + body: { + items: [ + { metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } } + ] + } + }; + const pods = { + body: { + items: [ + { status: { phase: 'Succeeded' }, spec: { nodeName: 'node1', containers: [] } }, + { status: { phase: 'Failed' }, spec: { nodeName: 'node1', containers: [] } } + ] + } + }; + + const res = normalizeResources({ pods, nodes }); + expect(res.nodeList[0].requests.cpu).to.eq(0); + expect(res.nodeList[0].requests.memory).to.eq(0); + }); + + it('should account worker pods separately from other pods', () => { + const nodes = { + body: { + items: [ + { metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } } + ] + } + }; + const pods = { + body: { + items: [ + { + status: { phase: 'Running' }, + spec: { nodeName: 'node1', containers: [{ resources: { requests: { cpu: '100m', memory: '128Mi' } } }] }, + metadata: { labels: { type: 'worker', 'algorithm-name': 'algo1' }, name: 'workerPod1' } + }, + { + status: { phase: 'Running' }, + spec: { nodeName: 'node1', containers: [{ resources: { requests: { cpu: '200m', memory: '256Mi' } } }] }, + metadata: { labels: { type: 'other' }, name: 'otherPod1' } + } + ] + } + }; + + const res = normalizeResources({ pods, nodes }); + const node = res.nodeList[0]; + expect(node.workersTotal.cpu).to.eq(0.1); + expect(node.other.cpu).to.eq(0.2); + expect(node.workers.length).to.eq(1); + expect(node.workers[0].algorithmName).to.eq('algo1'); + }); + + it('should calculate GPU resources correctly', () => { + const nodes = { + body: { + items: [ + { metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi', 'nvidia.com/gpu': '2' } } } + ] + } + }; + const pods = { + body: { + items: [ + { + status: { phase: 'Running' }, + spec: { nodeName: 'node1', containers: [{ resources: { limits: { 'nvidia.com/gpu': '1' } } }] }, + metadata: { labels: {}, name: 'gpuPod1' } + } + ] + } + }; + + const res = normalizeResources({ pods, nodes }); + const node = res.nodeList[0]; + 
expect(node.total.gpu).to.eq(2); + expect(node.requests.gpu).to.eq(1); + expect(node.free.gpu).to.eq(1); + expect(node.ratio.gpu).to.eq(0.5); + }); + + it('should accumulate limits separately from requests', () => { + const nodes = { + body: { + items: [ + { metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } } + ] + } + }; + const pods = { + body: { + items: [ + { + status: { phase: 'Running' }, + spec: { + nodeName: 'node1', + containers: [{ + resources: { + requests: { cpu: '100m', memory: '128Mi' }, + limits: { cpu: '200m', memory: '256Mi' } + } + }] + }, + metadata: { labels: {}, name: 'pod1' } + } + ] + } + }; + + const res = normalizeResources({ pods, nodes }); + const node = res.nodeList[0]; + expect(node.requests.cpu).to.eq(0.1); + expect(node.limits.cpu).to.eq(0.2); + expect(node.requests.memory).to.eq(128); + expect(node.limits.memory).to.eq(256); + }); + + it('should include nodes with no pods in nodeList with zero requests', () => { + const nodes = { + body: { + items: [ + { metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } } + ] + } + }; + const pods = { body: { items: [] } }; + + const res = normalizeResources({ pods, nodes }); + expect(res.nodeList[0].requests.cpu).to.eq(0); + expect(res.nodeList[0].free.cpu).to.eq(4); + }); + + it('should still use actual requests for worker pods when useResourceLimits=true', () => { + globalSettings.useResourceLimits = true; + const nodes = { + body: { items: [{ metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } }] } + }; + const pods = { + body: { + items: [ + { + status: { phase: 'Running' }, + spec: { + nodeName: 'node1', + containers: [{ + resources: { + requests: { cpu: '100m', memory: '128Mi' }, + limits: { cpu: '1000m', memory: '512Mi' } + } + }] + }, + metadata: { labels: { type: 'worker', 'algorithm-name': 'algoX' }, name: 'workerPod1' } + } + ] + } + }; + + const res = normalizeResources({ pods, nodes }); + const node = res.nodeList[0]; + + // requests bucket uses limit (1 core), not request + expect(node.requests.cpu).to.eq(1); + // workersTotal should use actual request (0.1 cores) + expect(node.workersTotal.cpu).to.eq(0.1); + }); + + it('should still use actual requests for "other" pods when useResourceLimits=true', () => { + globalSettings.useResourceLimits = true; + const nodes = { + body: { items: [{ metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } }] } + }; + const pods = { + body: { + items: [ + { + status: { phase: 'Running' }, + spec: { + nodeName: 'node1', + containers: [{ + resources: { + requests: { cpu: '200m', memory: '256Mi' }, + limits: { cpu: '2000m', memory: '1Gi' } + } + }] + }, + metadata: { labels: { type: 'other' }, name: 'otherPod1' } + } + ] + } + }; + + const res = normalizeResources({ pods, nodes }); + const node = res.nodeList[0]; + + // requests bucket uses limit (2 cores) + expect(node.requests.cpu).to.eq(2); + // "other" bucket should still use actual request (0.2 cores) + expect(node.other.cpu).to.eq(0.2); + }); }); describe('merge workers', () => { diff --git a/core/task-executor/tests/stub/resources.js b/core/task-executor/tests/stub/resources.js index c7adc9799..ca2e168c8 100644 --- a/core/task-executor/tests/stub/resources.js +++ b/core/task-executor/tests/stub/resources.js @@ -472,6 +472,47 @@ const queues = { // for Kai (run-ai) ] }; +const limitRanges = { + kind: 'LimitRangeList', + apiVersion: 'v1', + items: [ + { + 
kind: 'LimitRange', + apiVersion: 'v1', + metadata: { + name: 'default-limits', + namespace: 'default', + }, + spec: { + limits: [ + { + type: 'Container', + default: { cpu: '500m', memory: '512Mi' }, + defaultRequest: { cpu: '250m', memory: '256Mi' } + } + ] + } + }, + { + kind: 'LimitRange', // dummy for testing + apiVersion: 'v1', + metadata: { + name: 'another-limits', + namespace: 'default', + }, + spec: { + limits: [ + { + type: 'Container', + default: { cpu: '1', memory: '1Gi' }, + defaultRequest: { cpu: '500m', memory: '512Mi' } + } + ] + } + } + ] +}; + module.exports = { pods, @@ -483,5 +524,6 @@ module.exports = { configMap, secret, customResourceDefinition, - queues + queues, + limitRanges };