Skip to content

Commit 210cf37

Browse files
committed
refactored monitor code to understand how it works
1 parent 9a103b6 commit 210cf37

File tree

5 files changed

+173
-220
lines changed

5 files changed

+173
-220
lines changed

lib/mongo.ex

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -573,7 +573,6 @@ defmodule Mongo do
573573
rp_opts = [read_preference: Keyword.get(opts, :read_preference, rp)]
574574
with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, rp_opts),
575575
opts = Keyword.put(opts, :slave_ok, slave_ok),
576-
opts = Keyword.put(opts, :lsid, session_id), füge die session_id hier rein
577576
do: exec_command(conn, cmd, opts)
578577
end
579578

@@ -586,8 +585,6 @@ defmodule Mongo do
586585
with {:ok, _cmd, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
587586
{:ok, doc} <- check_for_error(doc) do
588587
{:ok, doc}
589-
else
590-
591588
end
592589
end
593590

@@ -911,11 +908,8 @@ defmodule Mongo do
911908
] |> filter_nils()
912909

913910

914-
## {:ok, connection, slave_ok, mongos?}
915-
{:ok, conn, slave_ok, mongos, session_id} = Topology.select_server(topology_pid, :write, opts)
916-
917911
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
918-
{:ok, doc} <- exec_command(conn, cmd, session_id, opts) do
912+
{:ok, doc} <- exec_command(conn, cmd, opts) do
919913

920914
case doc do
921915

lib/mongo/monitor.ex

Lines changed: 114 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -1,130 +1,174 @@
11
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#monitoring
22
defmodule Mongo.Monitor do
3-
@moduledoc false
3+
@moduledoc """
4+
Each server has a monitor process. The monitor process is created by the topology process.
5+
6+
If the network connection is working, then the monitor process reports this and the topology process starts the
7+
connection pool. Per server we get 1 + pool size connections to each server.
8+
9+
After waiting for `heartbeat_frequency_ms` milliseconds, the monitor process calls the `isMaster` command and
10+
reports the result to the topology process.
11+
12+
The result of the `isMaster` command is mapped to the `ServerDescription` structure and sent to the topology process, which
13+
updates its internal data structure.
14+
"""
15+
416
use GenServer
5-
use Bitwise
6-
require Logger
17+
18+
# require Logger
19+
20+
alias Mongo.Topology
721
alias Mongo.ServerDescription
8-
alias Mongo.Events.ServerHeartbeatStartedEvent
9-
alias Mongo.Events.{ServerHeartbeatStartedEvent, ServerHeartbeatFailedEvent,
10-
ServerHeartbeatSucceededEvent}
22+
alias Mongo.Events.{ServerHeartbeatStartedEvent, ServerHeartbeatFailedEvent,ServerHeartbeatSucceededEvent}
1123

1224
# this is not configurable because the specification says so
1325
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#minheartbeatfrequencyms
1426
@min_heartbeat_frequency_ms 500
1527

16-
def start_link(args, gen_server_opts \\ []) do
17-
GenServer.start_link(__MODULE__, args, gen_server_opts)
28+
def start_link(args) do
29+
GenServer.start_link(__MODULE__, args)
1830
end
1931

2032
# We need to stop asynchronously because a Monitor can call the Topology
2133
# which may try to stop the same Monitor that called it. Ending in a timeout.
2234
# See issue #139 for some information.
35+
36+
@doc """
37+
Stop the monitor process.
38+
39+
We need to stop asynchronously because a Monitor can call the Topology
40+
which may try to stop the same Monitor that called it, ending in a timeout.
41+
"""
2342
def stop(pid) do
2443
GenServer.cast(pid, :stop)
2544
end
2645

46+
@doc """
47+
Try to update the server description.
48+
"""
2749
def force_check(pid) do
28-
GenServer.call(pid, :check, :infinity)
29-
end
30-
31-
## GenServer callbacks
32-
33-
@doc false
34-
def init([server_description, topology_pid, heartbeat_frequency_ms, connection_opts]) do
35-
opts = # monitors don't authenticate and use the "admin" database
36-
connection_opts
37-
|> Keyword.put(:database, "admin")
38-
|> Keyword.put(:skip_auth, true)
39-
|> Keyword.put(:after_connect, {__MODULE__, :connected, [self(), topology_pid]})
40-
|> Keyword.put(:backoff_min, heartbeat_frequency_ms)
41-
|> Keyword.put(:backoff_max, heartbeat_frequency_ms)
42-
|> Keyword.put(:backoff_type, :rand)
43-
|> Keyword.put(:connection_type, :monitor)
44-
|> Keyword.put(:topology_pid, topology_pid)
45-
|> Keyword.put(:pool_size, 1)
46-
|> Keyword.put(:idle_interval, 5_000)
47-
48-
{:ok, pid} = DBConnection.start_link(Mongo.MongoDBConnection, opts)
49-
:ok = GenServer.cast(self(), :check)
50-
{:ok, %{
51-
connection_pid: pid,
52-
topology_pid: topology_pid,
53-
server_description: server_description,
54-
heartbeat_frequency_ms: heartbeat_frequency_ms,
55-
opts: opts
56-
}}
57-
end
58-
59-
@doc false
50+
GenServer.cast(pid, :update)
51+
end
52+
53+
54+
@doc """
55+
Initialize the monitor process
56+
"""
57+
def init([address, topology_pid, heartbeat_frequency_ms, connection_opts]) do
58+
59+
# monitors don't authenticate and use the "admin" database
60+
opts = connection_opts
61+
|> Keyword.put(:database, "admin")
62+
|> Keyword.put(:skip_auth, true)
63+
|> Keyword.put(:after_connect, {__MODULE__, :connected, [self(), topology_pid]})
64+
|> Keyword.put(:backoff_min, heartbeat_frequency_ms)
65+
|> Keyword.put(:backoff_max, heartbeat_frequency_ms)
66+
|> Keyword.put(:backoff_type, :rand)
67+
|> Keyword.put(:connection_type, :monitor)
68+
|> Keyword.put(:topology_pid, topology_pid)
69+
|> Keyword.put(:pool_size, 1)
70+
|> Keyword.put(:idle_interval, 5_000)
71+
72+
with {:ok, pid} <- DBConnection.start_link(Mongo.MongoDBConnection, opts) do
73+
74+
## we start after one second
75+
Process.send_after(self(), :update, 1_000)
76+
77+
{:ok, %{
78+
connection_pid: pid, ## our connection pid to the mongodb server
79+
topology_pid: topology_pid, ## the topology_pid to which we report
80+
address: address, ## the address of the server, needed to make updates
81+
round_trip_time: nil, ## current round_trip_time, needed to make average value
82+
heartbeat_frequency_ms: heartbeat_frequency_ms, ## current heartbeat_frequency_ms
83+
opts: opts ## options
84+
}}
85+
86+
end
87+
88+
end
89+
90+
@doc """
91+
In this case we stop the DBConnection.
92+
"""
6093
def terminate(reason, state) do
6194
GenServer.stop(state.connection_pid, reason)
6295
end
6396

64-
@doc false
97+
@doc """
98+
Report the connection event, so the topology process can now create the connection pool.
99+
"""
65100
def connected(_connection, me, topology_pid) do
66-
GenServer.cast(topology_pid, {:connected, me})
101+
Topology.monitor_connected(topology_pid, me)
67102
end
68103

69-
@doc false
70-
def handle_cast(:check, state) do
71-
check(state)
104+
@doc """
105+
Time to update the topology. Calling `isMaster` and updating the server description
106+
"""
107+
def handle_cast(:update, state) do
108+
new_state = update_server_description(state)
109+
# we return with heartbeat_frequency_ms, so after heartbeat_frequency_ms handle_info(:timeout...) gets called.
110+
{:noreply, new_state, new_state.heartbeat_frequency_ms}
72111
end
112+
73113
def handle_cast(:stop, state) do
74114
exit(:normal)
75115
{:noreply, state}
76116
end
77117

78-
@doc false
79-
def handle_call(:check, _from, state) do
80-
{_, state, diff} = check(state)
81-
{:reply, diff, state}
82-
end
83-
84-
@doc false
118+
@doc """
119+
The `:timeout` call is the periodic call defined by the heartbeat frequency.
120+
The `:update` call updates the server description for the topology process.
121+
"""
85122
def handle_info(:timeout, state) do
86-
check(state)
123+
new_state = update_server_description(state)
124+
{:noreply, new_state, new_state.heartbeat_frequency_ms}
125+
end
126+
def handle_info(:update, state) do
127+
new_state = update_server_description(state)
128+
# we return with heartbeat_frequency_ms, so after heartbeat_frequency_ms handle_info(:timeout...) gets called.
129+
{:noreply, new_state, new_state.heartbeat_frequency_ms}
87130
end
88131

89-
## Private functions
90-
91-
defp check(state) do
92-
diff = :os.system_time(:milli_seconds) - state.server_description.last_update_time
93-
case diff < @min_heartbeat_frequency_ms do
94-
true -> {:noreply, state, diff}
95-
false ->
96-
server_description = get_server_description(state, 0)
97-
GenServer.cast(state.topology_pid, {:server_description, server_description})
98-
{:noreply, %{state | server_description: server_description}, state.heartbeat_frequency_ms}
99-
end
132+
##
133+
# Get a new server description from the server and send it to the Topology process.
134+
#
135+
defp update_server_description(%{topology_pid: topology_pid} = state) do
136+
%{:round_trip_time => round_trip_time} = server_description = get_server_description(state, 0)
137+
Topology.update_server_description(topology_pid, server_description)
138+
%{state | round_trip_time: round_trip_time}
100139
end
101140

102141
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#network-error-when-calling-ismaster
103142
##
104143
# Calls isMaster and parses the result to update the server description
144+
# After the first network error, the function retries the isMaster command once
105145
#
106-
defp get_server_description(%{connection_pid: conn_pid, server_description: last_server_description, opts: opts} = state, counter) do
146+
defp get_server_description(%{connection_pid: conn_pid, address: address, round_trip_time: last_rtt, opts: opts} = state, counter) do
107147

108148
Mongo.Events.notify(%ServerHeartbeatStartedEvent{ connection_pid: conn_pid})
109149

110-
{result, finish_time, rtt} = call_is_master(conn_pid, opts)
150+
{result, finish_time, rtt} = call_is_master_command(conn_pid, opts)
111151
case result do
112152
{:ok, is_master_reply} ->
113153
notify_success(rtt, is_master_reply, conn_pid)
114-
ServerDescription.from_is_master(last_server_description, rtt, finish_time, is_master_reply)
154+
ServerDescription.from_is_master(address, average_rtt(last_rtt, rtt), finish_time, is_master_reply)
115155

116156
{:error, error} when counter == 1 ->
117157
notify_error(rtt, error, conn_pid)
118-
ServerDescription.from_is_master_error(last_server_description, error)
119-
{:error, error} -> get_server_description(state, counter + 1)
158+
ServerDescription.from_is_master_error(address, error)
159+
{:error, _error} -> get_server_description(state, counter + 1)
120160
end
121161
end
122162

163+
defp average_rtt(nil, rtt) do
164+
round(rtt)
165+
end
123166
defp average_rtt(last_rtt, rtt) do
124167
round(0.2 * rtt + 0.8 * last_rtt)
125168
end
126169

127-
defp call_is_master(conn_pid, opts) do
170+
defp call_is_master_command(conn_pid, opts) do
171+
128172
start_time = System.monotonic_time
129173
result = Mongo.exec_command(conn_pid, [isMaster: 1], opts)
130174
finish_time = System.monotonic_time
@@ -150,30 +194,4 @@ defmodule Mongo.Monitor do
150194
})
151195
end
152196

153-
defp get_server_description_old(%{connection_pid: conn_pid, server_description: last_server_description, opts: opts}) do
154-
Mongo.Events.notify(%ServerHeartbeatStartedEvent{ connection_pid: conn_pid})
155-
156-
{result, finish_time, rtt} = call_is_master(conn_pid, opts)
157-
case result do
158-
{:ok, is_master_reply} ->
159-
notify_success(rtt, is_master_reply, conn_pid)
160-
ServerDescription.from_is_master(last_server_description, rtt, finish_time, is_master_reply)
161-
162-
{:error, error} ->
163-
if last_server_description.type in [:unknown, :possible_primary] do
164-
notify_error(rtt, error, conn_pid)
165-
ServerDescription.from_is_master_error(last_server_description, error)
166-
else
167-
{result, finish_time, rtt} = call_is_master(conn_pid, opts)
168-
case result do
169-
{:ok, is_master_reply} ->
170-
notify_success(rtt, is_master_reply, conn_pid)
171-
ServerDescription.from_is_master(last_server_description, rtt, finish_time, is_master_reply)
172-
{:error, error} ->
173-
notify_error(rtt, error, conn_pid)
174-
ServerDescription.from_is_master_error(last_server_description, error)
175-
end
176-
end
177-
end
178-
end
179197
end

lib/mongo/server_description.ex

Lines changed: 14 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -48,39 +48,34 @@ defmodule Mongo.ServerDescription do
4848
}, map)
4949
end
5050

51-
def from_is_master_error(last_server_description, error) do
51+
def from_is_master_error(address, error) do
5252
defaults(%{
53-
address: last_server_description.address,
53+
address: address,
5454
error: error
5555
})
5656
end
5757

5858
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#parsing-an-ismaster-response
59-
def from_is_master(last_description, rtt, finish_time, is_master_reply) do
60-
last_rtt = last_description.round_trip_time || rtt
61-
62-
defaults(%{
63-
address: last_description.address,
64-
round_trip_time: round(0.2 * rtt + 0.8 * last_rtt),
59+
def from_is_master(address, rtt, finish_time, is_master_reply) do
60+
%{
61+
address: address,
62+
round_trip_time: rtt,
6563
type: determine_server_type(is_master_reply),
66-
last_write_date: get_in(is_master_reply,
67-
["lastWrite", "lastWriteDate"]),
64+
last_write_date: get_in(is_master_reply,["lastWrite", "lastWriteDate"]),
6865
op_time: get_in(is_master_reply, ["lastWrite", "opTime"]),
6966
last_update_time: finish_time,
7067
min_wire_version: is_master_reply["minWireVersion"] || 0,
7168
max_wire_version: is_master_reply["maxWireVersion"] || 0,
7269
me: is_master_reply["me"],
7370
hosts: (is_master_reply["hosts"] || []) |> Enum.map(&String.downcase/1),
74-
passives: (is_master_reply["passives"] || [])
75-
|> Enum.map(&String.downcase/1),
76-
arbiters: (is_master_reply["arbiters"] || [])
77-
|> Enum.map(&String.downcase/1),
71+
passives: (is_master_reply["passives"] || []) |> Enum.map(&String.downcase/1),
72+
arbiters: (is_master_reply["arbiters"] || []) |> Enum.map(&String.downcase/1),
7873
tag_set: is_master_reply["tags"] || %{},
7974
set_name: is_master_reply["setName"],
8075
set_version: is_master_reply["setVersion"],
8176
election_id: is_master_reply["electionId"],
8277
primary: is_master_reply["primary"]
83-
})
78+
}
8479
end
8580

8681
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#type
@@ -89,14 +84,10 @@ defmodule Mongo.ServerDescription do
8984
defp determine_server_type(%{"isreplicaset" => true}), do: :rs_ghost
9085
defp determine_server_type(%{"setName" => set_name} = is_master_reply) when set_name != nil do
9186
case is_master_reply do
92-
%{"ismaster" => true} ->
93-
:rs_primary
94-
%{"secondary" => true} ->
95-
:rs_secondary
96-
%{"arbiterOnly" => true} ->
97-
:rs_arbiter
98-
_ ->
99-
:rs_other
87+
%{"ismaster" => true} -> :rs_primary
88+
%{"secondary" => true} -> :rs_secondary
89+
%{"arbiterOnly" => true} -> :rs_arbiter
90+
_ -> :rs_other
10091
end
10192
end
10293
defp determine_server_type(_), do: :standalone

0 commit comments

Comments
 (0)