Skip to content

Commit 0c277ca

Browse files
committed
refactor code
1 parent a77c08a commit 0c277ca

File tree

4 files changed

+173
-73
lines changed

4 files changed

+173
-73
lines changed

lib/mongo.ex

Lines changed: 0 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -296,70 +296,6 @@ defmodule Mongo do
296296

297297
end
298298

299-
@doc """
Executes an unordered bulk write against the collection of the given
`Mongo.UnorderedBulk`: first the insert command, then the update command,
then the delete command.

Returns, inside the `with`, a list with one entry per executed command —
`{:ok, doc}` on success or `{:error, reason}` on failure — or the error
tuple from `select_server/3` if no server could be selected.

Options: `:w`, `:j`, `:wtimeout` (write concern), `:ordered`.
"""
def bulk_write(topology_pid, %UnorderedBulk{coll: coll, inserts: inserts, updates: updates, deletes: deletes}, opts \\ []) do
  # Make sure every insert document carries an _id before it is sent.
  {_ids, inserts} = assign_ids(inserts)

  write_concern =
    filter_nils(%{
      w: Keyword.get(opts, :w),
      j: Keyword.get(opts, :j),
      wtimeout: Keyword.get(opts, :wtimeout)
    })

  insert_cmd = filter_nils(insert: coll, documents: inserts, writeConcern: write_concern)

  deletes =
    Enum.map(deletes, fn {filter, collation, many} ->
      # limit 0 removes every matching document, limit 1 only the first match
      filter_nils(%{q: filter, limit: if(many, do: 0, else: 1), collation: collation})
    end)

  delete_cmd =
    filter_nils(
      delete: coll,
      deletes: deletes,
      ordered: Keyword.get(opts, :ordered),
      writeConcern: write_concern
    )

  updates =
    Enum.map(updates, fn {filter, update, update_opts} ->
      filter_nils(%{
        q: filter,
        u: update,
        upsert: Keyword.get(update_opts, :upsert),
        multi: Keyword.get(update_opts, :multi) || false,
        collation: Keyword.get(update_opts, :collation),
        arrayFilters: Keyword.get(update_opts, :filters)
      })
    end)

  update_cmd = filter_nils(update: coll, updates: updates, writeConcern: write_concern)

  with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts) do
    # Run the three commands in order. The original post-processing step
    # `Enum.map(fn {:ok, doc} -> {:ok, doc} end)` was an identity map that
    # crashed with a FunctionClauseError on any {:error, _} result; the raw
    # results are now returned unchanged.
    Enum.map([insert_cmd, update_cmd, delete_cmd], fn cmd -> direct_command(conn, cmd, opts) end)
  end
end
362-
363299
@doc """
364300
Finds a document and replaces it.
365301

lib/mongo/bulk_write.ex

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
defmodule Mongo.BulkWrite do
  @moduledoc """
  Unordered bulk write operations.

  A bulk always targets exactly one collection.

  The maxWriteBatchSize limit of a database, which indicates the maximum number
  of write operations permitted in a write batch, raises from 1,000 to 100,000.
  """

  import Mongo.Utils
  alias Mongo.UnorderedBulk

  # Chunk size per write command (maxWriteBatchSize).
  # NOTE(review): kept deliberately small to exercise the chunking in tests —
  # raise this to the server limit for production use.
  @max_batch_size 10

  @doc """
  Unordered bulk write operations:
  Executes first all insert commands, then all update commands and after that
  all delete commands. If a group (inserts, updates or deletes) exceeds the
  limit maxWriteBatchSize it will be split into chunks. Everything is done in
  memory, so this use case is limited by memory. A better approach seems to be
  streaming bulk writes.
  """
  def bulk_write(topology_pid, %UnorderedBulk{} = bulk, opts \\ []) do
    write_concern =
      filter_nils(%{
        w: Keyword.get(opts, :w),
        j: Keyword.get(opts, :j),
        wtimeout: Keyword.get(opts, :wtimeout)
      })

    with {:ok, conn, _, _} <- Mongo.select_server(topology_pid, :write, opts),
         inserts <- conn |> run_commands(get_insert_cmds(bulk, write_concern), opts) |> collect(:inserts),
         updates <- conn |> run_commands(get_update_cmds(bulk, write_concern, opts), opts) |> collect(:updates),
         deletes <- conn |> run_commands(get_delete_cmds(bulk, write_concern, opts), opts) |> collect(:deletes) do
      inserts ++ updates ++ deletes
    end
  end

  # TODO(review): aggregate the raw server replies into proper bulk-write
  # results. For now the replies are passed through unchanged so that the
  # concatenation in bulk_write/3 works — the original empty stubs returned
  # `nil`, which made `inserts ++ updates ++ deletes` raise an ArgumentError.
  def collect(docs, :inserts), do: docs
  def collect(docs, :updates), do: docs
  def collect(docs, :deletes), do: docs

  # Runs each command against the selected connection, returning the raw
  # results ({:ok, doc} | {:error, reason}) in command order. The original
  # trailing `Enum.map(fn {:ok, doc} -> ... end)` was an identity map that
  # crashed on error tuples and has been removed, along with the debug print.
  defp run_commands(conn, cmds, opts) do
    Enum.map(cmds, fn cmd -> Mongo.direct_command(conn, cmd, opts) end)
  end

  @doc """
  Builds the list of `insert` commands for the bulk, chunked so that no
  command exceeds the batch-size limit. Missing `_id`s are generated first.
  """
  def get_insert_cmds(%UnorderedBulk{coll: coll, inserts: all_inserts}, write_concern) do
    {_ids, all_inserts} = assign_ids(all_inserts)

    all_inserts
    |> Enum.chunk_every(@max_batch_size)
    |> Enum.map(fn inserts -> get_insert_cmd(coll, inserts, write_concern) end)
  end

  defp get_insert_cmd(coll, inserts, write_concern) do
    filter_nils(insert: coll, documents: inserts, writeConcern: write_concern)
  end

  # Builds the chunked `delete` commands for the bulk.
  defp get_delete_cmds(%UnorderedBulk{coll: coll, deletes: all_deletes}, write_concern, opts) do
    all_deletes
    |> Enum.chunk_every(@max_batch_size)
    |> Enum.map(fn deletes -> get_delete_cmd(coll, deletes, write_concern, opts) end)
  end

  defp get_delete_cmd(coll, deletes, write_concern, opts) do
    filter_nils(
      delete: coll,
      deletes: Enum.map(deletes, &get_delete_doc/1),
      ordered: Keyword.get(opts, :ordered),
      writeConcern: write_concern
    )
  end

  # One delete document: limit 0 removes all matches, limit 1 only the first.
  defp get_delete_doc({filter, collation, limit}) do
    filter_nils(%{q: filter, limit: limit, collation: collation})
  end

  # Builds the chunked `update` commands for the bulk.
  defp get_update_cmds(%UnorderedBulk{coll: coll, updates: all_updates}, write_concern, opts) do
    all_updates
    |> Enum.chunk_every(@max_batch_size)
    |> Enum.map(fn updates -> get_update_cmd(coll, updates, write_concern, opts) end)
  end

  defp get_update_cmd(coll, updates, write_concern, opts) do
    filter_nils(
      update: coll,
      updates: Enum.map(updates, &get_update_doc/1),
      ordered: Keyword.get(opts, :ordered),
      writeConcern: write_concern,
      bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
    )
  end

  defp get_update_doc({filter, update, update_opts}) do
    filter_nils(
      q: filter,
      u: update,
      upsert: Keyword.get(update_opts, :upsert),
      multi: Keyword.get(update_opts, :multi) || false,
      collation: Keyword.get(update_opts, :collation),
      arrayFilters: Keyword.get(update_opts, :filters)
    )
  end

  defp get_update_doc(_other), do: []
end

lib/mongo/unordered_bulk.ex

Lines changed: 7 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,13 @@
11
defmodule Mongo.UnorderedBulk do
22
@moduledoc """
33
4-
todo
5-
6-
Ist immer für eine Collections
7-
84
The maxWriteBatchSize limit of a database, which indicates the maximum number of write operations permitted in a write batch, raises from 1,000 to 100,000.
95
106
"""
117

128
alias Mongo.UnorderedBulk
139

14-
defstruct coll: nil, inserts: [], updates: [], deletes: [], opts: []
10+
defstruct coll: nil, inserts: [], updates: [], deletes: []
1511

1612
def new(coll) do
1713
%UnorderedBulk{coll: coll}
@@ -22,20 +18,20 @@ defmodule Mongo.UnorderedBulk do
2218
end
2319

2420
# Queues a delete of the first document matching `doc` (server-side limit: 1).
def delete_one(%UnorderedBulk{deletes: rest} = bulk, doc, collation \\ nil) do
  %UnorderedBulk{bulk | deletes: [{doc, collation, 1} | rest]}
end
2723

2824
# Queues a delete of every document matching `doc` (server-side limit: 0 = all).
def delete_many(%UnorderedBulk{deletes: rest} = bulk, doc, collation \\ nil) do
  %UnorderedBulk{bulk | deletes: [{doc, collation, 0} | rest]}
end
3127

3228
# Queues an update of a single document: `filter` selects it, `update` is the
# modifier document, `opts` (e.g. :upsert, :collation) are kept alongside.
def update_one(%UnorderedBulk{updates: rest} = bulk, filter, update, opts \\ []) do
  %UnorderedBulk{bulk | updates: [{filter, update, opts} | rest]}
end
3531

36-
def test() do
32+
def test(top) do
3733

38-
"bulk"
34+
bulk = "bulk"
3935
|> new()
4036
|> insert_one(%{name: "Greta"})
4137
|> insert_one(%{name: "Tom"})
@@ -47,6 +43,8 @@ defmodule Mongo.UnorderedBulk do
4743
|> delete_one(%{kind: "dog"})
4844
|> delete_one(%{kind: "dog"})
4945

46+
result = Mongo.BulkWrite.bulk_write(top, bulk)
5047

48+
IO.puts inspect result
5149
end
5250
end

lib/utils.ex

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
defmodule Mongo.Utils do
  @moduledoc """
  Internal helpers shared by the driver: nil-filtering for command documents
  and `_id` assignment for insert documents.
  """

  @doc """
  Removes all key-value pairs whose value is `nil`.

  Accepts a keyword list (returns a keyword list) or a map (returns a map).
  """
  def filter_nils(keyword) when is_list(keyword) do
    Enum.reject(keyword, fn {_key, value} -> is_nil(value) end)
  end

  def filter_nils(map) when is_map(map) do
    map
    |> Enum.reject(fn {_key, value} -> is_nil(value) end)
    |> Enum.into(%{})
  end

  @doc """
  Ensures each document in `list` carries an `_id`, generating one via
  `Mongo.IdServer` where missing.

  Returns `{ids, docs}`: the ids (existing or freshly generated) and the
  documents with ids assigned, both in the original order.
  """
  def assign_ids(list) when is_list(list) do
    list
    |> Enum.map(&assign_id/1)
    |> Enum.unzip()
  end

  # A non-nil _id is kept as-is, whether the key is an atom or a binary.
  defp assign_id(%{_id: id} = map) when id != nil, do: {id, map}
  defp assign_id(%{"_id" => id} = map) when id != nil, do: {id, map}

  defp assign_id([{_, _} | _] = keyword) do
    case Keyword.take(keyword, [:_id, "_id"]) do
      [{_key, id} | _] when id != nil -> {id, keyword}
      # No _id at all, or an explicit `_id: nil`: generate a fresh one.
      # (The original only matched `[]` here, so a nil _id raised a
      # FunctionClauseError — inconsistent with the map clauses above.)
      _ -> add_id(keyword)
    end
  end

  defp assign_id(map) when is_map(map) do
    map |> Map.to_list() |> add_id()
  end

  ##
  # Inserts an ID into the document. A distinction is made as to whether
  # binaries or atoms are used as keys.
  #
  defp add_id(doc) do
    id = Mongo.IdServer.new()
    {id, add_id(doc, id)}
  end

  defp add_id([{key, _} | _] = list, id) when is_atom(key), do: [{:_id, id} | list]
  defp add_id([{key, _} | _] = list, id) when is_binary(key), do: [{"_id", id} | list]
  defp add_id([], id), do: [{"_id", id}]
end

0 commit comments

Comments
 (0)