1
1
import asyncio
2
+ import logging
2
3
import os
3
4
import pwd
4
5
import shlex
@@ -284,8 +285,27 @@ async def execute(
284
285
}
285
286
286
287
# Single command execution
287
- cmd , redirects = self ._parse_command (cleaned_command )
288
- self .validator .validate_command (cmd )
288
+ try :
289
+ cmd , redirects = self ._parse_command (cleaned_command )
290
+ except ValueError as e :
291
+ return {
292
+ "error" : str (e ),
293
+ "status" : 1 ,
294
+ "stdout" : "" ,
295
+ "stderr" : str (e ),
296
+ "execution_time" : time .time () - start_time ,
297
+ }
298
+
299
+ try :
300
+ self .validator .validate_command (cmd )
301
+ except ValueError as e :
302
+ return {
303
+ "error" : str (e ),
304
+ "status" : 1 ,
305
+ "stdout" : "" ,
306
+ "stderr" : str (e ),
307
+ "execution_time" : time .time () - start_time ,
308
+ }
289
309
290
310
# Directory validation
291
311
if directory :
@@ -305,19 +325,25 @@ async def execute(
305
325
"stderr" : f"Not a directory: { directory } " ,
306
326
"execution_time" : time .time () - start_time ,
307
327
}
308
-
309
- # Clean and validate command
310
- cleaned_command = self ._clean_command (command )
311
328
if not cleaned_command :
312
329
raise ValueError ("Empty command" )
313
330
314
- # Process redirections
315
- cmd , redirects = self .io_handler .process_redirections (cleaned_command )
331
+ try :
332
+ # Process redirections
333
+ cmd , redirects = self .io_handler .process_redirections (cleaned_command )
316
334
317
- # Setup handles for redirection
318
- handles = await self .io_handler .setup_redirects (redirects , directory )
335
+ # Setup handles for redirection
336
+ handles = await self .io_handler .setup_redirects (redirects , directory )
337
+ except ValueError as e :
338
+ return {
339
+ "error" : str (e ),
340
+ "status" : 1 ,
341
+ "stdout" : "" ,
342
+ "stderr" : str (e ),
343
+ "execution_time" : time .time () - start_time ,
344
+ }
319
345
320
- # Get stdin from handles if present
346
+ # Get stdin from handles if present
321
347
stdin = handles .get ("stdin_data" , stdin )
322
348
stdout_handle = handles .get ("stdout" , asyncio .subprocess .PIPE )
323
349
@@ -328,6 +354,7 @@ async def execute(
328
354
329
355
process = await asyncio .create_subprocess_shell (
330
356
shell_cmd ,
357
+ stdin = asyncio .subprocess .PIPE ,
331
358
stdout = stdout_handle ,
332
359
stderr = asyncio .subprocess .PIPE ,
333
360
env = {** os .environ , ** (envs or {})}, # Merge environment variables
@@ -357,7 +384,10 @@ async def communicate_with_timeout():
357
384
358
385
# Close file handle if using file redirection
359
386
if isinstance (stdout_handle , IO ):
360
- stdout_handle .close ()
387
+ try :
388
+ stdout_handle .close ()
389
+ except (IOError , OSError ) as e :
390
+ logging .warning (f"Error closing stdout: { e } " )
361
391
362
392
return {
363
393
"error" : None ,
@@ -413,78 +443,109 @@ async def _execute_pipeline(
413
443
) -> Dict [str , Any ]:
414
444
start_time = time .time ()
415
445
processes : List [asyncio .subprocess .Process ] = []
416
-
417
446
try :
447
+ # Validate all commands before execution
448
+ for cmd in commands :
449
+ # Make sure each command is allowed
450
+ self .validator .validate_command (cmd )
451
+
452
+ # Initialize IO variables
418
453
parsed_commands = []
419
454
first_stdin : Optional [bytes ] = None
420
455
last_stdout : Optional [IO [Any ]] = None
456
+ first_redirects = None
457
+ last_redirects = None
458
+
459
+ # Process redirections for all commands
460
+ for i , command in enumerate (commands ):
461
+ cmd , redirects = self .io_handler .process_redirections (command )
462
+ parsed_commands .append (cmd )
463
+
464
+ if i == 0 : # First command
465
+ first_redirects = redirects
466
+ elif i == len (commands ) - 1 : # Last command
467
+ last_redirects = redirects
468
+
469
+ # Setup first and last command redirections
470
+ if first_redirects :
471
+ handles = await self .io_handler .setup_redirects (
472
+ first_redirects , directory
473
+ )
474
+ if handles .get ("stdin_data" ):
475
+ first_stdin = handles ["stdin_data" ].encode ()
421
476
422
- for cmd in commands :
423
- parsed_cmd , redirects = self .io_handler .process_redirections (cmd )
424
- parsed_commands .append (parsed_cmd )
425
-
426
- if commands .index (cmd ) == 0 : # First command
427
- handles = await self .io_handler .setup_redirects (
428
- redirects , directory
429
- )
430
- if handles .get ("stdin_data" ):
431
- first_stdin = handles ["stdin_data" ].encode ()
432
-
433
- if commands .index (cmd ) == len (commands ) - 1 : # Last command
434
- handles = await self .io_handler .setup_redirects (
435
- redirects , directory
436
- )
437
- last_stdout = handles .get ("stdout" )
477
+ if last_redirects :
478
+ handles = await self .io_handler .setup_redirects (
479
+ last_redirects , directory
480
+ )
481
+ last_stdout = handles .get ("stdout" )
438
482
439
483
# Execute pipeline
440
484
prev_stdout : Optional [bytes ] = first_stdin
441
485
final_stderr : bytes = b""
442
486
final_stdout : bytes = b""
443
487
488
+ # Execute each command in the pipeline
444
489
for i , cmd in enumerate (parsed_commands ):
490
+ # Create the shell command
445
491
shell_cmd = self ._create_shell_command (cmd )
446
492
447
- # Get default shell for the first command and set interactive mode
493
+ # Apply interactive mode to first command only
448
494
if i == 0 :
449
495
shell = self ._get_default_shell ()
450
496
shell_cmd = f"{ shell } -i -c { shlex .quote (shell_cmd )} "
451
497
498
+ # Create subprocess with proper IO configuration
452
499
process = await asyncio .create_subprocess_shell (
453
500
shell_cmd ,
454
- stdin = asyncio .subprocess .PIPE if prev_stdout is not None else None ,
455
- stdout = asyncio .subprocess .PIPE ,
501
+ stdin = asyncio .subprocess .PIPE ,
502
+ stdout = (
503
+ asyncio .subprocess .PIPE
504
+ if i < len (parsed_commands ) - 1 or not last_stdout
505
+ else last_stdout
506
+ ),
456
507
stderr = asyncio .subprocess .PIPE ,
457
- env = {** os .environ , ** (envs or {})}, # Merge environment variables
508
+ env = {** os .environ , ** (envs or {})},
458
509
cwd = directory ,
459
510
)
460
511
461
512
try :
513
+ # Execute the current command
462
514
stdout , stderr = await asyncio .wait_for (
463
515
process .communicate (input = prev_stdout ), timeout = timeout
464
516
)
465
517
466
- prev_stdout = stdout if stdout else b""
467
-
518
+ # Collect stderr and check return code
519
+ final_stderr += stderr if stderr else b""
520
+ if process .returncode != 0 :
521
+ error_msg = stderr .decode ("utf-8" , errors = "replace" ).strip ()
522
+ if not error_msg :
523
+ error_msg = (
524
+ f"Command failed with exit code { process .returncode } "
525
+ )
526
+ raise ValueError (error_msg )
527
+
528
+ # Pass output to next command or store it
468
529
if i == len (parsed_commands ) - 1 :
469
- final_stdout = stdout if stdout else b""
530
+ if last_stdout and isinstance (last_stdout , IO ):
531
+ last_stdout .write (stdout .decode ("utf-8" , errors = "replace" ))
532
+ final_output = ""
533
+ else :
534
+ final_stdout = stdout if stdout else b""
535
+ final_output = final_stdout .decode (
536
+ "utf-8" , errors = "replace"
537
+ )
538
+ else :
539
+ prev_stdout = stdout if stdout else b""
470
540
471
- final_stderr += stderr if stderr else b""
472
541
processes .append (process )
473
542
474
- if process .returncode != 0 :
475
- raise ValueError (
476
- f"Command failed with exit code { process .returncode } "
477
- )
478
-
479
543
except asyncio .TimeoutError :
480
544
process .kill ()
481
545
raise
482
-
483
- if last_stdout :
484
- last_stdout .write (final_stdout .decode ("utf-8" , errors = "replace" ))
485
- final_output = ""
486
- else :
487
- final_output = final_stdout .decode ("utf-8" , errors = "replace" )
546
+ except Exception :
547
+ process .kill ()
548
+ raise
488
549
489
550
return {
490
551
"error" : None ,
0 commit comments