
Commit 1f91e44

committed
added first version for stream api
1 parent 0957d00 commit 1f91e44

File tree: 5 files changed (+95 −52 lines)


README.md

Lines changed: 28 additions & 14 deletions
@@ -217,22 +217,36 @@ unordered bulk writes.

 ```elixir

-bulk = "bulk"
-|> new()
-|> insert_one(%{name: "Greta"})
-|> insert_one(%{name: "Tom"})
-|> insert_one(%{name: "Waldo"})
-|> update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
-|> update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
-|> update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
-|> update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
-|> delete_one(%{kind: "cat"})
-|> delete_one(%{kind: "cat"})
-|> delete_one(%{kind: "cat"})
-
-result = Mongo.BulkWrite.bulk_write(:mongo, bulk, w: 1)
+bulk = "bulk"
+|> new()
+|> insert_one(%{name: "Greta"})
+|> insert_one(%{name: "Tom"})
+|> insert_one(%{name: "Waldo"})
+|> update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
+|> update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
+|> update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
+|> update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
+|> delete_one(%{kind: "cat"})
+|> delete_one(%{kind: "cat"})
+|> delete_one(%{kind: "cat"})
+
+result = Mongo.BulkWrite.bulk_write(:mongo, bulk, w: 1)
 ```

+In the following example we import 1,000,000 integers into MongoDB using the stream API.
+
+We need to create an insert operation for each number and then call the `Mongo.UnorderedBulk.stream`
+function to import them. This function returns a stream that accumulates the
+insert operations until the limit of `1000` is reached; at that point the group of operations is sent to
+MongoDB. Using the stream API you can keep memory usage low while
+importing large volumes of data.
+
+```elixir
+stream = 1..1_000_000
+|> Stream.map(fn i -> Mongo.BulkUtils.get_insert_one(%{number: i}) end)
+|> Mongo.UnorderedBulk.stream(top, "bulk", 1_000)
+|> Stream.run()
+```
 ### Examples

 Using `$and`

lib/mongo/bulk_utils.ex

Lines changed: 23 additions & 0 deletions
@@ -0,0 +1,23 @@
+defmodule Mongo.BulkUtils do
+
+  def get_insert_one(doc), do: {:insert, doc}
+
+  def get_delete_one(doc, opts \\ []), do: {:delete, {doc, Keyword.put(opts, :limit, 1)}}
+  def get_delete_many(doc, opts \\ []), do: {:delete, {doc, Keyword.put(opts, :limit, 0)}}
+
+  def get_update_one(filter, update, opts \\ []) do
+    ## _ = modifier_docs(update, :update)
+    {:update, {filter, update, Keyword.put(opts, :multi, false)}}
+  end
+
+  def get_update_many(filter, update, opts \\ []) do
+    ## _ = modifier_docs(update, :update)
+    {:update, {filter, update, Keyword.put(opts, :multi, true)}}
+  end
+
+  def get_replace_one(filter, replacement, opts \\ []) do
+    ## _ = modifier_docs(replacement, :replace)
+    {:update, {filter, replacement, Keyword.put(opts, :multi, false)}}
+  end
+
+end
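
These helpers do not touch the database; they only build the internal operation tuples that `Mongo.UnorderedBulk.push/2` and the stream API consume. A minimal sketch of the return values, assuming the module above is compiled:

```elixir
# Sketch only: plain tuples, derived from the definitions above.
Mongo.BulkUtils.get_insert_one(%{name: "Greta"})
# => {:insert, %{name: "Greta"}}

Mongo.BulkUtils.get_update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
# => {:update, {%{name: "Greta"}, %{"$set": %{kind: "dog"}}, [multi: false]}}
```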

lib/mongo/bulk_write.ex

Lines changed: 6 additions & 8 deletions
@@ -13,6 +13,8 @@ defmodule Mongo.BulkWrite do
   alias Mongo.UnorderedBulk
   alias Mongo.OrderedBulk

+  @max_batch_size 100_000
+
   @doc """
   Unordered bulk write operations:
   Executes first insert commands, then updates commands and after that all delete commands are executed. If a group (inserts, updates or deletes) exceeds the limit
@@ -174,7 +176,7 @@ defmodule Mongo.BulkWrite do

   defp run_commands(conn, cmds, opts) do

-    IO.puts "Running cmds #{inspect cmds}"
+    # IO.puts "Running cmds #{inspect cmds}"

     cmds
     |> Enum.map(fn cmd -> Mongo.direct_command(conn, cmd, opts) end)
@@ -183,12 +185,10 @@

   def get_insert_cmds(coll, docs, write_concern, _opts) do

-    max_batch_size = 10 ## only for test maxWriteBatchSize
-
     {_ids, docs} = assign_ids(docs)

     docs
-    |> Enum.chunk_every(max_batch_size)
+    |> Enum.chunk_every(@max_batch_size)
     |> Enum.map(fn inserts -> get_insert_cmd(coll, inserts, write_concern) end)

   end
@@ -201,9 +201,8 @@ defmodule Mongo.BulkWrite do

   defp get_delete_cmds(coll, docs, write_concern, opts) do

-    max_batch_size = 10 ## only for test maxWriteBatchSize
     docs
-    |> Enum.chunk_every(max_batch_size)
+    |> Enum.chunk_every(@max_batch_size)
     |> Enum.map(fn deletes -> get_delete_cmd(coll, deletes, write_concern, opts) end)

   end
@@ -222,9 +221,8 @@ defmodule Mongo.BulkWrite do

   defp get_update_cmds(coll, docs, write_concern, opts) do

-    max_batch_size = 10 ## only for test maxWriteBatchSize
     docs
-    |> Enum.chunk_every(max_batch_size)
+    |> Enum.chunk_every(@max_batch_size)
     |> Enum.map(fn updates -> get_update_cmd(coll, updates, write_concern, opts) end)

   end
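
For intuition on the `@max_batch_size` change above: the command builders now split the accumulated documents into fixed-size groups with `Enum.chunk_every/2` instead of the hard-coded test value of 10. A tiny illustration of that standard-library chunking (not driver code):

```elixir
# Enum.chunk_every/2 splits an enumerable into groups of at most the given size;
# with @max_batch_size set to 100_000, most imports end up in a single group.
Enum.chunk_every([1, 2, 3, 4, 5], 2)
# => [[1, 2], [3, 4], [5]]
```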

lib/mongo/ordered_bulk.ex

Lines changed: 0 additions & 16 deletions
@@ -65,22 +65,6 @@ defmodule Mongo.OrderedBulk do
     IO.puts inspect result
   end

-  def test2() do
-
-    # create a streaming bulk write with max 1024 operations
-    bulk_stream = "bulk" |> new_stream(:mongo, 1024, w: 1)
-
-    # now streaming a long text file with small memory usage
-    File.stream!(file)
-    |> Stream.with_index
-    #|> Stream.map(fn {name, i} -> insert_one(%{line: i, name: name}) end) # {:insert, %{line: i, name: name}}
-    # |> Stream.into(bulk_stream, (fn {name, i} -> insert_one(%{line: i, name: name}) end))
-    |> Stream.map(fn {name, i} -> bulk_stream.insert_one(%{line: i, name: name}) end)
-    |> Stream.reduce()
-
-    File.stream!(src_filename, [], 512) |> Stream.into(bulk_stream) |> Stream.run()
-
-  end


 end

lib/mongo/unordered_bulk.ex

Lines changed: 38 additions & 14 deletions
@@ -6,42 +6,51 @@ defmodule Mongo.UnorderedBulk do
   """

   alias Mongo.UnorderedBulk
+  import Mongo.BulkUtils

   defstruct coll: nil, inserts: [], updates: [], deletes: []

   def new(coll) do
     %UnorderedBulk{coll: coll}
   end

-
-  def insert_one(%UnorderedBulk{inserts: rest} = bulk, doc) do
+  def push({:insert, doc}, %UnorderedBulk{inserts: rest} = bulk) do
     %UnorderedBulk{bulk | inserts: [doc | rest] }
   end
+  def push({:update, doc}, %UnorderedBulk{updates: rest} = bulk) do
+    %UnorderedBulk{bulk | updates: [doc | rest] }
+  end
+  def push({:delete, doc}, %UnorderedBulk{deletes: rest} = bulk) do
+    %UnorderedBulk{bulk | deletes: [doc | rest] }
+  end
+
+  def insert_one(%UnorderedBulk{} = bulk, doc) do
+    get_insert_one(doc) |> push(bulk)
+  end

-  def delete_one(%UnorderedBulk{deletes: rest} = bulk, doc, opts \\ []) do
-    %UnorderedBulk{bulk | deletes: [{doc, Keyword.put(opts, :limit, 1)} | rest] }
+  def delete_one(%UnorderedBulk{} = bulk, doc, opts \\ []) do
+    get_delete_one(doc, opts) |> push(bulk)
   end

-  def delete_many(%UnorderedBulk{deletes: rest} = bulk, doc, opts \\ []) do
-    %UnorderedBulk{bulk | deletes: [{doc, Keyword.put(opts, :limit, 0)} | rest] }
+  def delete_many(%UnorderedBulk{} = bulk, doc, opts \\ []) do
+    get_delete_many(doc, opts) |> push(bulk)
   end

-  def replace_one(%UnorderedBulk{updates: rest} = bulk, filter, replacement, opts \\ []) do
+  def replace_one(%UnorderedBulk{} = bulk, filter, replacement, opts \\ []) do
     _ = modifier_docs(replacement, :replace)
-    %UnorderedBulk{bulk | updates: [{filter, replacement, Keyword.put(opts, :multi, false)} | rest] }
+    get_replace_one(filter, replacement, opts) |> push(bulk)
   end

-  def update_one(%UnorderedBulk{updates: rest} = bulk, filter, update, opts \\ []) do
+  def update_one(%UnorderedBulk{} = bulk, filter, update, opts \\ []) do
     _ = modifier_docs(update, :update)
-    %UnorderedBulk{bulk | updates: [{filter, update, Keyword.put(opts, :multi, false)} | rest] }
+    get_update_one(filter, update, opts) |> push(bulk)
   end

-  def update_many(%UnorderedBulk{updates: rest} = bulk, filter, replacement, opts \\ []) do
-    _ = modifier_docs(replacement, :update)
-    %UnorderedBulk{bulk | updates: [{filter, replacement, Keyword.put(opts, :multi, true)} | rest] }
+  def update_many(%UnorderedBulk{updates: rest} = bulk, filter, update, opts \\ []) do
+    _ = modifier_docs(update, :update)
+    get_update_many(filter, update, opts) |> push(bulk)
   end

-
   defp modifier_docs([{key, _}|_], type), do: key |> key_to_string |> modifier_key(type)
   defp modifier_docs(map, _type) when is_map(map) and map_size(map) == 0, do: :ok
   defp modifier_docs(map, type) when is_map(map), do: Enum.at(map, 0) |> elem(0) |> key_to_string |> modifier_key(type)
@@ -55,6 +64,20 @@ defmodule Mongo.UnorderedBulk do
   defp key_to_string(key) when is_atom(key), do: Atom.to_string(key)
   defp key_to_string(key) when is_binary(key), do: key

+  def stream(enum, top, coll, limit \\ 1000, opts \\ []) when limit > 1 do
+    Stream.chunk_while(enum,
+      {new(coll), limit - 1},
+      fn
+        op, {bulk, 0} -> {:cont, Mongo.BulkWrite.bulk_write(top, push(op, bulk), opts), {new(coll), limit - 1}}
+        op, {bulk, l} -> {:cont, {push(op, bulk), l - 1}}
+      end,
+      fn
+        {bulk, 0} -> {:cont, bulk}
+        {bulk, _} -> {:cont, Mongo.BulkWrite.bulk_write(top, bulk, opts), {new(coll), limit - 1}}
+      end)
+    # todo reduce to one
+  end
+
   def test(top) do

     bulk = "bulk"
@@ -107,4 +130,5 @@ defmodule Mongo.UnorderedBulk do
     IO.puts inspect result
   end

+
 end
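
A rough usage sketch for the new `stream/5`, mirroring the README example added in this commit; `:mongo` is assumed to be the registered connection/topology name and `"bulk"` is just the collection name used in the examples above:

```elixir
# Hedged sketch: operation tuples are accumulated in an UnorderedBulk and every
# 1_000 operations are flushed as one bulk write; Stream.run/1 drives the pipeline.
1..100_000
|> Stream.map(fn i -> Mongo.BulkUtils.get_insert_one(%{number: i}) end)
|> Mongo.UnorderedBulk.stream(:mongo, "bulk", 1_000)
|> Stream.run()
```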
