Skip to content

Commit bc9ebbf

Browse files
committed
allow redirect stdout/err to specific file
1 parent df861ff commit bc9ebbf

File tree

7 files changed

+111
-25
lines changed

7 files changed

+111
-25
lines changed

executor/engine/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from .core import Engine, EngineSetting
22
from .job import LocalJob, ThreadJob, ProcessJob
33

4-
__version__ = '0.3.2'
4+
__version__ = '0.3.3'
55

66
__all__ = [
77
'Engine', 'EngineSetting',

executor/engine/job/base.py

Lines changed: 10 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def __init__(
8383
name: T.Optional[str] = None,
8484
condition: T.Optional[Condition] = None,
8585
wait_time_delta: float = 0.01,
86-
redirect_out_err: bool = False,
86+
redirect_out_err: bool | str = False,
8787
change_dir: bool = False,
8888
**attrs
8989
) -> None:
@@ -103,7 +103,8 @@ def __init__(
103103
wait_time_delta: The time delta between each
104104
check of the condition.
105105
redirect_out_err: Whether to redirect the stdout
106-
and stderr to the log.
106+
and stderr to the log. If is a string, it will be
107+
used as the path of the log file.
107108
change_dir: Whether to change the working directory
108109
to the log directory.
109110
**attrs: The attributes of the job.
@@ -255,9 +256,13 @@ def process_func(self):
255256
change the dir, redirect the stdout and stderr
256257
before the actual run."""
257258
cache_dir = self.cache_dir.resolve()
258-
if self.redirect_out_err and (not isinstance(self.func, CaptureOut)):
259-
path_stdout = cache_dir / 'stdout.txt'
260-
path_stderr = cache_dir / 'stderr.txt'
259+
if (self.redirect_out_err is not False) and (not isinstance(self.func, CaptureOut)):
260+
if isinstance(self.redirect_out_err, str):
261+
path_stdout = Path(self.redirect_out_err)
262+
path_stderr = Path(self.redirect_out_err)
263+
else:
264+
path_stdout = cache_dir / 'stdout.txt'
265+
path_stderr = cache_dir / 'stderr.txt'
261266
self.func = CaptureOut(self.func, path_stdout, path_stderr)
262267
if self.change_dir:
263268
self.func = ChDir(self.func, cache_dir)

executor/engine/job/extend/subprocess.py

Lines changed: 21 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -112,16 +112,30 @@ def record_command():
112112
pkwargs = popen_kwargs or {}
113113
pkwargs['cwd'] = target_dir
114114

115-
if self.redirect_out_err:
116-
path_stdout = cache_dir / 'stdout.txt'
117-
path_stderr = cache_dir / 'stderr.txt'
115+
if self.redirect_out_err is not False:
116+
if isinstance(self.redirect_out_err, str):
117+
path_stdout = Path(self.redirect_out_err)
118+
if not path_stdout.parent.exists():
119+
path_stdout.parent.mkdir(parents=True, exist_ok=True)
120+
path_stdout = self.redirect_out_err
121+
path_stderr = self.redirect_out_err
122+
else:
123+
path_stdout = cache_dir / 'stdout.txt'
124+
path_stderr = cache_dir / 'stderr.txt'
118125

119126
def _run_cmd(runner: ProcessRunner): # pragma: no cover
120127
runner.run(**pkwargs)
121-
with open(path_stdout, 'w') as fo, \
122-
open(path_stderr, 'w') as fe:
123-
retcode = runner.write_stream_until_stop(
124-
fo, fe, flush_streams_each_time=True)
128+
if path_stdout == path_stderr:
129+
fo = open(path_stdout, 'a')
130+
fe = fo
131+
else:
132+
fo = open(path_stdout, 'a')
133+
fe = open(path_stderr, 'a')
134+
retcode = runner.write_stream_until_stop(
135+
fo, fe, flush_streams_each_time=True)
136+
fo.close()
137+
if path_stdout != path_stderr:
138+
fe.close()
125139
return retcode
126140
else:
127141
def _run_cmd(runner: ProcessRunner):

executor/engine/middle/capture.py

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,10 @@
33
from functools import update_wrapper
44
from pathlib import Path
55
import traceback
6+
import loguru
7+
8+
9+
LOGURU_HANDLERS = {}
610

711

812
class Tee(object):
@@ -23,10 +27,18 @@ def flush(self):
2327

2428
def __enter__(self):
2529
setattr(sys, self.stream_type, self)
30+
fname = repr(self.file)
31+
if fname not in LOGURU_HANDLERS:
32+
loguru_handler = loguru.logger.add(self.file)
33+
LOGURU_HANDLERS[fname] = loguru_handler
2634
return self
2735

2836
def __exit__(self, _type, _value, _traceback):
2937
setattr(sys, self.stream_type, self.stream)
38+
fname = repr(self.file)
39+
if fname in LOGURU_HANDLERS:
40+
loguru.logger.remove(LOGURU_HANDLERS[fname])
41+
del LOGURU_HANDLERS[fname]
3042

3143

3244
class CaptureOut(object):
@@ -41,13 +53,26 @@ def __init__(
4153
self.capture_traceback = capture_traceback
4254

4355
def __call__(self, *args, **kwargs) -> T.Any:
44-
with open(self.stdout_file, 'w') as fo, \
45-
open(self.stderr_file, 'w') as fe:
46-
with Tee(fo, 'stdout'), Tee(fe, 'stderr'):
47-
try:
48-
res = self.func(*args, **kwargs)
49-
except Exception as e:
50-
if self.capture_traceback:
51-
traceback.print_exc(file=fe)
52-
raise e
56+
if not self.stdout_file.parent.exists():
57+
self.stdout_file.parent.mkdir(parents=True, exist_ok=True) # pragma: no cover
58+
if not self.stderr_file.parent.exists():
59+
self.stderr_file.parent.mkdir(parents=True, exist_ok=True) # pragma: no cover
60+
61+
if self.stdout_file == self.stderr_file:
62+
outf = open(self.stdout_file, 'a')
63+
errf = outf
64+
else:
65+
outf = open(self.stdout_file, 'a')
66+
errf = open(self.stderr_file, 'a')
67+
with Tee(outf, 'stdout'), Tee(errf, 'stderr'):
68+
try:
69+
res = self.func(*args, **kwargs)
70+
except Exception as e:
71+
if self.capture_traceback:
72+
traceback.print_exc()
73+
raise e
74+
finally:
75+
outf.close()
76+
if self.stdout_file != self.stderr_file:
77+
errf.close()
5378
return res

tests/test_job.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import asyncio
2+
import os
3+
import shutil
24
import time
35

46
import pytest
@@ -372,3 +374,27 @@ async def async_func(x):
372374
await engine.submit_async(job)
373375
await job.wait_until_status("done")
374376
assert job.result() == 2
377+
378+
379+
@pytest.mark.asyncio
380+
async def test_redirect_out_err():
381+
with Engine() as engine:
382+
def func():
383+
import loguru
384+
logger = loguru.logger
385+
logger.info("test")
386+
logger.error("test2")
387+
import sys
388+
print("test3", file=sys.stderr)
389+
print("test4", file=sys.stdout)
390+
unexist_dir = "tmp/1"
391+
job = ProcessJob(func, redirect_out_err=f"{unexist_dir}/test.log")
392+
await engine.submit_async(job)
393+
await job.wait_until_status("done")
394+
with open(f"{unexist_dir}/test.log", "r") as f:
395+
content = f.read()
396+
assert "test" in content
397+
assert "test2" in content
398+
assert "test3" in content
399+
assert "test4" in content
400+
shutil.rmtree(unexist_dir)

tests/test_subp_job.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
11
import asyncio
2+
import pytest
3+
import shutil
24

35
from executor.engine.core import Engine, EngineSetting
46
from executor.engine.job.extend.subprocess import SubprocessJob
@@ -156,6 +158,20 @@ async def submit_job():
156158
asyncio.run(submit_job())
157159

158160

161+
@pytest.mark.asyncio
162+
async def test_redirect_out_err_str():
163+
engine = Engine()
164+
unexist_dir = "tmp/1"
165+
job = SubprocessJob("python -c 'print(1 + 1); raise ValueError(\"error\")'", redirect_out_err=f"{unexist_dir}/test.log")
166+
await engine.submit_async(job)
167+
await job.join()
168+
with open(f"{unexist_dir}/test.log") as f:
169+
content = f.read()
170+
assert "2" in content
171+
assert "error" in content
172+
#shutil.rmtree(unexist_dir)
173+
174+
159175
def test_cancel():
160176
engine = Engine()
161177

tests/test_webapp_job.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,18 @@ def test_run_webapp():
2222

2323
async def submit_job():
2424
with pytest.raises(NotImplementedError):
25-
job = WebappJob(run_simple_httpd, ip="2.2.2.2", port=8001, check_delta=0.5)
25+
job = WebappJob(run_simple_httpd, ip="2.2.2.2", port=8088, check_delta=0.5)
2626

2727
with pytest.raises(ValueError):
2828
job = WebappJob("python -m http.server -b ip port")
2929

3030
with pytest.raises(TypeError):
3131
job = WebappJob(1)
3232

33-
job = WebappJob(run_simple_httpd, ip="127.0.0.1", port=8001, check_delta=0.5)
33+
job = WebappJob(run_simple_httpd, ip="127.0.0.1", port=8088, check_delta=0.5)
3434
await engine.submit_async(job)
3535
await asyncio.sleep(5)
36-
assert job.port == 8001
36+
assert job.port == 8088
3737
assert job.status == "running"
3838
await job.cancel()
3939
assert job.status == "cancelled"

0 commit comments

Comments
 (0)