Skip to content

Commit 37e52a8

Browse files
committed
fixes #31
1 parent 09e88c6 commit 37e52a8

File tree

6 files changed

+115
-67
lines changed

6 files changed

+115
-67
lines changed

lib/mongo.ex

Lines changed: 25 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -630,23 +630,40 @@ defmodule Mongo do
630630
defp check_for_error(%{"code" => code, "errmsg" => msg}), do: {:error, Mongo.Error.exception(message: msg, code: code)}
631631

632632
@doc """
633-
Returns the current wire version.
633+
Returns the wire version of the database
634+
## Example
635+
636+
{:ok, top} = Mongo.start_link(...)
637+
Mongo.wire_version(top)
638+
639+
{:ok, 8}
634640
"""
635641
@spec wire_version(pid) :: {:ok, integer} | {:error, Mongo.Error.t}
636-
def wire_version(conn) do
637-
cmd = %Query{action: :wire_version}
638-
with {:ok, _cmd, version} <- DBConnection.execute(conn, cmd, %{}, defaults([])) do
639-
{:ok, version}
642+
def wire_version(topology_pid) do
643+
with {:ok, wire_version} <- Topology.wire_version(topology_pid) do
644+
{:ok, wire_version}
640645
end
641646
end
642647

643648
@doc """
644649
Returns the limits of the database.
650+
651+
## Example
652+
653+
{:ok, top} = Mongo.start_link(...)
654+
Mongo.limits(top)
655+
656+
{:ok, %{
657+
logical_session_timeout: 30,
658+
max_bson_object_size: 16777216,
659+
max_message_size_bytes: 48000000,
660+
max_wire_version: 8,
661+
max_write_batch_size: 100000
662+
}}
645663
"""
646664
@spec limits(pid) :: {:ok, BSON.document} | {:error, Mongo.Error.t}
647-
def limits(conn) do
648-
cmd = %Query{action: :limits}
649-
with {:ok, _cmd, limits} <- DBConnection.execute(conn, cmd, %{}, defaults([])) do
665+
def limits(topology_pid) do
666+
with {:ok, limits} <- Topology.limits(topology_pid) do
650667
{:ok, limits}
651668
end
652669
end

lib/mongo/bulk_write.ex

Lines changed: 4 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -183,7 +183,7 @@ defmodule Mongo.BulkWrite do
183183
def write(topology_pid, %UnorderedBulk{} = bulk, opts) do
184184

185185
with {:ok, session} <- Session.start_implicit_session(topology_pid, :write, opts),
186-
result = one_bulk_write(session, bulk, opts),
186+
result = one_bulk_write(topology_pid, session, bulk, opts),
187187
:ok <- Session.end_implict_session(topology_pid, session) do
188188
result
189189
else
@@ -201,7 +201,7 @@ defmodule Mongo.BulkWrite do
201201
empty = %BulkWriteResult{acknowledged: acknowledged?(write_concern)}
202202

203203
with {:ok, session} <- Session.start_implicit_session(topology_pid, :write, opts),
204-
{:ok, limits} <- get_limits(session),
204+
{:ok, limits} <- Mongo.limits(topology_pid),
205205
max_batch_size <- limits.max_write_batch_size,
206206
result = ops
207207
|> get_op_sequence()
@@ -227,9 +227,9 @@ defmodule Mongo.BulkWrite do
227227
# The function returns a keyword list with the results of each operation group:
228228
# For the details see https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#results
229229
#
230-
defp one_bulk_write(session, %UnorderedBulk{coll: coll, inserts: inserts, updates: updates, deletes: deletes}, opts) do
230+
defp one_bulk_write(topology_pid, session, %UnorderedBulk{coll: coll, inserts: inserts, updates: updates, deletes: deletes}, opts) do
231231

232-
with {:ok, limits} <- get_limits(session),
232+
with {:ok, limits} <- Mongo.limits(topology_pid),
233233
max_batch_size <- limits.max_write_batch_size,
234234
insert_result <- one_bulk_write_operation(session, :insert, coll, inserts, max_batch_size, opts),
235235
update_result <- one_bulk_write_operation(session, :update, coll, updates, max_batch_size, opts),
@@ -418,10 +418,4 @@ defmodule Mongo.BulkWrite do
418418

419419
end
420420

421-
defp get_limits(session) do
422-
with conn <- Session.connection(session) do
423-
Mongo.limits(conn)
424-
end
425-
end
426-
427421
end

lib/mongo/cursor.ex

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ defmodule Mongo.Cursor do
104104
"id" => cursor_id,
105105
"ns" => coll,
106106
"firstBatch" => docs} = response}} when ok == 1 <- Mongo.exec_command_session(session, new_cmd, opts),
107-
{:ok, wire_version} <- wire_version(session) do
107+
{:ok, wire_version} <- Mongo.wire_version(topology_pid) do
108108

109109
[%{"$changeStream" => stream_opts} | _pipeline] = Keyword.get(new_cmd, :pipeline) # extract the change stream options
110110

@@ -183,7 +183,7 @@ defmodule Mongo.Cursor do
183183
{:error, %Mongo.Error{code: code} = not_resumable} when code == 11601 or code == 136 or code == 237 -> {:error, not_resumable}
184184
{:error, _error} ->
185185

186-
with {:ok, wire_version} <- wire_version(session) do
186+
with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do
187187

188188
[%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options
189189

@@ -319,12 +319,6 @@ defmodule Mongo.Cursor do
319319
def count(_stream), do: {:error, __MODULE__}
320320
def member?(_stream, _term), do: {:error, __MODULE__}
321321

322-
defp wire_version(session) do
323322

324-
session
325-
|> Mongo.Session.connection()
326-
|> Mongo.wire_version()
327-
328-
end
329323
end
330324
end

lib/mongo/mongo_db_connection.ex

Lines changed: 2 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -28,8 +28,7 @@ defmodule Mongo.MongoDBConnection do
2828
connect_timeout: opts[:connect_timeout] || @timeout,
2929
database: Keyword.fetch!(opts, :database),
3030
write_concern: Map.new(write_concern),
31-
wire_version: nil, # todo move to topolgy and topology-description
32-
limits: nil, # todo move to topolgy and topology-description
31+
wire_version: 0,
3332
auth_mechanism: opts[:auth_mechanism] || nil,
3433
connection_type: Keyword.fetch!(opts, :connection_type),
3534
topology_pid: Keyword.fetch!(opts, :topology_pid),
@@ -128,7 +127,7 @@ defmodule Mongo.MongoDBConnection do
128127
cmd = [ismaster: 1, client: client] |> filter_nils()
129128

130129
case Utils.command(-1, cmd, state) do
131-
{:ok, %{"ok" => ok, "maxWireVersion" => version} = response} when ok == 1 -> {:ok, %{update_limits(state, response) | wire_version: version}}
130+
{:ok, %{"ok" => ok, "maxWireVersion" => version}} when ok == 1 -> {:ok, %{state | wire_version: version}}
132131
{:ok, %{"ok" => ok}} when ok == 1 -> {:ok, %{state | wire_version: 0}}
133132
{:ok, %{"ok" => ok, "errmsg" => msg, "code" => code}} when ok == 0 ->
134133
err = Mongo.Error.exception(message: msg, code: code)
@@ -188,29 +187,6 @@ defmodule Mongo.MongoDBConnection do
188187
defp pretty_name("apple"), do: "Mac OS X"
189188
defp pretty_name(name), do: name
190189

191-
##
192-
#
193-
# Updates the limits from the isMaster response:
194-
#
195-
# isMaster.maxBsonObjectSize / 16 * 1024 * 1024
196-
# isMaster.maxMessageSizeBytes / 48000000
197-
# isMaster.maxWriteBatchSize / 100,000
198-
# isMaster.localTime
199-
# isMaster.compression
200-
# isMaster.readOnly
201-
# isMaster.logicalSessionTimeoutMinutes
202-
#
203-
defp update_limits(state, result) do
204-
limits = %{max_bson_object_size: (result["maxBsonObjectSize"] || 16_777_216),
205-
max_message_size_bytes: (result["maxMessageSizeBytes"] || 48_000_000),
206-
max_write_batch_size: (result["maxWriteBatchSize"] || 100_000),
207-
compression: result["compression"],
208-
read_only: (result["readOnly"] || false),
209-
logical_session_timeout_minutes: result["logicalSessionTimeoutMinutes"]}
210-
211-
%{state | limits: limits}
212-
end
213-
214190
@impl true
215191
def checkout(state), do: {:ok, state}
216192
@impl true
@@ -252,12 +228,6 @@ defmodule Mongo.MongoDBConnection do
252228
end
253229
end
254230

255-
defp execute_action(:limits, _, _, state) do
256-
{:ok, state.limits, state}
257-
end
258-
defp execute_action(:wire_version, _, _, state) do
259-
{:ok, state.wire_version, state}
260-
end
261231
defp execute_action(:command, [cmd], opts, %{wire_version: version} = state) when version >= 6 do
262232

263233
cmd = cmd ++ ["$db": opts[:database] || state.database]

lib/mongo/server_description.ex

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,11 @@ defmodule Mongo.ServerDescription do
2323
election_id: BSON.ObjectId.t | nil,
2424
primary: String.t | nil,
2525
last_update_time: non_neg_integer,
26+
max_bson_object_size: non_neg_integer,
27+
max_message_size_bytes: non_neg_integer,
28+
max_write_batch_size: non_neg_integer,
29+
compression: String.t | nil,
30+
read_only: boolean(),
2631
logical_session_timeout: non_neg_integer
2732
}
2833

@@ -46,6 +51,11 @@ defmodule Mongo.ServerDescription do
4651
election_id: nil,
4752
primary: nil,
4853
last_update_time: 0,
54+
max_bson_object_size: 16_777_216,
55+
max_message_size_bytes: 48_000_000,
56+
max_write_batch_size: 100_000,
57+
compression: nil,
58+
read_only: false,
4959
logical_session_timeout: 30
5060
}, map)
5161
end
@@ -77,6 +87,11 @@ defmodule Mongo.ServerDescription do
7787
set_version: is_master_reply["setVersion"],
7888
election_id: is_master_reply["electionId"],
7989
primary: is_master_reply["primary"],
90+
max_bson_object_size: (is_master_reply["maxBsonObjectSize"] || 16_777_216),
91+
max_message_size_bytes: (is_master_reply["maxMessageSizeBytes"] || 48_000_000),
92+
max_write_batch_size: (is_master_reply["maxWriteBatchSize"] || 100_000),
93+
compression: is_master_reply["compression"],
94+
read_only: (is_master_reply["readOnly"] || false),
8095
logical_session_timeout: is_master_reply["logicalSessionTimeoutMinutes"] || 30
8196
}
8297
end

lib/mongo/topology.ex

Lines changed: 67 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,8 @@ defmodule Mongo.Topology do
1212
alias Mongo.Session.SessionPool
1313
alias Mongo.Session
1414

15+
@limits [:logical_session_timeout, :max_bson_object_size, :max_message_size_bytes, :max_wire_version, :max_write_batch_size]
16+
1517
# https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#heartbeatfrequencyms-defaults-to-10-seconds-or-60-seconds
1618
@heartbeat_frequency_ms 10_000
1719

@@ -52,6 +54,14 @@ defmodule Mongo.Topology do
5254
GenServer.call(pid, {:select_server, type, opts})
5355
end
5456

57+
def limits(pid) do
58+
GenServer.call(pid, :limits)
59+
end
60+
61+
def wire_version(pid) do
62+
GenServer.call(pid, :wire_version)
63+
end
64+
5565
@doc """
5666
5767
"""
@@ -248,9 +258,9 @@ defmodule Mongo.Topology do
248258
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
249259
Logger.debug("select_server: found #{inspect servers}, pools: #{inspect pools}")
250260

251-
with host <- Enum.take_random(servers, 1),
252-
{:ok, connection} <- get_connection(host, state),
253-
wire_version <- max_version(topology, host),
261+
with address <- Enum.take_random(servers, 1),
262+
{:ok, connection} <- get_connection(address, state),
263+
wire_version <- wire_version(address, topology),
254264
{server_session, new_state} <- checkout_server_session(state),
255265
{:ok, session} <- Session.start_link(connection, server_session, type, wire_version, opts) do
256266

@@ -264,14 +274,14 @@ defmodule Mongo.Topology do
264274
end
265275
end
266276

267-
def handle_call({:select_server, type, opts}, from, %{:topology => topology, :waiting_pids => waiting, connection_pools: pools} = state) do
277+
def handle_call({:select_server, type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
268278
case TopologyDescription.select_servers(topology, type, opts) do
269279
:empty ->
270280
Logger.debug("select_server: empty")
271281
{:noreply, %{state | waiting_pids: [from | waiting]}} ## no servers available, wait for connection
272282

273283
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
274-
Logger.debug("select_server: found #{inspect servers}, pools: #{inspect pools}")
284+
Logger.info("select_server: found #{inspect servers}")
275285

276286
with {:ok, connection} <- servers
277287
|> Enum.take_random(1)
@@ -286,6 +296,45 @@ defmodule Mongo.Topology do
286296
end
287297
end
288298

299+
def handle_call(:limits, _from, %{:topology => topology} = state) do
300+
case TopologyDescription.select_servers(topology, :write, []) do
301+
:empty ->
302+
Logger.debug("select_server: empty")
303+
{:reply, nil, state}
304+
305+
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
306+
Logger.debug("select_server: found #{inspect servers}")
307+
308+
with {:ok, limits} <- servers
309+
|> Enum.take_random(1)
310+
|> get_limits(topology) do
311+
Logger.debug("select_server: connection is #{inspect limits}")
312+
313+
{:reply, {:ok, limits}, state}
314+
end
315+
error ->
316+
Logger.debug("select_servers: #{inspect error}")
317+
{:reply, error, state} ## in case of an error, just return the error
318+
end
319+
end
320+
321+
def handle_call(:wire_version, _from, %{:topology => topology} = state) do
322+
case TopologyDescription.select_servers(topology, :write, []) do
323+
:empty ->
324+
Logger.debug("select_server: empty")
325+
{:reply, nil, state}
326+
327+
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
328+
Logger.debug("select_server: found #{inspect servers}")
329+
with address <- Enum.take_random(servers, 1) do
330+
{:reply, {:ok, wire_version(address, topology)}, state}
331+
end
332+
error ->
333+
Logger.debug("select_servers: #{inspect error}")
334+
{:reply, error, state} ## in case of an error, just return the error
335+
end
336+
end
337+
289338
defp checkout_server_session(%{:session_pool => session_pool} = state) do
290339
with {session, pool} <- SessionPool.checkout(session_pool) do
291340
{session, %{state | session_pool: pool}}
@@ -298,10 +347,19 @@ defmodule Mongo.Topology do
298347
defp get_connection([], _state), do: nil
299348
defp get_connection([address], %{connection_pools: pools}), do: Map.fetch(pools, address)
300349

301-
defp max_version(topology, [host]) do
302-
case Map.get(topology.servers, host) do
303-
nil -> 0
304-
server -> server.max_wire_version
350+
defp get_limits([], _topology), do: nil
351+
defp get_limits([address], %{servers: servers}) do
352+
with {:ok, desc} <- Map.fetch(servers, address) do
353+
{:ok, Map.take(desc, @limits)}
354+
end
355+
end
356+
357+
defp wire_version([], _topology), do: nil
358+
defp wire_version([address], topology) do
359+
with {:ok, server} <- Map.fetch(topology.servers, address) do
360+
server.max_wire_version
361+
else
362+
_other -> 0
305363
end
306364
end
307365

0 commit comments

Comments
 (0)