Skip to content

Commit 2f99b85

Browse files
committed
🚧 Some SlurmSite logic
xtl.jobs.sites:SlurmLocalSite/SlurmModuleSite - Implementation of .prepare_preamble() xtl.jobs.config2 - New ResourcesConfig class for holding compute resources related options - Added .resources attribute to BatchJobConfig
1 parent b2e0752 commit 2f99b85

File tree

2 files changed

+142
-25
lines changed

2 files changed

+142
-25
lines changed

src/xtl/jobs/config2.py

Lines changed: 80 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
1+
from datetime import timedelta
12
from pathlib import Path
23
from typing import Optional
34

45
from pydantic import PrivateAttr, model_validator, computed_field, field_serializer
56

67
from xtl import settings, Logger
7-
from xtl.common.options import Option
8+
from xtl.common.options import Option, Options
89
from xtl.common.os import FilePermissions
910
from xtl.common.serializers import PermissionOctal
1011
from xtl.common.validators import cast_as_temp_dir_if_none
@@ -16,8 +17,66 @@
1617
logger = Logger(__name__)
1718

1819

20+
class ResourcesConfig(Options):
21+
22+
# TODO: Custom formatters & aliases for SLURM
23+
# TODO: Custom validators for SLURM-like input
24+
cpus: int = \
25+
Option(
26+
default=1, ge=1,
27+
desc='Number of CPU cores required for the job',
28+
alias='cpus-per-task'
29+
)
30+
31+
memory: float = \
32+
Option(
33+
default=1.0, ge=0.0,
34+
desc='Amount of memory (in GB) required for the job',
35+
alias='mem',
36+
formatter=lambda x: f'{x}G'
37+
)
38+
39+
timeout: float | str | timedelta | None = \
40+
Option(
41+
default=None,
42+
desc='Maximum runtime for the job (in minutes or D-HH:MM:SS format)',
43+
alias='time',
44+
# cast_as=...,
45+
# formatter=...
46+
)
47+
48+
gpus: int = \
49+
Option(
50+
default=0, ge=0,
51+
desc='Number of GPUs required for the job',
52+
alias='gpus'
53+
)
54+
55+
no_tasks: int = \
56+
Option(
57+
default=1, ge=1,
58+
desc='Number of tasks required for the job (only used in MPI jobs)',
59+
alias='ntasks'
60+
)
61+
62+
no_nodes: int = \
63+
Option(
64+
default=1, ge=1,
65+
desc='Number of nodes required for the job (only used in MPI jobs)',
66+
alias='nodes'
67+
)
68+
69+
def to_slurm(self) -> list[str]:
70+
args = []
71+
for field, value in self.to_dict(by_alias=True).items():
72+
if value:
73+
args.append(f'--{field}={value}')
74+
return args
75+
76+
1977
class BatchJobConfig(JobConfig):
2078

79+
# Generate a temporary directory if not provided
2180
job_directory: Optional[Path] = Option(
2281
default_factory=lambda: cast_as_temp_dir_if_none(None, prefix='xtl_batch_'),
2382
desc='Directory for job execution and results',
@@ -40,7 +99,7 @@ class BatchJobConfig(JobConfig):
4099
) # for slurm --comment
41100
permissions: FilePermissions | str | int = \
42101
Option(
43-
default=FilePermissions(0o700),
102+
default=settings.jobs.batch.permissions,
44103
desc='Permissions for the batch file in octal format (e.g., 700)',
45104
cast_as=FilePermissions,
46105
formatter=PermissionOctal
@@ -66,6 +125,11 @@ class BatchJobConfig(JobConfig):
66125
default_factory=dict,
67126
desc='Templates for the content of the batch file for different shells'
68127
)
128+
resources: ResourcesConfig = \
129+
Option(
130+
default_factory=ResourcesConfig,
131+
desc='Resources required for the batch job'
132+
)
69133

70134
_shell: Shell | None = PrivateAttr(None)
71135
"""The shell that will be used to execute the batch file"""
@@ -171,3 +235,17 @@ def get_template(self) -> Optional[str]:
171235
Get the template for the content of the batch file for the selected shell.
172236
"""
173237
return self.templates.get(self.shell, None)
238+
239+
def to_slurm(self) -> list[str]:
240+
"""
241+
Convert the batch job configuration to a list of SLURM command-line arguments.
242+
"""
243+
args = []
244+
if self.name:
245+
args.append(f'--job-name={self.name}')
246+
if self.description:
247+
args.append(f'--comment={self.description}')
248+
args.extend(self.resources.to_slurm())
249+
args.append(f'--output={self.stdout}')
250+
args.append(f'--error={self.stderr}')
251+
return args

src/xtl/jobs/sites.py

Lines changed: 62 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
import aiofiles
1212

1313
if TYPE_CHECKING:
14+
from xtl.jobs.config2 import BatchJobConfig
1415
from xtl.config.settings import DependencySettings
1516
from xtl.common.compatibility import PY310_OR_LESS, XTL_COMPUTE_SITE
1617
from xtl.jobs.batchfiles import BatchFile, BatchFileStatus
@@ -191,6 +192,12 @@ def execute_command(command: Sequence[str], timeout: int = 20) \
191192
async def execute_batch(self, batch: BatchFile, **kwargs):
192193
...
193194

195+
@staticmethod
196+
def _prepare_shebang(shell: ShellType) -> str:
197+
if shell.shebang:
198+
return f'{shell.shebang}{shell.new_line_char}'
199+
return ''
200+
194201

195202
class LocalSite(BaseComputeSite):
196203
"""
@@ -202,7 +209,8 @@ def prepare_preamble(self, dependencies: Iterable[DependencySettings | str]
202209
| DependencySettings | str
203210
| None,
204211
shell: ShellType) -> str:
205-
return self.policy.intercept_preamble('') if self.policy else ''
212+
preamble = self._prepare_shebang(shell)
213+
return self.policy.intercept_preamble(preamble) if self.policy else preamble
206214

207215
def prepare_command(self, command: str) -> str:
208216
return self.policy.intercept_command(command) if self.policy else command
@@ -361,7 +369,6 @@ async def _log_stream_to_file(stream: asyncio.StreamReader,
361369
await f.close()
362370

363371

364-
365372
class ModulesSite(LocalSite):
366373
"""
367374
A compute site that uses
@@ -400,6 +407,16 @@ def _load_modules(modules: str | Iterable[str], shell: ShellType) -> str:
400407
cmd = f'call {cmd}'
401408
return cmd
402409

410+
def _prepare_modules_preamble(self, dependencies: Iterable[DependencySettings | str]
411+
| DependencySettings | str
412+
| None,
413+
shell: ShellType) -> str:
414+
preamble = self._purge_modules(shell=shell)
415+
for dep in self.resolve_dependencies(dependencies):
416+
if dep.modules:
417+
preamble += self._load_modules(modules=dep.modules, shell=shell)
418+
return preamble
419+
403420
def prepare_preamble(self, dependencies: Iterable[DependencySettings | str]
404421
| DependencySettings | str
405422
| None,
@@ -417,10 +434,9 @@ def prepare_preamble(self, dependencies: Iterable[DependencySettings | str]
417434
:param shell: The shell type to generate the commands for.
418435
:return: The preamble string.
419436
"""
420-
preamble = self._purge_modules(shell=shell)
421-
for dep in self.resolve_dependencies(dependencies):
422-
if dep.modules:
423-
preamble += self._load_modules(modules=dep.modules, shell=shell)
437+
preamble = self._prepare_shebang(shell)
438+
preamble += self._prepare_modules_preamble(dependencies=dependencies,
439+
shell=shell)
424440
return self.policy.intercept_preamble(preamble) if self.policy else preamble
425441

426442
def check_dependencies(self, dependencies: Iterable[DependencySettings | str]
@@ -506,12 +522,28 @@ class SlurmSite(SchedulerSite, ABC):
506522
Abstract base class for compute sites that utilize the
507523
`SLURM <https://slurm.schedmd.com/>`_ job scheduler.
508524
"""
525+
_default_shell = Shell.BASH
526+
_supported_shells = frozenset([Shell.BASH])
509527

510-
def prepare_preamble(self, dependencies: Iterable[DependencySettings | str]
511-
| DependencySettings | str
512-
| None,
513-
shell: ShellType) -> str:
514-
# SBATCH preamble
528+
@staticmethod
529+
def _prepare_slurm_preamble(shell: ShellType,
530+
config: BatchJobConfig | dict = None) -> str:
531+
from xtl.jobs.config2 import BatchJobConfig
532+
533+
args = []
534+
if config:
535+
if not isinstance(config, BatchJobConfig):
536+
config = BatchJobConfig(**config)
537+
args.extend(config.to_slurm())
538+
539+
if args:
540+
nl = shell.new_line_char
541+
return nl.join(f'#SBATCH {arg}' for arg in args) + nl
542+
return ''
543+
544+
async def validate_submission(self, batch: BatchFile, **kwargs) -> bool:
545+
# Run `sbatch --test-only <batchfile>` to check if allocation is possible
546+
# and check the exit code
515547
raise NotImplementedError()
516548

517549
async def schedule_batch(self, batch: BatchFile, **kwargs) -> str:
@@ -536,12 +568,14 @@ class SlurmLocalSite(SlurmSite, LocalSite):
536568
def prepare_preamble(self, dependencies: Iterable[DependencySettings | str]
537569
| DependencySettings | str
538570
| None,
539-
shell: ShellType) -> str:
540-
slurm_preamble = SlurmSite.prepare_preamble(self, dependencies=dependencies,
541-
shell=shell)
542-
local_preamble = LocalSite.prepare_preamble(self, dependencies=dependencies,
543-
shell=shell)
544-
raise NotImplementedError()
571+
shell: ShellType,
572+
config: BatchJobConfig | dict = None,
573+
**kwargs) -> str:
574+
preamble = self._prepare_shebang(shell=shell)
575+
# SBATCH preamble
576+
if config:
577+
preamble += self._prepare_slurm_preamble(shell=shell, config=config)
578+
return self.policy.intercept_preamble(preamble) if self.policy else preamble
545579

546580

547581
class SlurmModulesSite(SlurmSite, ModulesSite):
@@ -554,12 +588,17 @@ class SlurmModulesSite(SlurmSite, ModulesSite):
554588
def prepare_preamble(self, dependencies: Iterable[DependencySettings | str]
555589
| DependencySettings | str
556590
| None,
557-
shell: ShellType) -> str:
558-
slurm_preamble = SlurmSite.prepare_preamble(self, dependencies=dependencies,
559-
shell=shell)
560-
modules_preamble = ModulesSite.prepare_preamble(self, dependencies=dependencies,
561-
shell=shell)
562-
raise NotImplementedError()
591+
shell: ShellType,
592+
config: BatchJobConfig | dict = None,
593+
**kwargs) -> str:
594+
preamble = self._prepare_shebang(shell=shell)
595+
# SBATCH preamble
596+
if config:
597+
preamble += self._prepare_slurm_preamble(shell=shell, config=config)
598+
# Modules preamble
599+
preamble += self._prepare_modules_preamble(dependencies=dependencies,
600+
shell=shell)
601+
return self.policy.intercept_preamble(preamble) if self.policy else preamble
563602

564603

565604
class VirtualizationSite(BaseComputeSite, ABC): ...

0 commit comments

Comments
 (0)