@@ -31,15 +31,15 @@ class MsgpackEncoder:
31
31
Note that unlike vanilla `msgspec` Encoders, this interface is generally
32
32
not thread-safe when encoding tensors / numpy arrays.
33
33
34
- By default, arrays below 256MB are serialized inline.
34
+ By default, arrays below 256B are serialized inline.
35
35
Larger arrays will get sent via dedicated messages.
36
36
Note that this is a per-tensor limit.
37
37
38
38
Sending multiple large messages via zeromq saturates memory very quickly.
39
39
See: https://github.com/vllm-project/vllm/issues/16185
40
40
"""
41
41
42
- def __init__ (self , size_threshold = 256 * 1024 * 1024 ):
42
+ def __init__ (self , size_threshold = 256 ):
43
43
self .encoder = msgpack .Encoder (enc_hook = self .enc_hook )
44
44
# This is used as a local stash of buffers that we can then access from
45
45
# our custom `msgspec` hook, `enc_hook`. We don't have a way to
@@ -102,7 +102,12 @@ def _encode_ndarray(
102
102
self , obj : np .ndarray
103
103
) -> tuple [str , tuple [int , ...], Union [int , memoryview ]]:
104
104
assert self .aux_buffers is not None
105
- arr_data = obj .data if obj .data .c_contiguous else obj .tobytes ()
105
+ # Either copy the memoryview directly or flatten the array to bytes.
106
+ # Sending memoryviews is theoretically faster, but in this particular
107
+ # case, it triggers some unnecessary copies anyway.
108
+ # With this, the tensors can still be zero-copy read.
109
+ arr_data = obj .data .tobytes () if obj .data .c_contiguous \
110
+ else obj .tobytes ()
106
111
if not obj .shape or obj .nbytes < self .size_threshold :
107
112
# Encode small arrays and scalars inline. Using this extension type
108
113
# ensures we can avoid copying when decoding.
@@ -165,8 +170,8 @@ def _decode_ndarray(self, arr: Any) -> np.ndarray:
165
170
dtype , shape , data = arr
166
171
# Copy from inline representation, otherwise Torch is unhappy since
167
172
# the returned memory is non-writeable.
168
- buffer = self .aux_buffers [data ] if isinstance (
169
- data , int ) else bytearray (data ). copy ( )
173
+ buffer = self .aux_buffers [data ] if isinstance (data , int ) \
174
+ else bytearray (data )
170
175
return np .ndarray (buffer = buffer , dtype = np .dtype (dtype ), shape = shape )
171
176
172
177
def _decode_mm_items (self , obj : list ) -> list [MultiModalKwargsItem ]:
0 commit comments