@@ -142,18 +142,6 @@ def find_on_path(start_path: Path, file: Path) -> Optional[Path]:
142142 start_path = next_path
143143 return None
144144
145- def filter_bytes (
146- program : Path , args : list [bytes ], input : bytes
147- ) -> bytes :
148- debug (f"Running { program } { b' ' .join (args )} " )
149- res = subprocess .run (
150- [program .resolve (strict = True )] + args ,
151- capture_output = True ,
152- check = True ,
153- input = input ,
154- )
155- return res .stdout
156-
157145 # Set up macros
158146 macros : dict [bytes , Callable [..., bytes ]] = {}
159147 macros [b"path" ] = lambda _args , _external_args : bytes (base_file )
@@ -164,65 +152,117 @@ def filter_bytes(
164152 else b""
165153 )
166154
167- # Find the given file and read it, optionally filtering it
168- # through a command, and return the output, with the file
169- # name actually read, so as to exclude it from recursive
170- # expansion.
171- def read_filtered_file (
172- command_name : str ,
173- args : list [bytes ],
174- external_args : Optional [list [bytes ]],
155+ # Try to find the given file. If it is found, return its
156+ # contents, with the file name actually read, so as to
157+ # exclude it from recursive expansion; otherwise, return
158+ # `None` and an empty bytes.
159+ def read_file (
160+ basename : Path ,
175161 ) -> tuple [Optional [Path ], bytes ]:
176- debug (f"${ command_name } {{{ b',' .join (args )} }}" )
177- if len (args ) < 1 and (
178- external_args is None or len (external_args ) < 1
179- ):
162+ file = find_on_path (base_file .parent , basename )
163+ if file is None :
180164 raise ValueError (
181- f"${ command_name } expects at least one argument"
182- )
183- file = None
184- output = b""
185- if len (args ) > 0 :
186- basename = os .fsdecode (args [0 ])
187- file = find_on_path (base_file .parent , Path (basename ))
188- if file is None :
189- raise ValueError (
190- f"cannot find '{ basename } ' while expanding '{ base_file .parent } '"
191- )
192- with open (file , "rb" ) as fh :
193- output = fh .read ()
194- if external_args is not None :
195- exe_name = Path (os .fsdecode (external_args [0 ]))
196- exe_path = find_on_path (base_file .parent , exe_name )
197- if exe_path is None :
198- exe_path_str = shutil .which (exe_name )
199- if exe_path_str is None :
200- raise ValueError (f"cannot find program '{ exe_name } '" )
201- exe_path = Path (exe_path_str )
202- output = filter_bytes (
203- exe_path .resolve (strict = True ),
204- external_args [1 :],
205- output ,
165+ f"cannot find '{ basename } ' while expanding '{ base_file .parent } '"
206166 )
167+ with open (file , "rb" ) as fh :
168+ output = fh .read ()
207169 return (file , output )
208170
171+ def filter_bytes (
172+ input : bytes ,
173+ external_args : list [bytes ],
174+ ):
175+ exe_name = Path (os .fsdecode (external_args [0 ]))
176+ exe_path = find_on_path (base_file .parent , exe_name )
177+ if exe_path is None :
178+ exe_path_str = shutil .which (exe_name )
179+ if exe_path_str is None :
180+ raise ValueError (f"cannot find program '{ exe_name } '" )
181+ exe_path = Path (exe_path_str )
182+ exe_args = external_args [1 :]
183+ debug (f"Running { exe_path } { b' ' .join (exe_args )} " )
184+ res = subprocess .run (
185+ [exe_path .resolve (strict = True )] + exe_args ,
186+ capture_output = True ,
187+ check = True ,
188+ input = input ,
189+ )
190+ return res .stdout
191+
192+ def command_to_str (
193+ name : bytes ,
194+ external_args : Optional [list [bytes ]],
195+ args : Optional [list [bytes ]],
196+ ):
197+ external_args_string = (
198+ b"(" + b"," .join (external_args ) + b")"
199+ if external_args is not None
200+ else b""
201+ )
202+ args_string = (
203+ b"{" + b"," .join (args ) + b"}" if args is not None else b""
204+ )
205+ return b"$" + name + external_args_string + args_string
206+
207+ def check_file_command_args (
208+ command_name : bytes ,
209+ args : Optional [list [bytes ]],
210+ external_args : Optional [list [bytes ]],
211+ ):
212+ command_name_str = command_name .decode ('iso-8859-1' )
213+ if args is None and external_args is None :
214+ raise ValueError (
215+ f"${ command_name_str } needs arguments or external arguments"
216+ )
217+ if args is not None and len (args ) != 1 :
218+ raise ValueError (f"${ command_name_str } needs exactly one argument" )
219+ if external_args is not None and len (external_args ) < 1 :
220+ raise ValueError (
221+ f"${ command_name_str } needs at least one external argument"
222+ )
223+ debug (command_to_str (command_name , external_args , args ))
224+
225+ def maybe_file_arg (
226+ args : Optional [list [bytes ]],
227+ ) -> tuple [Optional [Path ], bytes ]:
228+ file = None
229+ contents = b""
230+ if args is not None :
231+ basename = Path (os .fsdecode (args [0 ]))
232+ file , contents = read_file (basename )
233+ return (file , contents )
234+
235+ def maybe_filter_bytes (
236+ input : bytes , external_args : Optional [list [bytes ]]
237+ ) -> bytes :
238+ return (
239+ filter_bytes (input , external_args )
240+ if external_args is not None
241+ else input
242+ )
243+
209244 def include (
210- args : list [bytes ], external_args : Optional [list [bytes ]]
245+ args : Optional [ list [bytes ] ], external_args : Optional [list [bytes ]]
211246 ) -> bytes :
212- file , contents = read_filtered_file ("include" , args , external_args )
213- return strip_final_newline (
214- inner_expand (
215- contents , expand_stack + [file ] if file is not None else []
216- )
247+ check_file_command_args (b"include" , args , external_args )
248+ file , contents = maybe_file_arg (args )
249+ expanded_contents = inner_expand (
250+ contents , expand_stack + [file ] if file is not None else []
251+ )
252+ filtered_contents = maybe_filter_bytes (
253+ expanded_contents , external_args
217254 )
255+ return strip_final_newline (filtered_contents )
218256
219257 macros [b"include" ] = include
220258
221259 def paste (
222- args : list [bytes ], external_args : Optional [list [bytes ]]
260+ args : Optional [ list [bytes ] ], external_args : Optional [list [bytes ]]
223261 ) -> bytes :
224- _file , contents = read_filtered_file ("paste" , args , external_args )
225- return strip_final_newline (contents )
262+ check_file_command_args (b"paste" , args , external_args )
263+ _file , contents = maybe_file_arg (args )
264+ filtered_contents = maybe_filter_bytes (contents , external_args )
265+ return strip_final_newline (filtered_contents )
226266
227267 macros [b"paste" ] = paste
228268
@@ -238,11 +278,11 @@ def expand_args(args: list[bytes]) -> list[bytes]:
238278
239279 def do_macro (
240280 macro : bytes ,
241- args : list [bytes ],
281+ args : Optional [ list [bytes ] ],
242282 external_args : Optional [list [bytes ]],
243283 ) -> bytes :
244- debug (f"do_macro { macro } { args } { external_args } " )
245- expanded_args = expand_args (args )
284+ debug (f"do_macro { command_to_str ( macro , external_args , args ) } " )
285+ expanded_args = expand_args (args ) if args is not None else None
246286 expanded_external_args = (
247287 expand_args (external_args )
248288 if external_args is not None
@@ -264,7 +304,7 @@ def do_macro(
264304 escaped = res [1 ]
265305 name = res [2 ]
266306 startpos = res .end ()
267- args = []
307+ args = None
268308 external_args = None
269309 # Parse external program arguments
270310 if startpos < len (expanded ) and expanded [startpos ] == ord (b"(" ):
@@ -275,18 +315,7 @@ def do_macro(
275315 args , startpos = parse_arguments (expanded , startpos , ord ("}" ))
276316 if escaped != b"" :
277317 # Just remove the leading '\'
278- external_args_string = (
279- b"(" + b"," .join (external_args ) + b")"
280- if external_args is not None
281- else b""
282- )
283- args_string = b"{" + b"," .join (args ) + b"}"
284- output = (
285- b"$"
286- + name
287- + external_args_string
288- + (args_string if len (args ) > 0 else b"" )
289- )
318+ output = command_to_str (name , external_args , args )
290319 else :
291320 output = do_macro (name , args , external_args )
292321 expanded = expanded [: res .start ()] + output + expanded [startpos :]
0 commit comments