Skip to content

Commit 5d377b2

Browse files
jac0626 and silas.jiang authored
fix:use list join instead of += to operate bytes to prevent oom (#3181)
Use list join instead of `+=` when accumulating vector bytes, to prevent OOM. See #3019.
---------
Signed-off-by: silas.jiang <silas.jiang@zilliz.com>
Co-authored-by: silas.jiang <silas.jiang@zilliz.com>
1 parent 7fe32ed commit 5d377b2

File tree

3 files changed

+422
-16
lines changed

3 files changed

+422
-16
lines changed

pymilvus/client/entity_helper.py

Lines changed: 51 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -356,8 +356,37 @@ def entity_to_array_arr(entity_values: List[Any], field_info: Any):
356356
return convert_to_array_arr(entity_values, field_info)
357357

358358

359+
def flush_vector_bytes(
    field_data: schema_types.FieldData, vector_bytes_cache: Dict[int, List[bytes]]
):
    """Write the byte chunks cached for a bytes-vector field into the field.

    ``pack_field_value_to_field_data`` collects each row's bytes in
    ``vector_bytes_cache[id(field_data)]`` instead of growing the field with
    ``+=``, which causes O(n^2) memory operations. This function performs the
    single final ``b"".join`` and stores the result on ``field_data.vectors``.

    Supports: INT8_VECTOR, BINARY_VECTOR, FLOAT16_VECTOR, BFLOAT16_VECTOR.

    Args:
        field_data: Field whose cached chunks should be written. Its ``type``
            selects which attribute of ``field_data.vectors`` receives the bytes.
        vector_bytes_cache: Maps ``id(field_data)`` to the list of chunks
            collected so far. The entry for ``field_data`` is removed when it
            is flushed.

    Returns:
        None. Does nothing when ``field_data`` is not one of the supported
        vector types or when no chunks are cached for it.
    """
    attr_name = {
        DataType.INT8_VECTOR: "int8_vector",
        DataType.BINARY_VECTOR: "binary_vector",
        DataType.FLOAT16_VECTOR: "float16_vector",
        DataType.BFLOAT16_VECTOR: "bfloat16_vector",
    }.get(field_data.type)
    if attr_name is None:
        # Resolve the target before consuming the cache entry, so chunks are
        # never discarded for a field they cannot be written to.
        return

    chunks = vector_bytes_cache.pop(id(field_data), None)
    if not chunks:
        return

    # Keep any bytes already stored on the field, so flushing appends exactly
    # like the `+=` it replaces instead of overwriting earlier data.
    existing = getattr(field_data.vectors, attr_name)
    if existing:
        chunks = [existing, *chunks]
    setattr(field_data.vectors, attr_name, b"".join(chunks))
383+
384+
359385
def pack_field_value_to_field_data(
360-
field_value: Any, field_data: schema_types.FieldData, field_info: Any
386+
field_value: Any,
387+
field_data: schema_types.FieldData,
388+
field_info: Any,
389+
vector_bytes_cache: Dict[int, List[bytes]],
361390
):
362391
field_type = field_data.type
363392
field_name = field_info["name"]
@@ -462,7 +491,12 @@ def pack_field_value_to_field_data(
462491
field_data.vectors.dim = field_info.get("params", {}).get("dim", 0)
463492
else:
464493
field_data.vectors.dim = len(field_value) * 8
465-
field_data.vectors.binary_vector += bytes(field_value)
494+
b_bytes = bytes(field_value)
495+
496+
field_id = id(field_data)
497+
if field_id not in vector_bytes_cache:
498+
vector_bytes_cache[field_id] = []
499+
vector_bytes_cache[field_id].append(b_bytes)
466500
except (TypeError, ValueError) as e:
467501
raise DataNotMatchException(
468502
message=ExceptionsMessage.FieldDataInconsistent
@@ -489,7 +523,11 @@ def pack_field_value_to_field_data(
489523
)
490524

491525
field_data.vectors.dim = len(v_bytes) // 2
492-
field_data.vectors.float16_vector += v_bytes
526+
527+
field_id = id(field_data)
528+
if field_id not in vector_bytes_cache:
529+
vector_bytes_cache[field_id] = []
530+
vector_bytes_cache[field_id].append(v_bytes)
493531
except (TypeError, ValueError) as e:
494532
raise DataNotMatchException(
495533
message=ExceptionsMessage.FieldDataInconsistent
@@ -516,7 +554,11 @@ def pack_field_value_to_field_data(
516554
)
517555

518556
field_data.vectors.dim = len(v_bytes) // 2
519-
field_data.vectors.bfloat16_vector += v_bytes
557+
558+
field_id = id(field_data)
559+
if field_id not in vector_bytes_cache:
560+
vector_bytes_cache[field_id] = []
561+
vector_bytes_cache[field_id].append(v_bytes)
520562
except (TypeError, ValueError) as e:
521563
raise DataNotMatchException(
522564
message=ExceptionsMessage.FieldDataInconsistent
@@ -562,7 +604,11 @@ def pack_field_value_to_field_data(
562604
)
563605

564606
field_data.vectors.dim = len(i_bytes)
565-
field_data.vectors.int8_vector += i_bytes
607+
608+
field_id = id(field_data)
609+
if field_id not in vector_bytes_cache:
610+
vector_bytes_cache[field_id] = []
611+
vector_bytes_cache[field_id].append(i_bytes)
566612
except (TypeError, ValueError) as e:
567613
raise DataNotMatchException(
568614
message=ExceptionsMessage.FieldDataInconsistent

pymilvus/client/prepare.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -761,6 +761,10 @@ def _parse_row_request(
761761
}
762762
field_info_map = {field["name"]: field for field in input_fields_info}
763763

764+
# Local cache for temporary byte lists for bytes vector fields
765+
# key: field_data object id, value: list of bytes
766+
vector_bytes_cache: Dict[int, List[bytes]] = {}
767+
764768
(
765769
struct_fields_data,
766770
struct_info_map,
@@ -799,7 +803,9 @@ def _parse_row_request(
799803
"default_value", None
800804
):
801805
field_data.valid_data.append(v is not None)
802-
entity_helper.pack_field_value_to_field_data(v, field_data, field_info)
806+
entity_helper.pack_field_value_to_field_data(
807+
v, field_data, field_info, vector_bytes_cache
808+
)
803809
elif k in struct_fields_data:
804810
# Array of structs format
805811
try:
@@ -823,7 +829,9 @@ def _parse_row_request(
823829
field_info, field_data = field_info_map[key], fields_data[key]
824830
if field_info.get("nullable", False) or field_info.get("default_value", None):
825831
field_data.valid_data.append(False)
826-
entity_helper.pack_field_value_to_field_data(None, field_data, field_info)
832+
entity_helper.pack_field_value_to_field_data(
833+
None, field_data, field_info, vector_bytes_cache
834+
)
827835
else:
828836
raise DataNotMatchException(
829837
message=ExceptionsMessage.InsertMissedField % key
@@ -852,6 +860,16 @@ def _parse_row_request(
852860
struct_sub_fields_data[struct_name][field_name]
853861
)
854862

863+
# Flush all bytes vector field temporary byte lists to optimize memory usage
864+
for field_data in fields_data.values():
865+
if field_data.type in (
866+
DataType.INT8_VECTOR,
867+
DataType.BINARY_VECTOR,
868+
DataType.FLOAT16_VECTOR,
869+
DataType.BFLOAT16_VECTOR,
870+
):
871+
entity_helper.flush_vector_bytes(field_data, vector_bytes_cache)
872+
855873
request.fields_data.extend(fields_data.values())
856874
request.fields_data.extend(struct_fields_data.values())
857875

@@ -889,6 +907,10 @@ def _parse_upsert_row_request(
889907
field_info_map = {field["name"]: field for field in input_fields_info}
890908
field_len = {field["name"]: 0 for field in input_fields_info}
891909

910+
# Local cache for temporary byte lists for bytes vector fields
911+
# key: field_data object id, value: list of bytes
912+
vector_bytes_cache: Dict[int, List[bytes]] = {}
913+
892914
# Use common struct data setup (only if not partial update)
893915
if partial_update:
894916
struct_fields_data = {}
@@ -936,7 +958,9 @@ def _parse_upsert_row_request(
936958
"default_value", None
937959
):
938960
field_data.valid_data.append(v is not None)
939-
entity_helper.pack_field_value_to_field_data(v, field_data, field_info)
961+
entity_helper.pack_field_value_to_field_data(
962+
v, field_data, field_info, vector_bytes_cache
963+
)
940964
field_len[k] += 1
941965
elif k in struct_fields_data:
942966
# Handle struct field (array of structs)
@@ -966,7 +990,9 @@ def _parse_upsert_row_request(
966990
if field_info.get("nullable", False) or field_info.get("default_value", None):
967991
field_data.valid_data.append(False)
968992
field_len[key] += 1
969-
entity_helper.pack_field_value_to_field_data(None, field_data, field_info)
993+
entity_helper.pack_field_value_to_field_data(
994+
None, field_data, field_info, vector_bytes_cache
995+
)
970996
else:
971997
raise DataNotMatchException(
972998
message=ExceptionsMessage.InsertMissedField % key
@@ -998,6 +1024,16 @@ def _parse_upsert_row_request(
9981024
fields_data = {k: v for k, v in fields_data.items() if field_len[k] > 0}
9991025
request.fields_data.extend(fields_data.values())
10001026

1027+
# Flush all bytes vector field temporary byte lists to optimize memory usage
1028+
for field_data in fields_data.values():
1029+
if field_data.type in (
1030+
DataType.INT8_VECTOR,
1031+
DataType.BINARY_VECTOR,
1032+
DataType.FLOAT16_VECTOR,
1033+
DataType.BFLOAT16_VECTOR,
1034+
):
1035+
entity_helper.flush_vector_bytes(field_data, vector_bytes_cache)
1036+
10011037
if struct_fields_data:
10021038
# reconstruct the struct array fields data (same as in insert)
10031039
for struct in input_struct_field_info:

0 commit comments

Comments
 (0)