diff --git a/fastdeploy/demo/openai_demo.py b/fastdeploy/demo/openai_demo.py
index 308fa440ff..c93e9a518a 100644
--- a/fastdeploy/demo/openai_demo.py
+++ b/fastdeploy/demo/openai_demo.py
@@ -17,44 +17,41 @@
import openai

ip = "0.0.0.0"
-service_http_port = "9809"  # port configured on the service
+service_http_port = "13116"  # port configured on the service

client = openai.Client(base_url=f"http://{ip}:{service_http_port}/v1", api_key="EMPTY_API_KEY")

-# Non-streaming response
-response = client.completions.create(
-    model="default",
-    prompt="Hello, how are you?",
-    max_tokens=64,
-    stream=False,
-)
+# # Non-streaming response
+# response = client.completions.create(
+#     model="default",
+#     prompt="Hello, how are you?",
+#     max_tokens=64,
+#     stream=False,
+# )

-print(response.choices[0].text)
-print("\n")
+# print(response.choices[0].text)
+# print("\n")

# Streaming response
-response = client.completions.create(
-    model="default",
-    prompt="Hello, how are you?",
-    max_tokens=100,
-    stream=True,
-)
+# response = client.completions.create(
+#     model="default",
+#     prompt="Hello, how are you?",
+#     max_tokens=100,
+#     stream=True,
+# )

-for chunk in response:
-    print(chunk.choices[0].text, end="")
-print("\n")
+# for chunk in response:
+#     print(chunk.choices[0].text, end="")
+# print("\n")

# Chat completion
# Non-streaming response
response = client.chat.completions.create(
    model="default",
    messages=[
-        {"role": "user", "content": "Hello, who are you"},
-        {"role": "assistant", "content": "I'm a helpful AI assistant."},
-        {"role": "user", "content": "List 3 countries and their capitals."},
+        {"role": "user", "content": "北京天安门在哪里?"},
    ],
    temperature=1,
-    max_tokens=64,
    stream=False,
)
@@ -63,19 +60,19 @@
#
# Streaming response
-response = client.chat.completions.create(
-    model="default",
-    messages=[
-        {"role": "user", "content": "Hello, who are you"},
-        {"role": "assistant", "content": "I'm a helpful AI assistant."},
-        {"role": "user", "content": "List 3 countries and their capitals."},
-    ],
-    temperature=1,
-    max_tokens=64,
-    stream=True,
-)
+# response = client.chat.completions.create(
+#     model="default",
+#     messages=[
+#         {"role": "user", "content": "Hello, who are you"},
+#         {"role": "assistant", "content": "I'm a helpful AI assistant."},
+#         {"role": "user", "content": "List 3 countries and their capitals."},
+#     ],
+#     temperature=1,
+#     max_tokens=64,
+#     stream=True,
+# )

-for chunk in response:
-    if chunk.choices[0].delta is not None:
-        print(chunk.choices[0].delta.content, end="")
-print("\n")
+# for chunk in response:
+#     if chunk.choices[0].delta is not None:
+#         print(chunk.choices[0].delta.content, end="")
+# print("\n")
diff --git a/fastdeploy/engine/pooling_params.py b/fastdeploy/engine/pooling_params.py
index 13d3f01e48..ddb544cdb9 100644
--- a/fastdeploy/engine/pooling_params.py
+++ b/fastdeploy/engine/pooling_params.py
@@ -26,15 +26,13 @@
from fastdeploy.config import ModelConfig


-class PoolingParams:
+class PoolingParams(msgspec.Struct, omit_defaults=True, array_like=True):
    """API parameters for pooling models.

    Attributes:
        normalize: Whether to normalize the embeddings outputs.
        dimensions: Reduce the dimensions of embeddings
            if model support matryoshka representation.
-        activation: Whether to apply activation function to
-            the classification outputs.
        softmax: Whether to apply softmax to the reward outputs.
        step_tag_id: Step tag ID for process reward models to identify
            specific steps in multi-step reasoning tasks.
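
Note on the `msgspec.Struct` base added above: with `omit_defaults=True` and `array_like=True`, instances serialize as compact arrays and fields left at their defaults are dropped, which keeps the parameters cheap to pass between processes. The sketch below is illustrative only and is not part of the patch; it uses a stripped-down stand-in struct, not the real `PoolingParams` field set.

# Illustrative sketch: how a msgspec.Struct declared with
# omit_defaults=True, array_like=True round-trips over msgpack.
# DemoPoolingParams and its fields are hypothetical stand-ins.
from typing import Optional

import msgspec


class DemoPoolingParams(msgspec.Struct, omit_defaults=True, array_like=True):
    task: Optional[str] = None
    normalize: Optional[bool] = None
    dimensions: Optional[int] = None


params = DemoPoolingParams(task="embed", normalize=True)
payload = msgspec.msgpack.encode(params)  # compact array encoding, defaults omitted
restored = msgspec.msgpack.decode(payload, type=DemoPoolingParams)
assert restored == params
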
diff --git a/fastdeploy/model_executor/layers/pool/metadata.py b/fastdeploy/model_executor/layers/pool/metadata.py
index 2dd4d13fe4..be28723ae8 100644
--- a/fastdeploy/model_executor/layers/pool/metadata.py
+++ b/fastdeploy/model_executor/layers/pool/metadata.py
@@ -69,8 +69,10 @@ def build_pooling_cursor(num_scheduled_tokens: list[int], prompt_lens: paddle.Te
    n_seq = len(num_scheduled_tokens)
    index = list(range(n_seq))
-    num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens, device="cpu")
-    cumsum = paddle.zeros([n_seq + 1], dtype="int64", place=paddle.CPUPlace())
+    num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens, place=paddle.CPUPlace())
+    cumsum = paddle.zeros([n_seq + 1], dtype="int64")
+    if cumsum.place.is_gpu_place():
+        cumsum = cumsum.cpu()
    paddle.cumsum(num_scheduled_tokens, axis=0, out=cumsum[1:])
    if device == "gpu":
        cumsum_device = cumsum.cuda()
diff --git a/fastdeploy/model_executor/layers/pooler.py b/fastdeploy/model_executor/layers/pooler.py
index 06b18b4672..86695a580a 100644
--- a/fastdeploy/model_executor/layers/pooler.py
+++ b/fastdeploy/model_executor/layers/pooler.py
@@ -332,6 +332,29 @@ def from_pooling_type(pooling_type: PoolingType) -> "PoolingMethod":
            return MeanPool()
        raise NotImplementedError(f"Unsupported method: {pooling_type}")

+    @abstractmethod
+    def get_supported_tasks(self) -> Set[PoolingTask]:
+        raise NotImplementedError
+
+    def get_pooling_updates(self, task: PoolingTask) -> PoolingParamsUpdate:
+        return PoolingParamsUpdate()
+
+    @abstractmethod
+    def forward_all(
+        self,
+        hidden_states: paddle.Tensor,
+        pooling_cursor: PoolingCursor,
+    ) -> Union[list[paddle.Tensor], paddle.Tensor]:
+        raise NotImplementedError
+
+    def forward(
+        self,
+        hidden_states: paddle.Tensor,
+        pooling_metadata: PoolingMetadata,
+    ) -> Union[list[paddle.Tensor], paddle.Tensor]:
+        pooling_cursor = pooling_metadata.pooling_cursor
+        return self.forward_all(hidden_states, pooling_cursor)
+

class LastPool(PoolingMethod):

@@ -495,6 +518,7 @@ def forward(
        hidden_states: Union[paddle.Tensor, list[paddle.Tensor]],
        pooling_metadata: PoolingMetadata,
    ) -> PoolerOutput:
+        print("self.pooling", self.pooling)
        pooled_data = self.pooling(hidden_states, pooling_metadata)
        pooled_data = self.head(pooled_data, pooling_metadata)
        return build_output(pooled_data)
diff --git a/fastdeploy/model_executor/models/adapters.py b/fastdeploy/model_executor/models/adapters.py
index d56c1dcb1f..1f2590acdd 100644
--- a/fastdeploy/model_executor/models/adapters.py
+++ b/fastdeploy/model_executor/models/adapters.py
@@ -22,7 +22,6 @@
from fastdeploy.config import ModelConfig
from fastdeploy.model_executor.layers.activation import get_act_fn
-from fastdeploy.model_executor.models.interfaces_base import is_pooling_model
from fastdeploy.transformer_utils.config import get_hf_file_to_dict

_T = TypeVar("_T", bound=type[nn.Layer])
@@ -191,6 +190,8 @@ def as_embedding_model(cls: _T) -> _T:
        please implement your own model if this is not the case.
    """
    # Avoid modifying existing embedding models
+    from fastdeploy.model_executor.models.interfaces_base import is_pooling_model
+
    if is_pooling_model(cls):
        return cls
diff --git a/fastdeploy/model_executor/models/interfaces_base.py b/fastdeploy/model_executor/models/interfaces_base.py
index b7ece5fe69..77533209d9 100644
--- a/fastdeploy/model_executor/models/interfaces_base.py
+++ b/fastdeploy/model_executor/models/interfaces_base.py
@@ -12,9 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-from typing import Type
+from typing import ClassVar, Literal, Protocol, Type

+import paddle
from paddle import nn
+from typing_extensions import TypeVar, runtime_checkable
+
+from fastdeploy.config import FDConfig
+from fastdeploy.model_executor.forward_meta import ForwardMeta
+from fastdeploy.model_executor.layers.pooler import Pooler
+
+T = TypeVar("T", default=paddle.Tensor)
+T_co = TypeVar("T_co", default=paddle.Tensor, covariant=True)


def is_text_generation_model(model_cls: Type[nn.Layer]) -> bool:
@@ -24,13 +33,7 @@ def is_text_generation_model(model_cls: Type[nn.Layer]) -> bool:


def is_pooling_model(model_cls: Type[nn.Layer]) -> bool:
-    class_name = model_cls.__name__
-    pooling_indicators = ["Embedding", "ForSequenceClassification"]
-    return (
-        any(indicator in class_name for indicator in pooling_indicators)
-        or hasattr(model_cls, "is_embedding_model")
-        and model_cls.is_embedding_model
-    )
+    return getattr(model_cls, "is_pooling_model", False)


def is_multimodal_model(class_name: str) -> bool:
@@ -52,3 +55,48 @@ def get_default_pooling_type(model_cls: Type[nn.Layer] = None) -> str:
    if model_cls is not None:
        return getattr(model_cls, "default_pooling_type", "LAST")
    return "LAST"
+
+
+@runtime_checkable
+class FdModel(Protocol[T_co]):
+    """The interface required for all models in FastDeploy."""
+
+    def __init__(
+        self,
+        fd_config: FDConfig,
+        prefix: str = "",
+    ) -> None:
+        pass
+
+    def forward(
+        self,
+        ids_remove_padding: paddle.Tensor,
+        forward_metadata: ForwardMeta,
+    ) -> T_co:
+        pass
+
+
+class FdModelForPooling(FdModel[T_co], Protocol[T_co]):
+    """The interface required for all pooling models in FastDeploy."""
+
+    is_pooling_model: ClassVar[Literal[True]] = True
+    """
+    A flag that indicates this model supports pooling.
+
+    Note:
+        There is no need to redefine this flag if this class is in the
+        MRO of your model class.
+    """
+
+    default_pooling_type: ClassVar[str] = "LAST"
+    """
+    Indicates the
+    [fastdeploy.config.PoolerConfig.pooling_type][]
+    to use by default.
+
+    You can use the
+    [fastdeploy.model_executor.models.interfaces_base.default_pooling_type][]
+    decorator to conveniently set this field.
+ """ + pooler: Pooler + """The pooler is only called on TP rank 0.""" diff --git a/fastdeploy/model_executor/models/qwen3.py b/fastdeploy/model_executor/models/qwen3.py index 47ed104bab..17d2adec2a 100644 --- a/fastdeploy/model_executor/models/qwen3.py +++ b/fastdeploy/model_executor/models/qwen3.py @@ -303,7 +303,9 @@ def load_weights(self, weights_iterator) -> None: if model_param_name not in params_dict: continue param = params_dict[model_param_name] + weight_loader = getattr(param, "weight_loader", default_weight_loader(self.fd_config)) + weight_loader(param, loaded_weight, shard_id) break diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py index 01cc699cb6..70e57cc869 100644 --- a/fastdeploy/model_executor/pre_and_post_process.py +++ b/fastdeploy/model_executor/pre_and_post_process.py @@ -22,6 +22,7 @@ from fastdeploy import envs from fastdeploy.config import SpeculativeConfig +from fastdeploy.output.pooler import PoolerOutput from fastdeploy.platforms import current_platform if current_platform.is_iluvatar(): @@ -183,6 +184,7 @@ def _zmq_send_text_outputs(zmq_client: ZmqIpcClient, output_tokens: np.ndarray, def post_process_normal( + pooler_output: PoolerOutput, sampler_output: SamplerOutput, model_output: ModelOutputData, share_inputs: Dict[str, paddle.Tensor], @@ -192,7 +194,9 @@ def post_process_normal( zmq_client: ZmqIpcClient = None, ) -> ModelRunnerOutput: """Post-processing steps after completing a single token generation.""" - # handle vl: + + # handle vl + print("model_output.enable_thinking", model_output.enable_thinking) if model_output.enable_thinking: exists_think_end = sampler_output.sampled_token_ids == model_output.think_end_id paddle.assign( @@ -384,6 +388,7 @@ def post_process_specualate( def post_process( + pooler_output: PoolerOutput, sampler_output: SamplerOutput, model_output: ModelOutputData, share_inputs: Dict[str, paddle.Tensor], @@ -398,7 +403,14 @@ def post_process( post_process_specualate(model_output, save_each_rank, skip_save_output) else: post_process_normal( - sampler_output, model_output, share_inputs, block_size, save_each_rank, skip_save_output, zmq_client + pooler_output, + sampler_output, + model_output, + share_inputs, + block_size, + save_each_rank, + skip_save_output, + zmq_client, ) @@ -413,6 +425,7 @@ def step_cuda( TODO(gongshaotian): normalization name """ + print("speculative_config.method", speculative_config.method) if speculative_config.method is not None: if DISABLE_RECOVER: speculate_step_reschedule( @@ -505,6 +518,7 @@ def step_cuda( ) else: if DISABLE_RECOVER: + print("step_reschedule前面", share_inputs["seq_lens_this_time"]) step_reschedule( share_inputs["stop_flags"], share_inputs["seq_lens_this_time"], @@ -531,8 +545,10 @@ def step_cuda( block_size, enc_dec_block_num, ) + print("step_reschedule后面", share_inputs["seq_lens_this_time"]) else: if enable_prefix_caching: + print("step_system_cache前面", int((share_inputs["seq_lens_this_time"] > 0).sum())) step_system_cache( share_inputs["stop_flags"], share_inputs["seq_lens_this_time"], @@ -560,7 +576,9 @@ def step_cuda( block_size, enc_dec_block_num, ) + print("step_system_cache后面", int((share_inputs["seq_lens_this_time"] > 0).sum())) else: + print("step_paddle前面", share_inputs["seq_lens_this_time"]) step_paddle( share_inputs["stop_flags"], share_inputs["seq_lens_this_time"], @@ -587,6 +605,7 @@ def step_cuda( block_size, enc_dec_block_num, ) + print("step_paddle后面", share_inputs["seq_lens_this_time"]) def rebuild_padding( 
diff --git a/fastdeploy/worker/gpu_model_runner.py b/fastdeploy/worker/gpu_model_runner.py
index 3817b49326..1e7a15409b 100644
--- a/fastdeploy/worker/gpu_model_runner.py
+++ b/fastdeploy/worker/gpu_model_runner.py
@@ -16,7 +16,7 @@

import os
import time
-from typing import List, Optional
+from typing import List, Optional, cast

import numpy as np
import paddle
@@ -74,10 +74,18 @@
import zmq

from fastdeploy import envs
+from fastdeploy.engine.pooling_params import PoolingParams
+from fastdeploy.engine.tasks import PoolingTask
from fastdeploy.input.ernie4_5_vl_processor import DataProcessor
from fastdeploy.inter_communicator import ZmqIpcClient
from fastdeploy.model_executor.forward_meta import ForwardMeta
+from fastdeploy.model_executor.layers.pool.metadata import PoolingMetadata
from fastdeploy.model_executor.models.ernie4_5_vl.modeling_resampler import ScatterOp
+from fastdeploy.model_executor.models.interfaces_base import (
+    FdModelForPooling,
+    is_pooling_model,
+)
+from fastdeploy.output.pooler import PoolerOutput
from fastdeploy.worker.model_runner_base import ModelRunnerBase
from fastdeploy.worker.output import ModelOutputData, ModelRunnerOutput

@@ -100,6 +108,7 @@ def __init__(
        self.speculative_decoding = self.speculative_method is not None
        self.enable_logprob = fd_config.model_config.enable_logprob
        self.enable_early_stop = self.fd_config.early_stop_config.enable_early_stop
+        self.is_pooling_model = self.fd_config.model_config.runner_type == "pooling"

        # VL model config:
        if self.enable_mm:
@@ -709,6 +718,26 @@ def get_input_length_list(

        return input_length_list, max_dec_len_list, block_num

+    def get_supported_pooling_tasks(self) -> list[PoolingTask]:
+        model = self.get_model()
+        if not is_pooling_model(model):
+            return []
+
+        supported_tasks = list(model.pooler.get_supported_tasks())
+
+        if self.cache_config.enable_prefix_caching and "encode" in supported_tasks:
+            supported_tasks.remove("encode")
+
+            logger.warning(
+                "Chunked prefill is not supported with "
+                "the encode task, which uses ALL pooling. "
+                "Please turn off chunked prefill by "
+                "`--no-enable-chunked-prefill` before using it."
+            )
+
+        # The score task is not supported yet.
+        return supported_tasks
+
    def _dummy_prefill_inputs(self, input_length_list: List[int], max_dec_len_list: List[int], block_num: int):
        """Set dummy prefill inputs to share_inputs"""
        batch_size = len(input_length_list)
@@ -1251,6 +1280,171 @@ def initialize_attn_backend(self) -> None:

            self.attn_backends.append(attn_backend)

+    def _dummy_pooler_run_task(
+        self,
+        hidden_states: paddle.Tensor,
+        task: PoolingTask,
+    ) -> PoolerOutput:
+        num_tokens = hidden_states.shape[0]
+        max_num_seqs = self.scheduler_config.max_num_seqs
+        num_reqs = min(num_tokens, max_num_seqs)
+        min_tokens_per_req = num_tokens // num_reqs
+        num_scheduled_tokens_list = [min_tokens_per_req] * num_reqs
+        num_scheduled_tokens_list[-1] += num_tokens % num_reqs
+        assert sum(num_scheduled_tokens_list) == num_tokens
+        assert len(num_scheduled_tokens_list) == num_reqs
+
+        req_num_tokens = num_tokens // num_reqs
+
+        dummy_prompt_lens = paddle.to_tensor(num_scheduled_tokens_list, dtype="int32")
+        dummy_token_ids = paddle.zeros(
+            [num_reqs, req_num_tokens],
+            dtype="int32",
+        )
+        model = cast(FdModelForPooling, self.get_model())
+        dummy_pooling_params = PoolingParams(task=task)
+        to_update = model.pooler.get_pooling_updates(task)
+        to_update.apply(dummy_pooling_params)
+
+        dummy_metadata = PoolingMetadata(
+            prompt_lens=dummy_prompt_lens,
+            prompt_token_ids=dummy_token_ids,
+            pooling_params=[dummy_pooling_params] * num_reqs,
+        )
+        dummy_metadata.build_pooling_cursor(num_scheduled_tokens_list, device=hidden_states.place)
+
+        try:
+            return model.pooler(hidden_states=hidden_states, pooling_metadata=dummy_metadata)
+        except RuntimeError as e:
+            if "out of memory" in str(e):
+                raise RuntimeError(
+                    "CUDA out of memory occurred when warming up pooler "
+                    f"({task=}) with {num_reqs} dummy requests. Please try "
+                    "lowering `max_num_seqs` or `gpu_memory_utilization` when "
+                    "initializing the engine."
+                ) from e
+            else:
+                raise e
+
+    def _dummy_pooler_run(
+        self,
+        hidden_states: paddle.Tensor,
+    ) -> PoolerOutput:
+        output_size = dict[PoolingTask, float]()
+        for task in self.get_supported_pooling_tasks():
+            output = self._dummy_pooler_run_task(hidden_states, task)
+            output_size[task] = output.get_data_nbytes()
+            del output
+
+        max_task = max(output_size.items(), key=lambda x: x[1])[0]
+        final_output = self._dummy_pooler_run_task(hidden_states, max_task)
+
+        return final_output
+
+    def _dummy_sampler_run(
+        self,
+        hidden_states: paddle.Tensor,
+        model_output: paddle.Tensor,
+    ) -> paddle.Tensor:
+        logits = self.model.compute_logits(hidden_states)
+
+        if not self.speculative_decoding:
+            set_value_by_flags_and_idx(
+                self.share_inputs["pre_ids"],
+                self.share_inputs["input_ids"],
+                self.share_inputs["seq_lens_this_time"],
+                self.share_inputs["seq_lens_encoder"],
+                self.share_inputs["seq_lens_decoder"],
+                self.share_inputs["step_idx"],
+                self.share_inputs["stop_flags"],
+            )
+            sampler_output = self.sampler(logits, self.sampling_metadata)
+            if self.parallel_config.tensor_parallel_size > 1:
+                paddle.distributed.broadcast(
+                    sampler_output.sampled_token_ids,
+                    self.parallel_config.data_parallel_rank * self.parallel_config.tensor_parallel_size,
+                    group=self.parallel_config.tp_group,
+                )
+        else:
+            self.sampler(
+                logits,
+                self.sampling_metadata,
+                self.parallel_config.max_model_len,
+                self.share_inputs,
+            )
+            sampler_output = None
+            if self.parallel_config.tensor_parallel_size > 1:
+                paddle.distributed.broadcast(
+                    self.share_inputs["accept_tokens"],
+                    self.parallel_config.data_parallel_rank * self.parallel_config.tensor_parallel_size,
+                    group=self.parallel_config.tp_group,
+                )
+                paddle.distributed.broadcast(
+                    self.share_inputs["accept_num"],
+                    self.parallel_config.data_parallel_rank * self.parallel_config.tensor_parallel_size,
+                    group=self.parallel_config.tp_group,
+                )
+                paddle.distributed.broadcast(
+                    self.share_inputs["step_idx"],
+                    self.parallel_config.data_parallel_rank * self.parallel_config.tensor_parallel_size,
+                    group=self.parallel_config.tp_group,
+                )
+                paddle.distributed.broadcast(
+                    self.share_inputs["stop_flags"],
+                    self.parallel_config.data_parallel_rank * self.parallel_config.tensor_parallel_size,
+                    group=self.parallel_config.tp_group,
+                )
+
+        # 5. post process
+        model_output_data = ModelOutputData(
+            next_tokens=self.share_inputs["next_tokens"],
+            stop_flags=self.share_inputs["stop_flags"],
+            step_idx=self.share_inputs["step_idx"],
+            max_dec_len=self.share_inputs["max_dec_len"],
+            pre_ids=self.share_inputs["pre_ids"],
+            seq_lens_this_time=self.share_inputs["seq_lens_this_time"],
+            eos_token_id=self.share_inputs["eos_token_id"],
+            not_need_stop=self.share_inputs["not_need_stop"],
+            input_ids=self.share_inputs["input_ids"],
+            stop_nums=self.share_inputs["stop_nums"],
+            seq_lens_encoder=self.share_inputs["seq_lens_encoder"],
+            seq_lens_decoder=self.share_inputs["seq_lens_decoder"],
+            is_block_step=self.share_inputs["is_block_step"],
+            full_hidden_states=model_output,
+            msg_queue_id=self.parallel_config.msg_queue_id,
+            mp_rank=self.parallel_config.tensor_parallel_rank,
+            use_ep=self.parallel_config.use_ep,
+            draft_tokens=(self.share_inputs["draft_tokens"] if self.speculative_decoding else None),
+            actual_draft_token_num=(
+                self.share_inputs["actual_draft_token_num"] if self.speculative_decoding else None
+            ),
+            accept_tokens=(self.share_inputs["accept_tokens"] if self.speculative_decoding else None),
+            accept_num=(self.share_inputs["accept_num"] if self.speculative_decoding else None),
+            enable_thinking=(self.share_inputs["enable_thinking"] if self.enable_mm else None),
+            think_end_id=(getattr(self.model_config, "think_end_id", -1) if self.enable_mm else -1),
+            need_think_end=(self.share_inputs["need_think_end"] if self.enable_mm else None),
+            reasoning_index=(self.share_inputs["reasoning_index"] if self.enable_mm else None),
+            stop_token_ids=self.share_inputs["stop_seqs"],
+            stop_seqs_len=self.share_inputs["stop_seqs_len"],
+        )
+
+        post_process(
+            sampler_output=sampler_output,
+            model_output=model_output_data,
+            share_inputs=self.share_inputs,
+            block_size=self.cache_config.block_size,
+            speculative_decoding=self.speculative_decoding,
+            skip_save_output=True,
+            zmq_client=self.zmq_client,
+        )
+
+        if self.speculative_decoding:
+            if self.speculative_method == "mtp":
+                self.proposer.run(full_hidden_states=model_output)
+            else:
+                self.proposer.run(share_inputs=self.share_inputs)
+
+        return sampler_output
+
    def _dummy_run(
        self,
        num_tokens: paddle.Tensor,
@@ -1285,6 +1479,7 @@ def _dummy_run(
            batch_size=batch_size,
            expected_decode_len=expected_decode_len,
        )
+        i = 1

        while True:
            # 1. Initialize forward meta and attention meta data
@@ -1319,109 +1514,12 @@ def _dummy_run(
                self.parallel_config.max_model_len,
            )

-            logits = None
-            if hasattr(self.model, "is_pooling_model") and self.model.is_pooling_model:
-                pass
-            else:
-                # 4. Execute spec decode
-                logits = self.model.compute_logits(hidden_states)
-
-                if not self.speculative_decoding:
-                    set_value_by_flags_and_idx(
-                        self.share_inputs["pre_ids"],
-                        self.share_inputs["input_ids"],
-                        self.share_inputs["seq_lens_this_time"],
-                        self.share_inputs["seq_lens_encoder"],
-                        self.share_inputs["seq_lens_decoder"],
-                        self.share_inputs["step_idx"],
-                        self.share_inputs["stop_flags"],
-                    )
-                    sampler_output = self.sampler(logits, self.sampling_metadata)
-                    if self.parallel_config.tensor_parallel_size > 1:
-                        paddle.distributed.broadcast(
-                            sampler_output.sampled_token_ids,
-                            self.parallel_config.data_parallel_rank * self.parallel_config.tensor_parallel_size,
-                            group=self.parallel_config.tp_group,
-                        )
+            if self.is_pooling_model:
+                self._dummy_pooler_run(hidden_states)
            else:
-                    self.sampler(
-                        logits,
-                        self.sampling_metadata,
-                        self.parallel_config.max_model_len,
-                        self.share_inputs,
-                    )
-                    sampler_output = None
-                    if self.parallel_config.tensor_parallel_size > 1:
-                        paddle.distributed.broadcast(
-                            self.share_inputs["accept_tokens"],
-                            self.parallel_config.data_parallel_rank * self.parallel_config.tensor_parallel_size,
-                            group=self.parallel_config.tp_group,
-                        )
-                        paddle.distributed.broadcast(
-                            self.share_inputs["accept_num"],
-                            self.parallel_config.data_parallel_rank * self.parallel_config.tensor_parallel_size,
-                            group=self.parallel_config.tp_group,
-                        )
-                        paddle.distributed.broadcast(
-                            self.share_inputs["step_idx"],
-                            self.parallel_config.data_parallel_rank * self.parallel_config.tensor_parallel_size,
-                            group=self.parallel_config.tp_group,
-                        )
-                        paddle.distributed.broadcast(
-                            self.share_inputs["stop_flags"],
-                            self.parallel_config.data_parallel_rank * self.parallel_config.tensor_parallel_size,
-                            group=self.parallel_config.tp_group,
-                        )
-
-            # 5. post process
-            model_output_data = ModelOutputData(
-                next_tokens=self.share_inputs["next_tokens"],
-                stop_flags=self.share_inputs["stop_flags"],
-                step_idx=self.share_inputs["step_idx"],
-                max_dec_len=self.share_inputs["max_dec_len"],
-                pre_ids=self.share_inputs["pre_ids"],
-                seq_lens_this_time=self.share_inputs["seq_lens_this_time"],
-                eos_token_id=self.share_inputs["eos_token_id"],
-                not_need_stop=self.share_inputs["not_need_stop"],
-                input_ids=self.share_inputs["input_ids"],
-                stop_nums=self.share_inputs["stop_nums"],
-                seq_lens_encoder=self.share_inputs["seq_lens_encoder"],
-                seq_lens_decoder=self.share_inputs["seq_lens_decoder"],
-                is_block_step=self.share_inputs["is_block_step"],
-                full_hidden_states=model_output,
-                msg_queue_id=self.parallel_config.msg_queue_id,
-                mp_rank=self.parallel_config.tensor_parallel_rank,
-                use_ep=self.parallel_config.use_ep,
-                draft_tokens=(self.share_inputs["draft_tokens"] if self.speculative_decoding else None),
-                actual_draft_token_num=(
-                    self.share_inputs["actual_draft_token_num"] if self.speculative_decoding else None
-                ),
-                accept_tokens=(self.share_inputs["accept_tokens"] if self.speculative_decoding else None),
-                accept_num=(self.share_inputs["accept_num"] if self.speculative_decoding else None),
-                enable_thinking=(self.share_inputs["enable_thinking"] if self.enable_mm else None),
-                think_end_id=(getattr(self.model_config, "think_end_id", -1) if self.enable_mm else -1),
-                need_think_end=(self.share_inputs["need_think_end"] if self.enable_mm else None),
-                reasoning_index=(self.share_inputs["reasoning_index"] if self.enable_mm else None),
-                stop_token_ids=self.share_inputs["stop_seqs"],
-                stop_seqs_len=self.share_inputs["stop_seqs_len"],
-            )
-
-            post_process(
-                sampler_output=sampler_output,
-                model_output=model_output_data,
-                share_inputs=self.share_inputs,
-                block_size=self.cache_config.block_size,
-                speculative_decoding=self.speculative_decoding,
-                skip_save_output=True,
-                zmq_client=self.zmq_client,
-            )
-
-            if self.speculative_decoding:
-                if self.speculative_method == "mtp":
-                    self.proposer.run(full_hidden_states=model_output)
-                else:
-                    self.proposer.run(share_inputs=self.share_inputs)
+                self._dummy_sampler_run(hidden_states, model_output)
+            i += 1

            # 7. Updata 'infer_seed' and step_cuda()
            self.share_inputs["infer_seed"].add_(self.infer_seed_increment)
            self.share_inputs["infer_seed"][:] %= self.MAX_INFER_SEED
@@ -1432,7 +1530,8 @@ def _dummy_run(
                self.speculative_config,
                self.cache_config.enable_prefix_caching,
            )
-
+            print("i", i)
+            print("self.share_inputs[seq]", int((self.share_inputs["seq_lens_this_time"] > 0).sum()))
            if int((self.share_inputs["seq_lens_this_time"] > 0).sum()) == 0:
                break
diff --git a/fastdeploy/worker/output.py b/fastdeploy/worker/output.py
index 6d820a873a..955eabf563 100644
--- a/fastdeploy/worker/output.py
+++ b/fastdeploy/worker/output.py
@@ -276,3 +276,10 @@ class ModelRunnerOutput:
    [num_reqs, num_spec_tokens]
    """
    spec_token_ids: Optional[list[list[int]]]
+
+    """
+    [num_reqs, hidden_size]
+    """
+    pooler_output: list[Optional[paddle.Tensor]]
+
+    is_pooling_model: bool = False
diff --git a/tests/pooling/test_embedding.py b/tests/pooling/test_embedding.py
index a548494dca..3e96d6687c 100644
--- a/tests/pooling/test_embedding.py
+++ b/tests/pooling/test_embedding.py
@@ -16,6 +16,11 @@
import os
import sys

+current_dir = os.path.dirname(os.path.abspath(__file__))
+project_root = os.path.abspath(os.path.join(current_dir, ".."))
+if project_root not in sys.path:
+    sys.path.insert(0, project_root)
+
import paddle
import pytest
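
For context on the warmup flow added in `_dummy_pooler_run` above: each supported pooling task is probed once against dummy hidden states, the byte size of each output is recorded, and the task with the largest output is re-run so the memory high-water mark during warmup covers the worst case at serving time. The standalone sketch below illustrates only that selection pattern; `run_task` and `output_nbytes` are hypothetical stand-ins, not FastDeploy APIs.

# Illustrative sketch of the warmup-task selection pattern used in
# _dummy_pooler_run: probe every task, then keep the one with the
# largest output so the final warmup run bounds memory from above.
from typing import Callable, Dict, List


def pick_warmup_task(
    tasks: List[str],
    run_task: Callable[[str], object],
    output_nbytes: Callable[[object], float],
) -> str:
    output_size: Dict[str, float] = {}
    for task in tasks:
        output = run_task(task)
        output_size[task] = output_nbytes(output)
        del output  # free the probe output before trying the next task

    # The task whose output is largest is re-run by the caller afterwards.
    return max(output_size.items(), key=lambda kv: kv[1])[0]


# Toy example: "embed" wins because its output is the biggest.
sizes = {"encode": 1024.0, "embed": 4096.0}
assert pick_warmup_task(list(sizes), lambda t: t, lambda t: sizes[t]) == "embed"
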