Skip to content

Commit 678cba1

Browse files
committed
revert encode_into changes
Signed-off-by: Staszek Pasko <[email protected]>
1 parent 8bda83c commit 678cba1

File tree

3 files changed

+20
-10
lines changed

3 files changed

+20
-10
lines changed

tests/v1/test_serial_utils.py

Lines changed: 5 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -70,12 +70,14 @@ def test_encode_decode():
7070

7171
assert_equal(decoded, obj)
7272

73-
# Test whether MsgpackEncoder properly reuses the buffers.
73+
# Test encode_into case
7474

75-
encoded2 = encoder.encode(obj)
75+
preallocated = bytearray()
76+
77+
encoded2 = encoder.encode_into(obj, preallocated)
7678

7779
assert len(encoded2) == 6
78-
assert encoded2[0] is encoded[0]
80+
assert encoded2[0] is preallocated
7981

8082
decoded2: MyType = decoder.decode(encoded2)
8183

vllm/v1/engine/core.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -507,15 +507,15 @@ def process_output_socket(self, output_path: str, engine_index: int):
507507
"""Output socket IO thread."""
508508

509509
# Msgpack serialization encoding.
510-
# The wrapper keeps an internal encoding buffer that avoids
511-
# creating a new buffer for each encode call.
512510
encoder = MsgpackEncoder()
511+
# Reuse send buffer.
512+
buffer = bytearray()
513513

514514
with zmq_socket_ctx(output_path, zmq.constants.PUSH) as socket:
515515
while True:
516516
outputs = self.output_queue.get()
517517
outputs.engine_index = engine_index
518-
buffers = encoder.encode(outputs)
518+
buffers = encoder.encode_into(outputs, buffer)
519519
socket.send_multipart(buffers, copy=False)
520520

521521

vllm/v1/serial_utils.py

Lines changed: 12 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -58,19 +58,27 @@ def __init__(self, size_threshold: Optional[int] = None):
5858
# This is used as a local stash of buffers that we can then access from
5959
# our custom `msgspec` hook, `enc_hook`. We don't have a way to
6060
# pass custom data to the hook otherwise.
61-
self.msg_buffer = bytearray()
6261
self.aux_buffers: Optional[list[bytestr]] = None
6362
self.size_threshold = size_threshold
6463

6564
def encode(self, obj: Any) -> Sequence[bytestr]:
6665
try:
66+
self.aux_buffers = bufs = [b'']
67+
bufs[0] = self.encoder.encode(obj)
6768
# This `bufs` list allows us to collect direct pointers to backing
6869
# buffers of tensors and np arrays, and return them along with the
6970
# top-level encoded buffer instead of copying their data into the
7071
# new buffer.
71-
self.aux_buffers = [self.msg_buffer]
72-
bufs = self.aux_buffers
73-
self.encoder.encode_into(obj, self.msg_buffer)
72+
return bufs
73+
finally:
74+
self.aux_buffers = None
75+
76+
# TODO: would be nice to make this automatic
77+
def encode_into(self, obj: Any, buf: bytearray) -> Sequence[bytestr]:
78+
try:
79+
self.aux_buffers = [buf]
80+
bufs = [buf]
81+
self.encoder.encode_into(obj, buf)
7482
return bufs
7583
finally:
7684
self.aux_buffers = None

0 commit comments

Comments
 (0)