Skip to content

Commit 9dfcc1e

Browse files
committed
fixes #6
1 parent b576088 commit 9dfcc1e

File tree

6 files changed

+108
-127
lines changed

6 files changed

+108
-127
lines changed

.travis.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ elixir:
66
- 1.5
77
- 1.6
88
- 1.7
9+
- 1.8
910

1011
cache:
1112
- apt
@@ -29,7 +30,7 @@ before_script:
2930

3031
script:
3132
- mix test
32-
- not_allowed=(1.3 1.4 1.5 1.6 1.7)
33+
- not_allowed=(1.3 1.4 1.5 1.6 1.7 1.8)
3334
- case "${not_allowed[@]}" in *"${TRAVIS_ELIXIR_VERSION}"*) dialyzer=0 ;; *) dialyzer=1 ;; esac
3435
- if [ "$dialyzer" -eq 1 ]; then mix dialyzer --halt-exit-status; fi
3536

lib/mongo.ex

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -139,8 +139,15 @@ defmodule Mongo do
139139
Mongo.IdServer.new
140140
end
141141

142-
@doc"""
142+
@doc """
143+
Converts the DataTime to a MongoDB timestamp.
144+
"""
145+
@spec timestamp(DateTime.t) :: BSON.Timestamp.t
146+
def timestamp(datetime) do
147+
%BSON.Timestamp{value: DateTime.to_unix(datetime), ordinal: 1}
148+
end
143149

150+
@doc"""
144151
Creates a change stream cursor on collections.
145152
146153
`on_resume_token` is function that takes the new resume token, if it changed.
@@ -188,9 +195,26 @@ defmodule Mongo do
188195

189196
end
190197

198+
@doc"""
199+
Creates a change stream cursor all collections of the database.
200+
201+
`on_resume_token` is function that takes the new resume token, if it changed.
202+
203+
## Options
204+
205+
* `:full_document` -
206+
* `:max_time` - Specifies a time limit in milliseconds. This option is used on `getMore` commands
207+
* `:batch_size` - Specifies the number of maximum number of documents to
208+
return (default: 1)
209+
* `:resume_after` - Specifies the logical starting point for the new change stream.
210+
* `:start_at_operation_time` - The change stream will only provide changes that occurred at or after the specified timestamp (since 4.0)
211+
* `:start_after` - Similar to `resumeAfter`, this option takes a resume token and starts a new change stream
212+
returning the first notification after the token. This will allow users to watch collections that have been dropped and recreated
213+
or newly renamed collections without missing any notifications. (since 4.0.7)
214+
"""
191215
@spec watch_db(GenServer.server, [BSON.document], fun, Keyword.it) :: cursor
192216
def watch_db(topology_pid, pipeline, on_resume_token \\ nil, opts \\ []) do
193-
watch_collection(topology_pid, 0, pipeline, on_resume_token, opts)
217+
watch_collection(topology_pid, 1, pipeline, on_resume_token, opts)
194218
end
195219

196220
@doc """
@@ -537,16 +561,10 @@ defmodule Mongo do
537561

538562
@doc false
539563
@spec direct_command(pid, BSON.document, Keyword.t) :: {:ok, BSON.document | nil} | {:error, Mongo.Error.t}
540-
def direct_command(conn, query, opts \\ []) do
541-
params = [query]
542-
543-
query = case Keyword.has_key?(opts, :error) do
544-
false -> %Query{action: :command}
545-
true -> %Query{action: :error}
546-
end
547-
## query = %Query{action: :command}
564+
def direct_command(conn, cmd, opts \\ []) do
565+
action = %Query{action: :command}
548566

549-
with {:ok, _query, response} <- DBConnection.execute(conn, query, params, defaults(opts)) do
567+
with {:ok, _query, response} <- DBConnection.execute(conn, action, [cmd], defaults(opts)) do
550568
case response do
551569
op_reply(flags: flags, docs: [%{"$err" => reason, "code" => code}]) when (@reply_query_failure &&& flags) != 0 -> {:error, Mongo.Error.exception(message: reason, code: code)}
552570
op_reply(flags: flags) when (@reply_cursor_not_found &&& flags) != 0 -> {:error, Mongo.Error.exception(message: "cursor not found")}

0 commit comments

Comments
 (0)