diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index 92d7ac0072..0a5741ea27 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -96,19 +96,19 @@ def create_zmq_client(self, model, mode): self.zmq_client = ZmqClient(model, mode) self.zmq_client.connect() - def format_and_add_data(self, prompts: dict): + def format_and_add_data(self, req_dict: dict): """ Format the request data and send the request to the server. """ - if "request_id" not in prompts: + if "request_id" not in req_dict: request_id = str(uuid.uuid4()) - prompts["request_id"] = request_id + req_dict["request_id"] = request_id - if "max_tokens" not in prompts: - prompts["max_tokens"] = self.max_model_len - 1 + if "max_tokens" not in req_dict: + req_dict["max_tokens"] = self.max_model_len - 1 - self.add_requests(prompts) - return prompts["prompt_token_ids"] + self.add_requests(req_dict) + return req_dict["prompt_token_ids"] def add_requests(self, task): """ diff --git a/fastdeploy/entrypoints/openai/protocol.py b/fastdeploy/entrypoints/openai/protocol.py index 6aa6b8bd01..9903538389 100644 --- a/fastdeploy/entrypoints/openai/protocol.py +++ b/fastdeploy/entrypoints/openai/protocol.py @@ -375,7 +375,7 @@ class CompletionRequest(BaseModel): max_streaming_response_tokens: Optional[int] = None return_token_ids: Optional[bool] = None - prompt_token_ids: Optional[List[int]] = None + prompt_token_ids: Optional[Union[List[int], List[List[int]]]] = None # doc: end-completion-extra-params def to_dict_for_infer(self, request_id=None, prompt=None): @@ -400,11 +400,11 @@ def to_dict_for_infer(self, request_id=None, prompt=None): if prompt is not None: req_dict["prompt"] = prompt - if "prompt_token_ids" in req_dict: - if "prompt" in req_dict: - del req_dict["prompt"] - else: - assert len(prompt) > 0 + # if "prompt_token_ids" in req_dict: + # if "prompt" in req_dict: + # del req_dict["prompt"] + # else: + # assert 
len(prompt) > 0 guided_json_object = None if self.response_format is not None: @@ -503,7 +503,7 @@ class ChatCompletionRequest(BaseModel): stop_token_ids: Optional[List[int]] = Field(default_factory=list) # doc: end-chat-completion-sampling-params - # doc: start-completion-extra-params + # doc: start-chat-completion-extra-params chat_template_kwargs: Optional[dict] = None reasoning_max_tokens: Optional[int] = None structural_tag: Optional[str] = None diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index e2a1158d3f..aa38d6cfa1 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -25,6 +25,7 @@ from aiozmq import zmq from fastdeploy.engine.request import RequestOutput +from fastdeploy.entrypoints.engine_client import EngineClient from fastdeploy.entrypoints.openai.protocol import ( CompletionLogprobs, CompletionRequest, @@ -41,7 +42,7 @@ class OpenAIServingCompletion: def __init__(self, engine_client, pid, ips, max_waiting_time): - self.engine_client = engine_client + self.engine_client: EngineClient = engine_client self.pid = pid self.master_ip = ips self.host_ip = get_host_ip() @@ -72,41 +73,57 @@ async def create_completion(self, request: CompletionRequest): request_id = f"cmpl-{request.user}-{uuid.uuid4()}" else: request_id = f"cmpl-{uuid.uuid4()}" - api_server_logger.info(f"initialize request {request_id}") + api_server_logger.info(f"Initialize request {request_id}: {request}") request_prompt_ids = None request_prompts = None + + # Handle prompt and prompt_token_ids try: - if isinstance(request.prompt, str): - request_prompts = [request.prompt] - elif isinstance(request.prompt, list) and all(isinstance(item, int) for item in request.prompt): - request_prompt_ids = [request.prompt] - elif isinstance(request.prompt, list) and all(isinstance(item, str) for item in request.prompt): - request_prompts = request.prompt - elif 
isinstance(request.prompt, list): - for item in request.prompt: - if isinstance(item, list) and all(isinstance(x, int) for x in item): - continue - else: - raise ValueError("Prompt must be a string, a list of strings or a list of integers.") - request_prompt_ids = request.prompt + if request.prompt_token_ids is not None: # let `prompt_token_ids` support batch inference + assert len(request.prompt_token_ids) > 0, "prompt_token_ids should not be an empty list" + if isinstance(request.prompt_token_ids[0], list): + request_prompt_ids = request.prompt_token_ids + elif isinstance(request.prompt_token_ids[0], int): + request_prompt_ids = [request.prompt_token_ids] + else: + raise ValueError( + "If prompt_token_ids is provided, its type should be one of: list[int], list[list[int]]" + ) + # reset `prompt_token_ids` to avoid data processor directly using it; let data processor fill it + request.prompt_token_ids = None else: - raise ValueError("Prompt must be a string, a list of strings or a list of integers.") + if isinstance(request.prompt, str): + request_prompts = [request.prompt] + elif isinstance(request.prompt, list) and all(isinstance(item, int) for item in request.prompt): + request_prompt_ids = [request.prompt] + elif isinstance(request.prompt, list) and all(isinstance(item, str) for item in request.prompt): + request_prompts = request.prompt + elif isinstance(request.prompt, list): + for item in request.prompt: + if isinstance(item, list) and all(isinstance(x, int) for x in item): + continue + else: + raise ValueError("If prompt is a list, each item type must be one of: str, list[int]") + request_prompt_ids = request.prompt + else: + raise ValueError("Prompt type must be one of: str, list[str], list[int], list[list[int]]") except Exception as e: return ErrorResponse(message=str(e), code=400) if request_prompt_ids is not None: request_prompts = request_prompt_ids - num_choices = len(request_prompts) - api_server_logger.info(f"start inference for request 
{num_choices}") + num_choices = len(request_prompts) + api_server_logger.info(f"Start preprocessing request: req_id={request_id}), num_choices={num_choices}") prompt_batched_token_ids = [] try: - for idx, prompt in enumerate(request_prompts): + for idx, prompt in enumerate(request_prompts): # process each prompt for this batch completion request request_id_idx = f"{request_id}-{idx}" current_req_dict = request.to_dict_for_infer(request_id_idx, prompt) + api_server_logger.debug(f"current_req_dict: {current_req_dict}") try: current_req_dict["arrival_time"] = time.time() - prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict) + prompt_token_ids = self.engine_client.format_and_add_data(current_req_dict) # tokenize if isinstance(prompt_token_ids, np.ndarray): prompt_token_ids = prompt_token_ids.tolist() prompt_batched_token_ids.append(prompt_token_ids) @@ -115,6 +132,10 @@ async def create_completion(self, request: CompletionRequest): del current_req_dict + api_server_logger.info( + f"Finish preprocessing request: req_id={request_id}, lengths={[len(t) for t in prompt_batched_token_ids]}" + ) + try: if self.max_waiting_time < 0: await self.engine_client.semaphore.acquire() diff --git a/fastdeploy/input/ernie_processor.py b/fastdeploy/input/ernie_processor.py index 28d91bdbf8..74e0c96bb2 100644 --- a/fastdeploy/input/ernie_processor.py +++ b/fastdeploy/input/ernie_processor.py @@ -88,40 +88,52 @@ def process_request(self, request, max_model_len=None, **kwargs): request = self._apply_default_parameters(request) if request.get("eos_token_ids") is None or len(request.eos_token_ids) == 0: request.eos_token_ids = self.eos_token_ids + + # processing stop_sequences stop_sequences = request.get("stop", []) if stop_sequences is not None and len(stop_sequences) != 0: stop_seqs, stop_seqs_len = self.update_stop_seq(stop_sequences) request.set("stop_token_ids", stop_seqs) request.set("stop_seqs_len", stop_seqs_len) + # processing prompt_token_ids if 
request.prompt_token_ids is None or len(request.prompt_token_ids) == 0: - if request.prompt is None and request.messages is None: - raise ValueError(f"The request should have `prompt_token_ids`, `prompt` or `messages`: {request}.") if request.prompt is not None: - prompt = request.prompt if request.prompt is not None else request.messages[0] - prompt = prompt[0] if isinstance(prompt, list) else prompt - tokens = self.tokenizer.tokenize(prompt) - token_ids = self.tokenizer.convert_tokens_to_ids(tokens) - request.prompt_token_ids = token_ids - data_processor_logger.info(f"req_id:{request.request_id}, tokens:{tokens}, token_ids: {token_ids}") - else: + # prompt = request.prompt if request.prompt is not None else request.messages[0] + prompt = request.prompt + assert isinstance(prompt, str) or ( + isinstance(prompt, list) and all([isinstance(t, int) for t in prompt]) + ), f"prompt must be a string or a list of integers, but got {type(prompt)}" + + if isinstance(prompt, list): # if prompt is a token id list + request.prompt_token_ids = prompt + else: + tokens = self.tokenizer.tokenize(prompt) + token_ids = self.tokenizer.convert_tokens_to_ids(tokens) + request.prompt_token_ids = token_ids + data_processor_logger.debug( + f"request_ids: {request.request_id}, prompt: {prompt}, tokens: {tokens}, token_ids: {token_ids}" + ) + elif request.messages is not None: request.prompt_token_ids = self.messages2ids(request.to_dict()) + else: + raise ValueError(f"The request should have `prompt_token_ids`, `prompt` or `messages`: {request}.") if len(request.prompt_token_ids) == 0: raise ValueError("Invalid input: prompt_token_ids must be a non-empty sequence of token IDs") + + # truncate prompts that exceed the length limit if max_model_len is not None and len(request.prompt_token_ids) > max_model_len: request.prompt_token_ids = request.prompt_token_ids[: max_model_len - 1] if request.get("max_tokens") is None: - request.set( - "max_tokens", - max(1, max_model_len - 
len(request.prompt_token_ids)), - ) + request.set("max_tokens", max(1, max_model_len - len(request.prompt_token_ids))) if request.get("temperature") < _SAMPLING_EPS: # zero temperature is equivalent to greedy sampling request.set("temperature", 1) if request.get("top_p") < _SAMPLING_EPS: request.set("top_p", _SAMPLING_EPS) - data_processor_logger.info(f"Processed request {request}") + + data_processor_logger.info(f"Processed request: {request}") return request def process_request_dict(self, request, max_model_len=None): @@ -148,19 +160,25 @@ def process_request_dict(self, request, max_model_len=None): # processing prompt_token_ids if not request.get("prompt_token_ids"): - if request.get("prompt") is None and request.get("messages") is None: - raise ValueError(f"Request must contain 'prompt_token_ids', 'prompt', or 'messages': {request}") if request.get("prompt"): prompt = request.get("prompt") - prompt = prompt[0] if isinstance(prompt, list) else prompt - - tokens = self.tokenizer.tokenize(prompt) - token_ids = self.tokenizer.convert_tokens_to_ids(tokens) - request["prompt_token_ids"] = token_ids - req_id = request.get("request_id", None) - data_processor_logger.info(f"req_id:{req_id}, tokens:{tokens}, token_ids: {token_ids}") - else: + assert isinstance(prompt, str) or ( + isinstance(prompt, list) and all([isinstance(t, int) for t in prompt]) + ), f"prompt must be a string or a list of integers, but got {type(prompt)}" + if isinstance(prompt, list): # if prompt is a token id list + request["prompt_token_ids"] = prompt + else: + tokens = self.tokenizer.tokenize(prompt) + token_ids = self.tokenizer.convert_tokens_to_ids(tokens) + request["prompt_token_ids"] = token_ids + data_processor_logger.debug( + f"request_ids: {request.get('request_id')}, prompt: {prompt}, tokens: {tokens}, token_ids: {token_ids}" + ) + elif request.get("messages"): request["prompt_token_ids"] = self.messages2ids(request) + else: + raise ValueError(f"Request must contain 'prompt_token_ids', 
'prompt', or 'messages': {request}") + if len(request["prompt_token_ids"]) == 0: raise ValueError("Invalid input: prompt_token_ids must be a non-empty sequence of token IDs") @@ -174,8 +192,8 @@ def process_request_dict(self, request, max_model_len=None): request["temperature"] = 1 if request.get("top_p") < _SAMPLING_EPS: request["top_p"] = _SAMPLING_EPS - data_processor_logger.info(f"Processed request {request}") + data_processor_logger.info(f"Processed request dict: {request}") return request def process_response(self, response_dict, **kwargs): diff --git a/fastdeploy/input/text_processor.py b/fastdeploy/input/text_processor.py index cbaca990c5..2af6bc2343 100644 --- a/fastdeploy/input/text_processor.py +++ b/fastdeploy/input/text_processor.py @@ -206,15 +206,24 @@ def process_request(self, request, max_model_len=None, **kwargs): if request.get("eos_token_ids") is None or len(request.eos_token_ids) == 0: request.eos_token_ids = self.eos_token_ids + # processing stop_sequences stop_sequences = request.get("stop", []) if stop_sequences is not None and len(stop_sequences) != 0: stop_seqs, stop_seqs_len = self.update_stop_seq(stop_sequences) request.set("stop_token_ids", stop_seqs) request.set("stop_seqs_len", stop_seqs_len) + # processing prompt_token_ids if request.prompt_token_ids is None or len(request.prompt_token_ids) == 0: if request.prompt is not None: - request.prompt_token_ids = self.text2ids(request.prompt, max_model_len) + prompt = request.prompt + assert isinstance(prompt, str) or ( + isinstance(prompt, list) and all([isinstance(t, int) for t in prompt]) + ), f"prompt must be a string or a list of integers, but got {type(prompt)}" + if isinstance(prompt, list): # if prompt is a token id list + request.prompt_token_ids = prompt + else: + request.prompt_token_ids = self.text2ids(request.prompt, max_model_len) elif request.messages is not None: if self.tokenizer.chat_template is None: raise ValueError("This model does not support chat_template.") @@ -223,19 
+232,22 @@ def process_request(self, request, max_model_len=None, **kwargs): request.prompt_token_ids = self.messages2ids(task) else: raise ValueError(f"The request should have `input_ids`, `text` or `messages`: {request}.") + if len(request.prompt_token_ids) == 0: raise ValueError("Invalid input: prompt_token_ids must be a non-empty sequence of token IDs") + + # truncate prompts that exceed the length limit + if max_model_len is not None and len(request.prompt_token_ids) > max_model_len: + request.prompt_token_ids = request.prompt_token_ids[: max_model_len - 1] if request.get("max_tokens") is None: - request.set( - "max_tokens", - max(1, max_model_len - len(request.prompt_token_ids)), - ) + request.set("max_tokens", max(1, max_model_len - len(request.prompt_token_ids))) if request.get("temperature") < _SAMPLING_EPS: # zero temperature is equivalent to greedy sampling request.set("temperature", 1) if request.get("top_p") < _SAMPLING_EPS: request.set("top_p", _SAMPLING_EPS) - data_processor_logger.info(f"Processed request {request}") + + data_processor_logger.info(f"Processed request: {request}") return request def process_request_dict(self, request, max_model_len=None, **kwargs): @@ -260,19 +272,30 @@ def process_request_dict(self, request, max_model_len=None, **kwargs): request["stop_token_ids"] = stop_seqs request["stop_seqs_len"] = stop_seqs_len - data_processor_logger.info(f"Processing request {request}") # processing prompt_token_ids if not request.get("prompt_token_ids"): - if "prompt" in request: - request["prompt_token_ids"] = self.text2ids(request["prompt"], max_model_len).tolist() - elif "messages" in request: + if request.get("prompt"): + prompt = request.get("prompt") + assert isinstance(prompt, str) or ( + isinstance(prompt, list) and all([isinstance(t, int) for t in prompt]) + ), f"prompt must be a string or a list of integers, but got {type(prompt)}" + if isinstance(prompt, list): # if prompt is a token id list + request["prompt_token_ids"] = prompt 
+ else: + request["prompt_token_ids"] = self.text2ids(request["prompt"], max_model_len).tolist() + elif request.get("messages"): if self.tokenizer.chat_template is None: raise ValueError("This model does not support chat_template.") request["prompt_token_ids"] = self.messages2ids(request) else: raise ValueError(f"Request must contain 'prompt_token_ids', 'prompt', or 'messages': {request}") + if len(request["prompt_token_ids"]) == 0: raise ValueError("Invalid input: prompt_token_ids must be a non-empty sequence of token IDs") + + # truncate prompts that exceed the length limit + if max_model_len is not None and len(request["prompt_token_ids"]) > max_model_len: + request["prompt_token_ids"] = request["prompt_token_ids"][: max_model_len - 1] if request.get("max_tokens") is None: request["max_tokens"] = max(1, max_model_len - len(request["prompt_token_ids"])) if request.get("temperature") < _SAMPLING_EPS: @@ -280,7 +303,8 @@ def process_request_dict(self, request, max_model_len=None, **kwargs): request["temperature"] = 1 if request.get("top_p") < _SAMPLING_EPS: request["top_p"] = _SAMPLING_EPS - data_processor_logger.info(f"Processed request {request}") + + data_processor_logger.info(f"Processed request dict: {request}") return request def process_logprob_response(self, token_ids, **kwargs): diff --git a/test/ci_use/EB_Lite/test_EB_Lite_serving.py b/test/ci_use/EB_Lite/test_EB_Lite_serving.py index 85cddcba1c..491be04ecf 100644 --- a/test/ci_use/EB_Lite/test_EB_Lite_serving.py +++ b/test/ci_use/EB_Lite/test_EB_Lite_serving.py @@ -668,6 +668,7 @@ def test_non_streaming_completion_with_prompt_token_ids(openai_client, capsys): """ Test prompt_token_ids option in streaming completion functionality with the local service """ + # Test case for passing a token id list in `prompt_token_ids` response = openai_client.completions.create( model="default", prompt="", @@ -676,17 +677,49 @@ def test_non_streaming_completion_with_prompt_token_ids(openai_client, capsys): 
extra_body={"prompt_token_ids": [5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937]}, stream=False, ) - assert hasattr(response, "choices") - assert len(response.choices) > 0 - assert hasattr(response, "usage") - assert hasattr(response.usage, "prompt_tokens") + assert len(response.choices) == 1 + assert response.usage.prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt_token_ids` + response = openai_client.completions.create( + model="default", + prompt="", + temperature=1, + max_tokens=5, + extra_body={"prompt_token_ids": [[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]]}, + stream=False, + ) + assert len(response.choices) == 2 + assert response.usage.prompt_tokens == 9 + 3 + + # Test case for passing a token id list in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], + temperature=1, + max_tokens=5, + stream=False, + ) + assert len(response.choices) == 1 assert response.usage.prompt_tokens == 9 + # Test case for passing a batch of token id lists in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]], + temperature=1, + max_tokens=5, + stream=False, + ) + assert len(response.choices) == 2 + assert response.usage.prompt_tokens == 9 + 3 + def test_streaming_completion_with_prompt_token_ids(openai_client, capsys): """ Test prompt_token_ids option in non-streaming completion functionality with the local service """ + # Test case for passing a token id list in `prompt_token_ids` response = openai_client.completions.create( model="default", prompt="", @@ -696,14 +729,65 @@ def test_streaming_completion_with_prompt_token_ids(openai_client, capsys): stream=True, stream_options={"include_usage": True}, ) + sum_prompt_tokens = 0 for chunk in response: - assert hasattr(chunk, "choices") - assert hasattr(chunk, "usage") if 
len(chunk.choices) > 0: assert chunk.usage is None else: - assert hasattr(chunk.usage, "prompt_tokens") - assert chunk.usage.prompt_tokens == 9 + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt_token_ids` + response = openai_client.completions.create( + model="default", + prompt="", + temperature=1, + max_tokens=5, + extra_body={"prompt_token_ids": [[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]]}, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + 3 + + # Test case for passing a token id list in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], + temperature=1, + max_tokens=5, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]], + temperature=1, + max_tokens=5, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + 3 def test_non_streaming_chat_completion_disable_chat_template(openai_client, capsys): diff --git a/test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py 
b/test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py index 5898d332f2..e569eadf26 100644 --- a/test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py +++ b/test/ci_use/Qwen2-7B-Instruct_serving/test_Qwen2-7B-Instruct_serving.py @@ -599,3 +599,132 @@ def test_streaming(openai_client, capsys): for chunk in response: output.append(chunk.choices[0].text) assert len(output) > 0 + + +# ========================== +# OpenAI Client additional chat/completions test +# ========================== +def test_non_streaming_completion_with_prompt_token_ids(openai_client, capsys): + """ + Test prompt_token_ids option in streaming completion functionality with the local service + """ + # Test case for passing a token id list in `prompt_token_ids` + response = openai_client.completions.create( + model="default", + prompt="", + temperature=1, + max_tokens=5, + extra_body={"prompt_token_ids": [5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937]}, + stream=False, + ) + assert len(response.choices) == 1 + assert response.usage.prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt_token_ids` + response = openai_client.completions.create( + model="default", + prompt="", + temperature=1, + max_tokens=5, + extra_body={"prompt_token_ids": [[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]]}, + stream=False, + ) + assert len(response.choices) == 2 + assert response.usage.prompt_tokens == 9 + 3 + + # Test case for passing a token id list in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], + temperature=1, + max_tokens=5, + stream=False, + ) + assert len(response.choices) == 1 + assert response.usage.prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 
3]], + temperature=1, + max_tokens=5, + stream=False, + ) + assert len(response.choices) == 2 + assert response.usage.prompt_tokens == 9 + 3 + + +def test_streaming_completion_with_prompt_token_ids(openai_client, capsys): + """ + Test prompt_token_ids option in non-streaming completion functionality with the local service + """ + # Test case for passing a token id list in `prompt_token_ids` + response = openai_client.completions.create( + model="default", + prompt="", + temperature=1, + max_tokens=5, + extra_body={"prompt_token_ids": [5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937]}, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt_token_ids` + response = openai_client.completions.create( + model="default", + prompt="", + temperature=1, + max_tokens=5, + extra_body={"prompt_token_ids": [[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]]}, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + 3 + + # Test case for passing a token id list in `prompt` + response = openai_client.completions.create( + model="default", + prompt=[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], + temperature=1, + max_tokens=5, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + + # Test case for passing a batch of token id lists in `prompt` + response = openai_client.completions.create( + 
model="default", + prompt=[[5209, 626, 274, 45954, 1071, 3265, 3934, 1869, 93937], [1, 2, 3]], + temperature=1, + max_tokens=5, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + assert chunk.usage is None + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + assert sum_prompt_tokens == 9 + 3 diff --git a/test/entrypoints/openai/test_serving_api.py b/test/entrypoints/openai/test_serving_api.py new file mode 100644 index 0000000000..3f688210fc --- /dev/null +++ b/test/entrypoints/openai/test_serving_api.py @@ -0,0 +1,541 @@ +import os +import signal +import socket +import subprocess +import sys +import time +import unittest + +import openai + +from fastdeploy.utils import get_random_port + +FD_API_PORT = int(os.getenv("FD_API_PORT", get_random_port())) +FD_ENGINE_QUEUE_PORT = int(os.getenv("FD_ENGINE_QUEUE_PORT", get_random_port())) +FD_METRICS_PORT = int(os.getenv("FD_METRICS_PORT", get_random_port())) +PORTS_TO_CLEAN = [FD_API_PORT, FD_ENGINE_QUEUE_PORT, FD_METRICS_PORT] + + +def is_port_open(host, port, timeout=1.0): + try: + with socket.create_connection((host, port), timeout): + return True + except Exception: + return False + + +def kill_process_on_port(port): + try: + output = subprocess.check_output(f"lsof -i:{port} -t", shell=True).decode().strip() + for pid in output.splitlines(): + os.kill(int(pid), signal.SIGKILL) + except subprocess.CalledProcessError: + pass + + +class TestServingAPI(unittest.TestCase): + + @classmethod + def setUpClass(cls): + for port in PORTS_TO_CLEAN: + kill_process_on_port(port) + + base_path = os.getenv("MODEL_PATH") + if base_path: + model_path = os.path.join(base_path, "ERNIE-4.5-0.3B-Paddle") + else: + model_path = "PaddlePaddle/ERNIE-4.5-0.3B-Paddle" + + cmd = [ + sys.executable, + "-m", + "fastdeploy.entrypoints.openai.api_server", + "--model", + model_path, + "--port", + str(FD_API_PORT), + "--tensor-parallel-size", + "1", + 
"--engine-worker-queue-port", + str(FD_ENGINE_QUEUE_PORT), + "--metrics-port", + str(FD_METRICS_PORT), + "--max-model-len", + "32768", + "--max-num-seqs", + "128", + "--use-cudagraph", + "--graph-optimization-config", + '{"cudagraph_capture_sizes": [1]}', + ] + + cls.server_proc = subprocess.Popen(cmd, stdout=subprocess.PIPE, stderr=None, start_new_session=True) + + for _ in range(600): + if is_port_open("127.0.0.1", FD_API_PORT): + break + time.sleep(1) + else: + raise RuntimeError("API server did not start in time") + + cls.client = openai.Client( + base_url=f"http://0.0.0.0:{FD_API_PORT}/v1", + api_key="EMPTY_API_KEY", + ) + + @classmethod + def tearDownClass(cls): + try: + os.killpg(cls.server_proc.pid, signal.SIGTERM) + except Exception: + pass + + def create_chat(self, **kwargs): + default_kwargs = dict( + model="default", + messages=[ + {"role": "system", "content": "You are a helpful AI assistant."}, + {"role": "user", "content": "List 3 countries and their capitals."}, + ], + max_tokens=10, + temperature=1, + top_p=0, + ) + default_kwargs.update(**kwargs) + return self.client.chat.completions.create(**default_kwargs) + + def create_completion(self, **kwargs): + default_kwargs = dict( + model="default", + prompt="Once upon a time, in a small village by the sea, there", + max_tokens=10, + temperature=1, + top_p=0, + ) + default_kwargs.update(**kwargs) + return self.client.completions.create(**default_kwargs) + + def test_basic_non_streaming_chat(self): + """ + Test case for basic non-streaming chat + """ + response = self.create_chat() + self.assertTrue(response.choices[0].message.content) + + def test_basic_streaming_chat(self): + """ + Test case for basic streaming chat + """ + response = self.create_chat(stream=True) + output = "" + for chunk in response: + output += chunk.choices[0].delta.content + self.assertTrue(output) + + def test_basic_non_streaming_completion(self): + """ + Test case for basic non-streaming completion + """ + response = 
self.create_completion() + self.assertTrue(response.choices[0].text) + + def test_basic_streaming_completion(self): + """ + Test case for basic streaming completion + """ + response = self.create_completion(stream=True) + output = "" + for chunk in response: + output += chunk.choices[0].text + self.assertTrue(output) + + def test_stop_str_non_streaming_chat(self): + """ + Test case for setting `include_stop_str_in_output` in non-streaming chat + """ + response = self.create_chat(extra_body={"include_stop_str_in_output": True}) + self.assertTrue(response.choices[0].message.content.endswith("")) + + response = self.create_chat(extra_body={"include_stop_str_in_output": False}) + self.assertFalse(response.choices[0].message.content.endswith("")) + + def test_stop_str_non_streaming_completion(self): + """ + Test case for setting `include_stop_str_in_output` in non-streaming completion + """ + response = self.create_completion(max_tokens=1024) + self.assertFalse(response.choices[0].text.endswith("")) + + response = self.create_completion(max_tokens=1024, extra_body={"include_stop_str_in_output": True}) + self.assertTrue(response.choices[0].text.endswith("")) + + def test_stop_str_streaming_chat(self): + """ + Test case for setting `include_stop_str_in_output` in streaming chat + """ + response = self.create_chat( + extra_body={"include_stop_str_in_output": True}, + stream=True, + ) + last_token = "" + for chunk in response: + last_token = chunk.choices[0].delta.content + self.assertEqual(last_token, "") + + response = self.create_chat( + extra_body={"include_stop_str_in_output": False}, + stream=True, + ) + last_token = "" + for chunk in response: + last_token = chunk.choices[0].delta.content + self.assertNotEqual(last_token, "") + + def test_stop_str_streaming_completion(self): + """ + Test case for setting `include_stop_str_in_output` in streaming completion + """ + response = self.create_completion(stream=True) + last_token = "" + for chunk in response: + last_token = 
chunk.choices[0].text + self.assertFalse(last_token.endswith("")) + + response = self.create_completion( + extra_body={"include_stop_str_in_output": True}, + stream=True, + ) + last_token = "" + for chunk in response: + last_token = chunk.choices[0].text + self.assertTrue(last_token.endswith("")) + + def test_return_token_ids_non_streaming_chat(self): + """ + Test case for setting `return_token_ids` in non-streaming chat + """ + # enable return_token_ids + response = self.create_chat( + messages=[{"role": "user", "content": "Hello, how are you?"}], + extra_body={"return_token_ids": True}, + ) + self.assertIsInstance(response.choices[0].message.prompt_token_ids, list) + self.assertGreater(len(response.choices[0].message.prompt_token_ids), 0) + self.assertIsInstance(response.choices[0].message.completion_token_ids, list) + self.assertGreater(len(response.choices[0].message.completion_token_ids), 0) + + # disable return_token_ids + response = self.create_chat( + model="default", + messages=[{"role": "user", "content": "Hello, how are you?"}], + max_tokens=5, + extra_body={"return_token_ids": False}, + stream=False, + ) + self.assertIsNone(response.choices[0].message.prompt_token_ids) + self.assertIsNone(response.choices[0].message.completion_token_ids) + + def test_return_token_ids_streaming_chat(self): + """ + Tese case for setting `return_token_ids` in streaming chat + """ + # enable return_token_ids + response = self.create_chat( + extra_body={"return_token_ids": True}, + stream=True, + ) + is_first_chunk = True + for chunk in response: + delta = chunk.choices[0].delta + if is_first_chunk: + is_first_chunk = False + self.assertIsInstance(delta.prompt_token_ids, list) + self.assertGreater(len(delta.prompt_token_ids), 0) + self.assertIsNone(delta.completion_token_ids) + else: + self.assertIsNone(delta.prompt_token_ids) + self.assertIsInstance(delta.completion_token_ids, list) + self.assertGreater(len(delta.completion_token_ids), 0) + + # disable return_token_ids + 
response = self.create_chat( + extra_body={"return_token_ids": False}, + stream=True, + ) + for chunk in response: + delta = chunk.choices[0].delta + self.assertIsNone(delta.prompt_token_ids) + self.assertIsNone(delta.completion_token_ids) + + def test_return_token_ids_non_streaming_completion(self): + """ + Test case for setting `return_token_ids` in non-streaming completion + """ + # enable return_token_ids + response = self.create_completion( + extra_body={"return_token_ids": True}, + ) + self.assertIsInstance(response.choices[0].prompt_token_ids, list) + self.assertGreater(len(response.choices[0].prompt_token_ids), 0) + self.assertIsInstance(response.choices[0].completion_token_ids, list) + self.assertGreater(len(response.choices[0].completion_token_ids), 0) + + # disable return_token_ids + response = self.create_completion() + self.assertIsNone(response.choices[0].prompt_token_ids) + self.assertIsNone(response.choices[0].completion_token_ids) + + def test_return_token_ids_streaming_completion(self): + """ + Test case for setting `return_token_ids` in streaming completion + """ + # enable return_token_ids + response = self.create_completion( + extra_body={"return_token_ids": True}, + stream=True, + ) + is_first_chunk = True + for chunk in response: + choice = chunk.choices[0] + if is_first_chunk: + is_first_chunk = False + self.assertIsInstance(choice.prompt_token_ids, list) + self.assertGreater(len(choice.prompt_token_ids), 0) + self.assertIsNone(choice.completion_token_ids) + else: + self.assertIsNone(choice.prompt_token_ids) + self.assertIsInstance(choice.completion_token_ids, list) + self.assertGreater(len(choice.completion_token_ids), 0) + + # disable return_token_ids + response = self.create_completion(stream=True) + for chunk in response: + choice = chunk.choices[0] + self.assertIsNone(choice.prompt_token_ids) + self.assertIsNone(choice.completion_token_ids) + + def test_prompt_token_ids_non_streaming_completion(self): + """ + Test case for passing token 
ids via `prompt_token_ids` or `prompt` in non-streaming completion + """ + # passing a token id list in `prompt_token_ids` + response = self.create_completion( + extra_body={"prompt_token_ids": [1001, 1002, 1003, 1004, 1005]}, + ) + self.assertEqual(len(response.choices), 1) + self.assertEqual(response.usage.prompt_tokens, 5) + + # passing a batch of token id lists in `prompt_token_ids` + response = self.create_completion( + extra_body={"prompt_token_ids": [[1001, 1002, 1003, 1004, 1005], [1006, 1007, 1008]]}, + ) + self.assertEqual(len(response.choices), 2) + self.assertEqual(response.usage.prompt_tokens, 8) + + # passing a token id list in `prompt` + response = self.create_completion( + prompt=[1001, 1002, 1003, 1004, 1005], + ) + self.assertEqual(len(response.choices), 1) + self.assertEqual(response.usage.prompt_tokens, 5) + + # passing a batch of token id lists in `prompt` + response = self.create_completion( + prompt=[[1001, 1002, 1003, 1004, 1005], [1006, 1007, 1008]], + ) + self.assertEqual(len(response.choices), 2) + self.assertEqual(response.usage.prompt_tokens, 8) + + def test_prompt_token_ids_streaming_completion(self): + """ + Test case for passing token ids via `prompt_token_ids` or `prompt` in streaming completion + """ + # passing a token id list in `prompt_token_ids` + response = self.create_completion( + extra_body={"prompt_token_ids": [1001, 1002, 1003, 1004, 1005]}, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + self.assertIsNone(chunk.usage) + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + self.assertEqual(sum_prompt_tokens, 5) + + # passing a batch of token id lists in `prompt_token_ids` + response = self.create_completion( + extra_body={"prompt_token_ids": [[1001, 1002, 1003, 1004, 1005], [1006, 1007, 1008]]}, + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) 
> 0: + self.assertIsNone(chunk.usage) + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + self.assertEqual(sum_prompt_tokens, 8) + + # passing a token id list in `prompt` + response = self.create_completion( + prompt=[1001, 1002, 1003, 1004, 1005], + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + self.assertIsNone(chunk.usage) + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + self.assertEqual(sum_prompt_tokens, 5) + + # passing a batch of token id lists in `prompt` + response = self.create_completion( + prompt=[[1001, 1002, 1003, 1004, 1005], [1006, 1007, 1008]], + stream=True, + stream_options={"include_usage": True}, + ) + sum_prompt_tokens = 0 + for chunk in response: + if len(chunk.choices) > 0: + self.assertIsNone(chunk.usage) + else: + sum_prompt_tokens += chunk.usage.prompt_tokens + self.assertEqual(sum_prompt_tokens, 8) + + def test_disable_chat_template_non_streaming_chat(self): + """ + Test case for setting `disable_chat_template` in non-streaming chat + """ + enabled_response = self.create_chat( + messages=[{"role": "user", "content": "Hello, how are you?"}], + extra_body={"disable_chat_template": False}, + ) + self.assertGreater(len(enabled_response.choices), 0) + + # from fastdeploy.input.ernie_tokenizer import ErnieBotTokenizer + # tokenizer = ErnieBotTokenizer.from_pretrained("PaddlePaddle/ERNIE-4.5-0.3B-Paddle", trust_remote_code=True) + # prompt = tokenizer.apply_chat_template([{"role": "user", "content": "Hello, how are you?"}], tokenize=False) + prompt = "<|begin_of_sentence|>User: Hello, how are you?\nAssistant: " + disabled_response = self.create_chat( + messages=[{"role": "user", "content": prompt}], + extra_body={"disable_chat_template": True}, + ) + self.assertGreater(len(disabled_response.choices), 0) + self.assertEqual(enabled_response.choices[0].message.content, disabled_response.choices[0].message.content) + + def 
test_min_tokens_non_streaming_chat(self): + """ + Test case for setting `min_tokens` in non-streaming chat + """ + min_tokens = 1000 + response = self.create_chat( + max_tokens=1010, + extra_body={"min_tokens": min_tokens}, + ) + self.assertGreaterEqual(response.usage.completion_tokens, min_tokens) + + def test_min_max_token_equals_one_non_streaming_chat(self): + """ + Test case for chat/completion when min_tokens equals max_tokens equals 1. + Verify it returns exactly one token. + """ + # Test non-streaming chat + response = self.create_chat(max_tokens=1) + self.assertIsNotNone(response.choices[0].message.content) + # Verify usage shows exactly 1 completion token + self.assertEqual(response.usage.completion_tokens, 1) + + def test_bad_words_non_streaming_chat(self): + """ + Test case for setting `bad_words` in non-streaming chat + """ + resp0 = self.create_chat(max_tokens=10) + words0 = resp0.choices[0].message.content.split(" ") + + # add bad words + resp1 = self.create_chat( + max_tokens=10, + extra_body={"bad_words": words0[-5:]}, + ) + words1 = resp1.choices[0].message.content.split(" ") + for w in words0[-5:]: + self.assertNotIn(w, words1) + + def test_bad_words_streaming_chat(self): + """ + Test case for setting `bad_words` in streaming chat + """ + resp0 = self.create_chat( + max_tokens=10, + stream=True, + ) + str0 = "" + for chunk in resp0: + str0 += chunk.choices[0].delta.content + words0 = str0.split(" ") + self.assertGreater(len(words0), 0) + + resp1 = self.create_chat( + max_tokens=10, + stream=True, + extra_body={"bad_words": words0[-5:]}, + ) + str1 = "" + for chunk in resp1: + str1 += chunk.choices[0].delta.content + words1 = str1.split(" ") + self.assertGreater(len(words1), 0) + for w in words0[-5:]: + self.assertNotIn(w, words1) + + def test_bad_words_non_streaming_completion(self): + """ + Test case for setting `bad_words` in non-streaming completion + """ + resp0 = self.create_completion(max_tokens=10) + words0 = resp0.choices[0].text.split("
") + + # add bad words + resp1 = self.create_completion( + max_tokens=10, + extra_body={"bad_words": words0[-5:]}, + ) + words1 = resp1.choices[0].text.split(" ") + for w in words0[-5:]: + self.assertNotIn(w, words1) + + def test_bad_words_streaming_completion(self): + """ + Test case for setting `bad_words` in streaming completion + """ + resp0 = self.create_completion( + max_tokens=10, + stream=True, + ) + str0 = "" + for chunk in resp0: + str0 += chunk.choices[0].text + words0 = str0.split(" ") + self.assertGreater(len(words0), 0) + + resp1 = self.create_completion( + max_tokens=10, + stream=True, + extra_body={"bad_words": words0[-5:]}, + ) + str1 = "" + for chunk in resp1: + str1 += chunk.choices[0].text + words1 = str1.split(" ") + self.assertGreater(len(words1), 0) + + for w in words0[-5:]: + self.assertNotIn(w, words1) + + +if __name__ == "__main__": + unittest.main()