Skip to content

Commit 9ecee63

Browse files
committed
BulkWriteResult defstruct has a new attribute errors, which contains the errors reported from the server.
1 parent a5e8834 commit 9ecee63

File tree

3 files changed

+95
-57
lines changed

3 files changed

+95
-57
lines changed

lib/mongo/bulk_write.ex

Lines changed: 23 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -200,33 +200,10 @@ defmodule Mongo.BulkWrite do
200200
ops
201201
|> get_op_sequence()
202202
|> Enum.map(fn {cmd, docs} -> one_bulk_write_operation(conn, cmd, coll, docs, write_concern, max_batch_size, opts) end)
203-
|> Enum.reduce(empty, fn
204-
{cmd, {count, ids}}, acc -> merge(cmd, count, ids, acc)
205-
{cmd, {mat, mod, ups, ids}}, acc -> merge(cmd, mat, mod, ups, ids, acc)
206-
{cmd, count}, acc -> merge(cmd, count, acc)
207-
208-
end)
203+
|> BulkWriteResult.reduce(empty)
209204
end
210205
end
211206

212-
defp merge(:insert, count, ids, %BulkWriteResult{:inserted_count => value, :inserted_ids => current_ids} = result) do
213-
%BulkWriteResult{result | inserted_count: value + count, inserted_ids: current_ids ++ ids}
214-
end
215-
defp merge(:update, matched, modified, upserts, ids,
216-
%BulkWriteResult{:matched_count => matched_count,
217-
:modified_count => modified_count,
218-
:upserted_count => upserted_count,
219-
:upserted_ids => current_ids} = result) do
220-
%BulkWriteResult{result | matched_count: matched_count + matched,
221-
modified_count: modified_count + modified,
222-
upserted_count: upserted_count + upserts,
223-
upserted_ids: current_ids ++ ids}
224-
end
225-
defp merge(:delete, count, %BulkWriteResult{:deleted_count => value} = result) do
226-
%BulkWriteResult{result | deleted_count: value + count}
227-
end
228-
defp merge(_other, _count, result), do: result
229-
230207
##
231208
# returns the current write concerns from `opts`
232209
#
@@ -252,20 +229,12 @@ defmodule Mongo.BulkWrite do
252229

253230
with {:ok, limits} <- Mongo.limits(conn),
254231
max_batch_size <- limits.max_write_batch_size,
255-
{_, {inserts, ids}} <- one_bulk_write_operation(conn, :insert, coll, inserts, write_concern, max_batch_size, opts),
256-
{_, {matched, modified, upserts, upsert_ids}} <- one_bulk_write_operation(conn, :update, coll, updates, write_concern, max_batch_size, opts),
257-
{_, deletes} <- one_bulk_write_operation(conn, :delete, coll, deletes, write_concern, max_batch_size, opts) do
258-
259-
%BulkWriteResult{
260-
acknowledged: acknowledged(write_concern),
261-
inserted_count: inserts,
262-
inserted_ids: ids,
263-
matched_count: matched,
264-
deleted_count: deletes,
265-
modified_count: modified,
266-
upserted_count: upserts,
267-
upserted_ids: upsert_ids
268-
}
232+
insert_result <- one_bulk_write_operation(conn, :insert, coll, inserts, write_concern, max_batch_size, opts),
233+
update_result <- one_bulk_write_operation(conn, :update, coll, updates, write_concern, max_batch_size, opts),
234+
delete_result <- one_bulk_write_operation(conn, :delete, coll, deletes, write_concern, max_batch_size, opts) do
235+
236+
[insert_result, update_result, delete_result]
237+
|> BulkWriteResult.reduce(%BulkWriteResult{acknowledged: acknowledged(write_concern)})
269238
end
270239
end
271240

@@ -274,7 +243,7 @@ defmodule Mongo.BulkWrite do
274243
#
275244
defp one_bulk_write_operation(conn, cmd, coll, docs, write_concern, max_batch_size, opts) do
276245
with result <- conn |> run_commands(get_cmds(cmd, coll, docs, write_concern, max_batch_size, opts), opts) |> collect(cmd) do
277-
{cmd, result}
246+
result
278247
end
279248
end
280249

@@ -330,35 +299,34 @@ defmodule Mongo.BulkWrite do
330299
#
331300
defp collect({docs, ids}, :insert) do
332301

333-
{docs
334-
|> Enum.map(fn
335-
{:ok, %{"n" => n}} -> n
336-
{:ok, _other} -> 0
337-
end)
338-
|> Enum.reduce(0, fn x, acc -> x + acc end), ids}
339-
302+
docs
303+
|> Enum.map(fn
304+
{:ok, %{"n" => n} = doc} -> BulkWriteResult.insert_result(n, ids, doc["writeErrors"] || [])
305+
{:ok, _other} -> BulkWriteResult.empty()
306+
end)
307+
|> BulkWriteResult.reduce()
340308
end
309+
341310
defp collect(docs, :update) do
342311

343312
docs
344313
|> Enum.map(fn
345-
{:ok, %{"n" => n, "nModified" => modified, "upserted" => ids}} -> l = length(ids); {n - l, modified, l, filter_upsert_ids(ids)}
346-
{:ok, %{"n" => matched, "nModified" => modified}} -> {matched, modified, 0, []}
347-
{:ok, _other} -> {0, 0, 0, []}
348-
end)
349-
|> Enum.reduce({0, 0, 0, []}, fn
350-
{mat, mod, ups, ids}, {s_mat, s_mod, s_ups, all} -> {s_mat + mat, s_mod + mod, s_ups + ups, all ++ ids}
314+
{:ok, %{"n" => n, "nModified" => modified, "upserted" => ids} = doc} -> l = length(ids)
315+
BulkWriteResult.update_result(n - l, modified, l, filter_upsert_ids(ids), doc["writeErrors"] || [])
316+
{:ok, %{"n" => matched, "nModified" => modified} = doc} -> BulkWriteResult.update_result(matched, modified, 0, [], doc["writeErrors"] || [])
317+
{:ok, _other} -> BulkWriteResult.empty()
351318
end)
319+
|> BulkWriteResult.reduce()
352320

353321
end
354322
defp collect(docs, :delete) do
355323

356324
docs
357325
|> Enum.map(fn
358-
{:ok, %{"n" => n}} -> n
359-
{:ok, _other} -> 0
326+
{:ok, %{"n" => n} = doc } -> BulkWriteResult.delete_result(n, doc["writeErrors"] || [])
327+
{:ok, _other} -> BulkWriteResult.empty()
360328
end)
361-
|> Enum.reduce(0, fn x, acc -> x + acc end)
329+
|> BulkWriteResult.reduce()
362330

363331
end
364332

lib/mongo/results.ex

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,8 @@ defmodule Mongo.BulkWriteResult do
7676
* `:upserted_count` - Number of upserted documents
7777
* `:upserted_ids` - If the operation was an upsert, the upserted ids
7878
* `:inserted_ids` - If the operation was an insert, the inserted ids
79+
* `:errors` - If the operation results in an error, the error is collected
80+
7981
"""
8082

8183
@type t :: %__MODULE__{
@@ -86,9 +88,49 @@ defmodule Mongo.BulkWriteResult do
8688
deleted_count: non_neg_integer,
8789
upserted_count: non_neg_integer,
8890
upserted_ids: list(BSON.ObjectId.t),
89-
inserted_ids: list(BSON.ObjectId.t)
91+
inserted_ids: list(BSON.ObjectId.t),
92+
errors: list(Map.t)
9093
}
9194

92-
defstruct [acknowledged: true, matched_count: 0, modified_count: 0, inserted_count: 0, deleted_count: 0, upserted_count: 0, inserted_ids: [], upserted_ids: []]
95+
alias Mongo.BulkWriteResult
96+
97+
defstruct [acknowledged: true, matched_count: 0, modified_count: 0, inserted_count: 0, deleted_count: 0, upserted_count: 0, inserted_ids: [], upserted_ids: [], errors: []]
98+
99+
def insert_result(count, ids, errors) do
100+
%BulkWriteResult{inserted_count: count, inserted_ids: ids, errors: errors}
101+
end
102+
103+
def update_result(matched_count, modified_count, upserted_count, ids, errors) do
104+
%BulkWriteResult{matched_count: matched_count, modified_count: modified_count, upserted_count: upserted_count, upserted_ids: ids, errors: errors}
105+
end
106+
107+
def delete_result(count, errors) do
108+
%BulkWriteResult{deleted_count: count, errors: errors}
109+
end
110+
111+
def empty() do
112+
%BulkWriteResult{}
113+
end
114+
115+
def add(%BulkWriteResult{} = src, %BulkWriteResult{} = dest) do
116+
%BulkWriteResult{acknowledged: src.acknowledged,
117+
matched_count: src.matched_count + dest.matched_count,
118+
modified_count: src.modified_count + dest.modified_count,
119+
inserted_count: src.inserted_count + dest.inserted_count,
120+
deleted_count: src.deleted_count + dest.deleted_count,
121+
upserted_count: src.upserted_count + dest.upserted_count,
122+
inserted_ids: src.inserted_ids ++ dest.inserted_ids,
123+
upserted_ids: src.upserted_ids ++ dest.upserted_ids,
124+
errors: src.errors ++ dest.errors
125+
}
126+
end
127+
128+
def reduce(results, acc) do
129+
Enum.reduce(results, acc, fn x, acc -> BulkWriteResult.add(acc, x) end)
130+
end
131+
132+
def reduce(results) do
133+
reduce(results, %BulkWriteResult{})
134+
end
93135

94136
end

test/mongo/bulk_writes_test.exs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,5 +143,33 @@ defmodule Mongo.BulkWritesTest do
143143

144144
end
145145

146+
test "create one small document and one too large document", top do
147+
coll = unique_name()
148+
max_n = (16*1024*1024)
149+
150+
a_line_1k = Enum.reduce(1..1_024, "", fn _, acc -> acc <> "A" end)
151+
a_line_1m = Enum.reduce(1..1_024, "", fn _, acc -> acc <> a_line_1k end)
152+
a_line_16m = String.slice(Enum.reduce(1..16, "", fn _, acc -> acc <> a_line_1m end), 0..max_n)
153+
154+
b_line_1k = Enum.reduce(1..1_024, "", fn _, acc -> acc <> "B" end)
155+
b_line_1m = Enum.reduce(1..1_024, "", fn _, acc -> acc <> b_line_1k end)
156+
b_line_16m = String.slice(Enum.reduce(1..15, "", fn _, acc -> acc <> b_line_1m end), 0..max_n)
157+
158+
bulk = coll
159+
|> OrderedBulk.new()
160+
|> OrderedBulk.insert_one(%{v: a_line_1k, key: "small"})
161+
|> OrderedBulk.insert_one(%{v: a_line_16m, key: "big"})
162+
|> OrderedBulk.update_one(%{key: "small"}, %{"$set": %{v: b_line_1k}})
163+
|> OrderedBulk.update_one(%{key: "big"}, %{"$set": %{v: b_line_16m}})
164+
|> OrderedBulk.delete_one(%{key: "small"})
165+
|> OrderedBulk.delete_one(%{key: "big"})
166+
167+
%BulkWriteResult{errors: [%{"code" => code}]} = result = BulkWrite.write(top.pid, bulk, w: 1)
168+
169+
assert code == 2
170+
assert %{:matched_count => 1, :deleted_count => 1, :modified_count => 1} == Map.take(result, [:matched_count, :deleted_count, :modified_count])
171+
assert {:ok, 0} == Mongo.count(top.pid, coll, %{})
172+
173+
end
146174

147175
end

0 commit comments

Comments
 (0)