Skip to content

Commit d7ec1cc

Browse files
committed
overhaul how number of requested cores is handled
1 parent ebdbed0 commit d7ec1cc

File tree

3 files changed

+28
-10
lines changed

3 files changed

+28
-10
lines changed

mona/dirtask.py

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
# License, v. 2.0. If a copy of the MPL was not distributed with this
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44
import logging
5+
import os
56
import subprocess
67
from pathlib import Path
78
from tempfile import TemporaryDirectory
@@ -151,14 +152,19 @@ async def dir_task(exe: File, inputs: List[DirtaskInput]) -> Dict[str, File]:
151152
input_names = {
152153
str(inp if isinstance(inp, File) else inp[0]) for inp in [exe, *inputs]
153154
}
155+
ncores = Session.active().running_task.storage.get('ncores')
154156
dirtask_tmpdir = DirtaskTmpdir(lambda p: p not in input_names)
155157
with dirtask_tmpdir as tmpdir:
156158
checkout_files(tmpdir, exe, inputs)
157159
out_path, err_path = tmpdir / 'STDOUT', tmpdir / 'STDERR'
158160
try:
159161
with out_path.open('w') as stdout, err_path.open('w') as stderr:
160162
await run_process(
161-
str(tmpdir / exe.path), stdout=stdout, stderr=stderr, cwd=tmpdir
163+
str(tmpdir / exe.path),
164+
stdout=stdout,
165+
stderr=stderr,
166+
cwd=tmpdir,
167+
ncores=ncores,
162168
)
163169
except subprocess.CalledProcessError as e:
164170
if dirtask_tmpdir.has_tmpdir_manager():

mona/plugins/parallel.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -105,11 +105,16 @@ async def _acquire(self, ncores: int) -> AsyncGenerator[None, None]:
105105

106106
async def _run_coro(self, corofunc: Corofunc[_T], *args: Any, **kwargs: Any) -> _T:
107107
task = Session.active().running_task
108-
n = cast(int, task.storage.get('ncores', 1))
108+
n: Optional[int] = kwargs.get('ncores')
109+
if n is not None:
110+
if n == -1:
111+
n = self._ncores
112+
kwargs['ncores'] = n
113+
else:
114+
n = 1
109115
if n > self._available:
110116
log.debug(
111-
f'Waiting for {n-self._available}/{n} '
112-
f'unavailable cores for "{task}"'
117+
f'Waiting for {n-self._available}/{n} unavailable cores for {task}'
113118
)
114119
waited = True
115120
else:

mona/runners.py

Lines changed: 13 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3,14 +3,15 @@
33
# file, You can obtain one at http://mozilla.org/MPL/2.0/.
44
import asyncio
55
import logging
6+
import os
67
import subprocess
78
from typing import Any, Callable, Optional, Tuple, TypeVar, Union
89
from typing_extensions import Protocol, runtime
910

1011
from .sessions import Session
1112
from .tasks import Corofunc
1213

13-
__version__ = '0.1.0'
14+
__version__ = '0.1.1'
1415
__all__ = ['run_shell', 'run_process', 'run_thread']
1516

1617
log = logging.getLogger(__name__)
@@ -33,14 +34,15 @@ def _scheduler() -> Optional[Scheduler]:
3334
return scheduler
3435

3536

36-
async def run_shell(cmd: str, **kwargs: Any) -> ProcessOutput:
37+
async def run_shell(cmd: str, ncores: int = None, **kwargs: Any) -> ProcessOutput:
3738
"""Execute a command in a shell.
3839
3940
Wrapper around :func:`asyncio.create_subprocess_shell` that handles errors
4041
and whose behavior can be modified by session plugins.
4142
4243
:param str cmd: a shell command to be executed
43-
:param kwargs: all keyword arguments are passed to
44+
:param int ncores: number of cores that should be taken by the process
45+
:param kwargs: all other keyword arguments are passed to
4446
:func:`~asyncio.create_subprocess_shell`.
4547
:data:`~subprocess.PIPE` is passed to `stdin` and `stdout`
4648
keyword arguments by default.
@@ -56,14 +58,15 @@ async def run_shell(cmd: str, **kwargs: Any) -> ProcessOutput:
5658
return await _run_process(cmd, **kwargs)
5759

5860

59-
async def run_process(*args: str, **kwargs: Any) -> ProcessOutput:
61+
async def run_process(*args: str, ncores: int = None, **kwargs: Any) -> ProcessOutput:
6062
"""Create a subprocess.
6163
6264
Wrapper around :func:`asyncio.create_subprocess_exec` that handles errors
6365
and whose behavior can be modified by session plugins.
6466
6567
:param str args: arguments of the subprocess
66-
:param kwargs: all keyword arguments are passed to
68+
:param int ncores: number of cores that should be taken by the process
69+
:param kwargs: all other keyword arguments are passed to
6770
:func:`~asyncio.create_subprocess_exec`.
6871
:data:`~subprocess.PIPE` is passed to `stdin` and `stdout`
6972
keyword arguments by default.
@@ -73,18 +76,22 @@ async def run_process(*args: str, **kwargs: Any) -> ProcessOutput:
7376
"""
7477
scheduler = _scheduler()
7578
if scheduler:
76-
return await scheduler(_run_process, args, **kwargs)
79+
return await scheduler(_run_process, args, ncores=ncores, **kwargs)
7780
return await _run_process(args, **kwargs)
7881

7982

8083
async def _run_process(
8184
args: Union[str, Tuple[str, ...]],
8285
shell: bool = False,
8386
input: bytes = None,
87+
ncores: int = None,
8488
**kwargs: Any,
8589
) -> Union[bytes, Tuple[bytes, bytes]]:
8690
kwargs.setdefault('stdin', subprocess.PIPE)
8791
kwargs.setdefault('stdout', subprocess.PIPE)
92+
kwargs.setdefault('env', os.environ.copy())
93+
if ncores is not None:
94+
kwargs['env']['MONA_NCORES'] = str(ncores)
8895
if shell:
8996
assert isinstance(args, str)
9097
proc = await asyncio.create_subprocess_shell(args, **kwargs)

0 commit comments

Comments
 (0)