@@ -155,10 +155,12 @@ defmodule Mongo.BulkWrite do
155
155
"""
156
156
157
157
import Mongo.Utils
158
+ import Mongo.WriteConcern
158
159
alias Mongo.UnorderedBulk
159
160
alias Mongo.OrderedBulk
160
161
alias Mongo.BulkWriteResult
161
162
alias Mongo.Session.ServerSession
163
+ alias Mongo.Session
162
164
alias Mongo.Topology
163
165
164
166
@ doc """
@@ -182,15 +184,12 @@ defmodule Mongo.BulkWrite do
182
184
@ spec write ( GenServer . server , ( UnorderedBulk . t | OrderedBulk . t ) , Keyword . t ) :: Mongo.BulkWriteResult . t
183
185
def write ( topology_pid , % UnorderedBulk { } = bulk , opts ) do
184
186
185
- write_concern = write_concern ( opts )
186
-
187
- with { :ok , conn , _ , _ , session_id } <- Topology . select_server ( topology_pid , :write , opts ) ,
188
- result = one_bulk_write ( conn , bulk , write_concern , Keyword . merge ( opts , [ lsid: session_id ] ) ) ,
189
- :ok <- checkin_session_id ( topology_pid , Keyword . get ( opts , :lsid , :implicit ) , session_id ) do
187
+ with { :ok , session } <- Session . start_implicit_session ( topology_pid , :write , opts ) ,
188
+ result = one_bulk_write ( session , bulk , opts ) ,
189
+ :ok <- Session . end_implict_session ( topology_pid , session ) do
190
190
result
191
191
else
192
- { :new_connection , _server } ->
193
- write ( topology_pid , bulk , opts )
192
+ { :new_connection , _server } -> write ( topology_pid , bulk , opts )
194
193
end
195
194
196
195
end
@@ -199,36 +198,23 @@ defmodule Mongo.BulkWrite do
199
198
200
199
write_concern = write_concern ( opts )
201
200
202
- empty = % BulkWriteResult { acknowledged: acknowledged ( write_concern ) }
201
+ empty = % BulkWriteResult { acknowledged: acknowledged? ( write_concern ) }
203
202
204
- with { :ok , conn , _ , _ , session_id } <- Topology . select_server ( topology_pid , :write , opts ) ,
205
- { :ok , limits } <- Mongo . limits ( conn ) ,
203
+ with { :ok , session } <- Session . start_implicit_session ( topology_pid , :write , opts ) ,
204
+ { :ok , limits } <- get_limits ( session ) ,
206
205
max_batch_size <- limits . max_write_batch_size ,
207
206
result = ops
208
207
|> get_op_sequence ( )
209
- |> Enum . map ( fn { cmd , docs } -> one_bulk_write_operation ( conn , cmd , coll , docs , write_concern , max_batch_size , Keyword . merge ( opts , [ lsid: session_id ] ) ) end )
210
- |> BulkWriteResult . reduce ( empty ) ,
211
- :ok <- checkin_session_id ( topology_pid , Keyword . get ( opts , :lsid , :implicit ) , session_id ) do
208
+ |> Enum . map ( fn { cmd , docs } -> one_bulk_write_operation ( session , cmd , coll , docs , max_batch_size , opts ) end )
209
+ |> BulkWriteResult . reduce ( empty ) do
212
210
213
211
result
214
212
else
215
- { :new_connection , _server } ->
216
- write ( topology_pid , bulk , opts )
213
+ { :new_connection , _server } -> write ( topology_pid , bulk , opts )
217
214
end
218
215
219
216
end
220
217
221
- ##
222
- # returns the current write concerns from `opts`
223
- #
224
- defp write_concern ( opts ) do
225
- % {
226
- w: Keyword . get ( opts , :w ) ,
227
- j: Keyword . get ( opts , :j ) ,
228
- wtimeout: Keyword . get ( opts , :wtimeout )
229
- } |> filter_nils ( )
230
- end
231
-
232
218
##
233
219
# Executes one unordered bulk write. The execution order of operation groups is
234
220
#
@@ -239,47 +225,34 @@ defmodule Mongo.BulkWrite do
239
225
# The function returns a keyword list with the results of each operation group:
240
226
# For the details see https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#results
241
227
#
242
- defp one_bulk_write ( conn , % UnorderedBulk { coll: coll , inserts: inserts , updates: updates , deletes: deletes } , write_concern , opts ) do
228
+ defp one_bulk_write ( session , % UnorderedBulk { coll: coll , inserts: inserts , updates: updates , deletes: deletes } , opts ) do
243
229
244
- with { :ok , limits } <- Mongo . limits ( conn ) ,
230
+ with { :ok , limits } <- get_limits ( session ) ,
245
231
max_batch_size <- limits . max_write_batch_size ,
246
- insert_result <- one_bulk_write_operation ( conn , :insert , coll , inserts , write_concern , max_batch_size , opts ) ,
247
- update_result <- one_bulk_write_operation ( conn , :update , coll , updates , write_concern , max_batch_size , opts ) ,
248
- delete_result <- one_bulk_write_operation ( conn , :delete , coll , deletes , write_concern , max_batch_size , opts ) do
232
+ insert_result <- one_bulk_write_operation ( session , :insert , coll , inserts , max_batch_size , opts ) ,
233
+ update_result <- one_bulk_write_operation ( session , :update , coll , updates , max_batch_size , opts ) ,
234
+ delete_result <- one_bulk_write_operation ( session , :delete , coll , deletes , max_batch_size , opts ) do
249
235
250
236
[ insert_result , update_result , delete_result ]
251
- |> BulkWriteResult . reduce ( % BulkWriteResult { acknowledged: acknowledged ( write_concern ) } )
237
+ |> BulkWriteResult . reduce ( % BulkWriteResult { acknowledged: acknowledged? ( opts ) } )
252
238
end
253
239
end
254
240
255
- defp update_session_id ( cmd , nil ) do
256
- cmd
257
- end
258
- defp update_session_id ( cmd , % ServerSession { :session_id => session_id } ) do
259
- Keyword . merge ( cmd , [ lsid: % { id: session_id } ] )
260
- end
261
-
262
- defp checkin_session_id ( topology_pid , :implicit , session_id ) , do: Topology . checkin_session_id ( topology_pid , session_id )
263
- defp checkin_session_id ( _ , _ , _ ) , do: :ok
264
-
265
241
###
266
242
# Executes the command `cmd` and collects the result.
267
243
#
268
- defp one_bulk_write_operation ( conn , cmd , coll , docs , write_concern , max_batch_size , opts ) do
269
- with result <- conn |> run_commands ( get_cmds ( cmd , coll , docs , write_concern , max_batch_size , opts ) , opts ) |> collect ( cmd ) do
244
+ defp one_bulk_write_operation ( session , cmd , coll , docs , max_batch_size , opts ) do
245
+ with result <- session |> run_commands ( get_cmds ( cmd , coll , docs , max_batch_size , opts ) , opts ) |> collect ( cmd ) do
270
246
result
271
247
end
272
248
end
273
249
274
250
##
275
251
# Converts the list of operations into insert/update/delete commands
276
252
#
277
- defp get_cmds ( :insert , coll , docs , write_concern , max_batch_size , opts ) , do: get_insert_cmds ( coll , docs , write_concern , max_batch_size , opts )
278
- defp get_cmds ( :update , coll , docs , write_concern , max_batch_size , opts ) , do: get_update_cmds ( coll , docs , write_concern , max_batch_size , opts )
279
- defp get_cmds ( :delete , coll , docs , write_concern , max_batch_size , opts ) , do: get_delete_cmds ( coll , docs , write_concern , max_batch_size , opts )
280
-
281
- defp acknowledged ( % { w: w } ) when w > 0 , do: true
282
- defp acknowledged ( % { } ) , do: false
253
+ defp get_cmds ( :insert , coll , docs , max_batch_size , opts ) , do: get_insert_cmds ( coll , docs , max_batch_size , opts )
254
+ defp get_cmds ( :update , coll , docs , max_batch_size , opts ) , do: get_update_cmds ( coll , docs , max_batch_size , opts )
255
+ defp get_cmds ( :delete , coll , docs , max_batch_size , opts ) , do: get_delete_cmds ( coll , docs , max_batch_size , opts )
283
256
284
257
###
285
258
# Converts the list of operations into list of lists with same operations.
@@ -357,48 +330,47 @@ defmodule Mongo.BulkWrite do
357
330
defp filter_upsert_ids ( nil ) , do: [ ]
358
331
defp filter_upsert_ids ( upserted ) , do: Enum . map ( upserted , fn doc -> doc [ "_id" ] end )
359
332
360
- defp run_commands ( conn , { cmds , ids } , opts ) do
361
- { Enum . map ( cmds , fn cmd -> Mongo . exec_command ( conn , cmd , opts ) end ) , ids }
333
+ defp run_commands ( session , { cmds , ids } , opts ) do
334
+ { Enum . map ( cmds , fn cmd -> Mongo . exec_command_session ( session , cmd , opts ) end ) , ids }
362
335
end
363
- defp run_commands ( conn , cmds , opts ) do
364
- Enum . map ( cmds , fn cmd -> Mongo . exec_command ( conn , cmd , opts ) end )
336
+ defp run_commands ( session , cmds , opts ) do
337
+ Enum . map ( cmds , fn cmd -> Mongo . exec_command_session ( session , cmd , opts ) end )
365
338
end
366
339
367
- defp get_insert_cmds ( coll , docs , write_concern , max_batch_size , opts ) do
340
+ defp get_insert_cmds ( coll , docs , max_batch_size , opts ) do
368
341
369
342
{ ids , docs } = assign_ids ( docs )
370
343
371
344
cmds = docs
372
345
|> Enum . chunk_every ( max_batch_size )
373
- |> Enum . map ( fn inserts -> get_insert_cmd ( coll , inserts , write_concern ) end )
374
- |> Enum . map ( fn cmd -> update_session_id ( cmd , Keyword . get ( opts , :lsid ) ) end )
346
+ |> Enum . map ( fn inserts -> get_insert_cmd ( coll , inserts , opts ) end )
347
+
375
348
{ cmds , ids }
376
349
377
350
end
378
351
379
- defp get_insert_cmd ( coll , inserts , write_concern ) do
352
+ defp get_insert_cmd ( coll , inserts , opts ) do
380
353
381
354
[ insert: coll ,
382
355
documents: inserts ,
383
- writeConcern: write_concern ] |> filter_nils ( )
356
+ writeConcern: write_concern ( opts ) ] |> filter_nils ( )
384
357
385
358
end
386
359
387
- defp get_delete_cmds ( coll , docs , write_concern , max_batch_size , opts ) do
360
+ defp get_delete_cmds ( coll , docs , max_batch_size , opts ) do
388
361
389
362
docs
390
363
|> Enum . chunk_every ( max_batch_size )
391
- |> Enum . map ( fn deletes -> get_delete_cmd ( coll , deletes , write_concern , opts ) end )
392
- |> Enum . map ( fn cmd -> update_session_id ( cmd , Keyword . get ( opts , :lsid ) ) end )
364
+ |> Enum . map ( fn deletes -> get_delete_cmd ( coll , deletes , opts ) end )
393
365
394
366
end
395
367
396
- defp get_delete_cmd ( coll , deletes , write_concern , opts ) do
368
+ defp get_delete_cmd ( coll , deletes , opts ) do
397
369
398
370
[ delete: coll ,
399
371
deletes: Enum . map ( deletes , fn delete -> get_delete_doc ( delete ) end ) ,
400
372
ordered: Keyword . get ( opts , :ordered ) ,
401
- writeConcern: write_concern ] |> filter_nils ( )
373
+ writeConcern: write_concern ( opts ) ] |> filter_nils ( )
402
374
403
375
end
404
376
@@ -410,21 +382,20 @@ defmodule Mongo.BulkWrite do
410
382
411
383
end
412
384
413
- defp get_update_cmds ( coll , docs , write_concern , max_batch_size , opts ) do
385
+ defp get_update_cmds ( coll , docs , max_batch_size , opts ) do
414
386
415
387
docs
416
388
|> Enum . chunk_every ( max_batch_size )
417
- |> Enum . map ( fn updates -> get_update_cmd ( coll , updates , write_concern , opts ) end )
418
- |> Enum . map ( fn cmd -> update_session_id ( cmd , Keyword . get ( opts , :lsid ) ) end )
389
+ |> Enum . map ( fn updates -> get_update_cmd ( coll , updates , opts ) end )
419
390
420
391
end
421
392
422
- defp get_update_cmd ( coll , updates , write_concern , opts ) do
393
+ defp get_update_cmd ( coll , updates , opts ) do
423
394
424
395
[ update: coll ,
425
396
updates: Enum . map ( updates , fn update -> get_update_doc ( update ) end ) ,
426
397
ordered: Keyword . get ( opts , :ordered ) ,
427
- writeConcern: write_concern ,
398
+ writeConcern: write_concern ( opts ) ,
428
399
bypassDocumentValidation: Keyword . get ( opts , :bypass_document_validation )
429
400
] |> filter_nils ( )
430
401
@@ -442,4 +413,10 @@ defmodule Mongo.BulkWrite do
442
413
443
414
end
444
415
416
+ defp get_limits ( session ) do
417
+ with conn <- Session . connection ( session ) do
418
+ Mongo . limits ( conn )
419
+ end
420
+ end
421
+
445
422
end
0 commit comments