Skip to content

Commit bbf159c

Browse files
committed
first approach for #63
1 parent 5ebf0e1 commit bbf159c

File tree

10 files changed

+290
-117
lines changed

10 files changed

+290
-117
lines changed

lib/mongo.ex

Lines changed: 50 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,8 @@ defmodule Mongo do
5252
import Mongo.Utils
5353
import Mongo.WriteConcern
5454

55+
require Logger
56+
5557
use Bitwise
5658
use Mongo.Messages
5759

@@ -409,20 +411,24 @@ defmodule Mongo do
409411
opts = Mongo.retryable_reads(opts)
410412

411413
with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
412-
result <- exec_command_session(topology_pid, session, cmd, opts),
414+
result <- exec_command_session(session, cmd, opts),
413415
:ok <- Session.end_implict_session(topology_pid, session) do
414416
case result do
415417
{:error, error} ->
416-
case Error.should_retry_read(error, cmd, opts) do
417-
true -> issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2))
418-
false -> {:error, error}
418+
419+
cond do
420+
Error.not_writable_primary_or_recovering?(error, opts) ->
421+
## in case of explicity
422+
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2))
423+
424+
Error.should_retry_read(error, cmd, opts) ->
425+
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :read_counter, 2))
426+
427+
true ->
428+
{:error, error}
419429
end
420430
_other -> result
421431
end
422-
else
423-
{:new_connection, _server} ->
424-
:timer.sleep(1000)
425-
issue_command(topology_pid, cmd, :read, opts)
426432
end
427433
end
428434
def issue_command(topology_pid, cmd, :write, opts) do
@@ -431,13 +437,27 @@ defmodule Mongo do
431437
opts = Mongo.retryable_writes(opts, acknowledged?(cmd[:writeConcerns]))
432438

433439
with {:ok, session} <- Session.start_implicit_session(topology_pid, :write, opts),
434-
result <- exec_command_session(topology_pid, session, cmd, opts),
440+
result <- exec_command_session(session, cmd, opts),
435441
:ok <- Session.end_implict_session(topology_pid, session) do
436-
result
437-
else
438-
{:new_connection, _server} ->
439-
:timer.sleep(1000)
440-
issue_command(topology_pid, cmd, :write, opts)
442+
443+
case result do
444+
{:error, error} ->
445+
cond do
446+
Error.not_writable_primary_or_recovering?(error, opts) ->
447+
## in case of explicity
448+
issue_command(topology_pid, cmd, :read, Keyword.put(opts, :retry_counter, 2))
449+
450+
Error.should_retry_write(error, cmd, opts) ->
451+
issue_command(topology_pid, cmd, :write, Keyword.put(opts, :write_counter, 2))
452+
453+
true ->
454+
{:error, error}
455+
end
456+
457+
result ->
458+
result
459+
end
460+
441461
end
442462
end
443463

@@ -724,31 +744,23 @@ defmodule Mongo do
724744
end
725745

726746
@doc false
727-
@spec exec_command_session(GenServer.server, GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
728-
def exec_command_session(topology_pid, session, cmd, opts) do
729-
with {:ok, conn, new_cmd, address} <- Session.bind_session(session, cmd),
747+
@spec exec_command_session(GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
748+
def exec_command_session(session, cmd, opts) do
749+
with {:ok, conn, new_cmd} <- Session.bind_session(session, cmd),
730750
{:ok, _cmd, {doc, event}} <- DBConnection.execute(conn, %Query{action: :command}, [new_cmd], defaults(opts)),
731751
doc <- Session.update_session(session, doc, opts),
732-
{:ok, doc} <- check_for_error(doc, event, address) do
752+
{:ok, doc} <- check_for_error(doc, event) do
733753
{:ok, doc}
734-
else
754+
else
735755
{:error, error} ->
736-
## todo update Topology
737-
IO.inspect(error)
738-
if error.not_master_or_recovering do
739-
IO.inspect("ok")
740-
server_description = Mongo.ServerDescription.from_is_master_error(error.address, error)
741-
GenServer.cast(topology_pid, {:server_description, server_description})
742-
end
743-
#require IEx; IEx.pry
744-
#:debugger.start()
745-
case Error.should_retry_write(error, cmd, opts) do
746-
true ->
747-
with :ok <- Session.select_server(session, opts) do
748-
exec_command_session(topology_pid, session, cmd, Keyword.put(opts, :write_counter, 2))
749-
end
750-
false -> {:error, error}
756+
case Error.not_writable_primary_or_recovering?(error, opts) do
757+
true ->
758+
Session.mark_server_unknown(session)
759+
{:error, error}
760+
false ->
761+
{:error, error}
751762
end
763+
752764
end
753765

754766
end
@@ -757,13 +769,13 @@ defmodule Mongo do
757769
@spec exec_command(GenServer.server, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
758770
def exec_command(conn, cmd, opts) do
759771
with {:ok, _cmd, {doc, event}} <- DBConnection.execute(conn, %Query{action: :command}, [cmd], defaults(opts)),
760-
{:ok, doc} <- check_for_error(doc, event, conn) do
772+
{:ok, doc} <- check_for_error(doc, event) do
761773
{:ok, doc}
762774
end
763775

764776
end
765777

766-
defp check_for_error(%{"ok" => ok} = response, {event, duration}, conn) when ok == 1 do
778+
defp check_for_error(%{"ok" => ok} = response, {event, duration}) when ok == 1 do
767779
Events.notify(%CommandSucceededEvent{
768780
reply: response,
769781
duration: duration,
@@ -774,9 +786,9 @@ defmodule Mongo do
774786
}, :commands)
775787
{:ok, response}
776788
end
777-
defp check_for_error(doc, {event, duration}, address) do
789+
defp check_for_error(doc, {event, duration}) do
778790

779-
error = Mongo.Error.exception(doc, address)
791+
error = Mongo.Error.exception(doc)
780792

781793
Events.notify(%CommandFailedEvent{
782794
failure: error,

lib/mongo/bulk_write.ex

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -187,10 +187,6 @@ defmodule Mongo.BulkWrite do
187187
result = one_bulk_write(topology_pid, session, bulk, opts),
188188
:ok <- Session.end_implict_session(topology_pid, session) do
189189
result
190-
else
191-
{:new_connection, _server} ->
192-
:timer.sleep(1000)
193-
write(topology_pid, bulk, opts)
194190
end
195191

196192
end
@@ -210,10 +206,6 @@ defmodule Mongo.BulkWrite do
210206
|> BulkWriteResult.reduce(empty) do
211207

212208
result
213-
else
214-
{:new_connection, _server} ->
215-
:timer.sleep(1000)
216-
write(topology_pid, bulk, opts)
217209
end
218210

219211
end

lib/mongo/change_stream.ex

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ defmodule Mongo.ChangeStream do
33
alias Mongo.Session
44
alias Mongo.Error
55

6+
require Logger
7+
68
import Record, only: [defrecordp: 2]
79

810
defstruct [:topology_pid, :session, :doc, :cmd, :on_resume_token, :opts]
@@ -69,6 +71,7 @@ defmodule Mongo.ChangeStream do
6971

7072
def aggregate(topology_pid, cmd, fun, opts) do
7173

74+
Logger.info("start_implicit_session ")
7275
with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
7376
{:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do
7477

@@ -84,6 +87,7 @@ defmodule Mongo.ChangeStream do
8487

8588
def aggregate(topology_pid, session, doc, cmd, fun) do
8689

90+
Logger.info("aggregate wire version: #{inspect Mongo.wire_version(topology_pid)}")
8791
with %{"operationTime" => op_time,
8892
"cursor" => %{
8993
"id" => cursor_id,
@@ -128,6 +132,7 @@ defmodule Mongo.ChangeStream do
128132
change_stream(resume_token: resume_token, op_time: op_time, cmd: aggregate_cmd,
129133
on_resume_token: fun) = change_stream, opts) do
130134

135+
Logger.info("Get more")
131136
get_more = [
132137
getMore: %BSON.LongNumber{value: cursor_id},
133138
collection: coll,
@@ -145,7 +150,7 @@ defmodule Mongo.ChangeStream do
145150

146151
case token_changes(old_token, new_token) do
147152
true -> fun.(new_token)
148-
false -> nil
153+
false -> :noop
149154
end
150155

151156
{:ok, %{cursor_id: new_cursor_id, docs: docs, change_stream: change_stream}}
@@ -154,6 +159,7 @@ defmodule Mongo.ChangeStream do
154159
{:error, %Mongo.Error{resumable: false} = not_resumable} -> {:error, not_resumable}
155160
{:error, _error} ->
156161

162+
Logger.info("Resuming....: #{inspect Mongo.wire_version(topology_pid) }")
157163
with {:ok, wire_version} <- Mongo.wire_version(topology_pid) do
158164

159165
[%{"$changeStream" => stream_opts} | pipeline] = Keyword.get(aggregate_cmd, :pipeline) # extract the change stream options
@@ -165,11 +171,14 @@ defmodule Mongo.ChangeStream do
165171
kill_cursors(session, coll, [cursor_id], opts)
166172

167173
# Start aggregation again...
174+
Logger.info("Calling aggregate again")
168175
with {:ok, state} <- aggregate(topology_pid, aggregate_cmd, fun, opts) do
169176
{:resume, state}
170177
end
171178
end
172-
179+
reason ->
180+
Logger.info("Error: #{inspect reason}")
181+
{:error, nil}
173182
end
174183
end
175184

@@ -251,6 +260,7 @@ defmodule Mongo.ChangeStream do
251260
"""
252261
def kill_cursors(session, coll, cursor_ids, opts) do
253262

263+
## todo Logger.info("Kill-Cursor")
254264
cmd = [
255265
killCursors: coll,
256266
cursors: cursor_ids |> Enum.map(fn id -> %BSON.LongNumber{value: id} end)

0 commit comments

Comments
 (0)