
Commit 4006aef

undo unrelated changes
Signed-off-by: Lucas Wilkinson <[email protected]>
1 parent 759e737 commit 4006aef

File tree

7 files changed: +29 -207 lines changed


docs/design/fused_moe_modular_kernel.md

Lines changed: 0 additions & 1 deletion
@@ -57,7 +57,6 @@ The `FusedMoEModularKernel` acts as a bridge between the `FusedMoEPermuteExperts
 The `FusedMoEPrepareAndFinalize` abstract class exposes `prepare`, `prepare_no_receive` and `finalize` functions.
 The `prepare` function is responsible for input activation Quantization and All2All Dispatch. If implemented, The `prepare_no_receive` is like `prepare` except it does not wait to receive results from other workers. Instead it returns a "receiver" callback that must be invoked to wait for the final results of worker. It is not required that this method is supported by all `FusedMoEPrepareAndFinalize` classes, but if it is available, it can be used to interleave work with the initial all to all communication, e.g. interleaving shared experts with fused experts. The `finalize` function is responsible for invoking the All2All Combine. Additionally the `finalize` function may or may not do the TopK weight application and reduction (Please refer to the TopKWeightAndReduce section)
 
-
 ![](../assets/design/fused_moe_modular_kernel/prepare_and_finalize_blocks.png "FusedMoEPrepareAndFinalize Blocks")
 
 ### FusedMoEPermuteExpertsUnpermute
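
For readers skimming this hunk, the doc text above describes the `prepare` / `prepare_no_receive` / `finalize` contract. A minimal, illustrative sketch of that interface follows; the method names come from the doc, but the class name and argument lists are simplified assumptions, not the actual vLLM signatures.

# Illustrative sketch only -- argument lists are assumptions, not vLLM's API.
from abc import ABC, abstractmethod
from typing import Callable

import torch


class FusedMoEPrepareAndFinalizeSketch(ABC):

    @abstractmethod
    def prepare(self, hidden_states: torch.Tensor,
                topk_ids: torch.Tensor) -> torch.Tensor:
        """Quantize the input activations and run the All2All dispatch,
        blocking until the dispatched tokens have been received."""

    def prepare_no_receive(
            self, hidden_states: torch.Tensor,
            topk_ids: torch.Tensor) -> Callable[[], torch.Tensor]:
        """Optional: start the dispatch and return a "receiver" callback.
        Invoking the callback waits for the dispatched tokens, so the
        caller can overlap other work (e.g. shared experts) with the
        initial all-to-all communication."""
        raise NotImplementedError

    @abstractmethod
    def finalize(self, expert_output: torch.Tensor,
                 topk_weights: torch.Tensor) -> torch.Tensor:
        """Run the All2All combine; may also apply the TopK weights and
        reduce, depending on the TopKWeightAndReduce strategy used."""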

examples/offline_inference/data_parallel.py

Lines changed: 0 additions & 6 deletions
@@ -92,12 +92,6 @@ def parse_args():
         action="store_true",
         help=("Enable microbatched execution")
     )
-    parser.add_argument(
-        "--compilation-config",
-        type=int,
-        default=0,
-        help=("Compilation optimization (O) level 0-3."),
-    )
     parser.add_argument(
         "--compilation-config",
         type=int,

vllm/v1/executor/multiproc_executor.py

Lines changed: 3 additions & 25 deletions
@@ -3,7 +3,6 @@
 import multiprocessing
 import os
 import pickle
-import queue
 import signal
 import threading
 import time
@@ -19,7 +18,6 @@
 from typing import Any, Callable, Optional, Union, cast
 
 import cloudpickle
-import torch
 
 import vllm.envs as envs
 from vllm.config import VllmConfig
@@ -35,8 +33,7 @@
                         get_loopback_ip, get_mp_context, get_open_port,
                         set_process_title)
 from vllm.v1.executor.abstract import Executor, FailureCallback
-from vllm.v1.outputs import (AsyncModelRunnerOutput, DraftTokenIds,
-                             ModelRunnerOutput)
+from vllm.v1.outputs import DraftTokenIds, ModelRunnerOutput
 from vllm.worker.worker_base import WorkerWrapperBase
 
 logger = init_logger(__name__)
@@ -415,14 +412,6 @@ def __init__(
         # Initializes a message queue for sending the model output
         self.worker_response_mq = MessageQueue(1, 1)
 
-        self.async_output_queue: queue.Queue = queue.Queue()
-        self.async_output_copy_stream = torch.cuda.Stream()
-        self.async_output_copy_thread = Thread(
-            target=self.async_output_busy_loop,
-            daemon=True,
-            name="WorkerAsyncOutputCopy")
-        self.async_output_copy_thread.start()
-
         # Initialize device and loads weights
         self.worker.init_device()
         self.worker.load_model()
@@ -604,18 +593,6 @@ class ResponseStatus(Enum):
         SUCCESS = auto()
         FAILURE = auto()
 
-    def enqueue_worker_output(self, output: Any) -> None:
-        if isinstance(output, AsyncModelRunnerOutput):
-            output = output.serialize(self.async_output_copy_stream)
-        self.worker_response_mq.enqueue(
-            (WorkerProc.ResponseStatus.SUCCESS, output))
-
-    def async_output_busy_loop(self):
-        """Entrypoint for the thread which handles outputs asynchronously."""
-        while True:
-            output = self.async_output_queue.get()
-            self.enqueue_worker_output(output)
-
     def worker_busy_loop(self):
         """Main busy loop for Multiprocessing Workers"""
         while True:
@@ -640,4 +617,5 @@ def worker_busy_loop(self):
                 continue
 
             if output_rank is None or self.rank == output_rank:
-                self.async_output_queue.put(output)
+                self.worker_response_mq.enqueue(
+                    (WorkerProc.ResponseStatus.SUCCESS, output))

vllm/v1/outputs.py

Lines changed: 0 additions & 28 deletions
@@ -114,34 +114,6 @@ class ModelRunnerOutput:
     num_nans_in_logits: Optional[dict[str, int]] = None
 
 
-# ModelRunnerOutput wrapper for async scheduling.
-# Contains GPU tensors which must be serialized before sending
-# to the scheduler process.
-@dataclass
-class AsyncModelRunnerOutput:
-    model_runner_output: ModelRunnerOutput
-
-    # [num_reqs, max_num_generated_tokens]
-    sampled_token_ids: torch.Tensor
-
-    invalid_req_indices: list[int]
-
-    def serialize(self, copy_stream: torch.cuda.Stream) -> ModelRunnerOutput:
-        default_stream = torch.cuda.current_stream()
-        with torch.cuda.stream(copy_stream):
-            copy_stream.wait_stream(default_stream)
-            sampled_token_ids_cpu = self.sampled_token_ids.to(
-                'cpu', non_blocking=True)
-        copy_stream.synchronize()
-        valid_sampled_token_ids = sampled_token_ids_cpu.tolist()
-        for i in self.invalid_req_indices:
-            valid_sampled_token_ids[i].clear()
-
-        output = self.model_runner_output
-        output.sampled_token_ids = valid_sampled_token_ids
-        return output
-
-
 @dataclass
 class DraftTokenIds:
 
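
The `serialize` method removed above uses a common pattern: copy GPU tensors to the CPU on a dedicated side stream so the default compute stream is not blocked. A self-contained sketch of that pattern, with made-up tensor names and shapes, for reference:

# Sketch of the side-stream GPU->CPU copy used by the removed serialize();
# names and shapes here are illustrative only.
import torch


def copy_to_cpu_on_side_stream(sampled_token_ids: torch.Tensor) -> list:
    copy_stream = torch.cuda.Stream()
    default_stream = torch.cuda.current_stream()
    with torch.cuda.stream(copy_stream):
        # Make the copy wait for the kernels that produced the tensor.
        copy_stream.wait_stream(default_stream)
        sampled_token_ids_cpu = sampled_token_ids.to("cpu", non_blocking=True)
    # Block only on the copy stream, not on the default compute stream.
    copy_stream.synchronize()
    return sampled_token_ids_cpu.tolist()


if __name__ == "__main__" and torch.cuda.is_available():
    ids = torch.randint(0, 32000, (4, 1), device="cuda")
    print(copy_to_cpu_on_side_stream(ids))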

vllm/v1/worker/gpu_input_batch.py

Lines changed: 0 additions & 5 deletions
@@ -250,11 +250,6 @@ def __init__(
 
         self.pooling_params: dict[str, PoolingParams] = {}
 
-        # Cached reference to the GPU tensor of previously sampled tokens
-        self.prev_sampled_token_ids: Optional[torch.Tensor] = None
-        self.prev_sampled_token_ids_invalid_indices: Optional[set[int]] = None
-        self.prev_req_id_to_index: Optional[dict[str, int]] = None
-
     @property
     def req_ids(self) -> list[str]:
         # None elements should only be present transiently

vllm/v1/worker/gpu_model_runner.py

Lines changed: 21 additions & 137 deletions
@@ -69,8 +69,8 @@
                                         FullAttentionSpec, KVCacheConfig,
                                         KVCacheGroupSpec, KVCacheSpec,
                                         MambaSpec, SlidingWindowSpec)
-from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, AsyncModelRunnerOutput,
-                             DraftTokenIds, LogprobsTensors, ModelRunnerOutput)
+from vllm.v1.outputs import (EMPTY_MODEL_RUNNER_OUTPUT, DraftTokenIds,
+                             LogprobsTensors, ModelRunnerOutput)
 from vllm.v1.pool.metadata import PoolingMetadata
 from vllm.v1.sample.logits_processor import LogitsProcessors, build_logitsprocs
 from vllm.v1.sample.metadata import SamplingMetadata
@@ -238,8 +238,6 @@ def __init__(
             is_pooling_model=self.is_pooling_model,
         )
 
-        self.use_async_scheduling = self.scheduler_config.async_scheduling
-
         # TODO(woosuk): Provide an option to tune the max cudagraph batch size.
         # The convention is different.
         # self.cudagraph_batch_sizes sorts in ascending order.
@@ -711,73 +709,6 @@ def _get_cumsum_and_arange(
 
         return cu_num_tokens, arange
 
-    def _prepare_input_ids(self, total_num_scheduled_tokens: int,
-                           cu_num_tokens: np.ndarray) -> None:
-        """Prepare the input IDs for the current batch.
-
-        Carefully handles the `prev_sampled_token_ids` which can be cached
-        from the previous engine iteration, in which case those tokens on the
-        GPU need to be copied into the corresponding slots into input_ids."""
-
-        if self.input_batch.prev_sampled_token_ids is not None:
-            # Async scheduling case, we need to copy the sampled token ids
-            # from the previous iteration.
-            prev_req_id_to_index = self.input_batch.prev_req_id_to_index
-            current_req_id_to_index = self.input_batch.req_id_to_index
-            assert prev_req_id_to_index is not None
-            common_req_ids = set(prev_req_id_to_index.keys()).intersection(
-                set(current_req_id_to_index.keys()))
-            if common_req_ids:
-                current_common_req_indices = [
-                    current_req_id_to_index[req_id]
-                    for req_id in common_req_ids
-                ]
-                prev_common_req_indices = [
-                    prev_req_id_to_index[req_id] for req_id in common_req_ids
-                ]
-                # We need to compute the flattened input_ids index of the
-                # last token in each common request.
-                flattened_indices = [
-                    int(cu_num_tokens[idx]) - 1
-                    for idx in current_common_req_indices
-                ]
-                if len(flattened_indices) < total_num_scheduled_tokens:
-                    # If not all requests are decodes from the last iteration,
-                    # We need to copy the input_ids_cpu to the GPU first.
-                    self.input_ids.copy_to_gpu(total_num_scheduled_tokens)
-                if flattened_indices == prev_common_req_indices and \
-                    set(flattened_indices) == \
-                        set(range(len(flattened_indices))):
-                    # Common-case optimization: the batch is unchanged
-                    # and no reordering happened.
-                    # The indices are both the same permutation of 0..N-1
-                    self.input_ids.gpu[:len(flattened_indices)].copy_(
-                        self.input_batch.prev_sampled_token_ids[:len(
-                            flattened_indices)].squeeze(1),
-                        non_blocking=True)
-                else:
-                    # Upload the index tensors asynchronously
-                    # so the scatter can be non-blocking
-                    input_ids_index_tensor = torch.tensor(
-                        flattened_indices,
-                        dtype=torch.int64,
-                        pin_memory=self.pin_memory).to(self.device,
-                                                       non_blocking=True)
-                    prev_common_req_indices_tensor = torch.tensor(
-                        prev_common_req_indices,
-                        dtype=torch.int64,
-                        pin_memory=self.pin_memory).to(self.device,
-                                                       non_blocking=True)
-                    self.input_ids.gpu.scatter_(
-                        dim=0,
-                        index=input_ids_index_tensor,
-                        src=self.input_batch.prev_sampled_token_ids[
-                            prev_common_req_indices_tensor].squeeze(1))
-            else:
-                self.input_ids.copy_to_gpu(total_num_scheduled_tokens)
-        else:
-            self.input_ids.copy_to_gpu(total_num_scheduled_tokens)
-
     def _prepare_inputs(
         self, scheduler_output: "SchedulerOutput"
     ) -> tuple[PerLayerAttnMetadata, torch.Tensor,
@@ -869,8 +800,7 @@ def _prepare_inputs(
             max_seq_len = self.seq_lens.np[:num_reqs].max().item()
 
         # Copy the tensors to the GPU.
-        self._prepare_input_ids(total_num_scheduled_tokens, cu_num_tokens)
-
+        self.input_ids.copy_to_gpu(total_num_scheduled_tokens)
         if self.uses_mrope:
             # Only relevant for models using M-RoPE (e.g, Qwen2-VL)
             self.mrope_positions.gpu[:, :total_num_scheduled_tokens].copy_(
@@ -986,10 +916,6 @@ def _prepare_inputs(
                 builder,
             )
 
-<<<<<<< HEAD
-
-=======
->>>>>>> nm/sage/dbo-full-cudagraphs
         if ubatch_slices is not None:
             common_attn_metadata_list = split_attn_metadata(
                 ubatch_slices, common_attn_metadata)
@@ -1637,7 +1563,6 @@ def get_dp_padding_ubatch(
             should_ubatch = False
 
         # Note that we compute the number of padded tokens per ubatch
-
        (should_ubatch,
         num_tokens_across_dp) = self.should_ubatch_with_num_tokens(
             should_ubatch, num_tokens_unpadded // 2, num_tokens_per_ubatch)
@@ -1724,7 +1649,7 @@ def execute_model(
         self,
         scheduler_output: "SchedulerOutput",
         intermediate_tensors: Optional[IntermediateTensors] = None,
-    ) -> Union[ModelRunnerOutput, AsyncModelRunnerOutput, IntermediateTensors]:
+    ) -> Union[ModelRunnerOutput, IntermediateTensors]:
         self._update_states(scheduler_output)
         if not scheduler_output.total_num_scheduled_tokens:
             if not has_kv_transfer_group():
@@ -1927,12 +1852,6 @@ def execute_model(
             # so that we could clear the sampled tokens before returning.
             discard_sampled_tokens_req_indices.append(i)
 
-        # Copy some objects so they don't get modified after returning.
-        # This is important when using async scheduling.
-        req_ids_output_copy = self.input_batch.req_ids.copy()
-        req_id_to_index_output_copy = \
-            self.input_batch.req_id_to_index.copy()
-
         # NOTE: GPU -> CPU Sync happens here.
         # Move as many CPU operations as possible before this sync point.
         logprobs_tensors = sampler_output.logprobs_tensors
@@ -1945,54 +1864,29 @@ def execute_model(
             scheduler_output.num_scheduled_tokens,
         )
 
-        num_sampled_tokens = sampler_output.sampled_token_ids.shape[0]
+        # Get the valid generated tokens.
         sampled_token_ids = sampler_output.sampled_token_ids
-        if not self.use_async_scheduling:
-            # Get the valid generated tokens.
-            max_gen_len = sampled_token_ids.shape[-1]
-            if max_gen_len == 1:
-                # No spec decode tokens.
-                valid_sampled_token_ids = self._to_list(sampled_token_ids)
-            else:
-                # Includes spec decode tokens.
-                valid_sampled_token_ids = self.rejection_sampler.parse_output(
-                    sampled_token_ids,
-                    self.input_batch.vocab_size,
-                )
-            # Mask out the sampled tokens that should not be sampled.
-            for i in discard_sampled_tokens_req_indices:
-                valid_sampled_token_ids[i].clear()
+        max_gen_len = sampled_token_ids.shape[-1]
+        if max_gen_len == 1:
+            # No spec decode tokens.
+            valid_sampled_token_ids = self._to_list(sampled_token_ids)
         else:
-            valid_sampled_token_ids = []
-            invalid_req_indices = list(discard_sampled_tokens_req_indices)
-            invalid_req_indices_set = set(invalid_req_indices)
-            assert sampled_token_ids.shape[-1] == 1
-
-            # Cache the sampled tokens on the GPU and avoid CPU sync.
-            # These will be copied into input_ids in the next step
-            # when preparing inputs.
-            self.input_batch.prev_sampled_token_ids = \
-                sampled_token_ids
-            self.input_batch.prev_sampled_token_ids_invalid_indices = \
-                invalid_req_indices_set
-            self.input_batch.prev_req_id_to_index = {
-                req_id: i
-                for i, req_id in enumerate(self.input_batch.req_ids)
-                if i not in invalid_req_indices_set
-            }
+            # Includes spec decode tokens.
+            valid_sampled_token_ids = self.rejection_sampler.parse_output(
+                sampled_token_ids,
+                self.input_batch.vocab_size,
+            )
+        # Mask out the sampled tokens that should not be sampled.
+        for i in discard_sampled_tokens_req_indices:
+            valid_sampled_token_ids[i].clear()
 
         # Cache the sampled tokens in the model runner, so that the scheduler
         # doesn't need to send them back.
         # NOTE(woosuk): As an exception, when using PP, the scheduler sends
         # the sampled tokens back, because there's no direct communication
         # between the first-stage worker and the last-stage worker.
         req_ids = self.input_batch.req_ids
-        for req_idx in range(num_sampled_tokens):
-            if self.use_async_scheduling:
-                sampled_ids = [-1] * 1 if \
-                    req_idx not in invalid_req_indices_set else None
-            else:
-                sampled_ids = valid_sampled_token_ids[req_idx]
+        for req_idx, sampled_ids in enumerate(valid_sampled_token_ids):
             if not sampled_ids:
                 continue
 
@@ -2007,7 +1901,6 @@ def execute_model(
                     start_idx:end_idx] = sampled_ids
             self.input_batch.num_tokens_no_spec[req_idx] = end_idx
             self.input_batch.num_tokens[req_idx] = end_idx
-
             req_id = req_ids[req_idx]
             req_state = self.requests[req_id]
             req_state.output_token_ids.extend(sampled_ids)
@@ -2029,9 +1922,9 @@ def execute_model(
 
         self.eplb_step()
 
-        output = ModelRunnerOutput(
-            req_ids=req_ids_output_copy,
-            req_id_to_index=req_id_to_index_output_copy,
+        return ModelRunnerOutput(
+            req_ids=self.input_batch.req_ids,
+            req_id_to_index=self.input_batch.req_id_to_index,
             sampled_token_ids=valid_sampled_token_ids,
             logprobs=logprobs_lists,
             prompt_logprobs_dict=prompt_logprobs_dict,
@@ -2040,15 +1933,6 @@ def execute_model(
             num_nans_in_logits=num_nans_in_logits,
         )
 
-        if self.use_async_scheduling:
-            return AsyncModelRunnerOutput(
-                model_runner_output=output,
-                sampled_token_ids=sampled_token_ids,
-                invalid_req_indices=invalid_req_indices,
-            )
-
-        return output
-
     def take_draft_token_ids(self) -> Optional[DraftTokenIds]:
         if self._draft_token_ids is None:
             return None
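
The `_prepare_input_ids` method removed earlier in this file scatters previously sampled tokens (still resident on the GPU) into the flattened `input_ids` buffer without a CPU sync. A tiny standalone sketch of that `scatter_` pattern, with illustrative names and values, for reference:

# Illustrative sketch of the non-blocking scatter in the removed
# _prepare_input_ids; all names and values are made up for the example.
import torch

device = "cuda" if torch.cuda.is_available() else "cpu"
input_ids_gpu = torch.zeros(8, dtype=torch.int64, device=device)
prev_sampled_token_ids = torch.tensor([[11], [22], [33]], device=device)

flattened_indices = [2, 5, 7]        # last-token slot of each common request
prev_common_req_indices = [0, 1, 2]  # matching rows in prev_sampled_token_ids

index = torch.tensor(flattened_indices, dtype=torch.int64, device=device)
rows = torch.tensor(prev_common_req_indices, dtype=torch.int64, device=device)
input_ids_gpu.scatter_(dim=0, index=index,
                       src=prev_sampled_token_ids[rows].squeeze(1))
print(input_ids_gpu)  # positions 2, 5 and 7 now hold 11, 22 and 33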
