Skip to content

Commit d982330

Browse files
committed
add support for transactions
1 parent 66f6fa2 commit d982330

File tree

6 files changed

+65
-36
lines changed

6 files changed

+65
-36
lines changed

lib/bson/encoder.ex

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@ defmodule BSON.Encoder do
5151
def encode(%BSON.Timestamp{value: epoch, ordinal: ordinal}),
5252
do: <<ordinal::int32, epoch::int32>>
5353

54+
def encode(%BSON.LongNumber{value: value}), do: <<value::int64>>
55+
5456
def encode([]) do
5557
document([])
5658
end
@@ -140,6 +142,7 @@ defmodule BSON.Encoder do
140142
defp type(%BSON.JavaScript{scope: nil}), do: @type_js
141143
defp type(%BSON.JavaScript{}), do: @type_js_scope
142144
defp type(%BSON.Timestamp{}), do: @type_timestamp
145+
defp type(%BSON.LongNumber{}), do: @type_int64
143146
defp type(nil), do: @type_null
144147
defp type(:BSON_min), do: @type_min
145148
defp type(:BSON_max), do: @type_max

lib/bson/types.ex

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -157,3 +157,19 @@ defmodule BSON.Timestamp do
157157
end
158158
end
159159
end
160+
161+
defmodule BSON.LongNumber do
162+
@moduledoc """
163+
Represents BSON long type
164+
"""
165+
166+
@type t :: %__MODULE__{value: integer}
167+
168+
defstruct [value: 0]
169+
170+
defimpl Inspect do
171+
def inspect(%BSON.LongNumber{value: value}, _opts) do
172+
"#BSON.LongNumber<#{value}>"
173+
end
174+
end
175+
end

lib/mongo.ex

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -318,7 +318,7 @@ defmodule Mongo do
318318

319319
Logger.debug("issue_command: write #{inspect cmd}")
320320

321-
with {:ok, session, slave_ok, _mongos} <- Topology.checkout_session(topology_pid, :write, :implicit, opts),
321+
with {:ok, session, _slave_ok, _mongos} <- Topology.checkout_session(topology_pid, :write, :implicit, opts),
322322
{:ok, doc} <- exec_command_session(session, cmd, opts),
323323
:ok <- Topology.checkin_session(topology_pid, session) do
324324
{:ok, doc}
@@ -617,7 +617,7 @@ defmodule Mongo do
617617
@spec exec_command_session(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
618618
def exec_command_session(session, cmd, opts) do
619619

620-
Logger.debug("Executing cmd: #{inspect cmd}")
620+
Logger.debug("Executing cmd with session: #{inspect cmd}")
621621

622622
action = %Query{action: :command}
623623

lib/mongo/session.ex

Lines changed: 33 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ defmodule Mongo.Session do
2323
# * `slave_ok` true or false
2424
# * `mongos` true or false
2525
# * `implicit` true or false
26-
defstruct [conn: nil, server_session: nil, slave_ok: false, mongos: false, implicit: false, opts: []]
26+
defstruct [conn: nil, slave_ok: false, mongos: false, server_session: nil, implicit: false, opts: []]
2727

2828
@impl true
2929
def callback_mode() do
@@ -87,7 +87,11 @@ defmodule Mongo.Session do
8787

8888
@impl true
8989
def init({conn, server_session, slave_ok, mongos, type, opts}) do
90-
data = %Session{conn: conn, server_session: server_session, slave_ok: slave_ok, mongos: mongos, implicit: (type == :implict), opts: opts}
90+
data = %Session{conn: conn,
91+
server_session: server_session,
92+
slave_ok: slave_ok,
93+
mongos: mongos,
94+
implicit: (type == :implicit), opts: opts}
9195
{:ok, :no_transaction, data}
9296
end
9397

@@ -96,22 +100,24 @@ defmodule Mongo.Session do
96100
{:next_state, :starting_transaction, %Session{data | server_session: ServerSession.next_txn_num(session)}, {:reply, from, :ok}}
97101
end
98102
def handle_event({:call, from}, {:bind_session, cmd}, :no_transaction, %Session{conn: conn, server_session: %ServerSession{session_id: id}}) do
99-
{:keep_state_and_data, {:reply, from, conn, Keyword.merge(cmd, lsid: id)}}
103+
{:keep_state_and_data, {:reply, from, {:ok, conn, Keyword.merge(cmd, lsid: %{id: id})}}}
100104
end
101105
def handle_event({:call, from}, {:bind_session, cmd}, :starting_transaction, %Session{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}} = data) do
102106
result = Keyword.merge(cmd,
103-
lsid: id,
104-
txnNumber: txn_num,
107+
readConcern: Keyword.get(opts, :read_concern),
108+
lsid: %{id: id},
109+
txnNumber: %BSON.LongNumber{value: txn_num},
105110
startTransaction: true,
106-
autocommit: false)
107-
{:next_state, :transaction_in_progress, data, {:reply, from, conn, result}}
111+
autocommit: false) |> filter_nils()
112+
IO.puts inspect result
113+
{:next_state, :transaction_in_progress, data, {:reply, from, {:ok, conn, result}}}
108114
end
109115
def handle_event({:call, from}, {:bind_session, cmd}, :transaction_in_progress, %Session{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) do
110116
result = Keyword.merge(cmd,
111-
lsid: id,
112-
txnNumber: txn_num,
117+
lsid: %{id: id},
118+
txnNumber: %BSON.LongNumber{value: txn_num},
113119
autocommit: false)
114-
{:keep_state_and_data, {:reply, from, conn, result}}
120+
{:keep_state_and_data, {:reply, from, {:ok, conn, result}}}
115121
end
116122

117123
def handle_event({:call, from}, {:commit_transaction}, :transaction_in_progress, data) do
@@ -126,47 +132,41 @@ defmodule Mongo.Session do
126132
def handle_event({:call, from}, {:end_session}, _state, _data) do
127133
{:stop_and_reply, :normal, {:reply, from, :ok}}
128134
end
135+
def handle_event({:call, from}, {:server_session}, _state, %Session{server_session: session_server, implicit: implicit}) do
136+
{:keep_state_and_data, {:reply, from, {:ok, session_server, implicit}}}
137+
end
129138

130-
defp run_commit_command(%{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) do
139+
defp run_commit_command(%{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, opts: opts}) do
131140

132141
#{
133-
# commitTransaction : 1,
134-
# lsid : { id : <UUID> },
135-
# txnNumber : <Int64>,
136-
# autocommit : false,
137-
# writeConcern : {...},
138-
# maxTimeMS: <Int64>,
139142
# recoveryToken : {...}
140143
#}
141144

142145
cmd = [
143146
commitTransaction: 1,
144-
lsid: id,
145-
txnNumber: txn_num,
147+
lsid: %{id: id},
148+
txnNumber: %BSON.LongNumber{value: txn_num},
146149
autocommit: false,
147-
writeConcern: %{w: 1}
148-
]
150+
writeConcern: Keyword.get(opts, :write_concern),
151+
maxTimeMS: Keyword.get(opts, :max_commit_time_ms)
152+
] |> filter_nils()
149153

150154
Mongo.exec_command(conn, cmd, database: "admin")
151155
end
152156

153-
defp run_abort_command(%{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}}) do
157+
defp filter_nils(keyword) when is_list(keyword) do
158+
Enum.reject(keyword, fn {_key, value} -> is_nil(value) end)
159+
end
154160

155-
#{
156-
# abortTransaction : 1,
157-
# lsid : { id : <UUID> },
158-
# txnNumber : <Int64>,
159-
# autocommit : false,
160-
# writeConcern : {...}
161-
#}
161+
defp run_abort_command(%{conn: conn, server_session: %ServerSession{session_id: id, txn_num: txn_num}, opts: opts}) do
162162

163163
cmd = [
164164
abortTransaction: 1,
165-
lsid: id,
166-
txnNumber: txn_num,
165+
lsid: %{id: id},
166+
txnNumber: %BSON.LongNumber{value: txn_num},
167167
autocommit: false,
168-
writeConcern: %{w: 1}
169-
]
168+
writeConcern: Keyword.get(opts, :write_concern),
169+
] |> filter_nils()
170170

171171
Mongo.exec_command(conn, cmd, database: "admin")
172172
end

lib/mongo/topology.ex

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ defmodule Mongo.Topology do
5959
session = Keyword.get(opts, :session, nil)
6060
case Session.alive?(session) do
6161
false -> GenServer.call(pid, {:checkout_session, cmd_type, type, opts})
62-
true -> {:ok, session}
62+
true -> {:ok, session, false, false} ## todo server_session holen
6363
end
6464

6565
end
@@ -191,6 +191,11 @@ defmodule Mongo.Topology do
191191
# checkin the current session, if the session was implicit created
192192
#
193193
def handle_cast({:checkin_session, session}, %{:session_pool => pool} = state) do
194+
195+
result = Session.server_session(session)
196+
197+
IO.puts "Result #{inspect result}"
198+
194199
case Session.server_session(session) do
195200
{:ok, server_session, true} -> SessionPool.checkin(pool, server_session)
196201
_ -> []

test/bson/types_test.exs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,9 @@ defmodule BSON.TypesTest do
5252
value = %BSON.Timestamp{value: 1412180887, ordinal: 12}
5353
assert inspect(value) == "#BSON.Timestamp<1412180887:12>"
5454
end
55+
56+
test "inspect BSON.Long" do
57+
value = %BSON.Long{value: 1412180887}
58+
assert inspect(value) == "#BSON.Long<1412180887>"
59+
end
5560
end

0 commit comments

Comments
 (0)