1
1
defmodule Mongo.UnorderedBulk do
2
- @ moduledoc """
2
+ @ moduledoc """
3
3
4
- The maxWriteBatchSize limit of a database, which indicates the maximum number of write operations permitted in a write batch, raises from 1,000 to 100,000.
4
+ An unordered bulk is filled in the store with the bulk operations. These are divided into three lists (inserts, updates, deletes)
5
+ added. If the unordered bulk is sent to the database, the groups are written in the following order:
5
6
7
+ 1. inserts
8
+ 2. updates
9
+ 3. deletes
10
+
11
+ The order within the group is undefined.
12
+
13
+ ## Example
14
+
15
+ ```
16
+ bulk = "bulk"
17
+ |> new()
18
+ |> insert_one(%{name: "Greta"})
19
+ |> insert_one(%{name: "Tom"})
20
+ |> insert_one(%{name: "Waldo"})
21
+ |> update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
22
+ |> update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
23
+ |> update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
24
+ |> delete_one(%{kind: "dog"})
25
+ |> delete_one(%{kind: "dog"})
26
+ |> delete_one(%{kind: "dog"})
27
+
28
+ result = BulkWrite.write(:mongo, bulk, w: 1)
29
+ ```
30
+
31
+ To reduce the memory usage the unordered bulk can be used with streams.
32
+
33
+ ## Example
34
+
35
+ ```
36
+ 1..1_000_000
37
+ |> Stream.map(fn i -> BulkOps.get_insert_one(%{number: i}) end)
38
+ |> UnorderedBulk.write(:mongo, "bulk", 1_000)
39
+ |> Stream.run()
40
+ ```
41
+
42
+ This example first generates the bulk operation by calling `get_insert_one\1 `. The operation is used as a parameter in the `write\3 ` function.
43
+ The unordered bulk was created with a buffer of 1000 operations. After 1000 operations, the
44
+ unordered bulk is written to the database. Depending on the selected size you can control the speed and memory consumption. The higher the
45
+ value, the faster the processing and the greater the memory consumption.
6
46
"""
7
47
8
48
alias Mongo.UnorderedBulk
@@ -20,10 +60,26 @@ defmodule Mongo.UnorderedBulk do
20
60
21
61
defstruct coll: nil , inserts: [ ] , updates: [ ] , deletes: [ ]
22
62
63
+ @ doc """
64
+ Creates an empty unordered bulk for a collection.
65
+
66
+ Example:
67
+
68
+ ```
69
+ Mongo.UnorderedBulk.new("bulk")
70
+ %Mongo.UnorderedBulk{coll: "bulk", deletes: [], inserts: [], updates: []}
71
+ ```
72
+ """
73
+ @ spec new ( String . t ) :: UnorderedBulk . t
23
74
def new ( coll ) do
24
75
% UnorderedBulk { coll: coll }
25
76
end
26
77
78
+ @ doc """
79
+ Appends a bulk operation to the unordered bulk. One of the field (inserts, updates or deletes)
80
+ will be updated.
81
+ """
82
+ @ spec push ( BulkOps . bulk_op , UnorderedBulk . t ) :: UnorderedBulk . t
27
83
def push ( { :insert , doc } , % UnorderedBulk { inserts: rest } = bulk ) do
28
84
% UnorderedBulk { bulk | inserts: [ doc | rest ] }
29
85
end
@@ -34,30 +90,138 @@ defmodule Mongo.UnorderedBulk do
34
90
% UnorderedBulk { bulk | deletes: [ doc | rest ] }
35
91
end
36
92
93
+
94
+ @ doc """
95
+ Appends an insert operation.
96
+
97
+ Example:
98
+
99
+ ```
100
+ Mongo.UnorderedBulk.insert_one(bulk, %{name: "Waldo"})
101
+ %Mongo.UnorderedBulk{
102
+ coll: "bulk",
103
+ deletes: [],
104
+ inserts: [%{name: "Waldo"}],
105
+ updates: []
106
+ }
107
+ ```
108
+ """
109
+ @ spec insert_one ( UnorderedBulk . t , BulkOps . bulk_op ) :: UnorderedBulk . t
37
110
def insert_one ( % UnorderedBulk { } = bulk , doc ) do
38
111
get_insert_one ( doc ) |> push ( bulk )
39
112
end
40
113
41
- def delete_one ( % UnorderedBulk { } = bulk , doc , opts \\ [ ] ) do
42
- get_delete_one ( doc , opts ) |> push ( bulk )
114
+ @ doc """
115
+ Appends a delete operation with `:limit = 1`.
116
+
117
+ Example:
118
+
119
+ ```
120
+ Mongo.UnorderedBulk.delete_one(bulk, %{name: "Waldo"})
121
+ %Mongo.UnorderedBulk{
122
+ coll: "bulk",
123
+ deletes: [{%{name: "Waldo"}, [limit: 1]}],
124
+ inserts: [],
125
+ updates: []
126
+ }
127
+ ```
128
+ """
129
+ @ spec delete_one ( UnorderedBulk . t , BulkOps . bulk_op ) :: UnorderedBulk . t
130
+ def delete_one ( % UnorderedBulk { } = bulk , doc ) do
131
+ get_delete_one ( doc ) |> push ( bulk )
43
132
end
44
133
45
- def delete_many ( % UnorderedBulk { } = bulk , doc , opts \\ [ ] ) do
46
- get_delete_many ( doc , opts ) |> push ( bulk )
134
+ @ doc """
135
+ Appends a delete operation with `:limit = 0`.
136
+
137
+ Example:
138
+
139
+ ```
140
+ Mongo.UnorderedBulk.delete_many(bulk, %{name: "Waldo"})
141
+ %Mongo.UnorderedBulk{
142
+ coll: "bulk",
143
+ deletes: [{%{name: "Waldo"}, [limit: 0]}],
144
+ inserts: [],
145
+ updates: []
146
+ }
147
+ ```
148
+ """
149
+ @ spec delete_many ( UnorderedBulk . t , BulkOps . bulk_op ) :: UnorderedBulk . t
150
+ def delete_many ( % UnorderedBulk { } = bulk , doc ) do
151
+ get_delete_many ( doc ) |> push ( bulk )
47
152
end
48
153
154
+ @ doc """
155
+ Appends a replace operation with `:multi = false`.
156
+
157
+ Example:
158
+
159
+ ```
160
+ Mongo.UnorderedBulk.replace_one(bulk, %{name: "Waldo"}, %{name: "Greta", kind: "dog"})
161
+ %Mongo.UnorderedBulk{
162
+ coll: "bulk",
163
+ deletes: [],
164
+ inserts: [],
165
+ updates: [{%{name: "Waldo"}, %{kind: "dog", name: "Greta"}, [multi: false]}]
166
+ }
167
+ ```
168
+ """
169
+ @ spec replace_one ( UnorderedBulk . t , BSON . document , BSON . document , Keyword . t ) :: UnorderedBulk . t
49
170
def replace_one ( % UnorderedBulk { } = bulk , filter , replacement , opts \\ [ ] ) do
50
171
get_replace_one ( filter , replacement , opts ) |> push ( bulk )
51
172
end
52
173
174
+ @ doc """
175
+ Appends a update operation with `:multi = false`.
176
+
177
+ Example:
178
+
179
+ ```
180
+ Mongo.UnorderedBulk.update_one(bulk, %{name: "Waldo"}, %{"$set": %{name: "Greta", kind: "dog"}})
181
+ %Mongo.UnorderedBulk{
182
+ coll: "bulk",
183
+ deletes: [],
184
+ inserts: [],
185
+ updates: [
186
+ {%{name: "Waldo"}, %{"$set": %{kind: "dog", name: "Greta"}}, [multi: false]}
187
+ ]
188
+ }
189
+ ```
190
+ """
191
+ @ spec update_one ( UnorderedBulk . t , BSON . document , BSON . document , Keyword . t ) :: UnorderedBulk . t
53
192
def update_one ( % UnorderedBulk { } = bulk , filter , update , opts \\ [ ] ) do
54
193
get_update_one ( filter , update , opts ) |> push ( bulk )
55
194
end
56
195
196
+ @ doc """
197
+ Appends a update operation with `:multi = true`.
198
+
199
+ Example:
200
+
201
+ ```
202
+ Mongo.UnorderedBulk.update_many(bulk, %{name: "Waldo"}, %{"$set": %{name: "Greta", kind: "dog"}})
203
+ %Mongo.UnorderedBulk{
204
+ coll: "bulk",
205
+ deletes: [],
206
+ inserts: [],
207
+ updates: [
208
+ {%{name: "Waldo"}, %{"$set": %{kind: "dog", name: "Greta"}}, [multi: true]}
209
+ ]
210
+ }
211
+ ```
212
+ """
213
+ @ spec update_many ( UnorderedBulk . t , BSON . document , BSON . document , Keyword . t ) :: UnorderedBulk . t
57
214
def update_many ( % UnorderedBulk { } = bulk , filter , update , opts \\ [ ] ) do
58
215
get_update_many ( filter , update , opts ) |> push ( bulk )
59
216
end
60
217
218
+ @ doc """
219
+ Returns a stream chunk function that can be used with streams. The `limit` specifies the number
220
+ of operation hold in the memory while processing the stream inputs.
221
+
222
+ The inputs of the stream should be `Mongo.BulkOps.bulk_op`. See `Mongo.BulkOps`
223
+ """
224
+ @ spec write ( Enumerable . t ( ) , GenServer . server , String . t , non_neg_integer , Keyword . t ) :: Enumerable . t ( )
61
225
def write ( enum , top , coll , limit \\ 1000 , opts \\ [ ] ) when limit > 1 do
62
226
Stream . chunk_while ( enum ,
63
227
{ new ( coll ) , limit - 1 } ,
@@ -71,6 +235,8 @@ defmodule Mongo.UnorderedBulk do
71
235
end )
72
236
# todo reduce to one
73
237
end
238
+ ## todo limit == 0 => write direct
239
+ ## Stream.map(fn op -> BulkWrite.write(conn, op, opts)
74
240
75
241
def test ( top ) do
76
242
0 commit comments