Skip to content

Commit 0699e7c

Browse files
authored
Merge pull request #14 from zookzook/bulk_api
added bulk writes
2 parents 335839e + 3b0e23c commit 0699e7c

18 files changed

+1357
-59
lines changed

.travis.yml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@ sudo: required
22
language: elixir
33

44
elixir:
5-
- 1.4
65
- 1.5
76
- 1.6
87
- 1.7

CHANGELOG.md

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
* Enhancements
44
* The driver now provides client metadata
55
* Added support for connecting via UNIX sockets (`:socket` and `:socket_dir`)
6+
* Added support for bulk writes (ordered/unordered, in-memory/stream)
7+
* Added support for `op_msg` with payload type 1
68
* Merged code from https://github.com/ankhers/mongodb/commit/63c20ff7e427744a5df915751adfaf6e5e39ae62
79
* Merged changes from https://github.com/ankhers/mongodb/pull/283
810
* Merged changes from https://github.com/ankhers/mongodb/pull/281
@@ -13,8 +15,8 @@
1315
* Travis now using the right MongoDB version
1416

1517
* Bug Fixes
16-
* added test unit for change streams
17-
* removed debug code from change streams
18+
* Added test unit for change streams
19+
* Removed debug code from change streams
1820

1921
## v0.5.2
2022

README.md

Lines changed: 68 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ for the individual options.
2121
* [x] Upgraded to ([DBConnection 2.x](https://github.com/elixir-ecto/db_connection))
2222
* [x] Removed deprecated op codes ([See](https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#request-opcodes))
2323
* [x] Added `op_msg` support ([See](https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#op-msg))
24+
* [x] Added bulk writes ([See](https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#write))
2425
* [ ] Add support for driver sessions ([See](https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst))
2526
* [ ] Add support for driver transactions ([See](https://github.com/mongodb/specifications/blob/master/source/transactions/transactions.rst))
2627
* [ ] Add support for `op_compressed` ([See](https://github.com/mongodb/specifications/blob/master/source/compression/OP_COMPRESSED.rst))
@@ -32,13 +33,11 @@ for the individual options.
3233
* Connection pooling ([through DBConnection 2.x](https://github.com/elixir-ecto/db_connection))
3334
* Streaming cursors
3435
* Performant ObjectID generation
35-
* Follows driver specification set by 10gen
36-
* Safe (by default) and unsafe writes
3736
* Aggregation pipeline
3837
* Replica sets
3938
* Support for SCRAM-SHA-256 (MongoDB 4.x)
4039
* Support for change streams api ([See](https://github.com/mongodb/specifications/blob/master/source/change-streams/change-streams.rst))
41-
40+
* Support for bulk writes ([See](https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#write))
4241

4342
## Data representation
4443

@@ -190,6 +189,72 @@ end
190189
191190
spawn(fn -> for_ever(top, self()) end)
192191
```
192+
193+
For more information see
194+
195+
* [Mongo.watch_collection](https://hexdocs.pm/mongodb_driver/Mongo.html#watch_collection/5)
196+
197+
198+
### Bulk writes
199+
200+
The motivation for bulk writes lies in the possibility of optimization by grouping
201+
operations of the same kind. Here, a distinction is made between unordered and ordered bulk writes.
202+
In unordered bulk writes, inserts, updates, and deletes are grouped into individual commands
203+
that are sent to the database. The order of execution cannot be influenced.
204+
A good use case is the import of records from one CSV file.
205+
The order of the inserts does not matter.
206+
207+
For ordered bulk writes, the order of the operations must be preserved.
208+
In this case, only the same consecutive operations are grouped.
209+
210+
Currently, all bulk writes are optimized in memory. This is unfavorable for large bulk writes.
211+
In this case, one can use streaming bulk writes, which keep only a limited number of
212+
grouped operations in memory; when the maximum number of operations
213+
has been reached, the operations are written to the database. The limit can be specified.
214+
215+
Using ordered bulk writes: in this example we first insert some dogs' names, add an attribute `kind`,
216+
and change all dogs to cats. After that we delete three cats. This example would not work with
217+
unordered bulk writes.
218+
219+
```elixir
220+
221+
bulk = "bulk"
222+
|> OrderedBulk.new()
223+
|> OrderedBulk.insert_one(%{name: "Greta"})
224+
|> OrderedBulk.insert_one(%{name: "Tom"})
225+
|> OrderedBulk.insert_one(%{name: "Waldo"})
226+
|> OrderedBulk.update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
227+
|> OrderedBulk.update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
228+
|> OrderedBulk.update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
229+
|> OrderedBulk.update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
230+
|> OrderedBulk.delete_one(%{kind: "cat"})
231+
|> OrderedBulk.delete_one(%{kind: "cat"})
232+
|> OrderedBulk.delete_one(%{kind: "cat"})
233+
234+
result = Mongo.BulkWrite.write(:mongo, bulk, w: 1)
235+
```
236+
237+
In the following example we import 1,000,000 integers into MongoDB using the stream API:
238+
239+
We need to create an insert operation for each number. Then we call the `Mongo.UnorderedBulk.write`
240+
function to import them. This function returns a stream that accumulates
241+
all insert operations until the limit of `1000` is reached. At that point the operation group is sent to
242+
MongoDB. So by using the stream API you can reduce memory usage while
243+
importing large volumes of data.
244+
245+
```elixir
246+
1..1_000_000
247+
|> Stream.map(fn i -> Mongo.BulkOps.get_insert_one(%{number: i}) end)
248+
|> Mongo.UnorderedBulk.write(:mongo, "bulk", 1_000)
249+
|> Stream.run()
250+
```
251+
252+
For more information see the following modules and check the unit tests for examples:
253+
* [Mongo.UnorderedBulk](https://hexdocs.pm/mongodb_driver/Mongo.UnorderedBulk.html#content)
254+
* [Mongo.OrderedBulk](https://hexdocs.pm/mongodb_driver/Mongo.OrderedBulk.html#content)
255+
* [Mongo.BulkWrite](https://hexdocs.pm/mongodb_driver/Mongo.BulkWrite.html#content)
256+
* [Mongo.BulkOps](https://hexdocs.pm/mongodb_driver/Mongo.BulkOps.html#content)
257+
193258
### Examples
194259

195260
Using `$and`

lib/mongo.ex

Lines changed: 39 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ defmodule Mongo do
273273
@spec find_one_and_update(GenServer.server, collection, BSON.document, BSON.document, Keyword.t) :: result(BSON.document) | {:ok, nil}
274274
def find_one_and_update(topology_pid, coll, filter, update, opts \\ []) do
275275
_ = modifier_docs(update, :update)
276-
query = [
276+
cmd = [
277277
findAndModify: coll,
278278
query: filter,
279279
update: update,
@@ -289,13 +289,12 @@ defmodule Mongo do
289289
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
290290

291291
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
292-
{:ok, doc} <- direct_command(conn, query, opts) do
292+
{:ok, doc} <- exec_command(conn, cmd, opts) do
293293
{:ok, doc["value"]}
294294
end
295295

296296
end
297297

298-
299298
@doc """
300299
Finds a document and replaces it.
301300
@@ -317,7 +316,7 @@ defmodule Mongo do
317316
@spec find_one_and_replace(GenServer.server, collection, BSON.document, BSON.document, Keyword.t) :: result(BSON.document)
318317
def find_one_and_replace(topology_pid, coll, filter, replacement, opts \\ []) do
319318
_ = modifier_docs(replacement, :replace)
320-
query = [
319+
cmd = [
321320
findAndModify: coll,
322321
query: filter,
323322
update: replacement,
@@ -333,7 +332,7 @@ defmodule Mongo do
333332
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
334333

335334
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
336-
{:ok, doc} <- direct_command(conn, query, opts), do: {:ok, doc["value"]}
335+
{:ok, doc} <- exec_command(conn, cmd, opts), do: {:ok, doc["value"]}
337336
end
338337

339338
defp should_return_new(:after), do: true
@@ -352,7 +351,7 @@ defmodule Mongo do
352351
"""
353352
@spec find_one_and_delete(GenServer.server, collection, BSON.document, Keyword.t) :: result(BSON.document)
354353
def find_one_and_delete(topology_pid, coll, filter, opts \\ []) do
355-
query = [
354+
cmd = [
356355
findAndModify: coll,
357356
query: filter,
358357
remove: true,
@@ -364,13 +363,13 @@ defmodule Mongo do
364363
opts = Keyword.drop(opts, ~w(max_time projection sort collation)a)
365364

366365
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
367-
{:ok, doc} <- direct_command(conn, query, opts), do: {:ok, doc["value"]}
366+
{:ok, doc} <- exec_command(conn, cmd, opts), do: {:ok, doc["value"]}
368367
end
369368

370369
@doc false
371370
@spec count(GenServer.server, collection, BSON.document, Keyword.t) :: result(non_neg_integer)
372371
def count(topology_pid, coll, filter, opts \\ []) do
373-
query = [
372+
cmd = [
374373
count: coll,
375374
query: filter,
376375
limit: opts[:limit],
@@ -382,7 +381,7 @@ defmodule Mongo do
382381
opts = Keyword.drop(opts, ~w(limit skip hint collation)a)
383382

384383
# Mongo 2.4 and 2.6 returns a float
385-
with {:ok, doc} <- command(topology_pid, query, opts),
384+
with {:ok, doc} <- command(topology_pid, cmd, opts),
386385
do: {:ok, trunc(doc["n"])}
387386
end
388387

@@ -456,7 +455,7 @@ defmodule Mongo do
456455
"""
457456
@spec distinct(GenServer.server, collection, String.t | atom, BSON.document, Keyword.t) :: result([BSON.t])
458457
def distinct(topology_pid, coll, field, filter, opts \\ []) do
459-
query = [
458+
cmd = [
460459
distinct: coll,
461460
key: field,
462461
query: filter,
@@ -468,7 +467,7 @@ defmodule Mongo do
468467

469468
with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, opts),
470469
opts = Keyword.put(opts, :slave_ok, slave_ok),
471-
{:ok, doc} <- direct_command(conn, query, opts),
470+
{:ok, doc} <- exec_command(conn, cmd, opts),
472471
do: {:ok, doc["values"]}
473472
end
474473

@@ -562,20 +561,21 @@ defmodule Mongo do
562561
in the document.
563562
"""
564563
@spec command(GenServer.server, BSON.document, Keyword.t) :: result(BSON.document)
565-
def command(topology_pid, query, opts \\ []) do
564+
def command(topology_pid, cmd, opts \\ []) do
566565
rp = ReadPreference.defaults(%{mode: :primary})
567566
rp_opts = [read_preference: Keyword.get(opts, :read_preference, rp)]
568567
with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, rp_opts),
569568
opts = Keyword.put(opts, :slave_ok, slave_ok),
570-
do: direct_command(conn, query, opts)
569+
do: exec_command(conn, cmd, opts)
571570
end
572571

573572
@doc false
574-
@spec direct_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
575-
def direct_command(conn, cmd, opts \\ []) do
573+
## refactor: exec_command
574+
@spec exec_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
575+
def exec_command(conn, cmd, opts) do
576576
action = %Query{action: :command}
577577

578-
with {:ok, _query, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
578+
with {:ok, _cmd, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
579579
{:ok, doc} <- check_for_error(doc) do
580580
{:ok, doc}
581581
end
@@ -587,19 +587,31 @@ defmodule Mongo do
587587
@doc """
588588
Returns the current wire version.
589589
"""
590-
def wire_version(conn, opts \\ []) do
590+
@spec wire_version(pid) :: {:ok, integer} | {:error, Mongo.Error.t}
591+
def wire_version(conn) do
591592
cmd = %Query{action: :wire_version}
592-
with {:ok, _query, version} <- DBConnection.execute(conn, cmd, %{}, defaults(opts)) do
593+
with {:ok, _cmd, version} <- DBConnection.execute(conn, cmd, %{}, defaults([])) do
593594
{:ok, version}
594595
end
595596
end
596597

598+
@doc """
599+
Returns the limits of the database.
600+
"""
601+
@spec limits(pid) :: {:ok, BSON.document} | {:error, Mongo.Error.t}
602+
def limits(conn) do
603+
cmd = %Query{action: :limits}
604+
with {:ok, _cmd, limits} <- DBConnection.execute(conn, cmd, %{}, defaults([])) do
605+
{:ok, limits}
606+
end
607+
end
608+
597609
@doc """
598610
Similar to `command/3` but unwraps the result and raises on error.
599611
"""
600612
@spec command!(GenServer.server, BSON.document, Keyword.t) :: result!(BSON.document)
601-
def command!(topology_pid, query, opts \\ []) do
602-
bangify(command(topology_pid, query, opts))
613+
def command!(topology_pid, cmd, opts \\ []) do
614+
bangify(command(topology_pid, cmd, opts))
603615
end
604616

605617
@doc """
@@ -640,7 +652,7 @@ defmodule Mongo do
640652
] |> filter_nils()
641653

642654
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
643-
{:ok, doc} <- direct_command(conn, cmd, opts) do
655+
{:ok, doc} <- exec_command(conn, cmd, opts) do
644656
case doc do
645657
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
646658
_ ->
@@ -685,7 +697,7 @@ defmodule Mongo do
685697
wtimeout: Keyword.get(opts, :wtimeout)
686698
} |> filter_nils()
687699

688-
query = [
700+
cmd = [
689701
insert: coll,
690702
documents: docs,
691703
ordered: Keyword.get(opts, :ordered),
@@ -694,7 +706,7 @@ defmodule Mongo do
694706
] |> filter_nils()
695707

696708
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
697-
{:ok, doc} <- direct_command(conn, query, opts) do
709+
{:ok, doc} <- exec_command(conn, cmd, opts) do
698710
case doc do
699711
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
700712
_ ->
@@ -757,15 +769,15 @@ defmodule Mongo do
757769
collation: Keyword.get(opts, :collation)
758770
} |> filter_nils()
759771

760-
query = [
772+
cmd = [
761773
delete: coll,
762774
deletes: [filter],
763775
ordered: Keyword.get(opts, :ordered),
764776
writeConcern: write_concern
765777
] |> filter_nils()
766778

767779
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
768-
{:ok, doc} <- direct_command(conn, query, opts) do
780+
{:ok, doc} <- exec_command(conn, cmd, opts) do
769781
case doc do
770782
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
771783
%{ "ok" => _ok, "n" => n } ->
@@ -885,7 +897,7 @@ defmodule Mongo do
885897
] |> filter_nils()
886898

887899
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
888-
{:ok, doc} <- direct_command(conn, cmd, opts) do
900+
{:ok, doc} <- exec_command(conn, cmd, opts) do
889901

890902
case doc do
891903

@@ -966,7 +978,7 @@ defmodule Mongo do
966978
def select_server(topology_pid, type, opts \\ []) do
967979
with {:ok, servers, slave_ok, mongos?} <- select_servers(topology_pid, type, opts) do
968980
if Enum.empty? servers do
969-
{:ok, nil, slave_ok, mongos?} # todo: warum wird [] zurückgeliefert?, nil wäre besser?
981+
{:ok, nil, slave_ok, mongos?}
970982
else
971983
with {:ok, connection} <- servers |> Enum.take_random(1) |> Enum.at(0)
972984
|> get_connection(topology_pid) do

0 commit comments

Comments
 (0)