Skip to content

Commit f615987

Browse files
committed
read preferences are only sent, when the server is a mongos.
1 parent 31d6ea1 commit f615987

File tree

6 files changed

+119
-68
lines changed

6 files changed

+119
-68
lines changed

lib/mongo.ex

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -392,13 +392,11 @@ defmodule Mongo do
392392
"""
393393
def issue_command(topology_pid, cmd, :read, opts) do
394394

395-
new_cmd = ReadPreference.add_read_preference(cmd, opts)
396-
397395
## check, if retryable reads are enabled
398396
opts = Mongo.retryable_reads(opts)
399397

400398
with {:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
401-
result <- exec_command_session(session, new_cmd, opts),
399+
result <- exec_command_session(session, cmd, opts),
402400
:ok <- Session.end_implict_session(topology_pid, session) do
403401
case result do
404402
{:error, error} ->

lib/mongo/read_preference.ex

Lines changed: 34 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -37,17 +37,17 @@ defmodule Mongo.ReadPreference do
3737
hedge: BSON.document
3838
}
3939

40-
@default %{
40+
@primary %{
4141
mode: :primary,
4242
tag_sets: [],
4343
max_staleness_ms: 0
4444
}
4545

46-
def defaults(map \\ nil)
47-
def defaults(map) when is_map(map) do
48-
Map.merge(@default, map)
46+
def primary(map \\ nil)
47+
def primary(map) when is_map(map) do
48+
Map.merge(@primary, map)
4949
end
50-
def defaults(_), do: @default
50+
def primary(_), do: @primary
5151

5252
@doc """
5353
Add read preference to the cmd
@@ -56,7 +56,7 @@ defmodule Mongo.ReadPreference do
5656

5757
read_preference = opts
5858
|> Keyword.get(:read_preference)
59-
|> Mongo.ReadPreference.defaults()
59+
|> Mongo.ReadPreference.primary()
6060
|> transform()
6161

6262
cmd ++ ["$readPreference": read_preference]
@@ -86,4 +86,32 @@ defmodule Mongo.ReadPreference do
8686

8787
end
8888

89+
##
90+
# Therefore, when sending queries to a mongos, the following rules apply:
91+
#
92+
# For mode 'primary', drivers MUST NOT set the slaveOK wire protocol flag and MUST NOT use $readPreference
93+
def mongos(%{mode: :primay}) do
94+
nil
95+
end
96+
# For mode 'secondary', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
97+
def mongos(%{mode: :secondary} = config) do
98+
transform(config)
99+
end
100+
# For mode 'primaryPreferred', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
101+
def mongos(%{mode: :primary_preferred} = config) do
102+
transform(config)
103+
end
104+
# For mode 'secondaryPreferred', drivers MUST set the slaveOK wire protocol flag. If the read preference contains a
105+
# non-empty tag_sets parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty,
106+
# drivers MUST use $readPreference; otherwise, drivers MUST NOT use $readPreference
107+
def mongos(%{mode: :secondary_preferred} = config) do
108+
transform(config)
109+
end
110+
# For mode 'nearest', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
111+
def mongos(%{mode: :nearest} = config) do
112+
transform(config)
113+
end
114+
115+
116+
89117
end

lib/mongo/topology.ex

Lines changed: 14 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -253,21 +253,22 @@ defmodule Mongo.Topology do
253253
{:reply, Map.fetch(state.connection_pools, address), state}
254254
end
255255

256+
##
256257
# checkout a new session
258+
#
257259
def handle_call({:checkout_session, cmd_type, type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
258260

259261
case TopologyDescription.select_servers(topology, cmd_type, opts) do
260262
:empty ->
261263
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :checkout_session, cmd_type: cmd_type, topology: topology, opts: opts})
262264
{:noreply, %{state | waiting_pids: [from | waiting]}} ## no servers available, wait for connection
263265

264-
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
265-
with address <- Enum.take_random(servers, 1),
266-
{:ok, connection} <- get_connection(address, state),
267-
wire_version <- wire_version(address, topology),
268-
{server_session, new_state} <- checkout_server_session(state),
269-
{:ok, session} <- Session.start_link(self(), connection, server_session, type, wire_version, opts) do
270-
266+
## found
267+
{:ok, {address, opts}} ->
268+
with {:ok, connection} <- get_connection(address, state),
269+
wire_version <- wire_version(address, topology),
270+
{server_session, new_state} <- checkout_server_session(state),
271+
{:ok, session} <- Session.start_link(self(), connection, server_session, type, wire_version, opts) do
271272
{:reply, {:ok, session}, new_state}
272273
end
273274

@@ -283,10 +284,8 @@ defmodule Mongo.Topology do
283284
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :select_server, cmd_type: cmd_type, topology: topology, opts: opts})
284285
{:noreply, %{state | waiting_pids: [from | waiting]}} ## no servers available, wait for connection
285286

286-
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
287-
with {:ok, connection} <- servers
288-
|> Enum.take_random(1)
289-
|> get_connection(state) do
287+
{:ok, {address, _opts}} ->
288+
with {:ok, connection} <- get_connection(address, state) do
290289
{:reply, {:ok, connection}, state}
291290
end
292291
error ->
@@ -299,10 +298,8 @@ defmodule Mongo.Topology do
299298
:empty ->
300299
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :limits, cmd_type: :write, topology: topology})
301300
{:reply, nil, state}
302-
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
303-
with {:ok, limits} <- servers
304-
|> Enum.take_random(1)
305-
|> get_limits(topology) do
301+
{:ok, {address, _opts}} ->
302+
with {:ok, limits} <- get_limits(address, topology) do
306303
{:reply, {:ok, limits}, state}
307304
end
308305
error ->
@@ -316,10 +313,8 @@ defmodule Mongo.Topology do
316313
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :wire_version, cmd_type: :write, topology: topology})
317314
{:reply, nil, state}
318315

319-
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
320-
with address <- Enum.take_random(servers, 1) do
321-
{:reply, {:ok, wire_version(address, topology)}, state}
322-
end
316+
{:ok, {address, _opts}} ->
317+
{:reply, {:ok, wire_version(address, topology)}, state}
323318
error ->
324319
{:reply, error, state} ## in case of an error, just return the error
325320
end

lib/mongo/topology_description.ex

Lines changed: 36 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -69,28 +69,49 @@ defmodule Mongo.TopologyDescription do
6969
def select_servers(%{:compatible => false}, _type, _opts) do
7070
{:error, :invalid_wire_version}
7171
end
72-
def select_servers(topology, type, opts) do
72+
def select_servers(topology, :write, opts) do
73+
servers = case topology.type do
74+
:single -> topology.servers
75+
:sharded -> mongos_servers(topology)
76+
:replica_set_with_primary -> primary_servers(topology)
77+
_ -> []
78+
end
79+
80+
addr = servers
81+
|> Enum.map(fn {server, _} -> server end)
82+
|> Enum.take_random(1)
83+
84+
case addr do
85+
[] -> :empty
86+
_ -> {:ok, {addr, opts}}
87+
end
88+
end
89+
def select_servers(topology, :read, opts) do
7390

7491
read_preference = opts
7592
|> Keyword.get(:read_preference)
76-
|> ReadPreference.defaults()
93+
|> ReadPreference.primary()
7794

78-
servers = case topology.type do
79-
:unknown -> []
80-
:single -> topology.servers
81-
:sharded -> mongos_servers(topology)
82-
_ ->
83-
case {type, topology.type == :replica_set_with_primary} do
84-
{:read, _} -> select_replica_set_server(topology, read_preference.mode, read_preference)
85-
{:write, true} -> select_replica_set_server(topology, :primary, ReadPreference.defaults)
86-
_ -> []
87-
end
95+
{servers, read_prefs} = case topology.type do
96+
:unknown -> {[], nil}
97+
:single -> {topology.servers, nil}
98+
:sharded -> {mongos_servers(topology), ReadPreference.mongos(read_preference)}
99+
_ -> {select_replica_set_server(topology, read_preference.mode, read_preference), nil}
88100
end
89101

102+
opts = case read_prefs do
103+
nil -> Keyword.delete(opts, :read_preference)
104+
prefs -> Keyword.put(opts, :read_preference, prefs)
105+
end
106+
107+
addr = servers
108+
|> Enum.map(fn {server, _} -> server end)
109+
|> Enum.take_random(1)
110+
90111
# check now three possible cases
91-
case Enum.map(servers, fn {server, _} -> server end) do
92-
[] -> :empty
93-
servers -> {:ok, servers}
112+
case addr do
113+
[] -> :empty
114+
_ -> {:ok, {addr, opts}}
94115
end
95116
end
96117

test/mongo/topology_description_test.exs

Lines changed: 33 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,66 +7,75 @@ defmodule Mongo.TopologyDescriptionTest do
77
single_server = ["localhost:27017"]
88

99
opts = [
10-
read_preference: ReadPreference.defaults(%{mode: :secondary})
10+
read_preference: ReadPreference.primary(%{mode: :secondary})
1111
]
12-
assert {:ok, single_server} == TopologyDescription.select_servers(single(), :read, opts)
12+
assert {:ok, {^single_server, _}} = TopologyDescription.select_servers(single(), :read, opts)
1313

14-
assert {:ok, single_server} ==TopologyDescription.select_servers(single(), :write)
14+
assert {:ok, {^single_server, _}} = TopologyDescription.select_servers(single(), :write)
1515

1616
opts = [
17-
read_preference: ReadPreference.defaults(%{mode: :nearest})
17+
read_preference: ReadPreference.primary(%{mode: :nearest})
1818
]
19-
assert {:ok, single_server} ==TopologyDescription.select_servers(single(), :read, opts)
19+
assert {:ok, {^single_server, _}} = TopologyDescription.select_servers(single(), :read, opts)
2020
end
2121

2222
test "replica set server selection" do
2323
all_hosts = ["localhost:27018", "localhost:27019", "localhost:27020"]
2424
master = "localhost:27018"
25+
seconardaries = List.delete(all_hosts, master)
2526

2627
opts = [
27-
read_preference: ReadPreference.defaults(%{mode: :secondary})
28+
read_preference: ReadPreference.primary(%{mode: :secondary})
2829
]
29-
assert {:ok, List.delete(all_hosts, master)} == TopologyDescription.select_servers(repl_set_with_master(), :read, opts)
30+
{:ok, {[server], _}} = TopologyDescription.select_servers(repl_set_with_master(), :read, opts)
31+
32+
assert Enum.any?(seconardaries, fn sec -> sec == server end)
3033

3134
opts = [
32-
read_preference: ReadPreference.defaults(%{mode: :primary})
35+
read_preference: ReadPreference.primary(%{mode: :primary})
3336
]
34-
assert {:ok, [master]} == TopologyDescription.select_servers(repl_set_with_master(), :read, opts)
37+
assert {:ok, {[master], _}} = TopologyDescription.select_servers(repl_set_with_master(), :read, opts)
3538

3639
opts = [
37-
read_preference: ReadPreference.defaults(%{mode: :primary_preferred})
40+
read_preference: ReadPreference.primary(%{mode: :primary_preferred})
3841
]
39-
assert {:ok, [master]} == TopologyDescription.select_servers(repl_set_with_master(), :read, opts)
42+
assert {:ok, {[master], _}} = TopologyDescription.select_servers(repl_set_with_master(), :read, opts)
4043

4144
opts = [
42-
read_preference: ReadPreference.defaults(%{mode: :primary_preferred})
45+
read_preference: ReadPreference.primary(%{mode: :primary_preferred})
4346
]
44-
assert {:ok, List.delete(all_hosts, master)} == TopologyDescription.select_servers(repl_set_no_master(), :read, opts)
47+
{:ok, {[server], _}} = TopologyDescription.select_servers(repl_set_no_master(), :read, opts)
48+
assert Enum.any?(seconardaries, fn sec -> sec == server end)
4549

4650

4751
opts = [
48-
read_preference: ReadPreference.defaults(%{mode: :nearest})
52+
read_preference: ReadPreference.primary(%{mode: :nearest})
4953
]
50-
assert {:ok, all_hosts} == TopologyDescription.select_servers(repl_set_with_master(), :read, opts)
54+
{:ok, {[server], _}} = TopologyDescription.select_servers(repl_set_with_master(), :read, opts)
55+
assert Enum.any?(all_hosts, fn sec -> sec == server end)
5156

5257
opts = [
53-
read_preference: ReadPreference.defaults(%{mode: :secondary})
58+
read_preference: ReadPreference.primary(%{mode: :secondary})
5459
]
55-
assert {:ok, List.delete(all_hosts, master)} == TopologyDescription.select_servers(repl_set_no_master(), :read, opts)
60+
{:ok, {[server], _}} = TopologyDescription.select_servers(repl_set_no_master(), :read, opts)
61+
assert Enum.any?(seconardaries, fn sec -> sec == server end)
5662

5763
opts = [
58-
read_preference: ReadPreference.defaults(%{mode: :secondary_preferred})
64+
read_preference: ReadPreference.primary(%{mode: :secondary_preferred})
5965
]
60-
assert {:ok, List.delete(all_hosts, master)} == TopologyDescription.select_servers(repl_set_with_master(), :read, opts)
66+
{:ok, {[server], _}} = TopologyDescription.select_servers(repl_set_with_master(), :read, opts)
67+
assert Enum.any?(seconardaries, fn sec -> sec == server end)
6168

62-
assert {:ok, [master]} == TopologyDescription.select_servers(repl_set_only_master(), :read, opts)
69+
assert {:ok, {[^master], _}} = TopologyDescription.select_servers(repl_set_only_master(), :read, opts)
6370

64-
assert {:ok, List.delete(all_hosts, master)} == TopologyDescription.select_servers(repl_set_no_master(), :read, opts)
71+
{:ok, {[server], _}} = TopologyDescription.select_servers(repl_set_no_master(), :read, opts)
72+
assert Enum.any?(seconardaries, fn sec -> sec == server end)
6573

6674
opts = [
67-
read_preference: ReadPreference.defaults(%{mode: :nearest})
75+
read_preference: ReadPreference.primary(%{mode: :nearest})
6876
]
69-
assert {:ok, all_hosts} == TopologyDescription.select_servers(repl_set_no_master(), :read, opts)
77+
{:ok, {[server], _}} = TopologyDescription.select_servers(repl_set_no_master(), :read, opts)
78+
assert Enum.any?(all_hosts, fn sec -> sec == server end)
7079

7180
end
7281

@@ -76,6 +85,6 @@ defmodule Mongo.TopologyDescriptionTest do
7685
opts = [
7786
read_preference: %{mode: :secondary}
7887
]
79-
assert {:ok, single_server} == TopologyDescription.select_servers(single(), :read, opts)
88+
assert {:ok, {^single_server, _}} = TopologyDescription.select_servers(single(), :read, opts)
8089
end
8190
end

test/mongo/topology_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ defmodule Mongo.TopologyTest do
1111
for mode <- @modes do
1212
assert {:ok, %Mongo.InsertOneResult{inserted_id: new_id}} = Mongo.insert_one(mongo_pid, "test", %{topology_test: 1}, w: 3)
1313

14-
rp = Mongo.ReadPreference.defaults(%{mode: mode})
14+
rp = Mongo.ReadPreference.primary(%{mode: mode})
1515
assert [%{"_id" => ^new_id, "topology_test" => 1}] =
1616
mongo_pid
1717
|> Mongo.find("test", %{_id: new_id}, read_preference: rp, slave_ok: mode in [:secondary, :secondary_preferred])

0 commit comments

Comments
 (0)