
Commit 0957d00

added ordered bulk writes
1 parent 0c277ca commit 0957d00

4 files changed: +358 / -38 lines

README.md

Lines changed: 43 additions & 0 deletions
@@ -190,6 +190,49 @@ end

spawn(fn -> for_ever(top, self()) end)
```

### Bulk writes

The motivation for bulk writes is the opportunity to optimize by grouping identical operations.
We distinguish between unordered and ordered bulk writes.
With unordered bulk writes, the inserts, updates and deletes are grouped and sent to the database
as single commands; there is no control over the order in which they are executed. A good use case
is importing records from a CSV file, because the order of the inserts does not matter.

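A minimal sketch of such an import, assuming a file with one name per line (the file name is only an example) and assuming that `Mongo.UnorderedBulk` offers the same `new/1` and `insert_one/2` builders as the `Mongo.OrderedBulk` module introduced below:

```elixir
alias Mongo.UnorderedBulk

bulk =
  File.stream!("names.csv")
  |> Stream.map(&String.trim/1)
  |> Enum.reduce(UnorderedBulk.new("bulk"), fn name, acc ->
    UnorderedBulk.insert_one(acc, %{name: name})
  end)

result = Mongo.BulkWrite.bulk_write(:mongo, bulk, w: 1)
```
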
With ordered bulk writes, preserving the order of the operations is important so that the result
of the execution is correct. In this case only consecutive operations of the same kind are grouped.

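The grouping of consecutive operations can be pictured with `Enum.chunk_by/2`. This is only an illustration of the idea, not the driver's implementation:

```elixir
ops = [:insert, :insert, :insert, :update, :delete, :delete, :insert]

Enum.chunk_by(ops, fn kind -> kind end)
# => [[:insert, :insert, :insert], [:update], [:delete, :delete], [:insert]]
```

Each of these groups is then sent to the database as one insert, update or delete command.
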
Currently all bulk writes are optimized in memory, which is unfavourable for large bulk writes.
In that case streaming bulk writes can be used: they group only a limited set of operations in
memory and, once the maximum number of operations is reached, send the write operations to the
database. This maximum number can be configured.

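Streaming bulk writes are not implemented in this commit. Until they are, a similar effect can be approximated by chunking the source stream and writing one unordered bulk per chunk (same assumptions about `Mongo.UnorderedBulk` as in the sketch above):

```elixir
File.stream!("names.csv")
|> Stream.map(&String.trim/1)
|> Stream.chunk_every(1_000)   # keep at most 1_000 operations in memory
|> Enum.each(fn names ->
  bulk =
    Enum.reduce(names, Mongo.UnorderedBulk.new("bulk"), fn name, acc ->
      Mongo.UnorderedBulk.insert_one(acc, %{name: name})
    end)

  Mongo.BulkWrite.bulk_write(:mongo, bulk, w: 1)
end)
```
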
Using ordered bulk writes: in this example we first insert some dogs by name, then set the
attribute `kind` to `"dog"` for each of them and change all dogs into cats. After that we delete
three cats. This example would not work with unordered bulk writes.

```elixir
bulk = "bulk"
|> new()
|> insert_one(%{name: "Greta"})
|> insert_one(%{name: "Tom"})
|> insert_one(%{name: "Waldo"})
|> update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
|> update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
|> update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
|> update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
|> delete_one(%{kind: "cat"})
|> delete_one(%{kind: "cat"})
|> delete_one(%{kind: "cat"})

result = Mongo.BulkWrite.bulk_write(:mongo, bulk, w: 1)
```
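
The new `Mongo.OrderedBulk` module also provides `delete_many/3` and `replace_one/4`, so the three single deletes above could equally be expressed as one step. A sketch:

```elixir
bulk = "bulk"
|> new()
|> insert_one(%{name: "Greta", kind: "dog"})
|> insert_one(%{name: "Tom", kind: "dog"})
|> update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
|> delete_many(%{kind: "cat"})

result = Mongo.BulkWrite.bulk_write(:mongo, bulk, w: 1)
```
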
### Examples

Using `$and`

lib/mongo/bulk_write.ex

Lines changed: 163 additions & 32 deletions
@@ -11,88 +11,219 @@ defmodule Mongo.BulkWrite do

  import Mongo.Utils
  alias Mongo.UnorderedBulk
+  alias Mongo.OrderedBulk

  @doc """
  Unordered bulk write operations:
-  Executes first insert commands, then all update commands and after that all delete commands are executed. If a group (inserts, updates or deletes) exceeds the limit
+  Executes first insert commands, then update commands and after that all delete commands are executed. If a group (inserts, updates or deletes) exceeds the limit
  maxWriteBatchSize it will be split into chunks. Everything is done in memory, so this use case is limited by memory. A better approach seems to use streaming bulk writes.
  """
-  def bulk_write(topology_pid, %UnorderedBulk{} = bulk, opts \\ []) do
-
-    write_concern = %{
-      w: Keyword.get(opts, :w),
-      j: Keyword.get(opts, :j),
-      wtimeout: Keyword.get(opts, :wtimeout)
-    } |> filter_nils()
-
-    with {:ok, conn, _, _} <- Mongo.select_server(topology_pid, :write, opts),
-         inserts <- conn |> run_commands(get_insert_cmds(bulk, write_concern), opts) |> collect(:inserts),
-         updates <- conn |> run_commands(get_update_cmds(bulk, write_concern, opts), opts) |> collect(:updates),
-         deletes <- conn |> run_commands(get_delete_cmds(bulk, write_concern, opts), opts) |> collect(:deletes) do
-      inserts ++ updates ++ deletes
+  def bulk_write(topology_pid, %UnorderedBulk{} = bulk, opts) do
+
+    write_concern = write_concern(opts)
+    with {:ok, conn, _, _} <- Mongo.select_server(topology_pid, :write, opts) do
+      one_bulk_write(conn, bulk, write_concern, opts)
    end
  end

-  def collect(doc, :inserts) do
+  @doc """
+  Writes the OrderedBulk to the database. A small optimization is applied: consecutive
+  operations of the same kind are grouped together and sent as a single command.
+  """
+  def bulk_write(topology_pid, %OrderedBulk{coll: coll, ops: ops} = bulk, opts) do
+
+    write_concern = write_concern(opts)
+    with {:ok, conn, _, _} <- Mongo.select_server(topology_pid, :write, opts) do
+      get_op_sequence(coll, ops)
+      |> Enum.map(fn {cmd, docs} -> one_bulk_write_operation(conn, cmd, coll, docs, write_concern, opts) end)
+      |> Enum.each(fn {cmd, count} -> IO.puts "#{cmd} : #{count}" end)
+    end
+  end

+  ##
+  # returns the current write concern from `opts`
+  #
+  defp write_concern(opts) do
+    %{
+      w: Keyword.get(opts, :w),
+      j: Keyword.get(opts, :j),
+      wtimeout: Keyword.get(opts, :wtimeout)
+    } |> filter_nils()
  end

-  def collect(doc, :updates) do
+  @doc """
+  Executes one unordered bulk write. The execution order of operation groups is

+  * inserts
+  * updates
+  * deletes
+
+  The function returns a keyword list with the results of each operation group.
+  For the details see https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#results
+  """
+  def one_bulk_write(conn, %UnorderedBulk{coll: coll, inserts: inserts, updates: updates, deletes: deletes} = bulk, write_concern, opts) do
+
+    with {_, inserts} <- one_bulk_write_operation(conn, :insert, coll, inserts, write_concern, opts),
+         {_, updates} <- one_bulk_write_operation(conn, :update, coll, updates, write_concern, opts),
+         {_, deletes} <- one_bulk_write_operation(conn, :delete, coll, deletes, write_concern, opts) do
+      [
+        acknowledged: acknowledged(write_concern),
+        insertedCount: inserts,
+        matchedCount: updates,
+        deletedCount: deletes,
+        upsertedCount: 0,
+        upsertedIds: [],
+        insertedIds: [],
+      ]
+    end
+  end
+
+  ###
+  # Executes the command `cmd` and collects the result.
+  #
+  def one_bulk_write_operation(conn, cmd, coll, docs, write_concern, opts) do
+    with result <- conn |> run_commands(get_cmds(cmd, coll, docs, write_concern, opts), opts) |> collect(cmd) do
+      {cmd, result}
+    end
  end

-  def collect(doc, :deletes) do
+  ##
+  # Converts the list of operations into insert/update/delete commands
+  #
+  defp get_cmds(:insert, coll, docs, write_concern, opts), do: get_insert_cmds(coll, docs, write_concern, opts)
+  defp get_cmds(:update, coll, docs, write_concern, opts), do: get_update_cmds(coll, docs, write_concern, opts)
+  defp get_cmds(:delete, coll, docs, write_concern, opts), do: get_delete_cmds(coll, docs, write_concern, opts)
+
+  defp acknowledged(%{w: w}) when w > 0, do: true
+  defp acknowledged(%{}), do: false
+
+  ###
+  # Converts the list of operations into a list of lists with the same operations.
+  #
+  # [inserts, inserts, updates] -> [[inserts, inserts], [updates]]
+  #
+  defp get_op_sequence(coll, ops) do
+    get_op_sequence(coll, ops, [])
+  end
+  defp get_op_sequence(coll, [], acc), do: acc
+  defp get_op_sequence(coll, ops, acc) do
+    [{kind, _doc} | _rest] = ops
+    {docs, rest} = find_max_sequence(kind, ops)
+    get_op_sequence(coll, rest, [{kind, docs} | acc])
+  end
+
+  ###
+  # Splits the sequence of operations into two parts
+  # 1) sequence of operations of kind `kind`
+  # 2) rest of operations
+  #
+  defp find_max_sequence(kind, rest) do
+    find_max_sequence(kind, rest, [])
+  end
+  defp find_max_sequence(_kind, [], acc) do
+    {acc, []}
+  end
+  defp find_max_sequence(kind, [{other, desc} | rest], acc) when kind == other do
+    find_max_sequence(kind, rest, [desc | acc])
+  end
+  defp find_max_sequence(_kind, rest, acc) do
+    {acc, rest}
+  end
+
+  # {
+  #   "acknowledged" : true,
+  #   "deletedCount" : 1,
+  #   "insertedCount" : 2,
+  #   "matchedCount" : 2,
+  #   "upsertedCount" : 0,
+  #   "insertedIds" : {
+  #     "0" : 4,
+  #     "1" : 5
+  #   },
+  #   "upsertedIds" : {
+  #   }
+  # }
+
+  def collect(docs, :insert) do
+    docs
+    |> Enum.map(fn
+      {:ok, %{"n" => n}} -> n
+      {:ok, _other} -> 0
+    end)
+    |> Enum.reduce(0, fn x, acc -> x + acc end)
+  end
+
+  def collect(docs, :update) do
+    docs
+    |> Enum.map(fn
+      {:ok, %{"n" => n}} -> n
+      {:ok, _other} -> 0
+    end)
+    |> Enum.reduce(0, fn x, acc -> x + acc end)
+  end

+  def collect(docs, :delete) do
+    docs
+    |> Enum.map(fn
+      {:ok, %{"n" => n}} -> n
+      {:ok, _other} -> 0
+    end)
+    |> Enum.reduce(0, fn x, acc -> x + acc end)
  end

  defp run_commands(conn, cmds, opts) do

-    IO.puts "Running cmsd #{inspect cmds}"
+    IO.puts "Running cmds #{inspect cmds}"

    cmds
    |> Enum.map(fn cmd -> Mongo.direct_command(conn, cmd, opts) end)
    |> Enum.map(fn {:ok, doc} -> {:ok, doc} end)
  end

-  def get_insert_cmds(%UnorderedBulk{coll: coll, inserts: all_inserts}, write_concern) do
+  def get_insert_cmds(coll, docs, write_concern, _opts) do

    max_batch_size = 10 ## only for test maxWriteBatchSize

-    {_ids, all_inserts} = assign_ids(all_inserts)
+    {_ids, docs} = assign_ids(docs)

-    all_inserts
+    docs
    |> Enum.chunk_every(max_batch_size)
    |> Enum.map(fn inserts -> get_insert_cmd(coll, inserts, write_concern) end)

  end

  defp get_insert_cmd(coll, inserts, write_concern) do
-    filter_nils([insert: coll, documents: inserts, writeConcern: write_concern])
+    [insert: coll,
+     documents: inserts,
+     writeConcern: write_concern] |> filter_nils()
  end

-  defp get_delete_cmds(%UnorderedBulk{coll: coll, deletes: all_deletes}, write_concern, opts) do
+  defp get_delete_cmds(coll, docs, write_concern, opts) do

    max_batch_size = 10 ## only for test maxWriteBatchSize
-    all_deletes
+    docs
    |> Enum.chunk_every(max_batch_size)
    |> Enum.map(fn deletes -> get_delete_cmd(coll, deletes, write_concern, opts) end)

  end

  defp get_delete_cmd(coll, deletes, write_concern, opts) do
-    filter_nils([delete: coll,
-                 deletes: Enum.map(deletes, fn delete -> get_delete_doc(delete) end),
-                 ordered: Keyword.get(opts, :ordered),
-                 writeConcern: write_concern])
+    [delete: coll,
+     deletes: Enum.map(deletes, fn delete -> get_delete_doc(delete) end),
+     ordered: Keyword.get(opts, :ordered),
+     writeConcern: write_concern] |> filter_nils()
  end
-  defp get_delete_doc({filter, collaction, limit}) do
-    %{q: filter, limit: limit, collation: collaction} |> filter_nils()
+  defp get_delete_doc({filter, opts}) do
+    [q: filter,
+     limit: Keyword.get(opts, :limit),
+     collation: Keyword.get(opts, :collation)] |> filter_nils()
  end

-  defp get_update_cmds(%UnorderedBulk{coll: coll, updates: all_updates}, write_concern, opts) do
+  defp get_update_cmds(coll, docs, write_concern, opts) do

    max_batch_size = 10 ## only for test maxWriteBatchSize
-    all_updates
+    docs
    |> Enum.chunk_every(max_batch_size)
    |> Enum.map(fn updates -> get_update_cmd(coll, updates, write_concern, opts) end)

lib/mongo/ordered_bulk.ex

Lines changed: 86 additions & 0 deletions
@@ -0,0 +1,86 @@
defmodule Mongo.OrderedBulk do
  @moduledoc """

  The maxWriteBatchSize limit of a database, which indicates the maximum number of write operations permitted in a write batch, was raised from 1,000 to 100,000.

  """

  alias Mongo.OrderedBulk

  defstruct coll: nil, ops: []

  def new(coll) do
    %OrderedBulk{coll: coll}
  end

  def insert_one(%OrderedBulk{ops: rest} = bulk, doc) do
    %OrderedBulk{bulk | ops: [{:insert, doc} | rest]}
  end

  def delete_one(%OrderedBulk{ops: rest} = bulk, doc, opts \\ []) do
    %OrderedBulk{bulk | ops: [{:delete, {doc, Keyword.put(opts, :limit, 1)}} | rest]}
  end

  def delete_many(%OrderedBulk{ops: rest} = bulk, doc, opts \\ []) do
    %OrderedBulk{bulk | ops: [{:delete, {doc, Keyword.put(opts, :limit, 0)}} | rest]}
  end

  def update_one(%OrderedBulk{ops: rest} = bulk, filter, update, opts \\ []) do
    ## _ = modifier_docs(update, :update)
    %OrderedBulk{bulk | ops: [{:update, {filter, update, Keyword.put(opts, :multi, false)}} | rest]}
  end

  def update_many(%OrderedBulk{ops: rest} = bulk, filter, update, opts \\ []) do
    ## _ = modifier_docs(update, :update)
    %OrderedBulk{bulk | ops: [{:update, {filter, update, Keyword.put(opts, :multi, true)}} | rest]}
  end

  def replace_one(%OrderedBulk{ops: rest} = bulk, filter, replacement, opts \\ []) do
    ## _ = modifier_docs(replacement, :replace)
    %OrderedBulk{bulk | ops: [{:update, {filter, replacement, Keyword.put(opts, :multi, false)}} | rest]}
  end

  def test() do

    seeds = ["127.0.0.1:27001", "127.0.0.1:27002", "127.0.0.1:27003"]
    {:ok, top} = Mongo.start_link(database: "me", seeds: seeds, show_sensitive_data_on_connection_error: true)

    bulk = "bulk"
    |> new()
    |> insert_one(%{name: "Greta"})
    |> insert_one(%{name: "Tom"})
    |> insert_one(%{name: "Waldo"})
    |> update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
    |> update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
    |> update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
    |> update_many(%{kind: "dog"}, %{"$set": %{kind: "cat"}})
    |> delete_one(%{kind: "cat"})
    |> delete_one(%{kind: "cat"})
    |> delete_one(%{kind: "cat"})

    IO.puts inspect bulk

    result = Mongo.BulkWrite.bulk_write(top, bulk, w: 1)

    IO.puts inspect result
  end

  def test2() do
    # Sketch of a planned streaming bulk API: new_stream/4 is not implemented in this
    # commit, so the exploratory pipeline is kept as comments only.
    #
    # # create a streaming bulk write with max 1024 operations
    # bulk_stream = "bulk" |> new_stream(:mongo, 1024, w: 1)
    #
    # # now streaming a long text file with small memory usage
    # File.stream!(file)
    # |> Stream.with_index
    # #|> Stream.map(fn {name, i} -> insert_one(%{line: i, name: name}) end) # {:insert, %{line: i, name: name}}
    # # |> Stream.into(bulk_stream, (fn {name, i} -> insert_one(%{line: i, name: name}) end))
    # |> Stream.map(fn {name, i} -> bulk_stream.insert_one(%{line: i, name: name}) end)
    # |> Stream.reduce()
    #
    # File.stream!(src_filename, [], 512) |> Stream.into(bulk_stream) |> Stream.run()
  end

end
