diff --git a/CHANGELOG.md b/CHANGELOG.md
index aa3f05eb..e7da4ee0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -7,12 +7,67 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
## Unreleased on the [23.11.x](https://github.com/PySlurm/pyslurm/tree/23.11.x) branch
+### Added
+
- New Classes to interact with Database Associations (WIP)
- `pyslurm.db.Association`
- `pyslurm.db.Associations`
- New Classes to interact with Database QoS (WIP)
- `pyslurm.db.QualityOfService`
- `pyslurm.db.QualitiesOfService`
+- Added `stats` attribute to both `pyslurm.Job`, `pyslurm.Jobs` and
+ `pyslurm.db.Jobs`
+- Added `pids` attribute to `pyslurm.Job` which contains Process-IDs of the Job
+ organized by node-name
+- Added `load_stats` method to `pyslurm.Job` and `pyslurm.Jobs` classes.
+ Together with the `stats` and `pids` attributes mentioned above, it is now
+ possible to fetch live statistics (like sstat)
+- Switch to link with `libslurmfull.so` instead of `libslurm.so`
+ This change really has no impact from a user perspective. Everything will
+ keep working the same, except that Slurm's more internal library
+ `libslurmfull.so` is linked with (which is located alongside the plugins
+ inside the `slurm` directory, which itself is next to `libslurm.so`)
+ Why the change? Because it will likely make development easier. It allows
+ access to more functions that might be needed in some places, without
+ completely having to implement them on our own. Implementing the
+ live-statistics feature, so basically `sstat`, is for example not possible
+ with `libslurm.so`
+ You can keep providing the directory where `libslurm.so` resided as
+ `$SLURM_LIB_DIR` to pyslurm, and it will automatically find `libslurmfull.so`
+ from there.
+
+### Fixed
+
+- Fixed `total_cpu_time`, `system_cpu_time` and `user_cpu_time` not getting
+ calculated correctly for Job statistics
+- Actually make sure that `avg_cpu_time`, `min_cpu_time`, `total_cpu_time`,
+ `system_cpu_time` and `user_cpu_time` are integers, not floats.
+
+### Changed
+
+- Breaking: rename `cpu_time` to `elapsed_cpu_time` in `pyslurm.Job` and
+ `pyslurm.Jobs` classes
+- Breaking: removed the following attributes from `pyslurm.db.Jobs`:
+ * `consumed_energy`
+ * `disk_read`
+ * `disk_write`
+ * `page_faults`
+ * `resident_memory`
+ * `virtual_memory`
+ * `elapsed_cpu_time`
+ * `total_cpu_time`
+ * `user_cpu_time`
+ * `system_cpu_time`
+- The removed attributes above are now all available within the `stats`
+ attribute, which is of type `pyslurm.db.JobStatistics`
+- Renamed the original class of `pyslurm.db.JobStatistics` to
+ `pyslurm.db.JobStepStatistics`.
+ Most of what this class contains is only applicable to Steps, and
+ doesn't fully apply at the Job level.
+ Therefore, the new `pyslurm.db.JobStatistics` class only contains all
+ statistics that make sense at the Job-level.
+- Return `1` as a value for the `cpus` attribute in `pyslurm.db.Job` when there
+ is no value set from Slurm's side.
## [23.11.0](https://github.com/PySlurm/pyslurm/releases/tag/v23.11.0) - 2024-01-27
diff --git a/pyslurm/__init__.py b/pyslurm/__init__.py
index 4d3a5101..8d61e436 100644
--- a/pyslurm/__init__.py
+++ b/pyslurm/__init__.py
@@ -4,10 +4,10 @@
"""
from __future__ import absolute_import
-import ctypes
+import os
import sys
-sys.setdlopenflags(sys.getdlopenflags() | ctypes.RTLD_GLOBAL)
+sys.setdlopenflags(sys.getdlopenflags() | os.RTLD_GLOBAL | os.RTLD_DEEPBIND )
# Initialize slurm api
from pyslurm.api import slurm_init, slurm_fini
diff --git a/pyslurm/core/job/job.pxd b/pyslurm/core/job/job.pxd
index 15d44be9..aae445a7 100644
--- a/pyslurm/core/job/job.pxd
+++ b/pyslurm/core/job/job.pxd
@@ -25,6 +25,7 @@
from pyslurm.utils cimport cstr, ctime
from pyslurm.utils.uint cimport *
from pyslurm.utils.ctime cimport time_t
+from pyslurm.db.stats cimport JobStatistics
from libc.string cimport memcpy, memset
from libc.stdint cimport uint8_t, uint16_t, uint32_t, uint64_t, int64_t
from libc.stdlib cimport free
@@ -77,16 +78,16 @@ cdef class Jobs(MultiClusterMap):
Attributes:
memory (int):
- Total amount of memory for all Jobs in this collection, in
- Mebibytes
+ Total amount of memory requested for all Jobs in this collection,
+ in Mebibytes
cpus (int):
- Total amount of cpus for all Jobs in this collection.
+ Total amount of cpus requested for all Jobs in this collection.
ntasks (int):
- Total amount of tasks for all Jobs in this collection.
- cpu_time (int):
+ Total amount of tasks requested for all Jobs in this collection.
+ elapsed_cpu_time (int):
Total amount of CPU-Time used by all the Jobs in the collection.
This is the result of multiplying the run_time with the amount of
- cpus for each job.
+ cpus requested for each job.
frozen (bool):
If this is set to True and the `reload()` method is called, then
*ONLY* Jobs that already exist in this collection will be
@@ -95,6 +96,10 @@ cdef class Jobs(MultiClusterMap):
Slurm controllers memory will not be removed either.
The default is False, so old jobs will be removed, and new Jobs
will be added - basically the same behaviour as doing Jobs.load().
+ stats (JobStatistics):
+ Real-time statistics of all Jobs in this collection.
+ Before you can access the stats data for this, you have to call
+ the `load_stats` method on this collection.
"""
cdef:
job_info_msg_t *info
@@ -102,6 +107,7 @@ cdef class Jobs(MultiClusterMap):
cdef public:
frozen
+ JobStatistics stats
cdef class Job:
@@ -119,6 +125,14 @@ cdef class Job:
Before you can access the Steps data for a Job, you have to call
the `reload()` method of a Job instance or the `load_steps()`
method of a Jobs collection.
+ stats (JobStatistics):
+ Real-time statistics of a Job.
+ Before you can access the stats data for a Job, you have to call
+ the `load_stats` method of a Job instance or the Jobs collection.
+ pids (dict[str, list]):
+ Current Process-IDs of the Job, organized by node name.
+ Before you can access the pids data for a Job, you have to call
+ the `load_stats` method of a Job instance or the Jobs collection.
name (str):
Name of the Job
id (int):
@@ -362,17 +376,20 @@ cdef class Job:
Whether this Job is a cronjob.
cronjob_time (str):
The time specification for the Cronjob.
- cpu_time (int):
+ elapsed_cpu_time (int):
Amount of CPU-Time used by the Job so far.
This is the result of multiplying the run_time with the amount of
- cpus.
+ cpus requested.
"""
cdef:
slurm_job_info_t *ptr
dict passwd
dict groups
- cdef public JobSteps steps
+ cdef public:
+ JobSteps steps
+ JobStatistics stats
+ dict pids
cdef _calc_run_time(self)
diff --git a/pyslurm/core/job/job.pyx b/pyslurm/core/job/job.pyx
index 68821b75..fc0663d8 100644
--- a/pyslurm/core/job/job.pyx
+++ b/pyslurm/core/job/job.pyx
@@ -63,6 +63,7 @@ cdef class Jobs(MultiClusterMap):
def __init__(self, jobs=None, frozen=False):
self.frozen = frozen
+ self.stats = JobStatistics()
super().__init__(data=jobs,
typ="Jobs",
val_type=Job,
@@ -161,8 +162,7 @@ cdef class Jobs(MultiClusterMap):
Pending Jobs will be ignored, since they don't have any Steps yet.
Raises:
- RPCError: When retrieving the Job information for all the Steps
- failed.
+ RPCError: When retrieving the information for all the Steps failed.
"""
cdef dict steps = JobSteps.load_all()
for job in self.values():
@@ -170,6 +170,40 @@ cdef class Jobs(MultiClusterMap):
if jid in steps:
job.steps = steps[jid]
+ def load_stats(self):
+ """Load realtime stats for this collection of Jobs.
+
+ This function additionally fills in the `stats` attribute for all Jobs
+ in the collection, and also populates its own `stats` attribute.
+ Implicitly calls `load_steps()`.
+
+ !!! note
+
+ Pending Jobs will be ignored, since they don't have any Stats yet.
+
+ Returns:
+ (JobStatistics): The statistics of this job collection.
+
+ Raises:
+ RPCError: When retrieving the stats for all the Jobs failed.
+
+ Examples:
+ >>> import pyslurm
+ >>> jobs = pyslurm.Jobs.load()
+ >>> stats = jobs.load_stats()
+ >>>
+ >>> # Print the CPU Time Used
+ >>> print(stats.total_cpu_time)
+ """
+ self.load_steps()
+ stats = JobStatistics()
+ for job in self.values():
+ job.load_stats()
+ stats.add(job.stats)
+
+ self.stats = stats
+ return self.stats
+
@property
def memory(self):
return xcollections.sum_property(self, Job.memory)
@@ -183,7 +217,7 @@ cdef class Jobs(MultiClusterMap):
return xcollections.sum_property(self, Job.ntasks)
@property
- def cpu_time(self):
+ def elapsed_cpu_time(self):
- return xcollections.sum_property(self, Job.cpu_time)
+ return xcollections.sum_property(self, Job.elapsed_cpu_time)
@@ -199,6 +233,8 @@ cdef class Job:
self.groups = {}
cstr.fmalloc(&self.ptr.cluster, LOCAL_CLUSTER)
self.steps = JobSteps()
+ self.stats = JobStatistics()
+ self.pids = {}
def _alloc_impl(self):
if not self.ptr:
@@ -225,7 +261,7 @@ cdef class Job:
!!! note
If the Job is not pending, the related Job steps will also be
- loaded.
+ loaded. Job statistics are however not loaded automatically.
Args:
job_id (int):
@@ -276,6 +312,8 @@ cdef class Job:
wrap.passwd = {}
wrap.groups = {}
wrap.steps = JobSteps.__new__(JobSteps)
+ wrap.stats = JobStatistics()
+ wrap.pids = {}
memcpy(wrap.ptr, in_ptr, sizeof(slurm_job_info_t))
return wrap
@@ -297,6 +335,8 @@ cdef class Job:
"""
cdef dict out = instance_to_dict(self)
out["steps"] = self.steps.to_dict()
+ out["stats"] = self.stats.to_dict()
+ out["pids"] = self.pids
return out
def send_signal(self, signal, steps="children", hurry=False):
@@ -516,6 +556,49 @@ cdef class Job:
"""
verify_rpc(slurm_notify_job(self.id, msg))
+ def load_stats(self):
+ """Load realtime statistics for a Job and its steps.
+
+ Calling this function returns the Job statistics, and additionally
+ populates the `stats` and `pids` attribute of the instance.
+
+ Returns:
+ (JobStatistics): The statistics of the job.
+
+ Raises:
+ RPCError: When receiving the Statistics was not successful.
+
+ Examples:
+ >>> import pyslurm
+ >>> job = pyslurm.Job.load(9999)
+ >>> stats = job.load_stats()
+ >>>
+ >>> # Print the CPU Time Used
+ >>> print(stats.total_cpu_time)
+ >>>
+ >>> # Print the Process-IDs for the whole Job, organized by hostname
+ >>> print(job.pids)
+ """
+ if not self.steps:
+ job = Job.load(self.id)
+ self.steps = job.steps
+
+ all_pids = {}
+ for step in self.steps.values():
+ step.load_stats()
+ self.stats._sum_steps(step.stats)
+
+ for node, pids in step.pids.items():
+ if node not in all_pids:
+ all_pids[node] = []
+
+ all_pids[node].extend(pids)
+
+ self.stats.elapsed_cpu_time = self.run_time * self.cpus
+
+ self.pids = all_pids
+ return self.stats
+
def get_batch_script(self):
"""Return the content of the script for a Batch-Job.
@@ -1186,7 +1269,7 @@ cdef class Job:
return cstr.to_unicode(self.ptr.cronspec)
@property
- def cpu_time(self):
+ def elapsed_cpu_time(self):
return self.cpus * self.run_time
@property
diff --git a/pyslurm/core/job/stats.pxd b/pyslurm/core/job/stats.pxd
new file mode 100644
index 00000000..bb397ebe
--- /dev/null
+++ b/pyslurm/core/job/stats.pxd
@@ -0,0 +1,152 @@
+#########################################################################
+# job/stats.pxd - interface to retrieve slurm job realtime stats
+#########################################################################
+# Copyright (C) 2024 Toni Harzendorf
+#
+#########################################################################
+# Note: Some struct definitions have been taken directly from various parts of
+# the Slurm source code, and just translated to Cython-Syntax. The structs are
+# appropriately annotated with the respective Copyright notices, and a link to
+# the source-code.
+
+# Slurm is licensed under the GNU General Public License. For the full text of
+# Slurm's License, please see here: pyslurm/slurm/SLURM_LICENSE
+
+#########################################################################
+# This file is part of PySlurm
+#
+# PySlurm is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# PySlurm is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with PySlurm; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# cython: c_string_type=unicode, c_string_encoding=default
+# cython: language_level=3
+
+from libc.stdint cimport uint8_t, uint16_t, uint32_t, uint64_t
+from libc.string cimport memcpy, memset
+from posix.unistd cimport pid_t
+from pyslurm cimport slurm
+from pyslurm.slurm cimport (
+ slurm_job_step_stat,
+ slurmdb_step_rec_t,
+ slurmdb_stats_t,
+ slurmdb_free_slurmdb_stats_members,
+ job_step_stat_t,
+ job_step_stat_response_msg_t,
+ slurm_job_step_stat_response_msg_free,
+ jobacctinfo_t,
+ list_t,
+ xfree,
+ try_xmalloc,
+)
+from pyslurm.utils cimport cstr, ctime
+from pyslurm.utils.uint cimport *
+from pyslurm.utils.ctime cimport time_t
+from pyslurm.db.util cimport SlurmList, SlurmListItem
+from pyslurm.db.stats cimport JobStepStatistics
+from pyslurm.core.job.step cimport JobStep
+
+cdef load_single(JobStep step)
+
+# The real definition for this is too long, including too many other types that
+# we don't have directly access to.
+cdef extern from *:
+ """
+ typedef struct stepd_step_rec_s stepd_step_rec_t; \
+ """
+ ctypedef struct stepd_step_rec_t
+
+
+# https://github.com/SchedMD/slurm/blob/slurm-24-05-3-1/src/interfaces/jobacct_gather.h#L75
+# Copyright (C) 2003 The Regents of the University of California.
+# Copyright (C) 2005 Hewlett-Packard Development Company, L.P.
+ctypedef struct jobacct_id_t:
+ uint32_t taskid
+ uint32_t nodeid
+ stepd_step_rec_t *step
+
+
+# https://github.com/SchedMD/slurm/blob/slurm-24-05-3-1/src/interfaces/jobacct_gather.h#L81
+# Copyright (C) 2003 The Regents of the University of California.
+# Copyright (C) 2005 Hewlett-Packard Development Company, L.P.
+ctypedef struct jobacctinfo:
+ pid_t pid
+ uint64_t sys_cpu_sec
+ uint32_t sys_cpu_usec
+ uint64_t user_cpu_sec
+ uint32_t user_cpu_usec
+ uint32_t act_cpufreq
+ slurm.acct_gather_energy_t energy
+ double last_total_cputime
+ double this_sampled_cputime
+ uint32_t current_weighted_freq
+ uint32_t current_weighted_power
+ uint32_t tres_count
+ uint32_t *tres_ids
+ list_t *tres_list
+ uint64_t *tres_usage_in_max
+ uint64_t *tres_usage_in_max_nodeid
+ uint64_t *tres_usage_in_max_taskid
+ uint64_t *tres_usage_in_min
+ uint64_t *tres_usage_in_min_nodeid
+ uint64_t *tres_usage_in_min_taskid
+ uint64_t *tres_usage_in_tot
+ uint64_t *tres_usage_out_max
+ uint64_t *tres_usage_out_max_nodeid
+ uint64_t *tres_usage_out_max_taskid
+ uint64_t *tres_usage_out_min
+ uint64_t *tres_usage_out_min_nodeid
+ uint64_t *tres_usage_out_min_taskid
+ uint64_t *tres_usage_out_tot
+
+ jobacct_id_t id
+ int dataset_id
+
+ double last_tres_usage_in_tot
+ double last_tres_usage_out_tot
+ time_t cur_time
+ time_t last_time
+
+
+# https://github.com/SchedMD/slurm/blob/slurm-24-05-3-1/src/slurmctld/locks.h#L97
+# Copyright (C) 2002 The Regents of the University of California.
+ctypedef enum lock_level_t:
+ NO_LOCK
+ READ_LOCK
+ WRITE_LOCK
+
+
+# https://github.com/SchedMD/slurm/blob/slurm-24-05-3-1/src/common/assoc_mgr.h#L71
+# Copyright (C) 2004-2007 The Regents of the University of California.
+# Copyright (C) 2008 Lawrence Livermore National Security.
+ctypedef struct assoc_mgr_lock_t:
+ lock_level_t assoc
+ lock_level_t file
+ lock_level_t qos
+ lock_level_t res
+ lock_level_t tres
+ lock_level_t user
+ lock_level_t wckey
+
+
+cdef extern jobacctinfo_t *jobacctinfo_create(jobacct_id_t *jobacct_id)
+cdef extern void jobacctinfo_destroy(void *object)
+cdef extern void jobacctinfo_aggregate(jobacctinfo_t *dest, jobacctinfo_t *src)
+cdef extern void jobacctinfo_2_stats(slurmdb_stats_t *stats, jobacctinfo_t *jobacct)
+
+cdef extern list_t* assoc_mgr_tres_list
+cdef extern void assoc_mgr_lock(assoc_mgr_lock_t *locks)
+cdef extern void assoc_mgr_unlock(assoc_mgr_lock_t *locks)
+cdef extern int assoc_mgr_post_tres_list(list_t *new_list)
+
+cdef extern char *slurmdb_ave_tres_usage(char *tres_string, int tasks);
diff --git a/pyslurm/core/job/stats.pyx b/pyslurm/core/job/stats.pyx
new file mode 100644
index 00000000..39bc0b36
--- /dev/null
+++ b/pyslurm/core/job/stats.pyx
@@ -0,0 +1,137 @@
+#########################################################################
+# job/stats.pyx - interface to retrieve slurm job realtime stats
+#########################################################################
+# Copyright (C) 2024 Toni Harzendorf
+# Copyright (C) 2008 Lawrence Livermore National Security.
+#
+#########################################################################
+# The main logic for this file's code was taken from:
+# https://github.com/SchedMD/slurm/blob/42a05b1bb4a7719944ee26adc6bc5d73e2d36823/src/sstat/sstat.c#L109
+#
+# The code has been modified a bit and translated to Cython syntax. Slurm's
+# sstat was originally written by Morris Jette
+#
+# Slurm is licensed under the GNU General Public License. For the full text of
+# Slurm's License, please see here: pyslurm/slurm/SLURM_LICENSE
+#
+#########################################################################
+# This file is part of PySlurm
+#
+# PySlurm is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+
+# PySlurm is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License along
+# with PySlurm; if not, write to the Free Software Foundation, Inc.,
+# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
+#
+# cython: c_string_type=unicode, c_string_encoding=default
+# cython: language_level=3
+
+from pyslurm.core.error import verify_rpc
+from pyslurm.utils.helpers import nodelist_to_range_str
+
+
+cdef load_single(JobStep step):
+ cdef:
+ # jobacctinfo_t is the opaque data type provided in slurm.h
+ # jobacctinfo is the actual (partial) re-definition of the jobacctinfo
+ # type
+ #
+ # This is because we need to have access to jobacct.tres_list,
+ # otherwise we cannot call jobacctinfo_aggregate. Also we want to have
+ # the values of user_cpu_sec and sys_cpu_sec.
+ jobacctinfo_t *total_jobacct = NULL
+ jobacctinfo *stat_jobacct = NULL
+
+ job_step_stat_t *step_stat = NULL
+ job_step_stat_response_msg_t *stat_resp = NULL
+ assoc_mgr_lock_t locks
+ slurmdb_step_rec_t db_step
+ SlurmList stats_list
+ SlurmListItem stat_list_ptr
+ char *usage_tmp = NULL
+ int rc = slurm.SLURM_SUCCESS
+ int ntasks = 0
+ list nodes = []
+
+ rc = slurm_job_step_stat(&step.ptr.step_id, NULL,
+ step.ptr.start_protocol_ver, &stat_resp)
+ if rc != slurm.SLURM_SUCCESS:
+ slurm_job_step_stat_response_msg_free(stat_resp)
+ if rc == slurm.ESLURM_INVALID_JOB_ID:
+ return None
+ else:
+ verify_rpc(rc)
+
+ memset(&db_step, 0, sizeof(slurmdb_step_rec_t))
+ memset(&db_step.stats, 0, sizeof(slurmdb_stats_t))
+
+ stats_list = SlurmList.wrap(stat_resp.stats_list, owned=False)
+ for stat_list_ptr in stats_list:
+ step_stat = stat_list_ptr.data
+ # Casting jobacctinfo_t to jobacctinfo... hoping this is sane to do
+ stat_jobacct = step_stat.jobacct
+
+ if not step_stat.step_pids or not step_stat.step_pids.node_name:
+ continue
+
+ node = cstr.to_unicode(step_stat.step_pids.node_name)
+ if step_stat.step_pids.pid_cnt > 0:
+ for i in range(step_stat.step_pids.pid_cnt):
+ if node not in step.pids:
+ step.pids[node] = []
+
+ step.pids[node].append(step_stat.step_pids.pid[i])
+
+ nodes.append(node)
+ ntasks += step_stat.num_tasks
+ if step_stat.jobacct:
+ if not assoc_mgr_tres_list and stat_jobacct.tres_list:
+ locks.tres = WRITE_LOCK
+ assoc_mgr_lock(&locks)
+ assoc_mgr_post_tres_list(stat_jobacct.tres_list)
+ assoc_mgr_unlock(&locks)
+ stat_jobacct.tres_list = NULL
+
+ if not total_jobacct:
+ total_jobacct = jobacctinfo_create(NULL)
+
+ jobacctinfo_aggregate(total_jobacct, step_stat.jobacct)
+
+ db_step.user_cpu_sec += stat_jobacct.user_cpu_sec
+ db_step.user_cpu_usec += stat_jobacct.user_cpu_usec
+ db_step.sys_cpu_sec += stat_jobacct.sys_cpu_sec
+ db_step.sys_cpu_usec += stat_jobacct.sys_cpu_usec
+
+ if total_jobacct:
+ jobacctinfo_2_stats(&db_step.stats, total_jobacct)
+ jobacctinfo_destroy(total_jobacct)
+
+ if ntasks:
+ db_step.stats.act_cpufreq /= ntasks
+
+ usage_tmp = db_step.stats.tres_usage_in_ave
+ db_step.stats.tres_usage_in_ave = slurmdb_ave_tres_usage(usage_tmp, ntasks)
+ xfree(usage_tmp)
+
+ usage_tmp = db_step.stats.tres_usage_out_ave
+ db_step.stats.tres_usage_out_ave = slurmdb_ave_tres_usage(usage_tmp, ntasks)
+ xfree(usage_tmp)
+
+ step.stats = JobStepStatistics.from_ptr(
+ &db_step,
+ nodes,
+ step.alloc_cpus if step.alloc_cpus else 0,
+ step.run_time if step.run_time else 0,
+ is_live=True,
+ )
+
+ slurm_job_step_stat_response_msg_free(stat_resp)
+ slurmdb_free_slurmdb_stats_members(&db_step.stats)
diff --git a/pyslurm/core/job/step.pxd b/pyslurm/core/job/step.pxd
index 92d7fcad..7bea049b 100644
--- a/pyslurm/core/job/step.pxd
+++ b/pyslurm/core/job/step.pxd
@@ -47,6 +47,8 @@ from pyslurm.utils cimport cstr, ctime
from pyslurm.utils.uint cimport *
from pyslurm.utils.ctime cimport time_t
from pyslurm.core.job.task_dist cimport TaskDistribution
+from pyslurm.db.stats cimport JobStepStatistics
+from pyslurm.core.job cimport stats
cdef class JobSteps(dict):
@@ -64,7 +66,7 @@ cdef class JobSteps(dict):
@staticmethod
cdef JobSteps _load_single(Job job)
cdef dict _load_data(self, uint32_t job_id, int flags)
-
+
cdef class JobStep:
"""A Slurm Jobstep
@@ -80,6 +82,14 @@ cdef class JobStep:
Time limit in Minutes for this step.
Attributes:
+ stats (JobStepStatistics):
+ Real-time statistics of a Step.
+ Before you can access the stats data for a Step, you have to call
+ the `load_stats` method of a Step instance or the Jobs collection.
+ pids (dict[str, list]):
+ Current Process-IDs of the Step, organized by node name. Before you
+ can access the pids data, you have to call the `load_stats` method
+ of a Step instance or the Jobs collection.
id (Union[str, int]):
The id for this step.
job_id (int):
@@ -136,5 +146,9 @@ cdef class JobStep:
job_step_info_t *ptr
step_update_request_msg_t *umsg
+ cdef public:
+ JobStepStatistics stats
+ dict pids
+
@staticmethod
cdef JobStep from_ptr(job_step_info_t *in_ptr)
diff --git a/pyslurm/core/job/step.pyx b/pyslurm/core/job/step.pyx
index 812d4f1d..35047566 100644
--- a/pyslurm/core/job/step.pyx
+++ b/pyslurm/core/job/step.pyx
@@ -157,6 +157,8 @@ cdef class JobStep:
self._alloc_impl()
self.job_id = job_id.id if isinstance(job_id, Job) else job_id
self.id = step_id
+ self.stats = JobStepStatistics()
+ self.pids = {}
cstr.fmalloc(&self.ptr.cluster, LOCAL_CLUSTER)
# Initialize attributes, if any were provided
@@ -251,9 +253,38 @@ cdef class JobStep:
cdef JobStep from_ptr(job_step_info_t *in_ptr):
cdef JobStep wrap = JobStep.__new__(JobStep)
wrap._alloc_info()
+ wrap.stats = JobStepStatistics()
+ wrap.pids = {}
memcpy(wrap.ptr, in_ptr, sizeof(job_step_info_t))
return wrap
+ def load_stats(self):
+ """Load realtime stats for this Step.
+
+ Calling this function returns the live statistics of the step, and
+ additionally populates the `stats` and `pids` attribute of the
+ instance.
+
+ Returns:
+ (JobStepStatistics): The statistics of the Step.
+
+ Raises:
+ RPCError: When retrieving the stats for the Step failed.
+
+ Examples:
+ >>> import pyslurm
+ >>> step = pyslurm.JobStep.load(9999, 1)
+ >>> stats = step.load_stats()
+ >>>
+ >>> # Print the CPU Time Used
+ >>> print(stats.total_cpu_time)
+ >>>
+ >>> # Print the Process-IDs for the Step, organized by hostname
+ >>> print(step.pids)
+ """
+ stats.load_single(self)
+ return self.stats
+
def send_signal(self, signal):
"""Send a signal to a running Job step.
@@ -338,6 +369,8 @@ cdef class JobStep:
if dist:
out["distribution"] = dist.to_dict()
+ out["stats"] = self.stats.to_dict()
+ out["pids"] = self.pids
return out
@property
@@ -434,11 +467,11 @@ cdef class JobStep:
@property
def alloc_cpus(self):
- return u32_parse(self.ptr.num_cpus)
+ return u32_parse(self.ptr.num_cpus, on_noval=1)
@property
def ntasks(self):
- return u32_parse(self.ptr.num_tasks)
+ return u32_parse(self.ptr.num_tasks, on_noval=1)
@property
def distribution(self):
diff --git a/pyslurm/db/__init__.py b/pyslurm/db/__init__.py
index acd36a40..f563aff0 100644
--- a/pyslurm/db/__init__.py
+++ b/pyslurm/db/__init__.py
@@ -21,7 +21,7 @@
from .connection import Connection
from .step import JobStep, JobSteps
-from .stats import JobStatistics
+from .stats import JobStatistics, JobStepStatistics
from .job import (
Job,
Jobs,
diff --git a/pyslurm/db/job.pxd b/pyslurm/db/job.pxd
index e0848e0a..0622b57c 100644
--- a/pyslurm/db/job.pxd
+++ b/pyslurm/db/job.pxd
@@ -166,46 +166,17 @@ cdef class Jobs(MultiClusterMap):
Jobs to initialize this collection with.
Attributes:
- consumed_energy (int):
- Total amount of energy consumed, in joules.
- disk_read (int):
- Total amount of bytes read.
- disk_write (int):
- Total amount of bytes written.
- page_faults (int):
- Total amount of page faults.
- resident_memory (int):
- Total Resident Set Size (RSS) used in bytes.
- virtual_memory (int):
- Total Virtual Memory Size (VSZ) used in bytes.
- elapsed_cpu_time (int):
- Total amount of time used (Elapsed time * cpu count) in seconds.
- This is not the real CPU-Efficiency, but rather the total amount
- of cpu-time the CPUs were occupied for.
- total_cpu_time (int):
- Sum of `user_cpu_time` and `system_cpu_time`, in seconds
- user_cpu_time (int):
- Total amount of Time spent in user space, in seconds
- system_cpu_time (int):
- Total amount of Time spent in kernel space, in seconds
+ stats (pyslurm.db.JobStatistics):
+ Utilization statistics of this Job Collection
cpus (int):
- Total amount of cpus.
+ Total amount of cpus requested.
nodes (int):
- Total amount of nodes.
+ Total amount of nodes requested.
memory (int):
Total amount of requested memory in Mebibytes.
"""
cdef public:
- consumed_energy
- disk_read
- disk_write
- page_faults
- resident_memory
- virtual_memory
- elapsed_cpu_time
- total_cpu_time
- user_cpu_time
- system_cpu_time
+ stats
cpus
nodes
memory
@@ -256,7 +227,7 @@ cdef class Job:
association_id (int):
ID of the Association this job runs in.
block_id (str):
- Name of the block used (for BlueGene Systems)
+ Name of the block used (for BlueGene Systems)
cluster (str):
Cluster this Job belongs to
constraints (str):
diff --git a/pyslurm/db/job.pyx b/pyslurm/db/job.pyx
index 0b081a0e..66bb8afa 100644
--- a/pyslurm/db/job.pyx
+++ b/pyslurm/db/job.pyx
@@ -29,10 +29,6 @@ from typing import Any
from pyslurm.utils.uint import *
from pyslurm.settings import LOCAL_CLUSTER
from pyslurm import xcollections
-from pyslurm.db.stats import (
- reset_stats_for_job_collection,
- add_stats_to_job_collection,
-)
from pyslurm.utils.ctime import (
date_to_timestamp,
timestr_to_mins,
@@ -71,7 +67,7 @@ cdef class JobFilter:
self.ptr = try_xmalloc(sizeof(slurmdb_job_cond_t))
if not self.ptr:
raise MemoryError("xmalloc failed for slurmdb_job_cond_t")
-
+
self.ptr.db_flags = slurm.SLURMDB_JOB_FLAG_NOTSET
self.ptr.flags |= slurm.JOBCOND_FLAG_NO_TRUNC
@@ -119,15 +115,15 @@ cdef class JobFilter:
def _parse_state(self):
# TODO: implement
return None
-
+
def _create(self):
self._alloc()
cdef:
slurmdb_job_cond_t *ptr = self.ptr
slurm_selected_step_t *selected_step
- ptr.usage_start = date_to_timestamp(self.start_time)
- ptr.usage_end = date_to_timestamp(self.end_time)
+ ptr.usage_start = date_to_timestamp(self.start_time)
+ ptr.usage_end = date_to_timestamp(self.end_time)
ptr.cpus_min = u32(self.cpus, on_noval=0)
ptr.cpus_max = u32(self.max_cpus, on_noval=0)
ptr.nodes_min = u32(self.nodes, on_noval=0)
@@ -153,7 +149,7 @@ cdef class JobFilter:
if self.truncate_time:
ptr.flags &= ~slurm.JOBCOND_FLAG_NO_TRUNC
-
+
if self.ids:
# These are only allowed by the slurmdbd when specific jobs are
# requested.
@@ -283,34 +279,38 @@ cdef class Jobs(MultiClusterMap):
job = Job.from_ptr(job_ptr.data)
job.qos_data = qos_data
job._create_steps()
- job.stats = JobStatistics.from_job_steps(job)
+ job.stats = JobStatistics.from_steps(job.steps)
+
+ elapsed = job.elapsed_time if job.elapsed_time else 0
+ cpus = job.cpus if job.cpus else 1
+ job.stats.elapsed_cpu_time = elapsed * cpus
cluster = job.cluster
if cluster not in out.data:
out.data[cluster] = {}
out[cluster][job.id] = job
- add_stats_to_job_collection(out, job.stats)
- out.cpus += job.cpus
- out.nodes += job.num_nodes
- out.memory += job.memory
+ out._add_stats(job)
return out
def _reset_stats(self):
- reset_stats_for_job_collection(self)
+ self.stats = JobStatistics()
self.cpus = 0
self.nodes = 0
self.memory = 0
+ def _add_stats(self, job):
+ self.stats.add(job.stats)
+ self.cpus += job.cpus
+ self.nodes += job.num_nodes
+ self.memory += job.memory
+
def calc_stats(self):
"""(Re)Calculate Statistics for the Job Collection."""
self._reset_stats()
for job in self.values():
- add_stats_to_job_collection(self, job.stats)
- self.cpus += job.cpus
- self.nodes += job.num_nodes
- self.memory += job.memory
+ self._add_stats(job)
@staticmethod
def modify(db_filter, Job changes, db_connection=None):
@@ -353,7 +353,7 @@ cdef class Jobs(MultiClusterMap):
In its simplest form, you can do something like this:
>>> import pyslurm
- >>>
+ >>>
>>> db_filter = pyslurm.db.JobFilter(ids=[9999])
>>> changes = pyslurm.db.Job(comment="A comment for the job")
>>> modified_jobs = pyslurm.db.Jobs.modify(db_filter, changes)
@@ -366,7 +366,7 @@ cdef class Jobs(MultiClusterMap):
connection object:
>>> import pyslurm
- >>>
+ >>>
>>> db_conn = pyslurm.db.Connection.open()
>>> db_filter = pyslurm.db.JobFilter(ids=[9999])
>>> changes = pyslurm.db.Job(comment="A comment for the job")
@@ -433,7 +433,7 @@ cdef class Jobs(MultiClusterMap):
else:
# Autodetects the last slurm error
raise RPCError()
-
+
if not db_connection:
# Autocommit if no connection was explicitly specified.
conn.commit()
@@ -528,7 +528,7 @@ cdef class Job:
SlurmList step_list
SlurmListItem step_ptr
- step_list = SlurmList.wrap(self.ptr.steps, owned=False)
+ step_list = SlurmList.wrap(self.ptr.steps, owned=False)
for step_ptr in SlurmList.iter_and_pop(step_list):
step = JobStep.from_ptr(step_ptr.data)
self.steps[step.id] = step
@@ -593,7 +593,7 @@ cdef class Job:
@property
def num_nodes(self):
- val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str,
+ val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str,
slurm.TRES_NODE)
if val is not None:
# Job is already running and has nodes allocated
@@ -601,7 +601,7 @@ cdef class Job:
else:
# Job is still pending, so we return the number of requested nodes
# instead.
- val = TrackableResources.find_count_in_str(self.ptr.tres_req_str,
+ val = TrackableResources.find_count_in_str(self.ptr.tres_req_str,
slurm.TRES_NODE)
return val
@@ -622,7 +622,7 @@ cdef class Job:
task_str = cstr.to_unicode(self.ptr.array_task_str)
if not task_str:
return None
-
+
if "%" in task_str:
# We don't want this % character and everything after it
# in here, so remove it.
@@ -730,7 +730,7 @@ cdef class Job:
return cstr.to_unicode(self.ptr.jobname)
# uint32_t lft
-
+
@property
def mcs_label(self):
return cstr.to_unicode(self.ptr.mcs_label)
@@ -757,7 +757,7 @@ cdef class Job:
@property
def cpus(self):
- val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str,
+ val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str,
slurm.TRES_CPU)
if val is not None:
# Job is already running and has cpus allocated
@@ -765,11 +765,11 @@ cdef class Job:
else:
# Job is still pending, so we return the number of requested cpus
# instead.
- return u32_parse(self.ptr.req_cpus, on_noval=0, zero_is_noval=False)
+ return u32_parse(self.ptr.req_cpus, on_noval=1)
@property
def memory(self):
- val = TrackableResources.find_count_in_str(self.ptr.tres_req_str,
+ val = TrackableResources.find_count_in_str(self.ptr.tres_req_str,
slurm.TRES_MEM)
return val
diff --git a/pyslurm/db/stats.pxd b/pyslurm/db/stats.pxd
index 9004402b..71faca88 100644
--- a/pyslurm/db/stats.pxd
+++ b/pyslurm/db/stats.pxd
@@ -26,6 +26,7 @@ from pyslurm cimport slurm
from pyslurm.slurm cimport (
try_xmalloc,
slurmdb_stats_t,
+ slurmdb_step_rec_t,
slurmdb_job_rec_t,
)
from pyslurm.db.tres cimport TrackableResources
@@ -33,9 +34,48 @@ from pyslurm.db.step cimport JobStep, JobSteps
from pyslurm.db.job cimport Job
from pyslurm.utils cimport cstr
-
cdef class JobStatistics:
- """Statistics for a Slurm Job or Step.
+ """Statistics for a Slurm Job or Job Collection.
+
+ Attributes:
+ total_cpu_time (int):
+ Sum of user_cpu_time and system_cpu_time, in seconds
+ user_cpu_time (int):
+ Total amount of time spent in user space, in seconds
+ system_cpu_time (int):
+ Total amount of time spent in kernel space, in seconds
+ consumed_energy (int):
+ Total amount of energy consumed, in joules
+ elapsed_cpu_time (int):
+            Total amount of time used (Elapsed time * cpu count) in seconds.
+ This is not the real CPU-Efficiency, but rather the total amount
+ of cpu-time the CPUs were occupied for.
+        disk_read (int):
+            Total number of bytes read.
+        disk_write (int):
+            Total number of bytes written.
+        page_faults (int):
+            Total number of page faults.
+ resident_memory (int):
+ Total Resident Set Size (RSS) used in bytes.
+ virtual_memory (int):
+ Total Virtual Memory Size (VSZ) used in bytes.
+ """
+ cdef public:
+ total_cpu_time
+ user_cpu_time
+ system_cpu_time
+ elapsed_cpu_time
+ consumed_energy
+ disk_read
+ disk_write
+ page_faults
+ resident_memory
+ virtual_memory
+
+
+cdef class JobStepStatistics:
+ """Statistics for a Slurm JobStep.
!!! note
@@ -105,8 +145,6 @@ cdef class JobStatistics:
system_cpu_time (int):
Amount of Time spent in kernel space, in seconds
"""
- cdef slurmdb_job_rec_t *job
-
cdef public:
consumed_energy
elapsed_cpu_time
@@ -140,8 +178,8 @@ cdef class JobStatistics:
system_cpu_time
@staticmethod
- cdef JobStatistics from_job_steps(Job job)
+ cdef JobStepStatistics from_step(JobStep step)
@staticmethod
- cdef JobStatistics from_step(JobStep step)
+ cdef JobStepStatistics from_ptr(slurmdb_step_rec_t *step, list nodes, cpus=*, elapsed_time=*, is_live=*)
diff --git a/pyslurm/db/stats.pyx b/pyslurm/db/stats.pyx
index c2da1145..c3379f7d 100644
--- a/pyslurm/db/stats.pyx
+++ b/pyslurm/db/stats.pyx
@@ -28,33 +28,52 @@ from pyslurm.utils.helpers import (
)
-def reset_stats_for_job_collection(jobs):
- jobs.consumed_energy = 0
- jobs.disk_read = 0
- jobs.disk_write = 0
- jobs.page_faults = 0
- jobs.resident_memory = 0
- jobs.virtual_memory = 0
- jobs.elapsed_cpu_time = 0
- jobs.total_cpu_time = 0
- jobs.user_cpu_time = 0
- jobs.system_cpu_time = 0
-
-
-def add_stats_to_job_collection(jobs, JobStatistics js):
- jobs.consumed_energy += js.consumed_energy
- jobs.disk_read += js.avg_disk_read
- jobs.disk_write += js.avg_disk_write
- jobs.page_faults += js.avg_page_faults
- jobs.resident_memory += js.avg_resident_memory
- jobs.virtual_memory += js.avg_virtual_memory
- jobs.elapsed_cpu_time += js.elapsed_cpu_time
- jobs.total_cpu_time += js.total_cpu_time
- jobs.user_cpu_time += js.user_cpu_time
- jobs.system_cpu_time += js.system_cpu_time
+cdef class JobStatistics:
+ def __init__(self):
+ for attr, val in instance_to_dict(self).items():
+ setattr(self, attr, 0)
-cdef class JobStatistics:
+ def to_dict(self):
+ return instance_to_dict(self)
+
+ @staticmethod
+ def from_steps(steps):
+ cdef JobStatistics total_stats = JobStatistics()
+ for step in steps.values():
+ total_stats._sum_steps(step.stats)
+
+ return total_stats
+
+ def _sum_steps(self, src):
+ self.consumed_energy += src.consumed_energy
+ self.disk_read += src.avg_disk_read
+ self.disk_write += src.avg_disk_write
+ self.page_faults += src.avg_page_faults
+ self.total_cpu_time += src.total_cpu_time
+ self.user_cpu_time += src.user_cpu_time
+ self.system_cpu_time += src.system_cpu_time
+
+ if src.max_resident_memory > self.resident_memory:
+ self.resident_memory = src.max_resident_memory
+
+        if src.max_virtual_memory > self.virtual_memory:
+ self.virtual_memory = src.max_virtual_memory
+
+ def add(self, src):
+ self.consumed_energy += src.consumed_energy
+ self.disk_read += src.disk_read
+ self.disk_write += src.disk_write
+ self.page_faults += src.page_faults
+ self.total_cpu_time += src.total_cpu_time
+ self.user_cpu_time += src.user_cpu_time
+ self.system_cpu_time += src.system_cpu_time
+ self.resident_memory += src.resident_memory
+ self.virtual_memory += src.virtual_memory
+ self.elapsed_cpu_time += src.elapsed_cpu_time
+
+
+cdef class JobStepStatistics:
def __init__(self):
for attr, val in instance_to_dict(self).items():
@@ -77,41 +96,32 @@ cdef class JobStatistics:
return instance_to_dict(self)
@staticmethod
- cdef JobStatistics from_job_steps(Job job):
- cdef JobStatistics job_stats = JobStatistics()
-
- for step in job.steps.values():
- job_stats._add_base_stats(step.stats)
-
- job_stats._sum_cpu_time(job)
-
- step_count = len(job.steps)
- if step_count:
- job_stats.avg_cpu_frequency /= step_count
-
- return job_stats
+ cdef JobStepStatistics from_step(JobStep step):
+ return JobStepStatistics.from_ptr(
+ step.ptr,
+ nodelist_from_range_str(cstr.to_unicode(step.ptr.nodes)),
+ step.cpus if step.cpus else 0,
+ step.elapsed_time if step.elapsed_time else 0,
+ is_live=False,
+ )
@staticmethod
- cdef JobStatistics from_step(JobStep step):
- cdef JobStatistics wrap = JobStatistics()
- if not &step.ptr.stats:
+ cdef JobStepStatistics from_ptr(slurmdb_step_rec_t *step, list nodes, cpus=0, elapsed_time=0, is_live=False):
+ cdef JobStepStatistics wrap = JobStepStatistics()
+ if not step:
return wrap
cdef:
- list nodes = nodelist_from_range_str(
- cstr.to_unicode(step.ptr.nodes))
cpu_time_adj = 1000
- slurmdb_stats_t *ptr = &step.ptr.stats
+ slurmdb_stats_t *ptr = &step.stats
if ptr.consumed_energy != slurm.NO_VAL64:
wrap.consumed_energy = ptr.consumed_energy
- wrap.avg_cpu_time = TrackableResources.find_count_in_str(
- ptr.tres_usage_in_ave, slurm.TRES_CPU) / cpu_time_adj
+ wrap.avg_cpu_time = int(TrackableResources.find_count_in_str(
+ ptr.tres_usage_in_ave, slurm.TRES_CPU) / cpu_time_adj)
- elapsed = step.elapsed_time if step.elapsed_time else 0
- cpus = step.cpus if step.cpus else 0
- wrap.elapsed_cpu_time = elapsed * cpus
+ wrap.elapsed_cpu_time = elapsed_time * cpus
ave_freq = int(ptr.act_cpufreq)
if ave_freq != slurm.NO_VAL:
@@ -127,7 +137,7 @@ cdef class JobStatistics:
ptr.tres_usage_in_ave, slurm.TRES_MEM)
wrap.avg_virtual_memory = TrackableResources.find_count_in_str(
ptr.tres_usage_in_ave, slurm.TRES_VMEM)
-
+
wrap.max_disk_read = TrackableResources.find_count_in_str(
ptr.tres_usage_in_max, slurm.TRES_FS_DISK)
max_disk_read_nodeid = TrackableResources.find_count_in_str(
@@ -156,15 +166,27 @@ cdef class JobStatistics:
wrap.max_virtual_memory_task = TrackableResources.find_count_in_str(
ptr.tres_usage_in_max_taskid, slurm.TRES_VMEM)
- wrap.min_cpu_time = TrackableResources.find_count_in_str(
- ptr.tres_usage_in_min, slurm.TRES_CPU) / cpu_time_adj
+ wrap.min_cpu_time = int(TrackableResources.find_count_in_str(
+ ptr.tres_usage_in_min, slurm.TRES_CPU) / cpu_time_adj)
min_cpu_time_nodeid = TrackableResources.find_count_in_str(
ptr.tres_usage_in_min_nodeid, slurm.TRES_CPU)
wrap.min_cpu_time_task = TrackableResources.find_count_in_str(
ptr.tres_usage_in_min_taskid, slurm.TRES_CPU)
- wrap.total_cpu_time = TrackableResources.find_count_in_str(
- ptr.tres_usage_in_tot, slurm.TRES_CPU)
+ # The Total CPU-Time extracted here is only used for live-stats.
+ # sacct does not use it from the tres_usage_in_tot string, but instead
+ # the tot_cpu_sec value from the step pointer directly, so do that too.
+ if is_live:
+ wrap.total_cpu_time = int(TrackableResources.find_count_in_str(
+ ptr.tres_usage_in_tot, slurm.TRES_CPU) / cpu_time_adj)
+ elif step.tot_cpu_sec != slurm.NO_VAL64:
+ wrap.total_cpu_time += step.tot_cpu_sec
+
+ if step.user_cpu_sec != slurm.NO_VAL64:
+ wrap.user_cpu_time += step.user_cpu_sec
+
+ if step.sys_cpu_sec != slurm.NO_VAL64:
+ wrap.system_cpu_time += step.sys_cpu_sec
if nodes:
wrap.max_disk_write_node = nodes[max_disk_write_nodeid]
@@ -173,64 +195,4 @@ cdef class JobStatistics:
wrap.max_virtual_memory_node = nodes[max_virtual_memory_nodeid]
wrap.min_cpu_time_node = nodes[min_cpu_time_nodeid]
- if step.ptr.user_cpu_sec != slurm.NO_VAL64:
- wrap.user_cpu_time = step.ptr.user_cpu_sec
-
- if step.ptr.sys_cpu_sec != slurm.NO_VAL64:
- wrap.system_cpu_time = step.ptr.sys_cpu_sec
-
return wrap
-
- def _add_base_stats(self, JobStatistics src):
- self.consumed_energy += src.consumed_energy
- self.avg_cpu_time += src.avg_cpu_time
- self.avg_cpu_frequency += src.avg_cpu_frequency
- self.avg_disk_read += src.avg_disk_read
- self.avg_disk_write += src.avg_disk_write
- self.avg_page_faults += src.avg_page_faults
-
- if src.max_disk_read >= self.max_disk_read:
- self.max_disk_read = src.max_disk_read
- self.max_disk_read_node = src.max_disk_read_node
- self.max_disk_read_task = src.max_disk_read_task
-
- if src.max_disk_write >= self.max_disk_write:
- self.max_disk_write = src.max_disk_write
- self.max_disk_write_node = src.max_disk_write_node
- self.max_disk_write_task = src.max_disk_write_task
-
- if src.max_page_faults >= self.max_page_faults:
- self.max_page_faults = src.max_page_faults
- self.max_page_faults_node = src.max_page_faults_node
- self.max_page_faults_task = src.max_page_faults_task
-
- if src.max_resident_memory >= self.max_resident_memory:
- self.max_resident_memory = src.max_resident_memory
- self.max_resident_memory_node = src.max_resident_memory_node
- self.max_resident_memory_task = src.max_resident_memory_task
- self.avg_resident_memory = self.max_resident_memory
-
- if src.max_virtual_memory >= self.max_virtual_memory:
- self.max_virtual_memory = src.max_virtual_memory
- self.max_virtual_memory_node = src.max_virtual_memory_node
- self.max_virtual_memory_task = src.max_virtual_memory_task
- self.avg_virtual_memory = self.max_virtual_memory
-
- if src.min_cpu_time >= self.min_cpu_time:
- self.min_cpu_time = src.min_cpu_time
- self.min_cpu_time_node = src.min_cpu_time_node
- self.min_cpu_time_task = src.min_cpu_time_task
-
- def _sum_cpu_time(self, Job job):
- if job.ptr.tot_cpu_sec != slurm.NO_VAL64:
- self.total_cpu_time += job.ptr.tot_cpu_sec
-
- if job.ptr.user_cpu_sec != slurm.NO_VAL64:
- self.user_cpu_time += job.ptr.user_cpu_sec
-
- if job.ptr.sys_cpu_sec != slurm.NO_VAL64:
- self.system_cpu_time += job.ptr.sys_cpu_sec
-
- elapsed = job.elapsed_time if job.elapsed_time else 0
- cpus = job.cpus if job.cpus else 0
- self.elapsed_cpu_time += elapsed * cpus
diff --git a/pyslurm/db/step.pxd b/pyslurm/db/step.pxd
index 906be912..6b9466ce 100644
--- a/pyslurm/db/step.pxd
+++ b/pyslurm/db/step.pxd
@@ -39,7 +39,7 @@ from pyslurm.slurm cimport (
from pyslurm.db.util cimport SlurmList, SlurmListItem
from pyslurm.db.connection cimport Connection
from pyslurm.utils cimport cstr
-from pyslurm.db.stats cimport JobStatistics
+from pyslurm.db.stats cimport JobStepStatistics
from pyslurm.db.tres cimport TrackableResources, TrackableResource
@@ -52,7 +52,7 @@ cdef class JobStep:
"""A Slurm Database JobStep.
Attributes:
- stats (pyslurm.db.JobStatistics):
+ stats (pyslurm.db.JobStepStatistics):
Utilization statistics for this Step
num_nodes (int):
Amount of nodes this Step has allocated
@@ -96,7 +96,7 @@ cdef class JobStep:
Amount of seconds the Step was suspended
"""
cdef slurmdb_step_rec_t *ptr
- cdef public JobStatistics stats
+ cdef public JobStepStatistics stats
@staticmethod
cdef JobStep from_ptr(slurmdb_step_rec_t *step)
diff --git a/pyslurm/db/step.pyx b/pyslurm/db/step.pyx
index 2d71ca73..0faf4d79 100644
--- a/pyslurm/db/step.pyx
+++ b/pyslurm/db/step.pyx
@@ -65,7 +65,7 @@ cdef class JobStep:
cdef JobStep from_ptr(slurmdb_step_rec_t *step):
cdef JobStep wrap = JobStep.__new__(JobStep)
wrap.ptr = step
- wrap.stats = JobStatistics.from_step(wrap)
+ wrap.stats = JobStepStatistics.from_step(wrap)
return wrap
def to_dict(self):
@@ -110,7 +110,7 @@ cdef class JobStep:
@property
def memory(self):
- val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str,
+ val = TrackableResources.find_count_in_str(self.ptr.tres_alloc_str,
slurm.TRES_MEM)
return val
diff --git a/pyslurm/xcollections.pyx b/pyslurm/xcollections.pyx
index a0ce2e6b..4e99eaf5 100644
--- a/pyslurm/xcollections.pyx
+++ b/pyslurm/xcollections.pyx
@@ -21,7 +21,7 @@
#
# cython: c_string_type=unicode, c_string_encoding=default
# cython: language_level=3
-"""Custom Collection utilities"""
+"""Custom Collection utilities"""
from pyslurm.settings import LOCAL_CLUSTER
import json
@@ -158,7 +158,7 @@ class ItemsView(BaseView):
"""
return MCItemsView(self._mcm)
-
+
class MCItemsView(BaseView):
"""A Multi-Cluster Items View.
@@ -240,7 +240,7 @@ cdef class MultiClusterMap:
cluster, key = item
else:
cluster, key = self._get_cluster(), item
-
+
return cluster, key
def _check_val_type(self, item):
@@ -450,12 +450,18 @@ cdef class MultiClusterMap:
if not self.data:
return '{}'
+ return json.dumps(self.to_dict(multi_cluster=multi_cluster))
+
+ def to_dict(self, multi_cluster=False):
+ if not self.data:
+ return {}
+
data = multi_dict_recursive(self)
if multi_cluster:
- return json.dumps(data)
+ return data
else:
cluster = self._get_cluster()
- return json.dumps(data[cluster])
+ return data[cluster]
def keys(self):
"""Return a View of all the Keys in this collection
@@ -476,7 +482,7 @@ cdef class MultiClusterMap:
... print(cluster, key)
"""
return KeysView(self)
-
+
def items(self):
"""Return a View of all the Values in this collection
@@ -556,12 +562,12 @@ cdef class MultiClusterMap:
item = self.get(key, default=default)
if item is default or item == default:
return default
-
+
cluster = item.cluster
del self.data[cluster][key]
if not self.data[cluster]:
del self.data[cluster]
-
+
return item
def update(self, data={}, **kwargs):
@@ -591,7 +597,7 @@ def multi_reload(cur, frozen=True):
for cluster, item in new.keys().with_cluster():
if (cluster, item) not in cur.keys().with_cluster():
cur[cluster][item] = new[cluster][item]
-
+
return cur
diff --git a/setup.py b/setup.py
index 77e767cd..48c2c14d 100644
--- a/setup.py
+++ b/setup.py
@@ -15,7 +15,7 @@
CYTHON_VERSION_MIN = "0.29.37" # Keep in sync with pyproject.toml
-SLURM_LIB = "libslurm"
+SLURM_LIB = "libslurmfull"
TOPDIR = Path(__file__).parent
PYTHON_MIN_REQUIRED = (3, 6)
@@ -101,6 +101,8 @@ class SlurmConfig():
def __init__(self):
# Assume some defaults here
self._lib_dir = Path("/usr/lib64")
+ self._lib = None
+ self._lib_dir_search_paths = []
self.inc_dir = Path("/usr/include")
self._version = None
@@ -110,13 +112,17 @@ def _find_hdr(self, name):
raise RuntimeError(f"Cannot locate {name} in {self.inc_full_dir}")
return hdr
- def _find_lib(self, lib_dir):
+ def _search_lib(self, lib_dir):
+ if self._lib:
+ return
+
lib = lib_dir / f"{SLURM_LIB}.so"
if not lib.exists():
- raise RuntimeError(f"Cannot locate Slurm library in {lib_dir}")
-
- print(f"Found {SLURM_LIB} library in {lib}")
- return lib_dir
+ self._lib_dir_search_paths.append(str(lib_dir))
+ else:
+ print(f"Found slurm library: {lib}")
+ self._lib = lib
+ self._lib_dir = lib_dir
@property
def lib_dir(self):
@@ -124,11 +130,14 @@ def lib_dir(self):
@lib_dir.setter
def lib_dir(self, path):
- lib_dir = Path(path)
- if SLURM_LIB == "libslurmfull":
- lib_dir /= "slurm"
-
- self._lib_dir = self._find_lib(lib_dir)
+ self._search_lib(path)
+ self._search_lib(path / "slurm")
+ self._search_lib(path / "slurm-wlm")
+
+ if not self._lib:
+ searched = "\n- ".join(self._lib_dir_search_paths)
+ raise RuntimeError("Cannot locate Slurm library. Searched paths: "
+ f"\n- {searched}")
@property
def inc_full_dir(self):
diff --git a/tests/integration/test_job.py b/tests/integration/test_job.py
index 8c9d4750..a6e0c6e4 100644
--- a/tests/integration/test_job.py
+++ b/tests/integration/test_job.py
@@ -32,6 +32,7 @@
JobSubmitDescription,
RPCError,
)
+from pyslurm.db import JobStatistics, JobStepStatistics
def test_parse_all(submit_job):
@@ -171,6 +172,27 @@ def test_load_steps(submit_job):
assert job.steps.get("batch")
+def test_load_stats(submit_job):
+ job = submit_job()
+ util.wait(100)
+
+ job = Job.load(job.id)
+ job.load_stats()
+
+ assert job.state == "RUNNING"
+ assert job.stats
+ assert isinstance(job.stats, JobStatistics)
+ assert job.stats.elapsed_cpu_time > 0
+ assert job.stats.resident_memory > 0
+
+ for step in job.steps.values():
+ assert step.stats
+ assert step.state == "RUNNING"
+ assert isinstance(step.stats, JobStepStatistics)
+ assert step.stats.avg_resident_memory > 0
+ assert step.stats.elapsed_cpu_time > 0
+
+
def test_to_json(submit_job):
job_list = [submit_job() for i in range(3)]
util.wait()