Skip to content

Commit bb452a4

Browse files
authored
Merge pull request #8159 from fstagni/cherry-pick-2-16a2dd1a3-integration
[sweep:integration] Add cgroup2 limit support
2 parents 0601fa8 + 1bfdd4e commit bb452a4

File tree

6 files changed

+399
-20
lines changed

6 files changed

+399
-20
lines changed

docs/source/AdministratorGuide/Resources/computingelements.rst

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -175,3 +175,37 @@ section ::
175175
}
176176
}
177177
}
178+
179+
Applying cgroup2 limits to computing resources
180+
@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@@
181+
182+
Both the :mod:`~DIRAC.Resources.Computing.InProcessComputingElement` and
183+
:mod:`~DIRAC.Resources.Computing.SingularityComputingElement` CEs support applying Linux cgroup2 CPU and memory limits to
184+
the slot. These will be applied if the site allows cgroup2 delegation; if this is not available, execution will continue
185+
without the limits. The limit values can be specified using the following CE parameters (all settings are optional and can
186+
be left undefined if not needed):
187+
188+
- CPULimit (float) - The number of cores that the job may use. Usage beyond this will be throttled.
189+
- MemoryLimitMB (int) - The memory limit for the job in MB. Usage beyond this will trigger the out-of-memory killer
190+
considering processes within the slot.
191+
- MemoryNoSwap (bool) - If yes or true, the job will not be allowed to use swap memory. Swap memory is not included
192+
in the main memory limit.
193+
194+
Note that the memory limit should be lower than the amount requested with the submission CE in order to allow the main
195+
pilot processes to be protected. For example, if you request 4096M (e.g. via XRSL) at submission, around 150M is needed
196+
for the pilot, so a limit of 3950M would be recommended.
197+
198+
These can be specified in the CEDefaults section to apply a standardised slot size limit::
199+
200+
Resources
201+
{
202+
Computing
203+
{
204+
CEDefaults
205+
{
206+
CPULimit = 1.0
207+
MemoryLimitMB = 3950
208+
MemoryNoSwap = True
209+
}
210+
}
211+
}
Lines changed: 328 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,328 @@
1+
#!/usr/bin/env python3
2+
"""cgroup2 support for DIRAC pilot."""
3+
4+
import os
5+
import functools
6+
import subprocess
7+
from DIRAC import S_OK, S_ERROR, gLogger
8+
from DIRAC.Core.Utilities.DIRACSingleton import DIRACSingleton
9+
from DIRAC.Core.Utilities import Subprocess
10+
11+
12+
class CG2Manager(metaclass=DIRACSingleton):
    """A class to manage a cgroup2 hierarchy for a typical pilot job use-case.

    On set-up this creates a group for all of the pilot processes (anything
    in the group at the start). This is a requirement for controlling the
    sub-groups (no processes in non-leaf groups).

    A group is then created on request for each "slot" under the pilot,
    with the requested limits.
    """

    # Paths used to lookup cgroup info
    FILE_MOUNTS = "/proc/mounts"
    # NOTE: os.getpid() is evaluated once, when the class body is executed
    FILE_CUR_CGROUP = f"/proc/{os.getpid()}/cgroup"
    # Control file names within the cgroup2 hierarchy
    CTRL_CONTROLLERS = "cgroup.controllers"
    CTRL_PROCS = "cgroup.procs"
    CTRL_SUBTREE = "cgroup.subtree_control"
    CTRL_MEM_OOM_GROUP = "memory.oom.group"
    CTRL_MEM_EVENTS = "memory.events"
    CTRL_MEM_MAX = "memory.max"
    CTRL_MEM_SWAP_MAX = "memory.swap.max"
    CTRL_MEM_PEAK = "memory.peak"
    CTRL_CPU_MAX = "cpu.max"
    # CPU controller constants
    # Weight is the max value for 1 CPU core
    CPU_WEIGHT = 100000
    # Period is the averaging time in us to apply the limit
    # The default is 100k and I see no particular reason this should change
    CPU_PERIOD = 100000
    # Name of the group for the existing pilot processes
    # (the PID is captured once, when the class body is executed)
    PILOT_GROUP = f"dirac_pilot_{os.getpid()}"

    def __init__(self):
        """Set-up CGroup2 manager."""
        # This boolean will be set to True if the cgroups are configured
        # in the expected way
        self._ready = False
        # A counter of number of subgroups created
        # Used to create unique group names
        self._subproc_num = 0
        # Physical path to the starting cgroup for this process
        # (i.e. the base of our hierarchy)
        self._cgroup_path = None
        # Logger
        self.log = gLogger.getSubLogger("CG2Manager")

    @staticmethod
    def _filter_file(path, filterfcn):
        """Open a file and run filterfcn on each (stripped) line.

        The first *truthy* value returned by filterfcn is returned by this
        function; falsy results (None, False, 0, "") are treated as
        "no match" and the scan continues.
        Returns None if no line matches.
        Any OSError from opening/reading the file is propagated.
        """
        with open(path, encoding="ascii") as file_in:
            # Iterate lazily rather than loading the whole file first
            for line in file_in:
                if res := filterfcn(line.strip()):
                    return res
        return None

    def _detect_root(self):
        """Find the cgroup2 filesystem mountpoint on this system.

        Returns the mountpoint path or None if it isn't found.
        """

        def filt(line):
            """Filter function to find the first cgroup2 mount point
            from a standard /proc/mounts layout file.
            """
            parts = line.split(" ")
            if len(parts) < 3:
                return None
            # Field 3 is the filesystem type, field 2 the mount point
            if parts[2] == "cgroup2":
                return parts[1]
            return None

        return self._filter_file(self.FILE_MOUNTS, filt)

    def _detect_path(self):
        """Find the full physical path to the current cgroup control dir.

        Sets self._cgroup_path on success.
        Raises a RuntimeError if the path cannot be determined.
        """

        def filt(line):
            """Filter to find the current cgroup2 name for the current
            process, without the leading /.
            """
            if line.startswith("0::/"):
                return line[4:]
            return False

        if not (root_path := self._detect_root()):
            raise RuntimeError("Failed to find cgroup mount point")
        # NOTE: a process sitting in the root cgroup ("0::/") produces an empty
        # name, which is falsy and therefore reported as "not found" here.
        if not (cur_group := self._filter_file(self.FILE_CUR_CGROUP, filt)):
            raise RuntimeError("Failed to find current cgroup")
        self._cgroup_path = os.path.join(root_path, cur_group)

    def _create_group(self, group_name, isolate_oom=True):
        """Create a new group under the base cgroup path.

        If "isolate_oom" is True, "0" is written to the new group's
        memory.oom.group control to decouple it from the parent's OOM group.
        Raises a RuntimeError if the group cannot be created.
        """
        try:
            os.mkdir(os.path.join(self._cgroup_path, group_name))
        except PermissionError as err:
            raise RuntimeError(f"Permission denied creating sub-cgroup '{group_name}'") from err
        except OSError as err:
            # Any other filesystem failure (e.g. the group already exists) is
            # reported the same way so callers only need to handle RuntimeError
            raise RuntimeError(f"Failed to create sub-cgroup '{group_name}': {err}") from err
        if isolate_oom:
            self._write_control(group_name, self.CTRL_MEM_OOM_GROUP, "0")

    def _remove_group(self, group_name):
        """Remove a group; filesystem OSErrors from rmdir are propagated."""
        os.rmdir(os.path.join(self._cgroup_path, group_name))

    def _move_init_procs(self):
        """Create the pilot sub-group and move all of the initial processes
        from the top group into the new sub-group.

        Will raise a RuntimeError if any cgroup configuration problem
        prevents this from completing successfully.
        """
        self._create_group(self.PILOT_GROUP, isolate_oom=False)
        cur_pids = self._read_control("", self.CTRL_PROCS)
        self._write_control(self.PILOT_GROUP, self.CTRL_PROCS, cur_pids)

    def _read_control(self, group_name, ctrl_name):
        """Read a control value for the given group_name (relative to our base path).

        The returned value varies depending on the value content:
         - For a single token value, a string containing that token will be returned.
         - For a single line value with space-separated tokens, a list of tokens will be returned.
         - For a multi-line value (where each line is a token), a list of tokens will be returned.
         - For an empty file, an empty list will be returned.
        All tokens in the return values are strings.
        A RuntimeError will be raised if the control cannot be read.
        """
        ctrl_path = os.path.join(self._cgroup_path, group_name, ctrl_name)
        try:
            with open(ctrl_path, encoding="ascii") as file_in:
                values = [line.strip() for line in file_in]
        except PermissionError as err:
            raise RuntimeError(f"Access denied reading control '{group_name}/{ctrl_name}'") from err
        except OSError as err:
            raise RuntimeError(f"Error reading control '{group_name}/{ctrl_name}'") from err
        if len(values) != 1:
            return values
        # Single line: split it into tokens if it contains more than one,
        # otherwise hand back the bare string
        tokens = values[0].split()
        if len(tokens) > 1:
            return tokens
        return values[0]

    def _write_control(self, group_name, ctrl_name, value):
        """Write a control value for a given group_name (relative to our base path).

        The value can be a string or an iterable of strings. The values should not
        contain any whitespace characters.
        A RuntimeError will be raised if the control cannot be set.
        """
        try:
            ctrl_path = os.path.join(self._cgroup_path, group_name, ctrl_name)
            with open(ctrl_path, "w", encoding="ascii") as file_out:
                if isinstance(value, str):
                    value = [value]
                for arg in value:
                    file_out.write(f"{arg}\n")
                    # Flush is critical here as setting multiple values at the same time may fail
                    file_out.flush()
        except PermissionError as err:
            raise RuntimeError(f"Access denied writing control '{group_name}/{ctrl_name}'") from err
        except OSError as err:
            # This generally happens if we're trying to set a value that is
            # considered invalid, for example delegating a controller that isn't enabled
            # in the first place.
            raise RuntimeError(f"Error writing control '{group_name}/{ctrl_name}' = {value}") from err

    def _get_oom_count(self, slot_name):
        """Extract the OOM counter as an int for the given slot.

        Returns a non-zero int if the memory.events file reports OOM events.
        Returns None if the file has no "oom" line or if the counter is zero
        (falsy filter results are treated as "no match").
        An OSError is propagated if the file cannot be opened.
        """

        def filt(line):
            """Filter to find the oom counter from a memory.events file."""
            # The trailing space avoids matching other keys with an "oom" prefix
            if line.startswith("oom "):
                return int(line[4:])
            return False

        mem_events = os.path.join(self._cgroup_path, slot_name, self.CTRL_MEM_EVENTS)
        return self._filter_file(mem_events, filt)

    def _set_limits(self, group_name, cores=None, memory=None, noswap=False):
        """Set the limits for an existing group.

        See _create_slot for a description of the other parameters.
        Limits that are falsy (None, 0, False) are not written.
        This will raise a RuntimeError if applying any of the limits fails.
        """
        if cores:
            # cpu.max takes "<quota> <period>"; quota is scaled per core
            proc_max = int(cores * self.CPU_WEIGHT)
            self._write_control(group_name, self.CTRL_CPU_MAX, f"{proc_max} {self.CPU_PERIOD}")
        if memory:
            self._write_control(group_name, self.CTRL_MEM_MAX, f"{memory}")
        if noswap:
            self._write_control(group_name, self.CTRL_MEM_SWAP_MAX, "0")

    def _prepare(self):
        """Set up the cgroup tree for the current process.

        Should be called once, before using any of the other functions in this class.

        Note that this function (specifically the _move_init_procs call) assumes that
        the list of processes is static. If the process list changes while this is running,
        it is likely that this will fail to set things up properly.

        Raises a RuntimeError if the required controllers are not available or
        the tree cannot be configured.
        """
        self._detect_path()
        controllers = self._read_control("", self.CTRL_CONTROLLERS)
        if not controllers:
            raise RuntimeError("No controllers enabled")
        if isinstance(controllers, str):
            # A single enabled controller comes back as a bare string; wrap it so
            # the membership test below is exact ("cpu" must not match "cpuset")
            controllers = [controllers]
        for ctrl in ("cpu", "memory"):
            if ctrl not in controllers:
                raise RuntimeError(f"{ctrl} controller not enabled")
        self._move_init_procs()
        self._write_control("", self.CTRL_SUBTREE, ["+cpu", "+memory"])
        self._ready = True

    def _create_slot(self, slot_name, cores=None, memory=None, noswap=False):
        """Create a slot for a job with the given slot_name.

        Cores is a float, number of CPU cores this group may use.
        Memory is a string or int, either a number of bytes to limit the group RSS,
        or a string limit with a unit suffix, e.g. "1G" as supported by the cgroup memory
        controller.
        If noswap is set to true, the swap memory limit will be set to 0; this is mostly
        useful for testing (where the system may swap memory instead of triggering an
        OOM, which may allow a process to use more than the memory limit).
        Does nothing if the manager has not been set up successfully.
        This will raise a RuntimeError if setting up the slot fails.
        """
        if not self._ready:
            return
        self._create_group(slot_name)
        self._set_limits(slot_name, cores, memory, noswap)

    def _remove_slot(self, slot_name):
        """Remove a slot with the given name.

        Does nothing if the manager has not been set up successfully.
        Can raise usual filesystem OSError if the slot doesn't exist.
        """
        if not self._ready:
            return
        self._remove_group(slot_name)

    def _setup_subproc(self, slot_name):
        """A subprocess preexec function for setting up cgroups.

        This will move the current process into the given cgroup slot.
        On failure, no error will be reported.
        """
        # Threading danger!
        # There are potential threading issues with preexec functions
        # They must not hold any locks that the parent process might already
        # be holding, including ones in standard library functions.
        # This function should be kept as minimal as possible.
        try:
            self._write_control(slot_name, self.CTRL_PROCS, f"{os.getpid()}")
        except Exception:
            # Deliberately best-effort: we can't even really log here as we're
            # in the set-up context of the new process
            pass

    def setUp(self):
        """Create the base cgroup tree if possible. Should be called once
        per process before using systemCall.

        Returns S_OK/S_ERROR.
        """
        try:
            self._prepare()
        except Exception as err:
            # The majority of CGroup failures will be RuntimeError
            # However we don't want any unexpected failure to crash the upstream module,
            # We just want to continue without cgroup support instead
            return S_ERROR(str(err))
        return S_OK()

    @staticmethod
    def _parse_limits(ce_params):
        """Extract (cores, memory_bytes, noswap) from a ceParameters mapping.

        Recognised keys are CPULimit, MemoryLimitMB and MemoryNoSwap; missing
        or empty values leave the corresponding limit unset (None/None/False).
        Raises ValueError/TypeError if a value cannot be converted.
        """
        cores = None
        memory = None
        if not ce_params:
            return cores, memory, False
        if cpu_limit := ce_params.get("CPULimit"):
            cores = float(cpu_limit)
        if memory_mb := int(ce_params.get("MemoryLimitMB") or 0):
            memory = memory_mb * 1024 * 1024
        # str() so that a real bool (True/False) is accepted as well as "yes"/"true"
        noswap = str(ce_params.get("MemoryNoSwap", "no")).lower() in ("yes", "true")
        return cores, memory, noswap

    def systemCall(self, *args, **kwargs):
        """A proxy function for Subprocess.systemCall but will create a cgroup2 slot
        if the functionality is available. An optional ceParameters dictionary
        may be included, which will be searched for specific cgroup memory options.

        Any failure to parse the limits or to create the slot is logged and the
        command is then run without a cgroup slot.
        Returns the usual S_OK/S_ERROR from Subprocess.systemCall.
        """
        preexec_fn = None
        slot_name = f"subproc_{os.getpid()}_{self._subproc_num}"
        self._subproc_num += 1
        if self._ready:
            self.log.info(f"Creating slot cgroup {slot_name}")
            try:
                # Parsing happens inside the try so that a malformed option
                # degrades to "no cgroup" instead of failing the whole call
                cores, memory, noswap = self._parse_limits(kwargs.get("ceParameters"))
                self.log.info(f"CGroup Limits, CPU: {cores}, Mem: {memory}, NoSwap: {noswap}")
                self._create_slot(slot_name, cores=cores, memory=memory, noswap=noswap)
                preexec_fn = functools.partial(CG2Manager._setup_subproc, self, slot_name)
            except Exception as err:
                self.log.warn("Failed to create slot cgroup:", str(err))
        # Note: this replaces any preexec_fn supplied by the caller
        kwargs["preexec_fn"] = preexec_fn
        # ceParameters is consumed here and not forwarded to Subprocess.systemCall
        kwargs.pop("ceParameters", None)
        res = Subprocess.systemCall(*args, **kwargs)
        if self._ready:
            self.log.info(f"Removing slot cgroup {slot_name}")
            try:
                oom_count = self._get_oom_count(slot_name)
                if oom_count:
                    # Child process triggered an OOM
                    # We can't readily report this upstream (child process will probably
                    # fail with an error code), so just log it and continue
                    self.log.info(f"OOM detected from child process (slot {slot_name})")
                self._remove_slot(slot_name)
            except Exception as err:
                self.log.warn(f"Failed to delete slot {slot_name} cgroup:", str(err))
        return res

0 commit comments

Comments
 (0)