@@ -56,20 +56,24 @@ async def test_returns_error_if_no_namespace_provided(self):

 # https://github.com/mongodb/specifications/tree/master/source/crud/tests
 class TestClientBulkWriteCRUD(AsyncIntegrationTest):
+    async def asyncSetUp(self):
+        self.max_write_batch_size = await async_client_context.max_write_batch_size
+        self.max_bson_object_size = await async_client_context.max_bson_size
+        self.max_message_size_bytes = await async_client_context.max_message_size_bytes
+
     @async_client_context.require_version_min(8, 0, 0, -24)
     async def test_batch_splits_if_num_operations_too_large(self):
         listener = OvertCommandListener()
         client = await async_rs_or_single_client(event_listeners=[listener])
         self.addAsyncCleanup(client.aclose)

-        max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
         models = []
-        for _ in range(max_write_batch_size + 1):
+        for _ in range(self.max_write_batch_size + 1):
             models.append(InsertOne(namespace="db.coll", document={"a": "b"}))
         self.addAsyncCleanup(client.db["coll"].drop)

         result = await client.bulk_write(models=models)
-        self.assertEqual(result.inserted_count, max_write_batch_size + 1)
+        self.assertEqual(result.inserted_count, self.max_write_batch_size + 1)

         bulk_write_events = []
         for event in listener.started_events:
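Aside: the new `asyncSetUp` replaces repeated `(await async_client_context.hello)[...]` lookups with awaitable properties on the shared test context. A minimal sketch of how such properties could work, assuming a hypothetical context class (the real pymongo test harness may implement this differently):

```python
# Hypothetical sketch only -- property names mirror the diff, not pymongo's
# actual test harness internals.
class AsyncClientContext:
    def __init__(self, client):
        self.client = client
        self._hello = None

    @property
    async def hello(self):
        # Cache the hello response so repeated limit lookups hit the server once.
        if self._hello is None:
            self._hello = await self.client.admin.command("hello")
        return self._hello

    @property
    async def max_write_batch_size(self):
        return (await self.hello)["maxWriteBatchSize"]

    @property
    async def max_bson_size(self):
        return (await self.hello)["maxBsonObjectSize"]

    @property
    async def max_message_size_bytes(self):
        return (await self.hello)["maxMessageSizeBytes"]
```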
@@ -78,7 +82,7 @@ async def test_batch_splits_if_num_operations_too_large(self):
         self.assertEqual(len(bulk_write_events), 2)

         first_event, second_event = bulk_write_events
-        self.assertEqual(len(first_event.command["ops"]), max_write_batch_size)
+        self.assertEqual(len(first_event.command["ops"]), self.max_write_batch_size)
         self.assertEqual(len(second_event.command["ops"]), 1)
         self.assertEqual(first_event.operation_id, second_event.operation_id)

@@ -88,12 +92,9 @@ async def test_batch_splits_if_ops_payload_too_large(self):
         client = await async_rs_or_single_client(event_listeners=[listener])
         self.addAsyncCleanup(client.aclose)

-        max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
-        max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
-
         models = []
-        num_models = int(max_message_size_bytes / max_bson_object_size + 1)
-        b_repeated = "b" * (max_bson_object_size - 500)
+        num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
+        b_repeated = "b" * (self.max_bson_object_size - 500)
         for _ in range(num_models):
             models.append(
                 InsertOne(
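For intuition, the split arithmetic in `test_batch_splits_if_ops_payload_too_large` works out as follows under common server defaults (illustrative values; the test reads the real limits from `hello` at runtime):

```python
# Illustrative numbers only -- actual limits come from the hello response.
max_message_size_bytes = 48_000_000  # typical maxMessageSizeBytes
max_bson_object_size = 16_777_216    # typical maxBsonObjectSize (16 MiB)

num_models = int(max_message_size_bytes / max_bson_object_size + 1)  # -> 3
doc_size = max_bson_object_size - 500  # headroom for keys and wire overhead

# Three ~16 MiB documents cannot fit in one 48 MB message, so the driver
# must split the bulkWrite into two batches.
assert num_models * doc_size > max_message_size_bytes
```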
@@ -126,7 +127,6 @@ async def test_collects_write_concern_errors_across_batches(self):
             retryWrites=False,
         )
         self.addAsyncCleanup(client.aclose)
-        max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]

         fail_command = {
             "configureFailPoint": "failCommand",
@@ -138,7 +138,7 @@ async def test_collects_write_concern_errors_across_batches(self):
         }
         async with self.fail_point(fail_command):
             models = []
-            for _ in range(max_write_batch_size + 1):
+            for _ in range(self.max_write_batch_size + 1):
                 models.append(
                     InsertOne(
                         namespace="db.coll",
@@ -152,7 +152,7 @@ async def test_collects_write_concern_errors_across_batches(self):
         self.assertEqual(len(context.exception.write_concern_errors), 2)  # type: ignore[arg-type]
         self.assertIsNotNone(context.exception.partial_result)
         self.assertEqual(
-            context.exception.partial_result.inserted_count, max_write_batch_size + 1
+            context.exception.partial_result.inserted_count, self.max_write_batch_size + 1
         )

         bulk_write_events = []
@@ -172,9 +172,8 @@ async def test_collects_write_errors_across_batches_unordered(self):
         await collection.drop()
         await collection.insert_one(document={"_id": 1})

-        max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
         models = []
-        for _ in range(max_write_batch_size + 1):
+        for _ in range(self.max_write_batch_size + 1):
             models.append(
                 InsertOne(
                     namespace="db.coll",
@@ -184,7 +183,7 @@ async def test_collects_write_errors_across_batches_unordered(self):

         with self.assertRaises(ClientBulkWriteException) as context:
             await client.bulk_write(models=models, ordered=False)
-        self.assertEqual(len(context.exception.write_errors), max_write_batch_size + 1)  # type: ignore[arg-type]
+        self.assertEqual(len(context.exception.write_errors), self.max_write_batch_size + 1)  # type: ignore[arg-type]

         bulk_write_events = []
         for event in listener.started_events:
@@ -203,9 +202,8 @@ async def test_collects_write_errors_across_batches_ordered(self):
         await collection.drop()
         await collection.insert_one(document={"_id": 1})

-        max_write_batch_size = (await async_client_context.hello)["maxWriteBatchSize"]
         models = []
-        for _ in range(max_write_batch_size + 1):
+        for _ in range(self.max_write_batch_size + 1):
             models.append(
                 InsertOne(
                     namespace="db.coll",
@@ -233,10 +231,9 @@ async def test_handles_cursor_requiring_getMore(self):
         self.addAsyncCleanup(collection.drop)
         await collection.drop()

-        max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
         models = []
-        a_repeated = "a" * (max_bson_object_size // 2)
-        b_repeated = "b" * (max_bson_object_size // 2)
+        a_repeated = "a" * (self.max_bson_object_size // 2)
+        b_repeated = "b" * (self.max_bson_object_size // 2)
         models.append(
             UpdateOne(
                 namespace="db.coll",
@@ -275,12 +272,11 @@ async def test_handles_cursor_requiring_getMore_within_transaction(self):
         self.addAsyncCleanup(collection.drop)
         await collection.drop()

-        max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
         async with client.start_session() as session:
             await session.start_transaction()
             models = []
-            a_repeated = "a" * (max_bson_object_size // 2)
-            b_repeated = "b" * (max_bson_object_size // 2)
+            a_repeated = "a" * (self.max_bson_object_size // 2)
+            b_repeated = "b" * (self.max_bson_object_size // 2)
             models.append(
                 UpdateOne(
                     namespace="db.coll",
@@ -319,16 +315,15 @@ async def test_handles_getMore_error(self):
         self.addAsyncCleanup(collection.drop)
         await collection.drop()

-        max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
         fail_command = {
             "configureFailPoint": "failCommand",
             "mode": {"times": 1},
             "data": {"failCommands": ["getMore"], "errorCode": 8},
         }
         async with self.fail_point(fail_command):
             models = []
-            a_repeated = "a" * (max_bson_object_size // 2)
-            b_repeated = "b" * (max_bson_object_size // 2)
+            a_repeated = "a" * (self.max_bson_object_size // 2)
+            b_repeated = "b" * (self.max_bson_object_size // 2)
             models.append(
                 UpdateOne(
                     namespace="db.coll",
@@ -370,8 +365,7 @@ async def test_returns_error_if_unacknowledged_too_large_insert(self):
         client = await async_rs_or_single_client(event_listeners=[listener])
         self.addAsyncCleanup(client.aclose)

-        max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
-        b_repeated = "b" * max_bson_object_size
+        b_repeated = "b" * self.max_bson_object_size

         # Insert document.
         models_insert = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
@@ -384,25 +378,35 @@ async def test_returns_error_if_unacknowledged_too_large_insert(self):
         await client.bulk_write(models=models_replace, write_concern=WriteConcern(w=0))

     async def _setup_namespace_test_models(self):
-        max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
-        max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
-
-        ops_bytes = max_message_size_bytes - 1122
-        num_models = ops_bytes // max_bson_object_size
-        remainder_bytes = ops_bytes % max_bson_object_size
+        # See the prose test specification below for details on these calculations.
+        # https://github.com/mongodb/specifications/tree/master/source/crud/tests#details-on-size-calculations
+        _EXISTING_BULK_WRITE_BYTES = 1122
+        _OPERATION_DOC_BYTES = 57
+        _NAMESPACE_DOC_BYTES = 217
+
+        # When compression is enabled, max_message_size is
+        # smaller to account for the compression message header.
+        if async_client_context.client_options.get("compressors"):
+            max_message_size_bytes = self.max_message_size_bytes - 16
+        else:
+            max_message_size_bytes = self.max_message_size_bytes
+
+        ops_bytes = max_message_size_bytes - _EXISTING_BULK_WRITE_BYTES
+        num_models = ops_bytes // self.max_bson_object_size
+        remainder_bytes = ops_bytes % self.max_bson_object_size

         models = []
-        b_repeated = "b" * (max_bson_object_size - 57)
+        b_repeated = "b" * (self.max_bson_object_size - _OPERATION_DOC_BYTES)
         for _ in range(num_models):
             models.append(
                 InsertOne(
                     namespace="db.coll",
                     document={"a": b_repeated},
                 )
             )
-        if remainder_bytes >= 217:
+        if remainder_bytes >= _NAMESPACE_DOC_BYTES:
             num_models += 1
-            b_repeated = "b" * (remainder_bytes - 57)
+            b_repeated = "b" * (remainder_bytes - _OPERATION_DOC_BYTES)
             models.append(
                 InsertOne(
                     namespace="db.coll",
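A worked example of the byte accounting in `_setup_namespace_test_models` under the same illustrative defaults (the named constants are fixed by the CRUD prose tests; the limits come from `hello` at runtime):

```python
# Illustrative numbers only -- actual limits come from the hello response.
_EXISTING_BULK_WRITE_BYTES = 1122  # fixed bytes already in the bulkWrite message
_OPERATION_DOC_BYTES = 57          # per-insert overhead around the "a" field
_NAMESPACE_DOC_BYTES = 217         # cost of one more nsInfo namespace entry

max_message_size_bytes = 48_000_000
max_bson_object_size = 16_777_216

ops_bytes = max_message_size_bytes - _EXISTING_BULK_WRITE_BYTES  # 47_998_878
num_models = ops_bytes // max_bson_object_size                   # 2
remainder_bytes = ops_bytes % max_bson_object_size               # 14_444_446

# The remainder exceeds 217 bytes, so one more insert is appended, sized to
# fill the message exactly and leave no room for an extra namespace entry.
assert remainder_bytes >= _NAMESPACE_DOC_BYTES
```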
@@ -485,17 +489,15 @@ async def test_returns_error_if_no_writes_can_be_added_to_ops(self):
         client = await async_rs_or_single_client()
         self.addAsyncCleanup(client.aclose)

-        max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
-
         # Document too large.
-        b_repeated = "b" * max_message_size_bytes
+        b_repeated = "b" * self.max_message_size_bytes
         models = [InsertOne(namespace="db.coll", document={"a": b_repeated})]
         with self.assertRaises(InvalidOperation) as context:
             await client.bulk_write(models=models)
         self.assertIn("cannot do an empty bulk write", context.exception._message)

         # Namespace too large.
-        c_repeated = "c" * max_message_size_bytes
+        c_repeated = "c" * self.max_message_size_bytes
         namespace = f"db.{c_repeated}"
         models = [InsertOne(namespace=namespace, document={"a": "b"})]
         with self.assertRaises(InvalidOperation) as context:
@@ -522,27 +524,32 @@ async def test_returns_error_if_auto_encryption_configured(self):

 # https://github.com/mongodb/specifications/blob/master/source/client-side-operations-timeout/tests/README.md#11-multi-batch-bulkwrites
 class TestClientBulkWriteTimeout(AsyncIntegrationTest):
+    async def asyncSetUp(self):
+        self.max_write_batch_size = await async_client_context.max_write_batch_size
+        self.max_bson_object_size = await async_client_context.max_bson_size
+        self.max_message_size_bytes = await async_client_context.max_message_size_bytes
+
     @async_client_context.require_version_min(8, 0, 0, -24)
     @async_client_context.require_failCommand_fail_point
     async def test_timeout_in_multi_batch_bulk_write(self):
+        _OVERHEAD = 500
+
         internal_client = await async_rs_or_single_client(timeoutMS=None)
         self.addAsyncCleanup(internal_client.aclose)

         collection = internal_client.db["coll"]
         self.addAsyncCleanup(collection.drop)
         await collection.drop()

-        max_bson_object_size = (await async_client_context.hello)["maxBsonObjectSize"]
-        max_message_size_bytes = (await async_client_context.hello)["maxMessageSizeBytes"]
         fail_command = {
             "configureFailPoint": "failCommand",
             "mode": {"times": 2},
             "data": {"failCommands": ["bulkWrite"], "blockConnection": True, "blockTimeMS": 1010},
         }
         async with self.fail_point(fail_command):
             models = []
-            num_models = int(max_message_size_bytes / max_bson_object_size + 1)
-            b_repeated = "b" * (max_bson_object_size - 500)
+            num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
+            b_repeated = "b" * (self.max_bson_object_size - _OVERHEAD)
             for _ in range(num_models):
                 models.append(
                     InsertOne(
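The fail point above blocks each of the two expected `bulkWrite` batches for 1010 ms, which is what lets the CSOT prose test trip a client-side deadline. A rough timing budget (illustrative):

```python
# Timing budget implied by the fail point above.
block_time_ms = 1010          # blockTimeMS applied to each bulkWrite
blocked_batches = 2           # "mode": {"times": 2} covers both batches
total_blocked_ms = blocked_batches * block_time_ms  # 2020 ms

# Any client timeoutMS below ~2020 ms therefore expires during the second
# batch, per the multi-batch bulkWrite prose test linked above the class.
assert total_blocked_ms == 2020
```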