Skip to content

Commit 5b23f52

Browse files
JimHsiungliutongxuan
authored andcommitted
feat: support batch response for mix instance.
1 parent db15be9 commit 5b23f52

File tree

4 files changed

+9
-6
lines changed

4 files changed

+9
-6
lines changed

xllm/core/scheduler/async_response_processor.cpp

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ AsyncResponseProcessor::AsyncResponseProcessor(
4141
role_(role.value_or(InstanceRole::DEFAULT)),
4242
enable_schedule_overlap_(enable_schedule_overlap),
4343
enable_decode_response_to_service_(enable_decode_response_to_service) {
44-
if (role_ == InstanceRole::DECODE) {
44+
if (role_ == InstanceRole::DECODE || role_ == InstanceRole::MIX) {
4545
enable_batch_response_ =
4646
util::get_bool_env("ENABLE_PD_DECODE_BATCH_RESPONSE", true);
4747
}
@@ -257,8 +257,9 @@ void AsyncResponseProcessor::process_stream_request(
257257
}
258258

259259
void AsyncResponseProcessor::process_stream_requests(
260-
const std::vector<std::shared_ptr<Request>>& requests) {
261-
if (!enable_batch_response_) {
260+
const std::vector<std::shared_ptr<Request>>& requests,
261+
bool is_prefill) {
262+
if (!enable_batch_response_ || is_prefill) {
262263
for (auto& req : requests) {
263264
process_stream_request(req);
264265
}

xllm/core/scheduler/async_response_processor.h

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -46,7 +46,8 @@ class AsyncResponseProcessor final {
4646

4747
// in disagg pd mode, decode send requests' responses to prefill
4848
void process_stream_requests(
49-
const std::vector<std::shared_ptr<Request>>& requests);
49+
const std::vector<std::shared_ptr<Request>>& requests,
50+
bool is_prefill);
5051

5152
// wait for all responses in queue to be handled
5253
void wait_completion();

xllm/core/scheduler/continuous_scheduler.cpp

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1037,7 +1037,8 @@ void ContinuousScheduler::process_batch_output(bool enable_schedule_overlap) {
10371037
}
10381038
}
10391039
if (!stream_requests.empty()) {
1040-
response_processor_->process_stream_requests(stream_requests);
1040+
response_processor_->process_stream_requests(stream_requests,
1041+
last_step_prefill_);
10411042
}
10421043
}
10431044

xllm/core/scheduler/disagg_pd_scheduler.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ void DisaggPDScheduler::step(const absl::Duration& timeout) {
250250
ContinuousScheduler::step(timeout);
251251
// send first generation token to decode instance
252252
// and remove the request from running_requests_ to remote_requests_map_
253-
if (options_.instance_role() != InstanceRole::DECODE) {
253+
if (options_.instance_role() != InstanceRole::DECODE && last_step_prefill_) {
254254
prefill_send_first_generation();
255255
}
256256
}

0 commit comments

Comments
 (0)