Commit 508f33b
enable deepspeed inference (#47)
Signed-off-by: depenglee1707 <[email protected]>
1 parent 682acb8 commit 508f33b

7 files changed (+169 −63 lines)

llmserve/backend/llm/initializers/hf_transformers/deepspeed.py

Lines changed: 75 additions & 49 deletions

@@ -45,7 +45,9 @@ def __init__(
         max_tokens: int = 1024,
         use_kernel: bool = False,
         use_meta_tensor: bool = False,
-        injection_policy=None,
+        test_hybrid_engine: bool = False,
+        save_mp_checkpoint_path: bool = False,
+        # injection_policy=None,
         ds_inference_kwargs: Optional[Dict[str, Any]] = None,
         **from_pretrained_kwargs,
     ):
@@ -60,8 +62,10 @@ def __init__(
         self.max_tokens = max_tokens
         self.use_kernel = use_kernel
         self.use_meta_tensor = use_meta_tensor
+        self.test_hybrid_engine = test_hybrid_engine
+        self.save_mp_checkpoint_path = save_mp_checkpoint_path
         # TODO: Allow conversion from strings (need to do dynamic imports)
-        self.injection_policy = injection_policy
+        # self.injection_policy = injection_policy
         self.ds_inference_kwargs = ds_inference_kwargs
 
         if self.use_kernel:
@@ -114,6 +118,8 @@ def _generate_checkpoint_json(
             for entry in Path(repo_root).rglob("*.[bp][it][n]")
             if entry.is_file()
         ]
+
+        # BLOOM ?!
         data = {"type": "BLOOM",
                 "checkpoints": file_list, "version": 1.0}
         json.dump(data, f)
@@ -170,58 +176,78 @@ def load_model(self, model_id: str) -> "PreTrainedModel":
         return model
 
     def postprocess_model(self, model: "PreTrainedModel") -> "PreTrainedModel":
-        from transformers import GPTNeoXForCausalLM, LlamaForCausalLM
-
-        injection_policy = self.injection_policy
-        # TODO: remove those later when deepspeed master is updated
-        if injection_policy is None and not self.use_kernel:
-            if isinstance(model, GPTNeoXForCausalLM):
-                from transformers import GPTNeoXLayer
-
-                injection_policy = {
-                    GPTNeoXLayer: ("attention.dense", "mlp.dense_4h_to_h")
-                }
-            elif isinstance(model, LlamaForCausalLM):
-                from transformers.models.llama.modeling_llama import LlamaDecoderLayer
-
-                injection_policy = {
-                    LlamaDecoderLayer: ("self_attn.o_proj", "mlp.down_proj")
-                }
-
-        if self.use_bettertransformer:
-            from optimum.bettertransformer import BetterTransformer
-
-            logger.info("Transforming the model with BetterTransformer...")
-            model = BetterTransformer.transform(model)
-
-        ds_kwargs = self.ds_inference_kwargs or {}
-        ds_kwargs = ds_kwargs.copy()
-        ds_kwargs.update(
-            dict(
-                dtype=self.dtype,
-                mp_size=self.world_size,
-                replace_with_kernel_inject=self.use_kernel,
-                injection_policy=injection_policy,
-                max_tokens=self.max_tokens,
-            )
-        )
         if self.use_meta_tensor:
-            ds_kwargs.update(
-                dict(base_dir=self._repo_root, checkpoint=self._checkpoints_json)
-            )
-
-        logger.info(f"deepspeed.init_inference kwargs: {ds_kwargs}")
-        model = deepspeed.init_inference(
-            model,
-            **ds_kwargs,
-        )
+            ds_kwargs = dict(base_dir=self._repo_root, checkpoint=self._checkpoints_json)
+        else:
+            ds_kwargs = dict()
+
+        # Use DeepSpeed Hybrid Engine for inference
+        if self.test_hybrid_engine:
+            ds_config = {"train_batch_size": 2, "fp16": {"enabled": True if self.dtype==torch.half else False}, "hybrid_engine": {"enabled": True}}
+            model, *_ = deepspeed.initialize(model=model, config=ds_config)
+            model.eval()
+        # If not trying with the HuggingFace baseline, use DeepSpeed Inference Engine
+        else:
+            model = deepspeed.init_inference(model,
+                                             dtype=self.dtype,
+                                             mp_size=self.world_size,
+                                             replace_with_kernel_inject=self.use_kernel,
+                                             max_tokens=self.max_tokens,
+                                             save_mp_checkpoint_path=self.save_mp_checkpoint_path,
+                                             **ds_kwargs
+                                             )
+        # from transformers import GPTNeoXForCausalLM, LlamaForCausalLM
+
+        # injection_policy = self.injection_policy
+        # # TODO: remove those later when deepspeed master is updated
+        # if injection_policy is None and not self.use_kernel:
+        #     if isinstance(model, GPTNeoXForCausalLM):
+        #         from transformers import GPTNeoXLayer
+
+        #         injection_policy = {
+        #             GPTNeoXLayer: ("attention.dense", "mlp.dense_4h_to_h")
+        #         }
+        #     elif isinstance(model, LlamaForCausalLM):
+        #         from transformers.models.llama.modeling_llama import LlamaDecoderLayer
+
+        #         injection_policy = {
+        #             LlamaDecoderLayer: ("self_attn.o_proj", "mlp.down_proj")
+        #         }
+
+        # if self.use_bettertransformer:
+        #     from optimum.bettertransformer import BetterTransformer
+
+        #     logger.info("Transforming the model with BetterTransformer...")
+        #     model = BetterTransformer.transform(model)
+
+        # ds_kwargs = self.ds_inference_kwargs or {}
+        # ds_kwargs = ds_kwargs.copy()
+        # ds_kwargs.update(
+        #     dict(
+        #         dtype=self.dtype,
+        #         mp_size=self.world_size,
+        #         replace_with_kernel_inject=self.use_kernel,
+        #         injection_policy=injection_policy,
+        #         max_tokens=self.max_tokens,
+        #     )
+        # )
+        # if self.use_meta_tensor:
+        #     ds_kwargs.update(
+        #         dict(base_dir=self._repo_root, checkpoint=self._checkpoints_json)
+        #     )
+
+        # logger.info(f"deepspeed.init_inference kwargs: {ds_kwargs}")
+        # model = deepspeed.init_inference(
+        #     model,
+        #     **ds_kwargs,
+        # )
 
         if self.torch_compile and self.torch_compile["backend"]:
             logger.info("Compiling the model with torch.compile()...")
             model = torch.compile(model, **self.torch_compile)
 
         # Add attributes for compatibility with the pipeline
-        model.use_kernel = self.use_kernel
-        model.device = self.device
-        model = model.to(self.device)
+        # model.use_kernel = self.use_kernel
+        # model.device = self.device
+        # model = model.to(self.device)
         return model
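
Read on its own, the rewritten postprocess_model reduces to a two-way switch: wrap the model with DeepSpeed's Hybrid Engine via deepspeed.initialize(), or with the Inference Engine via deepspeed.init_inference(). The standalone sketch below mirrors that switch outside the initializer class; the model id, dtype, world size, and flag values are illustrative choices, not values fixed by this commit.

# Condensed sketch of the new engine selection, assuming a single GPU
# (world_size=1) and fp16 weights; flag names mirror the initializer fields.
import deepspeed
import torch
from transformers import AutoModelForCausalLM

model = AutoModelForCausalLM.from_pretrained(
    "bigscience/bloom-3b",      # model id borrowed from the new YAML config below
    torch_dtype=torch.half,
)

test_hybrid_engine = False      # same meaning as the new initializer flag

if test_hybrid_engine:
    # Hybrid Engine path: deepspeed.initialize() returns (engine, optimizer, ...),
    # so unpack the engine and put it in eval mode for inference.
    ds_config = {
        "train_batch_size": 2,
        "fp16": {"enabled": True},
        "hybrid_engine": {"enabled": True},
    }
    model, *_ = deepspeed.initialize(model=model, config=ds_config)
    model.eval()
else:
    # Inference Engine path: kernel injection plus tensor parallelism.
    model = deepspeed.init_inference(
        model,
        dtype=torch.half,
        mp_size=1,                        # self.world_size in the diff
        replace_with_kernel_inject=True,  # self.use_kernel in the diff
        max_tokens=512,
    )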

llmserve/backend/llm/pipelines/_base.py

Lines changed: 2 additions & 1 deletion

@@ -165,8 +165,9 @@ def __call__(self, inputs: List[Union[str, Prompt]], **kwargs) -> List[Response]
             postprocess_params,
         ) = self._sanitize_parameters(**kwargs)
         model_inputs = self.preprocess(inputs, **preprocess_params)
+
         model_inputs = self._ensure_tensor_on_device(
-            model_inputs, device=self.model.device)
+            model_inputs, device=(self.model.device if hasattr(self.model, 'device') else self.device))
 
         forward_params = self._add_default_generate_kwargs(
             forward_params, model_inputs)
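
The hasattr fallback above (repeated in default_pipeline.py below) exists because the DeepSpeed-wrapped model is no longer guaranteed to expose a .device attribute: the old postprocess_model used to set model.device explicitly, and that line is now commented out, so the pipeline falls back to its own device. A minimal, self-contained illustration of the pattern; resolve_device and pipeline_device are names made up for this sketch:

# Prefer the model's own device when it has one, otherwise fall back to the
# device the pipeline was constructed with.
import torch

def resolve_device(model: object, pipeline_device: torch.device) -> torch.device:
    return model.device if hasattr(model, "device") else pipeline_device

# A bare object has no .device attribute, so the fallback is used.
print(resolve_device(object(), torch.device("cpu")))  # -> cpu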

llmserve/backend/llm/pipelines/default_pipeline.py

Lines changed: 2 additions & 2 deletions

@@ -1,5 +1,5 @@
 import time
-from typing import List, Optional, Union
+from typing import List, Optional, Union, TYPE_CHECKING
 
 import torch
 from transformers import PreTrainedModel, PreTrainedTokenizer
@@ -56,7 +56,7 @@ def preprocess(self, prompts: List[str], **generate_kwargs):
 
         inputs = self.tokenizer(
             prompt_text, return_tensors="pt", add_special_tokens = generate_kwargs.get("add_special_tokens", True), padding=True
-        ).to(self.model.device)
+        ).to(self.model.device if hasattr(self.model, 'device') else self.device)
 
         if not generate_kwargs.get("return_token_type_ids", True):
             inputs.pop("token_type_ids", None)

llmserve/backend/llm/predictor.py

Lines changed: 29 additions & 9 deletions

@@ -52,8 +52,8 @@ def init_model(
     # Lazy import so that the new cache location is used
     torch.backends.cuda.matmul.allow_tf32 = True
     if torch.cuda.is_available():
-        # device = torch.device(f"cuda:{local_rank}")
-        device = torch.device("cuda")
+        device = torch.device(f"cuda:{local_rank}")
+        # device = torch.device("cuda")
     else:
        device = torch.device("cpu")
 
@@ -383,11 +383,12 @@ async def _create_worker_group(
         await asyncio.gather(
             *[
                 worker.init_model.remote(
-                    local_rank,
+                    local_rank = local_rank,
                     num_cpus_per_worker=scaling_config.num_cpus_per_worker,
                     num_gpus_per_worker=scaling_config.num_gpus_per_worker
                 )
                 for worker, local_rank in zip(worker_group, local_ranks)
+                # for worker in worker_group
             ]
         )
 
@@ -425,20 +426,39 @@ def slice_prompts(worker_num: int, worker_index: int, prompts: list[str]):
         logger.info('LLM Predictor do async predict')
 
         async with self._base_worker_group_lock:
+            # prediction = (
+            #     await asyncio.gather(
+            #         *[
+            #             worker.generate.remote(
+            #                 slice_prompts(len(self.base_worker_group), index, prompts),
+            #                 # prompts,
+            #                 timeout_s=timeout_s,
+            #                 start_timestamp=start_timestamp,
+            #                 **self.args.model_config.generation.all_generate_kwargs if self.args.model_config.generation else {},  # pylint:disable=no-member
+            #             ) if len(slice_prompts(len(self.base_worker_group), index, prompts)) > 0 else ray.put([])
+
+            #             for index, worker in enumerate(self.base_worker_group)
+            #             # for worker in self.base_worker_group
+            #         ]
+            #     )
+            # )
+            # return [response for responses in prediction for response in responses]
             prediction = (
                 await asyncio.gather(
                     *[
                         worker.generate.remote(
-                            slice_prompts(len(self.base_worker_group), index, prompts),
-                            # prompts,
+                            # slice_prompts(len(self.base_worker_group), index, prompts),
+                            prompts,
                             timeout_s=timeout_s,
                             start_timestamp=start_timestamp,
                             **self.args.model_config.generation.all_generate_kwargs if self.args.model_config.generation else {},  # pylint:disable=no-member
-                        ) if len(slice_prompts(len(self.base_worker_group), index, prompts)) > 0 else ray.put([])
+                        )
 
-                        for index, worker in enumerate(self.base_worker_group)
-                        # for worker in self.base_worker_group
+                        # for index, worker in enumerate(self.base_worker_group)
+                        for worker in self.base_worker_group
                     ]
                 )
             )
-            return [response for responses in prediction for response in responses]
+
+            return prediction
+
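
The behavioural change in _predict_async is that every worker now receives the full prompt list instead of a per-worker slice, and the gathered per-worker results are returned unflattened, presumably because a tensor-parallel DeepSpeed engine needs every rank to step through the same batch. A self-contained sketch of that broadcast-style fan-out; Worker is a stand-in actor, not the repo's worker class:

# Every worker receives the same prompts; results come back one list per worker.
import asyncio
import ray

ray.init(ignore_reinit_error=True)

@ray.remote
class Worker:
    def generate(self, prompts):
        # Pretend inference: just echo the prompts.
        return [f"echo: {p}" for p in prompts]

async def predict_all(worker_group, prompts):
    # Ray ObjectRefs are awaitable, so asyncio.gather fans the call out to all workers.
    return await asyncio.gather(
        *[worker.generate.remote(prompts) for worker in worker_group]
    )

workers = [Worker.remote() for _ in range(2)]
results = asyncio.run(predict_all(workers, ["hello", "world"]))
print(results)  # one result list per worker, not flattened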

llmserve/backend/server/models.py

Lines changed: 6 additions & 1 deletion

@@ -240,6 +240,8 @@ class DeepSpeed(Transformers):
     use_kernel: bool = False
     max_tokens: int = 1024
     use_meta_tensor: bool = False
+    test_hybrid_engine: bool = False
+    save_mp_checkpoint_path: bool = False
     ds_inference_kwargs: Optional[Dict[str, Any]] = None
 
     @root_validator
@@ -257,7 +259,10 @@ def use_kernel_use_meta_tensor(cls, values): # pylint:disable=no-self-argument
         if not values.get("use_kernel") and values.get("use_meta_tensor"):
             raise ValueError("'use_meta_tensor=True' needs 'use_kernel=True'.")
         return values
-
+
+    @property
+    def allowed_pipelines(self) -> Set[str]:
+        return {"default"}
 
 class DeviceMap(Transformers):
     type: Literal["DeviceMap"]
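
For readers skimming models.py, the sketch below reproduces the validation pattern in isolation (assuming pydantic v1, which the bare @root_validator suggests): the two new boolean fields, the validator coupling use_meta_tensor to use_kernel, and the allowed_pipelines property that restricts DeepSpeed to the default pipeline. DeepSpeedSettings is an illustrative name, not the repo's class.

# Standalone pydantic v1 sketch of the DeepSpeed config fields and validator.
from typing import Set
from pydantic import BaseModel, root_validator

class DeepSpeedSettings(BaseModel):
    use_kernel: bool = False
    max_tokens: int = 1024
    use_meta_tensor: bool = False
    test_hybrid_engine: bool = False
    save_mp_checkpoint_path: bool = False

    @root_validator
    def use_kernel_use_meta_tensor(cls, values):  # pylint:disable=no-self-argument
        # Meta-tensor loading only works together with kernel injection.
        if not values.get("use_kernel") and values.get("use_meta_tensor"):
            raise ValueError("'use_meta_tensor=True' needs 'use_kernel=True'.")
        return values

    @property
    def allowed_pipelines(self) -> Set[str]:
        # The DeepSpeed initializer is only wired up to the "default" pipeline.
        return {"default"}

# Rejected: meta tensors without kernel injection.
try:
    DeepSpeedSettings(use_meta_tensor=True)
except ValueError as err:
    print(err)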
Lines changed: 54 additions & 0 deletions

@@ -0,0 +1,54 @@
+deployment_config:
+  autoscaling_config:
+    min_replicas: 1
+    initial_replicas: 1
+    max_replicas: 8
+    target_num_ongoing_requests_per_replica: 1.0
+    metrics_interval_s: 10.0
+    look_back_period_s: 30.0
+    smoothing_factor: 1.0
+    downscale_delay_s: 300.0
+    upscale_delay_s: 90.0
+  ray_actor_options:
+    num_cpus: 0.1 # for a model deployment, 3 actors are created; actors 1 and 2 cost 0.1 CPU each, and model inference costs 6 CPUs (see the scaling_config at the end of the file)
+model_config:
+  warmup: True
+  model_task: text-generation
+  model_id: bigscience/bloom-3b
+  max_input_words: 800
+  initialization:
+    # s3_mirror_config:
+    #   bucket_uri: s3://large-dl-models-mirror/models--amazon--LightGPT/main-safetensors/
+    initializer:
+      type: DeepSpeed
+      dtype: float32
+      max_tokens: 512
+      use_kernel: true
+      use_meta_tensor: false
+      test_hybrid_engine: false
+      save_mp_checkpoint_path: false
+      from_pretrained_kwargs:
+        use_cache: true
+        trust_remote_code: true
+    pipeline: default
+  generation:
+    max_batch_size: 2
+    batch_wait_timeout_s: 30
+    generate_kwargs:
+      do_sample: false
+      max_new_tokens: 512
+      min_new_tokens: 16
+      temperature: 0.7
+      repetition_penalty: 1.1
+      top_p: 0.8
+      top_k: 50
+    # prompt_format: "Below is an instruction that describes a task. Write a response that appropriately completes the request.\n### Instruction:\n{instruction}\n### Response:\n"
+    # stopping_sequences: ["### Response:", "### End"]
+scaling_config:
+  num_workers: 1
+  num_gpus_per_worker: 0
+  num_cpus_per_worker: 6 # for inference
+  # resources_per_worker:
+  #   accelerator_type_cpu: 0.01
+  ray_actor_options:
+    num_cpus: 0.1

setup.py

Lines changed: 1 addition & 1 deletion

@@ -49,7 +49,7 @@
         "torchaudio==2.1.2",
         "torchvision==0.16.2",
         "accelerate==0.25.0",
-        "deepspeed==0.12.6",
+        "deepspeed==0.14.0",
         "torchmetrics==1.2.1",
         "llama_cpp_python==0.2.20",
         "transformers==4.33.3",
