@@ -381,9 +381,22 @@ async def _execute_pipeline(
381
381
)
382
382
383
383
try :
384
- # Execute the current command
385
- stdout , stderr = await asyncio .wait_for (
386
- process .communicate (input = prev_stdout ), timeout = timeout
384
+ # Execute the current command using ProcessManager
385
+ process = await self .process_manager .create_process (
386
+ cmd_str ,
387
+ directory ,
388
+ stdout_handle = (
389
+ asyncio .subprocess .PIPE
390
+ if i < len (parsed_commands ) - 1 or not last_stdout
391
+ else last_stdout
392
+ ),
393
+ envs = envs
394
+ )
395
+
396
+ stdout , stderr = await self .process_manager .execute_with_timeout (
397
+ process ,
398
+ stdin = prev_stdout .decode () if prev_stdout else None ,
399
+ timeout = timeout
387
400
)
388
401
389
402
# Collect stderr and check return code
@@ -412,10 +425,10 @@ async def _execute_pipeline(
412
425
processes .append (process )
413
426
414
427
except asyncio .TimeoutError :
415
- process . kill ( )
428
+ await self . process_manager . cleanup_processes ([ process ] )
416
429
raise
417
430
except Exception :
418
- process . kill ( )
431
+ await self . process_manager . cleanup_processes ([ process ] )
419
432
raise
420
433
421
434
return {
@@ -437,9 +450,6 @@ async def _execute_pipeline(
437
450
}
438
451
439
452
finally :
440
- # Ensure all processes are terminated
441
- for process in processes :
442
- if process .returncode is None :
443
- process .kill ()
444
- await process .wait ()
453
+ # Cleanup all processes using ProcessManager
454
+ await self .process_manager .cleanup_processes (processes )
445
455
await self .io_handler .cleanup_handles ({"stdout" : last_stdout })
0 commit comments