Skip to content

Commit a85a01c

Browse files
committed
another try to fix litellm termination
1 parent 1d81f3b commit a85a01c

File tree

1 file changed

+18
-8
lines changed

1 file changed

+18
-8
lines changed

automation-api/lib/pilot/batchjob/litellm.py

Lines changed: 18 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -196,16 +196,25 @@ def _process_batch_prompts(
196196
# Process prompts using multiprocessing if enabled
197197
if num_processes > 1:
198198
logger.info(f"Using multiprocessing with {num_processes} processes")
199+
pool = None # Initialize pool to None
199200
try:
200-
with mp.Pool(processes=num_processes, initializer=_init_worker) as pool:
201-
results = pool.starmap(
202-
_process_single_prompt,
203-
[(prompt, provider) for prompt in all_prompts],
204-
)
201+
pool = mp.Pool(processes=num_processes, initializer=_init_worker)
202+
results = pool.starmap(
203+
_process_single_prompt,
204+
[(prompt, provider) for prompt in all_prompts],
205+
)
206+
pool.close() # Close the pool normally
207+
pool.join() # Wait for all worker processes to finish
205208
except KeyboardInterrupt:
206209
logger.info("Keyboard interrupt received. Terminating workers...")
207-
# Let the context manager handle cleanup
208-
raise
210+
if pool:
211+
pool.terminate() # Terminate all worker processes
212+
logger.info("Waiting for workers to terminate...")
213+
pool.join() # Wait for them to exit
214+
# Check if processes are still alive after timeout (optional, for logging)
215+
# This part is tricky as direct access to process objects is not clean with Pool
216+
# For now, we assume terminate + join is the best effort.
217+
raise # Re-raise the KeyboardInterrupt to be caught by the outer handler
209218
else:
210219
logger.info("Processing prompts sequentially")
211220
results = [_process_single_prompt(prompt, provider) for prompt in all_prompts]
@@ -220,7 +229,8 @@ def _process_batch_prompts(
220229

221230
except KeyboardInterrupt:
222231
logger.info("Process was interrupted by user. Partial results may have been cached if you have redis running.")
223-
return None
232+
# Re-raise the interrupt so the caller knows execution was halted.
233+
raise
224234
except Exception as e:
225235
logger.error(f"Error processing batch prompts: {str(e)}")
226236
return None

0 commit comments

Comments
 (0)