Skip to content

Commit 311cce9

Browse files
committed
add afterClusterTime option to readConcern in case of causal_consistency
1 parent f0d118b commit 311cce9

File tree

5 files changed

+98
-8
lines changed

5 files changed

+98
-8
lines changed

lib/bson/types.ex

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -151,6 +151,14 @@ defmodule BSON.Timestamp do
151151
@type t :: %__MODULE__{value: integer, ordinal: integer}
152152
defstruct [:value, :ordinal]
153153

154+
def is_after(this, that) do
155+
(this.value > that.value or (this.value == that.value and this.ordinal > that.ordinal))
156+
end
157+
158+
def is_before(this, that) do
159+
(this.value < that.value or (this.value == that.value and this.ordinal < that.ordinal))
160+
end
161+
154162
defimpl Inspect do
155163
def inspect(%BSON.Timestamp{value: value, ordinal: ordinal}, _opts) do
156164
"#BSON.Timestamp<#{value}:#{ordinal}>"

lib/mongo.ex

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -604,6 +604,7 @@ defmodule Mongo do
604604

605605
with {:ok, conn, cmd} <- Session.bind_session(session, cmd),
606606
{:ok, _cmd, doc} <- DBConnection.execute(conn, %Query{action: :command}, [cmd], defaults(opts)),
607+
doc <- Session.update_session(session, doc, opts),
607608
{:ok, doc} <- check_for_error(doc) do
608609
{:ok, doc}
609610
end

lib/mongo/grid_fs/bucket.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -175,7 +175,7 @@ defmodule Mongo.GridFs.Bucket do
175175
#
176176
# db.fs.files.findOne({}, { _id : 1 })
177177
#
178-
defp files_collection_empty?(%Bucket{topology_pid: topology_pid, opts: opts} = bucket) do
178+
defp files_collection_empty?(%Bucket{topology_pid: topology_pid} = bucket) do
179179
coll_name = files_collection_name(bucket)
180180
topology_pid
181181
|> Mongo.show_collections()

lib/mongo/session.ex

Lines changed: 68 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ defmodule Mongo.Session do
1515
alias Mongo.Session.ServerSession
1616
alias Mongo.Session
1717
alias Mongo.Topology
18+
alias BSON.Timestamp
1819

1920
require Logger
2021

@@ -26,7 +27,7 @@ defmodule Mongo.Session do
2627
# * `server_session` the server_session data
2728
# * `opts` options
2829
# * `implicit` true or false
29-
defstruct [conn: nil, server_session: nil, implicit: false, wire_version: 0, opts: []]
30+
defstruct [conn: nil, server_session: nil, causal_consistency: false, operation_time: nil, implicit: false, wire_version: 0, opts: []]
3031

3132
@impl true
3233
def callback_mode() do
@@ -99,6 +100,30 @@ defmodule Mongo.Session do
99100
:gen_statem.call(pid, {:bind_session, cmd})
100101
end
101102

103+
@doc """
104+
Update the `operationTime` for causally consistent read commands
105+
"""
106+
def update_session(pid, %{"operationTime" => operationTime} = doc, opts) do
107+
case opts |> write_concern() |> acknowledged?() do
108+
true -> advance_operation_time(pid, operationTime)
109+
false -> []
110+
end
111+
doc
112+
end
113+
def update_session(_pid, doc, _opts) do
114+
doc
115+
end
116+
117+
@doc """
118+
Advance the `operationTime` for causally consistent read commands
119+
"""
120+
def advance_operation_time(pid, timestamp) do
121+
:gen_statem.cast(pid, {:advance_operation_time, timestamp})
122+
end
123+
124+
@doc """
125+
End implicit session
126+
"""
102127
def end_implict_session(topology_pid, session) do
103128
with {:ok, session_server} <- :gen_statem.call(session, {:end_implicit_session}) do
104129
Topology.checkin_session(topology_pid, session_server)
@@ -108,12 +133,18 @@ defmodule Mongo.Session do
108133
end
109134
end
110135

136+
@doc """
137+
End explicit session
138+
"""
111139
def end_session(topology_pid, session) do
112140
with {:ok, session_server} <- :gen_statem.call(session, {:end_session}) do
113141
Topology.checkin_session(topology_pid, session_server)
114142
end
115143
end
116144

145+
@doc """
146+
Convient function for running multiple write commands in a transaction
147+
"""
117148
def with_transaction(topology_pid, fun, opts \\ []) do
118149

119150
with {:ok, session} <- Session.start_session(topology_pid, :write, opts),
@@ -127,7 +158,6 @@ defmodule Mongo.Session do
127158
error ->
128159
abort_transaction(session)
129160
end_session(topology_pid, session)
130-
## todo rerun
131161
error
132162
end
133163

@@ -164,6 +194,7 @@ defmodule Mongo.Session do
164194
server_session: server_session,
165195
implicit: (type == :implicit),
166196
wire_version: wire_version,
197+
causal_consistency: Keyword.get(opts, :causal_consistency, false),
167198
opts: opts}
168199
{:ok, :no_transaction, data}
169200
end
@@ -184,19 +215,21 @@ defmodule Mongo.Session do
184215
transaction,
185216
%Session{conn: conn,
186217
wire_version: wire_version,
187-
server_session: %ServerSession{session_id: id}}) when wire_version >= 6 and transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do
188-
{:keep_state_and_data, {:reply, from, {:ok, conn, Keyword.merge(cmd, lsid: %{id: id})}}}
218+
server_session: %ServerSession{session_id: id}} = data) when wire_version >= 6 and transaction in [:no_transaction, :transaction_aborted, :transaction_committed] do
219+
220+
cmd = Keyword.merge(cmd, lsid: %{id: id}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))) |> filter_nils()
221+
{:keep_state_and_data, {:reply, from, {:ok, conn, cmd}}}
189222
end
190223

191224
def handle_event({:call, from},
192225
{:bind_session, cmd},
193226
:starting_transaction,
194227
%Session{conn: conn,
195228
server_session: %ServerSession{session_id: id, txn_num: txn_num},
196-
wire_version: wire_version,
197-
opts: opts} = data) when wire_version >= 6 do
229+
wire_version: wire_version} = data) when wire_version >= 6 do
230+
198231
result = Keyword.merge(cmd,
199-
readConcern: Keyword.get(opts, :read_concern),
232+
readConcern: read_concern(data, Keyword.get(cmd, :readConcern)),
200233
lsid: %{id: id},
201234
txnNumber: %BSON.LongNumber{value: txn_num},
202235
startTransaction: true,
@@ -253,6 +286,15 @@ defmodule Mongo.Session do
253286
def handle_event({:call, from}, {:server_session}, _state, %Session{server_session: session_server, implicit: implicit}) do
254287
{:keep_state_and_data, {:reply, from, {:ok, session_server, implicit}}}
255288
end
289+
def handle_event(:cast, {:advance_operation_time, timestamp}, _state, %Session{operation_time: nil} = data) do
290+
{:keep_state, %Session{data | operation_time: timestamp}}
291+
end
292+
def handle_event(:cast, {:advance_operation_time, timestamp}, _state, %Session{operation_time: time} = data) do
293+
case Timestamp.is_after(timestamp, time) do
294+
true -> {:keep_state, %Session{data | operation_time: timestamp}}
295+
false -> :keep_state_and_data
296+
end
297+
end
256298

257299
@impl true
258300
def terminate(reason, state, data) when state in [:transaction_in_progress] do
@@ -312,4 +354,23 @@ defmodule Mongo.Session do
312354
end
313355

314356

357+
##
358+
# create the readConcern options
359+
#
360+
defp read_concern(%Session{causal_consistency: false}, read_concern) do
361+
read_concern
362+
end
363+
defp read_concern(%Session{causal_consistency: true, operation_time: nil}, read_concern) do
364+
read_concern
365+
end
366+
defp read_concern(%Session{causal_consistency: true, operation_time: time}, nil) do
367+
[afterClusterTime: time]
368+
end
369+
defp read_concern(%Session{causal_consistency: true, operation_time: time}, read_concern) when is_map(read_concern) do
370+
Map.put(read_concern, :afterClusterTime, time)
371+
end
372+
defp read_concern(%Session{causal_consistency: true, operation_time: time}, read_concern) when is_list(read_concern) do
373+
read_concern ++ [afterClusterTime: time]
374+
end
375+
315376
end

test/mongo/session_test.exs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,26 @@ defmodule Mongo.SessionTest do
179179
assert {:ok, 3} == Mongo.count(top.pid, coll, %{})
180180
end
181181

182+
@tag :mongo_4_2
183+
test "with_transaction_causal_consistency", top do
184+
185+
coll = "dogs_with_commit_transaction_causal_consistency"
186+
187+
Mongo.insert_one(top.pid, coll, %{name: "Wuff"})
188+
Mongo.delete_many(top.pid, coll, %{})
189+
190+
Session.with_transaction(top.pid, fn opts ->
191+
{:ok, %InsertOneResult{:inserted_id => id}} = Mongo.insert_one(top.pid, coll, %{name: "Greta"}, opts)
192+
assert id != nil
193+
{:ok, %InsertOneResult{:inserted_id => id}} = Mongo.insert_one(top.pid, coll, %{name: "Waldo"}, opts)
194+
assert id != nil
195+
{:ok, %InsertOneResult{:inserted_id => id}} = Mongo.insert_one(top.pid, coll, %{name: "Tom"}, opts)
196+
assert id != nil
197+
{:ok, :ok}
198+
end, w: 1, causal_consistency: true)
199+
assert {:ok, 3} == Mongo.count(top.pid, coll, %{})
200+
end
201+
182202
@tag :mongo_4_2
183203
test "with_transaction_abort", top do
184204

0 commit comments

Comments
 (0)