11"""tee like run implementation."""
22
3+ from __future__ import annotations
4+
35import asyncio
46import os
57import platform
6- import subprocess
8+ import subprocess # noqa: S404
79import sys
810from asyncio import StreamReader
9- from importlib .metadata import PackageNotFoundError , version # type: ignore
11+ from importlib .metadata import PackageNotFoundError , version
12+ from pathlib import Path
1013from shlex import join
11- from typing import TYPE_CHECKING , Any , Callable , Dict , List , Optional , Union
14+ from typing import TYPE_CHECKING , Any
1215
1316try :
1417 __version__ = version ("subprocess-tee" )
1518except PackageNotFoundError : # pragma: no branch
1619 __version__ = "0.1.dev1"
1720
18- __all__ = ["run " , "CompletedProcess " , "__version__ " ]
21+ __all__ = ["CompletedProcess " , "__version__ " , "run " ]
1922
2023if TYPE_CHECKING :
2124 CompletedProcess = subprocess .CompletedProcess [Any ] # pylint: disable=E1136
25+ from collections .abc import Callable
2226else :
2327 CompletedProcess = subprocess .CompletedProcess
2428
25-
2629STREAM_LIMIT = 2 ** 23 # 8MB instead of default 64kb, override it if you need
2730
2831
@@ -35,18 +38,19 @@ async def _read_stream(stream: StreamReader, callback: Callable[..., Any]) -> No
3538 break
3639
3740
38- async def _stream_subprocess (
39- args : Union [str , List [str ]], ** kwargs : Any
41+ async def _stream_subprocess ( # noqa: C901
42+ args : str | list [str ],
43+ ** kwargs : Any ,
4044) -> CompletedProcess :
41- platform_settings : Dict [str , Any ] = {}
45+ platform_settings : dict [str , Any ] = {}
4246 if platform .system () == "Windows" :
4347 platform_settings ["env" ] = os .environ
4448
4549 # this part keeps behavior backwards compatible with subprocess.run
4650 tee = kwargs .get ("tee" , True )
4751 stdout = kwargs .get ("stdout" , sys .stdout )
4852
49- with open (os .devnull , "w" , encoding = "UTF-8" ) as devnull :
53+ with Path (os .devnull ). open ( "w" , encoding = "UTF-8" ) as devnull :
5054 if stdout == subprocess .DEVNULL or not tee :
5155 stdout = devnull
5256 stderr = kwargs .get ("stderr" , sys .stderr )
@@ -85,31 +89,31 @@ async def _stream_subprocess(
8589 stderr = asyncio .subprocess .PIPE ,
8690 ** platform_settings ,
8791 )
88- out : List [str ] = []
89- err : List [str ] = []
92+ out : list [str ] = []
93+ err : list [str ] = []
9094
91- def tee_func (line : bytes , sink : List [str ], pipe : Optional [ Any ] ) -> None :
95+ def tee_func (line : bytes , sink : list [str ], pipe : Any | None ) -> None :
9296 line_str = line .decode ("utf-8" ).rstrip ()
9397 sink .append (line_str )
9498 if not kwargs .get ("quiet" , False ):
9599 if pipe and hasattr (pipe , "write" ):
96100 print (line_str , file = pipe )
97101 else :
98- print (line_str )
102+ print (line_str ) # noqa: T201
99103
100104 loop = asyncio .get_running_loop ()
101105 tasks = []
102106 if process .stdout :
103107 tasks .append (
104108 loop .create_task (
105- _read_stream (process .stdout , lambda x : tee_func (x , out , stdout ))
106- )
109+ _read_stream (process .stdout , lambda x : tee_func (x , out , stdout )),
110+ ),
107111 )
108112 if process .stderr :
109113 tasks .append (
110114 loop .create_task (
111- _read_stream (process .stderr , lambda x : tee_func (x , err , stderr ))
112- )
115+ _read_stream (process .stderr , lambda x : tee_func (x , err , stderr )),
116+ ),
113117 )
114118
115119 await asyncio .wait (set (tasks ))
@@ -132,32 +136,39 @@ def tee_func(line: bytes, sink: List[str], pipe: Optional[Any]) -> None:
132136 )
133137
134138
135- def run (args : Union [ str , List [str ] ], ** kwargs : Any ) -> CompletedProcess :
139+ def run (args : str | list [str ], ** kwargs : Any ) -> CompletedProcess :
136140 """Drop-in replacement for subprocess.run that behaves like tee.
137141
138142 Extra arguments added by our version:
139143 echo: False - Prints command before executing it.
140144 quiet: False - Avoid printing output
145+
146+ Returns:
147+ CompletedProcess: ...
148+
149+ Raises:
150+ CalledProcessError: ...
151+
141152 """
142- if isinstance (args , str ):
143- cmd = args
144- else :
145- # run was called with a list instead of a single item but asyncio
146- # create_subprocess_shell requires command as a single string, so
147- # we need to convert it to string
148- cmd = join (args )
153+ # run was called with a list instead of a single item but asyncio
154+ # create_subprocess_shell requires command as a single string, so
155+ # we need to convert it to string
156+ cmd = args if isinstance (args , str ) else join (args )
149157
150158 check = kwargs .get ("check" , False )
151159
152160 if kwargs .get ("echo" , False ):
153- print (f"COMMAND: { cmd } " )
161+ print (f"COMMAND: { cmd } " ) # noqa: T201
154162
155163 result = asyncio .run (_stream_subprocess (args , ** kwargs ))
156164 # we restore original args to mimic subproces.run()
157165 result .args = args
158166
159167 if check and result .returncode != 0 :
160168 raise subprocess .CalledProcessError (
161- result .returncode , args , output = result .stdout , stderr = result .stderr
169+ result .returncode ,
170+ args ,
171+ output = result .stdout ,
172+ stderr = result .stderr ,
162173 )
163174 return result
0 commit comments