Skip to content

Commit 50513f5

Browse files
committed
added support for slaveOk, added test case for sharded configuration.
1 parent e7158aa commit 50513f5

File tree

7 files changed

+114
-34
lines changed

7 files changed

+114
-34
lines changed

lib/mongo/change_stream.ex

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,8 @@ defmodule Mongo.ChangeStream do
1212
## check, if retryable reads are enabled
1313
opts = Mongo.retryable_reads(opts)
1414

15-
with new_cmd = Mongo.ReadPreference.add_read_preference(cmd, opts),
16-
{:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
17-
{:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, new_cmd, opts) do
15+
with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
16+
{:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do
1817
%Mongo.ChangeStream{
1918
topology_pid: topology_pid,
2019
session: session,
@@ -70,9 +69,8 @@ defmodule Mongo.ChangeStream do
7069

7170
def aggregate(topology_pid, cmd, fun, opts) do
7271

73-
with new_cmd = Mongo.ReadPreference.add_read_preference(cmd, opts),
74-
{:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
75-
{:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, new_cmd, opts) do
72+
with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
73+
{:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do
7674

7775
aggregate(topology_pid, session, doc, cmd, fun)
7876
else

lib/mongo/read_preference.ex

Lines changed: 41 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -53,44 +53,41 @@ defmodule Mongo.ReadPreference do
5353
Add read preference to the cmd
5454
"""
5555
def add_read_preference(cmd, opts) do
56+
case Keyword.get(opts, :read_preference) do
57+
nil -> cmd
58+
pref -> cmd ++ ["$readPreference": pref]
59+
end
60+
end
61+
62+
@doc """
63+
From the specs:
5664
57-
read_preference = opts
58-
|> Keyword.get(:read_preference)
59-
|> Mongo.ReadPreference.primary()
60-
|> transform()
65+
Use of slaveOk
6166
62-
cmd ++ ["$readPreference": read_preference]
63-
end
67+
There are two usages of slaveOK:
6468
65-
defp transform(%{:mode => :primary}) do
69+
* A driver query parameter that predated read preference modes and tag set lists.
70+
* A wire protocol flag on OP_QUERY operations
71+
72+
"""
73+
def slave_ok(%{:mode => :primary}) do
6674
%{:mode => :primary}
6775
end
68-
defp transform(config) do
76+
def slave_ok(config) do
6977

7078
mode = case config[:mode] do
7179
:primary_preferred -> :primaryPreferred
7280
:secondary_preferred -> :secondaryPreferred
73-
other -> other
74-
end
75-
76-
max_staleness_seconds = case config[:max_staleness_ms] do
77-
i when is_integer(i) -> div(i, 1000)
78-
nil -> nil
81+
other -> other
7982
end
8083

81-
[mode: mode,
82-
tag_sets: config[:tag_sets],
83-
maxStalenessSeconds: max_staleness_seconds,
84-
hedge: config[:hedge]]
85-
|> filter_nils()
86-
84+
filter_nils([mode: mode, tag_sets: config[:tag_sets]])
8785
end
88-
8986
##
9087
# Therefore, when sending queries to a mongos, the following rules apply:
9188
#
9289
# For mode 'primary', drivers MUST NOT set the slaveOK wire protocol flag and MUST NOT use $readPreference
93-
def mongos(%{mode: :primay}) do
90+
def mongos(%{mode: :primary}) do
9491
nil
9592
end
9693
# For mode 'secondary', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
@@ -112,6 +109,28 @@ defmodule Mongo.ReadPreference do
112109
transform(config)
113110
end
114111

112+
defp transform(%{:mode => :primary}) do
113+
%{:mode => :primary}
114+
end
115+
defp transform(config) do
116+
117+
mode = case config[:mode] do
118+
:primary_preferred -> :primaryPreferred
119+
:secondary_preferred -> :secondaryPreferred
120+
other -> other
121+
end
122+
123+
max_staleness_seconds = case config[:max_staleness_ms] do
124+
i when is_integer(i) -> div(i, 1000)
125+
nil -> nil
126+
end
115127

128+
[mode: mode,
129+
tag_sets: config[:tag_sets],
130+
maxStalenessSeconds: max_staleness_seconds,
131+
hedge: config[:hedge]]
132+
|> filter_nils()
133+
134+
end
116135

117136
end

lib/mongo/session.ex

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,10 +99,11 @@ defmodule Mongo.Session do
9999
import Keywords
100100
import Mongo.WriteConcern
101101

102-
alias Mongo.Session.ServerSession
102+
alias BSON.Timestamp
103+
alias Mongo.ReadPreference
103104
alias Mongo.Session
105+
alias Mongo.Session.ServerSession
104106
alias Mongo.Topology
105-
alias BSON.Timestamp
106107

107108
@type t :: pid()
108109

@@ -430,7 +431,10 @@ defmodule Mongo.Session do
430431
_ -> [lsid: %{id: id}, readConcern: read_concern(data, Keyword.get(cmd, :readConcern))]
431432
end
432433

433-
cmd = Keyword.merge(cmd, options) |> filter_nils()
434+
cmd = cmd
435+
|> Keyword.merge(options)
436+
|> ReadPreference.add_read_preference(opts)
437+
|> filter_nils()
434438

435439
{:keep_state_and_data, {:ok, conn, cmd}}
436440
end

lib/mongo/stream.ex

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -14,8 +14,7 @@ defmodule Mongo.Stream do
1414
## check, if retryable reads are enabled
1515
opts = Mongo.retryable_reads(opts)
1616

17-
with cmd = Mongo.ReadPreference.add_read_preference(cmd, opts),
18-
{:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
17+
with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
1918
{:ok,
2019
%{"ok" => ok,
2120
"cursor" => %{

lib/mongo/topology_description.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ defmodule Mongo.TopologyDescription do
9696
:unknown -> {[], nil}
9797
:single -> {topology.servers, nil}
9898
:sharded -> {mongos_servers(topology), ReadPreference.mongos(read_preference)}
99-
_ -> {select_replica_set_server(topology, read_preference.mode, read_preference), nil}
99+
_ -> {select_replica_set_server(topology, read_preference.mode, read_preference), ReadPreference.slave_ok(read_preference)}
100100
end
101101

102102
opts = case read_prefs do

test/mongo/topology_description_test.exs

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,34 @@ defmodule Mongo.TopologyDescriptionTest do
1919
assert {:ok, {^single_server, _}} = TopologyDescription.select_servers(single(), :read, opts)
2020
end
2121

22+
23+
test "shared server selection" do
24+
sharded_server = "localhost:27017"
25+
26+
assert {:ok, {^sharded_server, []}} = TopologyDescription.select_servers(sharded(), :write, [])
27+
28+
opts = [
29+
read_preference: ReadPreference.primary(%{mode: :primary})
30+
]
31+
assert {:ok, {^sharded_server, []}} = TopologyDescription.select_servers(sharded(), :read, opts)
32+
opts = [
33+
read_preference: ReadPreference.primary(%{mode: :secondary})
34+
]
35+
assert {:ok, {^sharded_server, [{:read_preference, [mode: :secondary, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)
36+
opts = [
37+
read_preference: ReadPreference.primary(%{mode: :primary_preferred})
38+
]
39+
assert {:ok, {^sharded_server, [{:read_preference, [mode: :primaryPreferred, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)
40+
opts = [
41+
read_preference: ReadPreference.primary(%{mode: :secondary_preferred})
42+
]
43+
assert {:ok, {^sharded_server, [{:read_preference, [mode: :secondaryPreferred, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)
44+
opts = [
45+
read_preference: ReadPreference.primary(%{mode: :nearest})
46+
]
47+
assert {:ok, {^sharded_server, [{:read_preference, [mode: :nearest, tag_sets: [], maxStalenessSeconds: 0]}]}} = TopologyDescription.select_servers(sharded(), :read, opts)
48+
end
49+
2250
test "replica set server selection" do
2351
all_hosts = ["localhost:27018", "localhost:27019", "localhost:27020"]
2452
master = "localhost:27018"

test/support/topology_test_data.ex

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,38 @@ defmodule Mongo.TopologyTestData do
3131
}
3232
}
3333

34+
def sharded(), do: %{
35+
set_name: nil,
36+
type: :sharded,
37+
compatibility_error: nil,
38+
compatible: true,
39+
local_threshold_ms: 15,
40+
max_election_id: nil,
41+
max_set_version: nil,
42+
servers: %{
43+
"localhost:27017" => %{
44+
address: "localhost:27017",
45+
arbiters: [],
46+
election_id: nil,
47+
error: nil,
48+
hosts: [],
49+
last_update_time: nil,
50+
last_write_date: nil,
51+
max_wire_version: 8,
52+
me: nil,
53+
min_wire_version: 0,
54+
op_time: nil,
55+
passives: [],
56+
primary: nil,
57+
round_trip_time: 44,
58+
set_name: nil,
59+
set_version: nil,
60+
tag_set: %{},
61+
type: :mongos
62+
}
63+
}
64+
}
65+
3466
def repl_set_with_master, do: %{
3567
compatibility_error: nil,
3668
compatible: true,

0 commit comments

Comments
 (0)