Skip to content

Commit 5dd5137

Browse files
committed
refactoring select_servers function for supporting driver sessions
1 parent d2594ba commit 5dd5137

File tree

7 files changed

+162
-100
lines changed

7 files changed

+162
-100
lines changed

lib/mongo.ex

Lines changed: 48 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -290,13 +290,32 @@ defmodule Mongo do
290290

291291
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
292292

293-
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
294-
{:ok, doc} <- exec_command(conn, cmd, opts) do
293+
with {:ok, doc} <- call_command_for_type(topology_pid, cmd, :write, tops) do
295294
{:ok, doc["value"]}
296295
end
297296

298297
end
299298

299+
def call_command_for_type(topology_pid, cmd, :write, opts) do
300+
with {:ok, conn, _, _, session_id} <- select_server(topology_pid, :write, opts),
301+
{:ok, doc} <- exec_command(conn, update_session_id(cmd, session_id), opts) do
302+
{:ok, doc}
303+
end
304+
end
305+
def call_command_for_type(topology_pid, cmd, :read, opts) do
306+
with {:ok, conn, slave_ok, _, session_id} <- select_server(topology_pid, :read, opts),
307+
{:ok, doc} <- exec_command(conn, update_session_id(cmd, session_id), opts) do
308+
{:ok, doc}
309+
end
310+
end
311+
312+
defp update_session_id(cmd, nil) do
313+
cmd
314+
end
315+
defp update_session_id(cmd, session_id) do
316+
Keyword.merge(cmd, [lsid: session_id])
317+
end
318+
300319
@doc """
301320
Finds a document and replaces it.
302321
@@ -1002,13 +1021,13 @@ defmodule Mongo do
10021021
the options for the following request because you are requesting a secondary server.
10031022
"""
10041023
def select_server(topology_pid, type, opts \\ []) do
1005-
with {:ok, servers, slave_ok, mongos?} <- select_servers(topology_pid, type, opts) do
1024+
with {:ok, servers, slave_ok, mongos?, session_id} <- select_servers(topology_pid, type, opts) do
10061025
if Enum.empty? servers do
1007-
{:ok, nil, slave_ok, mongos?}
1026+
{:ok, nil, slave_ok, mongos?, session_id}
10081027
else
10091028
with {:ok, connection} <- servers |> Enum.take_random(1) |> Enum.at(0)
10101029
|> get_connection(topology_pid) do
1011-
{:ok, connection, slave_ok, mongos?}
1030+
{:ok, connection, slave_ok, mongos?, session_id}
10121031
end
10131032
end
10141033
end
@@ -1022,24 +1041,34 @@ defmodule Mongo do
10221041
# connection.
10231042
defp select_servers(topology_pid, type, opts, start_time) do
10241043
topology = Topology.topology(topology_pid)
1025-
with {:ok, servers, slave_ok, mongos?} <- TopologyDescription.select_servers(topology, type, opts) do
1026-
case Enum.empty?(servers) do
1027-
true ->
1028-
case Topology.wait_for_connection(topology_pid, @sel_timeout, start_time) do
1029-
{:ok, _servers} -> select_servers(topology_pid, type, opts, start_time)
1030-
{:error, :selection_timeout} = error -> error
1031-
end
1032-
false -> {:ok, servers, slave_ok, mongos?}
1033-
end
1044+
1045+
case TopologyDescription.select_servers(topology, type, opts) do
1046+
1047+
:empty ->
1048+
case Topology.wait_for_connection(topology_pid, @sel_timeout, start_time) do
1049+
{:ok, _servers} -> select_servers(topology_pid, type, opts, start_time) ##todo wait a little
1050+
{:error, :selection_timeout} = error -> error
1051+
end
1052+
1053+
{:ok, result} -> result
1054+
1055+
error -> error
10341056
end
1057+
1058+
# with {:ok, servers, slave_ok, mongos?, session_id} <- TopologyDescription.select_servers(topology, type, opts) do
1059+
# case servers do
1060+
# [] ->
1061+
# case Topology.wait_for_connection(topology_pid, @sel_timeout, start_time) do
1062+
# {:ok, _servers} -> select_servers(topology_pid, type, opts, start_time) ##todo wait a little
1063+
# {:error, :selection_timeout} = error -> error
1064+
# end
1065+
# _full -> {:ok, servers, slave_ok, mongos?, session_id}
1066+
# end
1067+
# end
10351068
end
10361069

10371070
defp get_connection(nil, _pid), do: {:ok, nil}
1038-
defp get_connection(server, pid) do
1039-
with {:ok, connection} <- Topology.connection_for_address(pid, server) do
1040-
{:ok, connection}
1041-
end
1042-
end
1071+
defp get_connection(server, pid), do: Topology.connection_for_address(pid, server)
10431072

10441073
defp modifier_docs([{key, _}|_], type), do: key |> key_to_string |> modifier_key(type)
10451074
defp modifier_docs(map, _type) when is_map(map) and map_size(map) == 0, do: :ok

lib/mongo/monitor.ex

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -70,10 +70,6 @@ defmodule Mongo.Monitor do
7070
|> Keyword.put(:idle_interval, 5_000)
7171

7272
with {:ok, pid} <- DBConnection.start_link(Mongo.MongoDBConnection, opts) do
73-
74-
## we start after one second
75-
Process.send_after(self(), :update, 1_000)
76-
7773
{:ok, %{
7874
connection_pid: pid, ## our connection pid to the mongodb server
7975
topology_pid: topology_pid, ## the topology_pid to which we report
@@ -99,6 +95,7 @@ defmodule Mongo.Monitor do
9995
"""
10096
def connected(_connection, me, topology_pid) do
10197
Topology.monitor_connected(topology_pid, me)
98+
GenServer.cast(me, :update)
10299
end
103100

104101
@doc """

lib/mongo/server_description.ex

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,8 @@ defmodule Mongo.ServerDescription do
2222
set_version: non_neg_integer | nil,
2323
election_id: BSON.ObjectId.t | nil,
2424
primary: String.t | nil,
25-
last_update_time: non_neg_integer
25+
last_update_time: non_neg_integer,
26+
logical_session_timeout: non_neg_integer
2627
}
2728

2829
def defaults(map \\ %{}) do
@@ -44,7 +45,8 @@ defmodule Mongo.ServerDescription do
4445
set_version: nil,
4546
election_id: nil,
4647
primary: nil,
47-
last_update_time: 0
48+
last_update_time: 0,
49+
logical_session_timeout: 30
4850
}, map)
4951
end
5052

@@ -74,7 +76,8 @@ defmodule Mongo.ServerDescription do
7476
set_name: is_master_reply["setName"],
7577
set_version: is_master_reply["setVersion"],
7678
election_id: is_master_reply["electionId"],
77-
primary: is_master_reply["primary"]
79+
primary: is_master_reply["primary"],
80+
logical_session_timeout: is_master_reply["logicalSessionTimeoutMinutes"]
7881
}
7982
end
8083

@@ -84,7 +87,7 @@ defmodule Mongo.ServerDescription do
8487
defp determine_server_type(%{"isreplicaset" => true}), do: :rs_ghost
8588
defp determine_server_type(%{"setName" => set_name} = is_master_reply) when set_name != nil do
8689
case is_master_reply do
87-
%{"ismaster" => true} -> :rs_primary
90+
%{"ismaster" => true} -> :rs_primary
8891
%{"secondary" => true} -> :rs_secondary
8992
%{"arbiterOnly" => true} -> :rs_arbiter
9093
_ -> :rs_other

lib/mongo/topology.ex

Lines changed: 33 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ defmodule Mongo.Topology do
77
alias Mongo.TopologyDescription
88
alias Mongo.ServerDescription
99
alias Mongo.Monitor
10+
alias Mongo.Session.SessionPool
1011

1112
# https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#heartbeatfrequencyms-defaults-to-10-seconds-or-60-seconds
1213
@heartbeat_frequency_ms 10_000
@@ -49,8 +50,8 @@ defmodule Mongo.Topology do
4950
defp wait_for_connection(pid, timeout) when timeout >= 0 do
5051
try do
5152
case GenServer.call(pid, :wait_for_connection, timeout) do
52-
{:new_connection, server} -> {:ok, [server]}
53-
{:connected, servers} -> {:ok, servers}
53+
{:new_connection, server} -> {:ok, [server]}
54+
{:connected, servers} -> {:ok, servers}
5455
end
5556
catch
5657
:exit, {:timeout, _} -> {:error, :selection_timeout}
@@ -100,10 +101,10 @@ defmodule Mongo.Topology do
100101
opts: opts,
101102
monitors: %{},
102103
connection_pools: %{},
103-
## session_pid: nil
104+
session_pool: nil,
104105
waiting_pids: []
105106
}
106-
|> reconcile_servers()
107+
|> update_monitor()
107108
{:ok, state}
108109
end
109110
end
@@ -153,7 +154,7 @@ defmodule Mongo.Topology do
153154
end
154155

155156
def handle_cast(:reconcile, state) do
156-
new_state = reconcile_servers(state)
157+
new_state = update_monitor(state)
157158
{:noreply, new_state}
158159
end
159160
def handle_cast({:disconnect, :monitor, host}, state) do
@@ -168,8 +169,7 @@ defmodule Mongo.Topology do
168169
def handle_cast({:connected, monitor_pid}, state) do
169170
monitor = Enum.find(state.monitors, fn {_key, value} -> value == monitor_pid end)
170171
new_state = case monitor do
171-
nil ->
172-
state
172+
nil -> state
173173
{host, ^monitor_pid} ->
174174
arbiters = fetch_arbiters(state)
175175
if host in arbiters do
@@ -186,7 +186,7 @@ defmodule Mongo.Topology do
186186
Enum.each(state.waiting_pids, fn from ->
187187
GenServer.reply(from, {:new_connection, host})
188188
end)
189-
%{ state | connection_pools: connection_pools, waiting_pids: [] }
189+
%{ state | connection_pools: connection_pools, waiting_pids: []}
190190
end
191191
end
192192
{:noreply, new_state}
@@ -204,17 +204,26 @@ defmodule Mongo.Topology do
204204
end
205205
end
206206

207+
##
208+
# Update server description: in case of logical session the function creates a session pool for the `deployment`.
209+
#
210+
defp handle_server_description(state, %{:logical_session_timeout => logical_session_timeout} = server_description) do
211+
state
212+
|> get_and_update_in([:topology], &TopologyDescription.update(&1, server_description, length(state.seeds)))
213+
|> process_events()
214+
|> update_monitor()
215+
|> update_session_pool(logical_session_timeout)
216+
end
207217
defp handle_server_description(state, server_description) do
208218
state
209219
|> get_and_update_in([:topology], &TopologyDescription.update(&1, server_description, length(state.seeds)))
210220
|> process_events()
211-
|> reconcile_servers()
221+
|> update_monitor()
212222
end
213223

214224
defp process_events({events, state}) do
215225
Enum.each(events, fn
216-
{:force_check, _} = message ->
217-
:ok = GenServer.cast(self(), message)
226+
{:force_check, _} = message -> :ok = GenServer.cast(self(), message)
218227
{previous, next} ->
219228
if previous != next do
220229
:ok = Mongo.Events.notify(%ServerDescriptionChangedEvent{
@@ -230,8 +239,11 @@ defmodule Mongo.Topology do
230239
state
231240
end
232241

233-
## todo: update_monitor(state)
234-
defp reconcile_servers(state) do
242+
243+
##
244+
# update the monitor process. For new servers the function creates new monitor processes.
245+
#
246+
defp update_monitor(state) do
235247
arbiters = fetch_arbiters(state)
236248
old_addrs = Map.keys(state.monitors)
237249
# remove arbiters from connection pool as descriptions are recieved
@@ -256,6 +268,14 @@ defmodule Mongo.Topology do
256268
Enum.reduce(removed, state, &remove_address/2)
257269
end
258270

271+
defp update_session_pool(%{:session_pool => nil} = state, logical_session_timeout) do
272+
{:ok, session_pool} = SessionPool.start_link(self(), logical_session_timeout)
273+
%{ state | session_pool: session_pool}
274+
end
275+
defp update_session_pool(state, _logical_session_timeout) do
276+
state
277+
end
278+
259279
defp maybe_reinit(%{monitors: monitors} = state) when map_size(monitors) > 0,
260280
do: state
261281
defp maybe_reinit(state) do

lib/mongo/topology_description.ex

Lines changed: 45 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -48,40 +48,55 @@ defmodule Mongo.TopologyDescription do
4848
check_server_supported(topology, server_description, num_seeds)
4949
end
5050

51-
# steps 3-4
51+
def checkout_session(_top, []) do
52+
nil
53+
end
54+
def checkout_session(%{:session_pool => pool}, _servers) do
55+
SessionPool.checkout(pool)
56+
end
57+
def checkout_session(_other, _servers) do
58+
nil
59+
end
60+
61+
def select_servers(%{:compatible => false}, _type, _opts) do
62+
{:error, :invalid_wire_version}
63+
end
5264
def select_servers(topology, type, opts \\ []) do
53-
read_preference = Keyword.get(opts, :read_preference) |> ReadPreference.defaults()
54-
if topology[:compatible] == false do
55-
{:error, :invalid_wire_version}
56-
else
57-
{servers, slave_ok, mongos?} = case topology.type do
58-
:unknown -> {[], false, false}
59-
:single ->
60-
server = topology.servers |> Map.values |> Enum.at(0, %{type: :unknown})
61-
slave_ok = type != :write and server.type != :mongos
62-
{topology.servers, slave_ok, server.type == :mongos}
63-
:sharded ->
64-
mongos_servers = topology.servers |> Enum.filter(fn {_, server} -> server.type == :mongos end)
65-
{mongos_servers, false, true}
66-
_ ->
67-
case type do
68-
:read -> {select_replica_set_server(topology, read_preference.mode, read_preference), true, false}
69-
:write ->
70-
if topology.type == :replica_set_with_primary do
71-
{select_replica_set_server(topology, :primary, ReadPreference.defaults), false, false}
72-
else
73-
{[], false, false}
74-
end
75-
end
76-
end
65+
read_preference = opts
66+
|> Keyword.get(:read_preference)
67+
|> ReadPreference.defaults()
7768

78-
servers =
79-
for {server, _} <- servers do
80-
server
69+
{servers, slave_ok, mongos?} = case topology.type do
70+
:unknown -> {[], false, false}
71+
72+
:single ->
73+
server = topology.servers
74+
|> Map.values
75+
|> Enum.at(0, %{type: :unknown})
76+
slave_ok = type != :write and server.type != :mongos
77+
{topology.servers, slave_ok, server.type == :mongos}
78+
79+
:sharded -> {mongos_servers(topology), false, true}
80+
_ ->
81+
case type do
82+
:read -> {select_replica_set_server(topology, read_preference.mode, read_preference), true, false}
83+
:write ->
84+
if topology.type == :replica_set_with_primary do
85+
{select_replica_set_server(topology, :primary, ReadPreference.defaults), false, false}
86+
else
87+
{[], false, false}
88+
end
8189
end
82-
# todo: Enum.map(elem(0)) ?
83-
{:ok, servers, slave_ok, mongos?}
8490
end
91+
92+
case Enum.map(servers, fn {server, _} -> server end) do
93+
[] -> :empty
94+
servers -> {:ok, servers, slave_ok, mongos?, checkout_session(topology)}
95+
end
96+
end
97+
98+
defp mongos_servers(topology) do
99+
Enum.filter(topology.servers, fn {_, server} -> server.type == :mongos end)
85100
end
86101

87102
## Private Functions

0 commit comments

Comments
 (0)