Skip to content

Commit 469d55f

Browse files
committed
Basic support for OP_MSG, fixes #2
1 parent f4f3adc commit 469d55f

File tree

8 files changed

+192
-109
lines changed

8 files changed

+192
-109
lines changed

lib/mongo.ex

Lines changed: 39 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -281,9 +281,13 @@ defmodule Mongo do
281281
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
282282

283283
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
284-
{:ok, doc} <- direct_command(conn, query, opts), do: {:ok, doc["value"]}
284+
{:ok, doc} <- direct_command(conn, query, opts) do
285+
{:ok, doc["value"]}
286+
end
287+
285288
end
286289

290+
287291
@doc """
288292
Finds a document and replaces it.
289293
@@ -487,30 +491,29 @@ defmodule Mongo do
487491
other -> other
488492
end
489493

490-
cmd = [
491-
{"find", coll},
492-
{"filter", filter},
493-
{"limit", opts[:limit]},
494-
{"hint", opts[:hint]},
495-
{"singleBatch", opts[:single_batch]},
496-
{"readConcern", opts[:read_concern]},
497-
{"max", opts[:max]},
498-
{"min", opts[:min]},
499-
{"collation", opts[:collation]},
500-
{"returnKey", opts[:return_key]},
501-
{"showRecordId", opts[:show_record_id]},
502-
{"tailable", opts[:tailable]},
503-
{"oplogReplay", opts[:oplog_replay]},
504-
{"tailable", opts[:tailable]},
505-
{"noCursorTimeout", opts[:no_cursor_timeout]},
506-
{"awaitData", opts[:await_data]},
507-
{"batchSize", opts[:batch_size]},
508-
{"projection", opts[:projection]},
509-
{"comment", opts[:comment]},
510-
{"maxTimeMS", opts[:max_time]},
511-
{"skip", opts[:skip]},
512-
{"sort", opts[:sort]}
513-
]
494+
cmd = [find: coll,
495+
filter: filter,
496+
limit: opts[:limit],
497+
hint: opts[:hint],
498+
singleBatch: opts[:single_batch],
499+
readConcern: opts[:read_concern],
500+
max: opts[:max],
501+
min: opts[:min],
502+
collation: opts[:collation],
503+
returnKey: opts[:return_key],
504+
showRecordId: opts[:show_record_id],
505+
tailable: opts[:tailable],
506+
oplogReplay: opts[:oplog_replay],
507+
tailable: opts[:tailable],
508+
noCursorTimeout: opts[:no_cursor_timeout],
509+
awaitData: opts[:await_data],
510+
batchSize: opts[:batch_size],
511+
projection: opts[:projection],
512+
comment: opts[:comment],
513+
maxTimeMS: opts[:max_time],
514+
skip: opts[:skip],
515+
sort: opts[:sort]
516+
]
514517

515518
cmd = filter_nils(cmd)
516519

@@ -564,18 +567,18 @@ defmodule Mongo do
564567
def direct_command(conn, cmd, opts \\ []) do
565568
action = %Query{action: :command}
566569

567-
with {:ok, _query, response} <- DBConnection.execute(conn, action, [cmd], defaults(opts)) do
568-
case response do
569-
op_reply(flags: flags, docs: [%{"$err" => reason, "code" => code}]) when (@reply_query_failure &&& flags) != 0 -> {:error, Mongo.Error.exception(message: reason, code: code)}
570-
op_reply(flags: flags) when (@reply_cursor_not_found &&& flags) != 0 -> {:error, Mongo.Error.exception(message: "cursor not found")}
571-
op_reply(docs: [%{"ok" => 0.0, "errmsg" => reason} = error]) -> {:error, %Mongo.Error{message: "command failed: #{reason}", code: error["code"]}}
572-
op_reply(docs: [%{"ok" => ok} = doc]) when ok == 1 -> {:ok, doc}
573-
# TODO: Check if needed
574-
op_reply(docs: []) -> {:ok, nil}
575-
end
570+
with {:ok, _query, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
571+
{:ok, doc} <- check_for_error(doc) do
572+
{:ok, doc}
576573
end
577574
end
578575

576+
defp check_for_error(%{"ok" => ok} = response) when ok == 1 do
577+
{:ok, response}
578+
end
579+
defp check_for_error(%{"code" => code, "codeName" => _name, "errmsg" => msg}) do
580+
{:error, Mongo.Error.exception(message: msg, code: code)}
581+
end
579582

580583
@doc """
581584
Returns the current wire version.
@@ -600,7 +603,7 @@ defmodule Mongo do
600603
"""
601604
@spec ping(GenServer.server) :: result(BSON.document)
602605
def ping(topology_pid) do
603-
command(topology_pid, %{ping: 1}, [batch_size: 1])
606+
command(topology_pid, [ping: 1], [batch_size: 1])
604607
end
605608

606609
@doc """
@@ -977,7 +980,7 @@ defmodule Mongo do
977980
defp select_servers(topology_pid, type, opts, start_time) do
978981
topology = Topology.topology(topology_pid)
979982
with {:ok, servers, slave_ok, mongos?} <- TopologyDescription.select_servers(topology, type, opts) do
980-
case Enum.empty? servers do
983+
case Enum.empty?(servers) do
981984
true ->
982985
case Topology.wait_for_connection(topology_pid, @sel_timeout, start_time) do
983986
{:ok, _servers} -> select_servers(topology_pid, type, opts, start_time)
@@ -1054,24 +1057,6 @@ defmodule Mongo do
10541057
Keyword.put_new(opts, :timeout, @timeout)
10551058
end
10561059

1057-
defp get_last_error(:ok) do
1058-
:ok
1059-
end
1060-
defp get_last_error(op_reply(docs: [%{"ok" => ok, "err" => nil} = doc])) when ok == 1 do
1061-
{:ok, doc}
1062-
end
1063-
defp get_last_error(op_reply(docs: [%{"ok" => ok, "err" => message, "code" => code}])) when ok == 1 do
1064-
# If a batch insert (OP_INSERT) fails some documents may still have been
1065-
# inserted, but mongo always returns {n: 0}
1066-
# When we support the 2.6 bulk write API we will get number of inserted
1067-
# documents and should change the return value to be something like:
1068-
# {:error, %WriteResult{}, %Error{}}
1069-
{:error, Mongo.Error.exception(message: message, code: code)}
1070-
end
1071-
defp get_last_error(op_reply(docs: [%{"ok" => 0.0, "errmsg" => message, "code" => code}])) do
1072-
{:error, Mongo.Error.exception(message: message, code: code)}
1073-
end
1074-
10751060
defp assign_ids(list) when is_list(list) do
10761061
Enum.map(list, &assign_id/1)
10771062
|> Enum.unzip
@@ -1101,7 +1086,4 @@ defmodule Mongo do
11011086
defp add_id([{key, _}|_] = list, id) when is_binary(key), do: [{"_id", id}|list]
11021087
defp add_id([], id), do: [{"_id", id}]
11031088

1104-
defp maybe_failure(op_reply(flags: flags, docs: [%{"$err" => reason, "code" => code}])) when (@reply_query_failure &&& flags) != 0, do: {:error, Mongo.Error.exception(message: reason, code: code)}
1105-
defp maybe_failure(op_reply(flags: flags)) when (@reply_cursor_not_found &&& flags) != 0, do: {:error, Mongo.Error.exception(message: "cursor not found")}
1106-
defp maybe_failure(_op_reply), do: :ok
11071089
end

lib/mongo/cursor.ex

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,10 @@ defmodule Mongo.Cursor do
132132
def get_more(conn, coll, cursor, nil, opts) do
133133

134134
cmd = [
135-
{"getMore", cursor},
136-
{"collection", coll},
137-
{"batchSize", opts[:batch_size]},
138-
{"maxTimeMS", opts[:max_time]}
135+
getMore: cursor,
136+
collection: coll,
137+
batchSize: opts[:batch_size],
138+
maxTimeMS: opts[:max_time]
139139
] |> filter_nils()
140140

141141
with {:ok, %{"cursor" => %{ "id" => cursor_id, "nextBatch" => docs}, "ok" => ok}} when ok == 1 <- Mongo.direct_command(conn, cmd, opts) do
@@ -149,10 +149,10 @@ defmodule Mongo.Cursor do
149149
on_resume_token: fun, topology_pid: topology_pid) = change_stream, opts) do
150150

151151
get_more = [
152-
{"getMore", cursor_id},
153-
{"collection", coll},
154-
{"batchSize", opts[:batch_size]},
155-
{"maxTimeMS", opts[:max_time]}
152+
getMore: cursor_id,
153+
collection: coll,
154+
batchSize: opts[:batch_size],
155+
maxTimeMS: opts[:max_time]
156156
] |> filter_nils()
157157

158158
with {:ok, %{"operationTime" => op_time,
@@ -266,8 +266,8 @@ defmodule Mongo.Cursor do
266266
def kill_cursors(conn, coll, cursor_ids, opts) do
267267

268268
cmd = [
269-
{"killCursors", coll},
270-
{"cursors", cursor_ids}
269+
killCursors: coll,
270+
cursors: cursor_ids
271271
]
272272

273273
cmd = filter_nils(cmd)

lib/mongo/grid_fs/bucket.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -94,8 +94,8 @@ defmodule Mongo.GridFs.Bucket do
9494
"""
9595
@spec drop(Bucket.t) :: Mongo.result(BSON.document)
9696
def drop(%Bucket{topology_pid: topology_pid, opts: opts} = bucket) do
97-
{:ok, _} = Mongo.command(topology_pid, %{drop: files_collection_name(bucket)}, opts)
98-
{:ok, _} = Mongo.command(topology_pid, %{drop: chunks_collection_name(bucket)}, opts)
97+
{:ok, _} = Mongo.command(topology_pid, [drop: files_collection_name(bucket)], opts)
98+
{:ok, _} = Mongo.command(topology_pid, [drop: chunks_collection_name(bucket)], opts)
9999
end
100100

101101
@doc """

lib/mongo/messages.ex

Lines changed: 59 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,6 @@ defmodule Mongo.Messages do
88
defmacro __using__(_opts) do
99
quote do
1010
import unquote(__MODULE__)
11-
@reply_cursor_not_found 0x1
12-
@reply_query_failure 0x2
13-
@reply_shard_config_stale 0x4
14-
@reply_await_capable 0x8
1511
end
1612
end
1713

@@ -37,7 +33,10 @@ defmodule Mongo.Messages do
3733
defrecordp :msg_header, [:length, :request_id, :response_to, :op_code]
3834
defrecord :op_query, [:flags, :coll, :num_skip, :num_return, :query, :select]
3935
defrecord :op_reply, [:flags, :cursor_id, :from, :num, :docs]
40-
defrecord :op_msg, [:flags, :type, :docs]
36+
defrecord :sequence, [:size, :identifier, :docs]
37+
defrecord :payload, [:doc, :sequence]
38+
defrecord :section, [:payload_type, :payload]
39+
defrecord :op_msg, [:flags, :sections]
4140

4241
@doc """
4342
Decodes the header from response of a request sent by the mongodb server
@@ -66,8 +65,9 @@ defmodule Mongo.Messages do
6665
def decode_response(msg_header(length: length, response_to: response_to, op_code: op_code), binary) when byte_size(binary) >= length do
6766
<<response::binary(length), rest::binary>> = binary
6867
case op_code do
69-
@op_reply -> {:ok, response_to, decode_reply(response), rest}
70-
_ -> :error
68+
@op_reply -> {:ok, response_to, decode_reply(response), rest}
69+
@op_msg_code -> {:ok, response_to, decode_msg(response), rest}
70+
_ -> :error
7171
end
7272
end
7373
def decode_response(_header, _binary), do: :error
@@ -78,19 +78,69 @@ defmodule Mongo.Messages do
7878
def decode_reply(<<flags::int32, cursor_id::int64, from::int32, num::int32, rest::binary>>) do
7979
op_reply(flags: flags, cursor_id: cursor_id, from: from, num: num, docs: BSON.Decoder.documents(rest))
8080
end
81+
def decode_msg(<<flags::int32, rest::binary>>) do
82+
op_msg(flags: flags, sections: decode_sections(rest))
83+
end
84+
85+
def decode_sections(binary), do: decode_sections(binary, [])
86+
def decode_sections("", acc), do: Enum.reverse(acc)
87+
88+
def decode_sections(<<0x00::int8, payload::binary>>, acc) do
89+
<<size::int32, _rest::binary>> = payload
90+
<<doc::binary(size), rest::binary>> = payload
91+
with {doc, ""} <- BSON.Decoder.document(doc) do
92+
decode_sections(rest, [section(payload_type: 0, payload: payload(doc: doc)) | acc])
93+
end
94+
end
95+
def decode_sections(<<0x01::int8, payload::binary>>, acc) do
96+
<<size::int32, _rest::binary>> = payload
97+
<<sequence::binary(size), rest::binary>> = payload
98+
decode_sections(rest, [section(payload_type: 1, payload: payload(sequence: decode_sequence(sequence))) | acc])
99+
end
100+
101+
def decode_sequence(<<size::int32, rest::binary>>) do
102+
with {identifier, docs} <- cstring(rest) do
103+
sequence(size: size, identifier: identifier, docs: BSON.Decoder.documents(docs))
104+
end
105+
end
106+
107+
defp cstring(binary) do
108+
[string, rest] = :binary.split(binary, <<0x00>>)
109+
{string, rest}
110+
end
81111

82112
def encode(request_id, op_query() = op) do
83113
iodata = encode_op(op)
84114
header = msg_header(length: IO.iodata_length(iodata) + @header_size, request_id: request_id, response_to: 0, op_code: @op_query)
85115
[encode_header(header)|iodata]
86116
end
87117

118+
def encode(request_id, op_msg() = op) do
119+
iodata = encode_op(op)
120+
header = msg_header(length: IO.iodata_length(iodata) + @header_size, request_id: request_id, response_to: 0, op_code: @op_msg_code)
121+
[encode_header(header)|iodata]
122+
end
123+
88124
defp encode_header(msg_header(length: length, request_id: request_id, response_to: response_to, op_code: op_code)) do
89125
<<length::int32, request_id::int32, response_to::int32, op_code::int32>>
90126
end
91127

92-
defp encode_op(op_msg(flags: flags, docs: [doc])) do
93-
[<<0::int32>>, <<0x00>>, doc]
128+
defp encode_op(op_msg(flags: flags, sections: sections)) do
129+
# todo: flags encoding
130+
[<<0::int32>>, encode_sections(sections)]
131+
end
132+
defp encode_sections(sections) do
133+
Enum.map(sections, fn section -> encode_section(section) end)
134+
end
135+
136+
defp encode_section(section(payload_type: t, payload: payload)) do
137+
[<<t::int8>>, encode_payload(payload)]
138+
end
139+
defp encode_payload(payload(doc: doc, sequence: nil)) do
140+
BSON.Encoder.document(doc)
141+
end
142+
defp encode_payload(payload(doc: nil, sequence: sequence(size: size, identifier: identifier, docs: docs))) do
143+
[<<size::int32>>, identifier, <<0x00>>, BSON.Encoder.encode(docs)]
94144
end
95145

96146
defp encode_op(op_query(flags: flags, coll: coll, num_skip: num_skip,

lib/mongo/mongo_db_connection.ex

Lines changed: 27 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
defmodule Mongo.MongoDBConnection do
22
@moduledoc """
3-
Implementierung für das DBConnection-Protokoll.
3+
Implementation of the DBConnection behaviour module.
44
"""
55

66
use DBConnection
@@ -13,6 +13,7 @@ defmodule Mongo.MongoDBConnection do
1313

1414
@impl true
1515
def connect(opts) do
16+
1617
{write_concern, opts} = Keyword.split(opts, @write_concern)
1718
write_concern = Keyword.put_new(write_concern, :w, 1)
1819

@@ -54,9 +55,7 @@ defmodule Mongo.MongoDBConnection do
5455
end
5556

5657
case result do
57-
{:ok, state} ->
58-
## IO.puts inspect state
59-
{:ok, state}
58+
{:ok, state} -> {:ok, state}
6059

6160
{:disconnect, reason, state} ->
6261
reason = case reason do
@@ -128,6 +127,7 @@ defmodule Mongo.MongoDBConnection do
128127
end
129128
end
130129

130+
@impl true
131131
def checkout(state), do: {:ok, state}
132132
@impl true
133133
def checkin(state), do: {:ok, state}
@@ -171,22 +171,39 @@ defmodule Mongo.MongoDBConnection do
171171
defp execute_action(:wire_version, _, _, state) do
172172
{:ok, state.wire_version, state}
173173
end
174+
defp execute_action(:command, [cmd], opts, %{wire_version: version} = state) when version >= 6 do
175+
176+
cmd = cmd ++ ["$db": opts[:database] || state.database,
177+
"$readPreference": [mode: update_read_preferences(opts[:slave_ok])]]
174178

175-
defp execute_action(:command, [query], opts, state) do
176179
timeout = Keyword.get(opts, :max_time, 0)
177-
flags = Keyword.take(opts, @find_one_flags)
178-
op = op_query(coll: Utils.namespace("$cmd", state, opts[:database]), query: query, select: "", num_skip: 0, num_return: 1, flags: flags(flags))
180+
op = op_msg(flags: 0, sections: [section(payload_type: 0, payload: payload(doc: cmd))])
179181

180-
with {:ok, response} <- Utils.post_request(state.request_id, op, state, timeout),
181-
state = %{state | request_id: state.request_id + 1},
182-
do: {:ok, response, state}
182+
with {:ok, doc} <- Utils.post_request(op, state.request_id, state, timeout),
183+
state = %{state | request_id: state.request_id + 1} do
184+
{:ok, doc, state}
185+
end
183186
end
187+
defp execute_action(:command, [cmd], opts, state) do
184188

189+
timeout = Keyword.get(opts, :max_time, 0)
190+
flags = Keyword.take(opts, @find_one_flags)
191+
op = op_query(coll: Utils.namespace("$cmd", state, opts[:database]), query: cmd, select: "", num_skip: 0, num_return: 1, flags: flags(flags))
192+
193+
with {:ok, doc} <- Utils.post_request(op, state.request_id, state, timeout),
194+
state = %{state | request_id: state.request_id + 1} do
195+
{:ok, doc, state}
196+
end
197+
end
185198
defp execute_action(:error, _query, _opts, state) do
186199
exception = Mongo.Error.exception("Test-case")
187200
{:disconnect, exception, state}
188201
end
189202

203+
def update_read_preferences(true), do: "primaryPreferred"
204+
def update_read_preferences(false), do: "primary"
205+
def update_read_preferences(nil), do: "primary"
206+
190207
defp flags(flags) do
191208
Enum.reduce(flags, [], fn
192209
{flag, true}, acc -> [flag|acc]

lib/mongo/monitor.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ defmodule Mongo.Monitor do
104104
defp call_is_master(conn_pid, opts) do
105105
start_time = System.monotonic_time
106106
result = try do
107-
Mongo.direct_command(conn_pid, %{isMaster: 1}, opts)
107+
Mongo.direct_command(conn_pid, [isMaster: 1], opts)
108108
rescue
109109
e -> {:error, e}
110110
end

0 commit comments

Comments
 (0)