From f1ec837d6e189dbce26df8b4841b3399320bbdee Mon Sep 17 00:00:00 2001
From: ManilShrestha
Date: Mon, 15 Jul 2024 13:38:22 -0400
Subject: [PATCH] Fix server crash on process_prompts

---
 llm_exl2_dynamic_gen.py      | 43 ++++++++++++++++-----------------------
 llm_exl2_dynamic_gen_lora.py | 47 ++++++++++++++++++-------------------------
 2 files changed, 42 insertions(+), 48 deletions(-)

diff --git a/llm_exl2_dynamic_gen.py b/llm_exl2_dynamic_gen.py
index 8f6daad..c7b6283 100644
--- a/llm_exl2_dynamic_gen.py
+++ b/llm_exl2_dynamic_gen.py
@@ -420,9 +420,8 @@ async def stream_response(prompt_id, timeout=180):
 def process_prompts():
     global partial_responses
     global prompt_ids2jobs, prompt_length, cancelled_request_ids
-    try:
-
-        while True:
+    while True:
+        try:
             while not prompts.empty() or len(prompt_length):
                 while len(prompt_length) < max_batch_size and not prompts.empty():
                     prompt_id, prompt, max_tokens, stream, temperature, outlines_dict = prompts.get()
@@ -595,29 +594,27 @@ def process_prompts():
                     # Re-add the valid items back to the queue
                     for item in temp_storage:
                         prompts.put(item)
-
-
-
             else:
                 # Sleep for a short duration when there's no work
                 time.sleep(0.1)  # Sleep for 100 milliseconds
-    except Exception as e:
-        print("Reset server due to ", e)
-        print(traceback.format_exc())
-        for prompt_id in prompt_ids2jobs:
-            job = prompt_ids2jobs[prompt_id]
-            if(job.streamer):
-                ## Generator, yield here..
-                partial_response_data = {
-                    "finish_reason": "stop"
-                }
-
-                responses[prompt_id] = partial_response_data
-            else:
-                print("Error handling for full generation current not implemented")
-            generator.cancel(job)
-        prompt_ids2jobs = {}
-        prompt_length = {}
+
+        except Exception as e:
+            print("Reset server due to ", e)
+            print(traceback.format_exc())
+            for prompt_id in prompt_ids2jobs:
+                job = prompt_ids2jobs[prompt_id]
+                if(job.streamer):
+                    ## Generator, yield here..
+                    partial_response_data = {
+                        "finish_reason": "stop"
+                    }
+
+                    responses[prompt_id] = partial_response_data
+                else:
+                    print("Error handling for full generation current not implemented")
+                generator.cancel(job)
+            prompt_ids2jobs = {}
+            prompt_length = {}
 
 # Start worker thread
 worker = Thread(target=process_prompts)
diff --git a/llm_exl2_dynamic_gen_lora.py b/llm_exl2_dynamic_gen_lora.py
index ec973eb..2c7354e 100644
--- a/llm_exl2_dynamic_gen_lora.py
+++ b/llm_exl2_dynamic_gen_lora.py
@@ -451,9 +451,8 @@ async def stream_response(prompt_id, timeout=180):
 def process_prompts():
     global partial_responses
     global prompt_ids2jobs, prompt_length, prompt_model, cancelled_request_ids
-    try:
-
-        while True:
+    while True:
+        try:
             while not prompts.empty() or len(prompt_length):
                 while len(prompt_length) < max_batch_size and not prompts.empty():
                     prompt_id, prompt, max_tokens, stream, temperature, rmodel, outlines_dict = prompts.get()
@@ -646,31 +645,29 @@ def process_prompts():
                     # Re-add the valid items back to the queue
                     for item in temp_storage:
                         prompts.put(item)
-
-
-
             else:
                 # Sleep for a short duration when there's no work
                 time.sleep(0.1)  # Sleep for 100 milliseconds
-    except Exception as e:
-        print("Reset server due to ", e)
-        print(traceback.format_exc())
-        for prompt_id in prompt_ids2jobs:
-            job = prompt_ids2jobs[prompt_id]
-            if(job.streamer):
-                ## Generator, yield here..
-                partial_response_data = {
-                    "finish_reason": "stop"
-                }
-
-                responses[prompt_id] = partial_response_data
-            else:
-                print("Error handling for full generation current not implemented")
-            generators[job.model].cancel(job)
-            #generator.cancel(job)
-        prompt_ids2jobs = {}
-        prompt_length = {}
-        prompt_model = {}
+
+        except Exception as e:
+            print("Reset server due to ", e)
+            print(traceback.format_exc())
+            for prompt_id in prompt_ids2jobs:
+                job = prompt_ids2jobs[prompt_id]
+                if(job.streamer):
+                    ## Generator, yield here..
+                    partial_response_data = {
+                        "finish_reason": "stop"
+                    }
+
+                    responses[prompt_id] = partial_response_data
+                else:
+                    print("Error handling for full generation current not implemented")
+                generators[job.model].cancel(job)
+                #generator.cancel(job)
+            prompt_ids2jobs = {}
+            prompt_length = {}
+            prompt_model = {}
 
 # Start worker thread
 worker = Thread(target=process_prompts)
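
Note for reviewers: both files get the same fix. Previously the try/except wrapped the entire `while True:` loop, so the first unhandled exception unwound past the loop, ran the cleanup once, and then the worker thread returned and died; the server had to be restarted. Moving the try/except inside the loop means each iteration catches the failure, flushes the in-flight jobs, resets the bookkeeping dicts, and keeps serving. Below is a minimal, self-contained sketch of that pattern. The queue and dict names mirror the diff, but the generation step is a placeholder (upper-casing the prompt) rather than the repository's exl2 generator, so treat it as an illustration of the control flow, not the actual server code.

    import queue
    import time
    import traceback
    from threading import Thread

    prompts = queue.Queue()    # incoming (prompt_id, prompt) work items
    responses = {}             # prompt_id -> response payload read by the server
    prompt_ids2jobs = {}       # prompt_id -> in-flight job state

    def process_prompts():
        # The try/except sits INSIDE the loop: a failure resets state for
        # this iteration only, and the worker keeps running.
        while True:
            try:
                if not prompts.empty():
                    prompt_id, prompt = prompts.get()
                    prompt_ids2jobs[prompt_id] = prompt
                    # Placeholder "generation" step; raise here to exercise
                    # the reset path.
                    responses[prompt_id] = {"text": prompt.upper()}
                    del prompt_ids2jobs[prompt_id]
                else:
                    time.sleep(0.1)  # idle briefly when there's no work
            except Exception as e:
                print("Reset server due to ", e)
                print(traceback.format_exc())
                # Unblock anyone waiting on an in-flight job, then clear state.
                for prompt_id in list(prompt_ids2jobs):
                    responses[prompt_id] = {"finish_reason": "stop"}
                prompt_ids2jobs.clear()

    worker = Thread(target=process_prompts, daemon=True)
    worker.start()

With the old layout the except clause was the end of the function, so one bad request killed the thread permanently; with this layout the same handler runs per iteration, cancelling and clearing the dynamic-generator jobs without restarting the process.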