diff --git a/docs/features/data_parallel_service.md b/docs/features/data_parallel_service.md index f8b20dde94..4f496d3f31 100644 --- a/docs/features/data_parallel_service.md +++ b/docs/features/data_parallel_service.md @@ -15,9 +15,9 @@ The scheduling flow is shown below - users randomly request IP and port, obtain ```python prompts = [ "Hello, my name is", - "你好,请问今天是星期", - "请写6个以数字开头的成语", - "写一个300字的小说大纲,内容是李白穿越到现代,最后成为公司文职人员的故事", + "你好,请问今天是星期", + "请写6个以数字开头的成语", + "写一个300字的小说大纲,内容是李白穿越到现代,最后成为公司文职人员的故事", "我要采访一位科幻作家,创建一个包含5个问题的列表" ] @@ -83,9 +83,9 @@ python -m fastdeploy.entrypoints.openai.multi_api_server \ ``` ### Parameter Description -- num-servers: Number of API servers to launch -- ports: Ports for API servers -- args: Arguments for API servers +- num-servers: Number of API servers to launch +- ports: Ports for API servers +- args: Arguments for API servers ### Data Parallelism + Disaggregated Deployment Refer to [Disaggregated Deployment](disaggregated.md#multi-machine-disaggregated-deployment) @@ -94,9 +94,8 @@ Refer to [Disaggregated Deployment](disaggregated.md#multi-machine-disaggregated For multi-machine deployment, ensure network cards support RDMA and all cluster nodes are interconnected. **Note**: -* `KVCACHE_RDMA_NICS` specifies RDMA network cards for the current machine, multiple cards should be separated by commas. -* The repository provides an automatic RDMA network card detection script `bash scripts/get_rdma_nics.sh `, where can be `cpu` or `gpu`. - +- `KVCACHE_RDMA_NICS` specifies RDMA network cards for the current machine, multiple cards should be separated by commas. +- The repository provides an automatic RDMA network card detection script `bash scripts/get_rdma_nics.sh `, where can be `cpu` or `gpu`. **Prefill Instance** ```bash @@ -148,4 +147,4 @@ python -m fastdeploy.entrypoints.openai.api_server \ --scheduler-ttl 9000 --scheduler-topic "test" \ --splitwise-role "decode" -``` \ No newline at end of file +``` diff --git a/docs/features/disaggregated.md b/docs/features/disaggregated.md index 5cc30d8e6a..44f29b5d94 100644 --- a/docs/features/disaggregated.md +++ b/docs/features/disaggregated.md @@ -73,10 +73,10 @@ Refer to the example code `offline_disaggregated_demo.py` in the `fastdeploy/dem #### Prerequisite: Redis -> **⚠️ NOTE** -> **Redis requirement: version 6.2.0 or higher** +> **⚠️ NOTE** +> **Redis requirement: version 6.2.0 or higher** > Versions below this may not support the required commands. -> +> * Installation via `conda` ```bash diff --git a/docs/features/multi-node_deployment.md b/docs/features/multi-node_deployment.md index 49a66419ac..04bda9fc29 100644 --- a/docs/features/multi-node_deployment.md +++ b/docs/features/multi-node_deployment.md @@ -1,71 +1,71 @@ # Multi-Node Deployment -## Overview +## Overview Multi-node deployment addresses scenarios where a single machine's GPU memory is insufficient to support deployment of large models by enabling tensor parallelism across multiple machines. -## Environment Preparation -#### Network Requirements -1. All nodes must be within the same local network -2. Ensure bidirectional connectivity between all nodes (test using `ping` and `nc -zv`) +## Environment Preparation +### Network Requirements +1. All nodes must be within the same local network +2. Ensure bidirectional connectivity between all nodes (test using `ping` and `nc -zv`) -#### Software Requirements -1. Install the same version of FastDeploy on all nodes -2. 
[Recommended] Install and configure MPI (OpenMPI or MPICH) +#### Software Requirements +1. Install the same version of FastDeploy on all nodes +2. [Recommended] Install and configure MPI (OpenMPI or MPICH) -## Tensor Parallel Deployment +## Tensor Parallel Deployment -### Recommended Launch Method -We recommend using mpirun for one-command startup without manually starting each node. +### Recommended Launch Method +We recommend using mpirun for one-command startup without manually starting each node. -### Usage Instructions -1. Execute the same command on all machines -2. The IP order in the `ips` parameter determines the node startup sequence -3. The first IP will be designated as the master node -4. Ensure all nodes can resolve each other's hostnames +### Usage Instructions +1. Execute the same command on all machines +2. The IP order in the `ips` parameter determines the node startup sequence +3. The first IP will be designated as the master node +4. Ensure all nodes can resolve each other's hostnames -* Online inference startup example: - ```shell - python -m fastdeploy.entrypoints.openai.api_server \ - --model baidu/ERNIE-4.5-300B-A47B-Paddle \ - --port 8180 \ - --metrics-port 8181 \ - --engine-worker-queue-port 8182 \ - --max-model-len 32768 \ - --max-num-seqs 32 \ - --tensor-parallel-size 16 \ - --ips 192.168.1.101,192.168.1.102 - ``` +* Online inference startup example: + ```shell + python -m fastdeploy.entrypoints.openai.api_server \ + --model baidu/ERNIE-4.5-300B-A47B-Paddle \ + --port 8180 \ + --metrics-port 8181 \ + --engine-worker-queue-port 8182 \ + --max-model-len 32768 \ + --max-num-seqs 32 \ + --tensor-parallel-size 16 \ + --ips 192.168.1.101,192.168.1.102 + ``` -* Offline startup example: - ```python - from fastdeploy.engine.sampling_params import SamplingParams - from fastdeploy.entrypoints.llm import LLM - - model_name_or_path = "baidu/ERNIE-4.5-300B-A47B-Paddle" - - sampling_params = SamplingParams(temperature=0.1, max_tokens=30) - llm = LLM(model=model_name_or_path, tensor_parallel_size=16, ips="192.168.1.101,192.168.1.102") - if llm._check_master(): - output = llm.generate(prompts="Who are you?", use_tqdm=True, sampling_params=sampling_params) - print(output) - ``` +* Offline startup example: + ```python + from fastdeploy.engine.sampling_params import SamplingParams + from fastdeploy.entrypoints.llm import LLM -* Notes: -- Only the master node can receive completion requests -- Always send requests to the master node (the first IP in the ips list) -- The master node will distribute workloads across all nodes + model_name_or_path = "baidu/ERNIE-4.5-300B-A47B-Paddle" -### Parameter Description + sampling_params = SamplingParams(temperature=0.1, max_tokens=30) + llm = LLM(model=model_name_or_path, tensor_parallel_size=16, ips="192.168.1.101,192.168.1.102") + if llm._check_master(): + output = llm.generate(prompts="Who are you?", use_tqdm=True, sampling_params=sampling_params) + print(output) + ``` -#### `ips` Parameter -- **Type**: `string` -- **Format**: Comma-separated IPv4 addresses -- **Description**: Specifies the IP addresses of all nodes in the deployment group -- **Required**: Only for multi-node deployments -- **Example**: `"192.168.1.101,192.168.1.102,192.168.1.103"` +* Notes: +* Only the master node can receive completion requests +* Always send requests to the master node (the first IP in the ips list) +* The master node will distribute workloads across all nodes -#### `tensor_parallel_size` Parameter -- **Type**: `integer` -- **Description**: Total number 
of GPUs across all nodes -- **Required**: Yes -- **Example**: For 2 nodes with 8 GPUs each, set to 16 +### Parameter Description + +#### `ips` Parameter +* **Type**: `string` +* **Format**: Comma-separated IPv4 addresses +* **Description**: Specifies the IP addresses of all nodes in the deployment group +* **Required**: Only for multi-node deployments +* **Example**: `"192.168.1.101,192.168.1.102,192.168.1.103"` + +#### `tensor_parallel_size` Parameter +* **Type**: `integer` +* **Description**: Total number of GPUs across all nodes +* **Required**: Yes +* **Example**: For 2 nodes with 8 GPUs each, set to 16 diff --git a/docs/zh/features/data_parallel_service.md b/docs/zh/features/data_parallel_service.md index 31bcffc4b0..8951a76791 100644 --- a/docs/zh/features/data_parallel_service.md +++ b/docs/zh/features/data_parallel_service.md @@ -12,15 +12,14 @@ FastDeploy 提供了splitwise scheduler,可以感知各个DP的负载状态, 具体调度流程如下图,用户随机请求ip 与端口,通过redis获取负载状态,将数据分发到负载较低的DP进行推理。 ![数据调度架构图](./images/scheduler_img.png) - #### 离线推理 ```python prompts = [ "Hello, my name is", - "你好,请问今天是星期", - "请写6个以数字开头的成语", - "写一个300字的小说大纲,内容是李白穿越到现代,最后成为公司文职人员的故事", + "你好,请问今天是星期", + "请写6个以数字开头的成语", + "写一个300字的小说大纲,内容是李白穿越到现代,最后成为公司文职人员的故事", "我要采访一位科幻作家,创建一个包含5个问题的列表" ] @@ -65,11 +64,9 @@ python -m fastdeploy.entrypoints.openai.api_server \ --scheduler-ttl 9000 ``` - ### 用户自行调度 FastDeploy 提供了multi_api_server,用户可以拉起多个api server,用户自行选择dp 进行请求,在该种情况下用户可以自行添加负载均衡模型进行调度。(目前该种方式只支持在线推理) - #### 在线推理 ![数据调度架构图](./images/no_scheduler_img.png) @@ -95,8 +92,6 @@ python -m fastdeploy.entrypoints.openai.multi_api_server \ - ports: 指定拉起的api server 的端口 - args: 指定拉起的api server 的参数 - - ### 数据并行 + 分离式部署 具体可以参考[分离式部署](disaggregated.md#多机分离式部署) @@ -106,8 +101,8 @@ python -m fastdeploy.entrypoints.openai.multi_api_server \ 多机部署时需要确认当前网卡是否支持RDMA,并且需要集群中所有节点网络互通。 **注意**: -* `KVCACHE_RDMA_NICS` 指定当前机器的RDMA网卡,多个网卡用逗号隔开。 -* 仓库中提供了自动检测RDMA网卡的脚本 `bash scripts/get_rdma_nics.sh `, 其中 可以是 `cpu` 或 `gpu`。 +- `KVCACHE_RDMA_NICS` 指定当前机器的RDMA网卡,多个网卡用逗号隔开。 +- 仓库中提供了自动检测RDMA网卡的脚本 `bash scripts/get_rdma_nics.sh `, 其中 可以是 `cpu` 或 `gpu`。 **prefill 实例** @@ -163,4 +158,3 @@ python -m fastdeploy.entrypoints.openai.api_server \ --scheduler-topic "test" \ --splitwise-role "decode" ``` - diff --git a/docs/zh/features/disaggregated.md b/docs/zh/features/disaggregated.md index 84a3dfb5c6..e67c36d85d 100644 --- a/docs/zh/features/disaggregated.md +++ b/docs/zh/features/disaggregated.md @@ -75,8 +75,8 @@ python -m fastdeploy.entrypoints.openai.api_server \ #### 前置依赖 Redis * 使用`conda`安装 -> **⚠️ 注意** -> **Redis 版本要求:6.2.0 及以上** +> **⚠️ 注意** +> **Redis 版本要求:6.2.0 及以上** > 低于此版本可能不支持所需的命令。 ```bash diff --git a/docs/zh/features/multi-node_deployment.md b/docs/zh/features/multi-node_deployment.md index 7899fdc59f..909a7a70f4 100644 --- a/docs/zh/features/multi-node_deployment.md +++ b/docs/zh/features/multi-node_deployment.md @@ -4,11 +4,10 @@ 多节点部署旨在解决单个机器GPU显存不足时,支持跨多台机器的张量并行执行。 ## 环境准备 -#### 网络要求 +### 网络要求 1. 所有节点必须在同一本地网络中 2. 确保所有节点之间双向连通(可使用`ping`和`nc -zv`测试) - #### 软件要求 1. 所有节点安装相同版本的FastDeploy 2. 
[建议安装]安装并配置MPI(OpenMPI或MPICH) @@ -52,22 +51,21 @@ ``` * 注意: -- 只有主节点可以接收完成请求 -- 请始终将请求发送到主节点(ips列表中的第一个IP) -- 主节点将在所有节点间分配工作负载 +* 只有主节点可以接收完成请求 +* 请始终将请求发送到主节点(ips列表中的第一个IP) +* 主节点将在所有节点间分配工作负载 ### 参数说明 #### `ips`参数 -- **类型**: `字符串` -- **格式**: 逗号分隔的IPv4地址 -- **描述**: 指定部署组中所有节点的IP地址 -- **必填**: 仅多节点部署时需要 -- **示例**: `"192.168.1.101,192.168.1.102,192.168.1.103"` +* **类型**: `字符串` +* **格式**: 逗号分隔的IPv4地址 +* **描述**: 指定部署组中所有节点的IP地址 +* **必填**: 仅多节点部署时需要 +* **示例**: `"192.168.1.101,192.168.1.102,192.168.1.103"` #### `tensor_parallel_size`参数 -- **类型**: `整数` -- **描述**: 所有节点上的GPU总数 -- **必填**: 是 -- **示例**: 对于2个节点各8个GPU,设置为16 - +* **类型**: `整数` +* **描述**: 所有节点上的GPU总数 +* **必填**: 是 +* **示例**: 对于2个节点各8个GPU,设置为16 diff --git a/fastdeploy/entrypoints/engine_client.py b/fastdeploy/entrypoints/engine_client.py index 4a51691dac..663fb2f0ad 100644 --- a/fastdeploy/entrypoints/engine_client.py +++ b/fastdeploy/entrypoints/engine_client.py @@ -19,6 +19,7 @@ import time import traceback import uuid +from copy import copy import numpy as np @@ -210,15 +211,29 @@ async def add_requests(self, task): self.valid_parameters(task) api_server_logger.debug(f"Receive task: {task}") + n = task.get("n", 1) try: - if not self.enable_mm: - self.zmq_client.send_json(task) + request_id_idx = task.get("request_id") + parts = request_id_idx.rsplit("_", 1) + if len(parts) == 1: + self._send_task(task) else: - self.zmq_client.send_pyobj(task) + request_id = parts[0] + index = int(parts[1]) + for i in range(index * n, (index + 1) * n): + child_task = copy(task) + child_task["request_id"] = f"{request_id}_{i}" + self._send_task(child_task) except Exception as e: api_server_logger.error(f"zmq_client send task error: {e}, {str(traceback.format_exc())}") raise EngineError(str(e), error_code=400) + def _send_task(self, task): + if not self.enable_mm: + self.zmq_client.send_json(task) + else: + self.zmq_client.send_pyobj(task) + def valid_parameters(self, data): """ Validate stream options @@ -226,10 +241,6 @@ def valid_parameters(self, data): 前置到了ChatCompletionRequest/CompletionRequest中 """ - if data.get("n") is not None: - if data["n"] != 1: - raise ParameterError("n", "n only support 1.") - if data.get("max_tokens") is not None: if data["max_tokens"] < 1 or data["max_tokens"] >= self.max_model_len: raise ParameterError("max_tokens", f"max_tokens can be defined [1, {self.max_model_len}).") diff --git a/fastdeploy/entrypoints/openai/serving_chat.py b/fastdeploy/entrypoints/openai/serving_chat.py index 52cd556916..3aed2b3a84 100644 --- a/fastdeploy/entrypoints/openai/serving_chat.py +++ b/fastdeploy/entrypoints/openai/serving_chat.py @@ -112,10 +112,11 @@ async def create_chat_completion(self, request: ChatCompletionRequest): api_server_logger.info(f"create chat completion request: {request_id}") text_after_process = None try: - current_req_dict = request.to_dict_for_infer(request_id) + current_req_dict = request.to_dict_for_infer(f"{request_id}_0") if "chat_template" not in current_req_dict: current_req_dict["chat_template"] = self.chat_template current_req_dict["arrival_time"] = time.time() + # preprocess the req_dict prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict) text_after_process = current_req_dict.get("text_after_process") if isinstance(prompt_token_ids, np.ndarray): @@ -174,11 +175,11 @@ async def chat_completion_stream_generator( """ created_time = int(time.time()) chunk_object_type: str = "chat.completion.chunk" - first_iteration = True - previous_num_tokens = 0 + num_choices = 1 if request.n is None 
else request.n + first_iteration = [True] * num_choices + previous_num_tokens = [0] * num_choices num_prompt_tokens = 0 - num_choices = 1 - tool_called = False + tool_called = [False] * num_choices max_streaming_response_tokens = ( request.max_streaming_response_tokens if request.max_streaming_response_tokens is not None @@ -210,8 +211,12 @@ async def chat_completion_stream_generator( api_server_logger.info(f"create chat completion request: {request_id}") try: - dealer, response_queue = await self.engine_client.connection_manager.get_connection(request_id) - dealer.write([b"", request_id.encode("utf-8")]) + dealer, response_queue = await self.engine_client.connection_manager.get_connection( + request_id, num_choices + ) + request_ids = [f"{request_id}_{i}" for i in range(num_choices)] + for rid in request_ids: + dealer.write([b"", rid.encode("utf-8")]) choices = [] current_waiting_time = 0 response_processor = ChatResponseProcessor( @@ -247,6 +252,7 @@ async def chat_completion_stream_generator( ) async for res in generator: + idx = int(res["request_id"].split("_")[-1]) if res.get("error_code", 200) != 200: raise ValueError("{}".format(res["error_msg"])) @@ -255,7 +261,7 @@ async def chat_completion_stream_generator( inference_start_time = res["metrics"]["inference_start_time"] else: arrival_time = res["metrics"]["arrival_time"] - inference_start_time - if first_iteration: + if first_iteration[idx]: num_prompt_tokens = len(prompt_token_ids) num_cached_tokens = res.get("num_cached_tokens", 0) for i in range(num_choices): @@ -299,11 +305,11 @@ async def chat_completion_stream_generator( ) yield f"data: {chunk.model_dump_json(exclude_unset=True)} \n\n" api_server_logger.info(f"Chat Streaming response send_idx 0: {chunk.model_dump_json()}") - first_iteration = False + first_iteration[idx] = False output = res["outputs"] output_top_logprobs = output["top_logprobs"] - previous_num_tokens += len(output["token_ids"]) + previous_num_tokens[idx] += len(output["token_ids"]) logprobs_res: Optional[LogProbs] = None if request.logprobs and output_top_logprobs is not None: logprobs_res = self._create_chat_logprobs( @@ -321,7 +327,6 @@ async def chat_completion_stream_generator( delta_message.multimodal_content = output["multipart"] else: delta_message.content = output["text"] - if not res["finished"] and "delta_message" in output: delta_message_output = output["delta_message"] if delta_message_output is None: @@ -330,10 +335,10 @@ async def chat_completion_stream_generator( delta_message.reasoning_content = delta_message_output.reasoning_content or "" if delta_message_output.tool_calls: delta_message.tool_calls = delta_message_output.tool_calls - tool_called = True + tool_called[idx] = True choice = ChatCompletionResponseStreamChoice( - index=0, + index=idx, delta=delta_message, logprobs=logprobs_res, arrival_time=arrival_time, @@ -345,9 +350,9 @@ async def chat_completion_stream_generator( ) has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None max_tokens = request.max_completion_tokens or request.max_tokens - if has_no_token_limit or previous_num_tokens != max_tokens: + if has_no_token_limit or previous_num_tokens[idx] != max_tokens: choice.finish_reason = "stop" - if tool_called: + if tool_called[idx]: choice.finish_reason = "tool_calls" else: choice.finish_reason = "length" @@ -365,8 +370,8 @@ async def chat_completion_stream_generator( if include_continuous_usage: chunk.usage = UsageInfo( prompt_tokens=num_prompt_tokens, - completion_tokens=previous_num_tokens, - 
total_tokens=num_prompt_tokens + previous_num_tokens, + completion_tokens=previous_num_tokens[idx], + total_tokens=num_prompt_tokens + previous_num_tokens[idx], ) choices.append(choice) @@ -378,7 +383,7 @@ async def chat_completion_stream_generator( choices = [] if include_usage: - completion_tokens = previous_num_tokens + completion_tokens = sum(previous_num_tokens) usage = UsageInfo( prompt_tokens=num_prompt_tokens, completion_tokens=completion_tokens, @@ -417,26 +422,32 @@ async def chat_completion_full_generator( Full chat completion generator. """ created_time = int(time.time()) - final_res = None + num_choices = 1 if request.n is None else request.n enable_thinking = request.chat_template_kwargs.get("enable_thinking") if request.chat_template_kwargs else None if enable_thinking is None: enable_thinking = request.metadata.get("enable_thinking") if request.metadata else None include_stop_str_in_output = request.include_stop_str_in_output try: - dealer, response_queue = await self.engine_client.connection_manager.get_connection(request_id) - dealer.write([b"", request_id.encode("utf-8")]) - final_res = None - previous_num_tokens = 0 + dealer, response_queue = await self.engine_client.connection_manager.get_connection( + request_id, num_choices + ) + # dealer.write([b"", request_id.encode("utf-8")]) + request_ids = [f"{request_id}_{i}" for i in range(num_choices)] + for rid in request_ids: + dealer.write([b"", rid.encode("utf-8")]) + previous_num_tokens = [0] * num_choices current_waiting_time = 0 - logprob_contents = [] - completion_token_ids = [] + logprob_contents = [[] for _ in range(num_choices)] + completion_token_ids = [[] for _ in range(num_choices)] + num_cached_tokens = [0] * num_choices response_processor = ChatResponseProcessor( data_processor=self.engine_client.data_processor, enable_mm_output=self.enable_mm_output, decoder_base_url=self.tokenizer_base_url, ) - while True: + choices = [] + while num_choices > 0: if self.engine_client.check_model_weight_status(): return ErrorResponse( error=ErrorInfo( @@ -459,8 +470,6 @@ async def chat_completion_full_generator( await asyncio.sleep(0.1) continue - task_is_finished = False - generator = response_processor.process_response_chat( response, stream=False, @@ -470,9 +479,10 @@ async def chat_completion_full_generator( async for data in generator: if data.get("error_code", 200) != 200: raise ValueError("{}".format(data["error_msg"])) + idx = int(data["request_id"].split("_")[-1]) # api_server_logger.debug(f"Client {request_id} received: {data}") - previous_num_tokens += len(data["outputs"]["token_ids"]) - completion_token_ids.extend(data["outputs"]["token_ids"]) + previous_num_tokens[idx] += len(data["outputs"]["token_ids"]) + completion_token_ids[idx].extend(data["outputs"]["token_ids"]) # The logprob for handling the response output = data["outputs"] output_top_logprobs = output["top_logprobs"] @@ -481,70 +491,73 @@ async def chat_completion_full_generator( output_top_logprobs, request.logprobs, request.top_logprobs ) if logprobs_res and logprobs_res.content is not None: - logprob_contents.extend(logprobs_res.content) + logprob_contents[idx].extend(logprobs_res.content) if data["finished"]: - final_res = data - task_is_finished = True - break - if task_is_finished: - break - finally: - await self.engine_client.connection_manager.cleanup_request(request_id) - self.engine_client.semaphore.release() - api_server_logger.info(f"release {self.engine_client.semaphore.status()}") - - choices = [] - output = final_res["outputs"] - message 
= ChatMessage( - role="assistant", - reasoning_content=output.get("reasoning_content"), - tool_calls=output.get("tool_call"), - prompt_token_ids=prompt_token_ids if request.return_token_ids else None, - completion_token_ids=completion_token_ids if request.return_token_ids else None, - text_after_process=text_after_process if request.return_token_ids else None, - prompt_tokens=text_after_process if request.return_token_ids else None, - raw_prediction=output.get("raw_prediction") if request.return_token_ids else None, - completion_tokens=output.get("raw_prediction") if request.return_token_ids else None, - ) - - if response_processor.enable_multimodal_content(): - message.multimodal_content = output.get("multipart") - else: - message.content = output["text"] + num_choices -= 1 + if ( + output is not None + and output.get("metrics") is not None + and output.get("metrics").get("request_start_time") is not None + ): + work_process_metrics.e2e_request_latency.observe( + time.time() - output.get("metrics").get("request_start_time") + ) + message = ChatMessage( + role="assistant", + reasoning_content=output.get("reasoning_content"), + tool_calls=output.get("tool_call"), + prompt_token_ids=prompt_token_ids if request.return_token_ids else None, + # TODO 确认这里是用idx还是不是idx + completion_token_ids=completion_token_ids[idx] if request.return_token_ids else None, + text_after_process=text_after_process if request.return_token_ids else None, + prompt_tokens=text_after_process if request.return_token_ids else None, + raw_prediction=output.get("raw_prediction") if request.return_token_ids else None, + completion_tokens=output.get("raw_prediction") if request.return_token_ids else None, + ) - logprobs_full_res = None - if logprob_contents: - logprobs_full_res = LogProbs(content=logprob_contents) + if response_processor.enable_multimodal_content(): + message.multimodal_content = output.get("multipart") + else: + message.content = output["text"] - choice = ChatCompletionResponseChoice( - index=0, - message=message, - logprobs=logprobs_full_res, - finish_reason=None, - ) - has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None - max_tokens = request.max_completion_tokens or request.max_tokens - if has_no_token_limit or previous_num_tokens != max_tokens: - choice.finish_reason = "stop" - if output.get("tool_call"): - choice.finish_reason = "tool_calls" - else: - choice.finish_reason = "length" + logprobs_full_res = None + if logprob_contents[idx]: + logprobs_full_res = LogProbs(content=logprob_contents[idx]) - if final_res.get("error_msg") is not None and "Recover" in final_res["error_msg"]: - choice.finish_reason = "recover_stop" + choice = ChatCompletionResponseChoice( + index=idx, + message=message, + logprobs=logprobs_full_res, + finish_reason=None, + ) + has_no_token_limit = request.max_tokens is None and request.max_completion_tokens is None + max_tokens = request.max_completion_tokens or request.max_tokens + # output 可能没有num_cached_tokens字段 + num_cached_tokens[idx] = output.get("num_cached_tokens", 0) + if has_no_token_limit or previous_num_tokens[idx] != max_tokens: + choice.finish_reason = "stop" + if output.get("tool_call"): + choice.finish_reason = "tool_calls" + else: + choice.finish_reason = "length" - choices.append(choice) + if output.get("error_msg") is not None and "Recover" in output["error_msg"]: + choice.finish_reason = "recover_stop" + choices.append(choice) + finally: + await self.engine_client.connection_manager.cleanup_request(request_id) + 
self.engine_client.semaphore.release() + api_server_logger.info(f"release {self.engine_client.semaphore.status()}") num_prompt_tokens = len(prompt_token_ids) - num_generated_tokens = previous_num_tokens + num_generated_tokens = sum(previous_num_tokens) usage = UsageInfo( prompt_tokens=num_prompt_tokens, completion_tokens=num_generated_tokens, total_tokens=num_prompt_tokens + num_generated_tokens, - prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=final_res.get("num_cached_tokens", 0)), + prompt_tokens_details=PromptTokenUsageInfo(cached_tokens=sum(num_cached_tokens)), ) - work_process_metrics.e2e_request_latency.observe(time.time() - final_res["metrics"]["request_start_time"]) + choices = sorted(choices, key=lambda x: x.index) res = ChatCompletionResponse( id=request_id, created=created_time, diff --git a/fastdeploy/entrypoints/openai/serving_completion.py b/fastdeploy/entrypoints/openai/serving_completion.py index 646b282abf..1568a2a8f2 100644 --- a/fastdeploy/entrypoints/openai/serving_completion.py +++ b/fastdeploy/entrypoints/openai/serving_completion.py @@ -120,7 +120,7 @@ async def create_completion(self, request: CompletionRequest): if request_prompt_ids is not None: request_prompts = request_prompt_ids - num_choices = len(request_prompts) + num_choices = len(request_prompts) * (1 if request.n is None else request.n) api_server_logger.info(f"Start preprocessing request: req_id={request_id}), num_choices={num_choices}") prompt_batched_token_ids = [] text_after_process_list = [] @@ -142,14 +142,16 @@ async def create_completion(self, request: CompletionRequest): try: try: for idx, prompt in enumerate(request_prompts): - request_id_idx = f"{request_id}-{idx}" + request_id_idx = f"{request_id}_{idx}" current_req_dict = request.to_dict_for_infer(request_id_idx, prompt) + n_param = current_req_dict.get("n", 1) current_req_dict["arrival_time"] = time.time() prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict) # tokenize if isinstance(prompt_token_ids, np.ndarray): prompt_token_ids = prompt_token_ids.tolist() - text_after_process_list.append(current_req_dict.get("text_after_process")) - prompt_batched_token_ids.append(prompt_token_ids) + for i in range(n_param): + text_after_process_list.append(current_req_dict.get("text_after_process")) + prompt_batched_token_ids.append(prompt_token_ids) del current_req_dict except ParameterError as e: api_server_logger.error(f"OpenAIServingCompletion format error: {e}, {e.message}") @@ -211,7 +213,7 @@ async def completion_full_generator( """ dealer = None try: - request_ids = [f"{request_id}-{i}" for i in range(num_choices)] + request_ids = [f"{request_id}_{i}" for i in range(num_choices)] # create dealer dealer, response_queue = await self.engine_client.connection_manager.get_connection( request_id, num_choices @@ -250,7 +252,7 @@ async def completion_full_generator( continue for data in response: - rid = int(data["request_id"].split("-")[-1]) + rid = int(data["request_id"].split("_")[-1]) if data.get("error_code", 200) != 200: raise ValueError("{}".format(data["error_msg"])) @@ -346,7 +348,7 @@ async def completion_stream_generator( ) for i in range(num_choices): - req_id = f"{request_id}-{i}" + req_id = f"{request_id}_{i}" dealer.write([b"", req_id.encode("utf-8")]) # 发送多路请求 output_tokens = [0] * num_choices inference_start_time = [0] * num_choices @@ -384,7 +386,7 @@ async def completion_stream_generator( continue for res in response: - idx = int(res["request_id"].split("-")[-1]) + idx = 
int(res["request_id"].split("_")[-1]) if res.get("error_code", 200) != 200: raise ValueError("{}".format(res["error_msg"])) @@ -562,6 +564,7 @@ def request_output_to_completion_response( num_prompt_tokens += len(prompt_token_ids) + num_prompt_tokens = num_prompt_tokens // request.n usage = UsageInfo( prompt_tokens=num_prompt_tokens, completion_tokens=num_generated_tokens, diff --git a/fastdeploy/entrypoints/openai/utils.py b/fastdeploy/entrypoints/openai/utils.py index 99212e0ee9..9ca90a393b 100644 --- a/fastdeploy/entrypoints/openai/utils.py +++ b/fastdeploy/entrypoints/openai/utils.py @@ -124,7 +124,9 @@ async def _listen_connection(self, dealer, conn_index): response = msgpack.unpackb(raw_data[-1]) request_id = response[-1]["request_id"] if "cmpl" == request_id[:4]: - request_id = request_id.rsplit("-", 1)[0] + request_id = request_id.rsplit("_", 1)[0] + elif "chatcmpl" == request_id[:8]: + request_id = request_id.rsplit("_", 1)[0] async with self.lock: if request_id in self.request_map: await self.request_map[request_id].put(response) diff --git a/fastdeploy/multimodal/utils.py b/fastdeploy/multimodal/utils.py index 6e2360fced..ea45bd710f 100644 --- a/fastdeploy/multimodal/utils.py +++ b/fastdeploy/multimodal/utils.py @@ -19,7 +19,6 @@ import ipaddress import mimetypes import os -import random import socket import subprocess import tempfile @@ -103,6 +102,7 @@ def http_to_pil_image(url): return pil_image + def base64_to_pil_image(base64_string): """base64_to_pil_image""" image_bytes = base64.b64decode(base64_string) diff --git a/tests/ce/server/test_completions.py b/tests/ce/server/test_completions.py index 2dae312a29..188586d7d4 100644 --- a/tests/ce/server/test_completions.py +++ b/tests/ce/server/test_completions.py @@ -13,6 +13,7 @@ COMPLETIONS_URL = URL.replace("/v1/chat/completions", "/v1/completions") + def test_completion_total_tokens(): data = { "prompt": "你是谁", @@ -48,10 +49,9 @@ def test_completion_echo_stream_one_prompt_rti(): "max_tokens": 2, "return_token_ids": True, } - + payload = build_request_payload(TEMPLATE, data) resp = send_request(COMPLETIONS_URL, payload, stream=True) - last_data = None # 初始化计数器 counter = 0 second_data = None @@ -60,7 +60,7 @@ def test_completion_echo_stream_one_prompt_rti(): break if line.strip() == "" or not line.startswith("data: "): continue - line = line[len("data: "):] + line = line[len("data: ") :] stream_data = json.loads(line) counter += 1 if counter == 2: # 当计数器为2时,保存第二包数据 @@ -81,12 +81,11 @@ def test_completion_echo_stream_one_prompt(): "stream": True, "stream_options": {"include_usage": True, "continuous_usage_stats": True}, "echo": True, - "max_tokens": 2 + "max_tokens": 2, } - + payload = build_request_payload(TEMPLATE, data) resp = send_request(COMPLETIONS_URL, payload, stream=True) - last_data = None # 初始化计数器 counter = 0 second_data = None @@ -95,7 +94,7 @@ def test_completion_echo_stream_one_prompt(): break if line.strip() == "" or not line.startswith("data: "): continue - line = line[len("data: "):] + line = line[len("data: ") :] stream_data = json.loads(line) counter += 1 if counter == 1: # 当计数器为1时,保存第一包数据 @@ -112,20 +111,16 @@ def test_completion_echo_stream_more_prompt(): 测试echo参数在流式回复中,且设置为回复多个prompt """ data = { - "prompt": ["水果的营养价值是如何的?","水的化学式是什么?"], + "prompt": ["水果的营养价值是如何的?", "水的化学式是什么?"], "stream": True, "stream_options": {"include_usage": True, "continuous_usage_stats": True}, "echo": True, "max_tokens": 2, - "return_token_ids": True + "return_token_ids": True, } - + payload = build_request_payload(TEMPLATE, data) 
resp = send_request(COMPLETIONS_URL, payload, stream=True) - last_data = None - # 初始化计数器 - counter = 0 - second_data = None # 初始化字典来存储每个index的第二包数据 second_data_by_index = {0: None, 1: None} # 初始化字典来记录每个index的包计数 @@ -136,9 +131,9 @@ def test_completion_echo_stream_more_prompt(): break if line.strip() == "" or not line.startswith("data: "): continue - line = line[len("data: "):] + line = line[len("data: ") :] stream_data = json.loads(line) - + for choice in stream_data.get("choices", []): index = choice.get("index") if index in packet_count_by_index: @@ -183,13 +178,13 @@ def test_completion_echo_more_prompt(): """ data = { "stream": False, - "prompt": ["水果的营养价值是如何的?","水的化学式是什么?"], + "prompt": ["水果的营养价值是如何的?", "水的化学式是什么?"], "echo": True, - "max_tokens": 100 + "max_tokens": 100, } payload = build_request_payload(TEMPLATE, data) response = send_request(COMPLETIONS_URL, payload).json() - + text_0 = response["choices"][0]["text"] text_1 = response["choices"][1]["text"] assert data["prompt"][0] in text_0, "echo回显不正确" @@ -204,12 +199,8 @@ def test_completion_finish_length(): """ 非流式回复中,因达到max_token截断检查finish_reasoning参数 """ - data = { - "stream": False, - "prompt": "水果的营养价值是如何的?", - "max_tokens": 10 - } - + data = {"stream": False, "prompt": "水果的营养价值是如何的?", "max_tokens": 10} + payload = build_request_payload(TEMPLATE, data) response = send_request(COMPLETIONS_URL, payload).json() @@ -221,15 +212,10 @@ def test_completion_finish_stop(): """ 非流式回复中,模型自然回复完成,检查finish_reasoning参数 """ - data = { - "stream": False, - "prompt": "简短的回答我:苹果是水果吗?" - } - + data = {"stream": False, "prompt": "简短的回答我:苹果是水果吗?"} + payload = build_request_payload(TEMPLATE, data) response = send_request(COMPLETIONS_URL, payload).json() finish_reason = response["choices"][0]["finish_reason"] assert finish_reason == "stop", "无任何中介,finish_reason不为stop" - - \ No newline at end of file diff --git a/tests/entrypoints/openai/test_max_streaming_tokens.py b/tests/entrypoints/openai/test_max_streaming_tokens.py index 61d5f88d45..17b7c138ae 100644 --- a/tests/entrypoints/openai/test_max_streaming_tokens.py +++ b/tests/entrypoints/openai/test_max_streaming_tokens.py @@ -92,36 +92,43 @@ def test_edge_case_zero_value(self): async def test_integration_with_chat_stream_generator(self, mock_processor_class, mock_logger): response_data = [ { + "request_id": "test_request_id_0", "outputs": {"token_ids": [1], "text": "a", "top_logprobs": None}, "metrics": {"first_token_time": 0.1, "inference_start_time": 0.1}, "finished": False, }, { + "request_id": "test_request_id_0", "outputs": {"token_ids": [2], "text": "b", "top_logprobs": None}, "metrics": {"arrival_time": 0.2, "first_token_time": None}, "finished": False, }, { + "request_id": "test_request_id_0", "outputs": {"token_ids": [3], "text": "c", "top_logprobs": None}, "metrics": {"arrival_time": 0.3, "first_token_time": None}, "finished": False, }, { + "request_id": "test_request_id_0", "outputs": {"token_ids": [4], "text": "d", "top_logprobs": None}, "metrics": {"arrival_time": 0.4, "first_token_time": None}, "finished": False, }, { + "request_id": "test_request_id_0", "outputs": {"token_ids": [5], "text": "e", "top_logprobs": None}, "metrics": {"arrival_time": 0.5, "first_token_time": None}, "finished": False, }, { + "request_id": "test_request_id_0", "outputs": {"token_ids": [6], "text": "f", "top_logprobs": None}, "metrics": {"arrival_time": 0.6, "first_token_time": None}, "finished": False, }, { + "request_id": "test_request_id_0", "outputs": {"token_ids": [7], "text": "g", "top_logprobs": 
None}, "metrics": {"arrival_time": 0.7, "first_token_time": None, "request_start_time": 0.1}, "finished": True, @@ -200,13 +207,13 @@ async def test_integration_with_completion_stream_generator(self, mock_logger): response_data = [ [ { - "request_id": "test-request-id-0", + "request_id": "test-request-id_0", "outputs": {"token_ids": [1], "text": "a", "top_logprobs": None}, "metrics": {"first_token_time": 0.1, "inference_start_time": 0.1}, "finished": False, }, { - "request_id": "test-request-id-0", + "request_id": "test-request-id_0", "outputs": {"token_ids": [2], "text": "b", "top_logprobs": None}, "metrics": {"arrival_time": 0.2, "first_token_time": None}, "finished": False, @@ -214,7 +221,7 @@ async def test_integration_with_completion_stream_generator(self, mock_logger): ], [ { - "request_id": "test-request-id-0", + "request_id": "test-request-id_0", "outputs": {"token_ids": [7], "text": "g", "top_logprobs": None}, "metrics": {"arrival_time": 0.7, "first_token_time": None, "request_start_time": 0.1}, "finished": True, diff --git a/tests/entrypoints/openai/test_serving_completion.py b/tests/entrypoints/openai/test_serving_completion.py index f3941c80b7..76bd9bf127 100644 --- a/tests/entrypoints/openai/test_serving_completion.py +++ b/tests/entrypoints/openai/test_serving_completion.py @@ -97,6 +97,7 @@ def test_request_output_to_completion_response(self): request: CompletionRequest = Mock() request.prompt = "Hello, world!" request.echo = True + request.n = 2 request_id = "test_request_id" created_time = 1655136000 model_name = "test_model" diff --git a/tests/entrypoints/test_engine_client.py b/tests/entrypoints/test_engine_client.py index e11fa54939..3f2ea7e2e3 100644 --- a/tests/entrypoints/test_engine_client.py +++ b/tests/entrypoints/test_engine_client.py @@ -18,6 +18,7 @@ async def asyncSetUp(self): async def test_add_request(self): request = { + "request_id": "test-request-id", "chat_template_kwargs": {"enable_thinking": True}, "prompt_token_ids": [1], "chat_template": "Hello", diff --git a/tests/utils/test_custom_chat_template.py b/tests/utils/test_custom_chat_template.py index 1311289ff6..2223515bcd 100644 --- a/tests/utils/test_custom_chat_template.py +++ b/tests/utils/test_custom_chat_template.py @@ -67,6 +67,7 @@ async def mock_chat_completion_full_generator( return prompt_token_ids async def mock_format_and_add_data(current_req_dict): + current_req_dict["text_after_process"] = "你好" return current_req_dict self.chat_completion_handler.chat_completion_full_generator = mock_chat_completion_full_generator @@ -94,6 +95,7 @@ async def mock_chat_completion_full_generator( return prompt_token_ids async def mock_format_and_add_data(current_req_dict): + current_req_dict["text_after_process"] = "你好" return current_req_dict self.chat_completion_handler.chat_completion_full_generator = mock_chat_completion_full_generator
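
Supplementary note on the `n > 1` fan-out introduced above: `EngineClient.add_requests` now expands a task whose ID carries an `_{index}` suffix into `n` child tasks whose suffixes run from `index*n` to `(index+1)*n - 1`, the chat and completion generators index per-choice state (`previous_num_tokens`, `tool_called`, `logprob_contents`) by the integer suffix of each response's `request_id`, and the final `UsageInfo` sums completion tokens across choices. The sketch below restates that demultiplexing pattern in isolation. It is illustrative only, not FastDeploy code, and every helper name in it (`fan_out_request_ids`, `demux_choice_index`, `aggregate_usage`) is hypothetical; the only conventions taken from the patch are the `"{request_id}_{i}"` child-ID format and the sum-based usage aggregation.

```python
"""Standalone sketch of the per-choice request fan-out used for `n` sampled
choices. Helper names are hypothetical; only the `_{i}` suffix convention and
the usage aggregation mirror the patch above."""

from typing import Dict, List


def fan_out_request_ids(request_id: str, n: int) -> List[str]:
    """One child request ID per sampled choice, suffixed with its choice index."""
    return [f"{request_id}_{i}" for i in range(n)]


def demux_choice_index(child_request_id: str) -> int:
    """Recover the choice index from a child request ID's trailing `_{i}` suffix."""
    return int(child_request_id.rsplit("_", 1)[-1])


def aggregate_usage(prompt_tokens: int, completion_tokens_per_choice: List[int]) -> Dict[str, int]:
    """Count the prompt once and sum completion tokens across all choices."""
    completion_tokens = sum(completion_tokens_per_choice)
    return {
        "prompt_tokens": prompt_tokens,
        "completion_tokens": completion_tokens,
        "total_tokens": prompt_tokens + completion_tokens,
    }


if __name__ == "__main__":
    request_id = "chatcmpl-123"
    n = 2
    child_ids = fan_out_request_ids(request_id, n)  # ['chatcmpl-123_0', 'chatcmpl-123_1']

    # Simulated streaming responses, each tagged with the child request ID it belongs to.
    responses = [
        {"request_id": child_ids[0], "outputs": {"token_ids": [1, 2, 3]}},
        {"request_id": child_ids[1], "outputs": {"token_ids": [4, 5]}},
        {"request_id": child_ids[0], "outputs": {"token_ids": [6]}},
    ]

    # Per-choice token counters, indexed by the recovered choice index.
    previous_num_tokens = [0] * n
    for res in responses:
        idx = demux_choice_index(res["request_id"])
        previous_num_tokens[idx] += len(res["outputs"]["token_ids"])

    print(previous_num_tokens)  # [4, 2]
    print(aggregate_usage(prompt_tokens=7, completion_tokens_per_choice=previous_num_tokens))
```

A design detail visible in `fastdeploy/entrypoints/openai/utils.py` and `serving_completion.py`: the child-ID separator changes from `-` to `_`, so the choice index can be split off with a single `rsplit("_", 1)` without colliding with the `-` already present in base IDs such as `cmpl-...` and `chatcmpl-...`. With the old `n only support 1` check removed from `EngineClient.valid_parameters`, a request carrying `"n": 2` is now accepted and yields two entries in `choices`.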