Skip to content

Commit b576088

Browse files
committed
first implementation of the event stream
1 parent 3c8ae30 commit b576088

File tree

6 files changed

+402
-112
lines changed

6 files changed

+402
-112
lines changed

CHANGELOG.md

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,13 @@
1-
## v0.5.0-dev
1+
## v0.5.1
22

33
* Enhancements
44
* upgraded to DBConnection 2.0.6
55
* refactored code, simplified code and api
66
* replaced deprecated op_code by database commands
77
* update_one, update_many, replace_one, replace_many return upserted ids
88
* support for all find options
9+
* Support for MongoDB 3.6 collection [Change Streams](https://docs.mongodb.com/manual/changeStreams/)
10+
* Support for SCRAM-SHA-256 (MongoDB 4.x)
911

1012
## v0.4.8-dev
1113

lib/mongo.ex

Lines changed: 113 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@ defmodule Mongo do
2020
a 1-arity fun, `{module, function, args}` with `DBConnection.LogEntry.t`
2121
prepended to `args` or `nil`. See `DBConnection.LogEntry` (default: `nil`)
2222
* `:database` - the database to run the operation on
23-
* `:connect_timeout_ms` - maximum timeout for connect (default: `5_000`)
23+
* `:connect_timeout` - maximum timeout for connect (default: `5_000`)
2424
2525
## Read options
2626
@@ -54,7 +54,7 @@ defmodule Mongo do
5454
alias Mongo.Topology
5555
alias Mongo.UrlParser
5656

57-
@timeout 5000
57+
@timeout 15000 # 5000
5858

5959
@dialyzer [no_match: [count_documents!: 4]]
6060

@@ -64,8 +64,6 @@ defmodule Mongo do
6464
@type result(t) :: :ok | {:ok, t} | {:error, Mongo.Error.t}
6565
@type result!(t) :: nil | t | no_return
6666

67-
@cmd_collection "$cmd"
68-
6967
defmacrop bangify(result) do
7068
quote do
7169
case unquote(result) do
@@ -101,7 +99,7 @@ defmodule Mongo do
10199
* `:idle` - The idle strategy, `:passive` to avoid checkin when idle and
102100
`:active` to checking when idle (default: `:passive`)
103101
* `:idle_timeout` - The idle timeout to ping the database (default: `1_000`)
104-
* `:connect_timeout_ms` - The maximum timeout for the initial connection
102+
* `:connect_timeout` - The maximum timeout for the initial connection
105103
(default: `5_000`)
106104
* `:backoff_min` - The minimum backoff interval (default: `1_000`)
107105
* `:backoff_max` - The maximum backoff interval (default: `30_000`)
@@ -141,41 +139,86 @@ defmodule Mongo do
141139
Mongo.IdServer.new
142140
end
143141

142+
@doc"""
143+
144+
Creates a change stream cursor on collections.
145+
146+
`on_resume_token` is function that takes the new resume token, if it changed.
147+
148+
## Options
149+
150+
* `:full_document` -
151+
* `:max_time` - Specifies a time limit in milliseconds. This option is used on `getMore` commands
152+
* `:batch_size` - Specifies the number of maximum number of documents to
153+
return (default: 1)
154+
* `:resume_after` - Specifies the logical starting point for the new change stream.
155+
* `:start_at_operation_time` - The change stream will only provide changes that occurred at or after the specified timestamp (since 4.0)
156+
* `:start_after` - Similar to `resumeAfter`, this option takes a resume token and starts a new change stream
157+
returning the first notification after the token. This will allow users to watch collections that have been dropped and recreated
158+
or newly renamed collections without missing any notifications. (since 4.0.7)
159+
"""
160+
@spec watch_collection(GenServer.server, collection, [BSON.document], fun, Keyword.it) :: cursor
161+
def watch_collection(topology_pid, coll, pipeline, on_resume_token \\ nil, opts \\ []) do
162+
163+
stream_opts = %{
164+
fullDocument: opts[:full_document] || "default",
165+
resumeAfter: opts[:resume_after],
166+
startAtOperationTime: opts[:start_at_operation_time],
167+
startAfter: opts[:start_after]
168+
} |> filter_nils()
169+
170+
cmd = [
171+
aggregate: coll,
172+
pipeline: [%{"$changeStream" => stream_opts} | pipeline],
173+
explain: opts[:explain],
174+
allowDiskUse: opts[:allow_disk_use],
175+
collation: opts[:collation],
176+
maxTimeMS: opts[:max_time],
177+
cursor: filter_nils(%{batchSize: opts[:batch_size]}),
178+
bypassDocumentValidation: opts[:bypass_document_validation],
179+
hint: opts[:hint],
180+
comment: opts[:comment],
181+
readConcern: opts[:read_concern]
182+
] |> filter_nils()
183+
184+
opts = Keyword.drop(opts, ~w(full_document resume_after start_at_operation_time start_after explain allow_disk_use collation bypass_document_validation hint comment read_concern)a)
185+
186+
on_resume_token = on_resume_token || (fn _token -> nil end)
187+
change_stream_cursor(topology_pid, cmd, on_resume_token, opts)
188+
189+
end
190+
191+
@spec watch_db(GenServer.server, [BSON.document], fun, Keyword.it) :: cursor
192+
def watch_db(topology_pid, pipeline, on_resume_token \\ nil, opts \\ []) do
193+
watch_collection(topology_pid, 0, pipeline, on_resume_token, opts)
194+
end
195+
144196
@doc """
145197
Performs aggregation operation using the aggregation pipeline.
146198
147-
## Options
199+
For all options see [Options](https://docs.mongodb.com/manual/reference/command/aggregate/#aggregate)
148200
149-
* `:allow_disk_use` - Enables writing to temporary files (Default: false)
150-
* `:collation` - Optionally specifies a collation to use in MongoDB 3.4 and
151-
* `:max_time` - Specifies a time limit in milliseconds
152-
* `:use_cursor` - Use a cursor for a batched response (Default: true)
153201
"""
154202
@spec aggregate(GenServer.server, collection, [BSON.document], Keyword.t) :: cursor
155203
def aggregate(topology_pid, coll, pipeline, opts \\ []) do
156-
query = [
204+
205+
cmd = [
157206
aggregate: coll,
158207
pipeline: pipeline,
208+
explain: opts[:explain],
159209
allowDiskUse: opts[:allow_disk_use],
160210
collation: opts[:collation],
161-
maxTimeMS: opts[:max_time]
162-
] |> filter_nils
163-
wv_query = %Query{action: :wire_version}
211+
maxTimeMS: opts[:max_time],
212+
cursor: filter_nils(%{batchSize: opts[:batch_size]}),
213+
bypassDocumentValidation: opts[:bypass_document_validation],
214+
hint: opts[:hint],
215+
comment: opts[:comment],
216+
readConcern: opts[:read_concern]
217+
] |> filter_nils()
164218

165-
with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, opts),
166-
opts = Keyword.put(opts, :slave_ok, slave_ok),
167-
{:ok, _query, version} <- DBConnection.execute(conn, wv_query, [], defaults(opts)) do
168-
cursor? = version >= 1 and Keyword.get(opts, :use_cursor, true)
169-
opts = Keyword.drop(opts, ~w(allow_disk_use max_time use_cursor)a)
170-
171-
if cursor? do
172-
query = query ++ [cursor: filter_nils(%{batchSize: opts[:batch_size]})]
173-
cursor(conn, @cmd_collection, query, opts)
174-
else
175-
query = query ++ [cursor: %{}]
176-
cursor(conn, @cmd_collection, query, opts)
177-
end
178-
end
219+
opts = Keyword.drop(opts, ~w(explain allow_disk_use collation bypass_document_validation hint comment read_concern)a)
220+
221+
cursor(topology_pid, cmd, opts)
179222
end
180223

181224
@doc """
@@ -209,7 +252,7 @@ defmodule Mongo do
209252
sort: opts[:sort],
210253
upsert: opts[:upsert],
211254
collation: opts[:collation],
212-
] |> filter_nils
255+
] |> filter_nils()
213256

214257
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
215258

@@ -249,7 +292,7 @@ defmodule Mongo do
249292
sort: opts[:sort],
250293
upsert: opts[:upsert],
251294
collation: opts[:collation],
252-
] |> filter_nils
295+
] |> filter_nils()
253296

254297
opts = Keyword.drop(opts, ~w(bypass_document_validation max_time projection return_document sort upsert collation)a)
255298

@@ -281,7 +324,7 @@ defmodule Mongo do
281324
fields: opts[:projection],
282325
sort: opts[:sort],
283326
collation: opts[:collation],
284-
] |> filter_nils
327+
] |> filter_nils()
285328
opts = Keyword.drop(opts, ~w(max_time projection sort collation)a)
286329

287330
with {:ok, conn, _, _} <- select_server(topology_pid, :write, opts),
@@ -298,7 +341,7 @@ defmodule Mongo do
298341
skip: opts[:skip],
299342
hint: opts[:hint],
300343
collation: opts[:collation]
301-
] |> filter_nils
344+
] |> filter_nils()
302345

303346
opts = Keyword.drop(opts, ~w(limit skip hint collation)a)
304347

@@ -323,11 +366,11 @@ defmodule Mongo do
323366
@spec count_documents(GenServer.server, collection, BSON.document, Keyword.t) :: result(non_neg_integer)
324367
def count_documents(topology_pid, coll, filter, opts \\ []) do
325368
pipeline = [
326-
{"$match", filter},
327-
{"$skip", opts[:skip]},
328-
{"$limit", opts[:limit]},
329-
{"$group", %{"_id" => nil, "n" => %{"$sum" => 1}}}
330-
] |> filter_nils |> Enum.map(&List.wrap/1)
369+
"$match": filter,
370+
"$skip": opts[:skip],
371+
"$limit": opts[:limit],
372+
"$group": %{"_id" => nil, "n" => %{"$sum" => 1}}
373+
] |> filter_nils() |> Enum.map(&List.wrap/1)
331374

332375
documents =
333376
topology_pid
@@ -383,7 +426,7 @@ defmodule Mongo do
383426
query: filter,
384427
collation: opts[:collation],
385428
maxTimeMS: opts[:max_time]
386-
] |> filter_nils
429+
] |> filter_nils()
387430

388431
opts = Keyword.drop(opts, ~w(max_time)a)
389432

@@ -420,7 +463,7 @@ defmodule Mongo do
420463
other -> other
421464
end
422465

423-
query = [
466+
cmd = [
424467
{"find", coll},
425468
{"filter", filter},
426469
{"limit", opts[:limit]},
@@ -445,13 +488,11 @@ defmodule Mongo do
445488
{"sort", opts[:sort]}
446489
]
447490

448-
query = filter_nils(query)
491+
cmd = filter_nils(cmd)
449492

450493
drop = ~w(limit hint single_batch read_concern max min collation return_key show_record_id tailable no_cursor_timeout await_data batch_size projection comment max_time skip sort)a
451494
opts = Keyword.drop(opts, drop)
452-
with {:ok, conn, slave_ok, _} <- select_server(topology_pid, :read, opts),
453-
opts = Keyword.put(opts, :slave_ok, slave_ok),
454-
do: cursor(conn, coll, query, opts)
495+
cursor(topology_pid, cmd, opts)
455496
end
456497

457498
@doc """
@@ -498,7 +539,12 @@ defmodule Mongo do
498539
@spec direct_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
499540
def direct_command(conn, query, opts \\ []) do
500541
params = [query]
501-
query = %Query{action: :command}
542+
543+
query = case Keyword.has_key?(opts, :error) do
544+
false -> %Query{action: :command}
545+
true -> %Query{action: :error}
546+
end
547+
## query = %Query{action: :command}
502548

503549
with {:ok, _query, response} <- DBConnection.execute(conn, query, params, defaults(opts)) do
504550
case response do
@@ -512,6 +558,17 @@ defmodule Mongo do
512558
end
513559
end
514560

561+
562+
@doc """
563+
Returns the current wire version.
564+
"""
565+
def wire_version(conn, opts \\ []) do
566+
cmd = %Query{action: :wire_version}
567+
with {:ok, _query, version} <- DBConnection.execute(conn, cmd, %{}, defaults(opts)) do
568+
{:ok, version}
569+
end
570+
end
571+
515572
@doc """
516573
Similar to `command/3` but unwraps the result and raises on error.
517574
"""
@@ -843,11 +900,8 @@ defmodule Mongo do
843900
"""
844901
@spec list_indexes(GenServer.server, String.t, Keyword.t) :: cursor
845902
def list_indexes(topology_pid, coll, opts \\ []) do
846-
with {:ok, conn, slave_ok, _} <- Mongo.select_server(topology_pid, :read, opts) do
847-
opts = Keyword.put(opts, :slave_ok, slave_ok)
848-
query = [listIndexes: coll]
849-
cursor(conn, @cmd_collection, query, opts)
850-
end
903+
cmd = [listIndexes: coll]
904+
cursor(topology_pid, cmd, opts)
851905
end
852906

853907
@doc """
@@ -872,13 +926,10 @@ defmodule Mongo do
872926
#
873927
# In versions 2.8.0-rc3 and later, the listCollections command returns a cursor!
874928
#
875-
with {:ok, conn, slave_ok, _} <- Mongo.select_server(topology_pid, :read, opts) do
876-
params = [listCollections: 1]
877-
opts = Keyword.put(opts, :slave_ok, slave_ok)
878-
cursor(conn, @cmd_collection, params, opts)
879-
|> Stream.filter(fn coll -> coll["type"] == "collection" end)
880-
|> Stream.map(fn coll -> coll["name"] end)
881-
end
929+
cmd = [listCollections: 1]
930+
cursor(topology_pid, cmd, opts)
931+
|> Stream.filter(fn coll -> coll["type"] == "collection" end)
932+
|> Stream.map(fn coll -> coll["name"] end)
882933
end
883934

884935
@doc"""
@@ -939,12 +990,12 @@ defmodule Mongo do
939990
defp key_to_string(key) when is_atom(key), do: Atom.to_string(key)
940991
defp key_to_string(key) when is_binary(key), do: key
941992

942-
defp cursor(conn, coll, query, opts) do
943-
%Mongo.Cursor{
944-
conn: conn,
945-
coll: coll,
946-
query: query,
947-
opts: opts}
993+
defp cursor(topology_pid, cmd, opts) do
994+
%Mongo.Cursor{topology_pid: topology_pid, cmd: cmd, on_resume_token: nil, opts: opts}
995+
end
996+
997+
defp change_stream_cursor(topology_pid, cmd, fun, opts) do
998+
%Mongo.Cursor{topology_pid: topology_pid, cmd: cmd, on_resume_token: fun, opts: opts}
948999
end
9491000

9501001
defp filter_nils(keyword) when is_list(keyword) do

0 commit comments

Comments
 (0)