From a465c863fc77ea7fe1b83da18619e17ea019f926 Mon Sep 17 00:00:00 2001 From: Adir David Date: Sun, 24 Aug 2025 14:18:44 +0300 Subject: [PATCH 01/18] refactored names and added jsdoc --- core/task-executor/lib/reconcile/normalize.js | 112 ++++++++++++------ 1 file changed, 73 insertions(+), 39 deletions(-) diff --git a/core/task-executor/lib/reconcile/normalize.js b/core/task-executor/lib/reconcile/normalize.js index 771a8389d..b11e30e58 100644 --- a/core/task-executor/lib/reconcile/normalize.js +++ b/core/task-executor/lib/reconcile/normalize.js @@ -185,6 +185,11 @@ const normalizeColdWorkers = (normWorkers, algorithmTemplates) => { return coldWorkers; }; +/** + * Calculates ratio of used resources and remaining free capacity for a node. + * + * @param {Object} node - Node object with `total` and `requests` fields. + */ const _calcRatioFree = (node) => { node.ratio = { cpu: node.requests.cpu / node.total.cpu, @@ -198,22 +203,40 @@ const _calcRatioFree = (node) => { }; }; -const _nodeTaintsFilter = (node) => { - return !(node.spec && node.spec.taints && node.spec.taints.some(t => t.effect === 'NoSchedule')); -}; +/** + * Filters out nodes that are unschedulable due to "NoSchedule" taints. + * + * @param {Object} node - Node resource object with spec/taints. + * @returns {boolean} True if node can accept pods, false otherwise. + */ +const _filterSchedulableNodes = (node) => { + return !(node.spec && node.spec.taints && node.spec.taints.some(taint => taint.effect === 'NoSchedule')); + }; -const _parseGpu = (gpu) => { +/** + * Extracts numeric GPU allocation from a resource object. + * + * @param {Object} gpu - GPU resource object (e.g., from node allocatable or container limits). + * @returns {number} GPU amount, defaults to 0. + */ +const _extractGpuValue = (gpu) => { if (!gpu || !gpu[gpuVendors.NVIDIA]) { return 0; } return parseFloat(gpu[gpuVendors.NVIDIA]); }; -const _getGpuSpec = (pod) => { - let limitsGpu = sumBy(pod.spec.containers, c => _parseGpu(objectPath.get(c, 'resources.limits', 0))); +/** + * Extracts GPU requests and limits from a pod definition. + * + * @param {Object} pod - Pod object with containers and metadata. + * @returns {Object} { limitsGpu, requestGpu } + */ +const _extractGpuResources = (pod) => { + let limitsGpu = sumBy(pod.spec.containers, c => _extractGpuValue(objectPath.get(c, 'resources.limits', 0))); if (!limitsGpu) { - limitsGpu = _parseGpu(objectPath.get(pod, 'metadata.annotations', null)); + limitsGpu = _extractGpuValue(objectPath.get(pod, 'metadata.annotations', null)); } const requestGpu = limitsGpu; return { limitsGpu, requestGpu }; @@ -231,54 +254,63 @@ const _getRequestsAndLimits = (pod) => { ? limitsMem : sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.requests.memory', 0), true)); return { - requestCpu, requestGpu, requestMem, limitsCpu, limitsGpu, limitsMem + requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu }; }; +/** + * Builds cluster resource usage view by aggregating pod requests and node capacities. + * + * Produces a clear view of: + * - per-node usage and free resources + * - overall "allNodes" usage across the cluster + * + * @param {Object} resources - Cluster data from Kubernetes. + * @param {Object} resources.pods - Pod list API response. + * @param {Object} resources.nodes - Node list API response. 
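+ *
+ * A minimal usage sketch (illustrative; in practice both arguments come from
+ * `kubernetes.getResourcesPerNode()`):
+ * @example
+ * const resources = await kubernetes.getResourcesPerNode();
+ * const { allNodes, nodeList } = normalizeResources(resources);
+ * // each nodeList entry: { name, labels, requests, limits, workersTotal,
+ * //                        workers, other, total, ratio, free }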
+ * @returns {Object} { allNodes, nodeList } + */ const normalizeResources = ({ pods, nodes } = {}) => { if (!pods || !nodes) { return { allNodes: { - ratio: { - cpu: 0, - gpu: 0, - memory: 0, - }, - free: { - cpu: 0, - gpu: 0, - memory: 0 - } + ratio: { cpu: 0, gpu: 0, memory: 0 }, + free: { cpu: 0, gpu: 0, memory: 0 } }, nodeList: [] }; } - const initial = nodes.body.items.filter(_nodeTaintsFilter).reduce((acc, cur) => { - acc[cur.metadata.name] = { - labels: cur.metadata.labels, + + // Build initial node map + const nodeMap = nodes.body.items.filter(_filterSchedulableNodes).reduce((accumulator, node) => { + accumulator[node.metadata.name] = { + labels: node.metadata.labels, requests: { cpu: 0, gpu: 0, memory: 0 }, limits: { cpu: 0, gpu: 0, memory: 0 }, workersTotal: { cpu: 0, gpu: 0, memory: 0 }, workers: [], other: { cpu: 0, gpu: 0, memory: 0 }, total: { - cpu: parse.getCpuInCore(cur.status.allocatable.cpu), - gpu: _parseGpu(cur.status.allocatable) || 0, - memory: parse.getMemoryInMi(cur.status.allocatable.memory, true) + cpu: parse.getCpuInCore(node.status.allocatable.cpu), + gpu: _extractGpuValue(node.status.allocatable) || 0, + memory: parse.getMemoryInMi(node.status.allocatable.memory, true) } }; - return acc; + return accumulator; }, {}); + const allNodes = { requests: { cpu: 0, gpu: 0, memory: 0 }, limits: { cpu: 0, gpu: 0, memory: 0 }, total: { - cpu: sumBy(Object.values(initial), 'total.cpu'), - gpu: sumBy(Object.values(initial), 'total.gpu'), - memory: sumBy(Object.values(initial), 'total.memory'), + cpu: sumBy(Object.values(nodeMap), 'total.cpu'), + gpu: sumBy(Object.values(nodeMap), 'total.gpu'), + memory: sumBy(Object.values(nodeMap), 'total.memory'), } }; - const stateFilter = p => p.status.phase === 'Running' || p.status.phase === 'Pending'; + + // Track pod usage per node + const stateFilter = pod => pod.status.phase === 'Running' || pod.status.phase === 'Pending'; const resourcesPerNode = pods.body.items.filter(stateFilter).reduce((accumulator, pod) => { const { nodeName } = pod.spec; if (!nodeName || !accumulator[nodeName]) { @@ -308,19 +340,21 @@ const normalizeResources = ({ pods, nodes } = {}) => { } return accumulator; - }, initial); + }, nodeMap); + // Build node list and aggregate totals const nodeList = []; - Object.entries(resourcesPerNode).forEach(([k, v]) => { - _calcRatioFree(v); - allNodes.requests.cpu += v.requests.cpu; - allNodes.requests.gpu += v.requests.gpu; - allNodes.requests.memory += v.requests.memory; - allNodes.limits.cpu += v.limits.cpu; - allNodes.limits.gpu += v.limits.gpu; - allNodes.limits.memory += v.limits.memory; - nodeList.push({ name: k, ...v }); + Object.entries(resourcesPerNode).forEach(([nodeName, nodeData]) => { + _calcRatioFree(nodeData); + allNodes.requests.cpu += nodeData.requests.cpu; + allNodes.requests.memory += nodeData.requests.memory; + allNodes.requests.gpu += nodeData.requests.gpu; + allNodes.limits.cpu += nodeData.limits.cpu; + allNodes.limits.memory += nodeData.limits.memory; + allNodes.limits.gpu += nodeData.limits.gpu; + nodeList.push({ name: nodeName, ...nodeData }); }); + _calcRatioFree(allNodes); return { allNodes, nodeList }; }; From 4f9ae0c1e5e51f5b54dd4abf318ca7f61231b2f8 Mon Sep 17 00:00:00 2001 From: Adir David Date: Sun, 24 Aug 2025 15:15:55 +0300 Subject: [PATCH 02/18] fixed bug where workers stats under node were not calculated correctly --- core/task-executor/lib/reconcile/normalize.js | 1 + core/task-executor/lib/reconcile/reconciler.js | 15 ++++++++------- 2 files changed, 9 insertions(+), 7 deletions(-) 
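Note: before this change, per-node worker stats ran `_calcStats(n.workers)` over
entries shaped `{ algorithmName, nodeName }`; lacking a `workerStatus`, every
worker was counted as redundant. Adding `podName` lets the stats be derived from
the normalized workers instead. A sketch of the lookup (names follow the diff):

    const nodeWorkers = normalizedWorkers.filter(worker =>
        node.workers.some(nodeWorker => nodeWorker.podName === worker.podName));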
diff --git a/core/task-executor/lib/reconcile/normalize.js b/core/task-executor/lib/reconcile/normalize.js index b11e30e58..1eb21ca75 100644 --- a/core/task-executor/lib/reconcile/normalize.js +++ b/core/task-executor/lib/reconcile/normalize.js @@ -330,6 +330,7 @@ const normalizeResources = ({ pods, nodes } = {}) => { accumulator[nodeName].workersTotal.memory += requestMem; accumulator[nodeName].workers.push({ algorithmName: objectPath.get(pod, 'metadata.labels.algorithm-name'), + podName: objectPath.get(pod, 'metadata.name'), nodeName }); } diff --git a/core/task-executor/lib/reconcile/reconciler.js b/core/task-executor/lib/reconcile/reconciler.js index e70a8e57d..58d8a983a 100644 --- a/core/task-executor/lib/reconcile/reconciler.js +++ b/core/task-executor/lib/reconcile/reconciler.js @@ -35,7 +35,7 @@ const _calcStats = (data) => { return { stats, total: data.length }; }; -const _getNodeStats = (normResources) => { +const _getNodeStats = (normResources, normalizedWorkers) => { const localResources = clonedeep(normResources); const resourcesWithWorkers = localResources.nodeList; const statsPerNode = resourcesWithWorkers.map(n => ({ @@ -64,8 +64,7 @@ const _getNodeStats = (normResources) => { }, labels: n.labels, workers2: n.workers, - workers: _calcStats(n.workers) - + workers: _calcStats(normalizedWorkers.filter(worker => n.workers.some(nWorker => nWorker.podName === worker.podName))) } )); return statsPerNode; @@ -80,7 +79,7 @@ const _checkResourcePressure = (normResources) => { } }; -const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, ignoredUnScheduledAlgorithms, jobsInfo, workerStats, normResources }) => { +const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, ignoredUnScheduledAlgorithms, jobsInfo, normalizedWorkers, normResources }) => { const { created, skipped, toStop, toResume } = jobsInfo; Object.entries(reconcileResult).forEach(([algorithmName, res]) => { res.created = created.filter(c => c.algorithmName === algorithmName).length; @@ -89,6 +88,8 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, res.resumed = toResume.filter(c => c.algorithmName === algorithmName).length; }); + const workerStats = _calcStats(normalizedWorkers); + await etcd.updateDiscovery({ reconcileResult, unScheduledAlgorithms, @@ -99,7 +100,7 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, gpu: consts.GPU_RATIO_PRESSURE, mem: consts.MEMORY_RATIO_PRESSURE }, - nodes: _getNodeStats(normResources) + nodes: _getNodeStats(normResources, normalizedWorkers) }); workerStats.stats.forEach((ws) => { @@ -167,9 +168,9 @@ const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs, await Promise.all([...workersToStopPromises, ...workersToExitPromises, ...workersToWarmUpPromises, ...workersToCoolDownPromises, ...workersToResumePromises]); // Write in etcd the reconcile result - const workerStats = _calcStats(workersManager.normalizedWorkers); + const { normalizedWorkers } = workersManager; await _updateReconcileResult({ - reconcileResult, ...jobsHandler, jobsInfo, workerStats, normResources + reconcileResult, ...jobsHandler, jobsInfo, normalizedWorkers, normResources }); return reconcileResult; From 2e215731d12a0b564847157224dc9c431c775bc5 Mon Sep 17 00:00:00 2001 From: Adir David Date: Mon, 25 Aug 2025 15:12:58 +0300 Subject: [PATCH 03/18] fixed bug - bad calculation, as long free is negative it causes reserved to always be negative. 
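Worked example (illustrative numbers): with total = 10 and resourcePressure = 0.8,
a node whose requests = 12 has free = 10 * 0.8 - 12 = -4, so the old expression
total * (1 - 0.8) + free = 2 - 4 = -2 made "reserved" negative. The fix falls back
to total - other whenever free < 0.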
--- core/api-server/api/graphql/queries/statistics-querier.js | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/api-server/api/graphql/queries/statistics-querier.js b/core/api-server/api/graphql/queries/statistics-querier.js index 9c55eb1e8..d9c5bb186 100644 --- a/core/api-server/api/graphql/queries/statistics-querier.js +++ b/core/api-server/api/graphql/queries/statistics-querier.js @@ -116,7 +116,7 @@ class NodesStatistics { algorithmsData.push({ name: 'reserved', amount: otherAmount, - size: +(node.total[metric] * (1 - resourcePressure) + (free < 0 ? free : 0)).toFixed(1), + size: free < 0 ? +node.total[metric].toFixed(1) - +(node.other[metric].toFixed(1)) : +(node.total[metric] * (1 - resourcePressure)).toFixed(1) }); algorithmsData.push({ name: 'total', From bb03d1027627a080a09adfc75cc564befe85fe62 Mon Sep 17 00:00:00 2001 From: Adir David Date: Mon, 25 Aug 2025 15:16:26 +0300 Subject: [PATCH 04/18] added new method to get default value for resources, via limitrange --- core/task-executor/lib/helpers/kubernetes.js | 60 +++++++++++++++++-- .../task-executor/tests/kubernetesApiTests.js | 39 +++++++++++- .../tests/mocks/kubernetes-server.mock.js | 27 ++++++++- .../tests/mocks/kubernetes.mock.js | 1 + 4 files changed, 119 insertions(+), 8 deletions(-) diff --git a/core/task-executor/lib/helpers/kubernetes.js b/core/task-executor/lib/helpers/kubernetes.js index afa2fed95..469951ffe 100644 --- a/core/task-executor/lib/helpers/kubernetes.js +++ b/core/task-executor/lib/helpers/kubernetes.js @@ -13,7 +13,7 @@ let log; class KubernetesApi { async init(options = {}) { log = Logger.GetLogFromContainer(); - this._crdMissingWarnLogged = false; + this._warnWasLogged = { crdMissing: false, noLimitRange: false, moreThanOneLimit: false }; // To avoid spamming the logs this._client = new KubernetesClient(); await this._client.init(options.kubernetes); this._isNamespaced = options.kubernetes.isNamespaced; @@ -208,13 +208,13 @@ class KubernetesApi { const exists = crdList?.body?.items?.some(cr => cr.metadata.name === kaiValues.KUBERNETES.QUEUES_CRD_NAME); if (!exists) { - if (!this._crdMissingWarnLogged) { - log.info(`Kai Queues CRD (${kaiValues.KUBERNETES.QUEUES_CRD_NAME}) not found. Assuming Kai is not installed.`, { component }); - this._crdMissingWarnLogged = true; + if (!this._warnWasLogged.crdMissing) { + log.warning(`Kai Queues CRD (${kaiValues.KUBERNETES.QUEUES_CRD_NAME}) not found. Assuming Kai is not installed.`, { component }); + this._warnWasLogged.crdMissing = true; } return []; } - this._crdMissingWarnLogged = false; + this._warnWasLogged.crdMissing = false; const crd = crdList.body.items.find(cr => cr.metadata.name === kaiValues.KUBERNETES.QUEUES_CRD_NAME); const version = crd?.spec?.versions?.find(v => v.served)?.name; @@ -232,6 +232,56 @@ class KubernetesApi { return []; } } + + /** + * Get default CPU and memory requests/limits for containers + * from LimitRange resources in the namespace. 
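+     *
+     * A usage sketch (return shape per the implementation below; values follow
+     * the test stubs and are illustrative):
+     * @example
+     * const defaults = await kubernetes.getContainerDefaultResources();
+     * // => { cpu: { defaultRequest: '250m', defaultLimits: '500m' },
+     * //      memory: { defaultRequest: '256Mi', defaultLimits: '512Mi' } }
+     * // => {} when no Container-type LimitRange exists or on API error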
+ */ + async getContainerDefaultResources() { + try { + const res = await this._client.limitRanges.all(); + const items = res.body?.items || []; + + const containerLimits = items + .flatMap(item => item.spec.limits.map(limit => ({ + ...limit, + source: item.metadata?.name, + }))) + .filter(limit => limit.type === 'Container'); + + if (containerLimits.length === 0) { + if (!this._warnWasLogged.noLimitRange) { + log.warning('No LimitRange with type=Container found.', { component }); + this._warnWasLogged.noLimitRange = true; + } + return {}; + } + this._warnWasLogged.noLimitRange = false; // Reset warning flag if situation is resolved + + if (containerLimits.length > 1 && !this._warnWasLogged.moreThanOneLimit) { + log.warning(`Multiple LimitRanges with type=Container found: ${containerLimits.map(l => l.source)}. Taking the first one.`, { component }); + this._warnWasLogged.moreThanOneLimit = true; + } + else this._warnWasLogged.moreThanOneLimit = false; // Reset warning flag if situation is resolved + + const selected = containerLimits[0]; + + return { + cpu: { + defaultRequest: selected.defaultRequest?.cpu, + defaultLimits: selected.default?.cpu + }, + memory: { + defaultRequest: selected.defaultRequest?.memory, + defaultLimits: selected.default?.memory + } + }; + } + catch (error) { + log.error(`Failed to fetch container default resources ${error.message}`, { component }, error); + return {}; + } + } } module.exports = new KubernetesApi(); diff --git a/core/task-executor/tests/kubernetesApiTests.js b/core/task-executor/tests/kubernetesApiTests.js index b1a50c589..2c446fa4c 100644 --- a/core/task-executor/tests/kubernetesApiTests.js +++ b/core/task-executor/tests/kubernetesApiTests.js @@ -44,49 +44,61 @@ describe('Kubernetes API', () => { before(async () => { await kubernetesServerMock.start({ port: 9001 }); }); + beforeEach(async () => { instance = new KubernetesApi(); await instance.init(options); + sinon.restore(); }); + it('should create job', async () => { const res = await instance.createJob({ spec: { metadata: { name: 'mySpec' } } }); expect(res.body.metadata.name).to.eql('mySpec'); }); + it('should get worker jobs', async () => { const res = await instance.getWorkerJobs(); expect(res.statusCode).to.eql(StatusCodes.OK); }); + it('should get worker for job', async () => { const res = await instance.getPodsForJob({ spec: { selector: { matchLabels: 'myLabel=mySelector' } } }); expect(res.statusCode).to.eql(StatusCodes.OK); }); + it('should fail to get worker for job if no selector', async () => { const res = await instance.getPodsForJob({ spec: { selector: { matchLabels: null } } }); expect(res).to.be.empty }); + it('should fail to get worker for job if no job spec', async () => { const res = await instance.getPodsForJob(); expect(res).to.be.empty }); + it('should get config map', async () => { const res = await instance.getVersionsConfigMap(); expect(res).to.have.property('versions'); expect(res).to.have.property('registry'); expect(res).to.have.property('clusterOptions'); }); + it('should throw', async () => { const res = instance.init(optionsDummy); expect(res).to.be.rejectedWith('Invalid URI "no.such.url/version"'); }); + it('should get nodes and pods', async () => { const res = await instance.getResourcesPerNode(); expect(res).to.have.property('pods') expect(res).to.have.property('nodes') }); + it('should get all PVC names', async () => { const res = await instance.getAllPVCNames(); expect(res).to.be.an('array').that.includes('pvc-1', 'pvc-2'); }); + it('should get all ConfigMap 
names', async () => { const res = await instance.getAllConfigMapNames(); expect(res).to.be.an('array').that.includes('config-map-1', 'config-map-2'); @@ -95,22 +107,24 @@ describe('Kubernetes API', () => { const res = await instance.getAllSecretNames(); expect(res).to.be.an('array').that.includes('secret-1', 'secret-2'); }); + it('should get all Kai Queues names', async () => { kubernetesServerMock.setKaiCRDEnabled(true); const res = await instance.getAllQueueNames(); expect(res).to.be.an('array').that.includes('test', 'default'); }); + it('should return empty list when Kai CRD is missing', async () => { kubernetesServerMock.setKaiCRDEnabled(false); const res = await instance.getAllQueueNames(); expect(res).to.be.an('array').that.is.empty; }); + it('should log missing CRD warning only once', async () => { kubernetesServerMock.setKaiCRDEnabled(false); - const Logger = require('@hkube/logger'); const log1 = Logger.GetLogFromContainer(); - const logSpy = sinon.spy(log1, 'info'); + const logSpy = sinon.spy(log1, 'warning'); await instance.getAllQueueNames(); await instance.getAllQueueNames(); @@ -120,4 +134,25 @@ describe('Kubernetes API', () => { expect(matchingLogs.length).to.equal(1); }); + + it('should get all limit range data for containers', async () => { + const res = await instance.getContainerDefaultResources(); + expect(res).to.be.an('object').to.have.keys('cpu', 'memory'); + expect(res.cpu).to.have.keys('defaultRequest', 'defaultLimits'); + expect(res.memory).to.have.keys('defaultRequest', 'defaultLimits'); + }); + + it('should log limitrange warning only once', async () => { + const log1 = Logger.GetLogFromContainer(); + + const logSpy = sinon.spy(log1, 'warning'); + + await instance.getContainerDefaultResources(); + await instance.getContainerDefaultResources(); + const matchingLogs = logSpy.getCalls().filter(call => + call.args[0].includes('Multiple LimitRanges with type=Container found') && call.args[0].includes('Taking the first one') + ); + + expect(matchingLogs.length).to.equal(1); + }); }); diff --git a/core/task-executor/tests/mocks/kubernetes-server.mock.js b/core/task-executor/tests/mocks/kubernetes-server.mock.js index 4591eb945..5cfc75005 100644 --- a/core/task-executor/tests/mocks/kubernetes-server.mock.js +++ b/core/task-executor/tests/mocks/kubernetes-server.mock.js @@ -10,7 +10,8 @@ const { secret, configMap, customResourceDefinition, - queues + queues, + limitRanges } = resources; const app = express(); @@ -79,6 +80,30 @@ class MockClient { res.json(queues); return; } + if (req.url === '/api/v1/limitranges') { + res.json(limitRanges); + return; + } + + if (req.url.startsWith('/api/v1/limitranges/')) { + // extract the name if needed + const name = req.url.split('/').pop(); + const item = limitRanges.items.find(lr => lr.metadata.name === name); + if (item) { + res.json(item); + } else { + res.status(404).json({ + kind: 'Status', + apiVersion: 'v1', + metadata: {}, + status: 'Failure', + message: `limitranges "${name}" not found`, + reason: 'NotFound', + code: 404 + }); + } + return; + } res.json(req.body); }); diff --git a/core/task-executor/tests/mocks/kubernetes.mock.js b/core/task-executor/tests/mocks/kubernetes.mock.js index 2ec2d1331..290c6011f 100644 --- a/core/task-executor/tests/mocks/kubernetes.mock.js +++ b/core/task-executor/tests/mocks/kubernetes.mock.js @@ -43,6 +43,7 @@ module.exports = { getAllSecretNames: async () => { return ['secret-1', 'secret-2']; }, getAllConfigMapNames: async () => { return ['config-map-1', 'config-map-2']; }, getAllQueueNames: 
async () => { return ['test', 'default']; }, + getContainerDefaultResources: async () => { return { cpu: { defaultRequest: 0.1, defaultLimits: 0.2 }, memory: { defaultRequest: '128Mi', defaultLimits: '256Mi' } }; }, }, callCount: (name) => { return callCount[name]; From ccdbf5b2efa15c6b2c016f9015563caa300dbf20 Mon Sep 17 00:00:00 2001 From: Adir David Date: Mon, 25 Aug 2025 15:18:31 +0300 Subject: [PATCH 05/18] added stub data for test --- core/task-executor/tests/stub/resources.js | 44 +++++++++++++++++++++- 1 file changed, 43 insertions(+), 1 deletion(-) diff --git a/core/task-executor/tests/stub/resources.js b/core/task-executor/tests/stub/resources.js index c7adc9799..ca2e168c8 100644 --- a/core/task-executor/tests/stub/resources.js +++ b/core/task-executor/tests/stub/resources.js @@ -472,6 +472,47 @@ const queues = { // for Kai (run-ai) ] }; +const limitRanges = { + kind: 'LimitRangeList', + apiVersion: 'v1', + items: [ + { + kind: 'LimitRange', + apiVersion: 'v1', + metadata: { + name: 'default-limits', + namespace: 'default', + }, + spec: { + limits: [ + { + type: 'Container', + default: { cpu: '500m', memory: '512Mi' }, + defaultRequest: { cpu: '250m', memory: '256Mi' } + } + ] + } + }, + { + kind: 'LimitRange', // dummy for testing + apiVersion: 'v1', + metadata: { + name: 'another-limits', + namespace: 'default', + }, + spec: { + limits: [ + { + type: 'Container', + default: { cpu: '1', memory: '1Gi' }, + defaultRequest: { cpu: '500m', memory: '512Mi' } + } + ] + } + } + ] +}; + module.exports = { pods, @@ -483,5 +524,6 @@ module.exports = { configMap, secret, customResourceDefinition, - queues + queues, + limitRanges }; From 995d96ccd27540beaaba3fbb2511d5be99b24e16 Mon Sep 17 00:00:00 2001 From: Adir David Date: Mon, 25 Aug 2025 15:22:04 +0300 Subject: [PATCH 06/18] fixed bug - should apply workerResourceRequests only if settings.applyResources is true, plus fixed merged method to not include fields that are empty or null, since null will fail k8s job and empty will just be odd in the job spec, better of without it. --- core/task-executor/lib/jobs/jobCreator.js | 23 +++++++++++++++-------- 1 file changed, 15 insertions(+), 8 deletions(-) diff --git a/core/task-executor/lib/jobs/jobCreator.js b/core/task-executor/lib/jobs/jobCreator.js index 879324d36..747187a24 100644 --- a/core/task-executor/lib/jobs/jobCreator.js +++ b/core/task-executor/lib/jobs/jobCreator.js @@ -327,13 +327,19 @@ const applyAnnotations = (spec, keyVal) => { }; const mergeResourceRequest = (defaultResource, customResource) => { - const mergedRequest = { requests: {}, limits: {} }; + const mergedRequest = {}; for (const key of ['requests', 'limits']) { - mergedRequest[key].memory = customResource[key]?.memory || defaultResource[key]?.memory || null; - mergedRequest[key].cpu = customResource[key]?.cpu || defaultResource[key]?.cpu || null; + const cpu = customResource?.[key]?.cpu || defaultResource?.[key]?.cpu; + const memory = customResource?.[key]?.memory || defaultResource?.[key]?.memory; + + if (cpu || memory) { + mergedRequest[key] = {}; + if (cpu) mergedRequest[key].cpu = cpu; + if (memory) mergedRequest[key].memory = memory; + } } - return mergedRequest; + return Object.keys(mergedRequest).length > 0 ? 
mergedRequest : undefined; }; const _applyDefaultResourcesSideCar = (container) => { @@ -453,12 +459,13 @@ const createJobSpec = ({ kind, algorithmName, resourceRequests, workerImage, alg spec = applyEnvToContainer(spec, CONTAINERS.WORKER, { ALGORITHM_VERSION: algorithmVersion }); spec = applyEnvToContainer(spec, CONTAINERS.WORKER, { WORKER_IMAGE: workerImage }); spec = applyAlgorithmResourceRequests(spec, resourceRequests, node); - if (settings.applyResources || workerCustomResources) { - if (workerCustomResources) { - workerResourceRequests = mergeResourceRequest(workerResourceRequests, workerCustomResources); - } + if (settings.applyResources) { + workerResourceRequests = mergeResourceRequest(workerResourceRequests, workerCustomResources); spec = applyWorkerResourceRequests(spec, workerResourceRequests); } + else if (workerCustomResources) { + spec = applyWorkerResourceRequests(spec, workerCustomResources); + } spec = applyNodeSelector(spec, nodeSelector); spec = applyHotWorker(spec, hotWorker); spec = applyEntryPoint(spec, entryPoint); From 1ce519f61a020103ace8105d636fb3f9f9a2eb32 Mon Sep 17 00:00:00 2001 From: Adir David Date: Mon, 25 Aug 2025 15:24:55 +0300 Subject: [PATCH 07/18] refactor a bit, and changed behavior to take max out of requests and limits if useResouceLimits is on (some pods have no limit, which can cause limits to be lower than requests, which is impossible). --- core/task-executor/lib/reconcile/normalize.js | 41 ++++++++++++------- 1 file changed, 27 insertions(+), 14 deletions(-) diff --git a/core/task-executor/lib/reconcile/normalize.js b/core/task-executor/lib/reconcile/normalize.js index 1eb21ca75..eed9c4913 100644 --- a/core/task-executor/lib/reconcile/normalize.js +++ b/core/task-executor/lib/reconcile/normalize.js @@ -211,7 +211,7 @@ const _calcRatioFree = (node) => { */ const _filterSchedulableNodes = (node) => { return !(node.spec && node.spec.taints && node.spec.taints.some(taint => taint.effect === 'NoSchedule')); - }; +}; /** * Extracts numeric GPU allocation from a resource object. @@ -242,17 +242,24 @@ const _extractGpuResources = (pod) => { return { limitsGpu, requestGpu }; }; -const _getRequestsAndLimits = (pod) => { - const { useResourceLimits } = globalSettings; +/** + * Extracts CPU, memory, and GPU requests/limits from a pod definition. + * + * @param {Object} pod - Pod object with containers and resources. + * @returns {Object} { requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu } + */ +const _extractPodResources = (pod) => { + // CPU + const requestCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.requests.cpu', '0m'))); const limitsCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.limits.cpu', '0m'))); - const { limitsGpu, requestGpu } = _getGpuSpec(pod); + + // Memory + const requestMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.requests.memory', 0), true)); const limitsMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.limits.memory', 0), true)); - const requestCpu = useResourceLimits && limitsCpu - ? limitsCpu - : sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.requests.cpu', '0m'))); - const requestMem = useResourceLimits && limitsMem - ? 
limitsMem - : sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.requests.memory', 0), true)); + + // GPU + const { limitsGpu, requestGpu } = _extractGpuResources(pod); + return { requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu }; @@ -316,14 +323,17 @@ const normalizeResources = ({ pods, nodes } = {}) => { if (!nodeName || !accumulator[nodeName]) { return accumulator; } - const { requestCpu, requestGpu, requestMem, limitsCpu, limitsGpu, limitsMem } = _getRequestsAndLimits(pod); + const { requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu } = _extractPodResources(pod); + const { useResourceLimits } = globalSettings; - accumulator[nodeName].requests.cpu += requestCpu; + accumulator[nodeName].requests.cpu += (useResourceLimits && limitsCpu) ? Math.max(requestCpu, limitsCpu) : requestCpu; + accumulator[nodeName].requests.memory += (useResourceLimits && limitsMem) ? Math.max(requestMem, limitsMem) : requestMem; accumulator[nodeName].requests.gpu += requestGpu; - accumulator[nodeName].requests.memory += requestMem; + accumulator[nodeName].limits.cpu += limitsCpu; - accumulator[nodeName].limits.gpu += limitsGpu; accumulator[nodeName].limits.memory += limitsMem; + accumulator[nodeName].limits.gpu += limitsGpu; + if (objectPath.get(pod, 'metadata.labels.type') === 'worker') { accumulator[nodeName].workersTotal.cpu += requestCpu; accumulator[nodeName].workersTotal.gpu += requestGpu; @@ -347,12 +357,15 @@ const normalizeResources = ({ pods, nodes } = {}) => { const nodeList = []; Object.entries(resourcesPerNode).forEach(([nodeName, nodeData]) => { _calcRatioFree(nodeData); + allNodes.requests.cpu += nodeData.requests.cpu; allNodes.requests.memory += nodeData.requests.memory; allNodes.requests.gpu += nodeData.requests.gpu; + allNodes.limits.cpu += nodeData.limits.cpu; allNodes.limits.memory += nodeData.limits.memory; allNodes.limits.gpu += nodeData.limits.gpu; + nodeList.push({ name: nodeName, ...nodeData }); }); From d820bad72b4b1a6539975bfe74a786be3e890d6d Mon Sep 17 00:00:00 2001 From: Adir David Date: Mon, 25 Aug 2025 15:34:45 +0300 Subject: [PATCH 08/18] jsdoc was added and small refactor --- .../task-executor/lib/reconcile/reconciler.js | 113 ++++++++++++------ 1 file changed, 77 insertions(+), 36 deletions(-) diff --git a/core/task-executor/lib/reconcile/reconciler.js b/core/task-executor/lib/reconcile/reconciler.js index 58d8a983a..76dadfc36 100644 --- a/core/task-executor/lib/reconcile/reconciler.js +++ b/core/task-executor/lib/reconcile/reconciler.js @@ -8,11 +8,42 @@ const { WorkersManager, requestPreprocessor, jobsHandler } = require('./managers const { CPU_RATIO_PRESSURE, MEMORY_RATIO_PRESSURE } = consts; -const _calcStats = (data) => { - const stats = Object.values(data.reduce((acc, cur) => { - if (!acc[cur.algorithmName]) { - acc[cur.algorithmName] = { - algorithmName: cur.algorithmName, +/** + * Checks if there is resource pressure (CPU or Memory) based on the provided normalized resources. + * Logs the details of CPU and memory pressure if either exceeds the defined threshold. + * + * @param {Object} normResources - Normalized resource usage data. + * @param {Object} normResources.allNodes - Resource data for all nodes. + * @param {Object} normResources.allNodes.ratio - Resource usage ratios. + * @param {number} normResources.allNodes.ratio.cpu - The CPU usage ratio (0-1). + * @param {number} normResources.allNodes.ratio.memory - The memory usage ratio (0-1). 
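+ *
+ * @example
+ * // illustrative: if CPU_RATIO_PRESSURE were 0.8, a cpu ratio of 0.9 would
+ * // emit a trace log; nothing is returned or thrown either way.
+ * _checkResourcePressure({ allNodes: { ratio: { cpu: 0.9, memory: 0.5 } } });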
+ */ +const _checkResourcePressure = (normResources) => { + const isCpuPressure = normResources.allNodes.ratio.cpu > CPU_RATIO_PRESSURE; + const isMemoryPressure = normResources.allNodes.ratio.memory > MEMORY_RATIO_PRESSURE; + const isResourcePressure = isCpuPressure || isMemoryPressure; + if (isResourcePressure) { + log.trace(`isCpuPressure: ${isCpuPressure}, isMemoryPressure: ${isMemoryPressure}`, { component }); + } +}; + +/** + * Calculates statistics for workers, grouped by algorithm name. + * For each algorithm, it counts the number of workers in various states (e.g., ready, working, etc.), + * and tracks redundant (workers with no status) and hot workers. + * + * @param {Object[]} workers - List of workers. + * @param {string} workers[].algorithmName - Algorithm name. + * @param {string} [workers[].workerStatus] - Worker status (init, ready, working, exit). + * @param {boolean} [workers[].hotWorker] - Whether worker is hot. + * @returns {{ stats: Object[], total: number }} Aggregated stats and total workers. + */ +const _aggregateWorkerStats = (workers) => { + const stats = Object.values(workers.reduce((acc, worker) => { + const { algorithmName, workerStatus, hotWorker } = worker; + if (!acc[algorithmName]) { + acc[algorithmName] = { + algorithmName, count: 0, init: 0, ready: 0, @@ -21,52 +52,62 @@ const _calcStats = (data) => { hot: 0 }; } - acc[cur.algorithmName].count += 1; - if (cur.workerStatus === undefined) { - acc[cur.algorithmName].redundant = (acc[cur.algorithmName].redundant || 0) + 1; + acc[algorithmName].count += 1; + if (workerStatus) { + acc[algorithmName][workerStatus] += 1; } - else acc[cur.algorithmName][cur.workerStatus] += 1; - if (cur.hotWorker) { - acc[cur.algorithmName].hot += 1; + else acc[algorithmName].redundant = (acc[algorithmName].redundant || 0) + 1; + if (hotWorker) { + acc[algorithmName].hot += 1; } return acc; }, {})); - return { stats, total: data.length }; + return { stats, total: workers.length }; }; -const _getNodeStats = (normResources, normalizedWorkers) => { - const localResources = clonedeep(normResources); - const resourcesWithWorkers = localResources.nodeList; - const statsPerNode = resourcesWithWorkers.map(n => ({ - name: n.name, +/** + * Builds resource and worker statistics per node. + * + * @param {Object} normResources - Normalized resources. + * @param {Object[]} normalizedWorkers - Normalized workers list. + * @returns {Object[]} Stats per node including resources and aggregated worker stats. 
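+ *
+ * @example
+ * // illustrative entry in the returned array:
+ * // { name: 'node-1', total: { cpu, gpu, mem }, requests: { cpu, gpu, mem },
+ * //   other: { cpu, gpu, mem }, workersTotal: { cpu, gpu, mem },
+ * //   labels: {...}, workers2: [...], workers: { stats: [...], total: 2 } }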
+ */ +const _buildNodeStats = (normResources, normalizedWorkers) => { + const clonedResources = clonedeep(normResources); + + const statsPerNode = clonedResources.nodeList.map(node => { + const nodeWorkers = normalizedWorkers.filter(w => node.workers.some(nw => nw.podName === w.podName)); + + return { + name: node.name, total: { - cpu: n.total.cpu, - gpu: n.total.gpu, - mem: n.total.memory, + cpu: node.total.cpu, + gpu: node.total.gpu, + mem: node.total.memory, }, requests: { - cpu: n.requests.cpu, - gpu: n.requests.gpu, - mem: n.requests.memory, + cpu: node.requests.cpu, + gpu: node.requests.gpu, + mem: node.requests.memory, }, other: { - cpu: n.other.cpu, - gpu: n.other.gpu, - mem: n.other.memory, + cpu: node.other.cpu, + gpu: node.other.gpu, + mem: node.other.memory, }, workersTotal: { - cpu: n.workersTotal.cpu, - gpu: n.workersTotal.gpu, - mem: n.workersTotal.memory, + cpu: node.workersTotal.cpu, + gpu: node.workersTotal.gpu, + mem: node.workersTotal.memory, }, - labels: n.labels, - workers2: n.workers, - workers: _calcStats(normalizedWorkers.filter(worker => n.workers.some(nWorker => nWorker.podName === worker.podName))) - } - )); + labels: node.labels, + workers2: node.workers, + workers: _aggregateWorkerStats(nodeWorkers) + }; + }); return statsPerNode; }; @@ -88,7 +129,7 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, res.resumed = toResume.filter(c => c.algorithmName === algorithmName).length; }); - const workerStats = _calcStats(normalizedWorkers); + const workerStats = _aggregateWorkerStats(normalizedWorkers); await etcd.updateDiscovery({ reconcileResult, @@ -100,7 +141,7 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, gpu: consts.GPU_RATIO_PRESSURE, mem: consts.MEMORY_RATIO_PRESSURE }, - nodes: _getNodeStats(normResources, normalizedWorkers) + nodes: _buildNodeStats(normResources, normalizedWorkers) }); workerStats.stats.forEach((ws) => { From 811cd0e9dc085fc707f952ad54f65cac044d15ab Mon Sep 17 00:00:00 2001 From: Adir David Date: Mon, 25 Aug 2025 15:36:54 +0300 Subject: [PATCH 09/18] now taking worker resources in account when calculating resources per algortihm --- .../api/graphql/queries/statistics-querier.js | 14 ++-- .../lib/reconcile/managers/jobs.js | 8 +- .../task-executor/lib/reconcile/reconciler.js | 75 +++++++++++++++---- 3 files changed, 72 insertions(+), 25 deletions(-) diff --git a/core/api-server/api/graphql/queries/statistics-querier.js b/core/api-server/api/graphql/queries/statistics-querier.js index d9c5bb186..6865775fd 100644 --- a/core/api-server/api/graphql/queries/statistics-querier.js +++ b/core/api-server/api/graphql/queries/statistics-querier.js @@ -76,16 +76,20 @@ class NodesStatistics { return results; } - _buildAlgorithmResult(node, algorithms, metric, resourcePressure) { + _buildAlgorithmResult(node, algorithms, metric, resourcePressure, defaultWorker) { let otherAmount = 0; const algorithmsData = []; const getMetric = (mtr, algorithm) => { - const rawMetric = algorithm[mtr] ? algorithm[mtr] : 0; + const algoRawMetric = algorithm[mtr] ? algorithm[mtr] : 0; + const workerRawMetric = algorithm?.workerCustomResources?.requests[mtr] || defaultWorker || 0; if (mtr === 'mem') { - return parse.getMemoryInMi(rawMetric); + return parse.getMemoryInMi(algoRawMetric) + (workerRawMetric ? parse.getMemoryInMi(workerRawMetric) : 0); } - return rawMetric; + if (mtr === 'cpu') { + return algoRawMetric + (workerRawMetric ? 
parse.getCpuInCore(workerRawMetric) : 0); + } + return algoRawMetric; // for gpu, worker doesnt need gpu }; node.workers.stats.forEach(algorithm => { const requestedAlgorithm = algorithms.find(alg => alg.name === algorithm.algorithmName); @@ -135,7 +139,7 @@ class NodesStatistics { 'reserved' ]; const results = taskExecutor.length ? taskExecutor[0].nodes.map(node => { - const algorithmsData = this._buildAlgorithmResult(node, algorithms, metric, taskExecutor[0].resourcePressure[metric]); + const algorithmsData = this._buildAlgorithmResult(node, algorithms, metric, taskExecutor[0].resourcePressure[metric], taskExecutor[0].defaultWorkerResources[metric]); return { name: node.name, algorithmsData diff --git a/core/task-executor/lib/reconcile/managers/jobs.js b/core/task-executor/lib/reconcile/managers/jobs.js index 15ec97035..1bbea0d5b 100644 --- a/core/task-executor/lib/reconcile/managers/jobs.js +++ b/core/task-executor/lib/reconcile/managers/jobs.js @@ -183,14 +183,14 @@ class JobsHandler { // No existing worker found — prepare job creation request const algorithmTemplate = algorithmTemplates[algorithmName]; - const { workerCustomResources } = algorithmTemplates[algorithmName]; const algorithmImage = setAlgorithmImage(algorithmTemplate, versions, registry); const workerImage = setWorkerImage(algorithmTemplate, versions, registry); const resourceRequests = createContainerResource(algorithmTemplate); const workerResourceRequests = createContainerResource(workerResources); - const { kind, workerEnv, algorithmEnv, labels, annotations, version: algorithmVersion, nodeSelector, stateType: algorithmStateType = 'batch', - entryPoint, options: algorithmOptions, reservedMemory, mounts, env, sideCars, volumes, volumeMounts, kaiObject } = algorithmTemplate; + const { kind, workerEnv, algorithmEnv, labels, annotations, version: algorithmVersion, nodeSelector, + stateType: algorithmStateType = 'batch', entryPoint, options: algorithmOptions, reservedMemory, + mounts, env, sideCars, volumes, volumeMounts, workerCustomResources, kaiObject } = algorithmTemplate; createDetails.push({ numberOfNewJobs: 1, @@ -451,7 +451,7 @@ class JobsHandler { * * Logic: * 1. Add skipped algorithms to the `unScheduledAlgorithms` map if not already present. - * 2. Check if any of these algorithms have been created, requested, or removed from templates. + * 2. Check if any of these algorithms have been created, not requested anymore, or removed from templates. * 3. Remove such algorithms from the map and move them to `ignoredUnScheduledAlgorithms`. 
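+     * e.g. (hypothetical): "my-alg" is skipped for lack of resources and enters
+     * `unScheduledAlgorithms`; once a job for it is created, or it is no longer
+     * requested, it moves to `ignoredUnScheduledAlgorithms`.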
* * @private diff --git a/core/task-executor/lib/reconcile/reconciler.js b/core/task-executor/lib/reconcile/reconciler.js index 76dadfc36..fac3d76a8 100644 --- a/core/task-executor/lib/reconcile/reconciler.js +++ b/core/task-executor/lib/reconcile/reconciler.js @@ -1,6 +1,8 @@ const Logger = require('@hkube/logger'); const log = Logger.GetLogFromContainer(); const clonedeep = require('lodash.clonedeep'); +const kubernetes = require('../helpers/kubernetes'); +const { settings } = require('../helpers/settings'); const etcd = require('../helpers/etcd'); const { components, consts } = require('../consts'); const component = components.RECONCILER; @@ -81,28 +83,28 @@ const _buildNodeStats = (normResources, normalizedWorkers) => { return { name: node.name, - total: { + total: { cpu: node.total.cpu, gpu: node.total.gpu, mem: node.total.memory, - }, - requests: { + }, + requests: { cpu: node.requests.cpu, gpu: node.requests.gpu, mem: node.requests.memory, - }, - other: { + }, + other: { cpu: node.other.cpu, gpu: node.other.gpu, mem: node.other.memory, - }, - workersTotal: { + }, + workersTotal: { cpu: node.workersTotal.cpu, gpu: node.workersTotal.gpu, mem: node.workersTotal.memory, - }, + }, labels: node.labels, workers2: node.workers, workers: _aggregateWorkerStats(nodeWorkers) @@ -111,16 +113,55 @@ const _buildNodeStats = (normResources, normalizedWorkers) => { return statsPerNode; }; -const _checkResourcePressure = (normResources) => { - const isCpuPressure = normResources.allNodes.ratio.cpu > CPU_RATIO_PRESSURE; - const isMemoryPressure = normResources.allNodes.ratio.memory > MEMORY_RATIO_PRESSURE; - const isResourcePressure = isCpuPressure || isMemoryPressure; - if (isResourcePressure) { - log.trace(`isCpuPressure: ${isCpuPressure}, isMemoryPressure: ${isMemoryPressure}`, { component }); +/** + * Retrieves default CPU and memory resources for workers. + * + * Priority: + * 1. Use explicit worker resources from options if `applyResources` is enabled. + * 2. Otherwise, fallback to Kubernetes container default requests. + * + * @async + * @param {Object} options - Configuration options. + * @param {Object} options.resources - Resource configurations. + * @param {Object} options.resources.worker - Worker resource configuration. + * @param {number|string} [options.resources.worker.cpu] - Worker CPU request. + * @param {number|string} [options.resources.worker.mem] - Worker memory request. + * @returns {Promise<{cpu: (number|string|undefined), mem: (number|string|undefined)}>} + * Default worker resources (CPU, memory), or `undefined` if not resolved. + */ +const _resolveWorkerResourceDefaults = async (options) => { + const defaults = {}; + + if (settings.applyResources) { + defaults.cpu = options.resources.worker.cpu; + defaults.mem = options.resources.worker.mem; } + + const containerDefaults = await kubernetes.getContainerDefaultResources(); + if (!defaults.cpu && containerDefaults.cpu) defaults.cpu = containerDefaults.cpu.defaultRequest; + if (!defaults.mem && containerDefaults.memory) defaults.mem = containerDefaults.memory?.defaultRequest; + + return defaults; }; -const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, ignoredUnScheduledAlgorithms, jobsInfo, normalizedWorkers, normResources }) => { +/** + * Updates reconciliation results with job and worker statistics, + * synchronizes the current cluster state with etcd, and logs summary information. 
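+ *
+ * A sketch of the discovery payload (fields per the `etcd.updateDiscovery` call
+ * below; values illustrative):
+ * @example
+ * // { reconcileResult, unScheduledAlgorithms, ignoredUnScheduledAlgorithms,
+ * //   actual: workerStats, resourcePressure: { cpu, gpu, mem },
+ * //   defaultWorkerResources, nodes: [...] }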
+ * + * @param {Object} params + * @param {Object.} params.reconcileResult - Current reconciliation results per algorithm. + * @param {Object} params.unScheduledAlgorithms - Algorithms that could not be scheduled and were skipped. + * @param {Object} params.ignoredUnScheduledAlgorithms - Algorithms previously unscheduled but now ignored (if they were created, not requested anymore, or removed from templates). + * @param {Object} params.jobsInfo - Job scheduling information. + * @param {Object[]} params.jobsInfo.created - Jobs successfully created. + * @param {Object[]} params.jobsInfo.skipped - Jobs that were skipped due any reason (resources missing etc). + * @param {Object[]} params.jobsInfo.toStop - Jobs whose workers should be stopped. + * @param {Object[]} params.jobsInfo.toResume - Jobs whose workers should be resumed. + * @param {Object[]} params.normalizedWorkers - Normalized workers. + * @param {Object} params.normResources - Normalized cluster resources. + * @param {Object} params.options - Global configuration. + */ +const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, ignoredUnScheduledAlgorithms, jobsInfo, normalizedWorkers, normResources, options }) => { const { created, skipped, toStop, toResume } = jobsInfo; Object.entries(reconcileResult).forEach(([algorithmName, res]) => { res.created = created.filter(c => c.algorithmName === algorithmName).length; @@ -130,6 +171,7 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, }); const workerStats = _aggregateWorkerStats(normalizedWorkers); + const defaultWorkerResources = await _resolveWorkerResourceDefaults(options); await etcd.updateDiscovery({ reconcileResult, @@ -141,6 +183,7 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, gpu: consts.GPU_RATIO_PRESSURE, mem: consts.MEMORY_RATIO_PRESSURE }, + defaultWorkerResources, nodes: _buildNodeStats(normResources, normalizedWorkers) }); @@ -211,7 +254,7 @@ const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs, // Write in etcd the reconcile result const { normalizedWorkers } = workersManager; await _updateReconcileResult({ - reconcileResult, ...jobsHandler, jobsInfo, normalizedWorkers, normResources + reconcileResult, ...jobsHandler, jobsInfo, normalizedWorkers, normResources, options }); return reconcileResult; From 6fd74e3e79f7397f8f95434e57a1ad80fe4c1515 Mon Sep 17 00:00:00 2001 From: Adir David Date: Mon, 25 Aug 2025 18:07:28 +0300 Subject: [PATCH 10/18] extracted default container resources out of the reconcile, and now in data preparation --- core/task-executor/lib/executor.js | 8 +++++--- core/task-executor/lib/reconcile/reconciler.js | 18 +++++++++++------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/core/task-executor/lib/executor.js b/core/task-executor/lib/executor.js index 8483562f1..da1b2fc68 100644 --- a/core/task-executor/lib/executor.js +++ b/core/task-executor/lib/executor.js @@ -64,6 +64,7 @@ class Executor { const { pods } = resources; const normResources = normalizeResources(resources); + const containerDefaults = await kubernetes.getContainerDefaultResources(); const data = { versions, normResources, @@ -71,7 +72,8 @@ class Executor { registry, clusterOptions, pods, - workerResources: options.resources.worker + workerResources: options.resources.worker, + containerDefaults }; await Promise.all([ @@ -86,7 +88,7 @@ class Executor { } } - async _algorithmsHandle({ versions, normResources, registry, options, clusterOptions, pods, 
workerResources }) { + async _algorithmsHandle(data) { const [algorithmTemplates, algorithmRequests, workers, jobs] = await Promise.all([ etcd.getAlgorithmTemplate(), etcd.getAlgorithmRequests({}), @@ -95,7 +97,7 @@ class Executor { ]); const reconcilerResults = await reconciler.reconcile({ - algorithmTemplates, algorithmRequests, workers, jobs, pods, versions, normResources, registry, options, clusterOptions, workerResources + algorithmTemplates, algorithmRequests, workers, jobs, ...data }); Object.entries(reconcilerResults).forEach(([algorithmName, res]) => { this[metricsNames.TASK_EXECUTOR_JOB_REQUESTS].set({ value: res.required || 0, labelValues: { algorithmName } }); diff --git a/core/task-executor/lib/reconcile/reconciler.js b/core/task-executor/lib/reconcile/reconciler.js index fac3d76a8..4f1ca76a4 100644 --- a/core/task-executor/lib/reconcile/reconciler.js +++ b/core/task-executor/lib/reconcile/reconciler.js @@ -1,7 +1,6 @@ const Logger = require('@hkube/logger'); const log = Logger.GetLogFromContainer(); const clonedeep = require('lodash.clonedeep'); -const kubernetes = require('../helpers/kubernetes'); const { settings } = require('../helpers/settings'); const etcd = require('../helpers/etcd'); const { components, consts } = require('../consts'); @@ -126,10 +125,15 @@ const _buildNodeStats = (normResources, normalizedWorkers) => { * @param {Object} options.resources.worker - Worker resource configuration. * @param {number|string} [options.resources.worker.cpu] - Worker CPU request. * @param {number|string} [options.resources.worker.mem] - Worker memory request. + * @param {Object} containerDefaults - Default container resources from Kubernetes. + * @param {Object} [containerDefaults.cpu] - Default CPU resource from Kubernetes. + * @param {string|number} [containerDefaults.cpu.defaultRequest] - Default CPU request + * @param {Object} [containerDefaults.memory] - Default memory resource from Kubernetes. + * @param {string|number} [containerDefaults.memory.defaultRequest] - Default memory request * @returns {Promise<{cpu: (number|string|undefined), mem: (number|string|undefined)}>} * Default worker resources (CPU, memory), or `undefined` if not resolved. */ -const _resolveWorkerResourceDefaults = async (options) => { +const _resolveWorkerResourceDefaults = async (options, containerDefaults) => { const defaults = {}; if (settings.applyResources) { @@ -137,7 +141,6 @@ const _resolveWorkerResourceDefaults = async (options) => { defaults.mem = options.resources.worker.mem; } - const containerDefaults = await kubernetes.getContainerDefaultResources(); if (!defaults.cpu && containerDefaults.cpu) defaults.cpu = containerDefaults.cpu.defaultRequest; if (!defaults.mem && containerDefaults.memory) defaults.mem = containerDefaults.memory?.defaultRequest; @@ -160,8 +163,9 @@ const _resolveWorkerResourceDefaults = async (options) => { * @param {Object[]} params.normalizedWorkers - Normalized workers. * @param {Object} params.normResources - Normalized cluster resources. * @param {Object} params.options - Global configuration. + * @param {Object} params.containerDefaults - Default container resources from Kubernetes. 
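+ * @example
+ * // precedence sketch (illustrative): with settings.applyResources = false and
+ * // containerDefaults = { cpu: { defaultRequest: '250m' }, memory: { defaultRequest: '256Mi' } },
+ * // the resolved defaults become { cpu: '250m', mem: '256Mi' }.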
*/ -const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, ignoredUnScheduledAlgorithms, jobsInfo, normalizedWorkers, normResources, options }) => { +const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, ignoredUnScheduledAlgorithms, jobsInfo, normalizedWorkers, normResources, options, containerDefaults }) => { const { created, skipped, toStop, toResume } = jobsInfo; Object.entries(reconcileResult).forEach(([algorithmName, res]) => { res.created = created.filter(c => c.algorithmName === algorithmName).length; @@ -171,7 +175,7 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, }); const workerStats = _aggregateWorkerStats(normalizedWorkers); - const defaultWorkerResources = await _resolveWorkerResourceDefaults(options); + const defaultWorkerResources = await _resolveWorkerResourceDefaults(options, containerDefaults); await etcd.updateDiscovery({ reconcileResult, @@ -209,7 +213,7 @@ const _updateReconcileResult = async ({ reconcileResult, unScheduledAlgorithms, }); }; -const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs, pods, versions, normResources, registry, options, clusterOptions, workerResources } = {}) => { +const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs, versions, normResources, options, registry, clusterOptions, pods, workerResources, containerDefaults } = {}) => { // Update the cache of jobs lately created by removing old jobs const reconcileResult = {}; @@ -254,7 +258,7 @@ const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs, // Write in etcd the reconcile result const { normalizedWorkers } = workersManager; await _updateReconcileResult({ - reconcileResult, ...jobsHandler, jobsInfo, normalizedWorkers, normResources, options + reconcileResult, ...jobsHandler, jobsInfo, normalizedWorkers, normResources, options, containerDefaults }); return reconcileResult; From 424aec08c094de75b3e90789c5160171d4a4c7bc Mon Sep 17 00:00:00 2001 From: Adir David Date: Wed, 27 Aug 2025 14:47:20 +0300 Subject: [PATCH 11/18] fix all graph calculations errors --- .../api/graphql/queries/statistics-querier.js | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/core/api-server/api/graphql/queries/statistics-querier.js b/core/api-server/api/graphql/queries/statistics-querier.js index 6865775fd..22ff86eaa 100644 --- a/core/api-server/api/graphql/queries/statistics-querier.js +++ b/core/api-server/api/graphql/queries/statistics-querier.js @@ -78,6 +78,7 @@ class NodesStatistics { _buildAlgorithmResult(node, algorithms, metric, resourcePressure, defaultWorker) { let otherAmount = 0; + let algorithmTotalSize = 0; const algorithmsData = []; const getMetric = (mtr, algorithm) => { @@ -93,13 +94,14 @@ class NodesStatistics { }; node.workers.stats.forEach(algorithm => { const requestedAlgorithm = algorithms.find(alg => alg.name === algorithm.algorithmName); - + const size = +(algorithm.count * getMetric(metric, requestedAlgorithm)).toFixed(1); if (requestedAlgorithm) { algorithmsData.push({ name: algorithm.algorithmName, amount: algorithm.count, - size: +(algorithm.count * getMetric(metric, requestedAlgorithm)).toFixed(1), + size }); + algorithmTotalSize += size; } else { otherAmount += algorithm.count; @@ -108,8 +110,7 @@ class NodesStatistics { algorithmsData.push({ name: 'other', amount: otherAmount, - // size: +(node.total[metric] * resourcePressure -(nodeFree + (algorithmsData.reduce((sum, alg) => sum + alg.size, 
0)))).toFixed(1), - size: +(node.other[metric].toFixed(1)), + size: Math.min(+(node.other[metric].toFixed(1)), node.total[metric] - algorithmTotalSize), }); const free = node.total[metric] * resourcePressure - node.requests[metric]; algorithmsData.push({ @@ -120,7 +121,7 @@ class NodesStatistics { algorithmsData.push({ name: 'reserved', amount: otherAmount, - size: free < 0 ? +node.total[metric].toFixed(1) - +(node.other[metric].toFixed(1)) : +(node.total[metric] * (1 - resourcePressure)).toFixed(1) + size: Math.max(+(node.total[metric] * (1 - resourcePressure) + (free < 0 ? free : 0)).toFixed(1), 0) }); algorithmsData.push({ name: 'total', From 94ab17949abe7eceb8d4f831746bbf2696403cc3 Mon Sep 17 00:00:00 2001 From: Adir David Date: Wed, 27 Aug 2025 14:49:15 +0300 Subject: [PATCH 12/18] Now takes maximum between request and limit in case useResourceLimits flag is true, in case many pods have no limit and causes limit to be lower than request (which is impossible) --- core/task-executor/lib/reconcile/normalize.js | 44 ++++++++++--------- 1 file changed, 23 insertions(+), 21 deletions(-) diff --git a/core/task-executor/lib/reconcile/normalize.js b/core/task-executor/lib/reconcile/normalize.js index eed9c4913..b7d35dfdc 100644 --- a/core/task-executor/lib/reconcile/normalize.js +++ b/core/task-executor/lib/reconcile/normalize.js @@ -233,13 +233,13 @@ const _extractGpuValue = (gpu) => { * @returns {Object} { limitsGpu, requestGpu } */ const _extractGpuResources = (pod) => { - let limitsGpu = sumBy(pod.spec.containers, c => _extractGpuValue(objectPath.get(c, 'resources.limits', 0))); + let podLimitsGpu = sumBy(pod.spec.containers, c => _extractGpuValue(objectPath.get(c, 'resources.limits', 0))); - if (!limitsGpu) { - limitsGpu = _extractGpuValue(objectPath.get(pod, 'metadata.annotations', null)); + if (!podLimitsGpu) { + podLimitsGpu = _extractGpuValue(objectPath.get(pod, 'metadata.annotations', null)); } - const requestGpu = limitsGpu; - return { limitsGpu, requestGpu }; + const podRequestGpu = podLimitsGpu; + return { podRequestGpu, podLimitsGpu }; }; /** @@ -248,20 +248,20 @@ const _extractGpuResources = (pod) => { * @param {Object} pod - Pod object with containers and resources. 
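+ * @example
+ * // illustrative: a single-container pod requesting 500m CPU / 512Mi memory,
+ * // with no limits and no GPU set, yields
+ * // { podRequestCpu: 0.5, podRequestMem: 512, podRequestGpu: 0,
+ * //   podLimitsCpu: 0, podLimitsMem: 0, podLimitsGpu: 0 }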
* @returns {Object} { requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu } */ -const _extractPodResources = (pod) => { +const _extractPodResources = (pod) => { // CPU - const requestCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.requests.cpu', '0m'))); - const limitsCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.limits.cpu', '0m'))); + const podRequestCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.requests.cpu', '0m'))); + const podLimitsCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.limits.cpu', '0m'))); // Memory - const requestMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.requests.memory', 0), true)); - const limitsMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.limits.memory', 0), true)); + const podRequestMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.requests.memory', 0), true)); + const podLimitsMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.limits.memory', 0), true)); // GPU - const { limitsGpu, requestGpu } = _extractGpuResources(pod); + const { podRequestGpu, podLimitsGpu } = _extractGpuResources(pod); return { - requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu + podRequestCpu, podRequestMem, podRequestGpu, podLimitsCpu, podLimitsMem, podLimitsGpu }; }; @@ -323,20 +323,22 @@ const normalizeResources = ({ pods, nodes } = {}) => { if (!nodeName || !accumulator[nodeName]) { return accumulator; } - const { requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu } = _extractPodResources(pod); + const { podRequestCpu, podRequestMem, podRequestGpu, podLimitsCpu, podLimitsMem, podLimitsGpu } = _extractPodResources(pod); const { useResourceLimits } = globalSettings; + const requestCpu = (useResourceLimits && podLimitsCpu) ? Math.max(podRequestCpu, podLimitsCpu) : podRequestCpu; + const requestMem = (useResourceLimits && podLimitsMem) ? Math.max(podRequestMem, podLimitsMem) : podRequestMem; - accumulator[nodeName].requests.cpu += (useResourceLimits && limitsCpu) ? Math.max(requestCpu, limitsCpu) : requestCpu; - accumulator[nodeName].requests.memory += (useResourceLimits && limitsMem) ? 
-        accumulator[nodeName].requests.gpu += requestGpu;
+        accumulator[nodeName].requests.cpu += requestCpu;
+        accumulator[nodeName].requests.memory += requestMem;
+        accumulator[nodeName].requests.gpu += podRequestGpu;
 
-        accumulator[nodeName].limits.cpu += limitsCpu;
-        accumulator[nodeName].limits.memory += limitsMem;
-        accumulator[nodeName].limits.gpu += limitsGpu;
+        accumulator[nodeName].limits.cpu += podLimitsCpu;
+        accumulator[nodeName].limits.memory += podLimitsMem;
+        accumulator[nodeName].limits.gpu += podLimitsGpu;
 
         if (objectPath.get(pod, 'metadata.labels.type') === 'worker') {
             accumulator[nodeName].workersTotal.cpu += requestCpu;
-            accumulator[nodeName].workersTotal.gpu += requestGpu;
+            accumulator[nodeName].workersTotal.gpu += podRequestGpu;
             accumulator[nodeName].workersTotal.memory += requestMem;
             accumulator[nodeName].workers.push({
                 algorithmName: objectPath.get(pod, 'metadata.labels.algorithm-name'),
@@ -346,7 +348,7 @@ const normalizeResources = ({ pods, nodes } = {}) => {
         }
         else {
             accumulator[nodeName].other.cpu += requestCpu;
-            accumulator[nodeName].other.gpu += requestGpu;
+            accumulator[nodeName].other.gpu += podRequestGpu;
             accumulator[nodeName].other.memory += requestMem;
         }

From b37a317bcb22816e19fea63ccf26ff00900f6590 Mon Sep 17 00:00:00 2001
From: Adir David
Date: Wed, 27 Aug 2025 14:50:31 +0300
Subject: [PATCH 13/18] Passing default resources, so that a worker without
 any explicit resource definition takes its default resources instead,
 making the requirements calculation more precise

---
 .../lib/reconcile/managers/jobs.js             |  5 ++--
 core/task-executor/lib/reconcile/reconciler.js |  6 ++---
 core/task-executor/lib/reconcile/resources.js  | 24 ++++++++++++-------
 core/task-executor/tests/managers.js           |  4 ++--
 4 files changed, 24 insertions(+), 15 deletions(-)

diff --git a/core/task-executor/lib/reconcile/managers/jobs.js b/core/task-executor/lib/reconcile/managers/jobs.js
index 1bbea0d5b..40cf66a83 100644
--- a/core/task-executor/lib/reconcile/managers/jobs.js
+++ b/core/task-executor/lib/reconcile/managers/jobs.js
@@ -52,16 +52,17 @@ class JobsHandler {
      * @param {Object} clusterOptions - Cluster-wide configuration.
      * @param {Object} workerResources - Default worker resource requests.
      * @param {Object} options - Configuration containing additional job creation options.
+     * @param {Object} containerDefaults - Default container resources from Kubernetes.
      * @param {Object} reconcileResult - Scheduling reconcile stats by algorithm.
      */
-    async schedule(allAllocatedJobs, algorithmTemplates, normResources, versions, requests, registry, clusterOptions, workerResources, options, reconcileResult) {
+    async schedule(allAllocatedJobs, algorithmTemplates, normResources, versions, requests, registry, clusterOptions, workerResources, options, containerDefaults, reconcileResult) {
        // 1. Assign requests to workers or prepare job creation details
        const { createDetails, toResume, scheduledRequests } = this._processAllRequests(allAllocatedJobs, algorithmTemplates, versions, requests, registry, clusterOptions, workerResources, reconcileResult);

        // 2. Match jobs to resources, and skip those that don't have the required resources.
        const extraResources = await this._getExtraResources();
-        const { jobsToRequest, skipped } = matchJobsToResources(createDetails, normResources, scheduledRequests, extraResources);
+        const { jobsToRequest, skipped } = matchJobsToResources(createDetails, normResources, scheduledRequests, extraResources, containerDefaults);
 
         // 3. Find workers to stop if resources are insufficient
         const stopDetails = this._findWorkersToStop(skipped, allAllocatedJobs, algorithmTemplates);
diff --git a/core/task-executor/lib/reconcile/reconciler.js b/core/task-executor/lib/reconcile/reconciler.js
index 4f1ca76a4..3353e140f 100644
--- a/core/task-executor/lib/reconcile/reconciler.js
+++ b/core/task-executor/lib/reconcile/reconciler.js
@@ -141,8 +141,8 @@ const _resolveWorkerResourceDefaults = async (options, containerDefaults) => {
         defaults.mem = options.resources.worker.mem;
     }
 
-    if (!defaults.cpu && containerDefaults.cpu) defaults.cpu = containerDefaults.cpu.defaultRequest;
-    if (!defaults.mem && containerDefaults.memory) defaults.mem = containerDefaults.memory?.defaultRequest;
+    if (!defaults.cpu && containerDefaults?.cpu) defaults.cpu = containerDefaults.cpu.defaultRequest;
+    if (!defaults.mem && containerDefaults?.memory) defaults.mem = containerDefaults.memory?.defaultRequest;
 
     return defaults;
 };
@@ -247,7 +247,7 @@ const reconcile = async ({ algorithmTemplates, algorithmRequests, workers, jobs,
 
     // Schedule the requests
     const jobsInfo = await jobsHandler.schedule(allAllocatedJobs, algorithmTemplates, normResources, versions,
-        requests, registry, clusterOptions, workerResources, options, reconcileResult);
+        requests, registry, clusterOptions, workerResources, options, containerDefaults, reconcileResult);
 
     // Handle workers life-cycle & wait for the promises to resolve
     const { toResume, toStop } = jobsInfo;
diff --git a/core/task-executor/lib/reconcile/resources.js b/core/task-executor/lib/reconcile/resources.js
index 6b3ea7a86..c5da865c6 100644
--- a/core/task-executor/lib/reconcile/resources.js
+++ b/core/task-executor/lib/reconcile/resources.js
@@ -183,14 +183,19 @@ const validateKaiQueue = ({ kaiObject, algorithmName }, existingQueuesNames) =>
  * @param {Object} [params.sideCars] - The optional sidecar container.
  * @param {Object} [params.sideCars.container] - The container inside the sidecar.
  * @param {Object} [params.sideCars.container.resources] - The resource requests of the sidecar container.
+ * @param {Object} containerDefaults - Default container resources from Kubernetes.
+ * @param {Object} [containerDefaults.cpu] - Default CPU resource from Kubernetes.
+ * @param {string|number} [containerDefaults.cpu.defaultRequest] - Default CPU request.
+ * @param {Object} [containerDefaults.memory] - Default memory resource from Kubernetes.
+ * @param {string|number} [containerDefaults.memory.defaultRequest] - Default memory request.
  * @returns {Object} An object containing the total requested CPU and memory.
  * @returns {number} return.requestedCpu - The total requested CPU in cores.
  * @returns {number} return.requestedMemory - The total requested memory in MiB.
  */
-const getAllRequested = ({ resourceRequests, workerResourceRequests, workerCustomResources, sideCars }) => {
+const getAllRequested = ({ resourceRequests, workerResourceRequests, workerCustomResources, sideCars }, containerDefaults) => {
     const sideCarResources = sideCars?.map(sideCar => sideCar?.container?.resources) || [];
-    const workerRequestedCPU = settings.applyResources ? workerResourceRequests.requests.cpu : '0';
-    const workerRequestedMemory = settings.applyResources ? workerResourceRequests.requests.memory : '0Mi';
+    const workerRequestedCPU = settings.applyResources ? workerResourceRequests.requests.cpu : (containerDefaults?.cpu?.defaultRequest || '0');
+    const workerRequestedMemory = settings.applyResources ? workerResourceRequests.requests.memory : (containerDefaults?.memory?.defaultRequest || '0Mi');
 
     const requestedCpu = parse.getCpuInCore(resourceRequests?.requests?.cpu || '0')
         + parse.getCpuInCore(workerCustomResources?.requests?.cpu || workerRequestedCPU)
@@ -213,22 +218,24 @@
  * @param {Object} availableResources - Current cluster resource state.
  * @param {number} totalAdded - Number of jobs added so far this tick.
  * @param {Object} [extraResources] - Additional metadata such as volumes and queues.
+ * @param {Object} containerDefaults - Default container resources from Kubernetes.
  * @returns {Object} Scheduling decision with `shouldAdd`, optional `warning`, and updated resources.
  */
-const shouldAddJob = (jobDetails, availableResources, totalAdded, extraResources) => {
+const shouldAddJob = (jobDetails, availableResources, totalAdded, extraResources, containerDefaults) => {
     const { allVolumesNames, existingQueuesNames } = extraResources || {};
     if (totalAdded >= MAX_JOBS_PER_TICK) {
         return { shouldAdd: false, newResources: { ...availableResources } };
     }
-    const { requestedCpu, requestedMemory } = getAllRequested(jobDetails);
+    const { requestedCpu, requestedMemory } = getAllRequested(jobDetails, containerDefaults);
     const requestedGpu = jobDetails.resourceRequests.requests[gpuVendors.NVIDIA] || 0;
-    const nodesBySelector = availableResources.nodeList.filter(n => nodeSelectorFilter(n.labels, jobDetails.nodeSelector));
+    const nodesBySelector = availableResources.nodeList.filter(node => nodeSelectorFilter(node.labels, jobDetails.nodeSelector));
     const nodesForSchedule = nodesBySelector.map(r => findNodeForSchedule(r, requestedCpu, requestedGpu, requestedMemory));
     const availableNode = nodesForSchedule.find(n => n.available);
     if (!availableNode) {
         // Number of total nodes that don't fit the attribute under nodeSelector
         const unMatchedNodesBySelector = availableResources.nodeList.length - nodesBySelector.length;
+
         const warning = createWarning({
             unMatchedNodesBySelector,
             jobDetails,
@@ -456,9 +463,10 @@ const pauseAccordingToResources = (stopDetails, availableResources, skippedReque
  * @param {Object} availableResources - Current cluster resource state.
  * @param {Object[]} [scheduledRequests=[]] - Already scheduled requests.
  * @param {Object} [extraResources] - Additional metadata for scheduling checks.
+ * @param {Object} containerDefaults - Default container resources from Kubernetes.
 * @returns {Object} { requested: jobs scheduled, skipped: jobs not scheduled }
  */
-const matchJobsToResources = (createDetails, availableResources, scheduledRequests = [], extraResources) => {
+const matchJobsToResources = (createDetails, availableResources, scheduledRequests = [], extraResources, containerDefaults) => {
     const jobsToRequest = [];
     const skipped = [];
     const localDetails = clone(createDetails);
@@ -467,7 +475,7 @@ const matchJobsToResources = (createDetails, availableResources, scheduledReques
     // loop over all the job types one by one and assign until it can't fit in any node
     const cb = (j) => {
         if (j.numberOfNewJobs > 0) {
-            const { shouldAdd, warning, newResources, node } = shouldAddJob(j.jobDetails, availableResources, totalAdded, extraResources);
+            const { shouldAdd, warning, newResources, node } = shouldAddJob(j.jobDetails, availableResources, totalAdded, extraResources, containerDefaults);
             if (shouldAdd) {
                 const toCreate = { ...j.jobDetails, createdTime: Date.now(), node };
                 jobsToRequest.push(toCreate);
diff --git a/core/task-executor/tests/managers.js b/core/task-executor/tests/managers.js
index e9a44d8f5..84cdec7c8 100644
--- a/core/task-executor/tests/managers.js
+++ b/core/task-executor/tests/managers.js
@@ -1033,7 +1033,7 @@ describe('Managers tests', () => {
                 versions, requests, registry,
                 clusterOptions, workerResources, options,
-                {} // reconcileResult
+                {}, {} // containerDefaults & reconcileResult
             );
 
             expect(result).to.have.all.keys(['created', 'skipped', 'toResume', 'toStop']);
@@ -1044,7 +1044,7 @@ describe('Managers tests', () => {
         });
 
         it('should handle empty requests gracefully', async () => {
-            const result = await jobsHandler.schedule(allAllocatedJobs, {}, {}, {}, [], {}, {}, {}, {}, {});
+            const result = await jobsHandler.schedule(allAllocatedJobs, {}, {}, {}, [], {}, {}, {}, {}, {}, {});
 
             expect(result).to.deep.equal({
                 created: [],

From 66f097886972063798e422673974ea98c0997e65 Mon Sep 17 00:00:00 2001
From: Adir David
Date: Wed, 27 Aug 2025 16:51:07 +0300
Subject: [PATCH 14/18] Now always sending the actual request to 'other',
 while keeping the requests used for calculations dependent on the requests,
 or on the limits combined with requests (where a limit is missing)

---
 core/task-executor/lib/reconcile/normalize.js | 43 +++++++++----------
 1 file changed, 21 insertions(+), 22 deletions(-)

diff --git a/core/task-executor/lib/reconcile/normalize.js b/core/task-executor/lib/reconcile/normalize.js
index b7d35dfdc..6763fc6f1 100644
--- a/core/task-executor/lib/reconcile/normalize.js
+++ b/core/task-executor/lib/reconcile/normalize.js
@@ -233,13 +233,13 @@ const _extractGpuValue = (gpu) => {
  * @returns {Object} { limitsGpu, requestGpu }
  */
 const _extractGpuResources = (pod) => {
-    let podLimitsGpu = sumBy(pod.spec.containers, c => _extractGpuValue(objectPath.get(c, 'resources.limits', 0)));
+    let limitsGpu = sumBy(pod.spec.containers, c => _extractGpuValue(objectPath.get(c, 'resources.limits', 0)));
 
-    if (!podLimitsGpu) {
-        podLimitsGpu = _extractGpuValue(objectPath.get(pod, 'metadata.annotations', null));
+    if (!limitsGpu) {
+        limitsGpu = _extractGpuValue(objectPath.get(pod, 'metadata.annotations', null));
     }
-    const podRequestGpu = podLimitsGpu;
-    return { podRequestGpu, podLimitsGpu };
+    const requestGpu = limitsGpu;
+    return { requestGpu, limitsGpu };
 };
 
 /**
@@ -250,18 +250,18 @@ const _extractGpuResources = (pod) => {
  */
 const _extractPodResources = (pod) => {
     // CPU
-    const podRequestCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.requests.cpu', '0m')));
-    const podLimitsCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.limits.cpu', '0m')));
+    const requestCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.requests.cpu', '0m')));
+    const limitsCpu = sumBy(pod.spec.containers, c => parse.getCpuInCore(objectPath.get(c, 'resources.limits.cpu', '0m')));
 
     // Memory
-    const podRequestMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.requests.memory', 0), true));
-    const podLimitsMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.limits.memory', 0), true));
+    const requestMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.requests.memory', 0), true));
+    const limitsMem = sumBy(pod.spec.containers, c => parse.getMemoryInMi(objectPath.get(c, 'resources.limits.memory', 0), true));
 
     // GPU
-    const { podRequestGpu, podLimitsGpu } = _extractGpuResources(pod);
+    const { requestGpu, limitsGpu } = _extractGpuResources(pod);
 
     return {
-        podRequestCpu, podRequestMem, podRequestGpu, podLimitsCpu, podLimitsMem, podLimitsGpu
+        requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu
     };
 };
 
@@ -323,23 +323,22 @@ const normalizeResources = ({ pods, nodes } = {}) => {
         if (!nodeName || !accumulator[nodeName]) {
             return accumulator;
         }
-        const { podRequestCpu, podRequestMem, podRequestGpu, podLimitsCpu, podLimitsMem, podLimitsGpu } = _extractPodResources(pod);
+        const { requestCpu, requestMem, requestGpu, limitsCpu, limitsMem, limitsGpu } = _extractPodResources(pod);
         const { useResourceLimits } = globalSettings;
-        const requestCpu = (useResourceLimits && podLimitsCpu) ? Math.max(podRequestCpu, podLimitsCpu) : podRequestCpu;
-        const requestMem = (useResourceLimits && podLimitsMem) ? Math.max(podRequestMem, podLimitsMem) : podRequestMem;
 
-        accumulator[nodeName].requests.cpu += requestCpu;
-        accumulator[nodeName].requests.memory += requestMem;
-        accumulator[nodeName].requests.gpu += podRequestGpu;
+        accumulator[nodeName].requests.cpu += (useResourceLimits && limitsCpu) ? Math.max(requestCpu, limitsCpu) : requestCpu;
+        accumulator[nodeName].requests.memory += (useResourceLimits && limitsMem) ? Math.max(requestMem, limitsMem) : requestMem;
+        accumulator[nodeName].requests.gpu += requestGpu;
 
-        accumulator[nodeName].limits.cpu += podLimitsCpu;
-        accumulator[nodeName].limits.memory += podLimitsMem;
-        accumulator[nodeName].limits.gpu += podLimitsGpu;
+        accumulator[nodeName].limits.cpu += limitsCpu;
+        accumulator[nodeName].limits.memory += requestMem;
+        accumulator[nodeName].limits.gpu += limitsGpu;
 
+        // Use actual request values for worker and other pod accounting
         if (objectPath.get(pod, 'metadata.labels.type') === 'worker') {
             accumulator[nodeName].workersTotal.cpu += requestCpu;
-            accumulator[nodeName].workersTotal.gpu += podRequestGpu;
             accumulator[nodeName].workersTotal.memory += requestMem;
+            accumulator[nodeName].workersTotal.gpu += requestGpu;
             accumulator[nodeName].workers.push({
                 algorithmName: objectPath.get(pod, 'metadata.labels.algorithm-name'),
                 podName: objectPath.get(pod, 'metadata.name'),
@@ -348,8 +347,8 @@ const normalizeResources = ({ pods, nodes } = {}) => {
         }
         else {
             accumulator[nodeName].other.cpu += requestCpu;
-            accumulator[nodeName].other.gpu += podRequestGpu;
             accumulator[nodeName].other.memory += requestMem;
+            accumulator[nodeName].other.gpu += requestGpu;
         }
 
         return accumulator;

From 41afc43fd72292ad6704e3f0aa9bce6c101675b9 Mon Sep 17 00:00:00 2001
From: Adir David
Date: Wed, 27 Aug 2025 16:51:56 +0300
Subject: [PATCH 15/18] fixed free&other calculation after changes in
 task-executor

---
 core/api-server/api/graphql/queries/statistics-querier.js | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/core/api-server/api/graphql/queries/statistics-querier.js b/core/api-server/api/graphql/queries/statistics-querier.js
index 22ff86eaa..8d83a7a3f 100644
--- a/core/api-server/api/graphql/queries/statistics-querier.js
+++ b/core/api-server/api/graphql/queries/statistics-querier.js
@@ -110,9 +110,9 @@ class NodesStatistics {
             algorithmsData.push({
                 name: 'other',
                 amount: otherAmount,
-                size: Math.min(+(node.other[metric].toFixed(1)), node.total[metric] - algorithmTotalSize),
+                size: +(node.other[metric].toFixed(1))
             });
-            const free = node.total[metric] * resourcePressure - node.requests[metric];
+            const free = node.total[metric] * resourcePressure - node.other[metric] - algorithmTotalSize;
             algorithmsData.push({
                 name: 'free',
                 amount: -1,

From b4e7cd6beb86acf11736c120b6617ea0feaf75f8 Mon Sep 17 00:00:00 2001
From: Adir David
Date: Wed, 27 Aug 2025 17:02:01 +0300
Subject: [PATCH 16/18] Moved the allNodes aggregation below the per-node pod
 accounting in normalizeResources
---
 core/task-executor/lib/reconcile/normalize.js | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/core/task-executor/lib/reconcile/normalize.js b/core/task-executor/lib/reconcile/normalize.js
index 6763fc6f1..bc5a445b8 100644
--- a/core/task-executor/lib/reconcile/normalize.js
+++ b/core/task-executor/lib/reconcile/normalize.js
@@ -306,16 +306,6 @@ const normalizeResources = ({ pods, nodes } = {}) => {
         return accumulator;
     }, {});
 
-    const allNodes = {
-        requests: { cpu: 0, gpu: 0, memory: 0 },
-        limits: { cpu: 0, gpu: 0, memory: 0 },
-        total: {
-            cpu: sumBy(Object.values(nodeMap), 'total.cpu'),
-            gpu: sumBy(Object.values(nodeMap), 'total.gpu'),
-            memory: sumBy(Object.values(nodeMap), 'total.memory'),
-        }
-    };
-
     // Track pod usage per node
     const stateFilter = pod => pod.status.phase === 'Running' || pod.status.phase === 'Pending';
     const resourcesPerNode = pods.body.items.filter(stateFilter).reduce((accumulator, pod) => {
@@ -356,6 +346,15 @@ const normalizeResources = ({ pods, nodes } = {}) => {
 
     // Build node list and aggregate totals
     const nodeList = [];
+    const allNodes = {
+        requests: { cpu: 0, gpu: 0, memory: 0 },
+        limits: { cpu: 0, gpu: 0, memory: 0 },
+        total: {
+            cpu: sumBy(Object.values(nodeMap), 'total.cpu'),
+            gpu: sumBy(Object.values(nodeMap), 'total.gpu'),
+            memory: sumBy(Object.values(nodeMap), 'total.memory'),
+        }
+    };
 
     Object.entries(resourcesPerNode).forEach(([nodeName, nodeData]) => {
         _calcRatioFree(nodeData);

From a0e1ccef21c3da292090b765d3114e8de28d6dd0 Mon Sep 17 00:00:00 2001
From: Adir David
Date: Wed, 27 Aug 2025 17:20:15 +0300
Subject: [PATCH 17/18] Added unit tests covering normalizeResources after
 these changes, and fixed limits.memory to accumulate limitsMem

---
 core/task-executor/lib/reconcile/normalize.js |   2 +-
 core/task-executor/tests/normalizeTests.js    | 218 ++++++++++++++++++
 2 files changed, 219 insertions(+), 1 deletion(-)

diff --git a/core/task-executor/lib/reconcile/normalize.js b/core/task-executor/lib/reconcile/normalize.js
index bc5a445b8..e153867fa 100644
--- a/core/task-executor/lib/reconcile/normalize.js
+++ b/core/task-executor/lib/reconcile/normalize.js
@@ -321,7 +321,7 @@ const normalizeResources = ({ pods, nodes } = {}) => {
         accumulator[nodeName].requests.gpu += requestGpu;
 
         accumulator[nodeName].limits.cpu += limitsCpu;
-        accumulator[nodeName].limits.memory += requestMem;
+        accumulator[nodeName].limits.memory += limitsMem;
         accumulator[nodeName].limits.gpu += limitsGpu;
 
         // Use actual request values for worker and other pod accounting
diff --git a/core/task-executor/tests/normalizeTests.js b/core/task-executor/tests/normalizeTests.js
index 2e00c4f77..74026b6a3 100644
--- a/core/task-executor/tests/normalizeTests.js
+++ b/core/task-executor/tests/normalizeTests.js
@@ -448,6 +448,224 @@ describe('normalize', () => {
             expect(res.nodeList[1].free.cpu).to.eq(7.55);
             expect(res.nodeList[2].free.cpu).to.eq(7.8);
         });
+
+        it('should filter out nodes with NoSchedule taint', () => {
+            const nodesWithTaint = {
+                body: {
+                    items: [
+                        { metadata: { name: 'node1', labels: {} }, spec: { taints: [{ effect: 'NoSchedule' }] }, status: { allocatable: { cpu: '4', memory: '8Gi' } } },
+                        { metadata: { name: 'node2', labels: {} }, status: { allocatable: { cpu: '2', memory: '4Gi' } } }
+                    ]
+                }
+            };
+            const podsEmpty = { body: { items: [] } };
+
+            const res = normalizeResources({ pods: podsEmpty, nodes: nodesWithTaint });
+            expect(res.nodeList.length).to.eq(1);
+            expect(res.nodeList[0].name).to.eq('node2');
+        });
+
+        it('should ignore pods not in Running or Pending state', () => {
+            const nodes = {
+                body: {
+                    items: [
+                        { metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } }
+                    ]
+                }
+            };
+            const pods = {
+                body: {
+                    items: [
+                        { status: { phase: 'Succeeded' }, spec: { nodeName: 'node1', containers: [] } },
+                        { status: { phase: 'Failed' }, spec: { nodeName: 'node1', containers: [] } }
+                    ]
+                }
+            };
+
+            const res = normalizeResources({ pods, nodes });
+            expect(res.nodeList[0].requests.cpu).to.eq(0);
+            expect(res.nodeList[0].requests.memory).to.eq(0);
+        });
+
+        it('should account worker pods separately from other pods', () => {
+            const nodes = {
+                body: {
+                    items: [
+                        { metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } }
+                    ]
+                }
+            };
+            const pods = {
+                body: {
+                    items: [
+                        {
+                            status: { phase: 'Running' },
+                            spec: { nodeName: 'node1', containers: [{ resources: { requests: { cpu: '100m', memory: '128Mi' } } }] },
+                            metadata: { labels: { type: 'worker', 'algorithm-name': 'algo1' }, name: 'workerPod1' }
+                        },
+                        {
+                            status: { phase: 'Running' },
+                            spec: { nodeName: 'node1', containers: [{ resources: { requests: { cpu: '200m', memory: '256Mi' } } }] },
+                            metadata: { labels: { type: 'other' }, name: 'otherPod1' }
+                        }
+                    ]
+                }
+            };
+
+            const res = normalizeResources({ pods, nodes });
+            const node = res.nodeList[0];
+            expect(node.workersTotal.cpu).to.eq(0.1);
+            expect(node.other.cpu).to.eq(0.2);
+            expect(node.workers.length).to.eq(1);
+            expect(node.workers[0].algorithmName).to.eq('algo1');
+        });
+
+        it('should calculate GPU resources correctly', () => {
+            const nodes = {
+                body: {
+                    items: [
+                        { metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi', 'nvidia.com/gpu': '2' } } }
+                    ]
+                }
+            };
+            const pods = {
+                body: {
+                    items: [
+                        {
+                            status: { phase: 'Running' },
+                            spec: { nodeName: 'node1', containers: [{ resources: { limits: { 'nvidia.com/gpu': '1' } } }] },
+                            metadata: { labels: {}, name: 'gpuPod1' }
+                        }
+                    ]
+                }
+            };
+
+            const res = normalizeResources({ pods, nodes });
+            const node = res.nodeList[0];
+            expect(node.total.gpu).to.eq(2);
+            expect(node.requests.gpu).to.eq(1);
+            expect(node.free.gpu).to.eq(1);
+            expect(node.ratio.gpu).to.eq(0.5);
+        });
+
+        it('should accumulate limits separately from requests', () => {
+            const nodes = {
+                body: {
+                    items: [
+                        { metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } }
+                    ]
+                }
+            };
+            const pods = {
+                body: {
+                    items: [
+                        {
+                            status: { phase: 'Running' },
+                            spec: {
+                                nodeName: 'node1',
+                                containers: [{
+                                    resources: {
+                                        requests: { cpu: '100m', memory: '128Mi' },
+                                        limits: { cpu: '200m', memory: '256Mi' }
+                                    }
+                                }]
+                            },
+                            metadata: { labels: {}, name: 'pod1' }
+                        }
+                    ]
+                }
+            };
+
+            const res = normalizeResources({ pods, nodes });
+            const node = res.nodeList[0];
+            expect(node.requests.cpu).to.eq(0.1);
+            expect(node.limits.cpu).to.eq(0.2);
+            expect(node.requests.memory).to.eq(128);
+            expect(node.limits.memory).to.eq(256);
+        });
+
+        it('should include nodes with no pods in nodeList with zero requests', () => {
+            const nodes = {
+                body: {
+                    items: [
+                        { metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } }
+                    ]
+                }
+            };
+            const pods = { body: { items: [] } };
+
+            const res = normalizeResources({ pods, nodes });
+            expect(res.nodeList[0].requests.cpu).to.eq(0);
+            expect(res.nodeList[0].free.cpu).to.eq(4);
+        });
+
+        it('should still use actual requests for worker pods when useResourceLimits=true', () => {
+            globalSettings.useResourceLimits = true;
+            const nodes = {
+                body: { items: [{ metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } }] }
+            };
+            const pods = {
+                body: {
+                    items: [
+                        {
+                            status: { phase: 'Running' },
+                            spec: {
+                                nodeName: 'node1',
+                                containers: [{
+                                    resources: {
+                                        requests: { cpu: '100m', memory: '128Mi' },
+                                        limits: { cpu: '1000m', memory: '512Mi' }
+                                    }
+                                }]
+                            },
+                            metadata: { labels: { type: 'worker', 'algorithm-name': 'algoX' }, name: 'workerPod1' }
+                        }
+                    ]
+                }
+            };
+
+            const res = normalizeResources({ pods, nodes });
+            const node = res.nodeList[0];
+
+            // requests bucket uses limit (1 core), not request
+            expect(node.requests.cpu).to.eq(1);
+            // workersTotal should use actual request (0.1 cores)
+            expect(node.workersTotal.cpu).to.eq(0.1);
+        });
+
+        it('should still use actual requests for "other" pods when useResourceLimits=true', () => {
+            globalSettings.useResourceLimits = true;
+            const nodes = {
+                body: { items: [{ metadata: { name: 'node1', labels: {} }, status: { allocatable: { cpu: '4', memory: '8Gi' } } }] }
+            };
+            const pods = {
+                body: {
+                    items: [
+                        {
+                            status: { phase: 'Running' },
+                            spec: {
+                                nodeName: 'node1',
+                                containers: [{
+                                    resources: {
+                                        requests: { cpu: '200m', memory: '256Mi' },
+                                        limits: { cpu: '2000m', memory: '1Gi' }
+                                    }
+                                }]
+                            },
+                            metadata: { labels: { type: 'other' }, name: 'otherPod1' }
+                        }
+                    ]
+                }
+            };
+
+            const res = normalizeResources({ pods, nodes });
+            const node = res.nodeList[0];
+
+            // requests bucket uses limit (2 cores)
+            expect(node.requests.cpu).to.eq(2);
+            // "other" bucket should still use actual request (0.2 cores)
+            expect(node.other.cpu).to.eq(0.2);
+        });
     });
 
     describe('merge workers', () => {

From 9dcc9d0a05961f6cd8a83c795b6c39a0f1f652f2 Mon Sep 17 00:00:00 2001
From: Adir David
Date: Sun, 31 Aug 2025 15:28:47 +0300
Subject: [PATCH 18/18] update k8s client version

---
 core/task-executor/package-lock.json | 8 ++++----
 core/task-executor/package.json      | 2 +-
 2 files changed, 5 insertions(+), 5 deletions(-)

diff --git a/core/task-executor/package-lock.json b/core/task-executor/package-lock.json
index 077e54dee..7dd004db5 100644
--- a/core/task-executor/package-lock.json
+++ b/core/task-executor/package-lock.json
@@ -14,7 +14,7 @@
         "@hkube/db": "^2.0.16",
         "@hkube/etcd": "^5.1.2",
         "@hkube/healthchecks": "^1.0.1",
-        "@hkube/kubernetes-client": "^2.0.8",
+        "@hkube/kubernetes-client": "^2.0.9",
         "@hkube/logger": "^2.0.2",
         "@hkube/metrics": "^1.0.42",
         "@hkube/uid": "^1.0.4",
@@ -464,9 +464,9 @@
       }
     },
     "node_modules/@hkube/kubernetes-client": {
-      "version": "2.0.8",
-      "resolved": "https://registry.npmjs.org/@hkube/kubernetes-client/-/kubernetes-client-2.0.8.tgz",
-      "integrity": "sha512-AS7fkHF6ROeJEgZtb2VRJSxYgSxoGyGoG1XuwO4CHjm/zf31yalypSWmqvMVvqVifNrlFwBl89DpZSEAzDWXPA==",
+      "version": "2.0.9",
+      "resolved": "https://registry.npmjs.org/@hkube/kubernetes-client/-/kubernetes-client-2.0.9.tgz",
+      "integrity": "sha512-g2SGpQn92Tmwba+uc8Ic7E5Hi15rlsJPMDLS/363JydKoDONuSZ1wGOuJIIHLXzOlHRNYbIGuDs5I7hAD/iJzA==",
       "dependencies": {
         "compare-versions": "^3.6.0",
         "js-yaml": "^4.1.0",
diff --git a/core/task-executor/package.json b/core/task-executor/package.json
index 3a8ff0d2c..bc3d3e410 100644
--- a/core/task-executor/package.json
+++ b/core/task-executor/package.json
@@ -28,7 +28,7 @@
         "@hkube/db": "^2.0.16",
         "@hkube/etcd": "^5.1.2",
         "@hkube/healthchecks": "^1.0.1",
-        "@hkube/kubernetes-client": "^2.0.8",
+        "@hkube/kubernetes-client": "^2.0.9",
         "@hkube/logger": "^2.0.2",
         "@hkube/metrics": "^1.0.42",
         "@hkube/uid": "^1.0.4",
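
Editor's note: the accounting rule this series converges on (patches 12, 14 and 17) can be summarized in a few lines. The sketch below is for illustration only and is not part of the series; `effectiveRequest` is a hypothetical helper, not a task-executor function. Under the useResourceLimits flag, the per-node `requests` buckets count max(request, limit) for pods that declare a limit, while the `workersTotal` and `other` buckets always count the pod's actual request.

// Minimal sketch (illustration only): the effective request counted against
// node capacity under the useResourceLimits flag.
// `effectiveRequest` is a hypothetical helper, not part of task-executor.
const effectiveRequest = (request, limit, useResourceLimits) =>
    (useResourceLimits && limit) ? Math.max(request, limit) : request;

// A pod requesting 100m CPU with a 1000m limit:
console.log(effectiveRequest(0.1, 1, true));  // 1   -> limit counted against capacity
console.log(effectiveRequest(0.1, 1, false)); // 0.1 -> flag off, request is used
console.log(effectiveRequest(0.1, 0, true));  // 0.1 -> no limit declared, request is used
// workersTotal/other always use the actual request (0.1), regardless of the flag.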