@@ -985,11 +985,10 @@ def _start(
985
985
986
986
987
987
def _client_construct_op_msg (
988
- command : Mapping [ str , Any ] ,
989
- to_send_ops : list [Mapping [ str , Any ] ],
990
- to_send_ns : list [Mapping [ str , Any ] ],
988
+ command_encoded : bytes ,
989
+ to_send_ops_encoded : list [bytes ],
990
+ to_send_ns_encoded : list [bytes ],
991
991
ack : bool ,
992
- opts : CodecOptions ,
993
992
buf : _BytesIO ,
994
993
) -> int :
995
994
# Write flags
@@ -998,7 +997,7 @@ def _client_construct_op_msg(
998
997
999
998
# Type 0 Section
1000
999
buf .write (b"\x00 " )
1001
- buf .write (_dict_to_bson ( command , False , opts ) )
1000
+ buf .write (command_encoded )
1002
1001
1003
1002
# Type 1 Section for ops
1004
1003
buf .write (b"\x01 " )
@@ -1007,8 +1006,8 @@ def _client_construct_op_msg(
1007
1006
buf .write (b"\x00 \x00 \x00 \x00 " )
1008
1007
buf .write (b"ops\x00 " )
1009
1008
# Write all the ops documents
1010
- for op in to_send_ops :
1011
- buf .write (_dict_to_bson ( op , False , opts ) )
1009
+ for op_encoded in to_send_ops_encoded :
1010
+ buf .write (op_encoded )
1012
1011
resume_location = buf .tell ()
1013
1012
# Write type 1 section size
1014
1013
length = buf .tell ()
@@ -1023,8 +1022,8 @@ def _client_construct_op_msg(
1023
1022
buf .write (b"\x00 \x00 \x00 \x00 " )
1024
1023
buf .write (b"nsInfo\x00 " )
1025
1024
# Write all the nsInfo documents
1026
- for ns in to_send_ns :
1027
- buf .write (_dict_to_bson ( ns , False , opts ) )
1025
+ for ns_encoded in to_send_ns_encoded :
1026
+ buf .write (ns_encoded )
1028
1027
# Write type 1 section size
1029
1028
length = buf .tell ()
1030
1029
buf .seek (size_location )
@@ -1045,19 +1044,23 @@ def _client_batched_op_msg_impl(
1045
1044
1046
1045
def _check_doc_size_limits (
1047
1046
op_type : str ,
1048
- document : Mapping [ str , Any ] ,
1047
+ doc_size : int ,
1049
1048
limit : int ,
1050
- ) -> int :
1051
- doc_size = len (_dict_to_bson (document , False , opts ))
1049
+ ) -> None :
1052
1050
if doc_size > limit :
1053
1051
_raise_document_too_large (op_type , doc_size , limit )
1054
- return doc_size
1055
1052
1056
1053
max_bson_size = ctx .max_bson_size
1057
1054
max_write_batch_size = ctx .max_write_batch_size
1058
1055
max_message_size = ctx .max_message_size
1059
1056
1060
- # Don't include bulkWrite-command-agnostic fields in document size calculations.
1057
+ command_encoded = _dict_to_bson (command , False , opts )
1058
+ # When OP_MSG is used unacknowledged we have to check command
1059
+ # document size client-side or applications won't be notified.
1060
+ if not ack :
1061
+ _check_doc_size_limits ("bulkWrite" , len (command_encoded ), max_bson_size + _COMMAND_OVERHEAD )
1062
+
1063
+ # Don't include bulkWrite-command-agnostic fields in batch-splitting calculations.
1061
1064
abridged_keys = ["bulkWrite" , "errorsOnly" , "ordered" ]
1062
1065
if command .get ("bypassDocumentValidation" ):
1063
1066
abridged_keys .append ("bypassDocumentValidation" )
@@ -1068,17 +1071,14 @@ def _check_doc_size_limits(
1068
1071
command_abridged = {key : command [key ] for key in abridged_keys }
1069
1072
command_len_abridged = len (_dict_to_bson (command_abridged , False , opts ))
1070
1073
1071
- # When OP_MSG is used unacknowledged we have to check command
1072
- # document size client-side or applications won't be notified.
1073
- if not ack :
1074
- _check_doc_size_limits ("bulkWrite" , command_abridged , max_bson_size + _COMMAND_OVERHEAD )
1075
-
1076
1074
# Maximum combined size of the ops and nsInfo document sequences.
1077
1075
max_doc_sequences_bytes = max_message_size - (_OP_MSG_OVERHEAD + command_len_abridged )
1078
1076
1079
1077
ns_info = {}
1080
1078
to_send_ops : list [Mapping [str , Any ]] = []
1081
1079
to_send_ns : list [Mapping [str , int ]] = []
1080
+ to_send_ops_encoded : list [bytes ] = []
1081
+ to_send_ns_encoded : list [bytes ] = []
1082
1082
total_ops_length = 0
1083
1083
total_ns_length = 0
1084
1084
idx = 0
@@ -1088,11 +1088,13 @@ def _check_doc_size_limits(
1088
1088
# Check insert/replace document size if unacknowledged.
1089
1089
if real_op_type == "insert" :
1090
1090
if not ack :
1091
- _check_doc_size_limits (real_op_type , op_doc ["document" ], max_bson_size )
1091
+ doc_size = len (_dict_to_bson (op_doc ["document" ], False , opts ))
1092
+ _check_doc_size_limits (real_op_type , doc_size , max_bson_size )
1092
1093
if real_op_type == "replace" :
1093
1094
op_type = "update"
1094
1095
if not ack :
1095
- _check_doc_size_limits (real_op_type , op_doc ["updateMods" ], max_bson_size )
1096
+ doc_size = len (_dict_to_bson (op_doc ["updateMods" ], False , opts ))
1097
+ _check_doc_size_limits (real_op_type , doc_size , max_bson_size )
1096
1098
1097
1099
ns_doc_to_send = None
1098
1100
ns_length = 0
@@ -1108,30 +1110,40 @@ def _check_doc_size_limits(
1108
1110
op_doc_to_send [op_type ] = ns_info [namespace ] # type: ignore[index]
1109
1111
1110
1112
# Encode current operation doc and, if newly added, namespace doc.
1111
- op_length = len (_dict_to_bson (op_doc_to_send , False , opts ))
1113
+ op_doc_encoded = _dict_to_bson (op_doc_to_send , False , opts )
1114
+ op_length = len (op_doc_encoded )
1112
1115
if ns_doc_to_send :
1113
- ns_length = len (_dict_to_bson (ns_doc_to_send , False , opts ))
1116
+ ns_doc_encoded = _dict_to_bson (ns_doc_to_send , False , opts )
1117
+ ns_length = len (ns_doc_encoded )
1114
1118
1115
1119
# Check operation document size if unacknowledged.
1116
1120
if not ack :
1117
- _check_doc_size_limits (op_type , op_doc_to_send , max_bson_size + _COMMAND_OVERHEAD )
1121
+ _check_doc_size_limits (op_type , op_length , max_bson_size + _COMMAND_OVERHEAD )
1118
1122
1119
1123
new_message_size = total_ops_length + total_ns_length + op_length + ns_length
1120
1124
# We have enough data, return this batch.
1121
1125
if new_message_size > max_doc_sequences_bytes :
1122
1126
break
1127
+
1128
+ # Add op and ns documents to this batch.
1123
1129
to_send_ops .append (op_doc_to_send )
1130
+ to_send_ops_encoded .append (op_doc_encoded )
1124
1131
total_ops_length += op_length
1125
1132
if ns_doc_to_send :
1126
1133
to_send_ns .append (ns_doc_to_send )
1134
+ to_send_ns_encoded .append (ns_doc_encoded )
1127
1135
total_ns_length += ns_length
1136
+
1128
1137
idx += 1
1138
+
1129
1139
# We have enough documents, return this batch.
1130
1140
if idx == max_write_batch_size :
1131
1141
break
1132
1142
1133
1143
# Construct the entire OP_MSG.
1134
- length = _client_construct_op_msg (command , to_send_ops , to_send_ns , ack , opts , buf )
1144
+ length = _client_construct_op_msg (
1145
+ command_encoded , to_send_ops_encoded , to_send_ns_encoded , ack , buf
1146
+ )
1135
1147
1136
1148
return to_send_ops , to_send_ns , length
1137
1149
0 commit comments