Commit 6aa9a4c

Refactor turbomind engine (#4223)
* refactor turbomind engine
* simplify interface
* minor
* metrics
* refactor & logprobs
* fix output logits
* fix logprobs
* rename
* mrope
* fix cuda-12.4 build
* fix cuda-12.4 build
* fix cuda-12.4 build
* fix MSVC build
* fix MSVC build
* fix guided decoding
* fix warm-up for TP
* fix VLMs
* refactor DP
* remove redundant `rank` parameter
* add `no queue` error & fix lint
* fix vocab size
* fix attn output for finished seqs
* fix lint
* fix lint
* add async flag
* fix prefix caching
* minor fix
* fix lint
* fix lint
* fix typo
* fix log level
1 parent 4c0ca84 commit 6aa9a4c

File tree: 122 files changed (+6543, −6317 lines)

CMakeLists.txt

Lines changed: 4 additions & 4 deletions
@@ -71,11 +71,9 @@ FetchContent_MakeAvailable(repo-cutlass)
 FetchContent_Declare(
   yaml-cpp
   GIT_REPOSITORY https://github.com/jbeder/yaml-cpp.git
-  GIT_TAG 0.8.0
+  GIT_TAG 65c1c270dbe7eec37b2df2531d7497c4eea79aee
   GIT_PROGRESS TRUE
   USES_TERMINAL_DOWNLOAD TRUE
-  PATCH_COMMAND git apply ${CMAKE_CURRENT_SOURCE_DIR}/cmake/yaml-cpp_cmake_policy.patch
-  UPDATE_DISCONNECTED 1
 )
 set(YAML_BUILD_SHARED_LIBS OFF CACHE BOOL "Build static library of yaml-cpp")
 FetchContent_MakeAvailable(yaml-cpp)
@@ -87,7 +85,6 @@ FetchContent_Declare(
   GIT_SUBMODULES "3rdparty/dlpack"
   GIT_PROGRESS TRUE
   USES_TERMINAL_DOWNLOAD TRUE
-  UPDATE_DISCONNECTED 1
 )

 FetchContent_GetProperties(xgrammar)
@@ -110,6 +107,7 @@ endif()

 # the environment variable
 # ASAN_OPTIONS=protect_shadow_gap=0,intercept_tls_get_addr=0
+# LD_PRELOAD=/usr/lib/x86_64-linux-gnu/libasan.so.6:/usr/lib/x86_64-linux-gnu/libstdc++.so.6
 # must be set at runtime
 # https://github.com/google/sanitizers/issues/1322
 if (LMDEPLOY_ASAN_ENABLE)
@@ -333,6 +331,8 @@ if (MSVC)
     CMAKE_CUDA_FLAGS_RELWITHDEBINFO)
     string(REGEX REPLACE "-Wall" " /W0 " ${flag_var} "${${flag_var}}")
   endforeach()
+  # avoid min/max macro in "windows.h" conflict with std::min/std::max
+  add_definitions(-DNOMINMAX=1)
 endif()

 include_directories(

benchmark/profile_throughput.py

Lines changed: 2 additions & 0 deletions
@@ -327,6 +327,7 @@ def parse_args():
     ArgumentHelper.model_format(tb_group, default='hf')
     ArgumentHelper.num_tokens_per_iter(tb_group)
     ArgumentHelper.max_prefill_iters(tb_group)
+    ArgumentHelper.async_(tb_group)
     ArgumentHelper.communicator(tb_group)

     args = parser.parse_args()
@@ -348,6 +349,7 @@ def main():
         quant_policy=args.quant_policy,
         num_tokens_per_iter=args.num_tokens_per_iter,
         max_prefill_iters=args.max_prefill_iters,
+        async_=args.async_,
         enable_prefix_caching=args.enable_prefix_caching,
         dtype=args.dtype,
         communicator=args.communicator,

lmdeploy/cli/cli.py

Lines changed: 1 addition & 0 deletions
@@ -69,6 +69,7 @@ def add_parser_chat():
     ArgumentHelper.rope_scaling_factor(tb_group)
     ArgumentHelper.communicator(tb_group)
     ArgumentHelper.cp(tb_group)
+    ArgumentHelper.async_(tb_group)

     # speculative decoding
     ArgumentHelper.add_spec_group(parser)

lmdeploy/cli/serve.py

Lines changed: 2 additions & 0 deletions
@@ -146,6 +146,7 @@ def add_parser_api_server():
     ArgumentHelper.rope_scaling_factor(tb_group)
     ArgumentHelper.num_tokens_per_iter(tb_group)
     ArgumentHelper.max_prefill_iters(tb_group)
+    ArgumentHelper.async_(tb_group)
     ArgumentHelper.communicator(tb_group)
     ArgumentHelper.dist_init_addr(tb_group)

@@ -262,6 +263,7 @@ def api_server(args):
         max_prefill_token_num=args.max_prefill_token_num,
         num_tokens_per_iter=args.num_tokens_per_iter,
         max_prefill_iters=args.max_prefill_iters,
+        async_=args.async_,
         communicator=args.communicator,
         enable_metrics=not args.disable_metrics,
         hf_overrides=args.hf_overrides)

lmdeploy/cli/utils.py

Lines changed: 10 additions & 0 deletions
@@ -567,6 +567,16 @@ def max_prefill_iters(parser):
                                    default=1,
                                    help='the max number of forward passes in prefill stage')

+    @staticmethod
+    def async_(parser):
+        return parser.add_argument('--async',
+                                   type=int,
+                                   default=1,
+                                   choices=[0, 1],
+                                   dest='async_',
+                                   help='Enable async execution (default: 1, enabled). '
+                                   'Set to 0 to disable async mode, 1 to enable it.')
+
     @staticmethod
     def max_prefill_token_num(parser):
         return parser.add_argument('--max-prefill-token-num',
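Note on the new flag: since `async` is a reserved keyword in Python, the option is registered with `dest='async_'` so the parsed value is read back as `args.async_`. A minimal standalone sketch of the same pattern (the parser below is illustrative, not lmdeploy's actual CLI object):

import argparse

parser = argparse.ArgumentParser()
# '--async' cannot be read back as `args.async` (keyword), so map it to `async_`
parser.add_argument('--async', type=int, default=1, choices=[0, 1], dest='async_',
                    help='1 to enable async execution, 0 to disable it')

args = parser.parse_args(['--async', '0'])
print(args.async_)  # -> 0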

lmdeploy/messages.py

Lines changed: 4 additions & 0 deletions
@@ -235,6 +235,7 @@ class TurbomindEngineConfig:
             "Dynamic SplitFuse"-like scheduling
         max_prefill_iters: the max number of forward pass during prefill
             stage
+        async_: enable async execution, default to 1 (enabled)
         devices: the used devices
         empty_init: Whether to load the model weights, you should set
             it to True if you want to update weights after create the pipeline
@@ -273,6 +274,7 @@ class TurbomindEngineConfig:
     max_prefill_token_num: int = 8192
     num_tokens_per_iter: int = 0
     max_prefill_iters: int = 1
+    async_: int = 1
     devices: Optional[List[int]] = None
     empty_init: bool = False
     communicator: str = 'nccl'
@@ -289,6 +291,7 @@ def __post_init__(self):
         assert self.max_prefill_token_num >= 0, \
             'invalid max_prefill_token_num'
         assert self.num_tokens_per_iter >= 0, 'invalid num_tokens_per_iter'
+        assert self.async_ in (0, 1), 'async_ must be 0 (disabled) or 1 (enabled)'


 @dataclass
@@ -451,6 +454,7 @@ class ResponseType(enum.Enum):
     INTERNAL_ENGINE_ERROR = enum.auto()
     CANCEL = enum.auto()
     PREFIX_CACHE_CONFLICT_INTERACTIVE_MODE = enum.auto()
+    NO_QUEUE = enum.auto()


 @dataclass
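For reference, the new field is consumed like the other engine options when a pipeline is built. A rough sketch, assuming the usual `lmdeploy.pipeline` entry point (the model path is a placeholder):

from lmdeploy import TurbomindEngineConfig, pipeline

# async_ defaults to 1 (enabled); set it to 0 to fall back to synchronous execution
backend_config = TurbomindEngineConfig(async_=0, max_prefill_token_num=8192)

# model path is a placeholder; any TurboMind-supported model works here
pipe = pipeline('internlm/internlm2_5-7b-chat', backend_config=backend_config)
print(pipe('Hello, TurboMind!'))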

lmdeploy/turbomind/turbomind.py

Lines changed: 30 additions & 65 deletions
@@ -15,11 +15,9 @@
 from queue import Queue
 from typing import Any, Dict, List, Optional

-import numpy as np
 import pybase64
 import torch
 import yaml
-from torch.nn.utils.rnn import pad_sequence

 import lmdeploy
 from lmdeploy.messages import EngineOutput, GenerationConfig, ResponseType, ScheduleMetrics, TurbomindEngineConfig
@@ -195,28 +193,22 @@ def _load_weights(self):
     def _process_weights(self):
         """Process weight."""
         with ThreadPoolExecutor(max_workers=self.gpu_count) as e:
-            ranks = [self.node_id * self.gpu_count + device_id for device_id in range(self.gpu_count)]
-            for _ in e.map(self.model_comm.process_weight, range(self.gpu_count), ranks):
+            for _ in e.map(self.model_comm.process_weight, range(self.gpu_count)):
                 pass

     def _create_engine(self):
         """Create engine."""
         with ThreadPoolExecutor(max_workers=self.gpu_count) as e:
-            ranks = [self.node_id * self.gpu_count + device_id for device_id in range(self.gpu_count)]
-            for _ in e.map(self.model_comm.create_engine, range(self.gpu_count), ranks):
+            for _ in e.map(self.model_comm.create_engine, range(self.gpu_count)):
                 pass
         self._engine_created = True

     def _create_weight(self, model_comm):
         """Allocate weight buffer, load params if from_workspace."""

-        engine_cfg = self.config_dict['engine_config']
-        self.node_id = engine_cfg['node_rank']
-
         # create weight
         def _create_weight_func(device_id):
-            rank = self.node_id * self.gpu_count + device_id
-            model_comm.create_shared_weights(device_id, rank)
+            model_comm.create_weights(device_id)

         with ThreadPoolExecutor(max_workers=self.gpu_count) as executor:
             futures = []
@@ -233,8 +225,7 @@ def _get_model_params(self):
         tm_params.clear()

         def _get_params(device_id, que):
-            rank = self.node_id * self.gpu_count + device_id
-            out = model_comm.get_params(device_id, rank)
+            out = model_comm.get_weights(device_id)
             que.put(out)

         que = Queue()
@@ -266,12 +257,6 @@ def _postprocess_config(self, tm_config: TurbomindModelConfig, engine_config: Tu
         # update some attributes of `engine_config` which depends on
         # `session_len`
         self.engine_config = engine_config
-        if engine_config.max_prefill_token_num is not None \
-                and engine_config.num_tokens_per_iter == 0:
-            self.engine_config.num_tokens_per_iter = \
-                engine_config.max_prefill_token_num
-            self.engine_config.max_prefill_iters = (self.config.session_len + engine_config.max_prefill_token_num -
-                                                    1) // engine_config.max_prefill_token_num

         # pack `self.config` and `self.engine_config` into a dict
         self.config_dict = self.config.to_dict()
@@ -290,9 +275,9 @@ def _from_hf(self, model_path: str, engine_config: TurbomindEngineConfig):

         self._postprocess_config(tm_model.tm_config, engine_config)

-        model_comm = _tm.AbstractTransformerModel.create_llama_model(model_dir='',
-                                                                     config=yaml.safe_dump(self.config_dict),
-                                                                     weight_type=self.config.model_config.weight_type)
+        model_comm = _tm.TurboMind.create(model_dir='',
+                                          config=yaml.safe_dump(self.config_dict),
+                                          weight_type=self.config.model_config.weight_type)

         # create empty weight
         self._create_weight(model_comm)
@@ -311,8 +296,7 @@ def wakeup(self, tags: Optional[list[str]] = None):
         if tags is None:
             tags = ['weights', 'kv_cache']
         with ThreadPoolExecutor(max_workers=self.gpu_count) as e:
-            ranks = [self.node_id * self.gpu_count + device_id for device_id in range(self.gpu_count)]
-            for _ in e.map(self.model_comm.wakeup, range(self.gpu_count), [tags] * self.gpu_count, ranks):
+            for _ in e.map(self.model_comm.wakeup, range(self.gpu_count), [tags] * self.gpu_count):
                 pass

     def update_params(self, request: UpdateParamsRequest):
@@ -501,7 +485,7 @@ def _func(out: EngineOutput, step: int, **kwargs):
                 out.req_metrics = RequestMetrics(token_timestamp=time.time())
             else:
                 events = [
-                    EngineEvent(EventType.QUEUED, metrics.enque_time / 1000000),
+                    EngineEvent(EventType.QUEUED, metrics.enqueue_time / 1000000),
                     EngineEvent(EventType.SCHEDULED, metrics.scheduled_time / 1000000),
                 ]
                 out.req_metrics = RequestMetrics(token_timestamp=time.time(), engine_events=events)
@@ -547,7 +531,7 @@ def __init__(self, tm_model: TurboMind, config: TurbomindModelConfig, cuda_strea

         # create model instances
         lazy_init = self.tm_model.config_dict['engine_config'].get('empty_init', False)
-        self._model_inst = None if lazy_init else self._create_model_instance(0)
+        self._model_inst = None if lazy_init else self._create_model_instance()

         self.config = config
         self.lock = None
@@ -564,17 +548,18 @@ def __init__(self, tm_model: TurboMind, config: TurbomindModelConfig, cuda_strea
             7: ResponseType.FINISH,
             8: ResponseType.CANCEL,
             9: ResponseType.PREFIX_CACHE_CONFLICT_INTERACTIVE_MODE,
+            10: ResponseType.NO_QUEUE,
             -1: ResponseType.INTERNAL_ENGINE_ERROR,
         }

     @property
     def model_inst(self):
         if self._model_inst is None:
-            self._model_inst = self._create_model_instance(0)
+            self._model_inst = self._create_model_instance()
         return self._model_inst

-    def _create_model_instance(self, device_id):
-        model_inst = self.tm_model.model_comm.create_model_instance(device_id)
+    def _create_model_instance(self):
+        model_inst = self.tm_model.model_comm.create_request()
         return model_inst

     def _get_extra_output_processors(self, outputs: Dict[str, torch.Tensor], gen_config: GenerationConfig,
@@ -598,47 +583,27 @@ def _get_offset(type):

     def prepare_embeddings(self, input_embeddings=None, input_embedding_ranges=None):
         """Convert embeddings."""
-        if input_embeddings is None:
+        if not input_embeddings:
             return None, None

+        assert isinstance(input_embeddings, List)
+        assert isinstance(input_embedding_ranges, List)
         assert len(input_embeddings) == len(input_embedding_ranges)
-        if not isinstance(input_embeddings[0], (list, type(None))):
-            input_embeddings = [input_embeddings]
-            input_embedding_ranges = [input_embedding_ranges]

-        if all([isinstance(x, type(None)) for x in input_embeddings]):
-            return None, None
+        length = sum([x.shape[0] for x in input_embeddings])
+
+        _MAP = dict(bfloat16=torch.bfloat16, float16=torch.float16)
+        dtype = _MAP[self.tm_model.config.model_config.data_type]
+
+        values = torch.empty((length, input_embeddings[0].shape[-1]), dtype=dtype, device='cpu')
+        ranges = torch.tensor(input_embedding_ranges, dtype=torch.int32, device='cpu')
+
+        offset = 0
+        for embeds in input_embeddings:
+            values[offset:offset + embeds.shape[0]].copy_(embeds)
+            offset += embeds.shape[0]

-        hidden_dim = None
-        for embeddings in input_embeddings:
-            if embeddings is not None:
-                hidden_dim = embeddings[0].squeeze().shape[-1]
-                break
-        assert hidden_dim is not None
-
-        # construct input_embeddings
-        for i in range(len(input_embeddings)):
-            item = input_embeddings[i] or []
-            # convert to torch.Tensor if input is np.ndarray
-            if item and isinstance(item[0], np.ndarray):
-                item = [torch.from_numpy(x).squeeze() for x in item]
-            # convert to lookup table type
-            _MAP = dict(float=torch.float, bfloat16=torch.bfloat16, float16=torch.float16, fp8=torch.bfloat16)
-            dtype = _MAP.get(self.tm_model.config.weight_type, torch.float16)
-            item = [x.to(dtype=dtype) for x in item]
-            item = item or [torch.zeros(0, hidden_dim, dtype=dtype)]
-            input_embeddings[i] = item
-        input_embeddings = [torch.cat(x) for x in input_embeddings]
-        input_embeddings = pad_sequence(input_embeddings, batch_first=True)
-        input_embeddings = input_embeddings.reshape(input_embeddings.shape[0], -1).view(torch.int8)
-        # construct input_embedding_ranges
-        for i in range(len(input_embedding_ranges)):
-            item = input_embedding_ranges[i] or []
-            item = torch.IntTensor(item).reshape(-1, 2)
-            input_embedding_ranges[i] = item
-        input_embedding_ranges = pad_sequence(input_embedding_ranges, batch_first=True, padding_value=-1)
-
-        return input_embeddings, input_embedding_ranges
+        return values, ranges

     def prepare_mrope(self, input_meta: Dict[str, Any], input_len: int):
         mrope_position_ids = input_meta['mrope_position_ids']
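The rewritten `prepare_embeddings` above drops the old numpy/nested-list/pad_sequence handling: callers now pass a flat list of per-image feature tensors plus one `[start, end)` range per tensor, and the method packs them into a single CPU buffer. A small standalone sketch of that packing, with illustrative shapes and dtype (not tied to a real model):

import torch

# two image features of 4 and 6 tokens, hidden size 8 (illustrative numbers)
input_embeddings = [torch.randn(4, 8, dtype=torch.float16),
                    torch.randn(6, 8, dtype=torch.float16)]
# [start, end) token ranges each feature occupies in the prompt
input_embedding_ranges = [[3, 7], [12, 18]]

# same packing the refactored method performs
length = sum(x.shape[0] for x in input_embeddings)
values = torch.empty((length, input_embeddings[0].shape[-1]), dtype=torch.float16)
ranges = torch.tensor(input_embedding_ranges, dtype=torch.int32)

offset = 0
for embeds in input_embeddings:
    values[offset:offset + embeds.shape[0]].copy_(embeds)
    offset += embeds.shape[0]

print(values.shape, ranges.shape)  # torch.Size([10, 8]) torch.Size([2, 2])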

lmdeploy/vl/model/base.py

Lines changed: 1 addition & 1 deletion
@@ -293,7 +293,7 @@ def to_turbomind_aux(self, messages, prompt, IMAGE_TOKEN, tokenizer, sequence_st
         # collect image features from messages
         features = [x['content'] for x in messages if x['role'] == 'forward']
         features = features[0]
-        features = [x.cpu().numpy() for x in features]
+        features = [x.cpu() for x in features]
         # split prompt into segments and validate data
         segs = prompt.split(IMAGE_TOKEN)
         assert len(segs) == len(features) + 1, (f'the number of {IMAGE_TOKEN} is not equal '

src/turbomind/CMakeLists.txt

Lines changed: 16 additions & 5 deletions
@@ -15,14 +15,25 @@
 add_subdirectory(utils)
 add_subdirectory(core)
 add_subdirectory(kernels)
-add_subdirectory(layers)
 add_subdirectory(comm)
+add_subdirectory(generation)
 add_subdirectory(models)
 add_subdirectory(engine)
-if(BUILD_PYT)
-    add_subdirectory(th_op)
-endif()
+
 if(BUILD_PY_FFI)
     add_subdirectory(python)
 endif()
-add_subdirectory(triton_backend)
+
+add_library(turbomind STATIC turbomind.cc)
+set_property(TARGET turbomind PROPERTY POSITION_INDEPENDENT_CODE ON)
+target_link_libraries(turbomind PUBLIC
+    engine
+    models
+    device_comm
+    host_comm
+    core
+    memory_utils
+    nvtx_utils
+    CUDA::cublasLt
+    CUDA::cudart
+    yaml-cpp::yaml-cpp)

src/turbomind/core/CMakeLists.txt

Lines changed: 2 additions & 1 deletion
@@ -11,7 +11,8 @@ add_library(core STATIC
     layout.cc
     tensor.cc
     tensor.cu
-    module.cc)
+    module.cc
+    copy.cc)

 target_link_libraries(core PUBLIC cuda_utils logger CUDA::cudart CUDA::cuda_driver)
