-
Notifications
You must be signed in to change notification settings - Fork 50
Expand file tree
/
Copy pathutils.py
More file actions
387 lines (328 loc) · 13.4 KB
/
utils.py
File metadata and controls
387 lines (328 loc) · 13.4 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
# utility functions for the SLURM executor plugin
from collections import Counter
import math
import os
import shlex
import subprocess
import re
from pathlib import Path
from typing import Union
from snakemake_interface_executor_plugins.dag import DAGExecutorInterface
from snakemake_interface_executor_plugins.jobs import (
JobExecutorInterface,
)
from snakemake_interface_common.exceptions import WorkflowError
def get_max_array_size() -> int:
    """
    Return the largest usable task index for SLURM job arrays.

    This is used to determine how many jobs can be submitted in a single
    array job. The limit is taken from the cluster's ``MaxArraySize``
    setting as reported by ``scontrol show config``. SLURM only accepts
    array indices up to ``MaxArraySize - 1``, so that is what is returned.

    Returns:
        ``MaxArraySize - 1`` as an integer. If ``scontrol`` cannot be run,
        times out (after 5 seconds), or its output contains no parsable
        ``MaxArraySize`` entry, a ``MaxArraySize`` of 1000 is assumed and
        999 is returned.
    """
    scontrol_cmd = "scontrol show config"
    max_array_size = 1000  # fallback when the cluster value is unavailable
    try:
        res = subprocess.run(
            shlex.split(scontrol_cmd),
            capture_output=True,
            text=True,
            timeout=5,
        )
    except (subprocess.SubprocessError, OSError):
        # scontrol missing, not executable, or timed out: keep the fallback.
        pass
    else:
        out = (res.stdout or "") + (res.stderr or "")
        match = re.search(r"MaxArraySize\s*=?\s*(\d+)", out, re.IGNORECASE)
        if match:
            max_array_size = int(match.group(1))
    # NOTE(review): a configured MaxArraySize of 0 yields -1 here; callers
    # should confirm they handle that value.
    return max_array_size - 1
def get_job_wildcards(job: JobExecutorInterface) -> str:
    """
    Join a job's wildcard values into one underscore-separated string.

    The result is used when building the SLURM job name, so every "/"
    inside a wildcard value is replaced by "_" as well.

    Args:
        job: The job whose ``wildcards`` are read.

    Returns:
        The joined wildcard values, or an empty string when the job has no
        ``wildcards`` attribute or its wildcards are empty/falsy.
    """
    try:
        values = job.wildcards
        if not values:
            return ""
        joined = "_".join(values)
        return joined.replace("/", "_")
    except AttributeError:
        # Jobs without a ``wildcards`` attribute contribute nothing.
        return ""
def pending_jobs_for_rule(dag: DAGExecutorInterface, rule_name: str) -> int:
    """
    Return how many jobs in ``dag.needrun_jobs()`` belong to ``rule_name``.

    Args:
        dag: The DAG whose jobs still need to run.
        rule_name: Name of the rule to count jobs for.

    Returns:
        The number of matching jobs (0 if there are none).
    """
    return sum(1 for job in dag.needrun_jobs() if job.rule.name == rule_name)
def round_half_up(n):
    """
    Round ``n`` to the nearest integer, rounding exact halves upwards
    (towards positive infinity): 2.5 -> 3, -2.5 -> -2.

    The naive ``math.floor(n + 0.5)`` is avoided because the addition is
    itself rounded: it turns 0.49999999999999994 into 1 and rounds odd
    integers >= 2**52 (e.g. 4503599627370497.0) up to the next even value.
    ``n - math.floor(n)`` is computed exactly for floats, so comparing the
    fractional part against 0.5 gives the correctly rounded result.

    Args:
        n: A real number (int or float).

    Returns:
        The rounded value as an ``int``.

    Raises:
        ValueError: if ``n`` is NaN (from ``math.floor``).
        OverflowError: if ``n`` is infinite (from ``math.floor``).
    """
    floor_n = math.floor(n)
    return floor_n + 1 if n - floor_n >= 0.5 else floor_n
def parse_time_to_minutes(time_value: Union[str, int, float]) -> int:
    """
    Convert a time specification to minutes (integer). This function
    is intended to handle the partition definitions for the max_runtime
    value in a partition config file.

    Supports:
    - Numeric values (assumed to be in minutes): 120, 120.5
    - Snakemake-style time strings: "6d", "12h", "30m", "90s", "2d12h30m"
    - SLURM time formats:
        - "minutes" (e.g., "60")
        - "minutes:seconds" (interpreted as hours:minutes, e.g., "60:30")
        - "hours:minutes:seconds" (e.g., "1:30:45")
        - "days-hours" (e.g., "2-12"; "0-12" is 12 hours)
        - "days-hours:minutes" (e.g., "2-12:30")
        - "days-hours:minutes:seconds" (e.g., "2-12:30:45")

    NOTE(review): a two-field value such as "60:30" is read as
    hours:minutes here, whereas ``sbatch --time`` reads it as
    minutes:seconds. Confirm this is intended for partition configs.

    Args:
        time_value: Time specification as string, int, or float

    Returns:
        Time in minutes as integer (fractional minutes are rounded half up)

    Raises:
        WorkflowError: If the time format is invalid
    """
    # If already numeric, return as integer minutes (rounded)
    if isinstance(time_value, (int, float)):
        return round_half_up(time_value)

    time_str = str(time_value).strip()

    # Plain numeric strings ("120", "120.5") are minutes as well.
    try:
        return round_half_up(float(time_str))
    except (ValueError, OverflowError):
        # ValueError: not a number, or NaN; OverflowError: "inf"-like values.
        # Either way, try the other formats (which will reject them).
        pass

    # SLURM time formats use a dash (day separator) and/or colons.
    if "-" in time_str or ":" in time_str:
        slurm_minutes = _slurm_time_to_minutes(time_str)
        if slurm_minutes is not None:
            return round_half_up(slurm_minutes)
        # Otherwise fall through and try the Snakemake style below.

    # Parse Snakemake-style time strings (e.g., "6d", "12h", "30m", "90s", "2d12h30m")
    # Pattern matches: a number followed by a unit (d, h, m, s)
    pattern = r"(\d+(?:\.\d+)?)\s*([dhms])"
    matches = re.findall(pattern, time_str.lower())

    if not matches:
        raise WorkflowError(
            f"Invalid time format: '{time_value}'. "
            f"Expected formats:\n"
            f"  - Numeric value in minutes: 120\n"
            f"  - Snakemake style: '6d', '12h', '30m', '90s', '2d12h30m'\n"
            f"  - SLURM style: 'minutes', 'minutes:seconds', 'hours:minutes:seconds',\n"
            f"    'days-hours', 'days-hours:minutes', 'days-hours:minutes:seconds'"
        )

    total_minutes = 0.0
    for value, unit in matches:
        num = float(value)
        if unit == "d":
            total_minutes += num * 24 * 60
        elif unit == "h":
            total_minutes += num * 60
        elif unit == "m":
            total_minutes += num
        elif unit == "s":
            total_minutes += num / 60

    return round_half_up(total_minutes)


def _slurm_time_to_minutes(time_str: str):
    """
    Parse a SLURM-style duration into (possibly fractional) minutes.

    Returns None when ``time_str`` is not a valid SLURM duration so the
    caller can try another format. ``time_str`` itself is never modified.
    """
    days = 0
    clock_str = time_str
    # Whether a day field is present decides how a single trailing number is
    # read. This must not be inferred from ``days > 0``: "0-12" is
    # 0 days + 12 hours, not 12 minutes.
    has_days = "-" in time_str
    if has_days:
        parts = time_str.split("-")
        if len(parts) != 2:
            return None
        try:
            days = int(parts[0])
        except ValueError:
            return None
        clock_str = parts[1]

    try:
        fields = [int(part) for part in clock_str.split(":")]
    except ValueError:
        return None

    hours = minutes = seconds = 0
    if len(fields) == 1:
        # "days-hours" when a day field is present, otherwise plain minutes.
        if has_days:
            hours = fields[0]
        else:
            minutes = fields[0]
    elif len(fields) == 2:
        # Read as hours:minutes, with or without a day field.
        hours, minutes = fields
    elif len(fields) == 3:
        hours, minutes, seconds = fields
    else:
        # Too many colons.
        return None

    return days * 24 * 60 + hours * 60 + minutes + seconds / 60.0
# NOTE: time_to_seconds parses SLURM time strings by hand instead of using
# pandas.to_timedelta / datetime.strptime, to avoid their deprecation warnings.
def time_to_seconds(time_str):
    """
    Convert a SLURM sacct time value to seconds.

    Accepted shapes (sacct Elapsed / TotalCPU columns):
    - "[D-]HH:MM:SS[.fff]" e.g. "1-12:30:45" -> 131445.0
    - "MM:SS[.fff]"        e.g. "45:30"      -> 2730.0
    - "SS[.fff]"           e.g. "30.5"       -> 30.5

    Missing values (anything pandas.isna() flags, when pandas is installed),
    empty strings, the literal "invalid", values starting with "-", and
    anything that does not parse all yield 0.
    """
    # Missing values from a DataFrame column count as zero seconds.
    try:
        import pandas as pd

        if pd.isna(time_str):
            return 0
    except (ImportError, TypeError):
        pass

    text = str(time_str).strip()
    if text in ("", "invalid") or text.startswith("-"):
        return 0

    seconds_total = 0.0

    # Optional leading day component ("D-" / "DD-").
    if "-" in text:
        day_part, _, remainder = text.partition("-")
        try:
            seconds_total += int(day_part) * 86400
        except ValueError:
            pass  # not a day count; leave the text untouched
        else:
            text = remainder

    # Clock component: HH:MM:SS[.fff] or MM:SS[.fff].
    if ":" in text:
        fields = text.split(":")
        try:
            if len(fields) == 3:
                h, m, s = int(fields[0]), int(fields[1]), float(fields[2])
                seconds_total += h * 3600 + m * 60 + s
                return seconds_total
            if len(fields) == 2:
                m, s = int(fields[0]), float(fields[1])
                seconds_total += m * 60 + s
                return seconds_total
        except ValueError:
            pass  # fall through to the bare-seconds attempt

    # Bare (possibly fractional) seconds.
    try:
        seconds_total += float(text)
    except ValueError:
        return 0
    return seconds_total
def delete_slurm_environment():
    """
    Remove every ``SLURM_*`` variable from this process's ``os.environ``.

    Submitting from inside a SLURM job would otherwise pass the enclosing
    job's settings on to the new submission. Only the current process (and
    processes it starts afterwards) is affected; the parent shell keeps its
    environment.
    """
    # Snapshot the matching names first so the mapping is not mutated
    # while it is being iterated.
    slurm_vars = [name for name in os.environ if name.startswith("SLURM_")]
    for name in slurm_vars:
        del os.environ[name]
def delete_empty_dirs(path: Path) -> None:
    """
    Recursively remove empty directories below ``path``, including ``path``.

    Directories are processed bottom-up, so a directory that only contained
    empty subdirectories is removed as well. Files are never deleted. This
    is used to clean up the working directory after a job has successfully
    finished.

    Symlinked subdirectories are not followed, so directories outside the
    tree are never touched. The link itself is kept and counts as content
    of its parent.

    Args:
        path: Directory to clean. Nothing happens if it is not a directory.

    Raises:
        OSError: if an empty directory could not be removed.
    """
    if not path.is_dir():
        return

    # Process subdirectories first (bottom-up). Take a snapshot of the
    # entries, since the recursion removes some of them.
    for child in list(path.iterdir()):
        if child.is_dir() and not child.is_symlink():
            delete_empty_dirs(child)

    try:
        # Check if directory is now empty after processing children
        if not any(path.iterdir()):
            path.rmdir()
    except OSError as e:
        # Provide more context in the error message
        raise OSError(f"Failed to remove empty directory {path}: {e}") from e
def set_gres_string(job: JobExecutorInterface) -> str:
    """
    Build the GRES/GPU part of the SLURM submission for ``job``.

    Resources read from ``job.resources``:
      - ``gres``: passed through as ``--gres``; must look like
        ``<name>:<number>`` or ``<name>:<type>:<number>``.
      - ``gpu``: number of GPUs, passed as ``--gpus``.
      - ``gpu_model``: optional GPU type, combined with ``gpu`` as
        ``--gpus=<model>:<number>``. It may contain dots for variants, see
        https://github.com/snakemake/snakemake-executor-plugin-slurm/issues/387

    Returns:
        ``" --gres=<gres>"``, ``" --gpus=<model>:<n>"`` or ``" --gpus=<n>"``
        (note the leading space), or ``""`` when none of them is set.

    Raises:
        WorkflowError: if both ``gres`` and ``gpu`` are set, if ``gpu_model``
            is set without ``gpu``, or if a value has an invalid format.
    """
    # GRES arguments can be "string:int" or "string:string:int".
    gres_re = re.compile(r"^[a-zA-Z0-9_]+(:[a-zA-Z0-9_\.]+)?:\d+$")
    # GPU model arguments are a single token; dots allowed for variants.
    gpu_model_re = re.compile(r"^[a-zA-Z0-9_\.]+$")

    gres = job.resources.get("gres")
    gpu = job.resources.get("gpu")
    # The Snakemake "gpu" resource is an int, so no format check is needed.
    gpu_string = str(gpu) if gpu else None
    gpu_model = job.resources.get("gpu_model") or None

    if gres and gpu_string:
        raise WorkflowError(
            "GRES and GPU are set. Please only set one of them.", rule=job.rule
        )
    if not gres and not gpu_model and not gpu_string:
        return ""

    if gres:
        # NOTE(review): a gpu_model set alongside gres is silently ignored.
        # str() so that non-string values get a WorkflowError below rather
        # than a TypeError from the regex engine.
        gres = str(gres)
        if not gres_re.match(gres):
            if _is_nested_string(gres):
                raise WorkflowError(
                    "GRES format should not be a nested string (start "
                    "and end with ticks or quotation marks). "
                    "Expected format: "
                    "'<name>:<number>' or '<name>:<type>:<number>' "
                    "(e.g., 'gpu:1' or 'gpu:tesla:2')"
                )
            raise WorkflowError(
                f"Invalid GRES format: {gres}. Expected format: "
                "'<name>:<number>' or '<name>:<type>:<number>' "
                "(e.g., 'gpu:1' or 'gpu:tesla:2')"
            )
        return f" --gres={gres}"

    if gpu_model and not gpu_string:
        raise WorkflowError("GPU model is set, but no GPU number is given")

    if gpu_model:
        gpu_model = str(gpu_model)
        if not gpu_model_re.match(gpu_model):
            if _is_nested_string(gpu_model):
                raise WorkflowError(
                    "GPU model format should not be a nested string (start "
                    "and end with ticks or quotation marks). "
                    "Expected format: '<name>' (e.g., 'tesla')"
                )
            raise WorkflowError(
                f"Invalid GPU model format: {gpu_model}."
                " Expected format: '<name>' (e.g., 'tesla')"
            )
        return f" --gpus={gpu_model}:{gpu_string}"

    return f" --gpus={gpu_string}"


def _is_nested_string(value: str) -> bool:
    """Return True if ``value`` starts or ends with a quote or tick character."""
    quotes = ("'", '"')
    # Slicing keeps this safe for empty and single-character values.
    return value[:1] in quotes or value[-1:] in quotes