Skip to content

Commit 13fe72d

Browse files
authored
[DeepSeek R1] Upgrade Mooncake to latest (#1829)
## Purpose Changes to adopt the latest Mooncake Store API
1 parent 03db555 commit 13fe72d

File tree

4 files changed

+34
-46
lines changed

4 files changed

+34
-46
lines changed

pd_xpyd/setup_env.sh

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,8 @@ pip install colorlog
55

66
echo "2. setting up mooncake mooncake-transfer-engine private build............."
77
#Mooncake
8-
wget https://github.com/hlin99/Mooncake/releases/download/private_buildv2/mooncake_transfer_engine-0.1.0-cp310-cp310-manylinux2014_x86_64.whl
9-
pip install mooncake_transfer_engine-0.1.0-cp310-cp310-manylinux2014_x86_64.whl --force-reinstall
8+
wget https://github.com/hlin99/Mooncake/releases/download/private_buildv3/mooncake_transfer_engine-0.3.5-cp310-cp310-manylinux_2_17_x86_64.whl
9+
pip install mooncake_transfer_engine-0.3.5-cp310-cp310-manylinux_2_17_x86_64.whl --force-reinstall
1010

1111
echo "3. setting up RDMA for mooncake ..................."
1212
#RDMA

pd_xpyd/start_etcd_mooncake_master.sh

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,9 +29,4 @@ etcd --listen-client-urls http://0.0.0.0:2379 \
2929
--advertise-client-urls http://localhost:2379 \
3030
>etcd.log 2>&1 &
3131

32-
if [ "$BENCHMARK_MODE" == "1" ]; then
33-
mooncake_master -max_threads 64 -port 50001 --v=1 >mooncake_master.log 2>&1 &
34-
else
35-
mooncake_master -enable_gc true -max_threads 64 -port 50001 --v=1 >mooncake_master.log 2>&1 &
36-
fi
37-
32+
mooncake_master -max_threads 64 -port 50001 -eviction_high_watermark_ratio 0.8 -eviction_ratio 0.2 >mooncake_master.log 2>&1 &

vllm/distributed/kv_transfer/kv_connector/mooncake_store_connector.py

Lines changed: 8 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -251,9 +251,9 @@ def send_kv_caches_and_hidden_states_cpu(
251251
store_kvcache_key = f"{store_key_prefix}_{self.rank}"
252252
store_hidden_key = f"{store_key_prefix}_hidden_{self.rank}"
253253

254-
self.kv_store.put_unsafe(store_kvcache_key,
254+
self.kv_store.put_tensor(store_kvcache_key,
255255
kv_caches_send_list[idx])
256-
self.kv_store.put_unsafe(store_hidden_key, hidden_states_list[idx])
256+
self.kv_store.put_tensor(store_hidden_key, hidden_states_list[idx])
257257
logger.info("[rank %d]: KV send DONE. send %d, takes %f s", self.rank,
258258
len(input_tokens_list),
259259
time.time() - start_time)
@@ -313,12 +313,12 @@ def send_kv_caches_and_hidden_states_hpu(
313313
kvcache_to_sent = keys.cpu()
314314
logger.debug("kv cache reshape time: %s", time.time() - start_time)
315315
store_kvcache_key = f"{store_key_prefix}_{self.rank}"
316-
self.kv_store.put_unsafe(store_kvcache_key, kvcache_to_sent)
316+
self.kv_store.put_tensor(store_kvcache_key, kvcache_to_sent)
317317

318318
logger.debug("put kv cache key: %s", store_kvcache_key)
319319

320320
hidden_key = f"{store_key_prefix}_hidden_{self.rank}"
321-
self.kv_store.put(
321+
self.kv_store.put_tensor(
322322
hidden_key,
323323
hidden_or_intermediate_states[idx].unsqueeze(0).cpu())
324324
# ==== graph should end here ======
@@ -396,15 +396,14 @@ def recv_kv_caches_and_hidden_states_hpu(
396396
load_key_prefix = self.tensor_hash(current_tokens)
397397
# For deepseek, we only need recv first rank
398398
load_kvcache_key = f"{load_key_prefix}_0"
399-
shape = (61, num_blocks * 128, self.k_v_head_size)
400399
remote_kv = None
401400
if self._wait_for_key(load_kvcache_key):
402-
remote_kv = self.kv_store.get_unsafe(load_kvcache_key, shape,
401+
remote_kv = self.kv_store.get_tensor(load_kvcache_key,
403402
self.dtype)
404403
hidden_key = f"{load_key_prefix}_hidden_0"
405404
hidden = None
406405
if self._wait_for_key(hidden_key):
407-
hidden = self.kv_store.get(hidden_key)
406+
hidden = self.kv_store.get_tensor(hidden_key)
408407

409408
if remote_kv is None or hidden is None:
410409
# didn't find any match.
@@ -477,13 +476,12 @@ def recv_kv_caches_and_hidden_states_cpu(
477476
load_hidden_key = f"{prefix}_hidden_0"
478477
remote_kv = None
479478
if self._wait_for_key(load_kvcache_key):
480-
remote_kv = self.kv_store.get_unsafe(load_kvcache_key,
481-
shape=None,
479+
remote_kv = self.kv_store.get_tensor(load_kvcache_key,
482480
dtype=self.dtype)
483481
# hidden_states always use bf16.
484482
hidden = None
485483
if self._wait_for_key(load_hidden_key):
486-
hidden = self.kv_store.get_unsafe(load_hidden_key, shape=(1, 7168))
484+
hidden = self.kv_store.get_tensor(load_hidden_key)
487485

488486
if remote_kv is None or hidden is None:
489487
# didn't find any match.

vllm/distributed/kv_transfer/kv_lookup_buffer/mooncake_store.py

Lines changed: 23 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ def __init__(
8484
):
8585

8686
try:
87-
from mooncake import MooncakeDistributedStore
87+
from mooncake.store import MooncakeDistributedStore
8888
except ImportError as e:
8989
raise ImportError(
9090
"Please install mooncake by following the instructions at "
@@ -199,47 +199,42 @@ def _get_impl(
199199

200200
return None
201201

202-
def put_unsafe(
202+
def put_tensor(
203203
self,
204204
key: str,
205205
value: Optional[torch.Tensor],
206206
) -> None:
207-
"""Put KVCache to Mooncake Store"""
207+
"""Put tensor to Mooncake Store"""
208208
value = value.cpu()
209-
start_serde = time.time()
210-
data_ptr = value.data_ptr()
211-
element_size = value.element_size()
212-
numel = value.numel()
213-
total_size = element_size * numel
214-
end_serde = time.time()
209+
start_put = time.time()
215210
try:
216-
self.store.put_unsafe(key, data_ptr, total_size)
211+
self.store.put_tensor(key, value)
217212
except TypeError as err:
218-
logger.error("Failed to put value into Mooncake Store: %s", err)
213+
logger.error("Failed to put tensor into Mooncake Store: %s", err)
219214
raise TypeError("Mooncake Store Put Type Error.") from err
220215
end_put = time.time()
221-
logger.debug("contiguous time: %f, put time: %f",
222-
end_serde - start_serde, end_put - end_serde)
216+
logger.debug("Put tensor to store. Time: %f", end_put - start_put)
223217

224-
def get_unsafe(self,
218+
def get_tensor(self,
225219
key: str,
226-
shape,
227220
dtype=torch.bfloat16) -> Optional[torch.Tensor]:
228-
"""Get KVCache from Mooncake Store without type checking"""
221+
"""Get tensor from Mooncake Store"""
229222
start_get = time.time()
230-
data = self.store.get(key)
223+
try:
224+
value = self.store.get_tensor(key)
225+
except TypeError as err:
226+
logger.error("Failed to get tensor from Mooncake Store: %s", err)
227+
raise TypeError("Mooncake Store Get Type Error.") from err
231228
end_get = time.time()
232-
if data:
233-
tensor = torch.frombuffer(data, dtype=dtype)
234-
shape = (61, -1, 1, 576) if shape is None else shape
235-
tensor = tensor.view(shape)
236-
end_from_buffer = time.time()
237-
logger.debug("from buffer time: %f , get time: %f",
238-
end_from_buffer - end_get, end_get - start_get)
239-
240-
return tensor
241-
return None
229+
if value is None:
230+
logger.error("Failed to get tensor from Mooncake Store: %s", key)
231+
return None
232+
# This is a workaround for get_tensor, which returns the wrong tensor type.
233+
# Remove once Mooncake's get_tensor fixes the issue.
234+
value = value.view(dtype)
235+
logger.debug("Get tensor from store. Time: %f", end_get - start_get)
236+
return value
242237

243238
def is_exist(self, key: str) -> bool:
244239
"""Check if the key exists in the Mooncake Store"""
245-
return self.store.isExist(key) == 1
240+
return self.store.is_exist(key) == 1

0 commit comments

Comments
 (0)