From 9d5d11a4ff99a6c5e6b915715a87fb290d5dd98f Mon Sep 17 00:00:00 2001 From: guozr Date: Mon, 25 Aug 2025 16:39:16 +0800 Subject: [PATCH 1/4] npu adapter --- build.sh | 6 +- .../model_executor/layers/activation.py | 2 +- .../layers/attention/__init__.py | 2 + .../layers/attention/npu_fapa_attn_backend.py | 212 ++++ .../layers/backends/npu/__init__.py | 3 + .../backends/npu/quantization/weight_only.py | 69 ++ fastdeploy/model_executor/layers/linear.py | 4 + .../model_executor/layers/normalization.py | 3 + .../layers/quantization/weight_only.py | 3 + .../model_executor/layers/rotary_embedding.py | 15 + .../sample/ops/apply_penalty_multi_scores.py | 14 + .../model_executor/layers/sample/sampler.py | 35 + fastdeploy/model_executor/models/utils.py | 2 +- fastdeploy/model_executor/ops/npu/__init__.py | 26 +- .../model_executor/ops/npu/fapa_attention.py | 44 + .../model_executor/ops/npu/fused_rms_norm.py | 26 + .../model_executor/ops/npu/get_output.py | 12 + .../ops/npu/get_padding_offset.py | 12 + .../ops/npu/get_token_penalty_multi_scores.py | 28 + .../model_executor/ops/npu/rebuild_padding.py | 33 + .../model_executor/ops/npu/save_output.py | 6 + .../ops/npu/set_stop_value_multi_ends.py | 12 + .../model_executor/ops/npu/sparse_moe.py | 76 ++ .../model_executor/ops/npu/step_paddle.py | 99 ++ .../model_executor/ops/npu/top_p_sampling.py | 44 + .../model_executor/ops/npu/update_inputs.py | 41 + .../ops/npu/weight_only_linear.py | 15 + .../model_executor/ops/npu/weight_quantize.py | 19 + .../model_executor/pre_and_post_process.py | 28 + fastdeploy/output/token_processor.py | 5 + fastdeploy/platforms/npu.py | 31 +- fastdeploy/worker/npu_model_runner.py | 905 ++++++++++++++++++ fastdeploy/worker/npu_worker.py | 113 +++ fastdeploy/worker/worker_process.py | 4 + requirements_npu.txt | 40 + setup.py | 2 + 36 files changed, 1980 insertions(+), 11 deletions(-) create mode 100644 fastdeploy/model_executor/layers/attention/npu_fapa_attn_backend.py create mode 100644 fastdeploy/model_executor/layers/backends/npu/quantization/weight_only.py create mode 100644 fastdeploy/model_executor/ops/npu/fapa_attention.py create mode 100644 fastdeploy/model_executor/ops/npu/fused_rms_norm.py create mode 100644 fastdeploy/model_executor/ops/npu/get_output.py create mode 100644 fastdeploy/model_executor/ops/npu/get_padding_offset.py create mode 100644 fastdeploy/model_executor/ops/npu/get_token_penalty_multi_scores.py create mode 100644 fastdeploy/model_executor/ops/npu/rebuild_padding.py create mode 100644 fastdeploy/model_executor/ops/npu/save_output.py create mode 100644 fastdeploy/model_executor/ops/npu/set_stop_value_multi_ends.py create mode 100644 fastdeploy/model_executor/ops/npu/sparse_moe.py create mode 100644 fastdeploy/model_executor/ops/npu/step_paddle.py create mode 100644 fastdeploy/model_executor/ops/npu/top_p_sampling.py create mode 100644 fastdeploy/model_executor/ops/npu/update_inputs.py create mode 100644 fastdeploy/model_executor/ops/npu/weight_only_linear.py create mode 100644 fastdeploy/model_executor/ops/npu/weight_quantize.py create mode 100644 fastdeploy/worker/npu_model_runner.py create mode 100644 fastdeploy/worker/npu_worker.py create mode 100644 requirements_npu.txt diff --git a/build.sh b/build.sh index 86ec3cedbd..cd4fb5cf69 100644 --- a/build.sh +++ b/build.sh @@ -104,8 +104,7 @@ function copy_ops(){ is_npu=`$python -c "import paddle; print(paddle.is_compiled_with_custom_device('npu'))"` if [ "$is_npu" = "True" ]; then DEVICE_TYPE="npu" - cp -r ${OPS_TMP_DIR}/${WHEEL_NAME}/* 
../fastdeploy/model_executor/ops/npu - echo -e "npu ops have been copy to fastdeploy" + echo -e "npu ops are already present in fastdeploy" return fi @@ -153,6 +152,7 @@ function build_and_install_ops() { echo -e "${BLUE}[build]${NONE} build and install fastdeploy_ops..." TMP_DIR_REAL_PATH=`readlink -f ${OPS_TMP_DIR}` is_xpu=`$python -c "import paddle; print(paddle.is_compiled_with_xpu())"` + is_npu=`$python -c "import paddle; print(paddle.is_compiled_with_custom_device('npu'))"` if [ "$is_xpu" = "True" ]; then cd xpu_ops/src bash build.sh ${TMP_DIR_REAL_PATH} @@ -164,6 +164,8 @@ function build_and_install_ops() { FD_BUILDING_ARCS=${FD_BUILDING_ARCS} FD_CPU_USE_BF16=True ${python} setup_ops.py install --install-lib ${OPS_TMP_DIR} fi find ${OPS_TMP_DIR} -type f -name "*.o" -exec rm -f {} \; + elif [ "$is_npu" = "True" ]; then + echo -e "${BLUE}[build]${NONE} skipping NPU ops build (already present)" elif [ "$FD_CPU_USE_BF16" == "false" ]; then if [ "$FD_BUILDING_ARCS" == "" ]; then ${python} setup_ops.py install --install-lib ${OPS_TMP_DIR} diff --git a/fastdeploy/model_executor/layers/activation.py b/fastdeploy/model_executor/layers/activation.py index 5e426e7efd..d0e86cf671 100644 --- a/fastdeploy/model_executor/layers/activation.py +++ b/fastdeploy/model_executor/layers/activation.py @@ -71,7 +71,7 @@ def __init__( or current_platform.is_maca() ): self.forward = self.forward_cuda - elif current_platform.is_gcu(): + elif current_platform.is_gcu() or current_platform.is_npu(): self.forward = self.forward_gcu else: raise NotImplementedError diff --git a/fastdeploy/model_executor/layers/attention/__init__.py b/fastdeploy/model_executor/layers/attention/__init__.py index c4c1801d43..94db704f05 100644 --- a/fastdeploy/model_executor/layers/attention/__init__.py +++ b/fastdeploy/model_executor/layers/attention/__init__.py @@ -22,6 +22,7 @@ from .mla_attention_backend import MLAAttentionBackend from .native_paddle_backend import PaddleNativeAttnBackend from .xpu_attn_backend import XPUAttentionBackend +from .npu_fapa_attn_backend import NpuFaPaAttentionBackend __all__ = [ "AttentionBackend", @@ -34,4 +35,5 @@ "IluvatarAttnBackend", "BlockAttentionBackend", "Attention", + "NpuFaPaAttentionBackend" ] diff --git a/fastdeploy/model_executor/layers/attention/npu_fapa_attn_backend.py b/fastdeploy/model_executor/layers/attention/npu_fapa_attn_backend.py new file mode 100644 index 0000000000..41ae849a63 --- /dev/null +++ b/fastdeploy/model_executor/layers/attention/npu_fapa_attn_backend.py @@ -0,0 +1,212 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" + +from __future__ import annotations + +import os +from dataclasses import dataclass, field +from typing import TYPE_CHECKING, List, Optional +from paddle import core +from fastdeploy.config import FDConfig +import paddle +from fastdeploy.model_executor.layers.attention.ops import ( + get_block_shape_and_split_kv_block, init_signal_layerwise, + open_shm_and_get_meta_signal) +from fastdeploy.model_executor.ops.npu import fused_fapa_attention_npu + +if TYPE_CHECKING: + from paddle._typing.dtype_like import _DTypeLiteral + +# from fastdeploy.config import LLMConfig +from fastdeploy.model_executor.layers.attention import Attention +from fastdeploy.model_executor.layers.attention.base_attention_backend import ( + AttentionBackend, AttentionMetadata) + + +@dataclass +class NpuFaPaAttentionMetadata(AttentionMetadata): + """ + NpuFaPaAttentionMetadata + """ + + max_len_kv: paddle.Tensor = None + set_max_lengths: int = -1 + encoder_batch_ids: paddle.Tensor = None + encoder_tile_ids_per_batch: paddle.Tensor = None + encoder_num_blocks: paddle.Tensor = None + kv_batch_ids: paddle.Tensor = None + kv_tile_ids_per_batch: paddle.Tensor = None + kv_num_blocks: paddle.Tensor = None + decoder_batch_ids: paddle.Tensor = None + decoder_tile_ids_per_batch: paddle.Tensor = None + decoder_num_blocks: paddle.Tensor = None + + _dtype: _DTypeLiteral = paddle.bfloat16 + encoder_max_partition_size: int = 32768 + max_partition_size: int = 32768 + block_tables: Optional[paddle.Tensor] = None + rotary_embs: Optional[paddle.Tensor] = None + attn_mask: Optional[paddle.Tensor] = None + encoder_block_shape_q: Optional[paddle.Tensor] = None + decoder_block_shape_q: Optional[paddle.Tensor] = None + _fuse_kernel_compute_dtype: str = "bf16" + + # pd_disaggregation + kv_signal_metadata: Optional[paddle.Tensor] = None + kv_signal_data_list: List[paddle.Tensor] = field(default_factory=list) + + +class NpuFaPaAttentionBackend(AttentionBackend): + """ + NpuFaPaAttentionBackend backend implementation. 
+ """ + + def __init__(self, fd_config: FDConfig, kv_num_heads: int, num_heads: int, head_dim: int): + """ + NpuFaPaAttentionBackend __init__ + """ + super().__init__() + self.attention_metadata: NpuFaPaAttentionMetadata = None + # TODO(gongshaotian): Use fd_config parameters in the correct location + self.block_size = fd_config.parallel_config.block_size + self.max_seq_len = fd_config.parallel_config.max_model_len + self.rope_theta = ( + 10000.0 + if fd_config.model_config.rope_theta is None + else fd_config.model_config.rope_theta + ) + self.rope_3d = getattr(fd_config.model_config, "rope_3d", False) + self.causal = getattr(fd_config.model_config, "causal", True) + self.speculative_method: str = fd_config.speculative_config.method + self.use_speculate: bool = self.speculative_method is not None + self.speculate_max_draft_token_num: int = fd_config.speculative_config.num_speculative_tokens + self.keep_pd_step_flag: bool = fd_config.speculative_config.model_type == "mtp" + self.rank = fd_config.parallel_config.tensor_parallel_rank + + self.kv_num_heads = kv_num_heads + self.num_heads = num_heads + self.head_dim = head_dim + self.num_layers: int = fd_config.model_config.num_hidden_layers + + # pd_disaggregation + self.use_pd_disaggregation = int(os.getenv("FLAGS_use_pd_disaggregation", 0)) + self.start_layer_index = fd_config.model_config.start_layer_index + + def init_attention_metadata(self, forward_meta): + """Initialize attntion metadata hence all layers in the forward pass can reuse it.""" + metadata = NpuFaPaAttentionMetadata() + metadata.encoder_block_shape_q = 64 + metadata.decoder_block_shape_q = 16 + metadata.max_partition_size = 32768 + metadata.encoder_max_partition_size = 32768 + metadata._dtype = paddle.get_default_dtype() + if metadata._dtype == "bfloat16": + metadata._fuse_kernel_compute_dtype = "bf16" + elif metadata._dtype == "float16": + metadata._fuse_kernel_compute_dtype = "fp16" + elif metadata._dtype == "float32": + metadata._fuse_kernel_compute_dtype = "fp32" + metadata.block_tables = forward_meta.block_tables + metadata.rotary_embs = forward_meta.rotary_embs + metadata.attn_mask = forward_meta.attn_mask + metadata.pre_caches_length = forward_meta.pre_caches_length + + # # FIXME: + # ( + # metadata.encoder_batch_ids, + # metadata.encoder_tile_ids_per_batch, + # metadata.encoder_num_blocks, + # metadata.kv_batch_ids, + # metadata.kv_tile_ids_per_batch, + # metadata.kv_num_blocks, + # metadata.decoder_batch_ids, + # metadata.decoder_tile_ids_per_batch, + # metadata.decoder_num_blocks, + # metadata.max_len_kv, + # metadata.set_max_lengths, + # ) = get_block_shape_and_split_kv_block( + # forward_meta.seq_lens_encoder, + # forward_meta.seq_lens_decoder, + # forward_meta.seq_lens_this_time, + # forward_meta.cum_offsets, + # metadata.encoder_block_shape_q, + # metadata.decoder_block_shape_q, + # self.num_heads // self.kv_num_heads, + # self.block_size, + # self.speculate_max_draft_token_num + 1, + # ) + + # pd_disaggregation + metadata.kv_signal_data_list = [None] * self.num_layers + if self.use_pd_disaggregation: + metadata.kv_signal_metadata = open_shm_and_get_meta_signal( + self.rank, self.keep_pd_step_flag + ) + self.attention_metadata = metadata + + def get_attntion_meta(self): + """get_attntion_meta""" + return self.attention_metadata + + def get_kv_cache_shape( + self, + max_num_blocks: int, + kv_cache_quant_type: str = None, + + ): + """ + Caculate kv cache shape + """ + return (max_num_blocks, self.kv_num_heads, self.block_size, self.head_dim) + + def 
forward_mixed( + self, + q, + k, + v, + qkv, + compressed_kv, + k_pe, + layer: Attention, + forward_meta, + ): + """ + forward_mixed + """ + metadata = self.attention_metadata + + if self.use_pd_disaggregation: + metadata.kv_signal_data_list[layer.layer_id] = init_signal_layerwise( + metadata.kv_signal_metadata, layer.layer_id + self.start_layer_index + ) + # FIXME: guozr 这里改成bfloat16 + + + res = fused_fapa_attention_npu( + qkv, + metadata.rotary_embs, + forward_meta.caches[2 * layer.layer_id], + forward_meta.caches[2 * layer.layer_id + 1], + forward_meta.seq_lens_encoder, + forward_meta.seq_lens_decoder, + metadata.block_tables, + self.num_heads, + self.kv_num_heads, + self.head_dim, + self.max_seq_len, + self.block_size, + ) + return res[0] diff --git a/fastdeploy/model_executor/layers/backends/npu/__init__.py b/fastdeploy/model_executor/layers/backends/npu/__init__.py index 5f7a59bc8c..c722d58c28 100644 --- a/fastdeploy/model_executor/layers/backends/npu/__init__.py +++ b/fastdeploy/model_executor/layers/backends/npu/__init__.py @@ -15,3 +15,6 @@ """ npu backend methods """ +from .quantization.weight_only import NPUWeightOnlyLinearMethod + +__all__ = ['NPUWeightOnlyLinearMethod'] \ No newline at end of file diff --git a/fastdeploy/model_executor/layers/backends/npu/quantization/weight_only.py b/fastdeploy/model_executor/layers/backends/npu/quantization/weight_only.py new file mode 100644 index 0000000000..a30097f354 --- /dev/null +++ b/fastdeploy/model_executor/layers/backends/npu/quantization/weight_only.py @@ -0,0 +1,69 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" + +import paddle +from fastdeploy.model_executor.layers.quantization.weight_only import ( + WeightOnlyConfig, WeightOnlyLinearMethod) +from fastdeploy.model_executor.ops.npu import fused_linear_op as weight_only_linear +from fastdeploy.model_executor.ops.npu import npu_quant_weight +# import inspect + +class NPUWeightOnlyLinearMethod(WeightOnlyLinearMethod): + """ + Weight only quantization method for linear layer on NPU + """ + + def __init__( + self, + quant_config: WeightOnlyConfig, + ) -> None: + super().__init__(quant_config) + + def create_weights(self, layer): + """ + Create weights for linear layer on NPU + """ + + linear_weight_scale_shape = [layer.embed_dim] + if hasattr(layer, "linear_weight_shape"): + if isinstance(layer.linear_weight_shape, list): + layer_weight_shape = layer.linear_weight_shape + linear_weight_scale_shape = layer_weight_shape[:1] + + layer.linear_weight_scale = layer.create_parameter( + shape=linear_weight_scale_shape, + dtype="bfloat16", + is_bias=False, + ) + + def process_loaded_weights(self, layer, weight) -> None: + """ + loaded_weights using npu special quantization + """ + + quanted_weight_tensor, weight_scale_tensor = npu_quant_weight(weight) + layer.linear_weight.set_value(quanted_weight_tensor.T) + layer.linear_weight_scale.set_value( + weight_scale_tensor.astype(paddle.get_default_dtype()) + ) + + def apply(self, layer, x): + linear_out = weight_only_linear( + x, + weight=layer.linear_weight.T, + weight_scale=layer.linear_weight_scale, + ) + return linear_out diff --git a/fastdeploy/model_executor/layers/linear.py b/fastdeploy/model_executor/layers/linear.py index eb908bc0ae..cbfa61dc10 100644 --- a/fastdeploy/model_executor/layers/linear.py +++ b/fastdeploy/model_executor/layers/linear.py @@ -108,6 +108,7 @@ def __init__( or current_platform.is_gcu() or current_platform.is_dcu() or current_platform.is_maca() + or current_platform.is_npu() ): self.forward = self.forward_cuda else: @@ -555,6 +556,9 @@ def load_weight(self, state_dict: dict): if self.fd_config.quant_config: self.quant_method.process_loaded_weights(self, weight_tensor) else: + # Handle dtype conversion for NPU compatibility + if self.weight.dtype != weight_tensor.dtype: #FIXME: guozr 这里可能问题所在 + weight_tensor = weight_tensor.cast(self.weight.dtype) self.weight.set_value(weight_tensor) def load_state_dict(self, state_dict: dict): diff --git a/fastdeploy/model_executor/layers/normalization.py b/fastdeploy/model_executor/layers/normalization.py index dff17321ba..21a84ecddd 100644 --- a/fastdeploy/model_executor/layers/normalization.py +++ b/fastdeploy/model_executor/layers/normalization.py @@ -24,6 +24,9 @@ if current_platform.is_gcu(): from fastdeploy.model_executor.ops.gcu import fused_add_rms_norm, rms_norm +elif current_platform.is_npu(): + from fastdeploy.model_executor.ops.npu import rms_norm_npu as fused_rms_norm + from paddle.incubate.nn.functional import fused_layer_norm else: from paddle.incubate.nn.functional import fused_layer_norm, fused_rms_norm diff --git a/fastdeploy/model_executor/layers/quantization/weight_only.py b/fastdeploy/model_executor/layers/quantization/weight_only.py index 4825faaf77..49a508e575 100644 --- a/fastdeploy/model_executor/layers/quantization/weight_only.py +++ b/fastdeploy/model_executor/layers/quantization/weight_only.py @@ -104,6 +104,9 @@ def get_quant_method(self, layer) -> Optional[QuantMethodBase]: else: return GPUWeightOnlyLinearMethod(self) + elif current_platform.is_npu(): + from fastdeploy.model_executor.layers.backends import 
NPUWeightOnlyLinearMethod + return NPUWeightOnlyLinearMethod(self) else: if isinstance(layer, FusedMoE): if layer.use_method == "cutlass": diff --git a/fastdeploy/model_executor/layers/rotary_embedding.py b/fastdeploy/model_executor/layers/rotary_embedding.py index c0e2b5a14d..068f33cab5 100644 --- a/fastdeploy/model_executor/layers/rotary_embedding.py +++ b/fastdeploy/model_executor/layers/rotary_embedding.py @@ -268,6 +268,19 @@ def get_rope_xpu( return rotary_emb.to("xpu") +def get_rope_npu( + rotary_dim: int, + base: 10000.0, + position_ids, + model_config: ModelConfig, + partial_rotary_factor=1, +): + with CpuGuard(): + position_ids = position_ids.cpu() + rotary_emb = get_rope_impl(rotary_dim, base, position_ids, + model_config, partial_rotary_factor) + return rotary_emb.to('npu') + def get_rope( rotary_dim: int, base: 10000.0, @@ -295,6 +308,8 @@ def get_rope( """ if current_platform.is_xpu(): return get_rope_xpu(rotary_dim, base, position_ids, model_config, partial_rotary_factor) + elif current_platform.is_npu(): + return get_rope_npu(rotary_dim, base, position_ids, model_config, partial_rotary_factor) else: return get_rope_impl(rotary_dim, base, position_ids, model_config, partial_rotary_factor) diff --git a/fastdeploy/model_executor/layers/sample/ops/apply_penalty_multi_scores.py b/fastdeploy/model_executor/layers/sample/ops/apply_penalty_multi_scores.py index e66db93ba3..e8a34a0195 100644 --- a/fastdeploy/model_executor/layers/sample/ops/apply_penalty_multi_scores.py +++ b/fastdeploy/model_executor/layers/sample/ops/apply_penalty_multi_scores.py @@ -136,6 +136,20 @@ def apply_penalty_multi_scores( min_dec_lens, eos_token_ids, ) + elif current_platform.is_npu(): + from fastdeploy.model_executor.ops.npu import get_token_penalty_multi_scores_npu + logits = get_token_penalty_multi_scores_npu( + pre_token_ids, + logits, + repetition_penalties, + frequency_penalties, + presence_penalties, + temperature, + bad_words_token_ids, + step_idx, + min_dec_lens, + eos_token_ids, + ) else: raise NotImplementedError diff --git a/fastdeploy/model_executor/layers/sample/sampler.py b/fastdeploy/model_executor/layers/sample/sampler.py index cece8f8700..62eb7c4d2e 100644 --- a/fastdeploy/model_executor/layers/sample/sampler.py +++ b/fastdeploy/model_executor/layers/sample/sampler.py @@ -180,6 +180,8 @@ def __init__(self, fd_config: FDConfig = None): or current_platform.is_maca() ): self.forward = self.forward_cuda + elif current_platform.is_npu(): + self.forward = self.forward_npu else: raise NotImplementedError @@ -306,6 +308,39 @@ def forward_cuda( ) return sampler_output + + def forward_npu( + self, + logits: paddle.Tensor, + sampling_metadata: SamplingMetadata, + ) -> paddle.Tensor: + """ + """ + from fastdeploy.model_executor.ops.npu.top_p_sampling import top_p_sampling_npu + logits = apply_penalty_multi_scores( + sampling_metadata.pre_token_ids, + sampling_metadata.prompt_ids, + sampling_metadata.prompt_lens, + logits, + sampling_metadata.repetition_penalties, + sampling_metadata.frequency_penalties, + sampling_metadata.presence_penalties, + sampling_metadata.temperature, + sampling_metadata.bad_words_token_ids, + sampling_metadata.step_idx, + sampling_metadata.min_dec_lens, + sampling_metadata.eos_token_ids, + ) + probs = F.softmax(logits) + + _, next_tokens = top_p_sampling_npu(probs, sampling_metadata.top_p) + + sampler_output = SamplerOutput( + sampled_token_ids=next_tokens, + logprobs_tensors=None, + ) + + return sampler_output class SpeculativeSampler(nn.Layer): diff --git 
a/fastdeploy/model_executor/models/utils.py b/fastdeploy/model_executor/models/utils.py index 1d2f21a824..9d22de1d59 100644 --- a/fastdeploy/model_executor/models/utils.py +++ b/fastdeploy/model_executor/models/utils.py @@ -237,7 +237,7 @@ def convert_ndarray_dtype(np_array: np.ndarray, target_dtype: str) -> np.ndarray if ( source_dtype == "uint16" and target_dtype == "bfloat16" - and paddle.is_compiled_with_custom_device("iluvatar_gpu") + and (paddle.is_compiled_with_custom_device("iluvatar_gpu") or paddle.is_compiled_with_custom_device("npu")) ): return np_array.view(dtype=target_dtype) if source_dtype == "uint16" or target_dtype == "bfloat16": diff --git a/fastdeploy/model_executor/ops/npu/__init__.py b/fastdeploy/model_executor/ops/npu/__init__.py index 2977f2a9c5..ec3e99b7e6 100644 --- a/fastdeploy/model_executor/ops/npu/__init__.py +++ b/fastdeploy/model_executor/ops/npu/__init__.py @@ -15,16 +15,30 @@ from fastdeploy.import_ops import import_custom_ops, rename_imported_op +from .fapa_attention import fused_fapa_attention_npu +from .fused_rms_norm import rms_norm_npu +from .get_padding_offset import get_padding_offset +from .rebuild_padding import rebuild_padding +from .save_output import save_output +from .get_output import get_output +from .set_stop_value_multi_ends import set_stop_value_multi_ends +from .step_paddle import step_paddle_npu +from .update_inputs import update_inputs_npu +from .weight_only_linear import fused_linear_op +from .get_token_penalty_multi_scores import get_token_penalty_multi_scores_npu +from .top_p_sampling import top_p_sampling_npu +from .weight_quantize import npu_quant_weight + PACKAGE = "fastdeploy.model_executor.ops.npu" -import_custom_ops(PACKAGE, ".fastdeploy_ops", globals()) +# import_custom_ops(PACKAGE, ".fastdeploy_ops", globals()) rename_imported_op( old_name="set_value_by_flags_and_idx_v2", new_name="set_value_by_flags_and_idx", global_ns=globals(), ) -rename_imported_op( - old_name="set_stop_value_multi_ends_v2", - new_name="set_stop_value_multi_ends", - global_ns=globals(), -) +# rename_imported_op( +# old_name="set_stop_value_multi_ends_v2", +# new_name="set_stop_value_multi_ends", +# global_ns=globals(), +# ) diff --git a/fastdeploy/model_executor/ops/npu/fapa_attention.py b/fastdeploy/model_executor/ops/npu/fapa_attention.py new file mode 100644 index 0000000000..24647f036b --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/fapa_attention.py @@ -0,0 +1,44 @@ +import inspect + +import paddle +from paddle.base import core + + +def fused_fapa_attention_npu( + qkv_weight, + rope_emb, + cache_k, + cache_v, + seq_lens_encoder, + seq_lens_decoder, + block_tables, + q_num_head, + kv_num_head, + head_dim, + max_seq_len, + block_size, +): + + rope_emb=paddle.cast(rope_emb,paddle.bfloat16) + cos,sin=paddle.chunk(rope_emb, chunks=2, axis=-1) + + out = core.eager._run_custom_op( + "fused_fapa_attention_op", + qkv_weight, + cos, + sin, + cache_k, + cache_v, + seq_lens_encoder, + seq_lens_decoder, + block_tables, + q_num_head, + kv_num_head, + head_dim, + 1, + max_seq_len, + block_size, + False, # use_neox_rotary_style + False, + ) + return out diff --git a/fastdeploy/model_executor/ops/npu/fused_rms_norm.py b/fastdeploy/model_executor/ops/npu/fused_rms_norm.py new file mode 100644 index 0000000000..4b793794b3 --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/fused_rms_norm.py @@ -0,0 +1,26 @@ +from paddle.base import core +import paddlenlp_ops + +def rms_norm_npu( + x, + norm_weight, + norm_bias, + epsilon, + begin_norm_axis, + bias, + residual, 
+ quant_scale, + quant_round_type, + quant_max_bound, + quant_min_bound, +): + + out, residual_out = core.eager._run_custom_op( + "atb_rms_norm", + x, + norm_weight, + residual, + epsilon + ) + + return out, residual_out diff --git a/fastdeploy/model_executor/ops/npu/get_output.py b/fastdeploy/model_executor/ops/npu/get_output.py new file mode 100644 index 0000000000..5eae3fc452 --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/get_output.py @@ -0,0 +1,12 @@ +from paddle.base import core +#import paddlenlp_ops + +def get_output( + x, + rank_id, + wait_flag +): + # AttributeError: module 'paddle.base.libpaddle' has no attribute 'eager_run_custom_op' + + out = core.eager._run_custom_op("get_output", x, rank_id, wait_flag) + return out \ No newline at end of file diff --git a/fastdeploy/model_executor/ops/npu/get_padding_offset.py b/fastdeploy/model_executor/ops/npu/get_padding_offset.py new file mode 100644 index 0000000000..6c6e407d2c --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/get_padding_offset.py @@ -0,0 +1,12 @@ +from paddle.base import core +#import paddlenlp_ops + +def get_padding_offset( + input_ids, + cum_offsets, + token_num, + seq_len +): + # AttributeError: module 'paddle.base.libpaddle' has no attribute 'eager_run_custom_op' + out = core.eager._run_custom_op("get_padding_offset_v2", input_ids, cum_offsets, token_num, seq_len) + return out \ No newline at end of file diff --git a/fastdeploy/model_executor/ops/npu/get_token_penalty_multi_scores.py b/fastdeploy/model_executor/ops/npu/get_token_penalty_multi_scores.py new file mode 100644 index 0000000000..2ec50a5f32 --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/get_token_penalty_multi_scores.py @@ -0,0 +1,28 @@ +from paddle.base import core + +def get_token_penalty_multi_scores_npu( + pre_ids, + logits, + penalty_scores, + frequency_scores, + presence_scores, + temperatures, + bad_tokens, + cur_len, + min_len, + eos_token_id +): + logits_out = core.eager._run_custom_op( + "get_token_penalty_multi_scores_v2", + pre_ids, + logits, + penalty_scores, + frequency_scores, + presence_scores, + temperatures, + bad_tokens, + cur_len, + min_len, + eos_token_id) + return logits_out[0] + diff --git a/fastdeploy/model_executor/ops/npu/rebuild_padding.py b/fastdeploy/model_executor/ops/npu/rebuild_padding.py new file mode 100644 index 0000000000..f7b31bbdac --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/rebuild_padding.py @@ -0,0 +1,33 @@ +import paddle +from paddle.base import core +import inspect +#import paddlenlp_ops + +def rebuild_padding( + model_output, + cum_offsets, + seq_lens_this_time, + seq_lens_decoder, + seq_lens_encoder, + padding_offset, + max_model_len +): + # Cast to float16 for NPU kernel as required, then cast back to original dtype + # FIXME: guozr need furter check if type cast needed + original_dtype = model_output.dtype + model_output = paddle.cast(model_output, paddle.float16) + + out = core.eager._run_custom_op( + "rebuild_padding_v2", + model_output, + cum_offsets, + seq_lens_decoder, + seq_lens_encoder, + max_model_len + )[0] + + # Cast back to original dtype to maintain consistency + out = paddle.cast(out, original_dtype) + + + return out diff --git a/fastdeploy/model_executor/ops/npu/save_output.py b/fastdeploy/model_executor/ops/npu/save_output.py new file mode 100644 index 0000000000..a0a37182d8 --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/save_output.py @@ -0,0 +1,6 @@ +from paddle.base import core + + +def save_output(sampled_token_ids, not_need_stop, mp_rank, use_ep): + out 
= core.eager._run_custom_op("save_output", sampled_token_ids, not_need_stop, mp_rank) + return out \ No newline at end of file diff --git a/fastdeploy/model_executor/ops/npu/set_stop_value_multi_ends.py b/fastdeploy/model_executor/ops/npu/set_stop_value_multi_ends.py new file mode 100644 index 0000000000..95470c46f6 --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/set_stop_value_multi_ends.py @@ -0,0 +1,12 @@ +from paddle.base import core + + +def set_stop_value_multi_ends(topk_ids, stop_flags, seq_lens, end_ids, next_tokens): + + topk_ids_out, stop_flags_out, next_tokens_out = ( + core.eager._run_custom_op("set_stop_value_multi_ends_v2", + topk_ids, stop_flags, seq_lens, end_ids, next_tokens + ) + ) + + return topk_ids_out, stop_flags_out, next_tokens_out diff --git a/fastdeploy/model_executor/ops/npu/sparse_moe.py b/fastdeploy/model_executor/ops/npu/sparse_moe.py new file mode 100644 index 0000000000..f68b5e956b --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/sparse_moe.py @@ -0,0 +1,76 @@ +import inspect + +import paddle +import paddlenlp_ops +from paddle.base import core + + +# npu interface refer to gpu interface +def fused_sparse_moe( + input, + gate_weight, + ffn1_weight, + ffn2_weight, + ffn1_bias, + ffn1_scale, + ffn2_bias, + ffn2_scale, + quant_method, + moe_topk, + tp_size:int +): + """ + call npu func to implement this function + """ + ffn1_weight = paddle.cast(ffn1_weight, paddle.bfloat16) + ffn2_weight = paddle.cast(ffn2_weight, paddle.bfloat16) + + + gate_weight = gate_weight.transpose([1, 0]).astype(input.dtype) + + temp = paddle.zeros([1]).astype(input.dtype) + + expert_array = paddle.arange(moe_topk * input.shape[0]).astype("int32") + expert_group = paddle.ones([1]).astype("int32") + one_hot = paddle.ones([1]).astype("int32") + zero_hot = paddle.zeros([1]).astype("int32") + + # define quant mapping: may modify + if quant_method == "weight_int4_only": + quanttype = 11 + elif quant_method == "weight_int8_only": + quanttype = 6 + else: + quanttype = 1 + y = paddlenlp_ops.sparse_moe( + input, + gate_weight, + temp, + temp, + temp, + temp, + temp, + ffn1_weight, + ffn1_bias if ffn1_bias else temp, + temp, + temp, + ffn1_scale, + temp, + ffn2_weight, + ffn2_bias if ffn2_bias else temp, + temp, + temp, + ffn2_scale, + temp, + expert_array, + expert_group, + one_hot, + zero_hot, + moe_topk, + input.dtype == paddle.bfloat16, + tp_size, + quanttype, + ) + return y + + diff --git a/fastdeploy/model_executor/ops/npu/step_paddle.py b/fastdeploy/model_executor/ops/npu/step_paddle.py new file mode 100644 index 0000000000..c2f57f6696 --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/step_paddle.py @@ -0,0 +1,99 @@ +from paddle.base import core +import paddle +import inspect + + +def step_paddle_npu( + stop_flags, + seq_lens_this_time, + ori_seq_lens_encoder, + seq_lens_encoder, + seq_lens_decoder, + block_tables, + encoder_block_lens, + is_block_step, + step_block_list, + step_lens, + recover_block_list, + recover_lens, + need_block_list, + need_block_len, + used_list_len, + free_list, + free_list_len, + input_ids, + pre_ids, + step_idx, + next_tokens, + first_tokens_ids, + block_size, + encoder_decoder_block_num, +): + + ( + stop_flags_out, + seq_lens_this_time_out, + seq_lens_encoder_out, + seq_lens_decoder_out, + block_tables_out, + encoder_block_lens_out, + is_block_step_out, + step_block_list_out, + step_lens_out, + recover_block_list_out, + recover_lens_out, + need_block_list_out, + need_block_len_out, + used_list_len_out, + free_list_out, + free_list_len_out, + 
input_ids_out, + first_token_ids_out, + ) = core.eager._run_custom_op( + "step_paddle_op", + stop_flags, + seq_lens_this_time, + ori_seq_lens_encoder.cpu(), + seq_lens_encoder, + seq_lens_decoder, + block_tables, + encoder_block_lens.cpu(), + is_block_step, + step_block_list.cpu(), + step_lens.cpu(), + recover_block_list.cpu(), + recover_lens.cpu(), + need_block_list.cpu(), + need_block_len.cpu(), + used_list_len.cpu(), + free_list.cpu(), + free_list_len.cpu(), + input_ids, + pre_ids, + step_idx, + next_tokens, + first_tokens_ids.cpu(), + block_size, + encoder_decoder_block_num, + ) + + return ( + stop_flags_out, + seq_lens_this_time_out, + seq_lens_encoder_out, + seq_lens_decoder_out, + block_tables_out, + encoder_block_lens_out, + is_block_step_out, + step_block_list_out, + step_lens_out, + recover_block_list_out, + recover_lens_out, + need_block_list_out, + need_block_len_out, + used_list_len_out, + free_list_out, + free_list_len_out, + input_ids_out, + first_token_ids_out, + ) diff --git a/fastdeploy/model_executor/ops/npu/top_p_sampling.py b/fastdeploy/model_executor/ops/npu/top_p_sampling.py new file mode 100644 index 0000000000..a48fa13632 --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/top_p_sampling.py @@ -0,0 +1,44 @@ +import paddle + +def top_p_sampling_npu(probs, top_p): + sorted_probs = paddle.sort(probs, descending=True) + sorted_indices = paddle.argsort(probs, descending=True) + cumulative_probs = paddle.cumsum(sorted_probs, axis=-1) + + # Remove tokens with cumulative probs above the top_p, But keep at + # least min_tokens_to_keep tokens + sorted_indices_to_remove = cumulative_probs > top_p + + # Keep the first token + sorted_indices_to_remove = paddle.cast( + sorted_indices_to_remove, dtype='int64' + ) + + sorted_indices_to_remove = paddle.static.setitem( + sorted_indices_to_remove, + (slice(None), slice(1, None)), + sorted_indices_to_remove[:, :-1].clone(), + ) + sorted_indices_to_remove = paddle.static.setitem( + sorted_indices_to_remove, (slice(None), 0), 0 + ) + + # Scatter sorted tensors to original indexing + batch_size = probs.shape[0] + vocab_size = probs.shape[-1] + + # Create flat indices for scatter operation + batch_offsets = paddle.arange(batch_size).unsqueeze(-1) * vocab_size + flat_sorted_indices = (sorted_indices + batch_offsets).flatten() + + # Perform scatter operation + condition = paddle.scatter( + paddle.zeros(batch_size * vocab_size, dtype=sorted_indices_to_remove.dtype), + flat_sorted_indices, + sorted_indices_to_remove.flatten(), + ) + condition = paddle.cast(condition, 'bool').reshape(probs.shape) + probs = paddle.where(condition, paddle.full_like(probs, 0.0), probs) + next_tokens = paddle.multinomial(probs) + next_scores = paddle.index_sample(probs, next_tokens) + return next_scores, next_tokens diff --git a/fastdeploy/model_executor/ops/npu/update_inputs.py b/fastdeploy/model_executor/ops/npu/update_inputs.py new file mode 100644 index 0000000000..346d5006ba --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/update_inputs.py @@ -0,0 +1,41 @@ +from paddle.base import core + + +def update_inputs_npu( + stop_flags, + not_need_stop, + seq_lens_this_time, + seq_lens_encoder, + seq_lens_decoder, + input_ids, + stop_nums, + next_tokens, + is_block_step, +): + + + ( + not_need_stop_out, + seq_lens_this_time_out, + seq_lens_encoder_out, + seq_lens_decoder_out, + input_ids_out, + ) = core.eager._run_custom_op( + "update_inputs", + stop_flags, + not_need_stop, + seq_lens_this_time, + seq_lens_encoder, + seq_lens_decoder, + input_ids, + stop_nums, + 
next_tokens, + is_block_step, + ) + return ( + not_need_stop_out, + seq_lens_this_time_out, + seq_lens_encoder_out, + seq_lens_decoder_out, + input_ids_out, + ) diff --git a/fastdeploy/model_executor/ops/npu/weight_only_linear.py b/fastdeploy/model_executor/ops/npu/weight_only_linear.py new file mode 100644 index 0000000000..82034b868c --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/weight_only_linear.py @@ -0,0 +1,15 @@ +from paddle.base import core +import paddle + +def fused_linear_op( + x, + weight, + weight_scale, +): + + if weight_scale.dtype != x.dtype: + weight_scale = paddle.cast(weight_scale, x.dtype) + + out = core.eager._run_custom_op("weight_only_linear_npu", x, weight, weight_scale) + + return out[0] \ No newline at end of file diff --git a/fastdeploy/model_executor/ops/npu/weight_quantize.py b/fastdeploy/model_executor/ops/npu/weight_quantize.py new file mode 100644 index 0000000000..8aea3d3443 --- /dev/null +++ b/fastdeploy/model_executor/ops/npu/weight_quantize.py @@ -0,0 +1,19 @@ +import paddle +import numpy as np + +def npu_quant_weight(weight_np): + if isinstance(weight_np, paddle.Tensor): + if weight_np.dtype == paddle.bfloat16: + weight_np = paddle.cast(weight_np, paddle.float16) + weight_np = weight_np.numpy() + weight = weight_np + max_value = np.max(np.abs(weight), axis=0).reshape(1, -1) + quanted_weight = clip_and_round(weight / max_value * 127.0) + quanted_weight = paddle.to_tensor(quanted_weight) + weight_scales = (max_value / 127.0).astype(weight_np.dtype).reshape(-1) + weight_scales = paddle.to_tensor(weight_scales) + weight_scales = paddle.cast(weight_scales, paddle.get_default_dtype()) + return quanted_weight, weight_scales + +def clip_and_round(x): + return np.clip(np.around(x), -127, 127).astype("int8") \ No newline at end of file diff --git a/fastdeploy/model_executor/pre_and_post_process.py b/fastdeploy/model_executor/pre_and_post_process.py index b26746e74a..a3c419412b 100644 --- a/fastdeploy/model_executor/pre_and_post_process.py +++ b/fastdeploy/model_executor/pre_and_post_process.py @@ -53,6 +53,14 @@ step_paddle, update_inputs, ) +elif current_platform.is_npu(): + from fastdeploy.model_executor.ops.npu import ( + get_padding_offset, + save_output, + set_stop_value_multi_ends, + step_paddle_npu as step_paddle, + update_inputs_npu as update_inputs, + ) else: from fastdeploy.model_executor.ops.gpu import ( get_padding_offset, @@ -246,6 +254,14 @@ def post_process_normal( model_output.stop_seqs_len, False, ) # multi ends + elif current_platform.is_npu(): + set_stop_value_multi_ends( + sampler_output.sampled_token_ids, + model_output.stop_flags, + model_output.seq_lens_this_time, + model_output.eos_token_id, + model_output.next_tokens, + ) # multi ends else: set_stop_value_multi_ends( sampler_output.sampled_token_ids, @@ -597,6 +613,18 @@ def rebuild_padding( elif current_platform.is_maca(): from fastdeploy.model_executor.ops.gpu import rebuild_padding + hidden_states = rebuild_padding( + tmp_out, + cum_offsets, + seq_len_this_time, + seq_lens_decoder, + seq_lens_encoder, + output_padding_offset, + max_input_length, + ) + elif current_platform.is_npu(): + from fastdeploy.model_executor.ops.npu import rebuild_padding + hidden_states = rebuild_padding( tmp_out, cum_offsets, diff --git a/fastdeploy/output/token_processor.py b/fastdeploy/output/token_processor.py index ebb64cebc7..f89e58548d 100644 --- a/fastdeploy/output/token_processor.py +++ b/fastdeploy/output/token_processor.py @@ -143,6 +143,8 @@ def process_sampling_results(self): from 
fastdeploy.model_executor.ops.iluvatar import get_output elif current_platform.is_gcu(): from fastdeploy.model_executor.ops.gcu import get_output + elif current_platform.is_npu(): + from fastdeploy.model_executor.ops.npu import get_output else: from fastdeploy.model_executor.ops.gpu import ( get_output, @@ -528,6 +530,9 @@ def process_sampling_results(self): from fastdeploy.model_executor.ops.xpu import get_output elif current_platform.is_iluvatar(): from fastdeploy.model_executor.ops.iluvatar import get_output + elif current_platform.is_npu(): + from fastdeploy.model_executor.ops.npu import get_output + else: from fastdeploy.model_executor.ops.gpu import ( get_output, diff --git a/fastdeploy/platforms/npu.py b/fastdeploy/platforms/npu.py index 6e021c7443..16607f9f7f 100644 --- a/fastdeploy/platforms/npu.py +++ b/fastdeploy/platforms/npu.py @@ -11,8 +11,37 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. -from .base import Platform +from .base import Platform, _Backend +import paddle class NPUPlatform(Platform): device_name = "npu" + + @classmethod + def available(self): + """ + Check whether CUDA is available. + """ + """ + Check whether XPU is available. + """ + try: + assert paddle.is_compiled_with_xpu() + assert len(paddle.static.xpu_places()) > 0 + return True + except Exception as e: + return False + @classmethod + def get_attention_backend_cls( + cls, + selected_backend + ): + """ + get_attention_backend_cls + """ + if selected_backend == _Backend.NATIVE_ATTN: + return ("fastdeploy.model_executor.layers.attention.NpuFaPaAttentionBackend") + elif selected_backend == _Backend.APPEND_ATTN: + return ("fastdeploy.model_executor.layers.attention.NpuFaPaAttentionBackend") + \ No newline at end of file diff --git a/fastdeploy/worker/npu_model_runner.py b/fastdeploy/worker/npu_model_runner.py new file mode 100644 index 0000000000..85329eeb69 --- /dev/null +++ b/fastdeploy/worker/npu_model_runner.py @@ -0,0 +1,905 @@ +""" +# Copyright (c) 2025 PaddlePaddle Authors. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License" +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
+""" + +import os +import random +import time +from typing import List, Optional + +import numpy as np +import paddle +import paddle.nn as nn +from fastdeploy.config import FDConfig +from fastdeploy.engine.request import Request +from fastdeploy.model_executor.layers.attention import get_attention_backend +from fastdeploy.model_executor.layers.attention.base_attention_backend import \ + AttentionBackend +from fastdeploy.model_executor.layers.rotary_embedding import get_rope +from fastdeploy.model_executor.layers.sample.meta_data import SamplingMetadata +from fastdeploy.model_executor.layers.sample.sampler import Sampler +from fastdeploy.model_executor.model_loader import get_model_loader +from fastdeploy.model_executor.ops.npu import ( + rebuild_padding ) +from fastdeploy.model_executor.pre_and_post_process import ( + post_process, pre_process) +from fastdeploy.utils import get_logger +from fastdeploy.worker.model_runner_base import ModelRunnerBase +from fastdeploy.worker.output import ModelOutputData, ModelRunnerOutput +from fastdeploy.model_executor.forward_meta import ForwardMeta +from typing import Dict + +logger = get_logger("npu_model_runner", "npu_model_runner.log") + + +def step_paddle( + share_inputs: Dict[str, paddle.Tensor], + block_size: int, + enc_dec_block_num: int, +) -> None: + """ + NPU wrapper for step_paddle function + """ + from fastdeploy.model_executor.ops.npu.step_paddle import step_paddle_npu + step_paddle_npu( + share_inputs["stop_flags"], + share_inputs["seq_lens_this_time"], + share_inputs["ori_seq_lens_encoder"], + share_inputs["seq_lens_encoder"], + share_inputs["seq_lens_decoder"], + share_inputs["block_tables"], + share_inputs["encoder_block_lens"], + share_inputs["is_block_step"], + share_inputs["step_block_list"], + share_inputs["step_lens"], + share_inputs["recover_block_list"], + share_inputs["recover_lens"], + share_inputs["need_block_list"], + share_inputs["need_block_len"], + share_inputs["used_list_len"], + share_inputs["free_list"], + share_inputs["free_list_len"], + share_inputs["input_ids"], + share_inputs["pre_ids"], + share_inputs["step_idx"], + share_inputs["next_tokens"], + share_inputs["first_token_ids"], + block_size, + enc_dec_block_num, + ) + + +class NPUModelRunner(ModelRunnerBase): + """ """ + + def __init__( + self, + fd_config: FDConfig, + device: str, # logic device + device_id: int, # physical device id + rank: int, + local_rank: int, + ): + super().__init__(fd_config=fd_config, device=device) + self.rank = rank + self.local_rank = local_rank + self.device_id = device_id # FIXME + self.speculative_method = self.fd_config.speculative_config.method + self.speculative_decoding = self.speculative_method is not None + + # Sampler + self.sampler = Sampler() + + # Lazy initialize kv cache after model loading + # self.kv_caches: list[paddle.Tensor] = [] + + # Cuda Graph + # self.use_cuda_grpah = False # remove + self.input_ids = paddle.zeros(self.parallel_config.max_num_seqs, dtype="int32") + + # Initialize share inputs + self._init_share_inputs(self.fd_config.parallel_config.max_num_seqs) + self.infer_seed_increment = paddle.full( + shape=[self.parallel_config.max_num_seqs, 1], fill_value=4, dtype="int64" + ) + + # Initialize attention Backend + # Note(gonshaotian): Currently, all attention layers share one attention backend instance. + # In the future, we will expand it as a list. 
+ self.attn_backends: list[AttentionBackend] = [] + # self.attn_metadatas: list[AttentionMetadata] = [] + self.initialize_attn_backend() + + # Forward meta store the global meta information of the forward + self.forward_meta = None + if not self.speculative_decoding: + self.sampler = Sampler(fd_config) + else: + self.sampler = SpeculativeSampler(fd_config) + + def prefill_finished(self): + """ + check whether prefill stage finished + """ + prefill_statue = (self.share_inputs["seq_lens_this_time"] != 0) & ( + self.share_inputs["seq_lens_this_time"] != 1 + ) + return not paddle.any(prefill_statue).numpy() + + def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None): + """ + Process inputs for prefill tasks and insert it to share_inputs buffer + TODO(gongshaotian): Refactor this func + """ + # NOTE(luotingdan): Lazy initialize kv cache + if "caches" not in self.share_inputs: + self.initialize_kv_cache() + + # NOTE(luotingdan): Set environment variable of prefill node + if ( + req_dicts[-1].disaggregate_info is not None + and req_dicts[-1].disaggregate_info["role"] == "prefill" + ): + os.environ["PREFILL_NODE_ONE_STEP_STOP"] = "1" + + req_len = len(req_dicts) + for i in range(req_len): + request = req_dicts[i] + idx = request.idx + length = len(request.prompt_token_ids) + assert length > 0, "The prompt requested must not be empty." + + # Is Decode Node + if ( + req_dicts[i].disaggregate_info is not None + and req_dicts[i].disaggregate_info["role"] == "decode" + ): + self.share_inputs["pre_ids"][idx : idx + 1] = request.prompt_token_ids[ + -1 + ] + self.share_inputs["input_ids"][idx : idx + 1, 0] = ( + request.prompt_token_ids[0] + ) + self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 + self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length + self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1 + self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = 0 + self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = length + self.share_inputs["step_idx"][idx : idx + 1] = 1 + else: + self.share_inputs["pre_ids"][idx : idx + 1] = -1 + self.share_inputs["step_idx"][idx : idx + 1] = 0 + self.share_inputs["input_ids"][idx : idx + 1, :length] = np.array( + request.prompt_token_ids + ) + + # Use chunked prefill + if self.cache_config.enable_chunked_prefill: + request.set("chunk_idx", 1) + token_chunk_size = request.prefill_chunk_info[0] + self.share_inputs["seq_lens_this_time"][ + idx : idx + 1 + ] = token_chunk_size + self.share_inputs["input_ids"][idx, :token_chunk_size] = np.array( + request.prompt_token_ids[:token_chunk_size] + ) + self.share_inputs["step_seq_lens_encoder"][ + idx : idx + 1 + ] = token_chunk_size + self.share_inputs["seq_lens_encoder"][ + idx : idx + 1 + ] = token_chunk_size + self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get( + "seq_lens_decoder", 0 + ) + self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = ( + request.get("seq_lens_decoder", 0) + ) + else: + self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get( + "seq_lens_decoder", 0 + ) + self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = ( + request.get("seq_lens_decoder", 0) + ) + self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length + self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length + self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length + + if len(request.eos_token_ids) < self.model_config.eos_tokens_lens: + request.eos_token_ids.append(request.eos_token_ids[0]) + 
self.share_inputs["eos_token_id"][:] = np.array( + request.eos_token_ids, dtype="int64" + ).reshape(-1, 1) + + self.share_inputs["top_p"][idx : idx + 1] = request.get("top_p", 0.7) + self.share_inputs["temperature"][idx : idx + 1] = request.get( + "temperature", 0.95 + ) + self.share_inputs["penalty_score"][idx : idx + 1] = request.get( + "repetition_penalty", 1.0 + ) + self.share_inputs["frequency_score"][idx : idx + 1] = request.get( + "frequency_penalty", 0.0 + ) + self.share_inputs["presence_score"][idx : idx + 1] = request.get( + "presence_penalty", 0.0 + ) + + self.share_inputs["min_dec_len"][idx : idx + 1] = request.get( + "min_tokens", 1 + ) + self.share_inputs["max_dec_len"][idx : idx + 1] = request.get( + "max_tokens", self.model_config.max_model_len + ) + self.share_inputs["stop_flags"][idx : idx + 1] = False + + self.share_inputs["first_token_ids"][idx : idx + 1] = self.share_inputs[ + "input_ids" + ][idx : idx + 1, :1] + self.share_inputs["ori_seq_lens_encoder"][idx : idx + 1] = length + + if request.get("seed") is not None: + self.share_inputs["infer_seed"][idx : idx + 1] = request.get("seed") + encoder_block_num = len(request.get("block_tables")) + self.share_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num + self.share_inputs["block_tables"][idx : idx + 1, :] = -1 + self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = ( + np.array(request.block_tables, dtype="int32") + ) + + if ( + request.get("stop_token_ids") is not None + and request.get("stop_seqs_len") is not None + ): + stop_seqs_num = len(request.get("stop_seqs_len")) + for i in range(stop_seqs_num, self.model_config.max_stop_seqs_num): + request.stop_seqs_len.append(0) + self.share_inputs["stop_seqs_len"][:] = np.array( + request.stop_seqs_len, dtype="int32" + ) + self.share_inputs["stop_seqs"][ + :stop_seqs_num, : len(request.get("stop_token_ids")[0]) + ] = np.array(request.get("stop_token_ids"), dtype="int64") + + self.share_inputs["not_need_stop"][0] = True + + + def _init_share_inputs(self, max_num_seqs: int): + """Initialize all share buffers for model inputs. + Note: In the future, we may abandon share buffers. 
+ """ + self.MAX_INFER_SEED = 9223372036854775806 + self.share_inputs = {} + + self.share_inputs["pre_ids"] = paddle.full( + [max_num_seqs, self.parallel_config.max_model_len], + -1, + dtype="int64", + ) + self.share_inputs["input_ids"] = paddle.full( + [max_num_seqs, self.parallel_config.max_model_len], + self.model_config.pad_token_id, + dtype="int64", + ) + self.share_inputs["prompt_ids"] = paddle.full( + [max_num_seqs, self.parallel_config.max_model_len], + self.model_config.pad_token_id, + dtype="int64", + ) + self.share_inputs["eos_token_id"] = paddle.full( + [self.model_config.eos_tokens_lens, 1], 0, dtype="int64" + ) + self.share_inputs["top_p"] = paddle.full( + [max_num_seqs, 1], self.model_config.top_p, dtype="float32" + ) + self.share_inputs["top_k"] = paddle.full( + [max_num_seqs, 1], 0, dtype="int64" + ) + self.share_inputs["min_p"] = paddle.full( + [max_num_seqs, 1], 0.0, dtype="float32" + ) + self.share_inputs["temperature"] = paddle.full( + [max_num_seqs, 1], self.model_config.temperature, dtype="float32" + ) + self.share_inputs["penalty_score"] = paddle.full( + [max_num_seqs, 1], self.model_config.penalty_score, dtype="float32" + ) + self.share_inputs["frequency_score"] = paddle.full( + [max_num_seqs, 1], self.model_config.frequency_score, dtype="float32" + ) + self.share_inputs["presence_score"] = paddle.full( + [max_num_seqs, 1], self.model_config.presence_score, dtype="float32" + ) + + self.share_inputs["min_dec_len"] = paddle.full( + [max_num_seqs, 1], self.model_config.min_length, dtype="int64" + ) + self.share_inputs["max_dec_len"] = paddle.full( + [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64" + ) + self.share_inputs["min_length"] = paddle.full( + [max_num_seqs, 1], self.model_config.min_length, dtype="int64" + ) + self.share_inputs["max_length"] = paddle.full( + [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64" + ) + self.share_inputs["seq_lens_this_time"] = paddle.full( + max_num_seqs, 0, dtype="int32" + ) + self.share_inputs["seq_lens_encoder"] = paddle.full( + [max_num_seqs, 1], 0, dtype="int32" + ) + self.share_inputs["seq_lens_decoder"] = paddle.full( + [max_num_seqs, 1], 0, dtype="int32" + ) + self.share_inputs["step_seq_lens_encoder"] = paddle.full( + [max_num_seqs, 1], 0, dtype="int32" + ) + self.share_inputs["step_seq_lens_decoder"] = paddle.full( + [max_num_seqs, 1], 0, dtype="int32" + ) + self.share_inputs["prompt_lens"] = paddle.full( + [max_num_seqs, 1], 0, dtype="int64" + ) + self.share_inputs["step_idx"] = paddle.full([max_num_seqs, 1], 0, dtype="int64") + self.share_inputs["not_need_stop"] = paddle.full( + [1], False, dtype="bool" + ).cpu() # TODO(gongshaotian): move to pinnd memory + self.share_inputs["stop_flags"] = paddle.full( + [max_num_seqs, 1], True, dtype="bool" + ) + self.share_inputs["stop_nums"] = paddle.full([1], max_num_seqs, dtype="int64") + + self.share_inputs["bad_tokens"] = paddle.full( + [max_num_seqs, self.model_config.vocab_size], -1, dtype="int64" + ) + self.share_inputs["bad_tokens_len"] = paddle.full( + [max_num_seqs], 1, dtype="int64" + ) + self.share_inputs["next_tokens"] = paddle.full( + [max_num_seqs, 1], -1, dtype="int64" + ) + self.share_inputs["is_block_step"] = paddle.full( + [max_num_seqs], False, dtype="bool" + ) + self.share_inputs["encoder_block_lens"] = paddle.full( + [max_num_seqs], 0, dtype="int32" + ) + self.share_inputs["step_block_list"] = paddle.full( + [max_num_seqs], -1, dtype="int32" + ) + self.share_inputs["step_lens"] = paddle.full([1], 0, dtype="int32") + 
self.share_inputs["recover_block_list"] = paddle.full( + [max_num_seqs], -1, dtype="int32" + ) + self.share_inputs["recover_lens"] = paddle.full([1], 0, dtype="int32") + self.share_inputs["need_block_list"] = paddle.full( + [max_num_seqs], -1, dtype="int32" + ) + self.share_inputs["need_block_len"] = paddle.full([1], 0, dtype="int32") + self.share_inputs["used_list_len"] = paddle.full( + [max_num_seqs], 0, dtype="int32" + ) + self.share_inputs["infer_seed"] = paddle.full( + [max_num_seqs, 1], 0, dtype="int64" + ) + self.share_inputs["first_token_ids"] = paddle.full( + [max_num_seqs, 1], -1, dtype="int64" + ) + self.share_inputs["ori_seq_lens_encoder"] = paddle.full( + [max_num_seqs, 1], 0, dtype="int32" + ) + self.share_inputs["system_lens"] = paddle.full( + [max_num_seqs, 1], 0, dtype="int32" + ) + self.share_inputs["system_ids"] = paddle.full( + [max_num_seqs, 1], -1, dtype="int32" + ) + + # Initialize rotary position embedding + tmp_position_ids = paddle.arange(self.parallel_config.max_model_len).reshape( + (1, -1) + ) + # TODO(gongshaotian): move to models + self.share_inputs["rope_emb"] = get_rope( + rotary_dim=self.model_config.head_dim, + position_ids=tmp_position_ids, + base=self.model_config.rope_theta, + model_config=self.model_config, + ) + + # Set block tables + pre_max_block_num = ( + (self.parallel_config.max_model_len + self.parallel_config.block_size - 1) + // self.parallel_config.block_size + + self.parallel_config.enc_dec_block_num + ) + self.share_inputs["block_tables"] = paddle.full( + [max_num_seqs, pre_max_block_num], -1, dtype="int32" + ) + + # Initialize free list + free_list = list( + range( + self.parallel_config.total_block_num - 1, + int(self.parallel_config.total_block_num * self.cache_config.kv_cache_ratio) - 1, + -1, + ) + ) + self.free_list_len = len(free_list) + self.share_inputs["free_list"] = paddle.to_tensor(free_list, dtype="int32") + self.share_inputs["free_list_len"] = paddle.full( + [1], self.free_list_len, dtype="int32" + ) + + # Initialize stop seqs + self.share_inputs["stop_seqs_len"] = paddle.full( + [self.model_config.max_stop_seqs_num], 0, dtype="int32" + ) + self.share_inputs["stop_seqs"] = paddle.full( + [self.model_config.max_stop_seqs_num, self.model_config.stop_seqs_max_len], + -1, + dtype="int32", + ) + + # Initialize fields needed for _prepare_inputs + self.share_inputs["ids_remove_padding"] = paddle.full( + [self.parallel_config.max_num_seqs * self.parallel_config.max_model_len], + self.model_config.pad_token_id, + dtype="int64" + ) + self.share_inputs["cum_offsets"] = paddle.full( + [self.parallel_config.max_num_seqs], + 0, + dtype="int32" + ) + self.share_inputs["batch_id_per_token"] = paddle.full( + [self.parallel_config.max_num_seqs * self.parallel_config.max_model_len], + 0, + dtype="int32" + ) + self.share_inputs["cu_seqlens_q"] = paddle.full( + [self.parallel_config.max_num_seqs + 1], + 0, + dtype="int32" + ) + self.share_inputs["cu_seqlens_k"] = paddle.full( + [self.parallel_config.max_num_seqs + 1], + 0, + dtype="int32" + ) + self.share_inputs["decoder_batch_ids"] = paddle.full( + [self.parallel_config.max_num_seqs], + 0, + dtype="int32" + ) + self.share_inputs["decoder_tile_ids_per_batch"] = paddle.full( + [self.parallel_config.max_num_seqs], + 0, + dtype="int32" + ) + self.share_inputs["decoder_num_blocks_cpu"] = paddle.full( + [self.parallel_config.max_num_seqs], + 0, + dtype="int32" + ) + self.share_inputs["max_len_tensor_cpu"] = paddle.full( + [1], + self.parallel_config.max_model_len, + dtype="int32" + ) + 
self.share_inputs["draft_tokens"] = paddle.full( + [self.parallel_config.max_num_seqs, self.parallel_config.max_model_len], + -1, + dtype="int64" + ) + self.share_inputs["output_cum_offsets"] = paddle.full( + [self.parallel_config.max_num_seqs], + 0, + dtype="int32" + ) + self.share_inputs["output_padding_offset"] = paddle.full( + [self.parallel_config.max_num_seqs], + 0, + dtype="int32" + ) + + def _prepare_inputs(self) -> None: + """prepare the model inputs""" + # Remove padding + ( + ids_remove_padding, + cum_offsets, + batch_id_per_token, + cu_seqlens_q, + cu_seqlens_k, + output_cum_offsets, + output_padding_offset, + ) = pre_process( + self.share_inputs["input_ids"], + self.share_inputs["seq_lens_this_time"], + self.speculative_decoding, + (self.share_inputs["draft_tokens"] if self.speculative_decoding else None), + self.share_inputs["seq_lens_encoder"], + self.share_inputs["seq_lens_decoder"], + ) + + # Initialize forward meta data + self.share_inputs["ids_remove_padding"] = ids_remove_padding + self.share_inputs["cum_offsets"] = cum_offsets + self.share_inputs["batch_id_per_token"].copy_(batch_id_per_token, False) + self.share_inputs["cu_seqlens_q"] = cu_seqlens_q + self.share_inputs["cu_seqlens_k"] = cu_seqlens_k + self.initialize_forward_meta() # FIXME + if self.speculative_decoding: + self.share_inputs["output_cum_offsets"].copy_(output_cum_offsets, False) + self.share_inputs["output_padding_offset"].copy_(output_padding_offset, False) + + # Get sampling metadata + max_bad_tokens_len = paddle.max(self.share_inputs["bad_tokens_len"]) + self.sampling_metadata = SamplingMetadata( + temperature=self.share_inputs["temperature"], + top_p=self.share_inputs["top_p"], + top_k=self.share_inputs["top_k"], + min_p=self.share_inputs["min_p"], + seed=self.share_inputs["infer_seed"], + step_idx=self.share_inputs["step_idx"], + pre_token_ids=self.share_inputs["pre_ids"], + frequency_penalties=self.share_inputs["frequency_score"], + presence_penalties=self.share_inputs["presence_score"], + repetition_penalties=self.share_inputs["penalty_score"], + min_dec_lens=self.share_inputs["min_dec_len"], + bad_words_token_ids=self.share_inputs["bad_tokens"][:, :max_bad_tokens_len], + eos_token_ids=self.share_inputs["eos_token_id"], + ) + + def load_model(self) -> None: + """load or download model""" + logger.info(f"Starting to load model {self.model_config.architectures[0]}") + time_before_load = time.perf_counter() + # 1. Load original model + model_loader = get_model_loader(load_config=self.fd_config.load_config) + self.model = model_loader.load_model(fd_config=self.fd_config) + + # 2. Load lora model + + # 3. 
Load drafter model(for speculative decoding) + + # self._init_speculative_proposer() + + def get_model(self) -> nn.Layer: + """get current model""" + return self.model + + def initialize_forward_meta(self): # FIXME + """ + Initialize forward meta and attention meta data + """ + # Initialize forward meta + self.forward_meta = ForwardMeta( + input_ids=self.share_inputs["input_ids"], + ids_remove_padding=self.share_inputs["ids_remove_padding"], + rotary_embs=self.share_inputs["rope_emb"], + attn_backend=self.attn_backends[0], + decoder_batch_ids=self.share_inputs["decoder_batch_ids"], + decoder_tile_ids_per_batch=self.share_inputs["decoder_tile_ids_per_batch"], + decoder_num_blocks_cpu=self.share_inputs["decoder_num_blocks_cpu"], + max_len_tensor_cpu=self.share_inputs["max_len_tensor_cpu"], + seq_lens_encoder=self.share_inputs["seq_lens_encoder"], + seq_lens_decoder=self.share_inputs["seq_lens_decoder"], + seq_lens_this_time=self.share_inputs["seq_lens_this_time"], + batch_id_per_token=self.share_inputs["batch_id_per_token"], + cu_seqlens_q=self.share_inputs["cu_seqlens_q"], + cu_seqlens_k=self.share_inputs["cu_seqlens_k"], + block_tables=self.share_inputs["block_tables"], + caches=self.share_inputs["caches"], + ) + + # Update Batch type for cuda graph + only_decode_batch = True + prefill_exists = None + # mix ep in single node + if self.fd_config.parallel_config.use_ep and self.fd_config.parallel_config.splitwise_role == "mixed": + only_decode_batch_list = [] + prefill_exists = self.exist_prefill() + paddle.distributed.all_gather_object(only_decode_batch_list, not prefill_exists) + only_decode_batch = all(only_decode_batch_list) + self.fd_config.parallel_config.moe_phase.phase = "decode" if only_decode_batch else "prefill" + + # Initialzie attention meta data + for attn_backend in self.attn_backends: + attn_backend.init_attention_metadata(self.forward_meta) + + def initialize_kv_cache(self, + kv_cache_config = None) -> None: # FIXME + """ + Initialize kv cache + Args: + kv_cache_config: + """ + cache_kvs = {} + max_block_num = self.num_gpu_blocks + + # Get kv cache dtype + cache_type = self.parallel_config.dtype + + kv_cache_quant_type = None + if ( + self.quant_config + and hasattr(self.quant_config, "kv_cache_quant_type") + and self.quant_config.kv_cache_quant_type is not None + ): + cache_type = "uint8" + kv_cache_quant_type = self.quant_config.kv_cache_quant_type + + # Get kv cache shape + kv_cache_shape = self.attn_backends[0].get_kv_cache_shape( + max_num_blocks=max_block_num, kv_cache_quant_type=kv_cache_quant_type + ) + + for i in range(self.model_config.num_hidden_layers): + + cache_kvs["key_caches_{}".format(i)] = paddle.full( + shape=kv_cache_shape, + fill_value=0, + dtype=cache_type, + ) + cache_kvs["value_caches_{}".format(i)] = paddle.full( + shape=kv_cache_shape, + fill_value=0, + dtype=cache_type, + ) + self.share_inputs["caches"] = list(cache_kvs.values()) + for value in cache_kvs.values(): + del value + + # paddle.device.cuda.empty_cache() + + def initialize_attn_backend( + self, kv_cache_config = None + ) -> None: + """ + Initialize attention backends and forward metadata + Args: + kv_cache_config: + """ + assert len(self.attn_backends) == 0 + + num_heads = self.model_config.num_attention_heads // self.parallel_config.tensor_parallel_size + self.model_config.kv_num_heads = max( + 1, + int(self.model_config.num_key_value_heads) // self.parallel_config.tensor_parallel_size, + ) + head_dim = self.model_config.head_dim + + # Get the attention backend + attn_cls = 
get_attention_backend() + attn_backend = attn_cls( + self.fd_config, + kv_num_heads=self.model_config.kv_num_heads, + num_heads=num_heads, + head_dim=head_dim, + ) + if attn_backend is None: + raise NotImplementedError( + f"{ self.parallel_config.attention_backend} attention backend is not support by NPUModelRunner" + ) + self.attn_backends.append(attn_backend) + + def capture_model(self) -> None: + """ + Trigger CUDA Graph capture for all shapes in 'CudaGraphConfig.cudagraph_capture_sizes' + """ + logger.warn("NPU not support cuda graph currently") + pass + + def execute_model( + self, + model_forward_batch: Optional[List[Request]] = None, + num_running_requests: int = None, + ) -> Optional[ModelRunnerOutput]: + """ + The Entrance of model execute. + Args: + model_forward_batch: 'Request' contains information related to prompt and is an abstract + class at the server level, which is too granular for ModelRunner. + We plan to replace it with 'ModelForwardBatch'. + intermediate_tensors: + """ + + # # Note(@wufeisheng): If `not_need_stop`` is False, it means the current worker is in an idle state. + # # This logic is not used in TP (Tensor Parallelism) mode. However, in EP (Expert Parallelism) mode, + # # when there is data on other runner, the current runner is required to execute part of the model. + # if not self.not_need_stop(): # remove + # self._execute_empty_input() + # return None + + # 1. Prepare inputs of model and decoder. + self._prepare_inputs() + + # 2. Padding inputs for cuda grph + + # 3. Execute model + model_output = self.model( + self.share_inputs["ids_remove_padding"], self.forward_meta + ) + + + hidden_states = rebuild_padding( + model_output, + self.share_inputs["cum_offsets"], + self.share_inputs["seq_lens_this_time"], + self.share_inputs["seq_lens_decoder"], + self.share_inputs["seq_lens_encoder"], + None, # self.share_inputs["padding_offset"], + self.parallel_config.max_model_len, + ) + # hidden_states = paddle.randn([8, 8192], paddle.bfloat16) + + # 4. Compute logits, Sample + logits = self.model.compute_logits(hidden_states) + + sampler_output = self.sampler(logits, self.sampling_metadata) + + + + # 6. 
Post Process + model_output_data = ModelOutputData( + next_tokens=self.share_inputs["next_tokens"], + stop_flags=self.share_inputs["stop_flags"], + step_idx=self.share_inputs["step_idx"], + max_dec_len=self.share_inputs["max_dec_len"], + pre_ids=self.share_inputs["pre_ids"], + seq_lens_this_time=self.share_inputs["seq_lens_this_time"], + eos_token_id=self.share_inputs["eos_token_id"], + not_need_stop=self.share_inputs["not_need_stop"], + input_ids=self.share_inputs["input_ids"], + stop_nums=self.share_inputs["stop_nums"], + seq_lens_encoder=self.share_inputs["seq_lens_encoder"], + seq_lens_decoder=self.share_inputs["seq_lens_decoder"], + is_block_step=self.share_inputs["is_block_step"], + msg_queue_id=self.parallel_config.msg_queue_id, + mp_rank=self.local_rank, + use_ep=self.parallel_config.use_ep, + full_hidden_states=paddle.empty([0]), # Empty tensor for NPU + draft_tokens=paddle.empty([0]), # Empty tensor for NPU + actual_draft_token_num=paddle.empty([0]), # Empty tensor for NPU + accept_tokens=paddle.empty([0]), # Empty tensor for NPU + accept_num=paddle.empty([0]), # Empty tensor for NPU + enable_thinking=None, # NPU doesn't support multimodal + think_end_id=-1, + need_think_end=None, + reasoning_index=None, + ) + post_process( + sampler_output=sampler_output, + model_output=model_output_data, + share_inputs=self.share_inputs, + block_size=self.cache_config.block_size, + speculative_decoding=self.speculative_decoding, + skip_save_output=False, + ) + # 7. Updata 'infer_seed' and step_cuda() + self.share_inputs["infer_seed"].add_(self.infer_seed_increment) + self.share_inputs["infer_seed"][:] %= self.MAX_INFER_SEED + + step_paddle( + self.share_inputs, + self.parallel_config.block_size, + self.parallel_config.enc_dec_block_num, + ) + + # self._update_chunked_prefill(model_forward_batch) # FIXME + return None + + + def profile_run(self) -> None: + """Execute a forward pass with dummy inputs to profile the memory usage of the model.""" + + logger.warn("NPU not support profile currently") + # # Initialize kv cache for profile run. After profile run kv cache will be reset. + # # TODO(gongshaotian): Optimize the management logic of kvcache + # self.num_gpu_blocks = self.parallel_config.max_block_num + # self.initialize_kv_cache() + # + # # 1. Profile with multimodal encoder & encoder cache + # + # # 2. Dummy run + # self._dummy_run(num_tokens=self.parallel_config.max_num_batched_tokens, + # batch_size=self.parallel_config.max_num_seqs) + # + # # 3. gc + # del self.share_inputs["caches"] + # if self.forward_meta is not None: + # del self.forward_meta.caches + # del self.share_inputs["block_tables"] + # # # paddle.device.cuda.synchronize() + # # paddle.device.cuda.empty_cache() + # # gc.collect() + + def update_share_input_block_num(self, num_gpu_blocks: int) -> None: + """ + Set a globally unified block number and update the model's shared input. 
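The free-list bookkeeping used by this method (and by `_init_share_inputs`) follows a single rule: only the blocks above `int(total_blocks * kv_cache_ratio)` are handed out, from the highest block id downwards. The sketch below is plain Python with no FastDeploy imports; the names `num_gpu_blocks` and `kv_cache_ratio` mirror the runner's fields, and what the reserved share below the threshold is used for is left to the cache manager rather than asserted here.

def build_free_list(num_gpu_blocks: int, kv_cache_ratio: float) -> list:
    # Blocks in [reserved, num_gpu_blocks) are free; they are issued from the
    # highest id down, matching range(num_gpu_blocks - 1, reserved - 1, -1).
    reserved = int(num_gpu_blocks * kv_cache_ratio)
    return list(range(num_gpu_blocks - 1, reserved - 1, -1))

free_list = build_free_list(num_gpu_blocks=8, kv_cache_ratio=0.75)
assert free_list == [7, 6]  # only the tail above the ratio lands on the free list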
+ Args: + num_gpu_blocks: + """ + self.num_gpu_blocks = num_gpu_blocks + + # # Reset block table and kv cache with global block num + # if not (self.parallel_config.enable_prefix_caching \ # remove + # or self.parallel_config.splitwise_role != "mixed"): + # self.initialize_kv_cache() + self.initialize_kv_cache() # FIXME + + self.share_inputs["block_tables"] = paddle.full( + [self.parallel_config.max_num_seqs, self.num_gpu_blocks], -1, dtype="int32" + ) + + # Reset free list + free_list = list( + range( + self.num_gpu_blocks - 1, + int(self.num_gpu_blocks * self.cache_config.kv_cache_ratio) - 1, + -1, + ) + ) + self.free_list_len = len(free_list) + self.share_inputs.update( + { + "free_list": paddle.to_tensor(free_list, dtype="int32"), + "free_list_len": paddle.full([1], self.free_list_len, dtype="int32"), + } + ) + + # self.parallel_config.do_profile = False # remove + + def cal_theortical_kvcache(self): + """ + Calculate the total block memory required at the model level + TODO(gongshaotian): Move to Attention Backend + """ + """ + Byte of dtype: + - default(bf16): 2 + - cache_int8: 1 + - cache_int4: + """ + cache_quant_dtype = None + if ( + self.quant_config + and hasattr(self.quant_config, "kv_cache_quant_type") + and self.quant_config.kv_cache_quant_type is not None + ): + cache_quant_dtype = self.quant_config.kv_cache_quant_type + + if cache_quant_dtype is not None: # int8, int8_zp, fp8, fp8_zp + byte_of_dtype = 1 + else: # default + byte_of_dtype = 2 + + hidden_dim = self.model_config.head_dim * self.model_config.kv_num_heads + # NOTE(liuzichang): Implement multi-layer MTP architecture in the future + num_layers = ( + self.model_config.num_hidden_layers + self.speculative_config.num_gpu_block_expand_ratio + if self.speculative_method in ["mtp"] + else self.model_config.num_hidden_layers + ) + required_memory = byte_of_dtype * 2 * (self.cache_config.block_size * hidden_dim) * num_layers # k + v + return required_memory + + def not_need_stop(self) -> bool: + """ """ + return self.share_inputs["not_need_stop"][0] diff --git a/fastdeploy/worker/npu_worker.py b/fastdeploy/worker/npu_worker.py new file mode 100644 index 0000000000..62ec0b9dd6 --- /dev/null +++ b/fastdeploy/worker/npu_worker.py @@ -0,0 +1,113 @@ +import gc +from typing import List, Optional + +import paddle +import paddle.nn as nn +from fastdeploy.config import FDConfig +from fastdeploy.engine.request import Request +from fastdeploy.utils import get_logger +from fastdeploy.worker.npu_model_runner import NPUModelRunner +from fastdeploy.worker.output import ModelRunnerOutput +from fastdeploy.worker.worker_base import WorkerBase +from fastdeploy import envs + +logger = get_logger("npu_worker", "npu_worker.log") + + +class NpuWorker(WorkerBase): + def __init__( + self, + fd_config: FDConfig, + local_rank: int, + rank: int, + ): + super().__init__( + fd_config=fd_config, + local_rank=local_rank, + rank=rank, + ) + + def init_device(self): + """Initialize device and Construct model runner""" + if paddle.is_compiled_with_custom_device("npu"): + # Set evironment variable + self.device = f"npu:{self.local_rank}" + paddle.device.set_device(self.device) + paddle.set_default_dtype(self.parallel_config.dtype) + self.device_ids = self.parallel_config.device_ids.split(",") + + gc.collect() + else: + raise RuntimeError( + f"Not support device type: {self.device_config.device_type}" + ) + + # Construct model runner + self.model_runner: NPUModelRunner = NPUModelRunner( + fd_config=self.fd_config, + device=self.device, + 
device_id=self.device_ids[self.local_rank], + rank=self.rank, + local_rank=self.local_rank, + ) + + def prefill_finished(self): + """ + check whether prefill stage finished + """ + return self.model_runner.prefill_finished() + + def determine_available_memory(self) -> int: + # TODO: guozr 这里因为缺失 api 导致无法计算真实的显存 + # return 60 * 0.1 * 1024**3 + return 1024**3 + + def load_model(self) -> None: + """ """ + self.model_runner.load_model() + + def get_model(self) -> nn.Layer: + """ """ + return self.model_runner.get_model() + + def initialize_cache(self, num_gpu_blocks: int) -> None: + """Initizlize the KV Cache with accurate num_gpu_blocks""" + # accurate cache size + self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks) + + def execute_model( + self, + model_forward_batch: Optional[List[Request]] = None, + num_running_requests: int = None, + ) -> Optional[ModelRunnerOutput]: + """ """ + output = self.model_runner.execute_model(model_forward_batch, num_running_requests) + return output + + def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: int) -> None: + """Process new requests and then start the decode loop + TODO(gongshaotian):The scheduler should schedule the handling of prefill, + and workers and modelrunners should not perceive it. + """ + if envs.ENABLE_V1_KVCACHE_SCHEDULER: + self.model_runner.insert_tasks_v1(req_dicts=req_dicts, num_running_requests=num_running_requests) + else: + self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests) + + def graph_optimize_and_warm_up_model(self) -> None: + """ """ + pass + + def check_health(self) -> bool: + """ """ + return True + + def cal_theortical_kvcache(self) -> int: + """ """ + return self.model_runner.cal_theortical_kvcache() + + def reinitialize_kv_cache(self, num_gpu_blocks: int) -> None: + """ """ + self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks) + # TODO: + # pass diff --git a/fastdeploy/worker/worker_process.py b/fastdeploy/worker/worker_process.py index 956c89a666..bed8dc5bec 100644 --- a/fastdeploy/worker/worker_process.py +++ b/fastdeploy/worker/worker_process.py @@ -79,6 +79,10 @@ def get_worker(fd_config: FDConfig, local_rank: int, rank: int) -> WorkerBase: from fastdeploy.worker.metax_worker import MetaxWorker return MetaxWorker(fd_config=fd_config, local_rank=local_rank, rank=rank) + if current_platform.is_npu(): + from fastdeploy.worker.npu_worker import NpuWorker + + return NpuWorker(fd_config=fd_config, local_rank=local_rank, rank=rank) def init_distributed_environment(seed: int = 20) -> Tuple[int, int]: diff --git a/requirements_npu.txt b/requirements_npu.txt new file mode 100644 index 0000000000..6b5369dcd5 --- /dev/null +++ b/requirements_npu.txt @@ -0,0 +1,40 @@ +setuptools>=62.3.0,<80.0 +pre-commit +yapf +flake8 +ruamel.yaml +zmq +aiozmq +openai>=1.93.0 +tqdm +pynvml +uvicorn==0.29.0 +fastapi +paddleformers +redis +etcd3 +httpx +tool_helpers +cupy-cuda12x +pybind11[global] +tabulate +gradio +xlwt +visualdl +setuptools-scm>=8 +prometheus-client +moviepy +use-triton-in-paddle +crcmod +fastsafetensors==0.1.14 +msgpack +modelscope +opentelemetry-api>=1.24.0 +opentelemetry-sdk>=1.24.0 +opentelemetry-instrumentation-redis +opentelemetry-instrumentation-mysql +opentelemetry-distro  +opentelemetry-exporter-otlp +opentelemetry-instrumentation-fastapi +partial_json_parser +numpy<2 diff --git a/setup.py b/setup.py index 53e5fec07d..1ce8d307d4 100644 --- a/setup.py +++ b/setup.py @@ -153,6 +153,8 @@ def 
load_requirements(): requirements_file_name = "requirements_dcu.txt" elif paddle.device.is_compiled_with_custom_device("metax_gpu"): requirements_file_name = "requirements_metaxgpu.txt" + elif paddle.device.is_compiled_with_custom_device("npu"): + requirements_file_name = "requirements_npu.txt" requirements_path = os.path.join(os.path.dirname(__file__), requirements_file_name) with open(requirements_path, "r") as f: return [line.strip() for line in f if line.strip() and not line.startswith("#")] From 9573de846121320631467386300422bb00699941 Mon Sep 17 00:00:00 2001 From: guozr Date: Tue, 26 Aug 2025 17:15:41 +0800 Subject: [PATCH 2/4] Complete the missing NPU parts --- fastdeploy/worker/npu_model_runner.py | 634 ++++++++++++++------------ fastdeploy/worker/npu_worker.py | 41 +- 2 files changed, 368 insertions(+), 307 deletions(-) diff --git a/fastdeploy/worker/npu_model_runner.py b/fastdeploy/worker/npu_model_runner.py index 85329eeb69..8783f0a6b2 100644 --- a/fastdeploy/worker/npu_model_runner.py +++ b/fastdeploy/worker/npu_model_runner.py @@ -17,29 +17,31 @@ import os import random import time -from typing import List, Optional +from typing import Dict, List, Optional import numpy as np import paddle import paddle.nn as nn + from fastdeploy.config import FDConfig from fastdeploy.engine.request import Request +from fastdeploy.model_executor.forward_meta import ForwardMeta +from fastdeploy.model_executor.graph_optimization.utils import ( + sot_warmup_guard, +) from fastdeploy.model_executor.layers.attention import get_attention_backend -from fastdeploy.model_executor.layers.attention.base_attention_backend import \ - AttentionBackend +from fastdeploy.model_executor.layers.attention.base_attention_backend import ( + AttentionBackend, +) from fastdeploy.model_executor.layers.rotary_embedding import get_rope from fastdeploy.model_executor.layers.sample.meta_data import SamplingMetadata -from fastdeploy.model_executor.layers.sample.sampler import Sampler +from fastdeploy.model_executor.layers.sample.sampler import Sampler, SpeculativeSampler from fastdeploy.model_executor.model_loader import get_model_loader -from fastdeploy.model_executor.ops.npu import ( - rebuild_padding ) -from fastdeploy.model_executor.pre_and_post_process import ( - post_process, pre_process) +from fastdeploy.model_executor.ops.npu import rebuild_padding +from fastdeploy.model_executor.pre_and_post_process import pre_process from fastdeploy.utils import get_logger from fastdeploy.worker.model_runner_base import ModelRunnerBase from fastdeploy.worker.output import ModelOutputData, ModelRunnerOutput -from fastdeploy.model_executor.forward_meta import ForwardMeta -from typing import Dict logger = get_logger("npu_model_runner", "npu_model_runner.log") @@ -53,6 +55,7 @@ def step_paddle( NPU wrapper for step_paddle function """ from fastdeploy.model_executor.ops.npu.step_paddle import step_paddle_npu + step_paddle_npu( share_inputs["stop_flags"], share_inputs["seq_lens_this_time"], @@ -81,6 +84,66 @@ def step_paddle( ) +def npu_post_process( + sampled_token_ids: paddle.Tensor, + model_output: ModelOutputData, + share_inputs: Dict[str, paddle.Tensor], + block_size: int = 64, + skip_save_output: bool = False, +) -> None: + """NPU-specific post processing""" + from fastdeploy.model_executor.ops.npu import ( + save_output, + set_stop_value_multi_ends, + update_inputs_npu, + ) + + # 1. 
Set stop value + paddle.assign( + paddle.where( + model_output.stop_flags, + model_output.step_idx, + model_output.step_idx + 1, + ), + model_output.step_idx, + ) + length_cond = paddle.greater_equal(model_output.step_idx, model_output.max_dec_len) + paddle.assign( + paddle.logical_or(model_output.stop_flags, length_cond), + model_output.stop_flags, + ) + set_stop_value_multi_ends( + sampled_token_ids, + model_output.stop_flags, + model_output.seq_lens_this_time, + model_output.eos_token_id, + model_output.next_tokens, + ) # multi ends + + # 2. Update the input buffer of the model + with paddle.framework._no_check_dy2st_diff(): + update_inputs_npu( + model_output.stop_flags, + model_output.not_need_stop, + model_output.seq_lens_this_time, + model_output.seq_lens_encoder, + model_output.seq_lens_decoder, + model_output.input_ids, + model_output.stop_nums, + sampled_token_ids, + model_output.is_block_step, + ) + + # 3. Transmit the model's output and stop generation signal via message queue. + if not skip_save_output: + save_output( + sampled_token_ids, + model_output.not_need_stop, + model_output.mp_rank, + False, # use_ep + ) + + class NPUModelRunner(ModelRunnerBase): """ """ @@ -88,27 +151,27 @@ def __init__( self, fd_config: FDConfig, device: str, # logic device - device_id: int, # physical device id rank: int, local_rank: int, ): super().__init__(fd_config=fd_config, device=device) self.rank = rank self.local_rank = local_rank - self.device_id = device_id # FIXME self.speculative_method = self.fd_config.speculative_config.method self.speculative_decoding = self.speculative_method is not None + # Graph optimization level + self.graph_opt_level = 0 + # Sampler - self.sampler = Sampler() + if not self.speculative_decoding: + self.sampler = Sampler(fd_config) + else: + self.sampler = SpeculativeSampler(fd_config) # Lazy initialize kv cache after model loading # self.kv_caches: list[paddle.Tensor] = [] - # Cuda Graph - # self.use_cuda_grpah = False # remove - self.input_ids = paddle.zeros(self.parallel_config.max_num_seqs, dtype="int32") - # Initialize share inputs self._init_share_inputs(self.fd_config.parallel_config.max_num_seqs) self.infer_seed_increment = paddle.full( @@ -119,15 +182,19 @@ def __init__( # Note(gonshaotian): Currently, all attention layers share one attention backend instance. # In the future, we will expand it as a list. 
self.attn_backends: list[AttentionBackend] = [] - # self.attn_metadatas: list[AttentionMetadata] = [] self.initialize_attn_backend() # Forward meta store the global meta information of the forward self.forward_meta = None - if not self.speculative_decoding: - self.sampler = Sampler(fd_config) + + def exist_prefill(self): + """ + check whether prefill stage exist + """ + if int(paddle.max(self.share_inputs["seq_lens_encoder"])) != 0: + return 1 else: - self.sampler = SpeculativeSampler(fd_config) + return 0 def prefill_finished(self): """ @@ -138,6 +205,93 @@ def prefill_finished(self): ) return not paddle.any(prefill_statue).numpy() + def insert_tasks_v1(self, req_dicts: List[Request], num_running_requests: int = None): + """ + Process scheduler output tasks, used when ENABLE_V1_KVCACHE_SCHEDULER=1 + """ + # NOTE(luotingdan): Lazy initialize kv cache + if "caches" not in self.share_inputs: + self.initialize_kv_cache() + + req_len = len(req_dicts) + has_prefill_task = False + for i in range(req_len): + request = req_dicts[i] + idx = request.idx + if request.task_type.value == 1: # prefill task (RequestType.PREFILL.value) + prefill_start_index = request.prefill_start_index + prefill_end_index = request.prefill_end_index + length = prefill_end_index - prefill_start_index + input_ids = request.prompt_token_ids + request.output_token_ids + self.share_inputs["input_ids"][idx : idx + 1, :length] = np.array( + input_ids[prefill_start_index:prefill_end_index] + ) + encoder_block_num = len(request.block_tables) + self.share_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num + self.share_inputs["block_tables"][idx : idx + 1, :] = -1 + self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( + request.block_tables, dtype="int32" + ) + self.share_inputs["stop_flags"][idx : idx + 1] = False + self.share_inputs["seq_lens_decoder"][idx : idx + 1] = prefill_start_index + self.seq_lens_this_time_buffer[idx : idx + 1] = length + self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length + self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = 0 + self.share_inputs["prompt_lens"][idx : idx + 1] = len(input_ids) + self.share_inputs["is_block_step"][idx : idx + 1] = False + self.share_inputs["step_idx"][idx : idx + 1] = ( + len(request.output_token_ids) if prefill_end_index >= len(input_ids) else 0 + ) + has_prefill_task = True + elif request.task_type.value == 2: # decode task (RequestType.DECODE.value) + encoder_block_num = len(request.block_tables) + self.share_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num + self.share_inputs["block_tables"][idx : idx + 1, :] = -1 + self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( + request.block_tables, dtype="int32" + ) + continue + else: # preempted task + self.share_inputs["block_tables"][idx : idx + 1, :] = -1 + self.share_inputs["stop_flags"][idx : idx + 1] = True + self.seq_lens_this_time_buffer[idx : idx + 1] = 0 + self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 + self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 + self.share_inputs["is_block_step"][idx : idx + 1] = False + continue + + assert len(request.eos_token_ids) == self.model_config.eos_tokens_lens + self.share_inputs["eos_token_id"][:] = np.array(request.eos_token_ids, dtype="int64").reshape(-1, 1) + + self.share_inputs["top_p"][idx : idx + 1] = request.get("top_p", 0.7) + self.share_inputs["temperature"][idx : idx + 1] = request.get("temperature", 0.95) + self.share_inputs["penalty_score"][idx : 
idx + 1] = request.get("repetition_penalty", 1.0) + self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0) + self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0) + + self.share_inputs["min_dec_len"][idx : idx + 1] = request.get("min_tokens", 1) + self.share_inputs["max_dec_len"][idx : idx + 1] = request.get( + "max_tokens", self.model_config.max_model_len + ) + + self.share_inputs["first_token_ids"][idx : idx + 1] = self.share_inputs["input_ids"][idx : idx + 1, :1] + self.share_inputs["ori_seq_lens_encoder"][idx : idx + 1] = length + + if request.get("seed") is not None: + self.share_inputs["infer_seed"][idx : idx + 1] = request.get("seed") + + if request.get("stop_token_ids") is not None and request.get("stop_seqs_len") is not None: + stop_seqs_num = len(request.get("stop_seqs_len")) + for i in range(stop_seqs_num, self.model_config.max_stop_seqs_num): + request.stop_seqs_len.append(0) + self.share_inputs["stop_seqs_len"][:] = np.array(request.stop_seqs_len, dtype="int32") + self.share_inputs["stop_seqs"][:stop_seqs_num, : len(request.get("stop_token_ids")[0])] = np.array( + request.get("stop_token_ids"), dtype="int64" + ) + if has_prefill_task: + self.share_inputs["not_need_stop"][0] = True + self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer[:num_running_requests] + def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: int = None): """ Process inputs for prefill tasks and insert it to share_inputs buffer @@ -148,10 +302,7 @@ def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: self.initialize_kv_cache() # NOTE(luotingdan): Set environment variable of prefill node - if ( - req_dicts[-1].disaggregate_info is not None - and req_dicts[-1].disaggregate_info["role"] == "prefill" - ): + if req_dicts[-1].disaggregate_info is not None and req_dicts[-1].disaggregate_info["role"] == "prefill": os.environ["PREFILL_NODE_ONE_STEP_STOP"] = "1" req_len = len(req_dicts) @@ -162,16 +313,9 @@ def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: assert length > 0, "The prompt requested must not be empty." 
# Is Decode Node - if ( - req_dicts[i].disaggregate_info is not None - and req_dicts[i].disaggregate_info["role"] == "decode" - ): - self.share_inputs["pre_ids"][idx : idx + 1] = request.prompt_token_ids[ - -1 - ] - self.share_inputs["input_ids"][idx : idx + 1, 0] = ( - request.prompt_token_ids[0] - ) + if req_dicts[i].disaggregate_info is not None and req_dicts[i].disaggregate_info["role"] == "decode": + self.share_inputs["pre_ids"][idx : idx + 1] = request.prompt_token_ids[-1] + self.share_inputs["input_ids"][idx : idx + 1, 0] = request.prompt_token_ids[0] self.share_inputs["seq_lens_encoder"][idx : idx + 1] = 0 self.share_inputs["seq_lens_decoder"][idx : idx + 1] = length self.share_inputs["seq_lens_this_time"][idx : idx + 1] = 1 @@ -181,74 +325,44 @@ def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: else: self.share_inputs["pre_ids"][idx : idx + 1] = -1 self.share_inputs["step_idx"][idx : idx + 1] = 0 - self.share_inputs["input_ids"][idx : idx + 1, :length] = np.array( - request.prompt_token_ids - ) + self.share_inputs["input_ids"][idx : idx + 1, :length] = np.array(request.prompt_token_ids) # Use chunked prefill if self.cache_config.enable_chunked_prefill: request.set("chunk_idx", 1) token_chunk_size = request.prefill_chunk_info[0] - self.share_inputs["seq_lens_this_time"][ - idx : idx + 1 - ] = token_chunk_size + self.share_inputs["seq_lens_this_time"][idx : idx + 1] = token_chunk_size self.share_inputs["input_ids"][idx, :token_chunk_size] = np.array( request.prompt_token_ids[:token_chunk_size] ) - self.share_inputs["step_seq_lens_encoder"][ - idx : idx + 1 - ] = token_chunk_size - self.share_inputs["seq_lens_encoder"][ - idx : idx + 1 - ] = token_chunk_size - self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get( - "seq_lens_decoder", 0 - ) - self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = ( - request.get("seq_lens_decoder", 0) - ) + self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = token_chunk_size + self.share_inputs["seq_lens_encoder"][idx : idx + 1] = token_chunk_size + self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0) + self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0) else: - self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get( - "seq_lens_decoder", 0 - ) - self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = ( - request.get("seq_lens_decoder", 0) - ) + self.share_inputs["seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0) + self.share_inputs["step_seq_lens_decoder"][idx : idx + 1] = request.get("seq_lens_decoder", 0) self.share_inputs["seq_lens_this_time"][idx : idx + 1] = length self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = length self.share_inputs["seq_lens_encoder"][idx : idx + 1] = length if len(request.eos_token_ids) < self.model_config.eos_tokens_lens: request.eos_token_ids.append(request.eos_token_ids[0]) - self.share_inputs["eos_token_id"][:] = np.array( - request.eos_token_ids, dtype="int64" - ).reshape(-1, 1) + self.share_inputs["eos_token_id"][:] = np.array(request.eos_token_ids, dtype="int64").reshape(-1, 1) self.share_inputs["top_p"][idx : idx + 1] = request.get("top_p", 0.7) - self.share_inputs["temperature"][idx : idx + 1] = request.get( - "temperature", 0.95 - ) - self.share_inputs["penalty_score"][idx : idx + 1] = request.get( - "repetition_penalty", 1.0 - ) - self.share_inputs["frequency_score"][idx : idx + 1] = request.get( - "frequency_penalty", 0.0 - ) - 
self.share_inputs["presence_score"][idx : idx + 1] = request.get( - "presence_penalty", 0.0 - ) + self.share_inputs["temperature"][idx : idx + 1] = request.get("temperature", 0.95) + self.share_inputs["penalty_score"][idx : idx + 1] = request.get("repetition_penalty", 1.0) + self.share_inputs["frequency_score"][idx : idx + 1] = request.get("frequency_penalty", 0.0) + self.share_inputs["presence_score"][idx : idx + 1] = request.get("presence_penalty", 0.0) - self.share_inputs["min_dec_len"][idx : idx + 1] = request.get( - "min_tokens", 1 - ) + self.share_inputs["min_dec_len"][idx : idx + 1] = request.get("min_tokens", 1) self.share_inputs["max_dec_len"][idx : idx + 1] = request.get( "max_tokens", self.model_config.max_model_len ) self.share_inputs["stop_flags"][idx : idx + 1] = False - self.share_inputs["first_token_ids"][idx : idx + 1] = self.share_inputs[ - "input_ids" - ][idx : idx + 1, :1] + self.share_inputs["first_token_ids"][idx : idx + 1] = self.share_inputs["input_ids"][idx : idx + 1, :1] self.share_inputs["ori_seq_lens_encoder"][idx : idx + 1] = length if request.get("seed") is not None: @@ -256,27 +370,21 @@ def insert_prefill_inputs(self, req_dicts: List[Request], num_running_requests: encoder_block_num = len(request.get("block_tables")) self.share_inputs["encoder_block_lens"][idx : idx + 1] = encoder_block_num self.share_inputs["block_tables"][idx : idx + 1, :] = -1 - self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = ( - np.array(request.block_tables, dtype="int32") + self.share_inputs["block_tables"][idx : idx + 1, :encoder_block_num] = np.array( + request.block_tables, dtype="int32" ) - if ( - request.get("stop_token_ids") is not None - and request.get("stop_seqs_len") is not None - ): + if request.get("stop_token_ids") is not None and request.get("stop_seqs_len") is not None: stop_seqs_num = len(request.get("stop_seqs_len")) for i in range(stop_seqs_num, self.model_config.max_stop_seqs_num): request.stop_seqs_len.append(0) - self.share_inputs["stop_seqs_len"][:] = np.array( - request.stop_seqs_len, dtype="int32" + self.share_inputs["stop_seqs_len"][:] = np.array(request.stop_seqs_len, dtype="int32") + self.share_inputs["stop_seqs"][:stop_seqs_num, : len(request.get("stop_token_ids")[0])] = np.array( + request.get("stop_token_ids"), dtype="int64" ) - self.share_inputs["stop_seqs"][ - :stop_seqs_num, : len(request.get("stop_token_ids")[0]) - ] = np.array(request.get("stop_token_ids"), dtype="int64") self.share_inputs["not_need_stop"][0] = True - def _init_share_inputs(self, max_num_seqs: int): """Initialize all share buffers for model inputs. Note: In the future, we may abandon share buffers. 
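Both this hunk and the `_init_share_inputs` hunk that follows rely on the same buffer layout: every per-request field lives in a preallocated `[max_num_seqs, 1]` tensor, and inserting a request only writes its row `idx`. Below is a minimal NumPy stand-in for that pattern; the field names mirror `share_inputs`, while the defaults (0.7 / 0.95 / 1.0) and the `insert_request` helper are illustrative only, not FastDeploy APIs.

import numpy as np

max_num_seqs = 4
share_inputs = {
    "top_p": np.full((max_num_seqs, 1), 0.7, dtype=np.float32),
    "temperature": np.full((max_num_seqs, 1), 0.95, dtype=np.float32),
    "penalty_score": np.full((max_num_seqs, 1), 1.0, dtype=np.float32),
    "stop_flags": np.full((max_num_seqs, 1), True),
}

def insert_request(idx: int, req: dict) -> None:
    # Only this request's slot changes; the other rows keep serving requests
    # already in flight, so no reallocation or rebatching is needed.
    share_inputs["top_p"][idx : idx + 1] = req.get("top_p", 0.7)
    share_inputs["temperature"][idx : idx + 1] = req.get("temperature", 0.95)
    share_inputs["penalty_score"][idx : idx + 1] = req.get("repetition_penalty", 1.0)
    share_inputs["stop_flags"][idx : idx + 1] = False

insert_request(2, {"top_p": 0.9})
print(share_inputs["top_p"].squeeze())  # [0.7 0.7 0.9 0.7]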
@@ -299,18 +407,10 @@ def _init_share_inputs(self, max_num_seqs: int): self.model_config.pad_token_id, dtype="int64", ) - self.share_inputs["eos_token_id"] = paddle.full( - [self.model_config.eos_tokens_lens, 1], 0, dtype="int64" - ) - self.share_inputs["top_p"] = paddle.full( - [max_num_seqs, 1], self.model_config.top_p, dtype="float32" - ) - self.share_inputs["top_k"] = paddle.full( - [max_num_seqs, 1], 0, dtype="int64" - ) - self.share_inputs["min_p"] = paddle.full( - [max_num_seqs, 1], 0.0, dtype="float32" - ) + self.share_inputs["eos_token_id"] = paddle.full([self.model_config.eos_tokens_lens, 1], 0, dtype="int64") + self.share_inputs["top_p"] = paddle.full([max_num_seqs, 1], self.model_config.top_p, dtype="float32") + self.share_inputs["top_k"] = paddle.full([max_num_seqs, 1], 0, dtype="int64") + self.share_inputs["min_p"] = paddle.full([max_num_seqs, 1], 0.0, dtype="float32") self.share_inputs["temperature"] = paddle.full( [max_num_seqs, 1], self.model_config.temperature, dtype="float32" ) @@ -324,95 +424,47 @@ def _init_share_inputs(self, max_num_seqs: int): [max_num_seqs, 1], self.model_config.presence_score, dtype="float32" ) - self.share_inputs["min_dec_len"] = paddle.full( - [max_num_seqs, 1], self.model_config.min_length, dtype="int64" - ) + self.share_inputs["min_dec_len"] = paddle.full([max_num_seqs, 1], self.model_config.min_length, dtype="int64") self.share_inputs["max_dec_len"] = paddle.full( [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64" ) - self.share_inputs["min_length"] = paddle.full( - [max_num_seqs, 1], self.model_config.min_length, dtype="int64" - ) + self.share_inputs["min_length"] = paddle.full([max_num_seqs, 1], self.model_config.min_length, dtype="int64") self.share_inputs["max_length"] = paddle.full( [max_num_seqs, 1], self.model_config.max_model_len, dtype="int64" ) - self.share_inputs["seq_lens_this_time"] = paddle.full( - max_num_seqs, 0, dtype="int32" - ) - self.share_inputs["seq_lens_encoder"] = paddle.full( - [max_num_seqs, 1], 0, dtype="int32" - ) - self.share_inputs["seq_lens_decoder"] = paddle.full( - [max_num_seqs, 1], 0, dtype="int32" - ) - self.share_inputs["step_seq_lens_encoder"] = paddle.full( - [max_num_seqs, 1], 0, dtype="int32" - ) - self.share_inputs["step_seq_lens_decoder"] = paddle.full( - [max_num_seqs, 1], 0, dtype="int32" - ) - self.share_inputs["prompt_lens"] = paddle.full( - [max_num_seqs, 1], 0, dtype="int64" - ) + self.share_inputs["seq_lens_this_time"] = paddle.full(max_num_seqs, 0, dtype="int32") + self.share_inputs["seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") + self.share_inputs["seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") + self.share_inputs["step_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") + self.share_inputs["step_seq_lens_decoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") + self.share_inputs["prompt_lens"] = paddle.full([max_num_seqs, 1], 0, dtype="int64") self.share_inputs["step_idx"] = paddle.full([max_num_seqs, 1], 0, dtype="int64") self.share_inputs["not_need_stop"] = paddle.full( [1], False, dtype="bool" ).cpu() # TODO(gongshaotian): move to pinnd memory - self.share_inputs["stop_flags"] = paddle.full( - [max_num_seqs, 1], True, dtype="bool" - ) + self.share_inputs["stop_flags"] = paddle.full([max_num_seqs, 1], True, dtype="bool") self.share_inputs["stop_nums"] = paddle.full([1], max_num_seqs, dtype="int64") - self.share_inputs["bad_tokens"] = paddle.full( - [max_num_seqs, self.model_config.vocab_size], -1, 
dtype="int64" - ) - self.share_inputs["bad_tokens_len"] = paddle.full( - [max_num_seqs], 1, dtype="int64" - ) - self.share_inputs["next_tokens"] = paddle.full( - [max_num_seqs, 1], -1, dtype="int64" - ) - self.share_inputs["is_block_step"] = paddle.full( - [max_num_seqs], False, dtype="bool" - ) - self.share_inputs["encoder_block_lens"] = paddle.full( - [max_num_seqs], 0, dtype="int32" - ) - self.share_inputs["step_block_list"] = paddle.full( - [max_num_seqs], -1, dtype="int32" - ) + self.share_inputs["bad_tokens"] = paddle.full([max_num_seqs, self.model_config.vocab_size], -1, dtype="int64") + self.share_inputs["bad_tokens_len"] = paddle.full([max_num_seqs], 1, dtype="int64") + self.share_inputs["next_tokens"] = paddle.full([max_num_seqs, 1], -1, dtype="int64") + self.share_inputs["is_block_step"] = paddle.full([max_num_seqs], False, dtype="bool") + self.share_inputs["encoder_block_lens"] = paddle.full([max_num_seqs], 0, dtype="int32") + self.share_inputs["step_block_list"] = paddle.full([max_num_seqs], -1, dtype="int32") self.share_inputs["step_lens"] = paddle.full([1], 0, dtype="int32") - self.share_inputs["recover_block_list"] = paddle.full( - [max_num_seqs], -1, dtype="int32" - ) + self.share_inputs["recover_block_list"] = paddle.full([max_num_seqs], -1, dtype="int32") self.share_inputs["recover_lens"] = paddle.full([1], 0, dtype="int32") - self.share_inputs["need_block_list"] = paddle.full( - [max_num_seqs], -1, dtype="int32" - ) + self.share_inputs["need_block_list"] = paddle.full([max_num_seqs], -1, dtype="int32") self.share_inputs["need_block_len"] = paddle.full([1], 0, dtype="int32") - self.share_inputs["used_list_len"] = paddle.full( - [max_num_seqs], 0, dtype="int32" - ) - self.share_inputs["infer_seed"] = paddle.full( - [max_num_seqs, 1], 0, dtype="int64" - ) - self.share_inputs["first_token_ids"] = paddle.full( - [max_num_seqs, 1], -1, dtype="int64" - ) - self.share_inputs["ori_seq_lens_encoder"] = paddle.full( - [max_num_seqs, 1], 0, dtype="int32" - ) - self.share_inputs["system_lens"] = paddle.full( - [max_num_seqs, 1], 0, dtype="int32" - ) - self.share_inputs["system_ids"] = paddle.full( - [max_num_seqs, 1], -1, dtype="int32" - ) + self.share_inputs["used_list_len"] = paddle.full([max_num_seqs], 0, dtype="int32") + self.share_inputs["infer_seed"] = paddle.full([max_num_seqs, 1], 0, dtype="int64") + self.share_inputs["first_token_ids"] = paddle.full([max_num_seqs, 1], -1, dtype="int64") + self.share_inputs["ori_seq_lens_encoder"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") + self.share_inputs["system_lens"] = paddle.full([max_num_seqs, 1], 0, dtype="int32") + self.share_inputs["system_ids"] = paddle.full([max_num_seqs, 1], -1, dtype="int32") # Initialize rotary position embedding - tmp_position_ids = paddle.arange(self.parallel_config.max_model_len).reshape( - (1, -1) - ) + tmp_position_ids = paddle.arange(self.parallel_config.max_model_len).reshape((1, -1)) # TODO(gongshaotian): move to models self.share_inputs["rope_emb"] = get_rope( rotary_dim=self.model_config.head_dim, @@ -423,13 +475,9 @@ def _init_share_inputs(self, max_num_seqs: int): # Set block tables pre_max_block_num = ( - (self.parallel_config.max_model_len + self.parallel_config.block_size - 1) - // self.parallel_config.block_size - + self.parallel_config.enc_dec_block_num - ) - self.share_inputs["block_tables"] = paddle.full( - [max_num_seqs, pre_max_block_num], -1, dtype="int32" - ) + self.parallel_config.max_model_len + self.parallel_config.block_size - 1 + ) // self.parallel_config.block_size + 
self.parallel_config.enc_dec_block_num + self.share_inputs["block_tables"] = paddle.full([max_num_seqs, pre_max_block_num], -1, dtype="int32") # Initialize free list free_list = list( @@ -441,14 +489,10 @@ def _init_share_inputs(self, max_num_seqs: int): ) self.free_list_len = len(free_list) self.share_inputs["free_list"] = paddle.to_tensor(free_list, dtype="int32") - self.share_inputs["free_list_len"] = paddle.full( - [1], self.free_list_len, dtype="int32" - ) + self.share_inputs["free_list_len"] = paddle.full([1], self.free_list_len, dtype="int32") # Initialize stop seqs - self.share_inputs["stop_seqs_len"] = paddle.full( - [self.model_config.max_stop_seqs_num], 0, dtype="int32" - ) + self.share_inputs["stop_seqs_len"] = paddle.full([self.model_config.max_stop_seqs_num], 0, dtype="int32") self.share_inputs["stop_seqs"] = paddle.full( [self.model_config.max_stop_seqs_num, self.model_config.stop_seqs_max_len], -1, @@ -457,65 +501,29 @@ def _init_share_inputs(self, max_num_seqs: int): # Initialize fields needed for _prepare_inputs self.share_inputs["ids_remove_padding"] = paddle.full( - [self.parallel_config.max_num_seqs * self.parallel_config.max_model_len], - self.model_config.pad_token_id, - dtype="int64" - ) - self.share_inputs["cum_offsets"] = paddle.full( - [self.parallel_config.max_num_seqs], - 0, - dtype="int32" + [self.parallel_config.max_num_seqs * self.parallel_config.max_model_len], + self.model_config.pad_token_id, + dtype="int64", ) + self.share_inputs["cum_offsets"] = paddle.full([self.parallel_config.max_num_seqs], 0, dtype="int32") self.share_inputs["batch_id_per_token"] = paddle.full( - [self.parallel_config.max_num_seqs * self.parallel_config.max_model_len], - 0, - dtype="int32" - ) - self.share_inputs["cu_seqlens_q"] = paddle.full( - [self.parallel_config.max_num_seqs + 1], - 0, - dtype="int32" - ) - self.share_inputs["cu_seqlens_k"] = paddle.full( - [self.parallel_config.max_num_seqs + 1], - 0, - dtype="int32" - ) - self.share_inputs["decoder_batch_ids"] = paddle.full( - [self.parallel_config.max_num_seqs], - 0, - dtype="int32" + [self.parallel_config.max_num_seqs * self.parallel_config.max_model_len], 0, dtype="int32" ) + self.share_inputs["cu_seqlens_q"] = paddle.full([self.parallel_config.max_num_seqs + 1], 0, dtype="int32") + self.share_inputs["cu_seqlens_k"] = paddle.full([self.parallel_config.max_num_seqs + 1], 0, dtype="int32") + self.share_inputs["decoder_batch_ids"] = paddle.full([self.parallel_config.max_num_seqs], 0, dtype="int32") self.share_inputs["decoder_tile_ids_per_batch"] = paddle.full( - [self.parallel_config.max_num_seqs], - 0, - dtype="int32" + [self.parallel_config.max_num_seqs], 0, dtype="int32" ) self.share_inputs["decoder_num_blocks_cpu"] = paddle.full( - [self.parallel_config.max_num_seqs], - 0, - dtype="int32" - ) - self.share_inputs["max_len_tensor_cpu"] = paddle.full( - [1], - self.parallel_config.max_model_len, - dtype="int32" + [self.parallel_config.max_num_seqs], 0, dtype="int32" ) + self.share_inputs["max_len_tensor_cpu"] = paddle.full([1], self.parallel_config.max_model_len, dtype="int32") self.share_inputs["draft_tokens"] = paddle.full( - [self.parallel_config.max_num_seqs, self.parallel_config.max_model_len], - -1, - dtype="int64" - ) - self.share_inputs["output_cum_offsets"] = paddle.full( - [self.parallel_config.max_num_seqs], - 0, - dtype="int32" - ) - self.share_inputs["output_padding_offset"] = paddle.full( - [self.parallel_config.max_num_seqs], - 0, - dtype="int32" + [self.parallel_config.max_num_seqs, 
self.parallel_config.max_model_len], -1, dtype="int64" ) + self.share_inputs["output_cum_offsets"] = paddle.full([self.parallel_config.max_num_seqs], 0, dtype="int32") + self.share_inputs["output_padding_offset"] = paddle.full([self.parallel_config.max_num_seqs], 0, dtype="int32") def _prepare_inputs(self) -> None: """prepare the model inputs""" @@ -569,7 +577,7 @@ def _prepare_inputs(self) -> None: def load_model(self) -> None: """load or download model""" logger.info(f"Starting to load model {self.model_config.architectures[0]}") - time_before_load = time.perf_counter() + time.perf_counter() # 1. Load original model model_loader = get_model_loader(load_config=self.fd_config.load_config) self.model = model_loader.load_model(fd_config=self.fd_config) @@ -623,8 +631,7 @@ def initialize_forward_meta(self): # FIXME for attn_backend in self.attn_backends: attn_backend.init_attention_metadata(self.forward_meta) - def initialize_kv_cache(self, - kv_cache_config = None) -> None: # FIXME + def initialize_kv_cache(self, kv_cache_config=None) -> None: # FIXME """ Initialize kv cache Args: @@ -651,7 +658,6 @@ def initialize_kv_cache(self, ) for i in range(self.model_config.num_hidden_layers): - cache_kvs["key_caches_{}".format(i)] = paddle.full( shape=kv_cache_shape, fill_value=0, @@ -665,12 +671,10 @@ def initialize_kv_cache(self, self.share_inputs["caches"] = list(cache_kvs.values()) for value in cache_kvs.values(): del value - + # paddle.device.cuda.empty_cache() - def initialize_attn_backend( - self, kv_cache_config = None - ) -> None: + def initialize_attn_backend(self, kv_cache_config=None) -> None: """ Initialize attention backends and forward metadata Args: @@ -695,7 +699,7 @@ def initialize_attn_backend( ) if attn_backend is None: raise NotImplementedError( - f"{ self.parallel_config.attention_backend} attention backend is not support by NPUModelRunner" + f"{self.parallel_config.attention_backend} attention backend is not support by NPUModelRunner" ) self.attn_backends.append(attn_backend) @@ -704,7 +708,17 @@ def capture_model(self) -> None: Trigger CUDA Graph capture for all shapes in 'CudaGraphConfig.cudagraph_capture_sizes' """ logger.warn("NPU not support cuda graph currently") - pass + + @sot_warmup_guard(True) + def sot_warmup(self) -> None: + start_time = time.perf_counter() + for batch_size in self.sot_warmup_sizes: + self._dummy_run( + num_tokens=self.parallel_config.max_num_batched_tokens, + batch_size=batch_size, + ) + logger.info(f"SOT warmup the model with the batch size:{batch_size}") + logger.info(f"SOT warmup took {time.perf_counter() - start_time} seconds") def execute_model( self, @@ -733,10 +747,7 @@ class at the server level, which is too granular for ModelRunner. # 2. Padding inputs for cuda grph # 3. Execute model - model_output = self.model( - self.share_inputs["ids_remove_padding"], self.forward_meta - ) - + model_output = self.model(self.share_inputs["ids_remove_padding"], self.forward_meta) hidden_states = rebuild_padding( model_output, @@ -754,8 +765,6 @@ class at the server level, which is too granular for ModelRunner. sampler_output = self.sampler(logits, self.sampling_metadata) - - # 6. Post Process model_output_data = ModelOutputData( next_tokens=self.share_inputs["next_tokens"], @@ -784,18 +793,17 @@ class at the server level, which is too granular for ModelRunner. 
need_think_end=None, reasoning_index=None, ) - post_process( - sampler_output=sampler_output, - model_output=model_output_data, - share_inputs=self.share_inputs, - block_size=self.cache_config.block_size, - speculative_decoding=self.speculative_decoding, - skip_save_output=False, + npu_post_process( + sampled_token_ids=sampler_output.sampled_token_ids, + model_output=model_output_data, + share_inputs=self.share_inputs, + block_size=self.cache_config.block_size, + skip_save_output=False, ) # 7. Updata 'infer_seed' and step_cuda() self.share_inputs["infer_seed"].add_(self.infer_seed_increment) self.share_inputs["infer_seed"][:] %= self.MAX_INFER_SEED - + step_paddle( self.share_inputs, self.parallel_config.block_size, @@ -805,30 +813,77 @@ class at the server level, which is too granular for ModelRunner. # self._update_chunked_prefill(model_forward_batch) # FIXME return None + def prepare_profile(self) -> None: + """Prepare the profile run by setting the block number and initializing the KV cache.""" + self.num_gpu_blocks = self.parallel_config.total_block_num + self.initialize_kv_cache() def profile_run(self) -> None: """Execute a forward pass with dummy inputs to profile the memory usage of the model.""" + # Initialize kv cache for profile run. After profile run kv cache will be reset. + # TODO(gongshaotian): Optimize the management logic of kvcache + self.num_gpu_blocks = self.parallel_config.total_block_num + self.initialize_kv_cache() + + # Dummy run + self._dummy_run( + num_tokens=int(self.parallel_config.max_num_batched_tokens), + batch_size=min(self.parallel_config.max_num_seqs, 1), + ) + + # Clean up + self.clear_cache() + + def _dummy_prefill_inputs(self, num_tokens: int, batch_size: int): + """Set dummy prefill inputs to share_inputs""" + full_length = min(num_tokens // batch_size, self.parallel_config.max_model_len - 10) + input_length = int(full_length - 512) + block_num = ( + input_length + self.cache_config.block_size - 1 + ) // self.cache_config.block_size + self.cache_config.enc_dec_block_num + + for i in range(batch_size): + idx = i + self.share_inputs["input_ids"][idx : idx + 1, :input_length] = np.array([5] * input_length) + self.share_inputs["eos_token_id"][:] = np.array( + [2] * self.model_config.eos_tokens_lens, dtype="int64" + ).reshape(-1, 1) + self.seq_lens_this_time_buffer[idx : idx + 1] = input_length + self.share_inputs["step_seq_lens_encoder"][idx : idx + 1] = input_length + self.share_inputs["seq_lens_encoder"][idx : idx + 1] = input_length + self.share_inputs["seq_lens_decoder"][idx : idx + 1] = 0 + self.share_inputs["step_idx"][idx : idx + 1] = 0 + self.share_inputs["max_dec_len"][idx : idx + 1] = 10 + self.share_inputs["stop_flags"][idx : idx + 1] = False + + self.share_inputs["first_token_ids"][idx : idx + 1] = self.share_inputs["input_ids"][idx : idx + 1, :1] + self.share_inputs["ori_seq_lens_encoder"][idx : idx + 1] = input_length + + self.share_inputs["infer_seed"][idx : idx + 1] = random.randint(0, 922337203685477580) + self.share_inputs["encoder_block_lens"][idx : idx + 1] = block_num + self.share_inputs["block_tables"][idx : idx + 1, :block_num] = np.arange( + idx * block_num, (idx + 1) * block_num, 1 + ) + self.share_inputs["seq_lens_this_time"] = self.seq_lens_this_time_buffer + + def _dummy_run( + self, + num_tokens: int, + batch_size: int, + in_capturing: bool = False, + ) -> None: + """ + Use dummy inputs to run before formal execution. 
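The loop in this method keeps calling `execute_model` until no dummy sequence is still active, i.e. until every entry of `seq_lens_this_time` has dropped to zero. The plain-Python sketch below mimics only that termination pattern; the per-slot token budget that it decrements is a stand-in for the real model step and is not part of the runner.

def profile_loop(seq_lens_this_time, max_dec_len):
    # Give every active slot a decode budget and pretend one token is
    # generated per step (this replaces the real execute_model call).
    remaining = [max_dec_len if n > 0 else 0 for n in seq_lens_this_time]
    steps = 0
    while sum(1 for n in seq_lens_this_time if n > 0) > 0:  # same stop test as the runner
        steps += 1
        for i in range(len(remaining)):
            remaining[i] = max(0, remaining[i] - 1)
            seq_lens_this_time[i] = 1 if remaining[i] > 0 else 0
    return steps

print(profile_loop([512, 512, 0, 512], max_dec_len=10))  # 10 steps, then every slot has stopped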
+ Args: + num_tokens: Expected number of tokens generated + """ + self._dummy_prefill_inputs(num_tokens, batch_size) + + while True: + self.execute_model(None, True, batch_size) - logger.warn("NPU not support profile currently") - # # Initialize kv cache for profile run. After profile run kv cache will be reset. - # # TODO(gongshaotian): Optimize the management logic of kvcache - # self.num_gpu_blocks = self.parallel_config.max_block_num - # self.initialize_kv_cache() - # - # # 1. Profile with multimodal encoder & encoder cache - # - # # 2. Dummy run - # self._dummy_run(num_tokens=self.parallel_config.max_num_batched_tokens, - # batch_size=self.parallel_config.max_num_seqs) - # - # # 3. gc - # del self.share_inputs["caches"] - # if self.forward_meta is not None: - # del self.forward_meta.caches - # del self.share_inputs["block_tables"] - # # # paddle.device.cuda.synchronize() - # # paddle.device.cuda.empty_cache() - # # gc.collect() + if int((self.share_inputs["seq_lens_this_time"] > 0).sum()) == 0: + break def update_share_input_block_num(self, num_gpu_blocks: int) -> None: """ @@ -838,11 +893,8 @@ def update_share_input_block_num(self, num_gpu_blocks: int) -> None: """ self.num_gpu_blocks = num_gpu_blocks - # # Reset block table and kv cache with global block num - # if not (self.parallel_config.enable_prefix_caching \ # remove - # or self.parallel_config.splitwise_role != "mixed"): - # self.initialize_kv_cache() - self.initialize_kv_cache() # FIXME + # Reset block table and kv cache with global block num + self.initialize_kv_cache() self.share_inputs["block_tables"] = paddle.full( [self.parallel_config.max_num_seqs, self.num_gpu_blocks], -1, dtype="int32" @@ -864,7 +916,11 @@ def update_share_input_block_num(self, num_gpu_blocks: int) -> None: } ) - # self.parallel_config.do_profile = False # remove + def clear_cache(self): + """Clear cached data from shared inputs and forward metadata""" + self.share_inputs.pop("caches", None) + if self.forward_meta is not None: + self.forward_meta.clear_caches() def cal_theortical_kvcache(self): """ diff --git a/fastdeploy/worker/npu_worker.py b/fastdeploy/worker/npu_worker.py index 62ec0b9dd6..16c6324e38 100644 --- a/fastdeploy/worker/npu_worker.py +++ b/fastdeploy/worker/npu_worker.py @@ -3,13 +3,14 @@ import paddle import paddle.nn as nn + +from fastdeploy import envs from fastdeploy.config import FDConfig from fastdeploy.engine.request import Request -from fastdeploy.utils import get_logger +from fastdeploy.utils import get_logger, set_random_seed from fastdeploy.worker.npu_model_runner import NPUModelRunner from fastdeploy.worker.output import ModelRunnerOutput from fastdeploy.worker.worker_base import WorkerBase -from fastdeploy import envs logger = get_logger("npu_worker", "npu_worker.log") @@ -38,19 +39,23 @@ def init_device(self): gc.collect() else: - raise RuntimeError( - f"Not support device type: {self.device_config.device_type}" - ) + raise RuntimeError(f"Not support device type: {self.device_config.device_type}") + set_random_seed(self.fd_config.model_config.seed) # Construct model runner self.model_runner: NPUModelRunner = NPUModelRunner( fd_config=self.fd_config, device=self.device, - device_id=self.device_ids[self.local_rank], rank=self.rank, local_rank=self.local_rank, ) + def exist_prefill(self): + """ + check whether prefill stage exist + """ + return self.model_runner.exist_prefill() + def prefill_finished(self): """ check whether prefill stage finished @@ -59,15 +64,14 @@ def prefill_finished(self): def 
determine_available_memory(self) -> int:
         # TODO(guozr): the real available device memory cannot be measured here because the API is missing
-        # return 60 * 0.1 * 1024**3
-        return 1024**3
+        return int(60 * 0.1 * 1024**3)
 
     def load_model(self) -> None:
-        """ """
+        """Load model"""
         self.model_runner.load_model()
 
     def get_model(self) -> nn.Layer:
-        """ """
+        """Get current model"""
         return self.model_runner.get_model()
 
     def initialize_cache(self, num_gpu_blocks: int) -> None:
@@ -80,7 +84,7 @@ def execute_model(
         model_forward_batch: Optional[List[Request]] = None,
         num_running_requests: int = None,
     ) -> Optional[ModelRunnerOutput]:
-        """ """
+        """Execute model inference"""
         output = self.model_runner.execute_model(model_forward_batch, num_running_requests)
 
         return output
@@ -95,19 +99,20 @@ def preprocess_new_task(self, req_dicts: List[Request], num_running_requests: in
         self.model_runner.insert_prefill_inputs(req_dicts=req_dicts, num_running_requests=num_running_requests)
 
     def graph_optimize_and_warm_up_model(self) -> None:
-        """ """
-        pass
+        """
+        Perform the warm-up and the graph optimization
+        """
+        if self.model_runner.graph_opt_level >= 1:
+            self.model_runner.sot_warmup()
 
     def check_health(self) -> bool:
-        """ """
+        """Basic health check"""
         return True
 
     def cal_theortical_kvcache(self) -> int:
-        """ """
+        """Calculate the block memory required"""
         return self.model_runner.cal_theortical_kvcache()
 
     def reinitialize_kv_cache(self, num_gpu_blocks: int) -> None:
-        """ """
+        """Reinitialize KV cache with new block number"""
         self.model_runner.update_share_input_block_num(num_gpu_blocks=num_gpu_blocks)
-        # TODO:
-        # pass

From f3c2b68b2c3dc3dfdaf092ee62e5969cf6d30212 Mon Sep 17 00:00:00 2001
From: guozr
Date: Mon, 1 Sep 2025 12:59:31 +0800
Subject: [PATCH 3/4] moe support

---
 .../layers/moe/fused_moe_npu_backend.py       | 202 ++++++++++++++++++
 fastdeploy/model_executor/layers/moe/moe.py   |   4 +
 .../layers/quantization/weight_only.py        |   7 +-
 fastdeploy/model_executor/ops/npu/__init__.py |   1 +
 .../model_executor/ops/npu/sparse_moe.py      |  33 +--
 5 files changed, 232 insertions(+), 15 deletions(-)
 create mode 100644 fastdeploy/model_executor/layers/moe/fused_moe_npu_backend.py

diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_npu_backend.py b/fastdeploy/model_executor/layers/moe/fused_moe_npu_backend.py
new file mode 100644
index 0000000000..598302a9fc
--- /dev/null
+++ b/fastdeploy/model_executor/layers/moe/fused_moe_npu_backend.py
@@ -0,0 +1,202 @@
+"""
+# Copyright (c) 2024 PaddlePaddle Authors. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+""" + +from typing import Dict + +import paddle +from paddle import nn + +from fastdeploy.model_executor.layers.moe.fused_moe_backend_base import ( + UnquantizedFusedMoEMethod, +) +from fastdeploy.model_executor.layers.quantization.quant_base import QuantMethodBase +from fastdeploy.model_executor.layers.quantization.weight_only import WeightOnlyConfig +from fastdeploy.model_executor.ops.npu import npu_quant_weight + + +class NPUMoEMethod(UnquantizedFusedMoEMethod): + """ + NPU MOE + """ + + def process_loaded_weights(self, layer: nn.Layer, state_dict): + + up_gate_proj_weights, down_proj_weights = layer.extract_moe_ffn_weights(state_dict) + for weights in [up_gate_proj_weights, down_proj_weights]: + for idx, weight in enumerate(weights): + weights[idx] = weight.transpose([1, 0]) + stacked_up_gate_proj_weights = paddle.stack(up_gate_proj_weights, axis=0) + stacked_down_proj_weights = paddle.stack(down_proj_weights, axis=0) + + layer.up_gate_proj_weight.set_value(stacked_up_gate_proj_weights) + layer.down_proj_weight.set_value(stacked_down_proj_weights) + + def apply_tp( + self, + layer: nn.Layer, + x: paddle.Tensor, + gate: nn.Layer, + ) -> paddle.Tensor: + """ + Paddle Cutlass compute Fused MoE. + """ + from fastdeploy.model_executor.ops.npu import fused_sparse_moe + fused_moe_out = fused_sparse_moe( + x, + gate.weight.transpose([1, 0]), + layer.up_gate_proj_weight, + layer.down_proj_weight, + None, # ffn1_bias + None, # ffn1_scale + None, # ffn2_bias + None, # ffn2_scale + self.moe_quant_type, + layer.top_k, + layer.tp_size + ) + if layer.tp_size > 1: + from fastdeploy.distributed.communication import ( + tensor_model_parallel_all_reduce, + ) + + tensor_model_parallel_all_reduce(fused_moe_out) + + return fused_moe_out + + def apply_ep_prefill( + self, + layer: nn.Layer, + x: paddle.Tensor, + gate: nn.Layer, + ) -> paddle.Tensor: + """ + Apply the EP prefill method. + """ + raise NotImplementedError + + def apply_ep_decode( + self, + layer: nn.Layer, + x: paddle.Tensor, + gate: nn.Layer, + ) -> paddle.Tensor: + """ + Apply the EP decoder method. + """ + raise NotImplementedError + + +class NPUWeightOnlyMoEMethod(QuantMethodBase): + """ + NPU Fused MoE Method. + """ + + def __init__( + self, + quant_config: WeightOnlyConfig, + ) -> None: + super().__init__() + self.quant_config = quant_config + self.moe_quant_type = self.quant_config.algo + + def create_weights(self, layer: nn.Layer, state_dict: Dict[str, paddle.Tensor]): + """ + Paddle cutlass create weight process. 
+ """ + up_gate_proj_weights, down_proj_weights = layer.extract_moe_ffn_weights(state_dict) + assert len(up_gate_proj_weights) == layer.num_local_experts + assert len(down_proj_weights) == layer.num_local_experts + assert up_gate_proj_weights[0].shape == [ + layer.hidden_size, + layer.moe_intermediate_size * 2, + ] + assert down_proj_weights[0].shape == [ + layer.moe_intermediate_size, + layer.hidden_size, + ] + + added_weight_attrs = ["up_gate_proj_weight", "down_proj_weight"] + added_scale_attrs = [ + "up_gate_proj_weight_scale", + "down_proj_weight_scale", + ] + + for idx, weight_tensor in enumerate([up_gate_proj_weights, down_proj_weights]): + weight_name = added_weight_attrs[idx] + scale_name = added_scale_attrs[idx] + + weight_list = [] + weight_scale_list = [] + for i in range(layer.num_local_experts): + quant_weight, scale = npu_quant_weight( + weight_tensor[i], self.moe_quant_type, -1, -1 + ) # weight is [k,n] + weight_list.append(quant_weight.transpose([1, 0])) # transpose weight to [n,k] + weight_scale_list.append(scale) + quanted_weight = paddle.stack(weight_list, axis=0) + setattr( + layer, + weight_name, + layer.create_parameter( + shape=quanted_weight.shape, + dtype=quanted_weight.dtype, + default_initializer=paddle.nn.initializer.Constant(0), + ), + ) + getattr(layer, weight_name).set_value(quanted_weight) + + quanted_weight_scale = paddle.stack(weight_scale_list, axis=0) + setattr( + layer, + scale_name, + layer.create_parameter( + shape=quanted_weight_scale.shape, + dtype=quanted_weight_scale.dtype, + ), + ) + getattr(layer, scale_name).set_value(quanted_weight_scale) + + def apply( + self, + layer: nn.Layer, + x: paddle.Tensor, + gate: nn.Layer, + ) -> paddle.Tensor: + """ + NPU compute Fused MoE. + """ + from fastdeploy.model_executor.ops.npu import fused_sparse_moe + fused_moe_out = fused_sparse_moe( + x, + gate.weight.transpose([1, 0]), + layer.up_gate_proj_weight, + layer.down_proj_weight, + None, # ffn1_bias + (layer.up_gate_proj_weight_scale if hasattr(layer, "up_gate_proj_weight_scale") else None), + None, # ffn2_bias + (layer.down_proj_weight_scale if hasattr(layer, "down_proj_weight_scale") else None), + self.moe_quant_type, + layer.top_k, + layer.tp_size + ) + if layer.tp_size > 1: + from fastdeploy.distributed.communication import ( + tensor_model_parallel_all_reduce, + ) + + tensor_model_parallel_all_reduce(fused_moe_out) + + return fused_moe_out \ No newline at end of file diff --git a/fastdeploy/model_executor/layers/moe/moe.py b/fastdeploy/model_executor/layers/moe/moe.py index 069ee3d04e..f848395102 100644 --- a/fastdeploy/model_executor/layers/moe/moe.py +++ b/fastdeploy/model_executor/layers/moe/moe.py @@ -55,6 +55,10 @@ def get_moe_method(): ) return MetaxTritonWeightOnlyMoEMethod(None) + elif current_platform.is_npu(): + from .fused_moe_npu_backend import NPUMoEMethod + + return NPUMoEMethod(None) raise NotImplementedError diff --git a/fastdeploy/model_executor/layers/quantization/weight_only.py b/fastdeploy/model_executor/layers/quantization/weight_only.py index 49a508e575..223703bef4 100644 --- a/fastdeploy/model_executor/layers/quantization/weight_only.py +++ b/fastdeploy/model_executor/layers/quantization/weight_only.py @@ -105,8 +105,11 @@ def get_quant_method(self, layer) -> Optional[QuantMethodBase]: return GPUWeightOnlyLinearMethod(self) elif current_platform.is_npu(): - from fastdeploy.model_executor.layers.backends import NPUWeightOnlyLinearMethod - return NPUWeightOnlyLinearMethod(self) + from fastdeploy.model_executor.layers.backends 
import (NPUWeightOnlyLinearMethod, NPUWeightOnlyMoEMethod) + if isinstance(layer, FusedMoe): + return NPUWeightOnlyMoEMethod(self) + else: + return NPUWeightOnlyLinearMethod(self) else: if isinstance(layer, FusedMoE): if layer.use_method == "cutlass": diff --git a/fastdeploy/model_executor/ops/npu/__init__.py b/fastdeploy/model_executor/ops/npu/__init__.py index ec3e99b7e6..6869a7fb21 100644 --- a/fastdeploy/model_executor/ops/npu/__init__.py +++ b/fastdeploy/model_executor/ops/npu/__init__.py @@ -28,6 +28,7 @@ from .get_token_penalty_multi_scores import get_token_penalty_multi_scores_npu from .top_p_sampling import top_p_sampling_npu from .weight_quantize import npu_quant_weight +from .sparse_moe import fused_sparse_moe PACKAGE = "fastdeploy.model_executor.ops.npu" diff --git a/fastdeploy/model_executor/ops/npu/sparse_moe.py b/fastdeploy/model_executor/ops/npu/sparse_moe.py index f68b5e956b..2110ebda44 100644 --- a/fastdeploy/model_executor/ops/npu/sparse_moe.py +++ b/fastdeploy/model_executor/ops/npu/sparse_moe.py @@ -1,16 +1,19 @@ import inspect import paddle -import paddlenlp_ops from paddle.base import core +import inspect +from paddlenlp_ops import sparse_moe + + # npu interface refer to gpu interface def fused_sparse_moe( input, gate_weight, - ffn1_weight, - ffn2_weight, + ffn1_weight, + ffn2_weight, ffn1_bias, ffn1_scale, ffn2_bias, @@ -22,27 +25,31 @@ def fused_sparse_moe( """ call npu func to implement this function """ - ffn1_weight = paddle.cast(ffn1_weight, paddle.bfloat16) - ffn2_weight = paddle.cast(ffn2_weight, paddle.bfloat16) + gate_weight = paddle.cast(gate_weight, paddle.bfloat16) + + # ffn1_weight = paddle.cast(ffn1_weight, paddle.bfloat16) + ffn1_weight = paddle.transpose(ffn1_weight, [0, 2, 1]) + # ffn2_weight = paddle.cast(ffn2_weight, paddle.bfloat16) + ffn2_weight = paddle.transpose(ffn2_weight, [0, 2, 1]) - gate_weight = gate_weight.transpose([1, 0]).astype(input.dtype) temp = paddle.zeros([1]).astype(input.dtype) + expert_array = paddle.arange(moe_topk * input.shape[0]).astype("int32") expert_group = paddle.ones([1]).astype("int32") one_hot = paddle.ones([1]).astype("int32") zero_hot = paddle.zeros([1]).astype("int32") - # define quant mapping: may modify if quant_method == "weight_int4_only": quanttype = 11 elif quant_method == "weight_int8_only": quanttype = 6 else: quanttype = 1 - y = paddlenlp_ops.sparse_moe( + + y = sparse_moe( input, gate_weight, temp, @@ -51,16 +58,16 @@ def fused_sparse_moe( temp, temp, ffn1_weight, - ffn1_bias if ffn1_bias else temp, + ffn1_bias if ffn1_bias is not None else temp, temp, temp, - ffn1_scale, + ffn1_scale if ffn1_scale is not None else temp, temp, ffn2_weight, - ffn2_bias if ffn2_bias else temp, + ffn2_bias if ffn2_bias is not None else temp, temp, temp, - ffn2_scale, + ffn2_scale if ffn2_scale is not None else temp, temp, expert_array, expert_group, @@ -68,7 +75,7 @@ def fused_sparse_moe( zero_hot, moe_topk, input.dtype == paddle.bfloat16, - tp_size, + tp_size, quanttype, ) return y From 7c02a324416d294a5ac74d1ddc32ee51ce985bbd Mon Sep 17 00:00:00 2001 From: guozr Date: Mon, 1 Sep 2025 17:36:39 +0800 Subject: [PATCH 4/4] quant moe --- .../layers/moe/fused_moe_npu_backend.py | 74 +++++++++++++++++-- .../layers/quantization/weight_only.py | 2 +- .../model_executor/ops/npu/sparse_moe.py | 14 ++-- .../model_executor/ops/npu/weight_quantize.py | 7 +- fastdeploy/platforms/npu.py | 4 +- 5 files changed, 79 insertions(+), 22 deletions(-) diff --git a/fastdeploy/model_executor/layers/moe/fused_moe_npu_backend.py 
b/fastdeploy/model_executor/layers/moe/fused_moe_npu_backend.py index 598302a9fc..010fe1e4cf 100644 --- a/fastdeploy/model_executor/layers/moe/fused_moe_npu_backend.py +++ b/fastdeploy/model_executor/layers/moe/fused_moe_npu_backend.py @@ -31,18 +31,76 @@ class NPUMoEMethod(UnquantizedFusedMoEMethod): """ NPU MOE """ + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self.moe_quant_type = "wint8" + self.added_weight_attrs = ["up_gate_proj_weight", "down_proj_weight"] + self.added_scale_attrs = [ + "up_gate_proj_weight_scale", + "down_proj_weight_scale", + ] def process_loaded_weights(self, layer: nn.Layer, state_dict): up_gate_proj_weights, down_proj_weights = layer.extract_moe_ffn_weights(state_dict) - for weights in [up_gate_proj_weights, down_proj_weights]: - for idx, weight in enumerate(weights): - weights[idx] = weight.transpose([1, 0]) - stacked_up_gate_proj_weights = paddle.stack(up_gate_proj_weights, axis=0) - stacked_down_proj_weights = paddle.stack(down_proj_weights, axis=0) - - layer.up_gate_proj_weight.set_value(stacked_up_gate_proj_weights) - layer.down_proj_weight.set_value(stacked_down_proj_weights) + assert len(up_gate_proj_weights) == layer.num_local_experts + assert len(down_proj_weights) == layer.num_local_experts + assert up_gate_proj_weights[0].shape == [ + layer.hidden_size, + layer.moe_intermediate_size * 2, + ] + assert down_proj_weights[0].shape == [ + layer.moe_intermediate_size, + layer.hidden_size, + ] + + up_gate_proj_tensor = paddle.stack(up_gate_proj_weights, axis=0) + down_proj_tensor = paddle.stack(down_proj_weights, axis=0) + + if self.moe_quant_type == "wint8": + max_bound = 127 + elif self.moe_quant_type == "wint4": + max_bound = 7 + + for idx, weight_tensor in enumerate([up_gate_proj_tensor, down_proj_tensor]): + print("<><><><><>run quant moe") + weight_name = self.added_weight_attrs[idx] + scale_name = self.added_scale_attrs[idx] + + quanted_weight_scale = weight_tensor.abs().max(axis=1) + quanted_weight = weight_tensor / quanted_weight_scale[:, + None, :] * max_bound + """ + RuntimeError: (NotFound) The kernel with key (npu, Undefined(AnyLayout), bfloat16) of kernel `round` is not registered and fail to fallback to CPU one. Selected wrong DataType `bfloat16`. Paddle support following DataTypes: float32, float64, float16. 
+            """
+            quanted_weight = paddle.cast(quanted_weight, paddle.float16)
+            quanted_weight = paddle.round(quanted_weight).astype("int8")
+            quanted_weight_scale = quanted_weight_scale / max_bound
+
+            # quanted_weight, quanted_weight_scale = npu_quant_weight(weight_tensor)  # FIXME: check whether this duplicates the functionality of npu_quant_weight
+
+
+            setattr(
+                layer,
+                weight_name,
+                layer.create_parameter(
+                    shape=quanted_weight.shape,
+                    dtype=quanted_weight.dtype,
+                    default_initializer=paddle.nn.initializer.Constant(0),
+                ),
+            )
+            getattr(layer, weight_name).set_value(quanted_weight)
+
+            setattr(
+                layer,
+                scale_name,
+                layer.create_parameter(
+                    shape=quanted_weight_scale.shape,
+                    dtype="bfloat16",
+                ),
+            )
+
+            getattr(layer, scale_name).set_value(quanted_weight_scale)
 
     def apply_tp(
         self,
diff --git a/fastdeploy/model_executor/layers/quantization/weight_only.py b/fastdeploy/model_executor/layers/quantization/weight_only.py
index 223703bef4..b7e57cacac 100644
--- a/fastdeploy/model_executor/layers/quantization/weight_only.py
+++ b/fastdeploy/model_executor/layers/quantization/weight_only.py
@@ -106,7 +106,7 @@ def get_quant_method(self, layer) -> Optional[QuantMethodBase]:
             return GPUWeightOnlyLinearMethod(self)
         elif current_platform.is_npu():
             from fastdeploy.model_executor.layers.backends import (NPUWeightOnlyLinearMethod, NPUWeightOnlyMoEMethod)
-            if isinstance(layer, FusedMoe):
+            if isinstance(layer, FusedMoE):
                 return NPUWeightOnlyMoEMethod(self)
             else:
                 return NPUWeightOnlyLinearMethod(self)
diff --git a/fastdeploy/model_executor/ops/npu/sparse_moe.py b/fastdeploy/model_executor/ops/npu/sparse_moe.py
index 2110ebda44..3d8ef8258d 100644
--- a/fastdeploy/model_executor/ops/npu/sparse_moe.py
+++ b/fastdeploy/model_executor/ops/npu/sparse_moe.py
@@ -28,10 +28,10 @@ def fused_sparse_moe(
 
     gate_weight = paddle.cast(gate_weight, paddle.bfloat16)
 
-    # ffn1_weight = paddle.cast(ffn1_weight, paddle.bfloat16)
-    ffn1_weight = paddle.transpose(ffn1_weight, [0, 2, 1])
-    # ffn2_weight = paddle.cast(ffn2_weight, paddle.bfloat16)
-    ffn2_weight = paddle.transpose(ffn2_weight, [0, 2, 1])
+    ffn1_weight = paddle.cast(ffn1_weight, paddle.bfloat16)
+    # ffn1_weight = paddle.transpose(ffn1_weight, [0, 2, 1])
+    ffn2_weight = paddle.cast(ffn2_weight, paddle.bfloat16)
+    # ffn2_weight = paddle.transpose(ffn2_weight, [0, 2, 1])
 
     temp = paddle.zeros([1]).astype(input.dtype)
 
@@ -42,12 +42,10 @@ def fused_sparse_moe(
     one_hot = paddle.ones([1]).astype("int32")
     zero_hot = paddle.zeros([1]).astype("int32")
 
-    if quant_method == "weight_int4_only":
+    if quant_method == "wint4":
         quanttype = 11
-    elif quant_method == "weight_int8_only":
+    elif quant_method == "wint8":
         quanttype = 6
-    else:
-        quanttype = 1
 
     y = sparse_moe(
         input,
diff --git a/fastdeploy/model_executor/ops/npu/weight_quantize.py b/fastdeploy/model_executor/ops/npu/weight_quantize.py
index 8aea3d3443..0beb024f85 100644
--- a/fastdeploy/model_executor/ops/npu/weight_quantize.py
+++ b/fastdeploy/model_executor/ops/npu/weight_quantize.py
@@ -1,7 +1,11 @@
 import paddle
 import numpy as np
 
+def clip_and_round(x):
+    return np.clip(np.around(x), -127, 127).astype("int8")
+
 def npu_quant_weight(weight_np):
+    print("<><><><><>calling npu quant weight")
     if isinstance(weight_np, paddle.Tensor):
         if weight_np.dtype == paddle.bfloat16:
             weight_np = paddle.cast(weight_np, paddle.float16)
@@ -14,6 +18,3 @@ def npu_quant_weight(weight_np):
     weight_scales = paddle.to_tensor(weight_scales)
     weight_scales = paddle.cast(weight_scales, paddle.get_default_dtype())
     return quanted_weight, weight_scales
-
-def clip_and_round(x):
-    return np.clip(np.around(x), -127, 127).astype("int8")
\ No newline at end of file
diff --git a/fastdeploy/platforms/npu.py b/fastdeploy/platforms/npu.py
index 16607f9f7f..211af6faa7 100644
--- a/fastdeploy/platforms/npu.py
+++ b/fastdeploy/platforms/npu.py
@@ -27,8 +27,8 @@ def available(self):
         Check whether XPU is available.
         """
         try:
-            assert paddle.is_compiled_with_xpu()
-            assert len(paddle.static.xpu_places()) > 0
+            assert paddle.is_compiled_with_custom_device("npu")
+            assert len(paddle.static.custom_device_places("npu")) > 0
             return True
         except Exception as e:
             return False
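
Note on the quantization used in patches 3 and 4: both the MoE `process_loaded_weights` path and `npu_quant_weight`/`clip_and_round` follow plain per-output-channel abs-max int8 quantization. Each output channel gets a scale derived from its absolute maximum, the weight is divided by that scale, rounded, clipped to the int8 range, and the stored scale is multiplied back in at dequantization time. The snippet below is a minimal NumPy sketch of that round trip, included for illustration only; it is not part of the patches, and the helper names (quantize_absmax_int8, dequantize_int8) are hypothetical.

import numpy as np

def quantize_absmax_int8(weight):
    """Per-output-channel abs-max int8 quantization for a [k, n] float weight."""
    max_bound = 127.0
    # One scale per output channel (column); guard against all-zero columns.
    scales = np.maximum(np.abs(weight).max(axis=0), 1e-8) / max_bound
    quantized = np.clip(np.around(weight / scales), -127, 127).astype("int8")
    return quantized, scales.astype("float32")

def dequantize_int8(quantized, scales):
    """Recover an approximate float32 weight from int8 values and per-channel scales."""
    return quantized.astype("float32") * scales

if __name__ == "__main__":
    rng = np.random.default_rng(0)
    w = rng.standard_normal((128, 64)).astype("float32")
    q, s = quantize_absmax_int8(w)
    w_hat = dequantize_int8(q, s)
    print("max abs reconstruction error:", float(np.abs(w - w_hat).max()))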