Skip to content

Commit a1afbe0

Browse files
authored
Merge branch 'zookzook:master' into master
2 parents d3a9062 + 2823e26 commit a1afbe0

23 files changed

+413
-169
lines changed

.credo.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@
126126
{Credo.Check.Refactor.MapJoin, []},
127127
{Credo.Check.Refactor.NegatedConditionsInUnless, []},
128128
{Credo.Check.Refactor.NegatedConditionsWithElse, []},
129-
{Credo.Check.Refactor.Nesting, []},
129+
{Credo.Check.Refactor.Nesting, [max_nesting: 4]},
130130
{Credo.Check.Refactor.UnlessWithElse, []},
131131
{Credo.Check.Refactor.WithClauses, []},
132132
{Credo.Check.Refactor.FilterFilter, []},

README.md

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -719,7 +719,7 @@ You need roughly three additional configuration steps:
719719
* Authenticate with an x.509 Certificate
720720

721721
To get the x.509 authentication working you need to prepare the ssl configuration accordingly:
722-
* you need set the ssl option: `verify_peer`
722+
* you need to set the ssl option: `verify_peer`
723723
* you need to specify the `cacertfile` because Erlang BEAM don't provide any CA certificate store by default
724724
* you maybe need to customize the hostname check to allow wildcard certificates
725725
* you need to specify the `username` from the subject entry of the user certificate
@@ -802,15 +802,15 @@ a simple map, supporting the following keys:
802802

803803
* `:mode`, possible values: `:primary`, `:primary_preferred`, `:secondary`, `:secondary_preferred` and `:nearest`
804804
* `:max_staleness_ms`, the maxStaleness value in milliseconds
805-
* `:tag_sets`, the set of tags, for example: `[dc: "west", usage: "production"]`
805+
* `:tags`, the set of tags, for example: `[dc: "west", usage: "production"]`
806806

807807
The driver selects the server using the read preference.
808808

809809
```elixr
810810
prefs = %{
811811
mode: :secondary,
812812
max_staleness_ms: 120_000,
813-
tag_sets: [dc: "west", usage: "production"]
813+
tags: [dc: "west", usage: "production"]
814814
}
815815
816816
Mongo.find_one(top, "dogs", %{name: "Oskar"}, read_preference: prefs)
@@ -907,7 +907,7 @@ result = Mongo.BulkWrite.write(:mongo, bulk, w: 1)
907907
In the following example we import 1.000.000 integers into the MongoDB using the stream api:
908908

909909
We need to create an insert operation for each number. Then we call the `Mongo.UnorderedBulk.stream`
910-
function to import it. This function returns a stream function which accumulate
910+
function to import it. This function returns a stream function that accumulates
911911
all inserts operations until the limit `1000` is reached. In this case the operation group is send to
912912
MongoDB. So using the stream api you can reduce the memory using while
913913
importing big volume of data.
@@ -1026,7 +1026,7 @@ That means, you can just generate a `raise :should_not_happen` exception as well
10261026

10271027
## Command Monitoring
10281028

1029-
You can watch all events that are triggered while the driver send requests and processes responses. You can use the
1029+
You can watch all events that are triggered while the driver sends requests and processes responses. You can use the
10301030
`Mongo.EventHandler` as a starting point. It logs the events from the topic `:commands` (by ignoring the `:isMaster` command)
10311031
to `Logger.info`:
10321032

@@ -1041,7 +1041,7 @@ iex> {:ok, conn} = Mongo.start_link(url: "mongodb://localhost:27017/test")
10411041

10421042
## Testing
10431043

1044-
Latest MongoDB is used while running the tests. Replica set of three nodes is created and runs all test except the socket and ssl test. If you want to
1044+
Latest MongoDB is used while running the tests. Replica set of three nodes is created and runs all tests, except the socket and ssl test. If you want to
10451045
run the test cases against other MongoDB deployments or older versions, you can use the [mtools](https://github.com/rueckstiess/mtools) for deployment and run the test cases locally:
10461046

10471047
```bash

lib/mongo.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1502,7 +1502,7 @@ defmodule Mongo do
15021502
@spec exec_command_session(GenServer.server(), BSON.document(), Keyword.t()) ::
15031503
{:ok, BSON.document() | nil} | {:error, Mongo.Error.t()}
15041504
def exec_command_session(session, cmd, opts) do
1505-
with {:ok, conn, new_cmd} <- Session.bind_session(session, cmd),
1505+
with {:ok, conn, new_cmd, opts} <- Session.bind_session(session, cmd, opts),
15061506
{:ok, _cmd, response} <- DBConnection.execute(conn, %Query{action: {:command, new_cmd}}, [], opts),
15071507
:ok <- Session.update_session(session, response, opts),
15081508
{:ok, {_flags, doc}} <- check_for_error(response, cmd, opts) do

lib/mongo/grid_fs/upload.ex

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,12 @@ defmodule Mongo.GridFs.Upload do
77

88
@doc """
99
Opens a stream that the application can write the contents of the file to.
10-
The driver generates the file id.
10+
The driver generates the file id if not provided.
1111
1212
User data for the 'metadata' field of the files collection document.
1313
"""
14-
@spec open_upload_stream(Mongo.GridFs.Bucket.t(), String.t(), BSON.document() | nil) :: UploadStream.t()
15-
def open_upload_stream(bucket, filename, meta \\ nil) do
16-
UploadStream.new(bucket, filename, meta)
14+
@spec open_upload_stream(Mongo.GridFs.Bucket.t(), String.t(), BSON.document() | nil, UploadStream.file_id() | nil) :: UploadStream.t()
15+
def open_upload_stream(bucket, filename, meta \\ nil, file_id \\ nil) do
16+
UploadStream.new(bucket, filename, meta, file_id)
1717
end
1818
end

lib/mongo/grid_fs/upload_stream.ex

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,9 +20,10 @@ defmodule Mongo.GridFs.UploadStream do
2020
alias Mongo.GridFs.Bucket
2121
alias Mongo.GridFs.UploadStream
2222

23+
@type file_id :: BSON.ObjectId.t() | binary()
2324
@type t :: %__MODULE__{
2425
bucket: Bucket.t(),
25-
id: BSON.ObjectId.t(),
26+
id: file_id(),
2627
filename: String.t(),
2728
metadata: {BSON.document() | nil}
2829
}
@@ -31,9 +32,9 @@ defmodule Mongo.GridFs.UploadStream do
3132
@doc """
3233
Creates a new upload stream to insert a file into the grid-fs.
3334
"""
34-
@spec new(Bucket.t(), String.t(), BSON.document() | nil) :: UploadStream.t()
35-
def new(bucket, filename, metadata \\ nil) do
36-
%UploadStream{bucket: bucket, filename: filename, id: Mongo.object_id(), metadata: metadata}
35+
@spec new(Bucket.t(), String.t(), BSON.document() | nil, file_id() | nil) :: UploadStream.t()
36+
def new(bucket, filename, metadata \\ nil, file_id \\ nil) do
37+
%UploadStream{bucket: bucket, filename: filename, id: file_id || Mongo.object_id(), metadata: metadata}
3738
end
3839

3940
defimpl Collectable, for: UploadStream do

lib/mongo/monitor.ex

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -186,18 +186,8 @@ defmodule Mongo.Monitor do
186186
##
187187
# Get a new server description from the server and send it to the Topology process.
188188
##
189-
defp update_server_description(%{topology_pid: topology_pid, address: address, mode: :streaming_mode} = state) do
190-
case get_server_description(state) do
191-
%{round_trip_time: round_trip_time} ->
192-
## debug info("Updating round_trip_time: #{inspect round_trip_time}")
193-
Topology.update_rrt(topology_pid, address, round_trip_time)
194-
195-
%{state | round_trip_time: round_trip_time}
196-
197-
error ->
198-
warning("Unable to round trip time because of #{inspect(error)}")
199-
state
200-
end
189+
defp update_server_description(%{mode: :streaming_mode} = state) do
190+
state
201191
end
202192

203193
##

lib/mongo/read_preference.ex

Lines changed: 77 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,134 +4,141 @@ defmodule Mongo.ReadPreference do
44
@moduledoc ~S"""
55
Determines which servers are considered suitable for read operations
66
7-
A read preference consists of a mode and optional `tag_sets`, max_staleness_ms, and `hedge`.
7+
A read preference consists of a mode and optional `tags`, max_staleness_ms, and `hedge`.
88
The mode prioritizes between primaries and secondaries to produce either a single suitable server or a list of candidate servers.
9-
If tag_sets and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
9+
If tags and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
1010
If hedge is set, it configures how server hedged reads are used.
1111
1212
The default mode is `:primary`.
13-
The default tag_sets is a list with an empty tag set: [{}].
13+
The default tags is a list with an empty tag set: [{}].
1414
The default max_staleness_ms is unset.
1515
The default hedge is unset.
1616
1717
## mode
1818
1919
* `:primary` Only an available primary is suitable.
20-
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tag_sets and maxStalenessSeconds) are suitable.
20+
* `:secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tags and maxStalenessSeconds) are suitable.
2121
* `:primary_preferred` If a primary is available, only the primary is suitable. Otherwise, all secondaries are candidates,
2222
but only eligible secondaries are suitable.
2323
* `:secondary_preferred` All secondaries are candidates. If there is at least one eligible secondary, only eligible secondaries are suitable.
2424
Otherwise, when there are no eligible secondaries, the primary is suitable.
2525
* `:nearest` The primary and all secondaries are candidates, but only eligible candidates are suitable.
2626
2727
"""
28-
@type t :: %{
29-
mode:
30-
:primary
31-
| :secondary
32-
| :primary_preferred
33-
| :secondary_preferred
34-
| :nearest,
35-
tag_sets: [%{String.t() => String.t()}],
36-
max_staleness_ms: non_neg_integer,
37-
hedge: BSON.document()
38-
}
3928

4029
@primary %{
4130
mode: :primary,
42-
tag_sets: [],
31+
tags: [],
4332
max_staleness_ms: 0
4433
}
4534

46-
def primary(map \\ nil)
35+
@doc """
36+
Merge default values to the read preferences and converts deprecated tag_sets to tags
37+
"""
38+
def merge_defaults(%{tag_sets: tags} = map) do
39+
map =
40+
map
41+
|> Map.delete(:tag_sets)
42+
|> Map.put(:tags, tags)
43+
44+
Map.merge(@primary, map)
45+
end
4746

48-
def primary(map) when is_map(map) do
47+
def merge_defaults(map) when is_map(map) do
4948
Map.merge(@primary, map)
5049
end
5150

52-
def primary(_), do: @primary
51+
def merge_defaults(_other) do
52+
@primary
53+
end
5354

5455
@doc """
5556
Add read preference to the cmd
5657
"""
5758
def add_read_preference(cmd, opts) do
5859
case Keyword.get(opts, :read_preference) do
59-
nil -> cmd
60-
pref -> cmd ++ ["$readPreference": pref]
60+
nil ->
61+
cmd
62+
63+
pref ->
64+
cmd ++ ["$readPreference": pref]
6165
end
6266
end
6367

6468
@doc """
65-
From the specs:
66-
67-
Use of slaveOk
68-
69-
There are two usages of slaveOK:
70-
71-
* A driver query parameter that predated read preference modes and tag set lists.
72-
* A wire protocol flag on OP_QUERY operations
73-
69+
Converts the preference to the mongodb format for replica sets
7470
"""
75-
def slave_ok(%{:mode => :primary}) do
76-
%{:mode => :primary}
71+
def to_replica_set(%{:mode => :primary}) do
72+
%{mode: :primary}
7773
end
7874

79-
def slave_ok(config) do
75+
def to_replica_set(config) do
8076
mode =
8177
case config[:mode] do
82-
:primary_preferred -> :primaryPreferred
83-
:secondary_preferred -> :secondaryPreferred
84-
other -> other
85-
end
78+
:primary_preferred ->
79+
:primaryPreferred
8680

87-
filter_nils(mode: mode, tag_sets: config[:tag_sets])
88-
end
81+
:secondary_preferred ->
82+
:secondaryPreferred
8983

90-
##
91-
# Therefore, when sending queries to a mongos, the following rules apply:
92-
#
93-
# For mode 'primary', drivers MUST NOT set the slaveOK wire protocol flag and MUST NOT use $readPreference
94-
def mongos(%{mode: :primary}) do
95-
nil
96-
end
84+
other ->
85+
other
86+
end
9787

98-
# For mode 'secondary', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
99-
def mongos(%{mode: :secondary} = config) do
100-
transform(config)
101-
end
88+
case config[:tags] do
89+
[] ->
90+
%{mode: mode}
10291

103-
# For mode 'primaryPreferred', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
104-
def mongos(%{mode: :primary_preferred} = config) do
105-
transform(config)
106-
end
92+
nil ->
93+
%{mode: mode}
10794

108-
# For mode 'secondaryPreferred', drivers MUST set the slaveOK wire protocol flag. If the read preference contains a
109-
# non-empty tag_sets parameter, maxStalenessSeconds is a positive integer, or the hedge parameter is non-empty,
110-
# drivers MUST use $readPreference; otherwise, drivers MUST NOT use $readPreference
111-
def mongos(%{mode: :secondary_preferred} = config) do
112-
transform(config)
95+
tags ->
96+
%{mode: mode, tags: [tags]}
97+
end
11398
end
11499

115-
# For mode 'nearest', drivers MUST set the slaveOK wire protocol flag and MUST also use $readPreference
116-
def mongos(%{mode: :nearest} = config) do
117-
transform(config)
100+
@doc """
101+
Converts the preference to the mongodb format for mongos
102+
"""
103+
def to_mongos(%{mode: :primary}) do
104+
nil
118105
end
119106

120-
defp transform(config) do
107+
# for the others we should use the read preferences
108+
def to_mongos(config) do
121109
mode =
122110
case config[:mode] do
123-
:primary_preferred -> :primaryPreferred
124-
:secondary_preferred -> :secondaryPreferred
125-
other -> other
111+
:primary_preferred ->
112+
:primaryPreferred
113+
114+
:secondary_preferred ->
115+
:secondaryPreferred
116+
117+
other ->
118+
other
126119
end
127120

128121
max_staleness_seconds =
129122
case config[:max_staleness_ms] do
130-
i when is_integer(i) -> div(i, 1000)
131-
nil -> nil
123+
i when is_integer(i) ->
124+
div(i, 1000)
125+
126+
nil ->
127+
nil
128+
end
129+
130+
read_preference =
131+
case config[:tags] do
132+
[] ->
133+
%{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}
134+
135+
nil ->
136+
%{mode: mode, maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}
137+
138+
tags ->
139+
%{mode: mode, tags: [tags], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]}
132140
end
133141

134-
[mode: mode, tag_sets: config[:tag_sets], maxStalenessSeconds: max_staleness_seconds, hedge: config[:hedge]]
135-
|> filter_nils()
142+
filter_nils(read_preference)
136143
end
137144
end

0 commit comments

Comments
 (0)