Skip to content

Commit b4ff435

Browse files
committed
#106: work in progress: better error handling if monitor lost the connection
1 parent dde3b44 commit b4ff435

File tree

7 files changed

+67
-86
lines changed

7 files changed

+67
-86
lines changed

insights/config/dev.exs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,9 @@ config :insights, InsightsWeb.Endpoint,
5555
]
5656

5757
# Do not include metadata nor timestamps in development logs
58-
config :logger, :console, format: "[$level] $message\n"
58+
config :logger, :console,
59+
format: "$time [$level] $message ($metadata)\n",
60+
metadata: [:module, :function, :line]
5961

6062
# Set a higher stacktrace during development. Avoid configuring such
6163
# in production as building large stacktraces may be expensive.

lib/mongo.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -841,7 +841,7 @@ defmodule Mongo do
841841
%CommandSucceededEvent{
842842
reply: response,
843843
duration: duration,
844-
command_name: event.command_name,
844+
command_name: event.command_name, ## todo
845845
#request_id: event.request_id,
846846
#operation_id: event.operation_id,
847847
#connection_id: event.connection_id

lib/mongo/mongo_db_connection.ex

Lines changed: 1 addition & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@ defmodule Mongo.MongoDBConnection do
88
use DBConnection
99
use Mongo.Messages
1010

11-
## todo import Keywords
12-
1311
alias Mongo.Events
1412
alias Mongo.Events.CommandStartedEvent
1513
alias Mongo.MongoDBConnection.Utils
@@ -223,7 +221,7 @@ defmodule Mongo.MongoDBConnection do
223221
@impl true
224222
def ping(%{wire_version: _wire_version} = state) do
225223
###with {:ok, %{wire_version: ^wire_version}} <- wire_version(state), do: {:ok, state}
226-
## Logger.info("Ignoring ping")
224+
## todo Logger.info("Ignoring ping")
227225
{:ok, state}
228226
end
229227

@@ -268,11 +266,9 @@ defmodule Mongo.MongoDBConnection do
268266
Events.notify(event, :commands)
269267

270268
with {duration, {:ok, flags, doc}} <- :timer.tc(fn -> Utils.get_response(state.request_id, %{state | timeout: timeout}) end)do
271-
Logger.info("more_to_come-Response: flags: #{inspect Integer.to_string(flags, 2)} and docs #{inspect doc}")
272269
{:ok, {doc, event, flags, duration}, state}
273270
else
274271
{_duration, error} ->
275-
Logger.info("More to come error")
276272
error
277273
end
278274
end
@@ -305,11 +301,9 @@ defmodule Mongo.MongoDBConnection do
305301

306302
with {duration, {:ok, flags, doc}} <- :timer.tc(fn -> Utils.post_request(op, state.request_id, %{state | timeout: timeout}) end),
307303
state = %{state | request_id: state.request_id + 1} do
308-
Logger.info("Post-Request ok")
309304
{:ok, {doc, event, flags, duration}, state}
310305
else
311306
{_duration, error} ->
312-
Logger.info("Post-Request error")
313307
error
314308
end
315309
end

lib/mongo/monitor.ex

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,6 +108,7 @@ defmodule Mongo.Monitor do
108108

109109
info("Stopping streaming")
110110
GenServer.stop(streaming_pid, reason)
111+
info("stopped streaming")
111112
end
112113

113114
@doc """
@@ -189,7 +190,7 @@ defmodule Mongo.Monitor do
189190
%{state | round_trip_time: round_trip_time}
190191
else
191192
error ->
192-
Logger.warn("Unable to update server description because of #{inspect error}")
193+
Logger.warn("Unable to round trip time because of #{inspect error}")
193194
state
194195
end
195196
end

lib/mongo/streaming_hello_monitor.ex

Lines changed: 34 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -1,57 +1,44 @@
1-
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#monitoring
21
defmodule Mongo.StreamingHelloMonitor do
32
@moduledoc """
4-
Each server has a monitor process. The monitor process is created by the topology process.
3+
See https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-monitoring.rst#streaming-protocol
54
6-
If the network connection is working, then the monitor process reports this and the topology process starts the
7-
connection pool. Per server we get 1 + pool size connections to each server.
5+
The streaming protocol is used to monitor MongoDB 4.4+ servers and optimally reduces the time it takes for a client to discover server state changes.
6+
Multi-threaded or asynchronous drivers MUST use the streaming protocol when connected to a server that supports the awaitable hello or legacy hello commands.
7+
This protocol requires an extra thread and an extra socket for each monitor to perform RTT calculations.
88
9-
After waiting for `heartbeat_frequency_ms` milliseconds, the monitor process calls `isMaster` command and
10-
reports the result to the topology process.
11-
12-
The result of the `isMaster` command is mapped the `ServerDescription` structure and sent to the topology process, which
13-
updates it internal data structure.
9+
This module implements the streaming protocol. The GenServer is started and maintained by the Monitor process. The streaming hello monitor uses the
10+
more to come flag while updating the current server description.
1411
"""
1512
require Logger
1613

1714
use GenServer
1815
use Bitwise
1916

17+
alias Mongo.Events.ServerHeartbeatFailedEvent
18+
alias Mongo.Events.ServerHeartbeatStartedEvent
19+
alias Mongo.Events.ServerHeartbeatSucceededEvent
20+
alias Mongo.ServerDescription
2021
alias Mongo.StreamingHelloMonitor
2122
alias Mongo.Topology
22-
alias Mongo.ServerDescription
23-
alias Mongo.Events.{ServerHeartbeatStartedEvent, ServerHeartbeatFailedEvent,ServerHeartbeatSucceededEvent}
2423

25-
# this is not configurable because the specification says so
26-
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#minheartbeatfrequencyms
27-
# not used @min_heartbeat_frequency_ms 500
24+
@more_to_come_mask 0x2
2825

2926
def start_link(args) do
3027
GenServer.start_link(StreamingHelloMonitor, args)
3128
end
3229

33-
@doc """
34-
Try to update the server description.
35-
"""
36-
def force_check(pid) do
37-
GenServer.cast(pid, :update)
38-
end
39-
40-
def set_heartbeat_frequency_ms(pid, heartbeat_frequency_ms) do
41-
GenServer.cast(pid, {:update, heartbeat_frequency_ms})
42-
end
43-
4430
@doc """
4531
Initialize the monitor process
4632
"""
4733
def init([topology_pid, address, heartbeat_frequency_ms, opts]) do
4834

35+
heartbeat_frequency_ms = 10_000
36+
4937
opts = opts
5038
|> Keyword.drop([:after_connect])
51-
|> Keyword.put(:after_connect, {__MODULE__, :connected, [self()]})
39+
|> Keyword.put(:after_connect, {StreamingHelloMonitor, :connected, [self()]})
5240
|> Keyword.put(:connection_type, :stream_monitor)
5341

54-
5542
info("Starting stream hello monitor with options #{inspect(opts, pretty: true)}")
5643

5744
with {:ok, pid} <- DBConnection.start_link(Mongo.MongoDBConnection, opts) do
@@ -60,7 +47,7 @@ defmodule Mongo.StreamingHelloMonitor do
6047
topology_pid: topology_pid, ## the topology_pid to which we report
6148
address: address, ## the address of the server, needed to make updates
6249
heartbeat_frequency_ms: heartbeat_frequency_ms, ## current heartbeat_frequency_ms
63-
max_await_time_ms: 10000,
50+
max_await_time_ms: heartbeat_frequency_ms,
6451
more_to_come: false,
6552
topology_version: nil, # {processId: <ObjectId>, counter: <int64>},
6653
opts: opts ## options
@@ -72,8 +59,8 @@ defmodule Mongo.StreamingHelloMonitor do
7259
@doc """
7360
In this case we stop the DBConnection.
7461
"""
75-
def terminate(reason, %{connection_pid: connection_pid} = state) do
76-
info("Terminating streaming hello monitor for reason #{inspect reason}")
62+
def terminate(reason, %{connection_pid: connection_pid}) do
63+
## debug info("Terminating streaming hello monitor for reason #{inspect reason}")
7764
GenServer.stop(connection_pid, reason)
7865
end
7966

@@ -93,7 +80,7 @@ defmodule Mongo.StreamingHelloMonitor do
9380

9481
def handle_info(:update, state) do
9582
new_state = update_server_description(state)
96-
Process.send_after(self(), :update, new_state.heartbeat_frequency_ms)
83+
send(self(), :update)
9784
{:noreply, new_state}
9885
end
9986

@@ -102,12 +89,12 @@ defmodule Mongo.StreamingHelloMonitor do
10289
#
10390
defp update_server_description(%{topology_pid: topology_pid} = state) do
10491
with {topology_version, flags, server_description} <- get_server_description(state) do
92+
info("Updating server description")
10593
Topology.update_server_description(topology_pid, server_description)
10694
state = %{state | topology_version: topology_version}
10795

108-
case flags &&& 0x2 do
109-
0x2 ->
110-
info("More to come")
96+
case flags &&& @more_to_come_mask do
97+
@more_to_come_mask ->
11198
state = %{state | more_to_come: true}
11299
update_server_description(state)
113100
_other ->
@@ -119,39 +106,32 @@ defmodule Mongo.StreamingHelloMonitor do
119106

120107
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#network-error-when-calling-ismaster
121108
##
122-
# Calls isMaster and parses the result to update the server description
123-
# In case of one network error, the function tries one more time to call isMaster command
109+
# Calls hello command and parses the result to update the server description
124110
#
125-
defp get_server_description(%{connection_pid: conn_pid, address: address, topology_version: topology_version, opts: opts} = state) do
111+
defp get_server_description(%{connection_pid: conn_pid, address: address, topology_version: topology_version, max_await_time_ms: max_await_time_ms, opts: opts} = state) do
126112

127113
Mongo.Events.notify(%ServerHeartbeatStartedEvent{ connection_pid: conn_pid})
128114

129115
{duration, result} = case state do
130116
%{more_to_come: true} ->
131-
:timer.tc(fn ->
132-
133-
info("Calling more to come")
134-
result = Mongo.exec_more_to_come(conn_pid, opts)
135-
info("End of more to come")
136-
result
137-
end)
117+
:timer.tc(fn -> Mongo.exec_more_to_come(conn_pid, opts) end)
138118

139119
_other ->
120+
opts = Keyword.put(opts, :max_await_time_ms, max_await_time_ms)
140121
:timer.tc(fn -> hello_command(conn_pid, topology_version, opts) end)
141122
end
142123

143124
case result do
144125
{:ok, {flags, hello_doc}} ->
145126

146-
info("Got flags: #{inspect flags}")
147-
148127
server_description = hello_doc
149128
|> ServerDescription.parse_hello_response()
150129
|> Map.put(:address, address)
151130
|> Map.put(:last_update_time, DateTime.utc_now())
152131
|> Map.put(:error, nil)
153132

154133
notify_success(duration, hello_doc, conn_pid)
134+
155135
{hello_doc["topologyVersion"], flags, server_description}
156136

157137
{:error, error} ->
@@ -168,7 +148,13 @@ defmodule Mongo.StreamingHelloMonitor do
168148

169149
defp hello_command(conn_pid, %{"counter" => counter, "processId" => process_id}, opts) do
170150
opts = Keyword.merge(opts, [flags: [:exhaust_allowed]])
171-
Mongo.exec_command(conn_pid, [isMaster: 1, maxAwaitTimeMS: 10_000, topologyVersion: %{counter: %BSON.LongNumber{value: counter}, processId: process_id}], opts)
151+
cmd = [isMaster: 1,
152+
maxAwaitTimeMS: Keyword.get(opts, :max_await_time_ms, 10_000),
153+
topologyVersion: %{
154+
counter: %BSON.LongNumber{value: counter},
155+
processId: process_id}
156+
]
157+
Mongo.exec_command(conn_pid, cmd, opts)
172158
end
173159

174160
defp hello_command(conn_pid, _topology_version, opts) do
@@ -183,7 +169,7 @@ defmodule Mongo.StreamingHelloMonitor do
183169
Mongo.Events.notify(%ServerHeartbeatSucceededEvent{duration: duration, reply: reply, connection_pid: conn_pid})
184170
end
185171

186-
defp info(message) do
172+
def info(message) do
187173
Logger.info(IO.ANSI.format([:blue, :bright, message]))
188174
end
189175

lib/mongo/topology.ex

Lines changed: 24 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -113,11 +113,8 @@ defmodule Mongo.Topology do
113113
## GenServer Callbacks
114114

115115
# see https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#configuration
116-
@doc false
117116
def init(opts) do
118117

119-
Logger.info("Starting topology!!")
120-
121118
seeds = Keyword.get(opts, :seeds, [seed(opts)])
122119
type = Keyword.get(opts, :type, :unknown)
123120
set_name = Keyword.get(opts, :set_name, nil)
@@ -158,9 +155,6 @@ defmodule Mongo.Topology do
158155
end
159156

160157
def terminate(_reason, state) do
161-
162-
Logger.info("Terminating Topology")
163-
164158
case state.opts[:pw_safe] do
165159
nil -> nil
166160
pid -> GenServer.stop(pid)
@@ -195,18 +189,27 @@ defmodule Mongo.Topology do
195189
{:noreply, new_state}
196190
end
197191

198-
def handle_cast({:disconnect, kind, host}, state) do
192+
##
193+
# In case of :monitor or :stream_monitor we mark the server description of the address as unknown
194+
##
195+
def handle_cast({:disconnect, kind, address}, state) when kind in [:monitor, :stream_monitor] do
196+
server_description = ServerDescription.parse_hello_response(address, "#{inspect kind} disconnected")
199197

200-
Logger.info("Disconnection by #{inspect kind}")
201-
new_state = remove_address(host, state)
202-
maybe_reinit(new_state)
203-
{:noreply, new_state}
198+
new_state = address
199+
|> remove_address(state)
200+
|> maybe_reinit()
201+
202+
handle_cast({:server_description, server_description}, new_state)
204203
end
205204

206-
def handle_cast({:disconnect, :client, _host}, state) do
205+
def handle_cast({:disconnect, _kind, _host}, state) do
207206
{:noreply, state}
208207
end
209208

209+
##
210+
# After the monitor is connected to the server, the connection pool is started and
211+
# the "waiting pids" are informed to call the command again
212+
##
210213
def handle_cast({:connected, monitor_pid}, state) do
211214
monitor = Enum.find(state.monitors, fn {_key, value} -> value == monitor_pid end)
212215
new_state = case monitor do
@@ -475,8 +478,9 @@ defmodule Mongo.Topology do
475478
state
476479
end
477480

478-
defp maybe_reinit(%{monitors: monitors} = state) when map_size(monitors) > 0,
479-
do: state
481+
defp maybe_reinit(%{monitors: monitors} = state) when map_size(monitors) > 0 do
482+
state
483+
end
480484
defp maybe_reinit(state) do
481485
servers = servers_from_seeds(state.seeds)
482486

@@ -493,17 +497,13 @@ defmodule Mongo.Topology do
493497

494498
defp remove_address(address, state) do
495499

496-
IO.inspect(Process.info(self(), :current_stacktrace), label: "STACKTRACE")
500+
Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})
501+
GenServer.stop(state.monitors[address])
497502

498-
Logger.info("Removing address #{inspect address}")
499-
500-
:ok = Mongo.Events.notify(%ServerClosedEvent{address: address, topology_pid: self()})
501-
:ok = GenServer.stop(state.monitors[address])
502-
503-
:ok = case state.connection_pools[address] do
504-
nil -> :ok
505-
pid -> GenServer.stop(pid)
506-
end
503+
case state.connection_pools[address] do
504+
nil -> :ok
505+
pid -> GenServer.stop(pid)
506+
end
507507

508508
%{state | monitors: Map.delete(state.monitors, address),
509509
connection_pools: Map.delete(state.connection_pools, address)}

lib/mongo/topology_description.ex

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -67,10 +67,7 @@ defmodule Mongo.TopologyDescription do
6767
end
6868

6969
@doc """
70-
Returns a tuple of three values:
71-
* servers: possible list of servers, maybe []
72-
* slave_ok:
73-
* mongod?:
70+
Selects the next possible server from the current topology.
7471
"""
7572
def select_servers(topology, type, opts \\ [])
7673
def select_servers(%{:compatible => false}, _type, _opts) do
@@ -85,6 +82,7 @@ defmodule Mongo.TopologyDescription do
8582
end
8683

8784
addr = servers
85+
|> Enum.filter(fn {_, %{type: type}} -> type != :unknown end) ## only valid servers
8886
|> Enum.map(fn {server, _} -> server end)
8987
|> Enum.take_random(1)
9088

0 commit comments

Comments
 (0)