Skip to content

Commit 2852c64

Browse files
committed
added support for write results
1 parent 186700e commit 2852c64

File tree

5 files changed

+206
-94
lines changed

5 files changed

+206
-94
lines changed

CHANGELOG.md

Lines changed: 2 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,10 @@
1-
## v0.5.5
2-
3-
* Enhancements
4-
* Added support for bulk writes (ordered/unordered, in-memory/stream)
5-
* Added support for `op_msg` with payload type 1
6-
71
## v0.5.4
82

93
* Enhancements
104
* The driver provides now client metadata
115
* Added support for connecting via UNIX sockets (`:socket` and `:socket_dir`)
6+
* Added support for bulk writes (ordered/unordered, in-memory/stream)
7+
* Added support for `op_msg` with payload type 1
128

139
## v0.5.3
1410

README.md

Lines changed: 36 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -191,25 +191,27 @@ end
191191
spawn(fn -> for_ever(top, self()) end)
192192
```
193193

194+
For more information see
195+
196+
* [Mongo.watch](https://hexdocs.pm/mongodb_driver/Mongo.Cursor.html#content)
197+
194198

195199
### Bulk writes
196200

197-
Die Motivation für bulk writes liegt in der Optimierungsmöglichkeit, gleiche Operationen
198-
zu gruppieren. Dabei wird zwischen ungeordneten und geordneten Bulk writes unterschieden.
199-
Bei ungeordneten werden Inserts, Updates und Deletes gruppiert und als einzelne Befehle
200-
zur Datenbank geschickt. Dabei gibt es keinen Einfluss auf die Reihenfolge
201-
der Ausführungen. Ein guter Anwendungsfall ist der Import von Datensätzen aus einer
202-
CSV-Datei. Die Reihenfolge der Inserts spielt keine Rolle.
201+
The motivation for bulk writes lies in the possibility of optimization, the same operations
202+
to group. Here, a distinction is made between disordered and ordered bulk writes.
203+
In disordered, inserts, updates, and deletes are grouped as individual commands
204+
sent to the database. There is no influence on the order of the execution.
205+
A good use case is the import of records from one CSV file.
206+
The order of the inserts does not matter.
203207

204-
Bei geordneten Bulk writes ist die Einhaltung der Reihenfolge wichtig, damit aus
205-
Ausführungen korrekt sind. In diesem Fall werden nur die gleiche aufeinander folgenden Operationen
206-
gruppiert.
208+
For ordered bulk writers, order compliance is important to keep.
209+
In this case, only the same consecutive operations are grouped.
207210

208-
Aktuell werden alle Bulk writes im Speicher optimiert. Dies ist für große Bulk writes ungünstig.
209-
In diesem Fall kann man streaming bulk writes verwenden, die nur einen gewissen Satz von
210-
Operation im Speicher gruppieren und sofern die maximale Anzahl von Operationen
211-
erreicht wurde, die Schreiboperationen zur Datenbank schicken. Die Anzahl
212-
kann vorgegeben werden.
211+
Currently, all bulk writes are optimized in memory. This is unfavorable for large bulk writes.
212+
In this case, one can use streaming bulk writes that only have a certain set of
213+
group operation in memory and when the maximum number of operations
214+
has been reached, operations are written to the database. The size can be specified.
213215

214216
Using ordered bulk writes. In this example we first insert some dog's name, add an attribute `kind`
215217
and change all dogs to cats. After that we delete three cats. This example would not work with
@@ -218,19 +220,19 @@ unordered bulk writes.
218220
```elixir
219221

220222
bulk = "bulk"
221-
|> new()
222-
|> insert_one(%{name: "Greta"})
223-
|> insert_one(%{name: "Tom"})
224-
|> insert_one(%{name: "Waldo"})
225-
|> update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
226-
|> update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
227-
|> update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
228-
|> update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
229-
|> delete_one(%{kind: "cat"})
230-
|> delete_one(%{kind: "cat"})
231-
|> delete_one(%{kind: "cat"})
232-
233-
result = Mongo.BulkWrite.bulk_write(:mongo, bulk, w: 1)
223+
|> OrderedBulk.new()
224+
|> OrderedBulk.insert_one(%{name: "Greta"})
225+
|> OrderedBulk.insert_one(%{name: "Tom"})
226+
|> OrderedBulk.insert_one(%{name: "Waldo"})
227+
|> OrderedBulk.update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
228+
|> OrderedBulk.update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
229+
|> OrderedBulk.update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
230+
|> OrderedBulk.update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
231+
|> OrderedBulk.delete_one(%{kind: "cat"})
232+
|> OrderedBulk.delete_one(%{kind: "cat"})
233+
|> OrderedBulk.delete_one(%{kind: "cat"})
234+
235+
result = Mongo.BulkWrite.write(:mongo, bulk, w: 1)
234236
```
235237

236238
In the following example we import 1.000.000 integers into the MongoDB using the stream api:
@@ -247,6 +249,13 @@ importing big volume of data.
247249
|> Mongo.UnorderedBulk.write(:mongo, "bulk", 1_000)
248250
|> Stream.run()
249251
```
252+
253+
For more information see and check the test units for examples.
254+
* [Mongo.UnorderedBulk](https://hexdocs.pm/mongodb_driver/Mongo.Cursor.html#content)
255+
* [Mongo.OrderedBulk](https://hexdocs.pm/mongodb_driver/Mongo.Cursor.html#content)
256+
* [Mongo.BulkWrites](https://hexdocs.pm/mongodb_driver/Mongo.Cursor.html#content)
257+
* [Mongo.BulkOps](https://hexdocs.pm/mongodb_driver/Mongo.Cursor.html#content)
258+
250259
### Examples
251260

252261
Using `$and`

lib/mongo.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -571,6 +571,7 @@ defmodule Mongo do
571571
end
572572

573573
@doc false
574+
## refactor: exec_command
574575
@spec direct_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
575576
def direct_command(conn, cmd, opts) do
576577
action = %Query{action: :command}

lib/mongo/bulk_write.ex

Lines changed: 107 additions & 61 deletions
Original file line numberDiff line numberDiff line change
@@ -187,16 +187,52 @@ defmodule Mongo.BulkWrite do
187187
one_bulk_write(conn, bulk, write_concern, opts)
188188
end
189189
end
190-
def write(topology_pid, %OrderedBulk{coll: coll, ops: ops} = bulk, opts) do
190+
def write(topology_pid, %OrderedBulk{coll: coll, ops: ops}, opts) do
191191

192192
write_concern = write_concern(opts)
193+
194+
empty = %{
195+
acknowledged: acknowledged(write_concern),
196+
insertedCount: 0,
197+
matchedCount: 0,
198+
deletedCount: 0,
199+
upsertedCount: 0,
200+
modifiedCount: 0,
201+
upsertedIds: [],
202+
insertedIds: [],
203+
}
204+
193205
with {:ok, conn, _, _} <- Mongo.select_server(topology_pid, :write, opts) do
206+
194207
get_op_sequence(coll, ops)
195208
|> Enum.map(fn {cmd, docs} -> one_bulk_write_operation(conn, cmd, coll, docs, write_concern, opts) end)
196-
|> Enum.each(fn {cmd, count} -> IO.puts "#{cmd} : #{count}" end) # todo collect results
209+
|> Enum.reduce(empty, fn
210+
{cmd, {count, ids}}, acc -> merge(cmd, count, ids, acc)
211+
{cmd, {mat, mod, ups, ids}}, acc -> merge(cmd, mat, mod, ups, ids, acc)
212+
{cmd, count}, acc -> merge(cmd, count, acc)
213+
214+
end)
197215
end
198216
end
199217

218+
defp merge(:insert, count, ids, %{:insertedCount => value, :insertedIds => current_ids} = result) do
219+
%{result | insertedCount: value + count, insertedIds: current_ids ++ ids}
220+
end
221+
defp merge(:update, matched, modified, upserts, ids,
222+
%{:matchedCount => matched_count,
223+
:modifiedCount => modified_count,
224+
:upsertedCount => upserted_count,
225+
:upsertedIds => current_ids} = result) do
226+
%{result | matchedCount: matched_count + matched,
227+
modifiedCount: modified_count + modified,
228+
upsertedCount: upserted_count + upserts,
229+
upsertedIds: current_ids ++ ids}
230+
end
231+
defp merge(:delete, count, %{:deletedCount => value} = result) do
232+
%{result | deletedCount: value + count}
233+
end
234+
defp merge(_other, _count, result), do: result
235+
200236
##
201237
# returns the current write concerns from `opts`
202238
#
@@ -208,30 +244,32 @@ defmodule Mongo.BulkWrite do
208244
} |> filter_nils()
209245
end
210246

211-
@doc"""
212-
Executues one unordered bulk write. The execution order of operation groups is
247+
##
248+
# Executes one unordered bulk write. The execution order of operation groups is
249+
#
250+
# * inserts
251+
# * updates
252+
# * deletes
253+
#
254+
# The function returns a keyword list with the results of each operation group:
255+
# For the details see https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#results
256+
defp one_bulk_write(conn, %UnorderedBulk{coll: coll, inserts: inserts, updates: updates, deletes: deletes}, write_concern, opts) do
213257

214-
* inserts
215-
* updates
216-
* deletes
258+
with {_, {inserts, ids}} <- one_bulk_write_operation(conn, :insert, coll, inserts, write_concern, opts),
259+
{_, {matched, modified, upserts, upsert_ids}} <- one_bulk_write_operation(conn, :update, coll, updates, write_concern, opts),
260+
{_, deletes} <- one_bulk_write_operation(conn, :delete, coll, deletes, write_concern, opts) do
217261

218-
The function returns a keyword list with the results of each operation group:
219-
For the details see https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#results
220-
"""
221-
defp one_bulk_write(conn, %UnorderedBulk{coll: coll, inserts: inserts, updates: updates, deletes: deletes} = bulk, write_concern, opts) do
222262

223-
with {_, inserts} <- one_bulk_write_operation(conn, :insert, coll, inserts, write_concern, opts),
224-
{_, updates} <- one_bulk_write_operation(conn, :update, coll, updates, write_concern, opts),
225-
{_, deletes} <- one_bulk_write_operation(conn, :delete, coll, deletes, write_concern, opts) do
226-
[
263+
%{
227264
acknowledged: acknowledged(write_concern),
228265
insertedCount: inserts,
229-
matchedCount: updates,
266+
insertedIds: ids,
267+
matchedCount: matched,
230268
deletedCount: deletes,
231-
upsertedCount: 0,
232-
upsertedIds: [],
233-
insertedIds: [],
234-
]
269+
modifiedCount: modified,
270+
upsertedCount: upserts,
271+
upsertedIds: upsert_ids
272+
}
235273
end
236274
end
237275

@@ -262,7 +300,7 @@ defmodule Mongo.BulkWrite do
262300
defp get_op_sequence(coll, ops) do
263301
get_op_sequence(coll, ops, [])
264302
end
265-
defp get_op_sequence(coll, [], acc), do: acc
303+
defp get_op_sequence(_coll, [], acc), do: acc
266304
defp get_op_sequence(coll, ops, acc) do
267305
[{kind, _doc} | _rest] = ops
268306
{docs, rest} = find_max_sequence(kind, ops)
@@ -287,71 +325,72 @@ defmodule Mongo.BulkWrite do
287325
{acc, rest}
288326
end
289327

290-
# {
291-
#"acknowledged" : true,
292-
#"deletedCount" : 1,
293-
#"insertedCount" : 2,
294-
# "matchedCount" : 2,
295-
#"upsertedCount" : 0,
296-
#"insertedIds" : {
297-
# "0" : 4,
298-
#"1" : 5
299-
#},
300-
#"upsertedIds" : {
301-
#
302-
# }
303-
# }
304-
305-
defp collect(docs, :insert) do
306-
docs
307-
|> Enum.map(fn
308-
{:ok, %{"n" => n}} -> n
309-
{:ok, _other} -> 0
310-
end)
311-
|> Enum.reduce(0, fn x, acc -> x + acc end)
312-
end
328+
defp filter_upsert_ids(nil), do: []
329+
defp filter_upsert_ids(upserted), do: Enum.map(upserted, fn doc -> doc["_id"] end)
313330

331+
##
332+
# collects the returns values for each operation
333+
#
334+
# the update operation is more complex than insert or delete operation
335+
#
336+
defp collect({docs, ids}, :insert) do
337+
338+
{docs
339+
|> Enum.map(fn
340+
{:ok, %{"n" => n}} -> n
341+
{:ok, _other} -> 0
342+
end)
343+
|> Enum.reduce(0, fn x, acc -> x + acc end), ids}
344+
345+
end
314346
defp collect(docs, :update) do
347+
315348
docs
316349
|> Enum.map(fn
317-
{:ok, %{"n" => n}} -> n
318-
{:ok, _other} -> 0
350+
{:ok, %{"n" => n, "nModified" => modified, "upserted" => ids}} -> l = length(ids); {n - l, modified, l, filter_upsert_ids(ids)}
351+
{:ok, %{"n" => matched, "nModified" => modified}} -> {matched, modified, 0, []}
352+
{:ok, _other} -> {0, 0, 0, []}
353+
end)
354+
|> Enum.reduce({0, 0, 0, []}, fn
355+
{mat, mod, ups, ids}, {s_mat, s_mod, s_ups, all} -> {s_mat + mat, s_mod + mod, s_ups + ups, all ++ ids}
319356
end)
320-
|> Enum.reduce(0, fn x, acc -> x + acc end)
321-
end
322357

358+
end
323359
defp collect(docs, :delete) do
360+
324361
docs
325362
|> Enum.map(fn
326363
{:ok, %{"n" => n}} -> n
327364
{:ok, _other} -> 0
328365
end)
329366
|> Enum.reduce(0, fn x, acc -> x + acc end)
367+
330368
end
331369

370+
defp run_commands(conn, {cmds, ids}, opts) do
371+
{Enum.map(cmds, fn cmd -> Mongo.direct_command(conn, cmd, opts) end), ids}
372+
end
332373
defp run_commands(conn, cmds, opts) do
333-
334-
# IO.puts "Running cmds #{inspect cmds}"
335-
336-
cmds
337-
|> Enum.map(fn cmd -> Mongo.direct_command(conn, cmd, opts) end)
338-
|> Enum.map(fn {:ok, doc} -> {:ok, doc} end)
374+
Enum.map(cmds, fn cmd -> Mongo.direct_command(conn, cmd, opts) end)
339375
end
340376

341377
defp get_insert_cmds(coll, docs, write_concern, _opts) do
342378

343-
{_ids, docs} = assign_ids(docs)
379+
{ids, docs} = assign_ids(docs)
344380

345-
docs
346-
|> Enum.chunk_every(@max_batch_size)
347-
|> Enum.map(fn inserts -> get_insert_cmd(coll, inserts, write_concern) end)
381+
cmds = docs
382+
|> Enum.chunk_every(@max_batch_size)
383+
|> Enum.map(fn inserts -> get_insert_cmd(coll, inserts, write_concern) end)
384+
{cmds, ids}
348385

349386
end
350387

351388
defp get_insert_cmd(coll, inserts, write_concern) do
389+
352390
[insert: coll,
353391
documents: inserts,
354392
writeConcern: write_concern] |> filter_nils()
393+
355394
end
356395

357396
defp get_delete_cmds(coll, docs, write_concern, opts) do
@@ -363,15 +402,20 @@ defmodule Mongo.BulkWrite do
363402
end
364403

365404
defp get_delete_cmd(coll, deletes, write_concern, opts ) do
405+
366406
[delete: coll,
367407
deletes: Enum.map(deletes, fn delete -> get_delete_doc(delete) end),
368408
ordered: Keyword.get(opts, :ordered),
369409
writeConcern: write_concern] |> filter_nils()
410+
370411
end
412+
371413
defp get_delete_doc({filter, opts}) do
414+
372415
[q: filter,
373416
limit: Keyword.get(opts, :limit),
374417
collation: Keyword.get(opts, :collaction)] |> filter_nils()
418+
375419
end
376420

377421
defp get_update_cmds(coll, docs, write_concern, opts) do
@@ -383,25 +427,27 @@ defmodule Mongo.BulkWrite do
383427
end
384428

385429
defp get_update_cmd(coll, updates, write_concern, opts) do
430+
386431
[ update: coll,
387432
updates: Enum.map(updates, fn update -> get_update_doc(update) end),
388433
ordered: Keyword.get(opts, :ordered),
389434
writeConcern: write_concern,
390435
bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
391436
] |> filter_nils()
437+
392438
end
393439

394440
defp get_update_doc({filter, update, update_opts}) do
441+
395442
[ q: filter,
396443
u: update,
397444
upsert: Keyword.get(update_opts, :upsert),
398445
multi: Keyword.get(update_opts, :multi) || false,
399446
collation: Keyword.get(update_opts, :collation),
400447
arrayFilters: Keyword.get(update_opts, :filters)
401448
] |> filter_nils()
449+
402450
end
403-
defp get_update_doc(_other) do
404-
[]
405-
end
451+
406452

407453
end

0 commit comments

Comments
 (0)