Skip to content

Commit 8730d01

Browse files
committed
first test on api
1 parent e006171 commit 8730d01

File tree

3 files changed

+83
-61
lines changed

3 files changed

+83
-61
lines changed

lib/mongo.ex

Lines changed: 34 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -296,7 +296,7 @@ defmodule Mongo do
296296

297297
end
298298

299-
def bulk_write(topology_pid, coll, %UnorderedBulk{inserts: inserts, updates: updates, deletes: deletes}, opts) do
299+
def bulk_write(topology_pid, %UnorderedBulk{coll: coll, inserts: inserts, updates: updates, deletes: deletes}, opts \\ []) do
300300

301301
{_ids, inserts} = assign_ids(inserts)
302302

@@ -307,47 +307,57 @@ defmodule Mongo do
307307
} |> filter_nils()
308308

309309
insert_cmd = [
310-
insert: coll, ## todo
310+
insert: coll,
311311
documents: inserts,
312-
ordered: Keyword.get(opts, :ordered),
313-
writeConcern: write_concern,
314-
bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
312+
writeConcern: write_concern
315313
] |> filter_nils()
316314

317-
deletes = Enum.map(deletes, fn filter -> %{
318-
q: filter,
319-
limit: 1,
320-
collation: Keyword.get(opts, :collation)
321-
} |> filter_nils() end)
315+
deletes = Enum.map(deletes,
316+
fn {filter, collaction, many} ->
317+
limit = case many do
318+
true -> 0
319+
false -> 1
320+
end
321+
%{
322+
q: filter,
323+
limit: limit,
324+
collation: collaction
325+
} |> filter_nils()
326+
end)
322327

323328
delete_cmd = [
324-
delete: coll, ## todo
329+
delete: coll,
325330
deletes: deletes,
326331
ordered: Keyword.get(opts, :ordered),
327332
writeConcern: write_concern
328333
] |> filter_nils()
329334

330-
updates = Enum.map(updates, fn {filter, update} -> %{
335+
updates = Enum.map(updates, fn {filter, update, update_opts} -> %{
331336
q: filter,
332337
u: update,
333-
upsert: Keyword.get(opts, :upsert),
334-
multi: false,
335-
collation: Keyword.get(opts, :collation),
336-
arrayFilters: Keyword.get(opts, :filters)
338+
upsert: Keyword.get(update_opts, :upsert),
339+
multi: Keyword.get(update_opts, :multi) || false,
340+
collation: Keyword.get(update_opts, :collation),
341+
arrayFilters: Keyword.get(update_opts, :filters)
337342
} |> filter_nils() end)
338343

339-
340344
update_cmd = [
341345
update: coll,
342346
updates: updates,
343-
ordered: Keyword.get(opts, :ordered),
347+
# ordered: Keyword.get(opts, :ordered),
344348
writeConcern: write_concern,
345-
bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
349+
# bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
346350
] |> filter_nils()
347351

348-
cmds = [insert_cmd, delete_cmd, update_cmd]
349-
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
350-
{:ok, doc} <- direct_command(conn, cmds, opts), do: {:ok, doc["value"]}
352+
cmds = [insert_cmd, update_cmd, delete_cmd]
353+
354+
IO.puts inspect cmds
355+
356+
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts) do
357+
cmds
358+
|> Enum.map(fn cmd -> direct_command(conn, cmd, opts) end)
359+
|> Enum.map(fn {:ok, doc} -> {:ok, doc} end)
360+
end
351361
end
352362

353363
@doc """
@@ -626,15 +636,14 @@ defmodule Mongo do
626636

627637
@doc false
628638
@spec direct_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
629-
def direct_command(conn, cmds, opts \\ []) when is_list(cmds) do
639+
def direct_command(conn, cmd, opts) do
630640
action = %Query{action: :command}
631641

632-
with {:ok, _cmd, doc} <- DBConnection.execute(conn, action, cmds, defaults(opts)),
642+
with {:ok, _cmd, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
633643
{:ok, doc} <- check_for_error(doc) do
634644
{:ok, doc}
635645
end
636646
end
637-
def direct_command(conn, cmd, opts), do: direct_command(conn, [cmd], opts)
638647

639648
defp check_for_error(%{"ok" => ok} = response) when ok == 1, do: {:ok, response}
640649
defp check_for_error(%{"code" => code, "errmsg" => msg}), do: {:error, Mongo.Error.exception(message: msg, code: code)}

lib/mongo/mongo_db_connection.ex

Lines changed: 18 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -95,17 +95,15 @@ defmodule Mongo.MongoDBConnection do
9595
end
9696
end
9797

98-
defp tcp_connect(opts, state) do
98+
defp tcp_connect(opts, s) do
99+
host = (opts[:hostname] || "localhost") |> to_charlist
100+
port = opts[:port] || 27017
101+
sock_opts = [:binary, active: false, packet: :raw, nodelay: true]
102+
++ (opts[:socket_options] || [])
99103

100-
{host, port} = Utils.hostname_port(opts)
101-
sock_opts = [:binary, active: false, packet: :raw, nodelay: true] ++ (opts[:socket_options] || [])
104+
s = Map.put(s, :host, "#{host}:#{port}")
102105

103-
s = case host do
104-
{:local, socket} -> Map.put(state, :host, socket)
105-
hostname -> Map.put(state, :host, "#{hostname}:#{port}")
106-
end
107-
108-
case :gen_tcp.connect(host, port, sock_opts, state.connect_timeout) do
106+
case :gen_tcp.connect(host, port, sock_opts, s.connect_timeout) do
109107
{:ok, socket} ->
110108
# A suitable :buffer is only set if :recbuf is included in
111109
# :socket_options.
@@ -115,7 +113,7 @@ defmodule Mongo.MongoDBConnection do
115113

116114
{:ok, %{s | connection: {:gen_tcp, socket}}}
117115

118-
{:error, reason} -> {:error, Mongo.Error.exception(tag: :tcp, action: "connect", reason: reason, host: state.host)}
116+
{:error, reason} -> {:error, Mongo.Error.exception(tag: :tcp, action: "connect", reason: reason, host: s.host)}
119117
end
120118
end
121119

@@ -244,31 +242,24 @@ defmodule Mongo.MongoDBConnection do
244242
defp execute_action(:wire_version, _, _, state) do
245243
{:ok, state.wire_version, state}
246244
end
247-
defp execute_action(:command, cmds, opts, %{wire_version: version} = state) when version >= 6 do
245+
defp execute_action(:command, [cmd], opts, %{wire_version: version} = state) when version >= 6 do
246+
247+
cmd = cmd ++ ["$db": opts[:database] || state.database,
248+
"$readPreference": [mode: update_read_preferences(opts[:slave_ok])]]
248249

249-
mode = update_read_preferences(opts[:slave_ok])
250250
timeout = Keyword.get(opts, :max_time, 0)
251251

252-
sections = cmds
253-
|> Enum.map(fn cmd -> cmd ++ ["$db": opts[:database] || state.database, "$readPreference": [mode: mode]] end)
254-
|> Enum.recude([], fn cmd, acc ->
255-
# MongoDB 3.6 only allows certain command arguments to be provided this way. These are:
256-
case pulling_out?(cmd, :documents) || pulling_out?(cmd, :updates) || pulling_out?(cmd, :deletes) do
257-
nil -> [section(payload_type: 0, payload: payload(doc: cmd)) | acc]
258-
key ->
259-
{payload_0, payload_1} = pulling_out(cmd, key)
260-
[payload_0 | [payload_1 | acc]]
261-
end
262-
end)
263-
op = op_msg(flags: 0, sections: sections)
252+
# MongoDB 3.6 only allows certain command arguments to be provided this way. These are:
253+
op = case pulling_out?(cmd, :documents) || pulling_out?(cmd, :updates) || pulling_out?(cmd, :deletes) do
254+
nil -> op_msg(flags: 0, sections: [section(payload_type: 0, payload: payload(doc: cmd))])
255+
key -> pulling_out(cmd, key)
256+
end
264257

265258
with {:ok, doc} <- Utils.post_request(op, state.request_id, state, timeout),
266259
state = %{state | request_id: state.request_id + 1} do
267260
{:ok, doc, state}
268-
269261
end
270262
end
271-
272263
defp execute_action(:command, [cmd], opts, state) do
273264

274265
timeout = Keyword.get(opts, :max_time, 0)
@@ -300,7 +291,7 @@ defmodule Mongo.MongoDBConnection do
300291
payload_0 = section(payload_type: 0, payload: payload(doc: cmd))
301292
payload_1 = section(payload_type: 1, payload: payload(sequence: sequence(identifier: to_string(key), docs: docs)))
302293

303-
{payload_0, payload_1}
294+
op_msg(flags: 0, sections: [payload_0, payload_1])
304295
end
305296

306297
def update_read_preferences(true), do: "primaryPreferred"

lib/mongo/unordered_bulk.ex

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -5,26 +5,48 @@ defmodule Mongo.UnorderedBulk do
55
66
Ist immer für eine Collections
77
8+
The maxWriteBatchSize limit of a database, which indicates the maximum number of write operations permitted in a write batch, raises from 1,000 to 100,000.
9+
810
"""
911

1012
alias Mongo.UnorderedBulk
1113

12-
defstruct inserts: [], updates: [], deletes: [], opts: []
14+
defstruct coll: nil, inserts: [], updates: [], deletes: [], opts: []
15+
16+
def new(coll) do
17+
%UnorderedBulk{coll: coll}
18+
end
1319

14-
def new() do
15-
%UnorderedBulk{}
20+
def insert_one(%UnorderedBulk{inserts: rest} = bulk, doc) do
21+
%UnorderedBulk{bulk | inserts: [doc | rest] }
1622
end
1723

18-
def insert_one(%UnorderedBulk{inserts: rest} = b, doc) do
19-
%UnorderedBulk{b | inserts: [doc | rest] }
24+
def delete_one(%UnorderedBulk{deletes: rest} = bulk, doc, collaction \\ nil) do
25+
%UnorderedBulk{bulk | deletes: [{doc, collaction, false} | rest] }
2026
end
2127

22-
def delete_one(%UnorderedBulk{deletes: rest} = b, doc) do
23-
%UnorderedBulk{b | deletes: [doc | rest] }
28+
def delete_many(%UnorderedBulk{deletes: rest} = bulk, doc, collaction \\ nil) do
29+
%UnorderedBulk{bulk | deletes: [{doc, collaction, true} | rest] }
2430
end
2531

26-
def update_one(%UnorderedBulk{updates: rest} = b, filter, update) do
27-
%UnorderedBulk{b | updates: [{filter, update} | rest] }
32+
def update_one(%UnorderedBulk{updates: rest} = bulk, filter, update, opts \\ []) do
33+
%UnorderedBulk{bulk | updates: [{filter, update, opts} | rest] }
2834
end
2935

36+
def test() do
37+
38+
"bulk"
39+
|> new()
40+
|> insert_one(%{name: "Greta"})
41+
|> insert_one(%{name: "Tom"})
42+
|> insert_one(%{name: "Waldo"})
43+
|> update_one(%{name: "Greta"}, %{"$set": %{kind: "dog"}})
44+
|> update_one(%{name: "Tom"}, %{"$set": %{kind: "dog"}})
45+
|> update_one(%{name: "Waldo"}, %{"$set": %{kind: "dog"}})
46+
|> delete_one(%{kind: "dog"})
47+
|> delete_one(%{kind: "dog"})
48+
|> delete_one(%{kind: "dog"})
49+
50+
51+
end
3052
end

0 commit comments

Comments
 (0)