diff --git a/docker/flexflow-environment/Dockerfile b/docker/flexflow-environment/Dockerfile index 1b49d3ab6..f82797b90 100644 --- a/docker/flexflow-environment/Dockerfile +++ b/docker/flexflow-environment/Dockerfile @@ -114,7 +114,7 @@ RUN pip3 install transformers>=4.47.1 sentencepiece einops RUN pip3 install tensorflow notebook # PEFT-related RUN pip3 install scipy bitsandbytes datasets accelerate loralib triton peft pytest -RUN pip3 install streamlit +RUN pip3 install uvicorn fastapi streamlit # flash-attn RUN if [ "$FF_GPU_BACKEND" = "cuda" ]; then \ pip3 install flash-attn; \ diff --git a/include/flexflow/flexflow_c.h b/include/flexflow/flexflow_c.h index e26279e26..1a0af058a 100644 --- a/include/flexflow/flexflow_c.h +++ b/include/flexflow/flexflow_c.h @@ -605,7 +605,8 @@ void flexflow_model_generate(flexflow_model_t handle_, int *training_steps, int **output_length_and_tokens, int *num_finetuning_losses, - float *finetuning_losses); + float *finetuning_losses, + char const **log_filepaths); void flexflow_model_set_position_offset(flexflow_model_t handle, int offset); diff --git a/include/flexflow/request_manager.h b/include/flexflow/request_manager.h index 5cfb8e485..d2e04e155 100644 --- a/include/flexflow/request_manager.h +++ b/include/flexflow/request_manager.h @@ -110,6 +110,7 @@ struct Request { // if left as -1, it will be set to the number of entries in the dataset int gradient_accumulation_steps = -1; // std::vector finetuning_tokens_per_batch; + std::string log_filepath; }; RequestType req_type = REQ_INFERENCE; RequestGuid guid = BatchConfig::INVALID_GUID; diff --git a/inference/python/demo_ff_peft.py b/inference/python/demo_ff_peft.py new file mode 100644 index 000000000..59588755f --- /dev/null +++ b/inference/python/demo_ff_peft.py @@ -0,0 +1,195 @@ +# Copyright 2023 CMU, Facebook, LANL, MIT, NVIDIA, and Stanford (alphabetical) +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import flexflow.serve as ff +import argparse, json, os +from types import SimpleNamespace + + +def get_configs(): + parser = argparse.ArgumentParser() + parser.add_argument( + "-config-file", + help="The path to a JSON file with the configs. 
If omitted, a sample model and configs will be used instead.", + type=str, + default="", + ) + args = parser.parse_args() + + # Load configs from JSON file (if specified) + if len(args.config_file) > 0: + if not os.path.isfile(args.config_file): + raise FileNotFoundError(f"Config file {args.config_file} not found.") + try: + with open(args.config_file) as f: + return json.load(f) + except json.JSONDecodeError as e: + print("JSON format error:") + print(e) + else: + # Define sample configs + ff_init_configs = { + # required parameters + "num_gpus": 4, + "memory_per_gpu": 30000, + "zero_copy_memory_per_node": 40000, + # optional parameters + "num_cpus": 4, + "legion_utility_processors": 8, + "data_parallelism_degree": 1, + "tensor_parallelism_degree": 4, + "pipeline_parallelism_degree": 1, + "offload": False, + "offload_reserve_space_size": 8 * 1024, # 8GB + "use_4bit_quantization": False, + "use_8bit_quantization": False, + "enable_peft": True, + "profiling": False, + "inference_debugging": False, + "fusion": True, + } + model_configs = { + # required parameters + # "base_model": "JackFram/llama-160m", + # "inference_peft_model_id": "goliaro/llama-160m-lora", + # "finetuning_peft_model_id": "goliaro/llama-160m-lora", + # "base_model": "meta-llama/Meta-Llama-3-8B", + "base_model": "meta-llama/Meta-Llama-3.1-8B-Instruct", + "inference_peft_model_id": "goliaro/llama-3-8b-lora", + "finetuning_peft_model_id": "goliaro/llama-3-8b-lora-dolly", + # optional parameters + "cache_path": os.environ.get("FF_CACHE_PATH", ""), + "refresh_cache": False, + "full_precision": False, + # "prompt": "hello, hellohello", + "prompt": os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "./prompt_dataset.json", + ), + "finetuning_dataset": os.path.join( + os.path.dirname(os.path.abspath(__file__)), + "../prompt/peft_dataset.json", + ), + "output_file": "", + "max_requests_per_batch": 16, + "max_seq_length": 600, + "max_tokens_per_batch": 1024, + "max_concurrent_adapters": 1, + } + # Merge dictionaries + ff_init_configs.update(model_configs) + return ff_init_configs + + +def main(): + configs_dict = get_configs() + configs = SimpleNamespace(**configs_dict) + + # Initialize the FlexFlow runtime. 
ff.init() takes a dictionary or the path to a JSON file with the configs + ff.init(configs_dict) + + # Create the FlexFlow LLM + ff_data_type = ( + ff.DataType.DT_FLOAT if configs.full_precision else ff.DataType.DT_HALF + ) + llm = ff.LLM( + configs.base_model, + data_type=ff_data_type, + cache_path=configs.cache_path, + refresh_cache=configs.refresh_cache, + output_file=configs.output_file, + ) + + # Compile the LLM for inference and load the weights into memory + generation_config = ff.GenerationConfig( + do_sample=False, temperature=0.9, topp=0.8, topk=1 + ) + enable_peft_finetuning = len(configs.finetuning_dataset) > 0 + llm.compile( + generation_config, + max_requests_per_batch=configs_dict.get("max_requests_per_batch", 1) + + enable_peft_finetuning, + max_seq_length=configs_dict.get("max_seq_length", 2048), + max_tokens_per_batch=configs_dict.get("max_tokens_per_batch", 128), + num_kv_cache_slots=configs_dict.get("num_kv_cache_slots", -1), + max_concurrent_adapters=configs_dict.get("max_concurrent_adapters", 1) + + enable_peft_finetuning, + enable_peft_finetuning=enable_peft_finetuning, + ) + + llm.start_server() + + # Add inference and/or finetuning lora + lora_inference_config = None + lora_finetuning_config = None + # if len(configs.prompt) > 0: + # lora_inference_config = ff.LoraLinearConfig( + # llm.cache_path, + # configs.inference_peft_model_id, + # base_model_name_or_path=configs.base_model, + # ) + # llm.register_peft_adapter(lora_inference_config) + + # if len(configs.finetuning_dataset) > 0: + # lora_finetuning_config = ff.LoraLinearConfig( + # llm.cache_path, + # configs.inference_peft_model_id, + # trainable=True, + # init_lora_weights=True, + # target_modules=["down_proj"], + # base_model_name_or_path=configs.base_model, + # optimizer_type=ff.OptimizerType.OPTIMIZER_TYPE_SGD, + # optimizer_kwargs={ + # "learning_rate": 0.001, + # "momentum": 0.0, + # "weight_decay": 0.0, + # "nesterov": False, + # }, + # ) + # llm.register_peft_adapter(lora_finetuning_config) + + requests = [] + # Serving + if len(configs.prompt) > 0: + prompts = [s for s in json.load(open(configs.prompt))] + inference_requests = [ + ff.Request( + ff.RequestType.REQ_INFERENCE, + prompt=prompt, + max_new_tokens=300, + # peft_model_id=llm.get_ff_peft_id(lora_inference_config), + peft_model_id=None, + ) + for prompt in prompts + ] + requests += inference_requests + # # Finetuning + # if len(configs.finetuning_dataset) > 0: + # finetuning_request = ff.Request( + # ff.RequestType.REQ_FINETUNING, + # peft_model_id=llm.get_ff_peft_id(lora_finetuning_config), + # dataset_filepath=configs.finetuning_dataset, + # max_training_epochs=2, + # ) + # requests.append(finetuning_request) + + results = llm.generate(requests) + print("Output: " + results[0].output_text.decode("utf-8")) + + llm.stop_server() + + +if __name__ == "__main__": + print("flexflow PEFT example") + main() diff --git a/inference/python/prompt_dataset.json b/inference/python/prompt_dataset.json new file mode 100644 index 000000000..011c72ccb --- /dev/null +++ b/inference/python/prompt_dataset.json @@ -0,0 +1,3 @@ +[ + "hello" +] \ No newline at end of file diff --git a/inference/python/streamlit/app.py b/inference/python/streamlit/app.py index de41683b0..92f82fa39 100644 --- a/inference/python/streamlit/app.py +++ b/inference/python/streamlit/app.py @@ -2,7 +2,10 @@ import requests import os, json from huggingface_hub import model_info - +import threading +import time +import pandas as pd +from multiprocessing import Process, Pipe # App title 
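
The imports added above (threading, time, pandas) support the flow introduced further down in app.py: the finetuning POST is sent from a background thread while the main Streamlit script polls the server's /training_progress/ endpoint and redraws a progress bar and loss chart. A minimal command-line sketch of just that polling half, assuming the endpoint returns the current_epoch / max_epochs / loss_history / status fields defined in fastapi_incr.py (everything else here is illustrative, not part of the patch):

import time

import requests

PROGRESS_URL = "http://localhost:8080/training_progress/"


def poll_training_progress(interval_s: float = 2.0) -> None:
    """Poll the progress endpoint and print epoch/loss until training reports done."""
    while True:
        try:
            progress = requests.get(PROGRESS_URL, timeout=5).json()
        except requests.RequestException:
            time.sleep(interval_s)
            continue
        epoch = progress.get("current_epoch", 0)
        max_epochs = progress.get("max_epochs", 0) or 1
        losses = progress.get("loss_history", [])
        latest = f", loss={losses[-1]:.4f}" if losses else ""
        print(f"epoch {epoch}/{max_epochs}{latest}")
        if progress.get("status") == "done":
            break
        time.sleep(interval_s)


if __name__ == "__main__":
    poll_training_progress()
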
st.set_page_config(page_title="🚀💻 FlexLLM Server", layout="wide") @@ -15,6 +18,7 @@ GET_DATASET_SPLITS_URL = "http://localhost:8080/get_dataset_splits/" GET_DATASET_COLUMNS_URL = "http://localhost:8080/get_dataset_columns/" UPLOAD_PEFT_MODEL_URL = "http://localhost:8080/upload_peft_model/" +PROGRESS_URL = "http://localhost:8080/training_progress/" # Initialize session state variables if 'added_adapters' not in st.session_state: @@ -42,6 +46,9 @@ def clear_chat_history(): def generate_llama3_response(prompt_input): system_prompt="You are a helpful, respectful and honest assistant. Always answer as helpfully as possible, while being safe. Please ensure that your responses are positive in nature." + print("---Front: here is inference request data----") + print(prompt) + # Send request to FastAPI server response = requests.post( CHAT_URL, @@ -63,6 +70,9 @@ def generate_llama3_response(prompt_input): return f"{result['detail']}" finetune_result = None +st.session_state.start_finetune = None +st.session_state.request_data = None +st.session_state.peft_model_id = None # Sidebar with st.sidebar: @@ -75,7 +85,7 @@ def generate_llama3_response(prompt_input): st.sidebar.button('Clear Chat History', on_click=clear_chat_history) st.subheader('Generation parameters') - max_length = st.sidebar.slider('Max generation length', min_value=64, max_value=2048, value=1024, step=8) + max_length = st.sidebar.slider('Max generation length', min_value=64, max_value=2048, value=256, step=8) # selected_model = st.sidebar.selectbox('Choose a Llama2 model', ['Llama2-7B', 'Llama2-13B', 'Llama2-70B'], key='selected_model') decoding_method = st.sidebar.selectbox('Decoding method', ['Greedy decoding (default)', 'Sampling'], key='decoding_method') temperature = st.sidebar.slider('temperature', min_value=0.01, max_value=5.0, value=0.1, step=0.01, disabled=decoding_method == 'Greedy decoding (default)') @@ -133,11 +143,12 @@ def generate_llama3_response(prompt_input): st.success('Proceed to finetuning your model!', icon='👉') st.session_state.hf_token = hf_token - # PEFT model name + # Upload PEFT model information peft_model_name = st.text_input( - "Enter the PEFT model name:", + "Enter the PEFT model name to upload to Hugging Face:", help="The name of the PEFT model should start with the username associated with the provided HF token, followed by '/'ß. E.g. 
'username/peft-base-uncased'" ) + private = st.checkbox("Upload as a private model") # Dataset selection dataset_option = st.radio("Choose dataset source:", ["Upload JSON", "Hugging Face Dataset"]) @@ -225,21 +236,13 @@ def generate_llama3_response(prompt_input): lora_rank = st.number_input("LoRA rank", min_value=2, max_value=64, value=16, step=2) lora_alpha = st.number_input("LoRA alpha", min_value=2, max_value=64, value=16, step=2) target_modules = st.multiselect("Target modules", ["q_proj", "k_proj", "v_proj", "o_proj", "gate_proj", "up_proj", "down_proj", "lm_head"], default=["down_proj"]) - learning_rate = st.number_input("Learning rate", min_value=1e-6, max_value=1e-3, value=1e-5, step=1e-6) + learning_rate = st.number_input("Learning rate", min_value=0.00001, max_value=1.0, value=0.001, step=0.0001, format="%.4f") optimizer_type = st.selectbox("Optimizer type", ["SGD", "Adam", "AdamW", "Adagrad", "Adadelta", "Adamax", "RMSprop"]) momentum = st.number_input("Momentum", min_value=0.0, max_value=1.0, value=0.0, step=0.01) weight_decay = st.number_input("Weight decay", min_value=0.0, max_value=1.0, value=0.0, step=0.01) nesterov = st.checkbox("Nesterov") max_training_epochs = st.number_input("Max training epochs", min_value=1, max_value=5000, value=10, step=50) - # Upload model information - st.subheader("Upload to Hugging Face") - upload_peft_model_id = st.text_input( - "Enter the HF Model ID to upload:", - help="Example: 'username/my-finetuned-model'" - ) - private = st.checkbox("Upload as a private model") - # Start finetuning button if st.button("Start Finetuning"): if not hf_token: @@ -250,7 +253,7 @@ def generate_llama3_response(prompt_input): st.error("Please enter all Hugging Face dataset information.") else: # Prepare the request data - request_data = { + st.session_state.request_data = { "token": hf_token, "peft_model_id": peft_model_name, "dataset_option": dataset_option, @@ -264,43 +267,27 @@ def generate_llama3_response(prompt_input): "nesterov": nesterov, "max_training_epochs": max_training_epochs, } - + if dataset_option == "Upload JSON": - request_data["dataset"] = dataset - else: - request_data["dataset_name"] = dataset_name - request_data["config_name"] = selected_config - request_data["selected_split"] = selected_split - request_data["selected_column"] = selected_column - - print("---Front: here is request data----") - print(request_data) - # Send finetuning request to FastAPI server - with st.spinner("Finetuning in progress..."): - finetune_response = requests.post(FINETUNE_URL, json=request_data) - - finetune_result = finetune_response.json() - if finetune_response.status_code == 200: - st.success("Finetuning completed successfully!") - - # Start uploading model to hf - upload_request_data = { - "token": hf_token, - "peft_model_id": peft_model_name, - "upload_peft_model_id": upload_peft_model_id, - "private": private - } - - with st.spinner("Uploading fine-tuned model to Hugging Face..."): - upload_response = requests.post(UPLOAD_PEFT_MODEL_URL, json=upload_request_data) - - upload_result = upload_response.json() - if upload_response.status_code == 200: - st.success(f"{upload_peft_model_id} Model uploaded successfully to Hugging Face!") - else: - st.error(f"Upload failed: {upload_result.get('detail', 'Unknown error occurred.')}") + st.session_state.request_data["dataset"] = dataset else: - st.error(f"Finetuning failed: {finetune_result.get('detail', 'Unknown error occurred.')}") + st.session_state.request_data["dataset_name"] = dataset_name + 
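
The remaining dataset fields are attached to st.session_state.request_data just below; together they form the JSON body that the /finetuning/ endpoint in fastapi_incr.py consumes. A minimal sketch of submitting the same request without the Streamlit UI, assuming the server's default localhost:8080 address (all field values here are illustrative):

import requests

FINETUNE_URL = "http://localhost:8080/finetuning/"

finetune_payload = {
    "token": "hf_xxx",                            # Hugging Face access token
    "peft_model_id": "username/my-lora-adapter",  # adapter repo name
    "dataset_option": "Hugging Face Dataset",
    "dataset_name": "username/my-dataset",
    "config_name": None,
    "selected_split": "train",
    "selected_column": "text",
    "lora_rank": 16,
    "lora_alpha": 16,
    "target_modules": ["down_proj"],
    "learning_rate": 0.001,
    "optimizer_type": "SGD",
    "momentum": 0.0,
    "weight_decay": 0.0,
    "nesterov": False,
    "max_training_epochs": 10,
}

response = requests.post(FINETUNE_URL, json=finetune_payload)
print(response.status_code, response.json())
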
st.session_state.request_data["config_name"] = selected_config + st.session_state.request_data["selected_split"] = selected_split + st.session_state.request_data["selected_column"] = selected_column + + st.session_state.upload_request_data = { + "token": hf_token, + "peft_model_id": peft_model_name, + "upload_peft_model_id": peft_model_name, + "private": private + } + st.session_state.peft_model_id = peft_model_name + + print("---Front: here is finetune request data----") + print(st.session_state.request_data) + + st.session_state.start_finetune = True if page == "Chat": # Display or clear chat messages @@ -328,9 +315,81 @@ def generate_llama3_response(prompt_input): message = {"role": "assistant", "content": full_response} st.session_state.messages.append(message) elif page == "Finetune": + # st.subheader("📈 Training Progress") + from streamlit.runtime.scriptrunner import add_script_run_ctx, get_script_run_ctx + # Send finetuning request to FastAPI server + # Send the finetuning request in a separate thread + if st.session_state.start_finetune: + st.subheader("📈 Training Progress") + def run_finetune_request(): + finetune_response = requests.post(FINETUNE_URL, json=st.session_state.request_data) + st.session_state.finetune_response = finetune_response + + t = threading.Thread(target=run_finetune_request) + add_script_run_ctx(t, get_script_run_ctx()) + t.start() + + time.sleep(3) + if "finetune_response" in st.session_state and st.session_state.finetune_response.status_code != 200: + st.error(f"{st.session_state.finetune_response.json().get('detail', 'Unknown error occurred.')}") + # 409 + + # 2. Meanwhile: poll training progress and update UI + progress_placeholder = st.empty() + chart_placeholder = st.empty() + status_placeholder = st.empty() + + while True: + try: + response = requests.get(PROGRESS_URL) + st.session_state.progress = response.json() + except: + st.warning("Failed to get training progress.") + continue + + current_epoch = st.session_state.progress["current_epoch"] + max_epochs = st.session_state.progress.get("max_epochs", 1) + loss_history = st.session_state.progress.get("loss_history", []) + + # Progress bar + progress_pct = current_epoch / max_epochs if max_epochs else 0 + progress_placeholder.progress(progress_pct, text=f"Epoch {current_epoch}/{max_epochs}") + + # Live loss chart + if loss_history: + loss_df = pd.DataFrame({"Loss": loss_history}) + loss_df.index += 1 # Epochs start from 1 + chart_placeholder.line_chart(loss_df) + status_placeholder.info(f"Latest Loss: {loss_history[-1]:.4f}") + + if st.session_state.progress["status"] == "done": + break + + time.sleep(2) # update every 2 seconds + + while not "finetune_response" in st.session_state: + time.sleep(2) + + finetune_result = st.session_state.finetune_response.json() + if st.session_state.finetune_response.status_code == 200: + st.success("Finetuning completed successfully!") + + # Start uploading model to hf + with st.spinner("Uploading fine-tuned model to Hugging Face..."): + upload_response = requests.post(UPLOAD_PEFT_MODEL_URL, json=st.session_state.upload_request_data) + + upload_result = upload_response.json() + if upload_response.status_code == 200: + st.success(f"{peft_model_id} Model uploaded successfully to Hugging Face!") + else: + st.error(f"Upload failed: {upload_result.get('detail', 'Unkifnown error occurred.')}") + elif st.session_state.finetune_response.status_code != 409: + st.error(f"Finetuning failed: {finetune_result.get('detail', 'Unknown error occurred.')}") + # Print out the number of 
entries if finetune_result and finetune_result.get("total_entries") and finetune_result.get("remaining_entries"): st.write(f"Dataset loaded: {finetune_result['total_entries']} entries found.") st.write(f"{finetune_result['remaining_entries']} entries remaining after filtering with max sequence length.") + # elif not "progress" in st.session_state: else: st.write("Use the sidebar to configure and start finetuning.") diff --git a/inference/python/streamlit/fastapi_incr.py b/inference/python/streamlit/fastapi_incr.py index 6c7c53d96..79861ac3e 100644 --- a/inference/python/streamlit/fastapi_incr.py +++ b/inference/python/streamlit/fastapi_incr.py @@ -23,7 +23,7 @@ """ -from fastapi import FastAPI, HTTPException, Query +from fastapi import FastAPI, HTTPException, Query, BackgroundTasks from pydantic import BaseModel, Field import flexflow.serve as ff from flexflow.core import * @@ -37,7 +37,10 @@ import time from huggingface_hub import hf_hub_download, HfFolder from datasets import get_dataset_config_names, get_dataset_split_names, load_dataset -import time +import re, threading, io, sys +from fastapi.responses import JSONResponse +from fastapi.concurrency import run_in_threadpool +from transformers import AutoTokenizer # Initialize FastAPI application app = FastAPI() @@ -115,15 +118,18 @@ def get_configs(): ff_init_configs = { # required parameters "num_gpus": 4, - "memory_per_gpu": 34000, + "memory_per_gpu": 36000, + # "memory_per_gpu": 30000, + # "zero_copy_memory_per_node": 180000, "zero_copy_memory_per_node": 40000, - "log_instance_creation": True, + "log_instance_creation": False, # optional parameters - "num_cpus": 4, - "legion_utility_processors": 8, + "num_cpus": 16, + "cpu_memory_per_node": 2048, + "legion_utility_processors": 16, "data_parallelism_degree": 1, - "tensor_parallelism_degree": 1, - "pipeline_parallelism_degree": 4, + "tensor_parallelism_degree": 4, + "pipeline_parallelism_degree": 1, "offload": False, "offload_reserve_space_size": 8 * 1024, # 8GB "use_4bit_quantization": False, @@ -136,7 +142,8 @@ def get_configs(): } llm_configs = { # required parameters - "llm_model": "meta-llama/Meta-Llama-3.1-8B-Instruct", + "llm_model": "deepseek-ai/DeepSeek-R1-Distill-Llama-8B", + # "llm_model": "meta-llama/Meta-Llama-3.1-8B-Instruct", # optional parameters "cache_path": os.environ.get("FF_CACHE_PATH", ""), "refresh_cache": False, @@ -144,7 +151,7 @@ def get_configs(): "prompt": "", "output_file": "", "max_requests_per_batch": 128, - "max_seq_length": 3000, + "max_seq_length": 4096, "max_tokens_per_batch": 128, "max_concurrent_adapters": 4, "num_kv_cache_slots": 100000, @@ -248,6 +255,7 @@ async def chat_completions(request: ChatCompletionRequest): raise HTTPException(status_code=503, detail="LLM model is not initialized.") print("received request:", request) + results = None # Use the PEFT adapter if specified if request.peft_model_id: @@ -261,14 +269,21 @@ async def chat_completions(request: ChatCompletionRequest): max_new_tokens=request.max_new_tokens, peft_model_id=peft_model_cffi, ) - result = llm.generate(request)[0].output_text.decode("utf-8") + results = await run_in_threadpool(llm.generate, request) + # result = llm.generate(request)[0].output_text.decode("utf-8") else: - result = llm.generate( + results = await run_in_threadpool(llm.generate, [message.dict() for message in request.messages], max_new_tokens=request.max_new_tokens, - )[0].output_text.decode("utf-8") + ) + # result = llm.generate( + # [message.dict() for message in request.messages], + # 
max_new_tokens=request.max_new_tokens, + # )[0].output_text.decode("utf-8") - print("Returning response:", result) + result = results[0].output_text.decode("utf-8") + + print("----Returning response:", result) return {"response": result, "status": "success"} except Exception as e: @@ -314,122 +329,179 @@ async def get_dataset_columns(dataset_name: str, split: str, config_name: Option except Exception as e: raise HTTPException(status_code=500, detail=f"Error fetching dataset columns: {str(e)}") +training_progress = { + "current_epoch": 0, + "max_epochs": 0, + "loss_history": [], + "status": "idle", +} +seen_lines = set() + +@app.get("/training_progress/") +async def get_training_progress(): + cache_folder = os.path.expanduser(llm.cache_path) + log_file_path = os.path.join(cache_folder, "logs", "finetune_log.log") + + pattern = r"Completed finetuning epoch (\d+)/(\d+), Loss: ([0-9.]+)" + try: + with open(log_file_path, "r") as f: + log_data = f.readlines() + except FileNotFoundError: + return JSONResponse(content={"error": "Log file not found"}, status_code=404) + + for line in log_data: + if line in seen_lines: + continue + seen_lines.add(line) + + match = re.search(pattern, line) + if match: + epoch, max_epoch, loss = match.groups() + training_progress["current_epoch"] = int(epoch) + training_progress["max_epochs"] = int(max_epoch) + training_progress["loss_history"].append(float(loss)) + + if training_progress["current_epoch"] >= training_progress["max_epochs"]: + training_progress["status"] = "done" + + return JSONResponse(content=training_progress) + + # API endpoint for finetuning request @app.post("/finetuning/") async def finetune(request: FinetuneRequest): """ Endpoint to start LoRA finetuning based on the provided parameters. """ - try: - if llm is None: - raise HTTPException(status_code=503, detail="LLM model is not initialized.") + # try: + if llm is None: + raise HTTPException(status_code=503, detail="LLM model is not initialized.") - print("received request:", request) + print("received request:", request) - llm.download_peft_adapter_if_needed(request.peft_model_id) + if training_progress["status"] == "training": + raise HTTPException( + status_code=409, + detail="A finetuning job is already running. 
Please wait for it to finish.", + ) - if request.optimizer_type not in OPTIMIZER_TYPE_MAP: - raise ValueError(f"Unsupported optimizer type: {request.optimizer_type}") + if request.optimizer_type not in OPTIMIZER_TYPE_MAP: + raise ValueError(f"Unsupported optimizer type: {request.optimizer_type}") + + optimizer_type = OPTIMIZER_TYPE_MAP[request.optimizer_type] + + # Prepare LoRA configuration for finetuning + lora_finetuning_config = ff.LoraLinearConfig( + llm.cache_path, + request.peft_model_id.lower(), + trainable=True, + init_lora_weights=True, + base_model_name_or_path=llm.model_name, + optimizer_type=optimizer_type, + target_modules=request.target_modules, + optimizer_kwargs={ + "learning_rate": request.learning_rate, + "momentum": request.momentum, + "weight_decay": request.weight_decay, + "nesterov": request.nesterov, + }, + ) - optimizer_type = OPTIMIZER_TYPE_MAP[request.optimizer_type] + llm.register_peft_adapter(lora_finetuning_config) + + cache_folder = os.path.expanduser(llm.cache_path) + # Load the dataset + file_path = None + total_entries = None + remaining_entries = None + if request.dataset_option == "Upload JSON": + dataset_dir = os.path.join(cache_folder, "datasets", "uploaded") + os.makedirs(dataset_dir, exist_ok=True) + + file_path = os.path.join(dataset_dir, "dataset.json") + with open(file_path, "w") as f: + json.dump(request.dataset, f) + elif request.dataset_option == "Hugging Face Dataset": + dataset_dir = os.path.join(cache_folder, "datasets", "huggingface") + os.makedirs(dataset_dir, exist_ok=True) + + json_subdir = os.path.join(dataset_dir, request.dataset_name) + os.makedirs(json_subdir, exist_ok=True) + + + # Load dataset from Hugging Face + dataset_info = f"{request.dataset_name}/{request.config_name}/{request.selected_split}" \ + if request.config_name else f"{request.dataset_name}/{request.selected_split}" + json_filename = f"{dataset_info.replace('/', '_')}.json" + + print(f"Loading dataset: {dataset_info}") + dataset = load_dataset(request.dataset_name, data_dir=request.config_name, split=request.selected_split) + + total_entries = len(dataset) + print(f"Found {total_entries} entries in the dataset.") + + max_length = 10000 # Change if needed + + # Load a pre-trained tokenizer. + tokenizer = AutoTokenizer.from_pretrained(llm.model_name) + + # Function to tokenize text and add a token count. + def tokenize_count(example): + # Tokenize the selected field + tokens = tokenizer.tokenize(example[request.selected_column]) + # Save the number of tokens to a new field + example["token_count"] = len(tokens) + return example + + # Apply the function to each example in the dataset. + tokenized_dataset = dataset.map(tokenize_count) + # Filter entries with token_count less than max_length. + filtered_dataset = tokenized_dataset.filter(lambda example: example["token_count"] < min(max_length, llm.max_seq_length)) + # Extract the original selected field from the filtered examples. + text_list = filtered_dataset[request.selected_column] + + remaining_entries = len(filtered_dataset) + print(f"Filtering out entries longer than {llm.max_seq_length} tokens...") + print(f"{remaining_entries} entries remaining after filtering.") + + # Save the text list to a JSON file. 
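
The filtered text list is written to a JSON file just below and handed to the finetuning request; progress for the job itself flows through the log file that the /training_progress/ endpoint above tails with the regex "Completed finetuning epoch (\d+)/(\d+), Loss: ([0-9.]+)", matching the line format the request manager writes (see the request_manager.cc hunk at the end of this diff). A standalone sketch of that parsing step, assuming the same log format (the example path is illustrative):

import re

EPOCH_LINE = re.compile(r"Completed finetuning epoch (\d+)/(\d+), Loss: ([0-9.]+)")


def parse_finetune_log(log_filepath: str):
    """Return (current_epoch, max_epochs, loss_history) parsed from a finetuning log."""
    current_epoch, max_epochs, losses = 0, 0, []
    with open(log_filepath) as f:
        for line in f:
            match = EPOCH_LINE.search(line)
            if match:
                current_epoch, max_epochs = int(match.group(1)), int(match.group(2))
                losses.append(float(match.group(3)))
    return current_epoch, max_epochs, losses


# e.g. parse_finetune_log("/path/to/ff_cache/logs/finetune_log.log")
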
+ file_path = os.path.join(json_subdir, json_filename) + with open(file_path, "w") as f: + json.dump(text_list, f, indent=2) + + # Create log file + log_filepath = os.path.join(cache_folder, "logs", "finetune_log.log") + os.makedirs(os.path.dirname(log_filepath), exist_ok=True) + # Clear the file if it already exists + with open(log_filepath, "w") as f: + f.write("") + + # Create finetuning request + finetuning_request = ff.Request( + ff.RequestType.REQ_FINETUNING, + peft_model_id=llm.get_ff_peft_id(lora_finetuning_config), + dataset_filepath=file_path, + max_training_epochs=request.max_training_epochs, + log_filepath=log_filepath, + ) - # Prepare LoRA configuration for finetuning - lora_finetuning_config = ff.LoraLinearConfig( - llm.cache_path, - request.peft_model_id.lower(), - trainable=True, - init_lora_weights=True, - base_model_name_or_path=llm.model_name, - optimizer_type=optimizer_type, - target_modules=request.target_modules, - optimizer_kwargs={ - "learning_rate": request.learning_rate, - "momentum": request.momentum, - "weight_decay": request.weight_decay, - "nesterov": request.nesterov, - }, - ) + # Set training status + training_progress.update({ + "current_epoch": 0, + "max_epochs": request.max_training_epochs, + "loss_history": [], + "status": "training", + }) - llm.register_peft_adapter(lora_finetuning_config) - - cache_folder = os.path.expanduser(llm.cache_path) - # Load the dataset - file_path = None - total_entries = None - remaining_entries = None - if request.dataset_option == "Upload JSON": - dataset_dir = os.path.join(cache_folder, "datasets", "uploaded") - os.makedirs(dataset_dir, exist_ok=True) - - file_path = os.path.join(dataset_dir, "dataset.json") - with open(file_path, "w") as f: - json.dump(request.dataset, f) - elif request.dataset_option == "Hugging Face Dataset": - dataset_dir = os.path.join(cache_folder, "datasets", "huggingface") - os.makedirs(dataset_dir, exist_ok=True) - - json_subdir = os.path.join(dataset_dir, request.dataset_name) - os.makedirs(json_subdir, exist_ok=True) - - from transformers import AutoTokenizer - # Load dataset from Hugging Face - dataset_info = f"{request.dataset_name}/{request.config_name}/{request.selected_split}" \ - if request.config_name else f"{request.dataset_name}/{request.selected_split}" - json_filename = f"{dataset_info.replace('/', '_')}.json" - - print(f"Loading dataset: {dataset_info}") - dataset = load_dataset(request.dataset_name, data_dir=request.config_name, split=request.selected_split) - - total_entries = len(dataset) - print(f"Found {total_entries} entries in the dataset.") - - max_length = 10000 # Change if needed - - # Load a pre-trained tokenizer. - tokenizer = AutoTokenizer.from_pretrained(request.peft_model_id) - - # Function to tokenize text and add a token count. - def tokenize_count(example): - # Tokenize the selected field - tokens = tokenizer.tokenize(example[request.selected_column]) - # Save the number of tokens to a new field - example["token_count"] = len(tokens) - return example - - # Apply the function to each example in the dataset. - tokenized_dataset = dataset.map(tokenize_count) - # Filter entries with token_count less than max_length. - filtered_dataset = tokenized_dataset.filter(lambda example: example["token_count"] < min(max_length, llm.max_seq_length)) - # Extract the original selected field from the filtered examples. 
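
One other change worth calling out in this endpoint: both the chat handler above and the end of this finetuning handler below now wrap the blocking llm.generate call in FastAPI's run_in_threadpool, so the asyncio event loop stays free to answer /training_progress/ polls while generation or training is running. A self-contained sketch of that pattern, with slow_task standing in for llm.generate and illustrative route names:

import time

from fastapi import FastAPI
from fastapi.concurrency import run_in_threadpool

app = FastAPI()


def slow_task(seconds: float) -> str:
    # Stand-in for a blocking call such as llm.generate(...)
    time.sleep(seconds)
    return f"finished after {seconds}s"


@app.get("/status/")
async def status():
    # Remains responsive even while /run/ is busy in a worker thread
    return {"status": "ok"}


@app.post("/run/")
async def run():
    result = await run_in_threadpool(slow_task, 5.0)
    return {"result": result}

# Run with, for example: uvicorn sketch:app --port 8080
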
- text_list = filtered_dataset[request.selected_column] - - remaining_entries = len(filtered_dataset) - print(f"Filtering out entries longer than {llm.max_seq_length} tokens...") - print(f"{remaining_entries} entries remaining after filtering.") - - # Save the text list to a JSON file. - file_path = os.path.join(json_subdir, json_filename) - with open(file_path, "w") as f: - json.dump(text_list, f, indent=2) - - print(f"Dataset saved to {file_path}") - - # Create finetuning request - finetuning_request = ff.Request( - ff.RequestType.REQ_FINETUNING, - peft_model_id=llm.get_ff_peft_id(lora_finetuning_config), - dataset_filepath=file_path, - max_training_epochs=request.max_training_epochs, - ) + results = await run_in_threadpool(llm.generate, finetuning_request) + # results = llm.generate(finetuning_request) + print(f"Finish fine-tuning") - results = llm.generate(finetuning_request) - print(f"Finish fine-tuning") + return {"results": results, "status": "success", "total_entries": total_entries, "remaining_entries": remaining_entries} - return {"results": results, "status": "success", "total_entries": total_entries, "remaining_entries": remaining_entries} - - except Exception as e: - error_message = f"Error during finetuning: {str(e)}" - raise HTTPException(status_code=500, detail=error_message) + # except Exception as e: + # error_message = f"Error during finetuning: {str(e)}" + # raise HTTPException(status_code=500, detail=error_message) # API endpoint for uploading model request @@ -438,122 +510,122 @@ async def upload_peft_model(request: UploadModelRequest): """ Endpoint to upload the fine-tuned PEFT model to Hugging Face Hub. """ - try: - if llm is None: - raise HTTPException(status_code=503, detail="LLM model is not initialized.") + # try: + # if llm is None: + # raise HTTPException(status_code=503, detail="LLM model is not initialized.") + + from transformers import AutoModelForCausalLM + from peft import get_peft_model + import torch + import numpy as np + print("Upload model request:", request) + + cache_folder = os.path.expanduser(llm.cache_path) + lora_config_filepath = os.path.join( + cache_folder, + "finetuned_models", + request.peft_model_id.lower(), + "config", + "ff_config.json" + ) + + TIMEOUT_SECONDS = 30 + start_time = time.time() + while not os.path.exists(lora_config_filepath): + if time.time() - start_time > TIMEOUT_SECONDS: + raise TimeoutError(f"Timeout: {lora_config_filepath} not found after {TIMEOUT_SECONDS} seconds.") + time.sleep(0.5) # Check every 0.5 seconds + + peft_config = ff.LoraLinearConfig.from_jsonfile(lora_config_filepath) + hf_peft_config = peft_config.to_hf_config() + + # Load model + model = AutoModelForCausalLM.from_pretrained( + peft_config.base_model_name_or_path, + torch_dtype=torch.float32 if peft_config.precision == "fp32" else torch.float16, + device_map=None # Prevent meta tensor issues + ) + model = get_peft_model(model, hf_peft_config, autocast_adapter_dtype=False) + + in_dim = model.config.intermediate_size + out_dim = model.config.hidden_size - from transformers import AutoModelForCausalLM - from peft import get_peft_model - import torch - import numpy as np - - cache_folder = os.path.expanduser(llm.cache_path) - lora_config_filepath = os.path.join( - cache_folder, - "finetuned_models", - request.peft_model_id.lower(), - "config", - "ff_config.json" - ) - - TIMEOUT_SECONDS = 30 - start_time = time.time() - while not os.path.exists(lora_config_filepath): - if time.time() - start_time > TIMEOUT_SECONDS: - raise TimeoutError(f"Timeout: 
{lora_config_filepath} not found after {TIMEOUT_SECONDS} seconds.") - time.sleep(0.5) # Check every 0.5 seconds - - peft_config = ff.LoraLinearConfig.from_jsonfile(lora_config_filepath) - hf_peft_config = peft_config.to_hf_config() - - # Load model - model = AutoModelForCausalLM.from_pretrained( - peft_config.base_model_name_or_path, - torch_dtype=torch.float32 if peft_config.precision == "fp32" else torch.float16, - device_map=None # Prevent meta tensor issues + weight_folder = os.path.join( + cache_folder, "finetuned_models", request.peft_model_id.lower(), "weights", "shard_0" + ) + num_shards = 1 + while os.path.exists(weight_folder.replace("shard_0", f"shard_{num_shards}")): + num_shards += 1 + if not in_dim % num_shards == 0: + raise ValueError( + f"Number of shards ({num_shards}) must divide the input dimension ({in_dim})" ) - model = get_peft_model(model, hf_peft_config, autocast_adapter_dtype=False) - - in_dim = model.config.intermediate_size - out_dim = model.config.hidden_size - weight_folder = os.path.join( - cache_folder, "finetuned_models", request.peft_model_id.lower(), "weights", "shard_0" - ) - num_shards = 1 - while os.path.exists(weight_folder.replace("shard_0", f"shard_{num_shards}")): - num_shards += 1 - if not in_dim % num_shards == 0: + lora_weight_files = os.listdir(weight_folder) + for lora_file in sorted(lora_weight_files): + lora_filename = ".weight".join(lora_file.split(".weight")[:-1]) + hf_parameter_name = f"base_model.model.model.{lora_filename}.default.weight" + if hf_parameter_name not in model.state_dict().keys(): + raise KeyError(f"Parameter {lora_file} not found in HF model.") + + ff_dtype = np.float32 if peft_config.precision == "fp32" else np.float16 + weight_path = os.path.join(weight_folder, lora_file) + # LoRA_A: [in_dim, rank] + # LoRA_B: [rank, out_dim] + if "lora_A" in lora_file: + weight_data = [] + for shard_id in range(num_shards): + weight_path_shard = weight_path.replace("shard_0", f"shard_{shard_id}") + weight_data_shard = np.fromfile(weight_path_shard, dtype=ff_dtype) + weight_data_shard = weight_data_shard.reshape( + (in_dim // num_shards, peft_config.rank), order="F" + ) + weight_data.append(weight_data_shard) + weight_data = np.concatenate(weight_data, axis=0).T + elif "lora_B" in lora_file: + weight_data = np.fromfile(weight_path, dtype=ff_dtype) + weight_data = weight_data.reshape((peft_config.rank, out_dim), order="F").T + weight_tensor = torch.from_numpy(weight_data) + + param = model.state_dict()[hf_parameter_name] + + actual_numel = weight_tensor.numel() + expected_numel = param.numel() + if actual_numel != expected_numel: raise ValueError( - f"Number of shards ({num_shards}) must divide the input dimension ({in_dim})" + f"Parameter {lora_file} has unexpected parameter count: {actual_numel} (actual) != {expected_numel} (expected)" ) - lora_weight_files = os.listdir(weight_folder) - for lora_file in sorted(lora_weight_files): - lora_filename = ".weight".join(lora_file.split(".weight")[:-1]) - hf_parameter_name = f"base_model.model.model.{lora_filename}.default.weight" - if hf_parameter_name not in model.state_dict().keys(): - raise KeyError(f"Parameter {lora_file} not found in HF model.") - - ff_dtype = np.float32 if peft_config.precision == "fp32" else np.float16 - weight_path = os.path.join(weight_folder, lora_file) - # LoRA_A: [in_dim, rank] - # LoRA_B: [rank, out_dim] - if "lora_A" in lora_file: - weight_data = [] - for shard_id in range(num_shards): - weight_path_shard = weight_path.replace("shard_0", f"shard_{shard_id}") - 
weight_data_shard = np.fromfile(weight_path_shard, dtype=ff_dtype) - weight_data_shard = weight_data_shard.reshape( - (in_dim // num_shards, peft_config.rank), order="F" - ) - weight_data.append(weight_data_shard) - weight_data = np.concatenate(weight_data, axis=0).T - elif "lora_B" in lora_file: - weight_data = np.fromfile(weight_path, dtype=ff_dtype) - weight_data = weight_data.reshape((peft_config.rank, out_dim), order="F").T - weight_tensor = torch.from_numpy(weight_data) - - param = model.state_dict()[hf_parameter_name] - - actual_numel = weight_tensor.numel() - expected_numel = param.numel() - if actual_numel != expected_numel: - raise ValueError( - f"Parameter {lora_file} has unexpected parameter count: {actual_numel} (actual) != {expected_numel} (expected)" - ) - - if weight_tensor.shape != param.shape: - raise ValueError( - f"Parameter {lora_file} has unexpected shape: {weight_tensor.shape} (actual) != {param.shape} (expected)" - ) - - if weight_tensor.dtype != param.dtype: - raise ValueError( - f"Parameter {lora_file} has unexpected dtype: {weight_tensor.dtype} (actual) != {param.dtype} (expected)" - ) + if weight_tensor.shape != param.shape: + raise ValueError( + f"Parameter {lora_file} has unexpected shape: {weight_tensor.shape} (actual) != {param.shape} (expected)" + ) - with torch.no_grad(): - param.copy_(weight_tensor) + if weight_tensor.dtype != param.dtype: + raise ValueError( + f"Parameter {lora_file} has unexpected dtype: {weight_tensor.dtype} (actual) != {param.dtype} (expected)" + ) - # Ensure all parameters are properly initialized - for name, param in model.named_parameters(): - if param.device.type == "meta": - print(f"Parameter {name} is still on 'meta' device. Moving to CPU.") - param.data = torch.zeros_like(param, device="cpu") # Allocate real memory + with torch.no_grad(): + param.copy_(weight_tensor) - model = model.to("cpu") + # Ensure all parameters are properly initialized + for name, param in model.named_parameters(): + if param.device.type == "meta": + print(f"Parameter {name} is still on 'meta' device. 
Moving to CPU.") + param.data = torch.zeros_like(param, device="cpu") # Allocate real memory - # Upload model to Hugging Face Hub - model.push_to_hub(request.upload_peft_model_id, token=request.token, private=request.private) - print(f"Upload process for {request.upload_peft_model_id} completed.") - - return {"status": "success"} + model = model.to("cpu") - except Exception as e: - error_message = f"Error during model upload: {str(e)}" - raise HTTPException(status_code=500, detail=error_message) + # Upload model to Hugging Face Hub + model.push_to_hub(request.upload_peft_model_id, token=request.token, private=request.private) + print(f"Upload process for {request.upload_peft_model_id} completed.") + + return {"status": "success"} + # except Exception as e: + # error_message = f"Error during model upload: {str(e)}" + # raise HTTPException(status_code=500, detail=error_message) # Shutdown event to stop the model server @app.on_event("shutdown") diff --git a/python/flexflow/core/flexflow_cffi.py b/python/flexflow/core/flexflow_cffi.py index 443cb9bfe..98ef8efa3 100644 --- a/python/flexflow/core/flexflow_cffi.py +++ b/python/flexflow/core/flexflow_cffi.py @@ -1848,6 +1848,7 @@ def __init__( target_modules: List[str] = [], optimizer_type: OptimizerType = OptimizerType.OPTIMIZER_TYPE_NONE, optimizer_kwargs: dict = {}, + optimizer_config: dict = {}, ): if trainable: if ( @@ -2143,6 +2144,7 @@ class Request: peft_model_id: Optional[PEFTModelID] = None dataset_filepath: Optional[str] = None max_training_epochs: int = 1 + log_filepath: str = None # ----------------------------------------------------------------------- @@ -4503,6 +4505,9 @@ def generate(self, requests_list: List[Request]): # c_finetuning_losses = ffi.new("float**") # TODO: set this value automatically c_finetuning_losses = ffi.new("float[]", 10000) + log_filepaths = [ + get_c_name(request.log_filepath) for request in requests_list + ] ffc().flexflow_model_generate( self.handle, @@ -4519,6 +4524,7 @@ def generate(self, requests_list: List[Request]): c_output_length_and_tokens, num_finetuning_losses, c_finetuning_losses, + log_filepaths, ) finetuning_losses = [] if num_finetuning_losses[0] > 0: diff --git a/python/flexflow/serve/models/llama.py b/python/flexflow/serve/models/llama.py index 85df0deec..f4357f4a5 100644 --- a/python/flexflow/serve/models/llama.py +++ b/python/flexflow/serve/models/llama.py @@ -271,7 +271,8 @@ def build_model(self): if self.ffconfig.enable_peft: # TODO: add attention projections - self.ffmodel.add_lora_layers(["gate_proj", "up_proj", "down_proj", "o_proj", "qkv_proj"]) + # self.ffmodel.add_lora_layers(["gate_proj", "up_proj", "down_proj", "o_proj", "qkv_proj"]) + self.ffmodel.add_lora_layers(["down_proj"]) def convert_hf_weight_name(name): diff --git a/python/flexflow/serve/serve.py b/python/flexflow/serve/serve.py index ca467a8f6..b07372027 100644 --- a/python/flexflow/serve/serve.py +++ b/python/flexflow/serve/serve.py @@ -677,7 +677,8 @@ def __output2chat_response( ) -> List[GenerationResult]: for i in range(len(outputs)): - outputs[i].output_text = outputs[i].output_text[len(requests[i].prompt) + padding:] + # outputs[i].output_text = outputs[i].output_text[len(requests[i].prompt) + padding:] + outputs[i].output_text = outputs[i].output_text return outputs def generate( diff --git a/src/c/flexflow_c.cc b/src/c/flexflow_c.cc index 23439b1fe..94328e1a7 100644 --- a/src/c/flexflow_c.cc +++ b/src/c/flexflow_c.cc @@ -1568,12 +1568,17 @@ void flexflow_model_generate(flexflow_model_t handle_, int 
*training_steps, int **output_length_and_tokens, int *num_finetuning_losses, - float *finetuning_losses) { + float *finetuning_losses, + char const **log_filepaths) { FFModel *handle = FFCObjectWrapper::unwrap(handle_); std::vector requests; + // RequestManager *rm = RequestManager::get_request_manager(); + // int max_sequence_length = rm->get_max_sequence_length();; + for (int i = 0; i < num_requests; i++) { if (request_types[i] == RequestType::REQ_INFERENCE) { + // max_lengths[i] = max_sequence_length; std::string const text_str(input_texts[i]); Request inference_req; inference_req.prompt = text_str; @@ -1598,6 +1603,7 @@ void flexflow_model_generate(flexflow_model_t handle_, fine_tuning_req.max_length = max_lengths[i]; fine_tuning_req.max_new_tokens = max_new_tokens_[i]; fine_tuning_req.add_special_tokens = add_special_tokens_[i]; + fine_tuning_req.peft_finetuning_info.log_filepath = log_filepaths[i]; PEFTModelID *peft_model_id = FFCObjectWrapper::unwrap(peft_model_ids[i]); if (peft_model_id != nullptr) { fine_tuning_req.peft_model_id = *peft_model_id; diff --git a/src/ops/kernels/linear_kernels.cu b/src/ops/kernels/linear_kernels.cu index f40c31433..1663a3c6d 100644 --- a/src/ops/kernels/linear_kernels.cu +++ b/src/ops/kernels/linear_kernels.cu @@ -35,7 +35,24 @@ LinearMeta::LinearMeta(FFHandler handler, trainable_inputs[0] = li->trainable_inputs[0]; DataType data_type = li->data_type; +<<<<<<< HEAD + this->activation = li->activation; + this->kernel_reg_type = li->kernel_reg_type; + this->kernel_reg_lambda = li->kernel_reg_lambda; + this->use_bias = li->use_bias; + this->add_bias_only_once = li->add_bias_only_once; + this->profiling = li->profiling; + this->inference_debugging = li->inference_debugging; + this->enable_peft_finetuning = li->enable_peft_finetuning; + this->trainable_inputs[0] = li->trainable_inputs[0]; + this->weight_ptr_type = this->input_type[0]; + this->quantization_type = li->quantization_type; + this->offload = li->offload; + std::strcpy(this->op_name, li->name); + this->layer_guid = li->layer_guid; +======= size_t data_size = data_type_size(data_type); +>>>>>>> streamlit // allocate weight and bias in the reserve space for cpu offloading if (li->offload) { weight_ptr = gpu_mem_allocator.allocate_reserved_untyped( diff --git a/src/ops/linear.cc b/src/ops/linear.cc index 7d596ac55..68b9db361 100644 --- a/src/ops/linear.cc +++ b/src/ops/linear.cc @@ -491,7 +491,6 @@ OpMeta *Linear::init_task_with_dim(Task const *task, LinearMeta *m = new LinearMeta( handle, batch_size, linear, gpu_mem_allocator, in_dim * out_dim); - init_kernel(m, batch_size, out_dim); return m; diff --git a/src/runtime/request_manager.cc b/src/runtime/request_manager.cc index e639653bb..c40c216d2 100644 --- a/src/runtime/request_manager.cc +++ b/src/runtime/request_manager.cc @@ -375,13 +375,13 @@ void RequestManager::set_enable_peft_finetuning(bool enable_peft_finetuning_) { void RequestManager::set_inference_finished(bool finished) { inference_finished = finished; - if (finished == false && pending_peft_request_queue.size() > 0) { - std::cout - << "Error: Inference finished but there are pending PEFT requests in " - "the queue. Marking these requests as completed now." - << std::endl; - assert(false); - } + // if (finished == false && pending_peft_request_queue.size() > 0) { + // std::cout + // << "Error: Inference finished but there are pending PEFT requests in " + // "the queue. Marking these requests as completed now." 
+  //       << std::endl;
+  //   assert(false);
+  // }
 }
 
 void RequestManager::register_tokenizer(ModelType type,
@@ -1595,10 +1595,22 @@ void RequestManager::process_finetuning_req_bwd_progress(
   if (request.peft_finetuning_info.completed_training_steps %
           ((int)request.dataset.size()) ==
       0) {
-    log_req_mgr.print("Completed finetuning epoch %i/%i",
-                      request.peft_finetuning_info.completed_training_steps /
-                          ((int)request.dataset.size()),
-                      request.peft_finetuning_info.max_training_epochs);
+    int curr_epoch = request.peft_finetuning_info.completed_training_steps / ((int)request.dataset.size());
+    int total_epochs = request.peft_finetuning_info.max_training_epochs;
+    float curr_loss = request.peft_finetuning_info.finetuning_losses.back();
+    // Format the log line
+    std::ostringstream log_stream;
+    log_stream << "Completed finetuning epoch " << curr_epoch
+               << "/" << total_epochs
+               << ", Loss: " << std::fixed << std::setprecision(4) << curr_loss;
+    std::string log_line = log_stream.str();
+    log_req_mgr.print("%s", log_line.c_str());
+    // Append to file
+    std::ofstream log_file(request.peft_finetuning_info.log_filepath, std::ios::app);
+    if (log_file.is_open()) {
+      log_file << log_line << "\n";
+      log_file.close();
+    }
   }
   if (request.peft_finetuning_info.completed_training_steps == tot_steps ||
       inference_finished) {
@@ -3719,9 +3731,9 @@ std::vector
   for (int i = 0; i < inf_guids.size(); i++) {
     results.push_back(rm->get_generation_result(inf_guids[i]));
   }
-  if (inf_guids.size() > 0) {
-    rm->set_inference_finished();
-  }
+  // if (inf_guids.size() > 0) {
+  //   rm->set_inference_finished();
+  // }
   for (int i = 0; i < peft_guids.size(); i++) {
     results.push_back(rm->get_generation_result(peft_guids[i]));
   }
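
Taken together, the pieces of this patch form one pipeline: the FastAPI server creates and clears a per-job log file and passes its path on the finetuning Request; flexflow_model_generate forwards it through the new log_filepaths argument into the request's peft_finetuning_info.log_filepath; the request manager appends one "Completed finetuning epoch E/N, Loss: L" line per epoch (hunk above); and /training_progress/ parses those lines for the Streamlit UI. A condensed sketch of the Python side of that flow, assuming an already compiled and started ff.LLM named llm and a registered LoRA adapter config lora_finetuning_config as in fastapi_incr.py (the dataset path is illustrative):

import os

import flexflow.serve as ff

# Per-job log file that the runtime appends epoch/loss lines to
cache_folder = os.path.expanduser(llm.cache_path)
log_filepath = os.path.join(cache_folder, "logs", "finetune_log.log")
os.makedirs(os.path.dirname(log_filepath), exist_ok=True)
open(log_filepath, "w").close()  # start from an empty log

finetuning_request = ff.Request(
    ff.RequestType.REQ_FINETUNING,
    peft_model_id=llm.get_ff_peft_id(lora_finetuning_config),
    dataset_filepath="/path/to/dataset.json",
    max_training_epochs=10,
    log_filepath=log_filepath,  # new field added by this patch
)

results = llm.generate(finetuning_request)
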