Skip to content

Commit bbb3fe6

Browse files
committed
preparations for #12
1 parent c7946de commit bbb3fe6

File tree

4 files changed

+50
-17
lines changed

4 files changed

+50
-17
lines changed

lib/mongo.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -624,7 +624,7 @@ defmodule Mongo do
624624
wtimeout: Keyword.get(opts, :wtimeout)
625625
} |> filter_nils()
626626

627-
query = [
627+
cmd = [
628628
insert: coll,
629629
documents: [doc],
630630
ordered: Keyword.get(opts, :ordered),
@@ -633,7 +633,7 @@ defmodule Mongo do
633633
] |> filter_nils()
634634

635635
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
636-
{:ok, doc} <- direct_command(conn, query, opts) do
636+
{:ok, doc} <- direct_command(conn, cmd, opts) do
637637
case doc do
638638
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
639639
_ ->
@@ -869,7 +869,7 @@ defmodule Mongo do
869869
arrayFilters: Keyword.get(opts, :filters)
870870
} |> filter_nils()
871871

872-
query = [
872+
cmd = [
873873
update: coll,
874874
updates: [update],
875875
ordered: Keyword.get(opts, :ordered),
@@ -878,7 +878,7 @@ defmodule Mongo do
878878
] |> filter_nils()
879879

880880
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
881-
{:ok, doc} <- direct_command(conn, query, opts) do
881+
{:ok, doc} <- direct_command(conn, cmd, opts) do
882882

883883
case doc do
884884

lib/mongo/grid_fs/upload_stream.ex

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,11 +137,17 @@ defmodule Mongo.GridFs.UploadStream do
137137
#
138138
defp insert_one_file_document(topology_pid, collection, file_id, length, chunk_size, filename, metadata, opts) do
139139
doc = %{_id: file_id, length: length, filename: filename, chunkSize: chunk_size, uploadDate: now(), metadata: metadata}
140+
|> filter_nils()
141+
140142
{:ok, _} = Mongo.insert_one(topology_pid, collection, doc, opts)
141143
end
142144

143145
defp now(), do: DateTime.from_unix!(:os.system_time(), :native)
144146

147+
defp filter_nils(map) when is_map(map) do
148+
Enum.reject(map, fn {_key, value} -> is_nil(value) end)
149+
|> Enum.into(%{})
150+
end
145151
end
146152

147153
end

lib/mongo/messages.ex

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -125,32 +125,36 @@ defmodule Mongo.Messages do
125125
<<length::int32, request_id::int32, response_to::int32, op_code::int32>>
126126
end
127127

128+
defp encode_op(op_query(flags: flags, coll: coll, num_skip: num_skip,
129+
num_return: num_return, query: query, select: select)) do
130+
[<<blit_flags(:query, flags)::int32>>,
131+
coll,
132+
<<0x00, num_skip::int32, num_return::int32>>,
133+
BSON.Encoder.document(query),
134+
select]
135+
end
128136
defp encode_op(op_msg(flags: flags, sections: sections)) do
129137
# todo: flags encoding
130-
[<<0::int32>>, encode_sections(sections)]
138+
[<<0::int32>> | encode_sections(sections)]
131139
end
140+
132141
defp encode_sections(sections) do
133142
Enum.map(sections, fn section -> encode_section(section) end)
134143
end
135144

136145
defp encode_section(section(payload_type: t, payload: payload)) do
137-
[<<t::int8>>, encode_payload(payload)]
146+
[<<t::int8>> | encode_payload(payload)]
138147
end
139148
defp encode_payload(payload(doc: doc, sequence: nil)) do
140149
BSON.Encoder.document(doc)
141150
end
142-
defp encode_payload(payload(doc: nil, sequence: sequence(size: size, identifier: identifier, docs: docs))) do
143-
[<<size::int32>>, identifier, <<0x00>>, BSON.Encoder.encode(docs)]
151+
defp encode_payload(payload(doc: nil, sequence: sequence(identifier: identifier, docs: docs))) do
152+
iodata = [identifier, <<0x00>> | Enum.map(docs, fn doc -> BSON.Encoder.encode(doc) end)]
153+
size = IO.iodata_length(iodata) + 4
154+
[<<size::int32>> | iodata]
144155
end
145156

146-
defp encode_op(op_query(flags: flags, coll: coll, num_skip: num_skip,
147-
num_return: num_return, query: query, select: select)) do
148-
[<<blit_flags(:query, flags)::int32>>,
149-
coll,
150-
<<0x00, num_skip::int32, num_return::int32>>,
151-
BSON.Encoder.document(query),
152-
select]
153-
end
157+
154158

155159
defp blit_flags(op, flags) when is_list(flags) do
156160
import Bitwise

lib/mongo/mongo_db_connection.ex

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -248,7 +248,12 @@ defmodule Mongo.MongoDBConnection do
248248
"$readPreference": [mode: update_read_preferences(opts[:slave_ok])]]
249249

250250
timeout = Keyword.get(opts, :max_time, 0)
251-
op = op_msg(flags: 0, sections: [section(payload_type: 0, payload: payload(doc: cmd))])
251+
252+
# MongoDB 3.6 only allows certain command arguments to be provided this way. These are:
253+
op = case pulling_out?(cmd, :documents) || pulling_out?(cmd, :updates) || pulling_out?(cmd, :deletes) do
254+
nil -> op_msg(flags: 0, sections: [section(payload_type: 0, payload: payload(doc: cmd))])
255+
key -> pulling_out(cmd, key)
256+
end
252257

253258
with {:ok, doc} <- Utils.post_request(op, state.request_id, state, timeout),
254259
state = %{state | request_id: state.request_id + 1} do
@@ -271,6 +276,24 @@ defmodule Mongo.MongoDBConnection do
271276
{:disconnect, exception, state}
272277
end
273278

279+
defp pulling_out?(cmd, key) do
280+
case Keyword.has_key?(cmd, key) do
281+
true -> key
282+
false -> nil
283+
end
284+
end
285+
286+
defp pulling_out(cmd, key) when is_atom(key) do
287+
288+
docs = Keyword.get(cmd, key)
289+
cmd = Keyword.delete(cmd, key)
290+
291+
payload_0 = section(payload_type: 0, payload: payload(doc: cmd))
292+
payload_1 = section(payload_type: 1, payload: payload(sequence: sequence(identifier: to_string(key), docs: docs)))
293+
294+
op_msg(flags: 0, sections: [payload_0, payload_1])
295+
end
296+
274297
def update_read_preferences(true), do: "primaryPreferred"
275298
def update_read_preferences(false), do: "primary"
276299
def update_read_preferences(nil), do: "primary"

0 commit comments

Comments
 (0)