Skip to content

Commit a91e1f6

Browse files
committed
start to add support for implicit and explicit sessions
1 parent 118eb58 commit a91e1f6

File tree

3 files changed

+340
-61
lines changed

3 files changed

+340
-61
lines changed

lib/mongo.ex

Lines changed: 59 additions & 48 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ defmodule Mongo do
5555
alias Mongo.Topology
5656
alias Mongo.UrlParser
5757
alias Mongo.Session.ServerSession
58+
alias Mongo.Session
5859

5960
@timeout 15000 # 5000
6061

@@ -292,12 +293,21 @@ defmodule Mongo do
292293

293294
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
294295

295-
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts) do
296+
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
296297
{:ok, doc["value"]}
297298
end
298299

299300
end
300301

302+
def start_session(topology_pid, type, opts) do
303+
with {:ok, session, _, _} <- Topology.checkout_session(topology_pid, type, :explicit, opts) do
304+
{:ok, session}
305+
else
306+
{:new_connection, _server} ->
307+
start_session(topology_pid, type, opts)
308+
end
309+
end
310+
301311
@doc """
302312
This function is very fundamental. First a server is selected. If no explicit session id is found, then
303313
the Topology-Module returns an implicit session id. The `opts` is updated for using the new session id.
@@ -308,43 +318,27 @@ defmodule Mongo do
308318

309319
Logger.debug("issue_command: write #{inspect cmd}")
310320

311-
with {:ok, conn, _, _, session_id} <- Topology.select_server(topology_pid, :write, opts),
312-
{:ok, doc} <- exec_command(conn, update_session_id(cmd, session_id), opts),
313-
:ok <- checkin_session_id(topology_pid, cmd, session_id) do
314-
{:ok, conn, doc}
321+
with {:ok, session, slave_ok, _mongos} <- Topology.checkout_session(topology_pid, :write, :implicit, opts),
322+
{:ok, doc} <- exec_command_session(session, cmd, opts),
323+
:ok <- Topology.checkin_session(topology_pid, session) do
324+
{:ok, doc}
315325
else
316-
{:new_connection, _server} ->
317-
issue_command(topology_pid, cmd, :write, opts)
326+
{:new_connection, _server} -> issue_command(topology_pid, cmd, :write, opts)
318327
end
319328
end
320329
def issue_command(topology_pid, cmd, :read, opts) do
321330

322331
Logger.debug("issue_command: read #{inspect cmd}")
323-
with {:ok, conn, slave_ok, _, session_id} <- Topology.select_server(topology_pid, :read, opts),
332+
with {:ok, session, slave_ok, _mongos} <- Topology.checkout_session(topology_pid, :read, :implict, opts),
324333
opts = Keyword.put(opts, :slave_ok, slave_ok),
325-
{:ok, doc} <- exec_command(conn, update_session_id(cmd, session_id), opts),
326-
:ok <- checkin_session_id(topology_pid, cmd, session_id) do
327-
{:ok, conn, doc}
334+
{:ok, doc} <- exec_command_session(session, cmd, opts),
335+
:ok <- Topology.checkin_session(topology_pid, session) do
336+
{:ok, doc}
328337
else
329-
{:new_connection, _server} ->
330-
issue_command(topology_pid, cmd, :read, opts)
338+
{:new_connection, _server} -> issue_command(topology_pid, cmd, :read, opts)
331339
end
332340
end
333341

334-
defp update_session_id(cmd, nil) do
335-
cmd
336-
end
337-
defp update_session_id(cmd, %ServerSession{:session_id => session_id}) do
338-
Logger.debug("update_session_id for cmd #{inspect session_id}")
339-
Keyword.merge(cmd, [lsid: %{id: session_id}])
340-
end
341-
defp checkin_session_id(_topology_pid, _cmd, nil), do: :ok
342-
defp checkin_session_id(topology_pid, cmd, session_id) do
343-
case Keyword.get(cmd, :lsid, :implicit) do
344-
:implicit -> Topology.checkin_session_id(topology_pid, session_id)
345-
_ -> :ok
346-
end
347-
end
348342

349343
@doc """
350344
Finds a document and replaces it.
@@ -383,7 +377,7 @@ defmodule Mongo do
383377

384378
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
385379

386-
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts), do: {:ok, doc["value"]}
380+
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts), do: {:ok, doc["value"]}
387381
end
388382

389383
defp should_return_new(:after), do: true
@@ -414,7 +408,7 @@ defmodule Mongo do
414408
] |> filter_nils()
415409
opts = Keyword.drop(opts, ~w(max_time projection sort collation)a)
416410

417-
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts), do: {:ok, doc["value"]}
411+
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts), do: {:ok, doc["value"]}
418412
end
419413

420414
@doc false
@@ -518,7 +512,7 @@ defmodule Mongo do
518512

519513
opts = Keyword.drop(opts, ~w(max_time)a)
520514

521-
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :read, opts), do: {:ok, doc["values"]}
515+
with {:ok, doc} <- issue_command(topology_pid, cmd, :read, opts), do: {:ok, doc["values"]}
522516
end
523517

524518
@doc """
@@ -594,13 +588,13 @@ defmodule Mongo do
594588
Mongo.find_one(top, "jobs", %{}, read_concern: %{level: "local"})
595589
"""
596590
@spec find_one(GenServer.server, collection, BSON.document, Keyword.t) :: BSON.document | nil
597-
def find_one(conn, coll, filter, opts \\ []) do
591+
def find_one(topology_pid, coll, filter, opts \\ []) do
598592
opts = opts
599593
|> Keyword.delete(:sort)
600594
|> Keyword.put(:limit, 1)
601595
|> Keyword.put(:batch_size, 1)
602596

603-
conn
597+
topology_pid
604598
|> find(coll, filter, opts)
605599
|> Enum.at(0)
606600
end
@@ -614,22 +608,40 @@ defmodule Mongo do
614608
def command(topology_pid, cmd, opts \\ []) do
615609
rp = ReadPreference.defaults(%{mode: :primary})
616610
rp_opts = [read_preference: Keyword.get(opts, :read_preference, rp)]
617-
with {:ok, _conn, doc} <- Mongo.issue_command(topology_pid, cmd, :read, rp_opts) do
611+
with {:ok, doc} <- Mongo.issue_command(topology_pid, cmd, :read, rp_opts) do
618612
{:ok, doc}
619613
end
620614
end
621615

616+
@doc false
617+
@spec exec_command_session(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
618+
def exec_command_session(session, cmd, opts) do
619+
620+
Logger.debug("Executing cmd: #{inspect cmd}")
621+
622+
action = %Query{action: :command}
623+
624+
with {:ok, conn, cmd} <- Session.bind_session(session, cmd),
625+
{:ok, _cmd, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
626+
{:ok, doc} <- check_for_error(doc) do
627+
{:ok, doc}
628+
end
629+
630+
end
631+
622632
@doc false
623633
@spec exec_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
624634
def exec_command(conn, cmd, opts) do
625-
action = %Query{action: :command}
626635

627636
Logger.debug("Executing cmd: #{inspect cmd}")
628637

638+
action = %Query{action: :command}
639+
629640
with {:ok, _cmd, doc} <- DBConnection.execute(conn, action, [cmd], defaults(opts)),
630641
{:ok, doc} <- check_for_error(doc) do
631642
{:ok, doc}
632643
end
644+
633645
end
634646

635647
defp check_for_error(%{"ok" => ok} = response) when ok == 1, do: {:ok, response}
@@ -682,6 +694,13 @@ defmodule Mongo do
682694
## Examples
683695
684696
Mongo.insert_one(pid, "users", %{first_name: "John", last_name: "Smith"})
697+
698+
{:ok, session} = Mongo.start_session(pid)
699+
Session.start_transaction(session)
700+
Mongo.insert_one(pid, "users", %{first_name: "John", last_name: "Smith"}, session: session)
701+
Session.commit_transaction(session)
702+
Mongo.end_sessions([pid])
703+
685704
"""
686705
@spec insert_one(GenServer.server, collection, BSON.document, Keyword.t) :: result(Mongo.InsertOneResult.t)
687706
def insert_one(topology_pid, coll, doc, opts \\ []) do
@@ -698,12 +717,11 @@ defmodule Mongo do
698717
insert: coll,
699718
documents: [doc],
700719
ordered: Keyword.get(opts, :ordered),
701-
writeConcern: write_concern,
702-
bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation),
703-
lsid: Keyword.get(opts, :lsid, :implicit)
720+
writeConcern: write_concern, ## todo in der Transaction löschen
721+
bypassDocumentValidation: Keyword.get(opts, :bypass_document_validation)
704722
] |> filter_nils()
705723

706-
with {:ok, _conn, doc} <- Mongo.issue_command(topology_pid, cmd, :write, opts) do
724+
with {:ok, doc} <- Mongo.issue_command(topology_pid, cmd, :write, opts) do
707725
case doc do
708726
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
709727
_ ->
@@ -757,7 +775,7 @@ defmodule Mongo do
757775
lsid: Keyword.get(opts, :lsid)
758776
] |> filter_nils()
759777

760-
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts) do
778+
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
761779
case doc do
762780
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
763781
_ ->
@@ -828,7 +846,7 @@ defmodule Mongo do
828846
lsid: Keyword.get(opts, :lsid)
829847
] |> filter_nils()
830848

831-
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts) do
849+
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
832850
case doc do
833851
%{"writeErrors" => _} -> {:error, %Mongo.WriteError{n: doc["n"], ok: doc["ok"], write_errors: doc["writeErrors"]}}
834852
%{ "ok" => _ok, "n" => n } ->
@@ -949,7 +967,7 @@ defmodule Mongo do
949967
] |> filter_nils()
950968

951969

952-
with {:ok, _conn, doc} <- issue_command(topology_pid, cmd, :write, opts) do
970+
with {:ok, doc} <- issue_command(topology_pid, cmd, :write, opts) do
953971

954972
case doc do
955973

@@ -973,13 +991,6 @@ defmodule Mongo do
973991
end
974992
end
975993

976-
def start_session(top) do
977-
## todo error code handling
978-
with {:ok, %{"id" => uuid, "ok" => ok}} when ok == 1 <- command(top, [startSession: 1], database: "admin") do
979-
uuid
980-
end
981-
end
982-
983994
def end_sessions(top, sessions) do
984995
## todo error code handling
985996
with {:ok, %{"ok" => ok}} when ok == 1 <- command(top, [endSessions: sessions], database: "admin") do

0 commit comments

Comments
 (0)