Skip to content

Commit a3fbb05

Browse files
committed
brainstorming for bulk api
1 parent 0cb6815 commit a3fbb05

File tree

4 files changed

+104
-14
lines changed

4 files changed

+104
-14
lines changed

lib/mongo.ex

Lines changed: 57 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ defmodule Mongo do
5353
alias Mongo.TopologyDescription
5454
alias Mongo.Topology
5555
alias Mongo.UrlParser
56+
alias Mongo.UnorderedBulk
5657

5758
@timeout 15000 # 5000
5859

@@ -288,6 +289,59 @@ defmodule Mongo do
288289

289290
end
290291

292+
def bulk_write(topology_pid, coll, %UnorderedBulk{inserts: inserts, updates: updates, deletes: deletes}, opts) do
293+
294+
{_ids, inserts} = assign_ids(inserts)
295+
296+
write_concern = %{
297+
w: Keyword.get(opts, :w),
298+
j: Keyword.get(opts, :j),
299+
wtimeout: Keyword.get(opts, :wtimeout)
300+
} |> filter_nils()
301+
302+
insert_cmd = [
303+
insert: coll, ## todo
304+
documents: inserts,
305+
ordered: Keyword.get(opts, :ordered),
306+
writeConcern: write_concern,
307+
bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
308+
] |> filter_nils()
309+
310+
deletes = Enum.map(deletes, fn filter -> %{
311+
q: filter,
312+
limit: 1,
313+
collation: Keyword.get(opts, :collation)
314+
} |> filter_nils() end)
315+
316+
delete_cmd = [
317+
delete: coll, ## todo
318+
deletes: deletes,
319+
ordered: Keyword.get(opts, :ordered),
320+
writeConcern: write_concern
321+
] |> filter_nils()
322+
323+
updates = Enum.map(updates, fn {filter, update} -> %{
324+
q: filter,
325+
u: update,
326+
upsert: Keyword.get(opts, :upsert),
327+
multi: false,
328+
collation: Keyword.get(opts, :collation),
329+
arrayFilters: Keyword.get(opts, :filters)
330+
} |> filter_nils() end)
331+
332+
333+
update_cmd = [
334+
update: coll,
335+
updates: updates,
336+
ordered: Keyword.get(opts, :ordered),
337+
writeConcern: write_concern,
338+
bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
339+
] |> filter_nils()
340+
341+
cmds = [insert_cmd, delete_cmd, update_cmd]
342+
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
343+
{:ok, doc} <- direct_command(conn, cmds, opts), do: {:ok, doc["value"]}
344+
end
291345

292346
@doc """
293347
Finds a document and replaces it.
@@ -565,14 +619,15 @@ defmodule Mongo do
565619

566620
@doc false
567621
@spec direct_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
568-
def direct_command(conn, cmd, opts \\ []) do
622+
def direct_command(conn, cmds, opts \\ []) when is_list(cmds) do
569623
action = %Query{action: :command}
570624

571-
with {:ok, _cmd, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
625+
with {:ok, _cmd, doc} <- DBConnection.execute(conn, action, cmds, defaults(opts)),
572626
{:ok, doc} <- check_for_error(doc) do
573627
{:ok, doc}
574628
end
575629
end
630+
def direct_command(conn, cmd, opts), do: direct_command(conn, [cmd], opts)
576631

577632
defp check_for_error(%{"ok" => ok} = response) when ok == 1, do: {:ok, response}
578633
defp check_for_error(%{"code" => code, "errmsg" => msg}), do: {:error, Mongo.Error.exception(message: msg, code: code)}

lib/mongo/messages.ex

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,8 +154,6 @@ defmodule Mongo.Messages do
154154
[<<size::int32>> | iodata]
155155
end
156156

157-
158-
159157
defp blit_flags(op, flags) when is_list(flags) do
160158
import Bitwise
161159
Enum.reduce(flags, 0x0, &(flag_to_bit(op, &1) ||| &2))

lib/mongo/mongo_db_connection.ex

Lines changed: 17 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -242,24 +242,31 @@ defmodule Mongo.MongoDBConnection do
242242
defp execute_action(:wire_version, _, _, state) do
243243
{:ok, state.wire_version, state}
244244
end
245-
defp execute_action(:command, [cmd], opts, %{wire_version: version} = state) when version >= 6 do
246-
247-
cmd = cmd ++ ["$db": opts[:database] || state.database,
248-
"$readPreference": [mode: update_read_preferences(opts[:slave_ok])]]
245+
defp execute_action(:command, cmds, opts, %{wire_version: version} = state) when version >= 6 do
249246

247+
mode = update_read_preferences(opts[:slave_ok])
250248
timeout = Keyword.get(opts, :max_time, 0)
251249

252-
# MongoDB 3.6 only allows certain command arguments to be provided this way. These are:
253-
op = case pulling_out?(cmd, :documents) || pulling_out?(cmd, :updates) || pulling_out?(cmd, :deletes) do
254-
nil -> op_msg(flags: 0, sections: [section(payload_type: 0, payload: payload(doc: cmd))])
255-
key -> pulling_out(cmd, key)
256-
end
250+
sections = cmds
251+
|> Enum.map(fn cmd -> cmd ++ ["$db": opts[:database] || state.database, "$readPreference": [mode: mode]] end)
252+
|> Enum.recude([], fn cmd, acc ->
253+
# MongoDB 3.6 only allows certain command arguments to be provided this way. These are:
254+
case pulling_out?(cmd, :documents) || pulling_out?(cmd, :updates) || pulling_out?(cmd, :deletes) do
255+
nil -> [section(payload_type: 0, payload: payload(doc: cmd)) | acc]
256+
key ->
257+
{payload_0, payload_1} = pulling_out(cmd, key)
258+
[payload_0 | [payload_1 | acc]]
259+
end
260+
end)
261+
op = op_msg(flags: 0, sections: sections)
257262

258263
with {:ok, doc} <- Utils.post_request(op, state.request_id, state, timeout),
259264
state = %{state | request_id: state.request_id + 1} do
260265
{:ok, doc, state}
266+
261267
end
262268
end
269+
263270
defp execute_action(:command, [cmd], opts, state) do
264271

265272
timeout = Keyword.get(opts, :max_time, 0)
@@ -291,7 +298,7 @@ defmodule Mongo.MongoDBConnection do
291298
payload_0 = section(payload_type: 0, payload: payload(doc: cmd))
292299
payload_1 = section(payload_type: 1, payload: payload(sequence: sequence(identifier: to_string(key), docs: docs)))
293300

294-
op_msg(flags: 0, sections: [payload_0, payload_1])
301+
{payload_0, payload_1}
295302
end
296303

297304
def update_read_preferences(true), do: "primaryPreferred"

lib/mongo/unordered_bulk.ex

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
defmodule Mongo.UnorderedBulk do
2+
@moduledoc """
3+
4+
todo
5+
6+
Ist immer für eine Collections
7+
8+
"""
9+
10+
alias Mongo.UnorderedBulk
11+
12+
defstruct inserts: [], updates: [], deletes: [], opts: []
13+
14+
def new() do
15+
%UnorderedBulk{}
16+
end
17+
18+
def insert_one(%UnorderedBulk{inserts: rest} = b, doc) do
19+
%UnorderedBulk{b | inserts: [doc | rest] }
20+
end
21+
22+
def delete_one(%UnorderedBulk{deletes: rest} = b, doc) do
23+
%UnorderedBulk{b | deletes: [doc | rest] }
24+
end
25+
26+
def update_one(%UnorderedBulk{updates: rest} = b, filter, update) do
27+
%UnorderedBulk{b | updates: [{filter, update} | rest] }
28+
end
29+
30+
end

0 commit comments

Comments
 (0)