
Commit aeb75a3

updated documents

1 parent 2852c64 commit aeb75a3
File tree

7 files changed: +194 -59 lines changed


CHANGELOG.md

Lines changed: 2 additions & 2 deletions
@@ -12,8 +12,8 @@
 * Travis now using the right MongoDB version

 * Bug Fixes
-  * added test unit for change streams
-  * removed debug code from change streams
+  * Added test unit for change streams
+  * Removed debug code from change streams

 ## v0.5.2

lib/mongo.ex

Lines changed: 1 addition & 1 deletion
@@ -967,7 +967,7 @@ defmodule Mongo do
   def select_server(topology_pid, type, opts \\ []) do
     with {:ok, servers, slave_ok, mongos?} <- select_servers(topology_pid, type, opts) do
       if Enum.empty? servers do
-        {:ok, nil, slave_ok, mongos?} # todo: why is [] returned? Wouldn't nil be better?
+        {:ok, nil, slave_ok, mongos?}
       else
         with {:ok, connection} <- servers |> Enum.take_random(1) |> Enum.at(0)
                                           |> get_connection(topology_pid) do

lib/mongo/bulk_ops.ex

Lines changed: 10 additions & 7 deletions
@@ -11,13 +11,16 @@ defmodule Mongo.BulkOps do
   ## Example

   ```
-  File.stream!("large.csv")
-  |> Stream.map(&String.trim(&1))
-  |> Stream.map(&String.split(&1,","))
-  |> Stream.map(fn [firstname | [lastname | _]] -> %{firstname: firstname, lastname: lastname} end)
-  |> Stream.map(fn doc -> BulkOps.get_insert_one(doc) end)
-  |> UnorderedBulk.write(:mongo, "bulk", 1_000)
-  |> Stream.run()
+  alias Mongo.UnorderedBulk
+  alias Mongo.BulkOps
+
+  File.stream!("large.csv")
+  |> Stream.map(&String.trim(&1))
+  |> Stream.map(&String.split(&1,","))
+  |> Stream.map(fn [firstname | [lastname | _]] -> %{firstname: firstname, lastname: lastname} end)
+  |> Stream.map(fn doc -> BulkOps.get_insert_one(doc) end)
+  |> UnorderedBulk.write(:mongo, "bulk", 1_000)
+  |> Stream.run()
   ```

   """

lib/mongo/bulk_write.ex

Lines changed: 16 additions & 12 deletions
@@ -20,7 +20,6 @@ defmodule Mongo.BulkWrite do
   2. updates
   3. deletes

-
   ## Example:

   ```
@@ -68,19 +67,19 @@ defmodule Mongo.BulkWrite do

   ## Stream bulk writes

-  The examples shown initially filled the bulk with a few operations and then the bulk was written to the database.
+  The examples shown initially filled the bulk with a few operations and then the bulk is written to the database.
   This is all done in memory. For larger amounts of operations or imports of very long files, the main memory would
   be unnecessarily burdened. It could come to some resource problems.

-  For such cases you could use streams. Unordered and ordered bulk writes can also be combined with Stream.
+  For such cases you could use streams. Unordered and ordered bulk writes can also be combined with Streams.
   You set the maximum size of the bulk. Once the number of bulk operations has been reached,
   it will be sent to the database. While streaming you can limit the memory consumption regarding the current task.

   In the following example we import 1.000.000 integers into the MongoDB using the stream api:

   We need to create an insert operation (`BulkOps.get_insert_one()`) for each number. Then we call the `UnorderedBulk.stream`
   function to import it. This function returns a stream function which accumulate
-  all inserts operations until the limit `1000` is reached. In this case the operation group is send to
+  all inserts operations until the limit `1000` is reached. In this case the operation group is written to
   MongoDB.

   ## Example
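The example body itself lies outside this hunk. A minimal sketch of the import described above, following the pipeline style of the `Mongo.BulkOps` example in this commit; the connection name `:mongo`, the collection `"bulk"`, and the `number` field are illustrative assumptions:

```
alias Mongo.UnorderedBulk
alias Mongo.BulkOps

# one insert operation per integer; the bulk is flushed to MongoDB every 1_000 operations
1..1_000_000
|> Stream.map(fn i -> BulkOps.get_insert_one(%{number: i}) end)
|> UnorderedBulk.write(:mongo, "bulk", 1_000)
|> Stream.run()
```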
@@ -94,7 +93,7 @@ defmodule Mongo.BulkWrite do

   ## Benchmark

-  The following benchmark compares single `Mongo.insert_one()` calls with stream unordered bulk writes.
+  The following benchmark compares multiple `Mongo.insert_one()` calls with a stream using unordered bulk writes.
   Both tests inserts documents into a replica set with `w: 1`.

   ```
@@ -159,7 +158,8 @@ defmodule Mongo.BulkWrite do
   alias Mongo.UnorderedBulk
   alias Mongo.OrderedBulk

-  @max_batch_size 100_000
+  @max_batch_size 100_000 ## todo The maxWriteBatchSize limit of a database, which indicates the maximum number of write operations permitted in a write batch, raises from 1,000 to 100,000.
+

   @doc """
   Executes unordered and ordered bulk writes.
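To make the batch limit concrete: a standalone, illustrative sketch (not the module's code) of splitting a hypothetical list of operations into groups of at most 100,000, as `maxWriteBatchSize` requires:

```
# 250_000 hypothetical insert documents, split into maxWriteBatchSize-sized groups
ops = Enum.map(1..250_000, fn i -> %{number: i} end)

batches = Enum.chunk_every(ops, 100_000)
Enum.map(batches, &length/1) # => [100000, 100000, 50000]
```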
@@ -179,14 +179,16 @@ defmodule Mongo.BulkWrite do
   If a group (inserts, updates or deletes) exceeds the limit `maxWriteBatchSize` it will be split into chunks.
   Everything is done in memory, so this use case is limited by memory. A better approach seems to use streaming bulk writes.
   """
-  @spec write(GenServer.server, UnorderedBulk.t, Keyword.t) :: Keyword.t
+  @spec write(GenServer.server, (UnorderedBulk.t | OrderedBulk.t), Keyword.t) :: Keyword.t
   def write(topology_pid, %UnorderedBulk{} = bulk, opts) do

     write_concern = write_concern(opts)
     with {:ok, conn, _, _} <- Mongo.select_server(topology_pid, :write, opts) do
       one_bulk_write(conn, bulk, write_concern, opts)
     end
+
   end
+
   def write(topology_pid, %OrderedBulk{coll: coll, ops: ops}, opts) do

     write_concern = write_concern(opts)
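A rough usage sketch for `write/3`; the `UnorderedBulk` builder calls (`new/1`, `insert_one/2`, `delete_one/2`) and the connection name `:mongo` are assumptions, while the call itself follows the `@spec` above:

```
alias Mongo.UnorderedBulk
alias Mongo.BulkWrite

# build an unordered bulk in memory, then write it with a single call
bulk = "bulk"
|> UnorderedBulk.new()
|> UnorderedBulk.insert_one(%{name: "Greta"})
|> UnorderedBulk.insert_one(%{name: "Tom"})
|> UnorderedBulk.delete_one(%{name: "Tom"})

result = BulkWrite.write(:mongo, bulk, w: 1)
```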
@@ -253,13 +255,13 @@ defmodule Mongo.BulkWrite do
   #
   # The function returns a keyword list with the results of each operation group:
   # For the details see https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#results
+  #
   defp one_bulk_write(conn, %UnorderedBulk{coll: coll, inserts: inserts, updates: updates, deletes: deletes}, write_concern, opts) do

     with {_, {inserts, ids}} <- one_bulk_write_operation(conn, :insert, coll, inserts, write_concern, opts),
          {_, {matched, modified, upserts, upsert_ids}} <- one_bulk_write_operation(conn, :update, coll, updates, write_concern, opts),
          {_, deletes} <- one_bulk_write_operation(conn, :delete, coll, deletes, write_concern, opts) do

-
     %{
       acknowledged: acknowledged(write_concern),
       insertedCount: inserts,
@@ -302,9 +304,11 @@ defmodule Mongo.BulkWrite do
   end
   defp get_op_sequence(_coll, [], acc), do: acc
   defp get_op_sequence(coll, ops, acc) do
+
     [{kind, _doc} | _rest] = ops
     {docs, rest} = find_max_sequence(kind, ops)
     get_op_sequence(coll, rest, [{kind, docs} | acc])
+
   end

   ###
@@ -325,9 +329,6 @@ defmodule Mongo.BulkWrite do
     {acc, rest}
   end

-  defp filter_upsert_ids(nil), do: []
-  defp filter_upsert_ids(upserted), do: Enum.map(upserted, fn doc -> doc["_id"] end)
-
   ##
   # collects the returns values for each operation
   #
@@ -367,6 +368,10 @@ defmodule Mongo.BulkWrite do

   end

+  defp filter_upsert_ids(nil), do: []
+  defp filter_upsert_ids(upserted), do: Enum.map(upserted, fn doc -> doc["_id"] end)
+
+
   defp run_commands(conn, {cmds, ids}, opts) do
     {Enum.map(cmds, fn cmd -> Mongo.direct_command(conn, cmd, opts) end), ids}
   end
@@ -449,5 +454,4 @@ defmodule Mongo.BulkWrite do

   end

-
 end
