73 changes: 35 additions & 38 deletions fastdeploy/demo/openai_demo.py
@@ -17,44 +17,41 @@
import openai

ip = "0.0.0.0"
service_http_port = "9809" # 服务配置的
service_http_port = "13116" # 服务配置的

client = openai.Client(base_url=f"http://{ip}:{service_http_port}/v1", api_key="EMPTY_API_KEY")

# Non-streaming response
response = client.completions.create(
model="default",
prompt="Hello, how are you?",
max_tokens=64,
stream=False,
)
# # Non-streaming response
# response = client.completions.create(
# model="default",
# prompt="Hello, how are you?",
# max_tokens=64,
# stream=False,
# )

print(response.choices[0].text)
print("\n")
# print(response.choices[0].text)
# print("\n")

# Streaming response
response = client.completions.create(
model="default",
prompt="Hello, how are you?",
max_tokens=100,
stream=True,
)
# response = client.completions.create(
# model="default",
# prompt="Hello, how are you?",
# max_tokens=100,
# stream=True,
# )

for chunk in response:
print(chunk.choices[0].text, end="")
print("\n")
# for chunk in response:
# print(chunk.choices[0].text, end="")
# print("\n")

# Chat completion
# Non-streaming response
response = client.chat.completions.create(
model="default",
messages=[
{"role": "user", "content": "Hello, who are you"},
{"role": "assistant", "content": "I'm a helpful AI assistant."},
{"role": "user", "content": "List 3 countries and their capitals."},
{"role": "user", "content": "北京天安门在哪里?"},
],
temperature=1,
max_tokens=64,
stream=False,
)

@@ -63,19 +60,19 @@


# # Streaming response
response = client.chat.completions.create(
model="default",
messages=[
{"role": "user", "content": "Hello, who are you"},
{"role": "assistant", "content": "I'm a helpful AI assistant."},
{"role": "user", "content": "List 3 countries and their capitals."},
],
temperature=1,
max_tokens=64,
stream=True,
)
# response = client.chat.completions.create(
# model="default",
# messages=[
# {"role": "user", "content": "Hello, who are you"},
# {"role": "assistant", "content": "I'm a helpful AI assistant."},
# {"role": "user", "content": "List 3 countries and their capitals."},
# ],
# temperature=1,
# max_tokens=64,
# stream=True,
# )

for chunk in response:
if chunk.choices[0].delta is not None:
print(chunk.choices[0].delta.content, end="")
print("\n")
# for chunk in response:
# if chunk.choices[0].delta is not None:
# print(chunk.choices[0].delta.content, end="")
# print("\n")
4 changes: 1 addition & 3 deletions fastdeploy/engine/pooling_params.py
@@ -26,15 +26,13 @@
from fastdeploy.config import ModelConfig


class PoolingParams:
class PoolingParams(msgspec.Struct, omit_defaults=True, array_like=True):
"""API parameters for pooling models.

Attributes:
normalize: Whether to normalize the embeddings outputs.
dimensions: Reduce the dimensions of embeddings
if the model supports matryoshka representation.
activation: Whether to apply activation function to
the classification outputs.
softmax: Whether to apply softmax to the reward outputs.
step_tag_id: Step tag ID for process reward models to identify
specific steps in multi-step reasoning tasks.
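For context on the base-class change above: subclassing msgspec.Struct with omit_defaults=True and array_like=True gives PoolingParams a compact, fast wire encoding. A minimal standalone sketch; field names follow the docstring, but the defaults here are assumptions, not the real class.

import msgspec
from typing import Optional

class PoolingParamsSketch(msgspec.Struct, omit_defaults=True, array_like=True):
    # Hypothetical stand-in; the real PoolingParams carries more fields.
    normalize: Optional[bool] = None
    dimensions: Optional[int] = None
    softmax: Optional[bool] = None
    step_tag_id: Optional[int] = None

params = PoolingParamsSketch(normalize=True)
# array_like=True encodes the struct as a positional array instead of a map,
# and omit_defaults=True drops trailing fields still at their defaults.
print(msgspec.msgpack.encode(params))  # b'\x91\xc3' -> [True]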
6 changes: 4 additions & 2 deletions fastdeploy/model_executor/layers/pool/metadata.py
@@ -69,8 +69,10 @@ def build_pooling_cursor(num_scheduled_tokens: list[int], prompt_lens: paddle.Te

n_seq = len(num_scheduled_tokens)
index = list(range(n_seq))
num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens, device="cpu")
cumsum = paddle.zeros([n_seq + 1], dtype="int64", place=paddle.CPUPlace())
num_scheduled_tokens = paddle.to_tensor(num_scheduled_tokens, place=paddle.CPUPlace())
cumsum = paddle.zeros([n_seq + 1], dtype="int64")
if cumsum.place.is_gpu_place():
cumsum = cumsum.cpu()
paddle.cumsum(num_scheduled_tokens, axis=0, out=cumsum[1:])
if device == "gpu":
cumsum_device = cumsum.cuda()
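For context on the device handling above: the cursor bookkeeping is built on CPU and only copied to the accelerator when needed. A minimal sketch of the prefix-sum idea, independent of the project's build_pooling_cursor; the input values are made up.

import paddle

num_scheduled_tokens = [3, 5, 2]  # hypothetical per-sequence token counts
lens = paddle.to_tensor(num_scheduled_tokens, dtype="int64", place=paddle.CPUPlace())
zero = paddle.to_tensor([0], dtype="int64", place=paddle.CPUPlace())
cumsum = paddle.concat([zero, paddle.cumsum(lens, axis=0)])  # [0, 3, 8, 10]
# Tokens of sequence i occupy rows cumsum[i]:cumsum[i+1] of the flattened hidden states;
# copy cumsum to the device (cumsum.cuda()) only if a consumer needs it there.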
24 changes: 24 additions & 0 deletions fastdeploy/model_executor/layers/pooler.py
@@ -332,6 +332,29 @@ def from_pooling_type(pooling_type: PoolingType) -> "PoolingMethod":
return MeanPool()
raise NotImplementedError(f"Unsupported method: {pooling_type}")

@abstractmethod
def get_supported_tasks(self) -> Set[PoolingTask]:
raise NotImplementedError

def get_pooling_updates(self, task: PoolingTask) -> PoolingParamsUpdate:
return PoolingParamsUpdate()

@abstractmethod
def forward_all(
self,
hidden_states: paddle.Tensor,
pooling_cursor: PoolingCursor,
) -> Union[list[paddle.Tensor], paddle.Tensor]:
raise NotImplementedError

def forward(
self,
hidden_states: paddle.Tensor,
pooling_metadata: PoolingMetadata,
) -> Union[list[paddle.Tensor], paddle.Tensor]:
pooling_cursor = pooling_metadata.pooling_cursor
return self.forward_all(hidden_states, pooling_cursor)


class LastPool(PoolingMethod):

@@ -495,6 +518,7 @@ def forward(
hidden_states: Union[paddle.Tensor, list[paddle.Tensor]],
pooling_metadata: PoolingMetadata,
) -> PoolerOutput:
print("self.pooling", self.pooling)
pooled_data = self.pooling(hidden_states, pooling_metadata)
pooled_data = self.head(pooled_data, pooling_metadata)
return build_output(pooled_data)
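For context on the forward_all/forward split added above: a pooling method receives the flattened hidden states plus a cursor describing where each sequence ends. A standalone sketch of last-token pooling; the seq_ends layout here is an assumption, not the project's PoolingCursor.

import paddle

def last_token_pool(hidden_states: paddle.Tensor, seq_ends: list[int]) -> paddle.Tensor:
    # hidden_states: [num_tokens, hidden_size], all sequences packed back to back.
    # seq_ends: exclusive end offset of each sequence in that packed dimension.
    last_idx = paddle.to_tensor([end - 1 for end in seq_ends], dtype="int64")
    return paddle.gather(hidden_states, last_idx, axis=0)

pooled = last_token_pool(paddle.randn([10, 8]), seq_ends=[3, 8, 10])  # shape [3, 8]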
3 changes: 2 additions & 1 deletion fastdeploy/model_executor/models/adapters.py
@@ -22,7 +22,6 @@

from fastdeploy.config import ModelConfig
from fastdeploy.model_executor.layers.activation import get_act_fn
from fastdeploy.model_executor.models.interfaces_base import is_pooling_model
from fastdeploy.transformer_utils.config import get_hf_file_to_dict

_T = TypeVar("_T", bound=type[nn.Layer])
@@ -191,6 +190,8 @@ def as_embedding_model(cls: _T) -> _T:
please implement your own model if this is not the case.
"""
# Avoid modifying existing embedding models
from fastdeploy.model_executor.models.interfaces_base import is_pooling_model

if is_pooling_model(cls):
return cls

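Moving the is_pooling_model import into as_embedding_model defers it to call time, the usual way to break an import cycle between modules. A generic sketch of the pattern with made-up module names:

# module_a.py (hypothetical)
def build_adapter():
    # Imported lazily so that module_a can still be imported even though
    # module_b itself imports module_a at module load time.
    from module_b import is_special_model
    return is_special_model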
64 changes: 56 additions & 8 deletions fastdeploy/model_executor/models/interfaces_base.py
@@ -12,9 +12,18 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from typing import Type
from typing import ClassVar, Literal, Protocol, Type

import paddle
from paddle import nn
from typing_extensions import TypeVar, runtime_checkable

from fastdeploy.config import FDConfig
from fastdeploy.model_executor.forward_meta import ForwardMeta
from fastdeploy.model_executor.layers.pooler import Pooler

T = TypeVar("T", default=paddle.Tensor)
T_co = TypeVar("T_co", default=paddle.Tensor, covariant=True)


def is_text_generation_model(model_cls: Type[nn.Layer]) -> bool:
@@ -24,13 +24,7 @@ def is_text_generation_model(model_cls: Type[nn.Layer]) -> bool:


def is_pooling_model(model_cls: Type[nn.Layer]) -> bool:
class_name = model_cls.__name__
pooling_indicators = ["Embedding", "ForSequenceClassification"]
return (
any(indicator in class_name for indicator in pooling_indicators)
or hasattr(model_cls, "is_embedding_model")
and model_cls.is_embedding_model
)
return getattr(model_cls, "is_pooling_model", False)


def is_multimodal_model(class_name: str) -> bool:
@@ -52,3 +55,48 @@ def get_default_pooling_type(model_cls: Type[nn.Layer] = None) -> str:
if model_cls is not None:
return getattr(model_cls, "default_pooling_type", "LAST")
return "LAST"


@runtime_checkable
class FdModel(Protocol[T_co]):
"""The interface required for all models in FastDeploy."""

def __init__(
self,
fd_config: FDConfig,
prefix: str = "",
) -> None:
pass

def forward(
self,
ids_remove_padding: paddle.Tensor,
forward_metadata: ForwardMeta,
) -> T_co:
pass


class FdModelForPooling(FdModel[T_co], Protocol[T_co]):
"""The interface required for all pooling models in FastDeploy."""

is_pooling_model: ClassVar[Literal[True]] = True
"""
A flag that indicates this model supports pooling.

Note:
There is no need to redefine this flag if this class is in the
MRO of your model class.
"""

default_pooling_type: ClassVar[str] = "LAST"
"""
Indicates the
[fastdeploy.config.PoolerConfig.pooling_type][]
to use by default.

You can use the
[fastdeploy.model_executor.models.interfaces_base.default_pooling_type][]
decorator to conveniently set this field.
"""
pooler: Pooler
"""The pooler is only called on TP rank 0."""
2 changes: 2 additions & 0 deletions fastdeploy/model_executor/models/qwen3.py
@@ -303,7 +303,9 @@ def load_weights(self, weights_iterator) -> None:
if model_param_name not in params_dict:
continue
param = params_dict[model_param_name]

weight_loader = getattr(param, "weight_loader", default_weight_loader(self.fd_config))

weight_loader(param, loaded_weight, shard_id)

break
23 changes: 21 additions & 2 deletions fastdeploy/model_executor/pre_and_post_process.py
@@ -22,6 +22,7 @@

from fastdeploy import envs
from fastdeploy.config import SpeculativeConfig
from fastdeploy.output.pooler import PoolerOutput
from fastdeploy.platforms import current_platform

if current_platform.is_iluvatar():
@@ -183,6 +184,7 @@ def _zmq_send_text_outputs(zmq_client: ZmqIpcClient, output_tokens: np.ndarray,


def post_process_normal(
pooler_output: PoolerOutput,
sampler_output: SamplerOutput,
model_output: ModelOutputData,
share_inputs: Dict[str, paddle.Tensor],
@@ -192,7 +194,9 @@
zmq_client: ZmqIpcClient = None,
) -> ModelRunnerOutput:
"""Post-processing steps after completing a single token generation."""
# handle vl:

# handle vl
print("model_output.enable_thinking", model_output.enable_thinking)
if model_output.enable_thinking:
exists_think_end = sampler_output.sampled_token_ids == model_output.think_end_id
paddle.assign(
@@ -384,6 +388,7 @@ def post_process_specualate(


def post_process(
pooler_output: PoolerOutput,
sampler_output: SamplerOutput,
model_output: ModelOutputData,
share_inputs: Dict[str, paddle.Tensor],
Expand All @@ -398,7 +403,14 @@ def post_process(
post_process_specualate(model_output, save_each_rank, skip_save_output)
else:
post_process_normal(
sampler_output, model_output, share_inputs, block_size, save_each_rank, skip_save_output, zmq_client
pooler_output,
sampler_output,
model_output,
share_inputs,
block_size,
save_each_rank,
skip_save_output,
zmq_client,
)


@@ -413,6 +425,7 @@
TODO(gongshaotian): normalization name
"""

print("speculative_config.method", speculative_config.method)
if speculative_config.method is not None:
if DISABLE_RECOVER:
speculate_step_reschedule(
@@ -505,6 +518,7 @@
)
else:
if DISABLE_RECOVER:
print("step_reschedule前面", share_inputs["seq_lens_this_time"])
step_reschedule(
share_inputs["stop_flags"],
share_inputs["seq_lens_this_time"],
@@ -531,8 +545,10 @@
block_size,
enc_dec_block_num,
)
print("step_reschedule后面", share_inputs["seq_lens_this_time"])
else:
if enable_prefix_caching:
print("step_system_cache前面", int((share_inputs["seq_lens_this_time"] > 0).sum()))
step_system_cache(
share_inputs["stop_flags"],
share_inputs["seq_lens_this_time"],
@@ -560,7 +576,9 @@
block_size,
enc_dec_block_num,
)
print("step_system_cache后面", int((share_inputs["seq_lens_this_time"] > 0).sum()))
else:
print("step_paddle前面", share_inputs["seq_lens_this_time"])
step_paddle(
share_inputs["stop_flags"],
share_inputs["seq_lens_this_time"],
@@ -587,6 +605,7 @@
block_size,
enc_dec_block_num,
)
print("step_paddle后面", share_inputs["seq_lens_this_time"])


def rebuild_padding(