
Commit 713484b

weizhehuang0827 authored and liutongxuan committed

feat: support multi-priority and on/offline for ChunkedPrefill.

1 parent 3f49a87 · commit 713484b

8 files changed: +417 additions, -159 deletions


xllm/core/framework/request/sequences_group.cpp

Lines changed: 1 addition & 1 deletion

@@ -69,7 +69,7 @@ bool SequencesGroup::expand_sequences(bool share_prefix) {
   // prefill is not finished, can not expand
   // FIXME later share_prefix
   if (!share_prefix ||
-      seq->kv_state().kv_cache_tokens_num() < seq->num_prompt_tokens()) {
+      seq->kv_state().kv_cache_tokens_num() >= seq->num_prompt_tokens()) {
     while (sequences_.size() < best_of) {
       add();
     }
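
The one-line fix above inverts the expansion guard so the code matches the comment: with prefix sharing enabled, a sequence group may only expand to best_of sequences once prefill has written the entire prompt into the KV cache. A minimal sketch of the corrected predicate, where SeqView is a hypothetical stand-in for the Sequence/KVState accessors:

#include <cstddef>

// Hypothetical simplified view of a sequence; only two counters matter here.
struct SeqView {
  std::size_t kv_cache_tokens_num;  // prompt tokens already in the KV cache
  std::size_t num_prompt_tokens;    // total prompt length
};

// Expanding to best_of sequences is allowed when prefix sharing is off, or
// when prefill has finished and the shared prefix is fully cached.
bool can_expand(bool share_prefix, const SeqView& seq) {
  return !share_prefix ||
         seq.kv_cache_tokens_num >= seq.num_prompt_tokens;
}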

xllm/core/scheduler/chunked_prefill_scheduler.cpp

Lines changed: 129 additions & 119 deletions
Large diffs are not rendered by default.
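
The rewritten scheduler body is not rendered here, but the tests added below pin down the behavior it implements: when KV-cache blocks run out, offline requests are evicted before online ones, and lower-priority requests before higher-priority ones. A minimal sketch of that eviction rule, under the 1: HIGH, 2: NORMAL, 3: LOW convention used in the tests (Req and can_preempt are hypothetical names, not the scheduler's API):

enum class RequestPriority { HIGH = 1, NORMAL = 2, LOW = 3 };

// Hypothetical simplified view of a scheduled request.
struct Req {
  bool offline;              // offline batch work vs. online interactive work
  RequestPriority priority;  // smaller enum value = more urgent
};

// Whether `victim` may be evicted to free blocks for `incoming`.
bool can_preempt(const Req& incoming, const Req& victim) {
  // An online request may evict an offline one regardless of priority.
  if (!incoming.offline && victim.offline) return true;
  // Within the same on/offline class, higher priority evicts lower.
  if (incoming.offline == victim.offline)
    return incoming.priority < victim.priority;
  // An offline request never evicts an online one.
  return false;
}

Note that eligibility alone is not sufficient: TEST-6 below also checks that preemption is skipped when evicting the offline request would not free enough blocks for the incoming prefill.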

xllm/core/scheduler/chunked_prefill_scheduler.h

Lines changed: 4 additions & 15 deletions
@@ -36,37 +36,26 @@ class ChunkedPrefillScheduler final : public ContinuousScheduler {
   ChunkedPrefillScheduler(Engine* engine, const Options& options);
   virtual ~ChunkedPrefillScheduler();

-  std::vector<Batch> prepare_batch_test() { return prepare_batch(); }
-
-  uint32_t get_waiting_requests_num() const override {
-    return waiting_priority_queue_.size();
-  };
-
  private:
   // build a batch of requests from the priority queue
   virtual std::vector<Batch> prepare_batch() override;
-  void handle_abnormal_request(
-      const std::vector<Sequence*>& candidate_sequences,
-      const std::vector<size_t>& candidate_token_budgets,
-      const size_t& allocated_tokens,
-      const size_t& allocated_seqs,
-      size_t& remaining_token_budget,
-      size_t& remaining_seq_budget,
-      bool budget_exhausted,
-      bool block_exhausted);
   void handle_running_queue_requests(
       const size_t max_tokens_per_chunk_for_prefill,
       size_t& remaining_token_budget,
       size_t& remaining_seq_budget,
       size_t& num_preempted_requests,
       std::vector<Sequence*>& prefill_stage_sequences,
+      std::unique_ptr<DecodePriorityQueue>& running_queue,
       bool& budget_exhausted,
       bool& blocks_exhausted);
   void handle_prefill_requests(
       const size_t max_tokens_per_chunk_for_prefill,
       size_t& remaining_token_budget,
       size_t& remaining_seq_budget,
+      size_t& num_preempted_requests,
       std::vector<Sequence*>& prefill_stage_sequences,
+      RequestPriorityQueue& waiting_priority_queue,
+      bool& budget_exhausted,
       bool& blocks_exhausted,
       std::vector<std::shared_ptr<Request>>& finished_requests);
   void handle_remaining_budget(size_t& remaining_token_budget,
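
The new parameters pass the queues into the handlers explicitly: a DecodePriorityQueue for the running requests and a RequestPriorityQueue for the waiting ones. The queue types themselves are not part of this diff; as a sketch of how a waiting queue could order requests under the "priority" strategy (same Req/RequestPriority stand-ins as above, built on std::priority_queue; the repo's actual definitions may differ):

#include <cstdint>
#include <memory>
#include <queue>
#include <vector>

enum class RequestPriority { HIGH = 1, NORMAL = 2, LOW = 3 };

struct Req {
  bool offline;
  RequestPriority priority;
  uint64_t arrival;  // monotonic admission counter for FCFS tie-breaking
};

// std::priority_queue pops the "largest" element, so the comparator returns
// true when `a` should be scheduled *after* `b`.
struct WaitingLess {
  bool operator()(const std::shared_ptr<Req>& a,
                  const std::shared_ptr<Req>& b) const {
    if (a->offline != b->offline) return a->offline;  // online first
    if (a->priority != b->priority)
      return a->priority > b->priority;               // HIGH before LOW
    return a->arrival > b->arrival;                   // then FCFS
  }
};

using WaitingQueue = std::priority_queue<std::shared_ptr<Req>,
                                         std::vector<std::shared_ptr<Req>>,
                                         WaitingLess>;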

xllm/core/scheduler/chunked_prefill_scheduler_test.cpp

Lines changed: 251 additions & 1 deletion
@@ -82,13 +82,15 @@ ContinuousScheduler::Options create_scheduler_options(
     int32_t max_seqs_per_batch,
     int32_t num_speculative_tokens,
     int32_t max_tokens_per_chunk_for_prefill,
-    int32_t dp_size) {
+    int32_t dp_size,
+    const std::string& priority_strategy = "FCFS") {
   ContinuousScheduler::Options opt;
   opt.num_speculative_tokens_ = num_speculative_tokens;
   opt.max_tokens_per_chunk_for_prefill_ = max_tokens_per_chunk_for_prefill;
   opt.max_tokens_per_batch_ = max_tokens_per_batch;
   opt.max_seqs_per_batch_ = max_seqs_per_batch;
   opt.dp_size_ = dp_size;
+  opt.priority_strategy_ = priority_strategy;

   return opt;
 }
@@ -129,6 +131,51 @@ std::vector<std::shared_ptr<Request>> generate_request(
   return requests;
 }

+std::vector<std::shared_ptr<Request>> generate_priority_request(
+    const std::vector<int32_t>& prompt_lens,
+    const std::vector<int32_t>& max_tokens,
+    const std::vector<bool>& offlines,
+    const std::vector<int32_t>& priorities,
+    int32_t max_context_len) {
+  std::vector<std::shared_ptr<Request>> requests;
+  EXPECT_TRUE(prompt_lens.size() == max_tokens.size());
+  for (size_t i = 0; i < prompt_lens.size(); ++i) {
+    std::vector<int32_t> prompt_token_ids;
+    prompt_token_ids.resize(prompt_lens[i]);
+    RequestSamplingParam sampling_param;
+    StoppingChecker stopping_checker;
+    stopping_checker.set_max_generated_tokens(max_tokens[i]);
+    stopping_checker.set_max_context_len(max_context_len);
+    stopping_checker.set_ignore_eos(true);
+    RequestState req_state("x",
+                           prompt_token_ids,
+                           sampling_param,
+                           stopping_checker,
+                           prompt_lens[i] + 30000,
+                           1,
+                           1,
+                           false,
+                           false,
+                           false,
+                           false,
+                           false,
+                           nullptr,
+                           nullptr);
+    auto request =
+        std::make_shared<Request>("1",
+                                  "1",
+                                  "1",
+                                  std::move(req_state),
+                                  "1",
+                                  offlines[i],
+                                  0,
+                                  static_cast<RequestPriority>(priorities[i]));
+    requests.emplace_back(request);
+  }
+
+  return requests;
+}
+
 // do not consider speculative decoding.
 void update_requests(std::vector<std::shared_ptr<Request>> requests) {
   for (auto req : requests) {
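
The new helper's vector arguments line up element-wise, one entry per request. For example, a call like the following (values hypothetical) builds one online HIGH-priority request and one offline LOW-priority request:

// One online HIGH request (32-token prompt) and one offline LOW request
// (127-token prompt), each capped at 10 generated tokens.
auto reqs = generate_priority_request(/*prompt_lens=*/{32, 127},
                                      /*max_tokens=*/{10, 10},
                                      /*offlines=*/{false, true},
                                      /*priorities=*/{1, 3},  // 1: HIGH, 3: LOW
                                      /*max_context_len=*/30000);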
@@ -364,4 +411,207 @@ TEST(ChunkedPrefillSchedulerTest, PreemptSchedule) {
   EXPECT_TRUE(batch[0].size() == 1);
 }

+// TEST-5:
+// test on/offline preemption
+TEST(ChunkedPrefillSchedulerTest, OnDecodePreemptOffDecode) {
+  // set max free blocks: 9, support 9*32=288 tokens
+  // actually only 8 free blocks, because 1 block is reserved for padding
+  int block_num = 9;
+  int block_size = 32;
+  int max_tokens_per_chunk_for_prefill = 1024;
+  // set chunked max_tokens budget to 10000 per step
+  ContinuousScheduler::Options opt = create_scheduler_options(
+      10000, 256, 0, max_tokens_per_chunk_for_prefill, 1);
+  auto engine = std::make_unique<FakeEngine>(block_num, block_size);
+  auto scheduler = std::make_unique<ChunkedPrefillScheduler>(engine.get(), opt);
+  BlockManagerPool* block_manager_pool = engine->block_manager_pool();
+  EXPECT_TRUE(scheduler != nullptr);
+
+  std::vector<std::shared_ptr<Request>> running_requests;
+
+  // 1. schedule one offline and one online prefill request
+  auto requests = generate_priority_request(
+      {127, 127}, {10, 10}, {true, false}, {2, 2}, 30000);
+  running_requests = requests;
+  for (auto req : requests) {
+    scheduler->add_request(req);
+  }
+  auto batch = scheduler->prepare_batch_test();
+  EXPECT_TRUE(batch.size() == 1);
+  EXPECT_TRUE(batch[0].size() == 2);
+  update_requests(running_requests);
+
+  batch = scheduler->prepare_batch_test();
+
+  EXPECT_TRUE(batch.size() == 1);
+  EXPECT_TRUE(batch[0].size() == 2);
+  update_requests(running_requests);
+
+  int free_blocks_before_preempt =
+      util::max(block_manager_pool->num_free_blocks());
+  // 2. after 2 steps, preemption should happen
+  batch = scheduler->prepare_batch_test();
+  EXPECT_TRUE(batch.size() == 1);
+  EXPECT_TRUE(batch[0].size() == 1);
+  int free_blocks_after_preempt =
+      util::max(block_manager_pool->num_free_blocks());
+  EXPECT_TRUE(free_blocks_after_preempt > free_blocks_before_preempt);
+  // check that the remaining running request is the online one
+  EXPECT_TRUE(scheduler->get_running_requests().size() == 1);
+  EXPECT_TRUE(scheduler->get_running_requests()[0]->offline() == false);
+  EXPECT_TRUE(scheduler->get_waiting_requests_num() == 1);
+}
+
+// TEST-6:
+// test on/offline preemption
+TEST(ChunkedPrefillSchedulerTest, OnPrefillPreemptOffDecode) {
+  // set max free blocks: 9, support 9*32=288 tokens
+  // actually only 8 free blocks, because 1 block is reserved for padding
+  int block_num = 9;
+  int block_size = 32;
+  int max_tokens_per_chunk_for_prefill = 1024;
+  // set chunked max_tokens budget to 10000 per step
+  ContinuousScheduler::Options opt = create_scheduler_options(
+      10000, 256, 0, max_tokens_per_chunk_for_prefill, 1);
+  FLAGS_prefill_scheduling_memory_usage_threshold = 2;  // release threshold
+
+  {
+    // 1. two offline decode requests, then one online prefill request
+    // preempts them
+    auto engine = std::make_unique<FakeEngine>(block_num, block_size);
+    auto scheduler =
+        std::make_unique<ChunkedPrefillScheduler>(engine.get(), opt);
+    BlockManagerPool* block_manager_pool = engine->block_manager_pool();
+    EXPECT_TRUE(scheduler != nullptr);
+
+    std::vector<std::shared_ptr<Request>> running_requests;
+
+    auto requests = generate_priority_request(
+        {100, 100}, {10, 10}, {true, true}, {2, 2}, 30000);
+    running_requests = requests;
+    for (auto req : requests) {
+      scheduler->add_request(req);
+    }
+    auto batch = scheduler->prepare_batch_test();
+    EXPECT_TRUE(batch.size() == 1);
+    EXPECT_TRUE(batch[0].size() == 2);
+    EXPECT_TRUE(util::max(block_manager_pool->num_free_blocks()) == 0);
+    update_requests(running_requests);
+
+    batch = scheduler->prepare_batch_test();
+    EXPECT_TRUE(batch.size() == 1);
+    EXPECT_TRUE(batch[0].size() == 2);
+    EXPECT_TRUE(util::max(block_manager_pool->num_free_blocks()) == 0);
+    update_requests(running_requests);
+
+    auto new_requests = generate_priority_request(
+        {80}, {10}, {false}, {2}, 30000);  // uses 3 blocks
+    scheduler->add_request(new_requests[0]);
+    batch = scheduler->prepare_batch_test();
+    EXPECT_TRUE(batch.size() == 1);
+    EXPECT_TRUE(batch[0].size() == 2);
+
+    // the online prefill request preempts an offline decode request
+    EXPECT_TRUE(scheduler->get_running_requests().size() == 2);
+    EXPECT_TRUE(scheduler->get_running_requests()[0]->offline() == false);
+    EXPECT_TRUE(scheduler->get_waiting_requests_num() == 1);
+
+    // the offline request is evicted
+    EXPECT_TRUE(util::max(block_manager_pool->num_free_blocks()) == 1);
+  }
+
+  // 2. another case: a longer online prefill request arrives, but cannot
+  // preempt the offline request because evicting it would not free enough
+  // blocks
+  {
+    auto engine = std::make_unique<FakeEngine>(block_num, block_size);
+    auto scheduler =
+        std::make_unique<ChunkedPrefillScheduler>(engine.get(), opt);
+    BlockManagerPool* block_manager_pool = engine->block_manager_pool();
+    EXPECT_TRUE(scheduler != nullptr);
+
+    std::vector<std::shared_ptr<Request>> running_requests;
+    // 1. schedule one offline and one online request
+    auto requests = generate_priority_request(
+        {100, 100}, {10, 10}, {true, false}, {2, 2}, 30000);
+    running_requests = requests;
+    for (auto req : requests) {
+      scheduler->add_request(req);
+    }
+    auto batch = scheduler->prepare_batch_test();
+    EXPECT_TRUE(batch.size() == 1);
+    EXPECT_TRUE(batch[0].size() == 2);
+    EXPECT_TRUE(util::max(block_manager_pool->num_free_blocks()) == 0);
+    update_requests(running_requests);
+
+    auto new_requests =
+        generate_priority_request({200}, {10}, {false}, {2}, 30000);
+    scheduler->add_request(new_requests[0]);
+    batch = scheduler->prepare_batch_test();
+
+    // 2. the new online request is still waiting
+    EXPECT_TRUE(batch.size() == 1);
+    EXPECT_TRUE(batch[0].size() == 2);
+    EXPECT_TRUE(scheduler->get_waiting_requests().size() == 1);
+    EXPECT_TRUE(scheduler->get_waiting_requests()[0].get() ==
+                new_requests[0].get());
+  }
+}
+
+// TEST-7:
+// test priority scheduling
+TEST(ChunkedPrefillSchedulerTest, PrioritySchedule) {
+  // set max free blocks: 12
+  // actually only 11 free blocks, because 1 block is reserved for padding
+  int block_num = 12;
+  int block_size = 32;
+  int max_tokens_per_chunk_for_prefill = 1024;
+  // set chunked max_tokens budget to 10000 per step
+  ContinuousScheduler::Options opt = create_scheduler_options(
+      10000, 256, 0, max_tokens_per_chunk_for_prefill, 1, "priority");
+  auto engine = std::make_unique<FakeEngine>(block_num, block_size);
+  auto scheduler = std::make_unique<ChunkedPrefillScheduler>(engine.get(), opt);
+  EXPECT_TRUE(scheduler != nullptr);
+
+  std::vector<std::shared_ptr<Request>> running_requests;
+
+  // 1: HIGH, 2: NORMAL, 3: LOW
+  auto requests = generate_priority_request(
+      {127, 127, 127}, {10, 10, 10}, {false, false, false}, {3, 3, 2}, 30000);
+  for (auto req : requests) {
+    scheduler->add_request(req);
+  }
+  auto batch = scheduler->prepare_batch_test();
+  EXPECT_TRUE(batch.size() == 1);
+  EXPECT_TRUE(batch[0].size() == 2);
+  EXPECT_TRUE(scheduler->get_running_requests().size() == 2);
+  EXPECT_TRUE(scheduler->get_running_requests()[0]->priority() ==
+              RequestPriority::NORMAL);
+  EXPECT_TRUE(scheduler->get_running_requests()[1]->priority() ==
+              RequestPriority::LOW);
+
+  running_requests = scheduler->get_running_requests();
+  update_requests(running_requests);
+
+  // a new HIGH priority request arrives; its prefill starts
+  auto new_requests = generate_priority_request(
+      {32}, {10}, {false}, {1}, 30000);  // uses 1 block
+  scheduler->add_request(new_requests[0]);
+  batch = scheduler->prepare_batch_test();
+  // check there are 3 running requests owing to decode-maximal batching
+  EXPECT_TRUE(batch.size() == 1);
+  EXPECT_TRUE(batch[0].size() == 3);
+  EXPECT_TRUE(scheduler->get_running_requests().size() == 3);
+  running_requests.push_back(new_requests[0]);
+  update_requests(running_requests);
+  // preemption happens; only the HIGH and NORMAL decode requests remain
+  batch = scheduler->prepare_batch_test();
+  EXPECT_TRUE(batch.size() == 1);
+  EXPECT_TRUE(batch[0].size() == 2);
+  EXPECT_TRUE(scheduler->get_running_requests().size() == 2);
+  EXPECT_TRUE(scheduler->get_running_requests()[0]->priority() ==
+              RequestPriority::HIGH);
+  EXPECT_TRUE(scheduler->get_running_requests()[1]->priority() ==
+              RequestPriority::NORMAL);
+}
+
 } // namespace xllm
