Skip to content

Commit d6ad85c

Browse files
committed
fixes #54
1 parent f372865 commit d6ad85c

File tree

10 files changed

+344
-174
lines changed

10 files changed

+344
-174
lines changed

README.md

Lines changed: 27 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -52,13 +52,9 @@ BSON symbols can only be decoded.
5252

5353
### Installation:
5454

55-
Add `mongodb_driver` to your mix.exs `deps` and `:applications`.
55+
Add `mongodb_driver` to your mix.exs `deps`.
5656

5757
```elixir
58-
def application do
59-
[applications: [:mongodb_driver]]
60-
end
61-
6258
defp deps do
6359
[{:mongodb_driver, "~> 0.6"}]
6460
end
@@ -171,6 +167,27 @@ you'll want to add this cipher to your `ssl_opts`:
171167
]
172168
)
173169
```
170+
171+
### Find
172+
173+
Using `$and`
174+
175+
```elixir
176+
Mongo.find(:mongo, "users", %{"$and" => [%{email: "[email protected]"}, %{first_name: "first_name"}]})
177+
```
178+
179+
Using `$or`
180+
181+
```elixir
182+
Mongo.find(:mongo, "users", %{"$or" => [%{email: "[email protected]"}, %{first_name: "first_name"}]})
183+
```
184+
185+
Using `$in`
186+
187+
```elixir
188+
Mongo.find(:mongo, "users", %{email: %{"$in" => ["[email protected]", "[email protected]"]}})
189+
```
190+
174191
### Change streams
175192

176193
Change streams exist in replica set and cluster systems and tell you about changes to collections.
@@ -351,25 +368,10 @@ For more information see:
351368

352369
and have a look at the test units as well.
353370

354-
### Examples
355371

356-
Using `$and`
357-
358-
```elixir
359-
Mongo.find(:mongo, "users", %{"$and" => [%{email: "[email protected]"}, %{first_name: "first_name"}]})
360-
```
372+
### Command Monitoring
361373

362-
Using `$or`
363-
364-
```elixir
365-
Mongo.find(:mongo, "users", %{"$or" => [%{email: "[email protected]"}, %{first_name: "first_name"}]})
366-
```
367-
368-
Using `$in`
369-
370-
```elixir
371-
Mongo.find(:mongo, "users", %{email: %{"$in" => ["[email protected]", "[email protected]"]}})
372-
```
374+
You can watch all events that are triggered while the driver send requests and processes responses.
373375

374376
## Testing and Travis CI
375377

@@ -410,32 +412,17 @@ $ mongod --sslMode allowSSL --sslPEMKeyFile /path/to/mongodb.pem
410412

411413
Special thanks to [JetBrains](https://www.jetbrains.com/?from=elixir-mongodb-driver) for providing a free JetBrains Open Source license for their complete toolbox.
412414

413-
This is an alternative development from the [original](https://github.com/ankhers/mongodb), which was the starting point
414-
and already contained very nice code.
415-
416415
The [Documentation](https://hexdocs.pm/mongodb_driver/readme.html) is online, but currently not up to date.
417416
This will be done as soon as possible. In the meantime, look in the source code. Especially
418417
for the individual options.
419418

420-
## Motivation
421-
422-
* [x] I have made a number of changes to understand how the driver works. For example, I reduced cursor modules to just one cursor and
423-
replaced some op code calls with command calls.
424-
* [x] Simplify code: remove raw_find (raw_find called from cursors, raw_find called with "$cmd"), so raw_find is more calling a command than a find query.
425-
* [x] Better support for new MongoDB version, for example the ability to use views
426-
* [x] Upgraded to ([DBConnection 2.x](https://github.com/elixir-ecto/db_connection))
427-
* [x] Removed depreacated op codes ([See](https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#request-opcodes))
428-
* [x] Added `op_msg` support ([See](https://docs.mongodb.com/manual/reference/mongodb-wire-protocol/#op-msg))
429-
* [x] Added bulk writes ([See](https://github.com/mongodb/specifications/blob/master/source/crud/crud.rst#write))
430-
* [x] Add support for driver sessions ([See](https://github.com/mongodb/specifications/blob/master/source/sessions/driver-sessions.rst))
431-
* [x] Add support for driver transactions ([See](https://github.com/mongodb/specifications/blob/master/source/transactions/transactions.rst))
432-
* [ ] Add support for `op_compressed` ([See](https://github.com/mongodb/specifications/blob/master/source/compression/OP_COMPRESSED.rst))
433-
419+
This driver is based on [original](https://github.com/ankhers/mongodb).
420+
434421
## License
435422

436423
Copyright 2015 Eric Meadows-Jönsson and Justin Wood
437424

438-
Copyright 2019 Michael Maier
425+
Copyright 2019 - 2020 Michael Maier
439426

440427
Licensed under the Apache License, Version 2.0 (the "License");
441428
you may not use this file except in compliance with the License.

lib/mongo/event_handler.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ defmodule Mongo.EventHandler do
33

44
require Logger
55

6-
@all [:commands, :is_master, :topology]
6+
@all [:commands, :topology]
77

88
def start(opts \\ [:commands]) do
99
Logger.info("Starting EventHandler")

lib/mongo/events.ex

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,15 @@ defmodule Mongo.Events do
8383
]
8484
end
8585

86+
87+
##
88+
#
89+
defmodule ServerSelectionEmptyEvent do
90+
@moduledoc false
91+
defstruct [:action, :cmd_type, :topology, :opts]
92+
end
93+
94+
8695
# Published when server description changes, but does NOT include changes to
8796
# the RTT
8897
defmodule ServerDescriptionChangedEvent do

lib/mongo/monitor.ex

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -164,13 +164,16 @@ defmodule Mongo.Monitor do
164164

165165
defp call_is_master_command(conn_pid, opts) do
166166

167-
start_time = System.monotonic_time
168-
result = Mongo.exec_command(conn_pid, [isMaster: 1], opts)
169-
finish_time = System.monotonic_time
170-
rtt = System.convert_time_unit(finish_time - start_time, :native, :millisecond)
171-
finish_time = System.convert_time_unit(finish_time, :native, :millisecond)
167+
{rtt, result} = :timer.tc(fn -> Mongo.exec_command(conn_pid, [isMaster: 1], opts) end)
172168

173-
{result, finish_time, rtt}
169+
finish_time = DateTime.utc_now()
170+
#start_time = System.monotonic_time
171+
#result = Mongo.exec_command(conn_pid, [isMaster: 1], opts)
172+
#finish_time = System.monotonic_time
173+
#rtt = System.convert_time_unit(finish_time - start_time, :native, :millisecond)
174+
#finish_time = System.convert_time_unit(finish_time, :native, :millisecond)
175+
176+
{result, finish_time, div(rtt, 1000)}
174177
end
175178

176179
defp notify_error(rtt, error, conn_pid) do

lib/mongo/read_preference.ex

Lines changed: 60 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,45 @@
11
defmodule Mongo.ReadPreference do
2+
3+
import Keywords
4+
25
@moduledoc ~S"""
36
Determines which servers are considered suitable for read operations
7+
8+
A read preference consists of a mode and optional `tag_sets`, max_staleness_ms, and `hedge`.
9+
The mode prioritizes between primaries and secondaries to produce either a single suitable server or a list of candidate servers.
10+
If tag_sets and maxStalenessSeconds are set, they determine which candidate servers are eligible for selection.
11+
If hedge is set, it configures how server hedged reads are used.
12+
13+
The default mode is 'primary'.
14+
The default tag_sets is a list with an empty tag set: [{}].
15+
The default max_staleness_ms is unset.
16+
The default hedge is unset.
17+
18+
## mode
19+
20+
* `primary` Only an available primary is suitable.
21+
* `secondary` All secondaries (and only secondaries) are candidates, but only eligible candidates (i.e. after applying tag_sets and maxStalenessSeconds) are suitable.
22+
* `primaryPreferred` If a primary is available, only the primary is suitable. Otherwise, all secondaries are candidates,
23+
but only eligible secondaries are suitable.
24+
* `secondaryPreferred` All secondaries are candidates. If there is at least one eligible secondary, only eligible secondaries are suitable.
25+
Otherwise, when there are no eligible secondaries, the primary is suitable.
26+
* `nearest` The primary and all secondaries are candidates, but only eligible candidates are suitable.
27+
428
"""
529
@type t :: %{
6-
mode: :primary | :secondary | :primary_preferred | :secondary_preferred |
30+
mode: :primary |
31+
:secondary |
32+
:primary_preferred |
33+
:secondary_preferred |
734
:nearest,
835
tag_sets: [%{String.t => String.t}],
9-
max_staleness_ms: non_neg_integer
36+
max_staleness_ms: non_neg_integer,
37+
hedge: BSON.document
1038
}
1139

1240
@default %{
1341
mode: :primary,
14-
tag_sets: [%{}],
42+
tag_sets: [],
1543
max_staleness_ms: 0
1644
}
1745

@@ -29,13 +57,38 @@ defmodule Mongo.ReadPreference do
2957
read_preference = opts
3058
|> Keyword.get(:read_preference)
3159
|> Mongo.ReadPreference.defaults()
32-
|> Map.update(:mode, :primary, fn mode -> map_mode(mode) end)
60+
|> transform()
3361

3462
cmd ++ ["$readPreference": read_preference]
3563
end
3664

37-
defp map_mode(:primary_preferred), do: :primaryPreferred
38-
defp map_mode(:secondary_preferred), do: :secondaryPreferred
39-
defp map_mode(mode), do: mode
65+
defp transform(%{:mode => :primary}) do
66+
%{:mode => :primary}
67+
end
68+
defp transform(config) do
4069

70+
mode = case config[:mode] do
71+
:primary_preferred -> :primaryPreferred
72+
:secondary_preferred -> :secondaryPreferred
73+
other -> other
74+
end
75+
76+
max_staleness_seconds = case config[:max_staleness_ms] do
77+
i when is_integer(i) -> div(i, 1000)
78+
nil -> nil
79+
end
80+
81+
[mode: mode,
82+
tag_sets: config[:tag_sets],
83+
maxStalenessSeconds: max_staleness_seconds,
84+
hedge: config[:hedge]]
85+
|> filter_nils()
86+
87+
end
88+
89+
defp is_max_staleness_valid?() do
90+
#max_staleness_ms >= heartbeatFrequencyMS + idleWritePeriodMS
91+
#max_staleness_ms >= smallestMaxStalenessSeconds
92+
93+
end
4194
end

lib/mongo/topology.ex

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,8 @@ defmodule Mongo.Topology do
55

66
use GenServer
77
alias Mongo.Events.{ServerDescriptionChangedEvent, ServerOpeningEvent, ServerClosedEvent,
8-
TopologyDescriptionChangedEvent, TopologyOpeningEvent, TopologyClosedEvent}
8+
TopologyDescriptionChangedEvent, TopologyOpeningEvent, TopologyClosedEvent,
9+
ServerSelectionEmptyEvent}
910
alias Mongo.TopologyDescription
1011
alias Mongo.ServerDescription
1112
alias Mongo.Monitor
@@ -99,7 +100,8 @@ defmodule Mongo.Topology do
99100
type: type,
100101
set_name: set_name,
101102
servers: servers,
102-
local_threshold_ms: local_threshold_ms
103+
local_threshold_ms: local_threshold_ms,
104+
heartbeat_frequency_ms: @heartbeat_frequency_ms
103105
}),
104106
seeds: seeds,
105107
opts: opts,
@@ -252,23 +254,20 @@ defmodule Mongo.Topology do
252254
end
253255

254256
# checkout a new session
255-
def handle_call({:checkout_session, cmd_type, type, opts}, from, %{:topology => topology, :waiting_pids => waiting, connection_pools: pools} = state) do
257+
def handle_call({:checkout_session, cmd_type, type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
256258

257259
case TopologyDescription.select_servers(topology, cmd_type, opts) do
258260
:empty ->
259-
Logger.debug("select_server: empty")
261+
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :checkout_session, cmd_type: cmd_type, topology: topology, opts: opts})
260262
{:noreply, %{state | waiting_pids: [from | waiting]}} ## no servers available, wait for connection
261263

262264
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
263-
Logger.debug("select_server: found #{inspect servers}, pools: #{inspect pools}")
264-
265265
with address <- Enum.take_random(servers, 1),
266266
{:ok, connection} <- get_connection(address, state),
267267
wire_version <- wire_version(address, topology),
268268
{server_session, new_state} <- checkout_server_session(state),
269269
{:ok, session} <- Session.start_link(self(), connection, server_session, type, wire_version, opts) do
270270

271-
Logger.debug("select_server: connection is #{inspect connection}, server_session is #{inspect server_session}")
272271
{:reply, {:ok, session}, new_state}
273272
end
274273

@@ -278,63 +277,50 @@ defmodule Mongo.Topology do
278277
end
279278
end
280279

281-
def handle_call({:select_server, type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
282-
case TopologyDescription.select_servers(topology, type, opts) do
280+
def handle_call({:select_server, cmd_type, opts}, from, %{:topology => topology, :waiting_pids => waiting} = state) do
281+
case TopologyDescription.select_servers(topology, cmd_type, opts) do
283282
:empty ->
284-
Logger.debug("select_server: empty")
283+
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :select_server, cmd_type: cmd_type, topology: topology, opts: opts})
285284
{:noreply, %{state | waiting_pids: [from | waiting]}} ## no servers available, wait for connection
286285

287286
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
288-
Logger.debug("select_server: found #{inspect servers}")
289-
290287
with {:ok, connection} <- servers
291288
|> Enum.take_random(1)
292289
|> get_connection(state) do
293-
Logger.debug("select_server: connection is #{inspect connection}")
294-
295290
{:reply, {:ok, connection}, state}
296291
end
297292
error ->
298-
Logger.debug("select_servers: #{inspect error}")
299293
{:reply, error, state} ## in case of an error, just return the error
300294
end
301295
end
302296

303297
def handle_call(:limits, _from, %{:topology => topology} = state) do
304298
case TopologyDescription.select_servers(topology, :write, []) do
305299
:empty ->
306-
Logger.debug("select_server: empty")
300+
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :limits, cmd_type: :write, topology: topology})
307301
{:reply, nil, state}
308-
309302
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
310-
Logger.debug("select_server: found #{inspect servers}")
311-
312303
with {:ok, limits} <- servers
313304
|> Enum.take_random(1)
314305
|> get_limits(topology) do
315-
Logger.debug("select_server: connection is #{inspect limits}")
316-
317306
{:reply, {:ok, limits}, state}
318307
end
319308
error ->
320-
Logger.debug("select_servers: #{inspect error}")
321309
{:reply, error, state} ## in case of an error, just return the error
322310
end
323311
end
324312

325313
def handle_call(:wire_version, _from, %{:topology => topology} = state) do
326314
case TopologyDescription.select_servers(topology, :write, []) do
327315
:empty ->
328-
Logger.debug("select_server: empty")
316+
Mongo.Events.notify(%ServerSelectionEmptyEvent{action: :wire_version, cmd_type: :write, topology: topology})
329317
{:reply, nil, state}
330318

331319
{:ok, servers} -> ## found, select randomly a server and return its connection_pool
332-
Logger.debug("select_server: found #{inspect servers}")
333320
with address <- Enum.take_random(servers, 1) do
334321
{:reply, {:ok, wire_version(address, topology)}, state}
335322
end
336323
error ->
337-
Logger.debug("select_servers: #{inspect error}")
338324
{:reply, error, state} ## in case of an error, just return the error
339325
end
340326
end

0 commit comments

Comments
 (0)