Skip to content

Commit 4289683

Browse files
committed
[Feature] Use separate log files for different workers
1 parent 8f4f45e commit 4289683

File tree

5 files changed

+21
-5
lines changed

5 files changed

+21
-5
lines changed

src/executorlib/standalone/interactive/spawner.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ def __init__(
1111
self,
1212
cwd: Optional[str] = None,
1313
cores: int = 1,
14+
worker_id: int = 0,
1415
openmpi_oversubscribe: bool = False,
1516
):
1617
"""
@@ -20,9 +21,11 @@ def __init__(
2021
cwd (str): The current working directory.
2122
cores (int, optional): The number of cores to use. Defaults to 1.
2223
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
24+
worker_id (int): The worker ID. Defaults to 0.
2325
"""
2426
self._cwd = cwd
2527
self._cores = cores
28+
self._worker_id = worker_id
2629
self._openmpi_oversubscribe = openmpi_oversubscribe
2730

2831
@abstractmethod
@@ -69,6 +72,7 @@ def __init__(
6972
self,
7073
cwd: Optional[str] = None,
7174
cores: int = 1,
75+
worker_id: int = 0,
7276
openmpi_oversubscribe: bool = False,
7377
threads_per_core: int = 1,
7478
):
@@ -79,11 +83,13 @@ def __init__(
7983
cwd (str, optional): The current working directory. Defaults to None.
8084
cores (int, optional): The number of cores to use. Defaults to 1.
8185
threads_per_core (int, optional): The number of threads per core. Defaults to 1.
86+
worker_id (int): The worker ID. Defaults to 0.
8287
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
8388
"""
8489
super().__init__(
8590
cwd=cwd,
8691
cores=cores,
92+
worker_id=worker_id,
8793
openmpi_oversubscribe=openmpi_oversubscribe,
8894
)
8995
self._process: Optional[subprocess.Popen] = None

src/executorlib/task_scheduler/interactive/blockallocation.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -244,7 +244,7 @@ def _execute_multiple_tasks(
244244
command_lst=get_interactive_execute_command(
245245
cores=cores,
246246
),
247-
connections=spawner(cores=cores, **kwargs),
247+
connections=spawner(cores=cores, worker_id=worker_id, **kwargs),
248248
hostname_localhost=hostname_localhost,
249249
log_obj_size=log_obj_size,
250250
worker_id=worker_id,

src/executorlib/task_scheduler/interactive/spawner_flux.py

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ class FluxPythonSpawner(BaseSpawner):
3131
threads_per_core (int, optional): The number of threads per base. Defaults to 1.
3232
gpus_per_core (int, optional): The number of GPUs per base. Defaults to 0.
3333
num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
34+
worker_id (int): The worker ID. Defaults to 0.
3435
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute nodes. Defaults to
3536
False.
3637
openmpi_oversubscribe (bool, optional): Whether to oversubscribe. Defaults to False.
@@ -49,6 +50,7 @@ def __init__(
4950
threads_per_core: int = 1,
5051
gpus_per_core: int = 0,
5152
num_nodes: Optional[int] = None,
53+
worker_id: int = 0,
5254
exclusive: bool = False,
5355
priority: Optional[int] = None,
5456
openmpi_oversubscribe: bool = False,
@@ -60,6 +62,7 @@ def __init__(
6062
super().__init__(
6163
cwd=cwd,
6264
cores=cores,
65+
worker_id=worker_id,
6366
openmpi_oversubscribe=openmpi_oversubscribe,
6467
)
6568
self._threads_per_core = threads_per_core
@@ -121,12 +124,13 @@ def bootup(
121124
if self._cwd is not None:
122125
jobspec.cwd = self._cwd
123126
os.makedirs(self._cwd, exist_ok=True)
127+
file_prefix = "flux_" + str(self._worker_id)
124128
if self._flux_log_files and self._cwd is not None:
125-
jobspec.stderr = os.path.join(self._cwd, "flux.err")
126-
jobspec.stdout = os.path.join(self._cwd, "flux.out")
129+
jobspec.stderr = os.path.join(self._cwd, file_prefix + ".err")
130+
jobspec.stdout = os.path.join(self._cwd, file_prefix + ".out")
127131
elif self._flux_log_files:
128-
jobspec.stderr = os.path.abspath("flux.err")
129-
jobspec.stdout = os.path.abspath("flux.out")
132+
jobspec.stderr = os.path.abspath(file_prefix + ".err")
133+
jobspec.stdout = os.path.abspath(file_prefix + ".out")
130134
if self._priority is not None:
131135
self._future = self._flux_executor.submit(
132136
jobspec=jobspec, urgency=self._priority

src/executorlib/task_scheduler/interactive/spawner_pysqa.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ def __init__(
2121
threads_per_core: int = 1,
2222
gpus_per_core: int = 0,
2323
num_nodes: Optional[int] = None,
24+
worker_id: int = 0,
2425
exclusive: bool = False,
2526
openmpi_oversubscribe: bool = False,
2627
slurm_cmd_args: Optional[list[str]] = None,
@@ -38,6 +39,7 @@ def __init__(
3839
threads_per_core (int): The number of threads per core. Defaults to 1.
3940
gpus_per_core (int): number of GPUs per worker - defaults to 0
4041
num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
42+
worker_id (int): The worker ID. Defaults to 0.
4143
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute nodes. Defaults
4244
to False.
4345
openmpi_oversubscribe (bool): Whether to oversubscribe the cores. Defaults to False.
@@ -49,6 +51,7 @@ def __init__(
4951
super().__init__(
5052
cwd=cwd,
5153
cores=cores,
54+
worker_id=worker_id,
5255
openmpi_oversubscribe=openmpi_oversubscribe,
5356
)
5457
self._threads_per_core = threads_per_core

src/executorlib/task_scheduler/interactive/spawner_slurm.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@ def __init__(
2727
threads_per_core: int = 1,
2828
gpus_per_core: int = 0,
2929
num_nodes: Optional[int] = None,
30+
worker_id: int = 0,
3031
exclusive: bool = False,
3132
openmpi_oversubscribe: bool = False,
3233
slurm_cmd_args: Optional[list[str]] = None,
@@ -41,6 +42,7 @@ def __init__(
4142
threads_per_core (int, optional): The number of threads per core. Defaults to 1.
4243
gpus_per_core (int, optional): The number of GPUs per core. Defaults to 0.
4344
num_nodes (int, optional): The number of compute nodes to use for executing the task. Defaults to None.
45+
worker_id (int): The worker ID. Defaults to 0.
4446
exclusive (bool): Whether to exclusively reserve the compute nodes, or allow sharing compute nodes. Defaults to False.
4547
openmpi_oversubscribe (bool, optional): Whether to oversubscribe the cores. Defaults to False.
4648
slurm_cmd_args (list[str], optional): Additional command line arguments. Defaults to [].
@@ -49,6 +51,7 @@ def __init__(
4951
super().__init__(
5052
cwd=cwd,
5153
cores=cores,
54+
worker_id=worker_id,
5255
openmpi_oversubscribe=openmpi_oversubscribe,
5356
threads_per_core=threads_per_core,
5457
)

0 commit comments

Comments
 (0)