[XPU] add build_sampling_params op #7738
Jiajun-Ji wants to merge 4 commits into PaddlePaddle:develop from
Conversation

Thanks for your contribution!
Pull request overview
This PR adds a build_sampling_params custom op for the XPU backend, replacing the previous Python-based sampling-parameter padding logic with an XPU kernel. It also moves the infer_seed update into the op, aligning with the GPU seed-stepping strategy (particularly for speculative decoding).
Changes:
- Adds the XPU build_sampling_params kernel + plugin wrapper + Paddle static op, and wires it into the XPU speculative verify (TARGET_MATCH) path.
- Introduces increment_value on the XPU ModelRunner side (aligned with GPU: 4 when not speculative, (num_speculative_tokens + 1) * 4 when speculative) and adjusts when infer_seed is updated.
- Adds the custom_ops/xpu_ops/test/test_build_sampling_params.py unit test, which checks against a Python reference implementation and covers multiple batch shapes plus seed wrap-around.
PR metadata check (needs follow-up)
- The title already carries the [XPU] tag and matches the required format.
- The "Modifications / Usage or Command / Accuracy Tests" sections of the description are not filled in. If this op can affect sampling results or reproducibility, please add an accuracy comparison plus the corresponding run commands and environment info. If no unit test is added or XPU CI cannot be run, state the reason (this PR does add a unit test file, but the description should still explain how to run it).
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| fastdeploy/worker/xpu_model_runner.py | Computes and passes down increment_value; adjusts the infer_seed update logic in the speculative scenario |
| fastdeploy/model_executor/layers/sample/sampler.py | Switches the XPU verify (TARGET_MATCH) path to build_sampling_params and forwards increment_value |
| custom_ops/xpu_ops/test/test_build_sampling_params.py | New XPU op unit test, validated against a Python reference implementation |
| custom_ops/xpu_ops/src/plugin/src/wrapper/mtp_wrapper/build_sampling_params.cpp | New plugin wrapper (CPU + XPU3 dispatch) |
| custom_ops/xpu_ops/src/plugin/src/kernel/kunlun3cpp/mtp_kernel/build_sampling_params.xpu | New Kunlun3 XPU kernel implementation |
| custom_ops/xpu_ops/src/plugin/include/xpu/plugin.h | Exports the build_sampling_params declaration |
| custom_ops/xpu_ops/src/ops/mtp/build_sampling_params.cc | New Paddle static op registration and call bridge |
CI report generated from the code below (refreshed every 30 minutes):
1. Task overview: 1 Required task is currently failing (Approval) and must be resolved before merging; another 3 Required tasks are still running, awaiting results.
2. Task status summary
2.1 Required tasks: 6/10 passed
2.2 Optional tasks: 25/30 passed
3. Failure details (Required only): Approval — workflow approval (confidence: high)
Root cause details: Key logs: Fix suggestions:
Fix summary: one FastDeploy RD and one PaddlePaddle RD each need to approve this PR.
Related change: PR title "[XPU] add build_sampling_params op", adding a new XPU custom op implementation.
Link: view logs
Codecov Report
❌ Patch coverage is
Additional details and impacted files

```
@@ Coverage Diff @@
##           develop    #7738   +/-  ##
==========================================
  Coverage         ?   71.59%
==========================================
  Files            ?      396
  Lines            ?    55576
  Branches         ?     8688
==========================================
  Hits             ?    39792
  Misses           ?    13045
  Partials         ?     2739
```

Flags with carried forward coverage won't be shown. Click here to find out more.
☔ View full report in Codecov by Sentry.
```python
self.increment_value = (
    4 if not self.speculative_decoding else (self.speculative_config.num_speculative_tokens + 1) * 4
)
```
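The seed stepping this value drives can be exercised in isolation. A minimal sketch, assuming the GPU convention of four seed draws per sampled token and an illustrative MAX_INFER_SEED modulus (names here are illustrative, not FastDeploy's actual API):

```python
# Sketch of the infer_seed stepping policy (illustrative names).
MAX_INFER_SEED = 2**63 - 1  # assumed wrap-around modulus

def increment_value(speculative_decoding: bool, num_speculative_tokens: int = 0) -> int:
    """Four seed draws per token; speculative decoding verifies
    num_speculative_tokens draft tokens plus one bonus token."""
    if not speculative_decoding:
        return 4
    return (num_speculative_tokens + 1) * 4

def step_seed(seed: int, speculative_decoding: bool, num_speculative_tokens: int = 0) -> int:
    # Advance and wrap, as both the GPU path and the new XPU kernel do.
    return (seed + increment_value(speculative_decoding, num_speculative_tokens)) % MAX_INFER_SEED

print(increment_value(False))               # 4
print(increment_value(True, 3))             # 16
print(step_seed(MAX_INFER_SEED - 2, False)) # wraps around to 2
```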
```python
# 7. Update 'infer_seed' and step_paddle()
self.share_inputs["infer_seed"].add_(self.infer_seed_increment)
self.share_inputs["infer_seed"][:] %= self.MAX_INFER_SEED
if not self.speculative_decoding:
```
```cpp
phi::XPUPlace place(phi::backends::xpu::GetXPUCurrentDeviceId());
auto dev_ctx = paddle::experimental::DeviceContextPool::Instance().Get(place);
auto xpu_ctx = static_cast<const phi::XPUContext*>(dev_ctx);
api::Context* ctx = xpu_ctx->x_context();
if (top_p.is_cpu()) {
  ctx = new api::Context(api::kCPU);
}
```
```cpp
// Each cluster computes its own pad_start via a scan over
// seq_lens_this_time / seq_lens_encoder. A global scratch area for
// per-batch start offsets is not available here, and clusters run
// concurrently, so no global accumulator can be shared; instead core 0
// of each cluster independently sums the first `bi` entries.
// This is O(bs) per cluster, but bs is typically small (<=512).

for (int bi = clusterid; bi < bs; bi += nclusters) {
  if (cid == 0) {
    // Read per-batch parameters from global memory.
    float lm_top_p;
    int64_t lm_top_k;
    int64_t lm_seed;
    int lm_slt;  // seq_lens_this_time[bi]
    int lm_sle;  // seq_lens_encoder[bi]

    GM2LM_ASYNC(top_p + bi, &lm_top_p, sizeof(float));
    GM2LM_ASYNC(top_k + bi, &lm_top_k, sizeof(int64_t));
    GM2LM_ASYNC(infer_seed + bi, &lm_seed, sizeof(int64_t));
    GM2LM_ASYNC(seq_lens_this_time + bi, &lm_slt, sizeof(int));
    GM2LM(seq_lens_encoder + bi, &lm_sle, sizeof(int));  // sync barrier

    bool is_decoder = (lm_sle == 0);
    int repeat = is_decoder ? lm_slt : 1;

    // Compute pad_start = sum of token counts for batches [0, bi).
    int pad_start = 0;
    for (int k = 0; k < bi; k++) {
      int slt_k, sle_k;
      GM2LM_ASYNC(seq_lens_this_time + k, &slt_k, sizeof(int));
      GM2LM(seq_lens_encoder + k, &sle_k, sizeof(int));
      pad_start += (sle_k == 0) ? slt_k : 1;
    }
```
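The per-batch pad_start in the hunk above is an exclusive prefix sum over token counts: a batch contributes seq_lens_this_time[bi] output slots when it is in decoder phase (seq_lens_encoder[bi] == 0), otherwise exactly one. A minimal Python model of that scan (function name is illustrative):

```python
# Illustrative Python model of the kernel's pad_start computation.
def pad_starts(seq_lens_this_time, seq_lens_encoder):
    starts, acc = [], 0
    for slt, sle in zip(seq_lens_this_time, seq_lens_encoder):
        starts.append(acc)              # exclusive prefix sum
        acc += slt if sle == 0 else 1   # decoder: slt slots, encoder: 1 slot
    return starts

# Mixed batch: decoder (2 tokens), encoder, decoder (3 tokens)
print(pad_starts([2, 128, 3], [0, 128, 0]))  # [0, 2, 3]
```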
```
RequestFuncOutput(no=2347, request_id='None', generated_text='', reasoning_content='', success=False, latency=0.0, end_timestamp=0.0, output_tokens=0, ttft=0.0, arrival_time=[], itl=[], tpot=0.0, prompt_len=0, prompt_tokens=0, reasoning_tokens=0, res_ttft=0, error='{"error":{"message":"request[chatcmpl-814e8d96-3da8-46b0-b4da-31925c313041] generator error: Input text is too long, input_ids_len (8191) + min_tokens(1) >= max_model_len(8192), Traceback (most recent call last):\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/openai/serving_chat.py\\", line 168, in create_chat_completion\\n prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict)\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/engine_client.py\\", line 300, in format_and_add_data\\n await self.add_requests(request)\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/engine_client.py\\", line 390, in add_requests\\n raise EngineError(error_msg, error_code=400)\\nfastdeploy.utils.EngineError: Input text is too long, input_ids_len (8191) + min_tokens(1) >= max_model_len(8192)\\n","type":"invalid_request_error","param":null,"code":null}}', metrics={}, tool_calls=[], output_ids=[])
RequestFuncOutput(no=2347, request_id='None', generated_text='', reasoning_content='', success=False, latency=0.0, end_timestamp=0.0, output_tokens=0, ttft=0.0, arrival_time=[], itl=[], tpot=0.0, prompt_len=0, prompt_tokens=0, reasoning_tokens=0, res_ttft=0, error='{"error":{"message":"request[chatcmpl-799cdf97-ab7e-4823-80e4-1833bf5f7d90] generator error: Input text is too long, input_ids_len (8191) + min_tokens(1) >= max_model_len(8192), Traceback (most recent call last):\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/openai/serving_chat.py\\", line 168, in create_chat_completion\\n prompt_token_ids = await self.engine_client.format_and_add_data(current_req_dict)\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/engine_client.py\\", line 300, in format_and_add_data\\n await self.add_requests(request)\\n File \\"/home/paddle_test/works/fd/FastDeploy/fastdeploy/entrypoints/engine_client.py\\", line 390, in add_requests\\n raise EngineError(error_msg, error_code=400)\\nfastdeploy.utils.EngineError: Input text is too long, input_ids_len (8191) + min_tokens(1) >= max_model_len(8192)\\n","type":"invalid_request_error","param":null,"code":null}}', metrics={}, tool_calls=[], output_ids=[])
```
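Both failed requests hit the same admission check visible in the traceback. A minimal sketch of that length validation, with hypothetical helper names (not the actual engine_client code):

```python
# Sketch of the admission check reconstructed from the error message
# (function and class names are illustrative).
class EngineError(Exception):
    def __init__(self, msg, error_code=400):
        super().__init__(msg)
        self.error_code = error_code

def check_request_length(input_ids_len: int, min_tokens: int, max_model_len: int) -> None:
    # The prompt plus the minimum requested output must fit in the context window.
    if input_ids_len + min_tokens >= max_model_len:
        raise EngineError(
            f"Input text is too long, input_ids_len ({input_ids_len}) "
            f"+ min_tokens({min_tokens}) >= max_model_len({max_model_len})",
            error_code=400,
        )

check_request_length(8000, 1, 8192)  # fits, no error
# check_request_length(8191, 1, 8192) raises EngineError — the failing case above
```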
Force-pushed 651d7cb to cfc5936 (Compare)
```diff
 _, next_tokens = top_k_top_p_sampling(
     probs,
-    top_p=top_p,
-    top_k=top_k,
+    top_p=sampling_metadata.top_p,
+    top_k=sampling_metadata.top_k,
     top_k_list=sampling_metadata.top_k_list,
-    topp_seed=topp_seed,
+    topp_seed=sampling_metadata.topp_seed,
 )
```
```diff
     sampling_metadata.seed,
-    paddle.reshape(share_inputs["seq_lens_this_time"], shape=[-1]),
-    paddle.reshape(share_inputs["seq_lens_encoder"], shape=[-1]),
+    share_inputs["seq_lens_this_time"],
+    share_inputs["seq_lens_encoder"],
     token_num_output_cpu=int(share_inputs["cu_seqlens_q_output"][-1]),
+    increment_value=increment_value,
 )
```
```diff
-self.share_inputs["infer_seed"][:] %= self.MAX_INFER_SEED
+if not self.speculative_decoding:
+    self.share_inputs["infer_seed"].add_(self.infer_seed_increment)
+    self.share_inputs["infer_seed"][:] %= self.MAX_INFER_SEED
```
```cpp
phi::XPUPlace place(phi::backends::xpu::GetXPUCurrentDeviceId());
auto dev_ctx = paddle::experimental::DeviceContextPool::Instance().Get(place);
auto xpu_ctx = static_cast<const phi::XPUContext*>(dev_ctx);
api::Context* ctx = xpu_ctx->x_context();
if (top_p.is_cpu()) {
  ctx = new api::Context(api::kCPU);
}

int64_t pad_idx = 0;
for (int bi = 0; bi < bs; bi++) {
  bool is_decoder = (seq_lens_encoder[bi] == 0);
  int repeat = is_decoder ? seq_lens_this_time[bi] : 1;
  int64_t bi_seed = infer_seed[bi];
  for (int local_pos = 0; local_pos < repeat; local_pos++) {
    int64_t offset = is_decoder ? static_cast<int64_t>(local_pos) * 4 : 0LL;
    top_p_padding[pad_idx] = top_p[bi];
    top_k_padding[pad_idx] = top_k[bi];
    topp_seed[pad_idx] = (bi_seed + offset) % BUILD_SAMPLING_MAX_INFER_SEED;
    pad_idx++;
  }
  infer_seed[bi] =
      (infer_seed[bi] + increment_value) % BUILD_SAMPLING_MAX_INFER_SEED;
}
```
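The CPU loop above can be mirrored by a short Python reference, similar in spirit to what the new unit test compares against. This is a sketch, not the PR's actual reference code, and the BUILD_SAMPLING_MAX_INFER_SEED value is an assumption:

```python
# Python reference for the CPU wrapper loop above (sketch; the modulus
# value is assumed for illustration).
BUILD_SAMPLING_MAX_INFER_SEED = 2**63 - 1

def build_sampling_params_ref(top_p, top_k, infer_seed,
                              seq_lens_this_time, seq_lens_encoder,
                              increment_value):
    top_p_pad, top_k_pad, topp_seed = [], [], []
    for bi in range(len(top_p)):
        is_decoder = seq_lens_encoder[bi] == 0
        repeat = seq_lens_this_time[bi] if is_decoder else 1
        for local_pos in range(repeat):
            # Decoder batches get a distinct seed per token slot (4 apart).
            offset = local_pos * 4 if is_decoder else 0
            top_p_pad.append(top_p[bi])
            top_k_pad.append(top_k[bi])
            topp_seed.append((infer_seed[bi] + offset) % BUILD_SAMPLING_MAX_INFER_SEED)
        # infer_seed is updated in place, as in the kernel.
        infer_seed[bi] = (infer_seed[bi] + increment_value) % BUILD_SAMPLING_MAX_INFER_SEED
    return top_p_pad, top_k_pad, topp_seed

seeds = [10, 20]
p, k, s = build_sampling_params_ref([0.8, 0.9], [50, 40], seeds,
                                    [3, 7], [0, 7], increment_value=4)
print(s)      # [10, 14, 18, 20]
print(seeds)  # [14, 24]
```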
PaddlePaddle-bot
left a comment
🤖 Paddle-CI-Agent | pr_review
2026-05-08 16:20:00
📋 Review summary
PR overview: adds an XPU build_sampling_params kernel, replacing the Python padding_sampling_params implementation with an XPU kernel and moving the infer_seed update logic into the kernel.
Scope: custom_ops/xpu_ops/ (kernel/wrapper/op registration), fastdeploy/model_executor/layers/sample/sampler.py, fastdeploy/worker/xpu_model_runner.py
Impact tags: [XPU] [OP]
📝 PR convention check
The "## Modifications" and "## Usage or Command" sections are empty (placeholder comments only), and no checklist item is ticked; this does not satisfy the description template.
Suggested title (copy-paste ready):
[XPU][OP] Add build_sampling_params XPU kernel to replace Python padding_sampling_params
Suggested PR description (copy-paste ready; it must reproduce the full structure of the checklist §D2 template):
## Motivation
Replace the Python `padding_sampling_params` implementation used by `_verify_and_sample_xpu` and `_normal_sample_xpu` on XPU with the XPU kernel `build_sampling_params`, move the `infer_seed` update logic into the kernel, and align the `infer_seed` `increment_value` step with the GPU implementation (`(num_speculative_tokens + 1) * 4` in the speculative decoding scenario).
## Modifications
- `custom_ops/xpu_ops/src/plugin/src/kernel/kunlun3cpp/mtp_kernel/build_sampling_params.xpu`: new XPU3 kernel; each cluster handles one batch, core 0 reads the per-batch parameters and broadcasts them through shared memory, the cores fill the token slots in parallel, and core 0 updates `infer_seed` in place.
- `custom_ops/xpu_ops/src/plugin/src/wrapper/mtp_wrapper/build_sampling_params.cpp`: new CPU wrapper (for unit tests) and XPU3 wrapper, with parameter validation via `WRAPPER_CHECK_PTR` / `WRAPPER_ASSERT_GT`.
- `custom_ops/xpu_ops/src/plugin/include/xpu/plugin.h`: new `build_sampling_params` function declaration.
- `custom_ops/xpu_ops/src/ops/mtp/build_sampling_params.cc`: registers the Paddle custom op via `PD_BUILD_STATIC_OP`; inputs are `top_p/top_k/infer_seed/seq_lens_this_time/seq_lens_encoder`, outputs are `top_p_padding/top_k_padding/topp_seed`.
- `fastdeploy/model_executor/layers/sample/sampler.py`: `_verify_and_sample_xpu` replaces `padding_sampling_params` with `build_sampling_params`; `_normal_sample_xpu` now uses the precomputed `sampling_metadata.topp_seed`.
- `fastdeploy/worker/xpu_model_runner.py`: adds `self.increment_value`, computed per the speculative decoding scenario; the `infer_seed` update on the speculative decoding path moves into the kernel (the external `add_` is removed).
- `custom_ops/xpu_ops/test/test_build_sampling_params.py`: new unit test covering pure decoder / pure encoder / mixed / single item / seed wrap-around / one token per batch scenarios.
## Usage or Command
N/A (no new public API; internal implementation swap)
## Accuracy Tests
- Verified that INT64 modulo behaves correctly inside the XPU kernel (screenshot provided)
- Verified that the CPU wrapper output matches the Python reference implementation (unit tests cover 6 scenarios)
## Checklist
- [x] Add at least a tag in the PR title.
- Tag list: [`[FDConfig]`,`[APIServer]`,`[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- You can add new tags based on the PR content, but the semantics must be clear.
- [ ] Format your code, run `pre-commit` before commit.
- [x] Add unit tests. Please write the reason in this PR if no unit tests.
- [x] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.

Issues
| Level | File | Summary |
|---|---|---|
| 🔴 Bug | fastdeploy/model_executor/layers/sample/sampler.py:1080 | The sampling_metadata.topp_seed attribute does not exist; XPU sampling in NAIVE mode will crash |
| 🔴 Bug | custom_ops/xpu_ops/src/ops/mtp/build_sampling_params.cc:38 | new api::Context(api::kCPU) has no matching delete — memory leak |
Overall assessment
The new XPU kernel design is clear and the unit tests are fairly thorough, but two P0 issues must be fixed before merging: _normal_sample_xpu references a topp_seed field that does not exist on SamplingMetadata (raising AttributeError and failing every XPU non-speculative-decoding request), and the CPU path leaks an api::Context object.
```diff
     top_k=sampling_metadata.top_k,
     top_k_list=sampling_metadata.top_k_list,
-    topp_seed=topp_seed,
+    topp_seed=sampling_metadata.topp_seed,
```
🔴 Bug: the sampling_metadata.topp_seed attribute does not exist on the SamplingMetadata class.
SamplingMetadata (meta_data.py) only defines a seed field, with no topp_seed; xpu_model_runner.py also never assigns that field when building sampling_metadata. In NAIVE mode, calling _normal_sample_xpu here raises AttributeError, failing every XPU non-speculative-decoding sampling request.
Suggestion: call build_sampling_params in xpu_model_runner.py to precompute the value and assign it to sampling_metadata.topp_seed, or add the field to SamplingMetadata and pass it in at construction time.
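The second option could look roughly like this. A hypothetical, simplified SamplingMetadata is used here; only seed and topp_seed correspond to fields discussed in the review, and everything else is illustrative:

```python
# Hypothetical, simplified sketch of adding an optional topp_seed field
# to SamplingMetadata and filling it at build time.
from dataclasses import dataclass
from typing import List, Optional

@dataclass
class SamplingMetadata:
    seed: Optional[List[int]] = None
    # New field, to be populated with the precomputed per-token seeds
    # returned by build_sampling_params.
    topp_seed: Optional[List[int]] = None

meta = SamplingMetadata(seed=[10, 20])
meta.topp_seed = [10, 14, 18, 20]  # assigned after build_sampling_params runs
assert meta.topp_seed is not None  # _normal_sample_xpu can now read it safely
```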
```cpp
auto xpu_ctx = static_cast<const phi::XPUContext*>(dev_ctx);
api::Context* ctx = xpu_ctx->x_context();
if (top_p.is_cpu()) {
  ctx = new api::Context(api::kCPU);
```
🔴 Memory leak: the object allocated by new api::Context(api::kCPU) is never deleted before the function returns.
Every invocation of the CPU path leaks one api::Context object.
Suggested fix using RAII smart-pointer management:

```cpp
std::unique_ptr<api::Context> cpu_ctx;
if (top_p.is_cpu()) {
  cpu_ctx = std::make_unique<api::Context>(api::kCPU);
  ctx = cpu_ctx.get();
}
```
Motivation
Replace the Python implementation of padding_sampling_params on XPU with the XPU kernel implementation build_sampling_params; additionally, move the infer_seed update into build_sampling_params and align the infer_seed increment_value step with the GPU implementation.
Modifications
Usage or Command
Accuracy Tests
Verified that INT64 modulo behaves correctly inside the XPU kernel:

Checklist
- [ ] Add at least a tag in the PR title.
  - Tag list: [`[FDConfig]`, `[APIServer]`, `[Engine]`, `[Scheduler]`, `[PD Disaggregation]`, `[Executor]`, `[Graph Optimization]`, `[Speculative Decoding]`, `[RL]`, `[Models]`, `[Quantization]`, `[Loader]`, `[OP]`, `[KVCache]`, `[DataProcessor]`, `[BugFix]`, `[Docs]`, `[CI]`, `[Optimization]`, `[Feature]`, `[Benchmark]`, `[Others]`, `[XPU]`, `[HPU]`, `[GCU]`, `[DCU]`, `[Iluvatar]`, `[Metax]`]
- [ ] Format your code, run `pre-commit` before commit.
- [ ] Add unit tests. Please write the reason in this PR if no unit tests.
- [ ] Provide accuracy results.
- [ ] If the current PR is submitting to the `release` branch, make sure the PR has been submitted to the `develop` branch, then cherry-pick it to the `release` branch with the `[Cherry-Pick]` PR tag.