Skip to content

Commit 399e71b

Browse files
committed
refactoring select_server
1 parent 65cd1c5 commit 399e71b

File tree

4 files changed

+139
-159
lines changed

4 files changed

+139
-159
lines changed

lib/mongo.ex

Lines changed: 55 additions & 90 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@ defmodule Mongo do
5353
alias Mongo.TopologyDescription
5454
alias Mongo.Topology
5555
alias Mongo.UrlParser
56+
alias Mongo.Session.ServerSession
5657

5758
@timeout 15000 # 5000
5859

@@ -285,35 +286,62 @@ defmodule Mongo do
285286
sort: opts[:sort],
286287
upsert: opts[:upsert],
287288
collation: opts[:collation],
288-
lsid: opts[:lsid]
289+
lsid: opts[:lsid] || :implicit
289290
] |> filter_nils()
290291

291292
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
292293

293-
with {:ok, doc} <- call_command_for_type(topology_pid, cmd, :write, tops) do
294+
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts) do
294295
{:ok, doc["value"]}
295296
end
296297

297298
end
298299

299-
def call_command_for_type(topology_pid, cmd, :write, opts) do
300-
with {:ok, conn, _, _, session_id} <- select_server(topology_pid, :write, opts),
301-
{:ok, doc} <- exec_command(conn, update_session_id(cmd, session_id), opts) do
302-
{:ok, doc}
300+
@doc """
301+
This function is very fundamental. First a server is selected. If no explicit session id is found, then
302+
the Topology-Module returns an implicit session id. The `opts` is updated for using the new session id.
303+
Then is command is sent to the server and executed. On return implicit session id is checked in into the
304+
session pool for reuse.
305+
"""
306+
def issue_command(topology_pid, cmd, :write, opts) do
307+
with {:ok, conn, _, _, session_id} <- Topology.select_server(topology_pid, :write, opts),
308+
{:ok, doc} <- exec_command(conn, update_session_id(cmd, session_id), opts),
309+
ok <- checkin_session_id(topology_pid, cmd, session_id) do
310+
{:ok, conn, doc}
311+
else
312+
{:new_connection, _server} ->
313+
IO.puts "too fast, call it again"
314+
issue_command(topology_pid, cmd, :write, opts)
303315
end
304316
end
305-
def call_command_for_type(topology_pid, cmd, :read, opts) do
306-
with {:ok, conn, slave_ok, _, session_id} <- select_server(topology_pid, :read, opts),
307-
{:ok, doc} <- exec_command(conn, update_session_id(cmd, session_id), opts) do
308-
{:ok, doc}
317+
def issue_command(topology_pid, cmd, :read, opts) do
318+
319+
IO.puts "issue_command: read #{inspect cmd}"
320+
with {:ok, conn, slave_ok, _, session_id} <- Topology.select_server(topology_pid, :read, opts),
321+
opts = Keyword.put(opts, :slave_ok, slave_ok),
322+
{:ok, doc} <- exec_command(conn, update_session_id(cmd, session_id), opts),
323+
ok <- checkin_session_id(topology_pid, cmd, session_id) do
324+
{:ok, conn, doc}
325+
else
326+
{:new_connection, _server} ->
327+
IO.puts "too fast, call it again"
328+
issue_command(topology_pid, cmd, :read, opts)
309329
end
310330
end
311331

312332
defp update_session_id(cmd, nil) do
313333
cmd
314334
end
315-
defp update_session_id(cmd, session_id) do
316-
Keyword.merge(cmd, [lsid: session_id])
335+
defp update_session_id(cmd, %ServerSession{:session_id => session_id}) do
336+
IO.puts "update_session_id #{inspect session_id}"
337+
Keyword.merge(cmd, [lsid: %{id: session_id}])
338+
end
339+
defp checkin_session_id(topology_pid, cmd, nil), do: :ok
340+
defp checkin_session_id(topology_pid, cmd, session_id) do
341+
case Keyword.get(cmd, :lsid, :implicit) do
342+
:implicit -> Topology.checkin_session_id(topology_pid, session_id)
343+
_ -> :ok
344+
end
317345
end
318346

319347
@doc """
@@ -353,8 +381,7 @@ defmodule Mongo do
353381

354382
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
355383

356-
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
357-
{:ok, doc} <- exec_command(conn, cmd, opts), do: {:ok, doc["value"]}
384+
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts), do: {:ok, doc["value"]}
358385
end
359386

360387
defp should_return_new(:after), do: true
@@ -385,8 +412,7 @@ defmodule Mongo do
385412
] |> filter_nils()
386413
opts = Keyword.drop(opts, ~w(max_time projection sort collation)a)
387414

388-
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
389-
{:ok, doc} <- exec_command(conn, cmd, opts), do: {:ok, doc["value"]}
415+
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts), do: {:ok, doc["value"]}
390416
end
391417

392418
@doc false
@@ -490,10 +516,7 @@ defmodule Mongo do
490516

491517
opts = Keyword.drop(opts, ~w(max_time)a)
492518

493-
with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, opts),
494-
opts = Keyword.put(opts, :slave_ok, slave_ok),
495-
{:ok, doc} <- exec_command(conn, cmd, opts),
496-
do: {:ok, doc["values"]}
519+
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :read, opts), do: {:ok, doc["values"]}
497520
end
498521

499522
@doc """
@@ -545,7 +568,7 @@ defmodule Mongo do
545568
maxTimeMS: opts[:max_time],
546569
skip: opts[:skip],
547570
sort: opts[:sort],
548-
lsid: opts[:lsid]
571+
lsid: opts[:lsid] || :implicit
549572
]
550573

551574
cmd = filter_nils(cmd)
@@ -568,8 +591,7 @@ defmodule Mongo do
568591
569592
Mongo.find_one(top, "jobs", %{}, read_concern: %{level: "local"})
570593
"""
571-
@spec find_one(GenServer.server, collection, BSON.document, Keyword.t) ::
572-
BSON.document | nil
594+
@spec find_one(GenServer.server, collection, BSON.document, Keyword.t) :: BSON.document | nil
573595
def find_one(conn, coll, filter, opts \\ []) do
574596
opts = opts
575597
|> Keyword.delete(:sort)
@@ -590,9 +612,9 @@ defmodule Mongo do
590612
def command(topology_pid, cmd, opts \\ []) do
591613
rp = ReadPreference.defaults(%{mode: :primary})
592614
rp_opts = [read_preference: Keyword.get(opts, :read_preference, rp)]
593-
with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, rp_opts),
594-
opts = Keyword.put(opts, :slave_ok, slave_ok),
595-
do: exec_command(conn, cmd, opts)
615+
with {:ok, _conn, doc} <- Mongo.issue_command(topology_pid, cmd, :read, rp_opts) do
616+
{:ok, doc}
617+
end
596618
end
597619

598620
@doc false
@@ -601,6 +623,8 @@ defmodule Mongo do
601623
def exec_command(conn, cmd, opts) do
602624
action = %Query{action: :command}
603625

626+
IO.puts "Executing cmd #{inspect cmd}"
627+
604628
with {:ok, _cmd, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
605629
{:ok, doc} <- check_for_error(doc) do
606630
{:ok, doc}
@@ -675,11 +699,10 @@ defmodule Mongo do
675699
ordered: Keyword.get(opts, :ordered),
676700
writeConcern: write_concern,
677701
bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation),
678-
lsid: Keyword.get(opts, :lsid)
702+
lsid: Keyword.get(opts, :lsid, :implicit)
679703
] |> filter_nils()
680704

681-
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
682-
{:ok, doc} <- exec_command(conn, cmd, opts) do
705+
with {:ok, _conn, doc} <- Mongo.issue_command(topology_pid, cmd, :write, opts) do
683706
case doc do
684707
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
685708
_ ->
@@ -733,8 +756,7 @@ defmodule Mongo do
733756
lsid: Keyword.get(opts, :lsid)
734757
] |> filter_nils()
735758

736-
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
737-
{:ok, doc} <- exec_command(conn, cmd, opts) do
759+
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts) do
738760
case doc do
739761
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
740762
_ ->
@@ -805,8 +827,7 @@ defmodule Mongo do
805827
lsid: Keyword.get(opts, :lsid)
806828
] |> filter_nils()
807829

808-
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
809-
{:ok, doc} <- exec_command(conn, cmd, opts) do
830+
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts) do
810831
case doc do
811832
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
812833
%{ "ok" => _ok, "n" => n } ->
@@ -927,8 +948,7 @@ defmodule Mongo do
927948
] |> filter_nils()
928949

929950

930-
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
931-
{:ok, doc} <- exec_command(conn, cmd, opts) do
951+
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts) do
932952

933953
case doc do
934954

@@ -1015,61 +1035,6 @@ defmodule Mongo do
10151035
|> Stream.map(fn coll -> coll["name"] end)
10161036
end
10171037

1018-
@doc"""
1019-
Determines the appropriate connection depending on the type (:read, :write). The result is
1020-
  a tuple with the connection, slave_ok flag and mongos flag. Possibly you have to set slave_ok == true in
1021-
the options for the following request because you are requesting a secondary server.
1022-
"""
1023-
def select_server(topology_pid, type, opts \\ []) do
1024-
with {:ok, servers, slave_ok, mongos?, session_id} <- select_servers(topology_pid, type, opts) do
1025-
if Enum.empty? servers do
1026-
{:ok, nil, slave_ok, mongos?, session_id}
1027-
else
1028-
with {:ok, connection} <- servers |> Enum.take_random(1) |> Enum.at(0)
1029-
|> get_connection(topology_pid) do
1030-
{:ok, connection, slave_ok, mongos?, session_id}
1031-
end
1032-
end
1033-
end
1034-
end
1035-
1036-
defp select_servers(topology_pid, type, opts), do: select_servers(topology_pid, type, opts, System.monotonic_time)
1037-
@sel_timeout 30000
1038-
# NOTE: Should think about the handling completely in the Topology GenServer
1039-
# in order to make the entire operation atomic instead of querying
1040-
# and then potentially having an outdated topology when waiting for the
1041-
# connection.
1042-
defp select_servers(topology_pid, type, opts, start_time) do
1043-
topology = Topology.topology(topology_pid)
1044-
1045-
case TopologyDescription.select_servers(topology, type, opts) do
1046-
1047-
:empty ->
1048-
case Topology.wait_for_connection(topology_pid, @sel_timeout, start_time) do
1049-
{:ok, _servers} -> select_servers(topology_pid, type, opts, start_time) ##todo wait a little
1050-
{:error, :selection_timeout} = error -> error
1051-
end
1052-
1053-
{:ok, result} -> result
1054-
1055-
error -> error
1056-
end
1057-
1058-
# with {:ok, servers, slave_ok, mongos?, session_id} <- TopologyDescription.select_servers(topology, type, opts) do
1059-
# case servers do
1060-
# [] ->
1061-
# case Topology.wait_for_connection(topology_pid, @sel_timeout, start_time) do
1062-
# {:ok, _servers} -> select_servers(topology_pid, type, opts, start_time) ##todo wait a little
1063-
# {:error, :selection_timeout} = error -> error
1064-
# end
1065-
# _full -> {:ok, servers, slave_ok, mongos?, session_id}
1066-
# end
1067-
# end
1068-
end
1069-
1070-
defp get_connection(nil, _pid), do: {:ok, nil}
1071-
defp get_connection(server, pid), do: Topology.connection_for_address(pid, server)
1072-
10731038
defp modifier_docs([{key, _}|_], type), do: key |> key_to_string |> modifier_key(type)
10741039
defp modifier_docs(map, _type) when is_map(map) and map_size(map) == 0, do: :ok
10751040
defp modifier_docs(map, type) when is_map(map), do: Enum.at(map, 0) |> elem(0) |> key_to_string |> modifier_key(type)

lib/mongo/cursor.ex

Lines changed: 12 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -39,14 +39,16 @@ defmodule Mongo.Cursor do
3939
#
4040
defp start_fun(topology_pid, cmd, nil, opts) do
4141
fn ->
42-
with {:ok, conn, opts} <- select_server(topology_pid, opts),
43-
{:ok, %{"ok" => ok,
44-
"cursor" => %{
45-
"id" => cursor_id,
46-
"ns" => coll,
47-
"firstBatch" => docs}}} when ok == 1 <- Mongo.exec_command(conn, cmd, opts) do
48-
state(conn: conn, cursor: cursor_id, coll: coll, docs: docs)
42+
43+
with {:ok, conn,
44+
%{"ok" => ok,
45+
"cursor" => %{
46+
"id" => cursor_id,
47+
"ns" => coll,
48+
"firstBatch" => docs}}} when ok == 1 <- Mongo.issue_command(topology_pid, cmd, :read, opts) do
49+
state(conn: conn, cursor: cursor_id, coll: coll, docs: docs)
4950
end
51+
5052
end
5153
end
5254

@@ -89,13 +91,12 @@ defmodule Mongo.Cursor do
8991

9092
def aggregate(topology_pid, cmd, fun, opts) do
9193

92-
with {:ok, conn, opts} <- select_server(topology_pid, opts),
93-
{:ok, %{"ok" => ok,
94+
with {:ok, conn, %{"ok" => ok,
9495
"operationTime" => op_time,
9596
"cursor" => %{
9697
"id" => cursor_id,
9798
"ns" => coll,
98-
"firstBatch" => docs} = response}} when ok == 1 <- Mongo.exec_command(conn, cmd, opts),
99+
"firstBatch" => docs} = response}} when ok == 1 <- Mongo.issue_command(topology_pid, cmd, :read, opts),
99100
{:ok, wire_version} <- Mongo.wire_version(conn) do
100101

101102
[%{"$changeStream" => stream_opts} | _pipeline] = Keyword.get(cmd, :pipeline) # extract the change stream options
@@ -287,13 +288,6 @@ defmodule Mongo.Cursor do
287288
end
288289
end
289290

290-
defp select_server(topology_pid, opts) do
291-
with {:ok, conn, slave_ok, _} <- Mongo.select_server(topology_pid, :read, opts),
292-
opts = Keyword.put(opts, :slave_ok, slave_ok) do
293-
{:ok, conn, opts}
294-
end
295-
end
296-
297291
defp filter_nils(keyword) when is_list(keyword) do
298292
Enum.reject(keyword, fn {_key, value} -> is_nil(value) end)
299293
end
@@ -302,7 +296,7 @@ defmodule Mongo.Cursor do
302296
fn
303297
state(cursor: 0) -> :ok
304298
state(cursor: cursor, coll: coll, conn: conn) -> kill_cursors(conn, only_coll(coll), [cursor], opts)
305-
{:error, error} = error -> error
299+
error -> error
306300
end
307301
end
308302

0 commit comments

Comments
 (0)