@@ -7,7 +7,7 @@
 from typing import (Callable, Deque, Dict, Iterable, List, Optional, Set,
                     Tuple, Union)
 
-from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig
+from vllm.config import CacheConfig, LoRAConfig, SchedulerConfig, DeviceConfig
 from vllm.core.interfaces import AllocStatus, BlockSpaceManager
 from vllm.logger import init_logger, print_logger
 from vllm.lora.request import LoRARequest
@@ -301,6 +301,7 @@ def __init__(
         scheduler_config: SchedulerConfig,
         cache_config: CacheConfig,
         lora_config: Optional[LoRAConfig],
+        device_config: Optional[DeviceConfig],
         pipeline_parallel_size: int = 1,
         output_proc_callback: Optional[Callable] = None,
     ) -> None:
@@ -310,6 +311,10 @@ def __init__(
         # simple and NOT fair. It can lead to starvation of some
         # LoRAs. This should be improved in the future.
         self.lora_config = lora_config
+        # NOTE(hyunjun): The LPU vLLM backend currently needs to reduce the
+        # scheduler's dependency on _can_append_slots and _append_slots.
+        # Temporarily, the resource-management flow is switched on the device config.
+        self.device_config = device_config
 
         version = "v1"
         if self.scheduler_config.use_v2_block_manager:
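For context, the hunks above thread a DeviceConfig into the Scheduler so that _schedule_running (next hunk) can branch on device_config.device_type. A minimal sketch of that gate, assuming this PR's DeviceConfig accepts "fpga" and exposes it via .device_type the way stock vLLM does for "cuda"/"cpu" (the "fpga" value is this PR's addition, not upstream vLLM):

```python
from vllm.config import DeviceConfig

# Assumption: with this PR applied, DeviceConfig(device="fpga") populates
# .device_type = "fpga", mirroring the stock "cuda"/"cpu" behavior.
device_config = DeviceConfig(device="fpga")

if device_config is not None and device_config.device_type == "fpga":
    # LPU path: the scheduler skips _can_append_slots()/_append_slots(),
    # i.e. no host-side KV-cache block accounting.
    print("using the LPU scheduling flow")
else:
    # Default path: the usual block-manager checks apply.
    print("using the standard scheduling flow")
```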
@@ -576,63 +581,119 @@ def _schedule_running(
                 assert self.output_proc_callback is not None
                 self.output_proc_callback()
                 self.running = tmp
-
-            while not True: #TODO #self._can_append_slots(seq_group):
-                budget.subtract_num_batched_tokens(seq_group.request_id,
-                                                   num_running_tokens)
-                num_running_seqs = seq_group.get_max_num_running_seqs()
-                budget.subtract_num_seqs(seq_group.request_id,
-                                         num_running_seqs)
-
-                if (curr_loras is not None and seq_group.lora_int_id > 0
-                        and seq_group.lora_int_id in curr_loras):
-                    curr_loras.remove(seq_group.lora_int_id)
-
-                if running_queue:
-                    # Preempt the lowest-priority sequence groups.
-                    victim_seq_group = running_queue.pop()
-                    preempted_mode = self._preempt(victim_seq_group,
-                                                   blocks_to_swap_out)
-                    if preempted_mode == PreemptionMode.RECOMPUTE:
-                        preempted.append(victim_seq_group)
-                    else:
-                        swapped_out.append(victim_seq_group)
-                else:
-                    # No other sequence groups can be preempted.
-                    # Preempt the current sequence group.
-                    preempted_mode = self._preempt(seq_group,
-                                                   blocks_to_swap_out)
-                    if preempted_mode == PreemptionMode.RECOMPUTE:
-                        preempted.append(seq_group)
-                    else:
-                        swapped_out.append(seq_group)
-                    break
+            if (self.device_config is not None
+                    and self.device_config.device_type == "fpga"):
+                # LPU path: skip self._can_append_slots(seq_group) entirely.
+                # The loop body below never executes, so control always
+                # falls through to the while-else clause.
+                while False:
+                    budget.subtract_num_batched_tokens(seq_group.request_id,
+                                                       num_running_tokens)
+                    num_running_seqs = seq_group.get_max_num_running_seqs()
+                    budget.subtract_num_seqs(seq_group.request_id,
+                                             num_running_seqs)
+
+                    if (curr_loras is not None and seq_group.lora_int_id > 0
+                            and seq_group.lora_int_id in curr_loras):
+                        curr_loras.remove(seq_group.lora_int_id)
+
+                    if running_queue:
+                        # Preempt the lowest-priority sequence groups.
+                        victim_seq_group = running_queue.pop()
+                        preempted_mode = self._preempt(victim_seq_group,
+                                                       blocks_to_swap_out)
+                        if preempted_mode == PreemptionMode.RECOMPUTE:
+                            preempted.append(victim_seq_group)
+                        else:
+                            swapped_out.append(victim_seq_group)
+                    else:
+                        # No other sequence groups can be preempted.
+                        # Preempt the current sequence group.
+                        preempted_mode = self._preempt(seq_group,
+                                                       blocks_to_swap_out)
+                        if preempted_mode == PreemptionMode.RECOMPUTE:
+                            preempted.append(seq_group)
+                        else:
+                            swapped_out.append(seq_group)
+                        break
+                else:
+                    is_prefill = seq_group.is_prefill()
+                    scheduled_seq_group: ScheduledSequenceGroup = \
+                        self._scheduled_seq_group_cache[self.cache_id].get_object()
+                    scheduled_seq_group.seq_group = seq_group
+                    if is_prefill:
+                        scheduled_seq_group.token_chunk_size = num_running_tokens
+                        prefill_seq_groups.append(scheduled_seq_group)
+                        ret.prefill_seq_groups_list.append(seq_group)
+                    else:
+                        scheduled_seq_group.token_chunk_size = 1
+                        decode_seq_groups.append(scheduled_seq_group)
+                        ret.decode_seq_groups_list.append(seq_group)
+
+                    budget.add_num_batched_tokens(seq_group.request_id,
+                                                  num_running_tokens)
+                    # OPTIMIZATION: Note that get_max_num_running_seqs is
+                    # expensive. For the default scheduling case where
+                    # enable_chunking is False, num_seqs are updated before
+                    # running this method, so we don't have to update it
+                    # again here.
+                    if enable_chunking:
+                        num_running_seqs = seq_group.get_max_num_running_seqs()
+                        budget.add_num_seqs(seq_group.request_id,
+                                            num_running_seqs)
+                    if curr_loras is not None and seq_group.lora_int_id > 0:
+                        curr_loras.add(seq_group.lora_int_id)
             else:
-                #self._append_slots(seq_group, blocks_to_copy)
-                is_prefill = seq_group.is_prefill()
-                scheduled_seq_group: ScheduledSequenceGroup = \
-                    self._scheduled_seq_group_cache[self.cache_id].get_object()
-                scheduled_seq_group.seq_group = seq_group
-                if is_prefill:
-                    scheduled_seq_group.token_chunk_size = num_running_tokens
-                    prefill_seq_groups.append(scheduled_seq_group)
-                    ret.prefill_seq_groups_list.append(seq_group)
-                else:
-                    scheduled_seq_group.token_chunk_size = 1
-                    decode_seq_groups.append(scheduled_seq_group)
-                    ret.decode_seq_groups_list.append(seq_group)
-
-                budget.add_num_batched_tokens(seq_group.request_id,
-                                              num_running_tokens)
-                # OPTIMIZATION: Note that get_max_num_running_seqs is
-                # expensive. For the default scheduling chase where
-                # enable_chunking is False, num_seqs are updated before running
-                # this method, so we don't have to update it again here.
-                if enable_chunking:
-                    num_running_seqs = seq_group.get_max_num_running_seqs()
-                    budget.add_num_seqs(seq_group.request_id, num_running_seqs)
-                if curr_loras is not None and seq_group.lora_int_id > 0:
-                    curr_loras.add(seq_group.lora_int_id)
+                while not self._can_append_slots(seq_group):
+                    budget.subtract_num_batched_tokens(seq_group.request_id,
+                                                       num_running_tokens)
+                    num_running_seqs = seq_group.get_max_num_running_seqs()
+                    budget.subtract_num_seqs(seq_group.request_id,
+                                             num_running_seqs)
+
+                    if (curr_loras is not None and seq_group.lora_int_id > 0
+                            and seq_group.lora_int_id in curr_loras):
+                        curr_loras.remove(seq_group.lora_int_id)
+
+                    if running_queue:
+                        # Preempt the lowest-priority sequence groups.
+                        victim_seq_group = running_queue.pop()
+                        preempted_mode = self._preempt(victim_seq_group,
+                                                       blocks_to_swap_out)
+                        if preempted_mode == PreemptionMode.RECOMPUTE:
+                            preempted.append(victim_seq_group)
+                        else:
+                            swapped_out.append(victim_seq_group)
+                    else:
+                        # No other sequence groups can be preempted.
+                        # Preempt the current sequence group.
+                        preempted_mode = self._preempt(seq_group,
+                                                       blocks_to_swap_out)
+                        if preempted_mode == PreemptionMode.RECOMPUTE:
+                            preempted.append(seq_group)
+                        else:
+                            swapped_out.append(seq_group)
+                        break
+                else:
+                    self._append_slots(seq_group, blocks_to_copy)
+                    is_prefill = seq_group.is_prefill()
+                    scheduled_seq_group: ScheduledSequenceGroup = \
+                        self._scheduled_seq_group_cache[self.cache_id].get_object()
+                    scheduled_seq_group.seq_group = seq_group
+                    if is_prefill:
+                        scheduled_seq_group.token_chunk_size = num_running_tokens
+                        prefill_seq_groups.append(scheduled_seq_group)
+                        ret.prefill_seq_groups_list.append(seq_group)
+                    else:
+                        scheduled_seq_group.token_chunk_size = 1
+                        decode_seq_groups.append(scheduled_seq_group)
+                        ret.decode_seq_groups_list.append(seq_group)
+
+                    budget.add_num_batched_tokens(seq_group.request_id,
+                                                  num_running_tokens)
+                    # OPTIMIZATION: Note that get_max_num_running_seqs is
+                    # expensive. For the default scheduling case where
+                    # enable_chunking is False, num_seqs are updated before
+                    # running this method, so we don't have to update it
+                    # again here.
+                    if enable_chunking:
+                        num_running_seqs = seq_group.get_max_num_running_seqs()
+                        budget.add_num_seqs(seq_group.request_id,
+                                            num_running_seqs)
+                    if curr_loras is not None and seq_group.lora_int_id > 0:
+                        curr_loras.add(seq_group.lora_int_id)
 
         self._scheduler_running_outputs_cache[self.next_cache_id].reset()
         self._scheduled_seq_group_cache[self.next_cache_id].reset()
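Both branches above rely on Python's while-else: the else suite runs only when the loop exits without hitting break. On the FPGA path the loop condition is false from the start, so the scheduling body in the else always runs; on the default path it runs once _can_append_slots() succeeds. A minimal, self-contained sketch of this control flow with toy names (not vLLM code):

```python
from collections import deque

def try_schedule(needed_blocks: int, free_blocks: int,
                 running_queue: deque) -> str:
    """Toy model of the while/else flow in _schedule_running."""
    outcome = "preempted self"
    while free_blocks < needed_blocks:   # ~ not self._can_append_slots(...)
        if running_queue:
            # Preempt the lowest-priority group, reclaim its blocks,
            # and re-check the loop condition.
            free_blocks += running_queue.pop()
        else:
            # Nothing left to preempt: give up on this group.
            break                        # skips the else suite below
    else:
        # Runs ONLY when the loop exits without break, i.e. the group fits.
        # `while False:` exploits this: its body never runs, so the else
        # suite executes unconditionally.
        outcome = "scheduled"
    return outcome

print(try_schedule(4, 1, deque([2, 3])))  # scheduled (after one preemption)
print(try_schedule(4, 1, deque()))        # preempted self
```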