Skip to content

Commit edf909f

Browse files
committed
not master wip
1 parent 4b64c2a commit edf909f

File tree

3 files changed

+43
-18
lines changed

3 files changed

+43
-18
lines changed

lib/mongo.ex

Lines changed: 19 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -407,7 +407,7 @@ defmodule Mongo do
407407
opts = Mongo.retryable_reads(opts)
408408

409409
with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
410-
result <- exec_command_session(session, cmd, opts),
410+
result <- exec_command_session(topology_pid, session, cmd, opts),
411411
:ok <- Session.end_implict_session(topology_pid, session) do
412412
case result do
413413
{:error, error} ->
@@ -429,7 +429,7 @@ defmodule Mongo do
429429
opts = Mongo.retryable_writes(opts, acknowledged?(cmd[:writeConcerns]))
430430

431431
with {:ok, session} <- Session.start_implicit_session(topology_pid, :write, opts),
432-
result <- exec_command_session(session, cmd, opts),
432+
result <- exec_command_session(topology_pid, session, cmd, opts),
433433
:ok <- Session.end_implict_session(topology_pid, session) do
434434
result
435435
else
@@ -723,20 +723,28 @@ defmodule Mongo do
723723
end
724724

725725
@doc false
726-
@spec exec_command_session(GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
727-
def exec_command_session(session, cmd, opts) do
728-
with {:ok, conn, new_cmd} <- Session.bind_session(session, cmd),
726+
@spec exec_command_session(GenServer.server, GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
727+
def exec_command_session(topology_pid, session, cmd, opts) do
728+
with {:ok, conn, new_cmd, address} <- Session.bind_session(session, cmd),
729729
{:ok, _cmd, {doc, event}} <- DBConnection.execute(conn, %Query{action: :command}, [new_cmd], defaults(opts)),
730730
doc <- Session.update_session(session, doc, opts),
731-
{:ok, doc} <- check_for_error(doc, event) do
731+
{:ok, doc} <- check_for_error(doc, event, address) do
732732
{:ok, doc}
733733
else
734734
{:error, error} ->
735735
## todo update Topology
736+
IO.inspect(error)
737+
if error.not_master_or_recovering do
738+
IO.inspect("ok")
739+
server_description = Mongo.ServerDescription.from_is_master_error(error.address, error)
740+
GenServer.cast(topology_pid, {:server_description, server_description})
741+
end
742+
#require IEx; IEx.pry
743+
#:debugger.start()
736744
case Error.should_retry_write(error, cmd, opts) do
737745
true ->
738746
with :ok <- Session.select_server(session, opts) do
739-
exec_command_session(session, cmd, Keyword.put(opts, :write_counter, 2))
747+
exec_command_session(topology_pid, session, cmd, Keyword.put(opts, :write_counter, 2))
740748
end
741749
false -> {:error, error}
742750
end
@@ -748,13 +756,13 @@ defmodule Mongo do
748756
@spec exec_command(GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
749757
def exec_command(conn, cmd, opts) do
750758
with {:ok, _cmd, {doc, event}} <- DBConnection.execute(conn, %Query{action: :command}, [cmd], defaults(opts)),
751-
{:ok, doc} <- check_for_error(doc, event) do
759+
{:ok, doc} <- check_for_error(doc, event, conn) do
752760
{:ok, doc}
753761
end
754762

755763
end
756764

757-
defp check_for_error(%{"ok" => ok} = response, {event, duration}) when ok == 1 do
765+
defp check_for_error(%{"ok" => ok} = response, {event, duration}, conn) when ok == 1 do
758766
Events.notify(%CommandSucceededEvent{
759767
reply: response,
760768
duration: duration,
@@ -765,9 +773,9 @@ defmodule Mongo do
765773
}, :commands)
766774
{:ok, response}
767775
end
768-
defp check_for_error(doc, {event, duration}) do
776+
defp check_for_error(doc, {event, duration}, address) do
769777

770-
error = Mongo.Error.exception(doc)
778+
error = Mongo.Error.exception(doc, address)
771779

772780
Events.notify(%CommandFailedEvent{
773781
failure: error,

lib/mongo/error.ex

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@ defmodule Mongo.Error do
22

33
alias Mongo.Events
44

5-
defexception [:message, :code, :host, :error_labels, :resumable, :retryable_reads, :retryable_writes]
5+
defexception [:message, :code, :host, :error_labels, :resumable, :retryable_reads, :retryable_writes, :not_master_or_recovering, :address]
66

77
@host_unreachable 6
88
@host_not_found 7
@@ -21,6 +21,18 @@ defmodule Mongo.Error do
2121
@stale_config 13388
2222
@retry_change_stream 234
2323
@failed_to_satisfy_read_preference 133
24+
25+
# https://github.com/mongodb/specifications/blob/master/source/server-discovery-and-monitoring/server-discovery-and-monitoring.rst#not-master-and-node-is-recovering
26+
@not_master_or_recovering [
27+
@interrupted_at_shutdown,
28+
@interrupted_due_to_repl_state_change,
29+
@not_master_or_secondary,
30+
@primary_stepped_down,
31+
@shutdown_in_progress,
32+
33+
@not_master,
34+
@not_master_no_slaveok,
35+
]
2436

2537
@retryable_writes [@interrupted_at_shutdown, @interrupted_due_to_repl_state_change, @not_master, @not_master_no_slaveok,
2638
@not_master_or_secondary, @primary_stepped_down, @shutdown_in_progress, @host_not_found,
@@ -42,7 +54,8 @@ defmodule Mongo.Error do
4254
error_labels: [String.t] | nil,
4355
resumable: boolean,
4456
retryable_reads: boolean,
45-
retryable_writes: boolean
57+
retryable_writes: boolean,
58+
not_master_or_recovering: boolean,
4659
}
4760

4861
def message(e) do
@@ -60,12 +73,13 @@ defmodule Mongo.Error do
6073
%Mongo.Error{message: "#{host} ssl #{action}: #{formatted_reason} - #{inspect(reason)}", host: host, resumable: false}
6174
end
6275

63-
def exception(%{"code" => code, "errmsg" => msg} = doc) do
76+
def exception(%{"code" => code, "errmsg" => msg} = doc, address) do
6477
errorLabels = doc["errorLabels"] || []
6578
resumable = Enum.any?(@resumable, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "ResumableChangeStreamError"))
6679
retryable_reads = Enum.any?(@retryable_reads, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "RetryableReadError"))
6780
retryable_writes = Enum.any?(@retryable_writes, &(&1 == code)) || Enum.any?(errorLabels, &(&1 == "RetryableWriteError"))
68-
%Mongo.Error{message: msg, code: code, error_labels: errorLabels, resumable: resumable, retryable_reads: retryable_reads, retryable_writes: retryable_writes}
81+
not_master_or_recovering = Enum.any?(@not_master_or_recovering, &(&1 == code))
82+
%Mongo.Error{message: msg, code: code, error_labels: errorLabels, resumable: resumable, retryable_reads: retryable_reads, retryable_writes: retryable_writes, not_master_or_recovering: not_master_or_recovering, address: address}
6983
end
7084
def exception(message: message, code: code) do
7185
%Mongo.Error{message: message, code: code, resumable: Enum.any?(@resumable, &(&1 == code))}
@@ -76,7 +90,7 @@ defmodule Mongo.Error do
7690
end
7791

7892
@doc """
79-
Return true if the error is retryalble for read operations.
93+
Return true if the error is retryable for read operations.
8094
"""
8195
def should_retry_read(%Mongo.Error{retryable_reads: true}, cmd, opts) do
8296
[{command_name,_}|_] = cmd
@@ -93,7 +107,7 @@ defmodule Mongo.Error do
93107
end
94108

95109
@doc """
96-
Return true if the error is retryalble for writes operations.
110+
Return true if the error is retryable for writes operations.
97111
"""
98112
def should_retry_write(%Mongo.Error{retryable_writes: true}, cmd, opts) do
99113
[{command_name,_}|_] = cmd

lib/mongo/session.ex

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -473,7 +473,10 @@ defmodule Mongo.Session do
473473
|> ReadPreference.add_read_preference(opts)
474474
|> filter_nils()
475475

476-
{:keep_state_and_data, {:ok, conn, cmd}}
476+
{host, port} = Mongo.MongoDBConnection.Utils.hostname_port(opts)
477+
address = "#{host}:#{port}"
478+
479+
{:keep_state_and_data, {:ok, conn, cmd, address}}
477480
end
478481
def handle_call_event({:bind_session, cmd}, :starting_transaction,
479482
%Session{conn: conn,

0 commit comments

Comments
 (0)