Skip to content

Commit 8d978bb

Browse files
author
Michael Maier
committed
first test with op_msg
1 parent ed42ff3 commit 8d978bb

File tree

6 files changed

+117
-20
lines changed

6 files changed

+117
-20
lines changed

lib/bson/decoder.ex

Lines changed: 7 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -169,18 +169,11 @@ defmodule BSON.Decoder do
169169
{string, rest}
170170
end
171171

172-
defp subtype(0x00),
173-
do: :generic
174-
defp subtype(0x01),
175-
do: :function
176-
defp subtype(0x02),
177-
do: :binary_old
178-
defp subtype(0x03),
179-
do: :uuid_old
180-
defp subtype(0x04),
181-
do: :uuid
182-
defp subtype(0x05),
183-
do: :md5
184-
defp subtype(int) when is_integer(int) and int in 0x80..0xFF,
185-
do: int
172+
defp subtype(0x00), do: :generic
173+
defp subtype(0x01), do: :function
174+
defp subtype(0x02), do: :binary_old
175+
defp subtype(0x03), do: :uuid_old
176+
defp subtype(0x04), do: :uuid
177+
defp subtype(0x05), do: :md5
178+
defp subtype(int) when is_integer(int) and int in 0x80..0xFF, do: int
186179
end

lib/mongo.ex

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -548,6 +548,26 @@ defmodule Mongo do
548548
end
549549
end
550550

551+
@doc false
552+
@spec direct_command_msg(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
553+
def direct_command_msg(conn, doc, opts \\ []) do
554+
docs = [doc]
555+
query = %Query{action: :msg}
556+
557+
with {:ok, response} <- DBConnection.execute(conn, query, docs, defaults(opts)) do
558+
case response do
559+
op_msg(docs: [%{"ok" => ok} = doc]) when ok == 1 -> {:ok, doc}
560+
op_reply(flags: flags, docs: [%{"$err" => reason, "code" => code}]) when (@reply_query_failure &&& flags) != 0 -> {:error, Mongo.Error.exception(message: reason, code: code)}
561+
op_reply(flags: flags) when (@reply_cursor_not_found &&& flags) != 0 -> {:error, Mongo.Error.exception(message: "cursor not found")}
562+
op_reply(docs: [%{"ok" => 0.0, "errmsg" => reason} = error]) -> {:error, %Mongo.Error{message: "command failed: #{reason}", code: error["code"]}}
563+
op_reply(docs: [%{"ok" => ok} = doc]) when ok == 1 -> {:ok, doc}
564+
# TODO: Check if needed
565+
op_reply(docs: []) -> {:ok, nil}
566+
end
567+
end
568+
end
569+
570+
551571
@doc """
552572
Similar to `command/3` but unwraps the result and raises on error.
553573
"""
@@ -579,14 +599,15 @@ defmodule Mongo do
579599

580600
query = [
581601
insert: coll,
602+
"$db": "test",
582603
documents: [doc],
583604
ordered: Keyword.get(opts, :ordered),
584605
writeConcern: write_concern,
585606
bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
586607
] |> filter_nils()
587608

588609
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
589-
{:ok, doc} <- direct_command(conn, query, opts) do
610+
{:ok, doc} <- direct_command_msg(conn, query, opts) do
590611
case doc do
591612
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
592613
_ ->
@@ -644,7 +665,7 @@ defmodule Mongo do
644665
] |> filter_nils()
645666

646667
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
647-
{:ok, doc} <- direct_command(conn, query, opts) do
668+
{:ok, doc} <- direct_command_msg(conn, query, opts) do
648669
case doc do
649670
%{"writeErrors" => _} ->
650671
{:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}

lib/mongo/messages.ex

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ defmodule Mongo.Messages do
2020
@op_get_more 2005
2121
@op_delete 2006
2222
@op_kill_cursors 2007
23+
@op_msg_code 2013
2324

2425
@update_flags [
2526
upsert: 0x1,
@@ -54,6 +55,7 @@ defmodule Mongo.Messages do
5455
defrecord :op_delete, [:coll, :flags, :query]
5556
defrecord :op_kill_cursors, [:cursor_ids]
5657
defrecord :op_reply, [:flags, :cursor_id, :from, :num, :docs]
58+
defrecord :op_msg, [:flags, :type, :docs]
5759

5860
def encode(request_id, op) do
5961
iodata = encode_op(op)
@@ -64,6 +66,25 @@ defmodule Mongo.Messages do
6466
[encode_header(header)|iodata]
6567
end
6668

69+
def decode_response_msg(msg_header(length: length) = header, iolist) when is_list(iolist) do
70+
IO.puts "decode_response_msg: #{inspect header}"
71+
if IO.iodata_length(iolist) >= length,
72+
do: decode_response_msg(header, IO.iodata_to_binary(iolist)),
73+
else: :error
74+
end
75+
def decode_response_msg(msg_header(length: length, response_to: response_to) = header, binary) when byte_size(binary) >= length do
76+
IO.puts "decode_response_msg: #{inspect header}"
77+
<<response::binary(length), rest::binary>> = binary
78+
79+
IO.puts inspect decode_msg(response)
80+
IO.puts inspect rest
81+
{:ok, response_to, decode_msg(response), rest}
82+
end
83+
def decode_response_msg(_header, _binary) do
84+
:error
85+
end
86+
87+
6788
def decode_response(msg_header(length: length) = header, iolist) when is_list(iolist) do
6889
if IO.iodata_length(iolist) >= length,
6990
do: decode_response(header, IO.iodata_to_binary(iolist)),
@@ -82,8 +103,7 @@ defmodule Mongo.Messages do
82103
do: IO.iodata_to_binary(iolist) |> decode_header,
83104
else: :error
84105
end
85-
def decode_header(<<length::int32, request_id::int32, response_to::int32,
86-
op_code::int32, rest::binary>>) do
106+
def decode_header(<<length::int32, request_id::int32, response_to::int32, op_code::int32, rest::binary>>) do
87107
header = msg_header(length: length-@header_size, request_id: request_id, response_to: response_to, op_code: op_code)
88108
{:ok, header, rest}
89109
end
@@ -106,6 +126,10 @@ defmodule Mongo.Messages do
106126
docs]
107127
end
108128

129+
defp encode_op(op_msg(flags: flags, docs: [doc])) do
130+
[<<0::int32>>, <<0x00>>, doc]
131+
end
132+
109133
defp encode_op(op_query(flags: flags, coll: coll, num_skip: num_skip,
110134
num_return: num_return, query: query, select: select)) do
111135
[<<blit_flags(:query, flags)::int32>>,
@@ -140,11 +164,16 @@ defmodule Mongo.Messages do
140164
defp op_to_code(op_get_more()), do: @op_get_more
141165
defp op_to_code(op_delete()), do: @op_delete
142166
defp op_to_code(op_kill_cursors()), do: @op_kill_cursors
167+
defp op_to_code(op_msg()), do: @op_msg_code
143168

144169
defp decode_reply(<<flags::int32, cursor_id::int64, from::int32, num::int32, rest::binary>>) do
145170
op_reply(flags: flags, cursor_id: cursor_id, from: from, num: num, docs: rest)
146171
end
147172

173+
defp decode_msg(<<flags::int32, _type::int8, rest::binary>>) do
174+
op_msg(flags: flags, docs: rest)
175+
end
176+
148177
defp encode_header(msg_header(length: length, request_id: request_id,
149178
response_to: response_to, op_code: op_code)) do
150179
<<length::int32, request_id::int32, response_to::int32, op_code::int32>>

lib/mongo/mongo_db_connection.ex

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -143,12 +143,24 @@ defmodule Mongo.MongoDBConnection do
143143
{:ok, state.wire_version, state}
144144
end
145145

146+
defp handle_execute(:msg, nil, [doc], opts, state) do
147+
op_msg(docs: [doc])
148+
|> get_response_msg(state)
149+
end
150+
151+
defp get_response_msg(op, state) do
152+
with {:ok, response} <- Utils.post_msg(state.request_id, op, state),
153+
state = %{state | request_id: state.request_id + 1},
154+
do: {:ok, response, state}
155+
end
156+
146157
defp handle_execute(:command, nil, [query], opts, s) do
147158
flags = Keyword.take(opts, @find_one_flags)
148159
op_query(coll: Utils.namespace("$cmd", s, opts[:database]), query: query, select: "", num_skip: 0, num_return: 1, flags: flags(flags))
149160
|> get_response(s)
150161
end
151162

163+
152164
defp get_response(op, state) do
153165
with {:ok, response} <- Utils.post_request(state.request_id, op, state),
154166
state = %{state | request_id: state.request_id + 1},

lib/mongo/query.ex

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,23 +5,27 @@ defmodule Mongo.Query do
55
end
66

77
defimpl DBConnection.Query, for: Mongo.Query do
8-
import Mongo.Messages, only: [op_reply: 1, op_reply: 2]
8+
import Mongo.Messages, only: [op_reply: 1, op_reply: 2, op_msg: 1, op_msg: 2]
99

1010
def parse(query, _opts), do: query
1111
def describe(query, _opts), do: query
1212

1313
def encode(query, params, _opts) do
14-
if query.encoded? do
14+
result = if query.encoded? do
1515
params
1616
else
1717
Enum.map(params, fn
1818
nil -> ""
1919
doc -> BSON.Encoder.document(doc)
2020
end)
2121
end
22+
23+
IO.puts "Encode message :#{inspect result}"
24+
result
2225
end
2326

2427
def decode(_query, :ok, _opts), do: :ok
2528
def decode(_query, wire_version, _opts) when is_integer(wire_version), do: wire_version
2629
def decode(_query, op_reply(docs: docs) = reply, _opts), do: op_reply(reply, docs: BSON.Decoder.documents(docs))
30+
def decode(_query, op_msg(docs: docs) = msg, _opts), do: op_msg(msg, docs: BSON.Decoder.documents(docs))
2731
end

lib/mongo_db_connection/utils.ex

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,20 @@ defmodule Mongo.MongoDBConnection.Utils do
1717
do: {:ok, response}
1818
end
1919

20+
@doc"""
21+
Sends a request id and waits for the response with the same id
22+
"""
23+
def post_msg(id, ops, state) when is_list(ops) do
24+
with :ok <- send(ops, state),
25+
{:ok, ^id, response} <- recv_msg(state),
26+
do: {:ok, response}
27+
end
28+
def post_msg(id, op, state) do
29+
with :ok <- send(id, op, state),
30+
{:ok, ^id, response} <- recv_msg(state),
31+
do: {:ok, response}
32+
end
33+
2034
def command(id, command, s) do
2135
ns =
2236
if Keyword.get(command, :mechanism) == "MONGODB-X509" && Keyword.get(command, :authenticate) == 1 do
@@ -82,6 +96,30 @@ defmodule Mongo.MongoDBConnection.Utils do
8296
recv(nil, "", s)
8397
end
8498

99+
def recv_msg(s) do
100+
recv_msg(nil, "", s)
101+
end
102+
defp recv_msg(nil, data, %{socket: {mod, sock}} = s) do
103+
case decode_header(data) do
104+
{:ok, header, rest} -> recv_msg(header, rest, s)
105+
:error ->
106+
case mod.recv(sock, 0, s.timeout) do
107+
{:ok, tail} -> recv_msg(nil, [data|tail], s)
108+
{:error, reason} -> recv_error(reason, s)
109+
end
110+
end
111+
end
112+
defp recv_msg(header, data, %{socket: {mod, sock}} = s) do
113+
case decode_response_msg(header, data) do
114+
{:ok, id, msg, ""} -> {:ok, id, msg}
115+
:error ->
116+
case mod.recv(sock, 0, s.timeout) do
117+
{:ok, tail} -> recv_msg(header, [data|tail], s)
118+
{:error, reason} -> recv_error(reason, s)
119+
end
120+
end
121+
end
122+
85123
# TODO: Optimize to reduce :gen_tcp.recv and decode_response calls
86124
# based on message size in header.
87125
# :gen.tcp.recv(socket, min(size, max_packet))

0 commit comments

Comments
 (0)