Skip to content

Commit 1e3ae3c

Browse files
committed
refactored cursor struct and split the code into stream and change_stream, added "errorLabels" for #47
1 parent 9bc843e commit 1e3ae3c

File tree

7 files changed

+239
-154
lines changed

7 files changed

+239
-154
lines changed

lib/mongo.ex

Lines changed: 22 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -333,7 +333,7 @@ defmodule Mongo do
333333

334334
opts = Keyword.drop(opts, ~w(explain allow_disk_use collation bypass_document_validation hint comment read_concern)a)
335335

336-
cursor(topology_pid, cmd, opts)
336+
get_stream(topology_pid, cmd, opts)
337337
end
338338

339339
@doc """
@@ -635,7 +635,12 @@ defmodule Mongo do
635635

636636
drop = ~w(limit hint single_batch read_concern max min collation return_key show_record_id tailable no_cursor_timeout await_data batch_size projection comment max_time skip sort)a
637637
opts = Keyword.drop(opts, drop)
638-
cursor(topology_pid, cmd, opts)
638+
try do
639+
get_stream(topology_pid, cmd, opts)
640+
rescue
641+
error -> {:error, error}
642+
end
643+
639644
end
640645

641646
@doc """
@@ -658,9 +663,15 @@ defmodule Mongo do
658663
|> Keyword.put(:limit, 1)
659664
|> Keyword.put(:batch_size, 1)
660665

661-
topology_pid
662-
|> find(coll, filter, opts)
663-
|> Enum.at(0)
666+
try do
667+
case find(topology_pid, coll, filter, opts) do
668+
{:error, error} -> {:error, error}
669+
other -> Enum.at(other, 0)
670+
end
671+
rescue
672+
error -> {:error, error}
673+
end
674+
664675
end
665676

666677
@doc """
@@ -704,7 +715,7 @@ defmodule Mongo do
704715
end
705716

706717
defp check_for_error(%{"ok" => ok} = response) when ok == 1, do: {:ok, response}
707-
defp check_for_error(%{"code" => code, "errmsg" => msg}), do: {:error, Mongo.Error.exception(message: msg, code: code)}
718+
defp check_for_error(doc), do: {:error, Mongo.Error.exception(doc)}
708719

709720
@doc """
710721
Returns the wire version of the database
@@ -1095,7 +1106,7 @@ defmodule Mongo do
10951106
@spec list_indexes(GenServer.server, String.t, Keyword.t) :: cursor
10961107
def list_indexes(topology_pid, coll, opts \\ []) do
10971108
cmd = [listIndexes: coll]
1098-
cursor(topology_pid, cmd, opts)
1109+
get_stream(topology_pid, cmd, opts)
10991110
end
11001111

11011112
@doc """
@@ -1150,20 +1161,20 @@ defmodule Mongo do
11501161
# https://github.com/mongodb/specifications/blob/f4bb783627e7ed5c4095c5554d35287956ef8970/source/enumerate-collections.rst#post-mongodb-280-rc3-versions
11511162
#
11521163
cmd = [listCollections: 1]
1153-
cursor(topology_pid, cmd, opts)
1164+
get_stream(topology_pid, cmd, opts)
11541165
|> Stream.filter(fn
11551166
%{"type" => name} -> name == "collection"
11561167
_ -> true
11571168
end)
11581169
|> Stream.map(fn coll -> coll["name"] end)
11591170
end
11601171

1161-
defp cursor(topology_pid, cmd, opts) do
1162-
%Mongo.Cursor{topology_pid: topology_pid, cmd: cmd, on_resume_token: nil, opts: opts}
1172+
defp get_stream(topology_pid, cmd, opts) do
1173+
Mongo.Stream.new(topology_pid, cmd, opts)
11631174
end
11641175

11651176
defp change_stream_cursor(topology_pid, cmd, fun, opts) do
1166-
%Mongo.Cursor{topology_pid: topology_pid, cmd: cmd, on_resume_token: fun, opts: opts}
1177+
Mongo.ChangeStream.new(topology_pid, cmd, fun, opts)
11671178
end
11681179

11691180
##

lib/mongo/cursor.ex renamed to lib/mongo/change_stream.ex

Lines changed: 67 additions & 115 deletions
Original file line numberDiff line numberDiff line change
@@ -1,91 +1,52 @@
1-
defmodule Mongo.Cursor do
2-
@moduledoc"""
3-
MongoDB Cursor as a stream. There are two variants:  
4-
* normal cursor: This is called in batch mode and closes automatically with a kill cursor command.  
5-
* change stream cursor: This will operate a change stream. MongoDB does not return documents after the time has expired. In this case
6-
`get_more` will called again. No kill cursor command is invoked just because no documents are being returned. In case of error
7-
  a resume process is started so that events can be received again without losing any previous events.
8-
  The resume process requires a resume token or an operation time. These are cached by the cursor. One can
9-
  determine the resume token via a function (on_resume_token). It is called when the resume token changed.
10-
"""
1+
defmodule Mongo.ChangeStream do
112

123
alias Mongo.Session
13-
alias Mongo.Cursor
144

155
import Record, only: [defrecordp: 2]
166

17-
@type t :: %__MODULE__{
18-
topology_pid: GenServer.server,
19-
cmd: BSON.document,
20-
on_resume_token: fun,
21-
opts: Keyword.t
22-
}
23-
24-
defstruct [:topology_pid, :cmd, :on_resume_token, :opts]
25-
7+
defstruct [:topology_pid, :session, :doc, :cmd, :on_resume_token, :opts]
8+
9+
def new(topology_pid, cmd, on_resume_token_fun, opts) do
10+
with new_cmd = Mongo.ReadPreference.add_read_preference(cmd, opts),
11+
{:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
12+
{:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, new_cmd, opts) do
13+
%Mongo.ChangeStream{
14+
topology_pid: topology_pid,
15+
session: session,
16+
doc: doc,
17+
on_resume_token: on_resume_token_fun,
18+
cmd: cmd,
19+
opts: opts
20+
}
21+
end
22+
end
2623
defimpl Enumerable do
2724

2825
defrecordp :change_stream, [:resume_token, :op_time, :cmd, :on_resume_token]
2926
defrecordp :state, [:topology_pid, :session, :cursor, :coll, :change_stream, :docs]
3027

31-
def reduce(%Cursor{topology_pid: topology_pid, cmd: cmd, on_resume_token: on_resume_token_fun, opts: opts}, acc, reduce_fun) do
32-
33-
start_fun = start_fun(topology_pid, cmd, on_resume_token_fun, opts)
34-
next_fun = next_fun(opts)
35-
after_fun = after_fun(opts)
28+
def reduce(change_stream, acc, reduce_fun) do
3629

37-
Stream.resource(start_fun, next_fun, after_fun).(acc, reduce_fun)
38-
end
39-
40-
##
41-
# start of a regular cursor
42-
#
43-
defp start_fun(topology_pid, cmd, nil, opts) do
44-
fn ->
45-
46-
with cmd = Mongo.ReadPreference.add_read_preference(cmd, opts),
47-
{:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
48-
{:ok,
49-
%{"ok" => ok,
50-
"cursor" => %{
51-
"id" => cursor_id,
52-
"ns" => coll,
53-
"firstBatch" => docs}}} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do
54-
state(topology_pid: topology_pid, session: session, cursor: cursor_id, coll: coll, docs: docs)
55-
end
56-
57-
end
58-
end
59-
60-
##
61-
# start of a change stream cursor
62-
#
63-
defp start_fun(topology_pid, cmd, fun, opts) do
64-
fn ->
65-
with {:ok, state} <- aggregate(topology_pid, cmd, fun, opts) do
30+
start_fun = fn ->
31+
with {:ok, state} <- aggregate(change_stream.topology_pid, change_stream.session, change_stream.doc, change_stream.cmd, change_stream.on_resume_token) do
6632
state
67-
end
33+
end
6834
end
35+
next_fun = next_fun(change_stream.opts)
36+
after_fun = after_fun(change_stream.opts)
37+
38+
Stream.resource(start_fun, next_fun, after_fun).(acc, reduce_fun)
6939
end
7040

7141
defp next_fun(opts) do
7242
fn
7343
state(docs: [], cursor: 0) = state -> {:halt, state}
7444

75-
# this is a regular cursor
76-
state(docs: [], topology_pid: topology_pid, session: session, cursor: cursor, change_stream: nil, coll: coll) = state ->
77-
case get_more(topology_pid, session, only_coll(coll), cursor, nil, opts) do
78-
{:ok, %{cursor_id: cursor_id, docs: []}} -> {:halt, state(state, cursor: cursor_id)}
79-
{:ok, %{cursor_id: cursor_id, docs: docs}} -> {docs, state(state, cursor: cursor_id)}
80-
{:error, error} -> raise error
81-
end
82-
83-
# this is a change stream cursor
8445
state(docs: [], topology_pid: topology_pid, session: session, cursor: cursor, change_stream: change_stream, coll: coll) = state ->
8546
case get_more(topology_pid, session, only_coll(coll), cursor, change_stream, opts) do
8647
{:ok, %{cursor_id: cursor_id,
87-
docs: docs,
88-
change_stream: change_stream}} -> {docs, state(state, cursor: cursor_id, change_stream: change_stream)}
48+
docs: docs,
49+
change_stream: change_stream}} -> {docs, state(state, cursor: cursor_id, change_stream: change_stream)}
8950
{:resume, state(docs: docs) = state} -> {docs, state(state, docs: [])}
9051
{:error, error} -> raise error
9152
end
@@ -99,63 +60,55 @@ defmodule Mongo.Cursor do
9960

10061
with new_cmd = Mongo.ReadPreference.add_read_preference(cmd, opts),
10162
{:ok, session} <- Session.start_implicit_session(topology_pid, :read, opts),
102-
{:ok, %{"ok" => ok,
103-
"operationTime" => op_time,
63+
{:ok, %{"ok" => ok} = doc} when ok == 1 <- Mongo.exec_command_session(session, new_cmd, opts) do
64+
65+
aggregate(topology_pid, session, doc, cmd, fun)
66+
end
67+
end
68+
69+
def aggregate(topology_pid, session, doc, cmd, fun) do
70+
71+
with %{"operationTime" => op_time,
10472
"cursor" => %{
10573
"id" => cursor_id,
10674
"ns" => coll,
107-
"firstBatch" => docs} = response}} when ok == 1 <- Mongo.exec_command_session(session, new_cmd, opts),
75+
"firstBatch" => docs} = response} <- doc,
10876
{:ok, wire_version} <- Mongo.wire_version(topology_pid) do
10977

110-
[%{"$changeStream" => stream_opts} | _pipeline] = Keyword.get(new_cmd, :pipeline) # extract the change stream options
78+
[%{"$changeStream" => stream_opts} | _pipeline] = Keyword.get(cmd, :pipeline) # extract the change stream options
11179

112-
# The ChangeStream MUST save the operationTime from the initial aggregate response when the following critera are met:
113-
#
114-
# None of startAtOperationTime, resumeAfter, startAfter were specified in the ChangeStreamOptions.
115-
# The max wire version is >= 7.
116-
# The initial aggregate response had no results.
117-
# The initial aggregate response did not include a postBatchResumeToken.
80+
# The ChangeStream MUST save the operationTime from the initial aggregate response when the following critera are met:
81+
#
82+
# None of startAtOperationTime, resumeAfter, startAfter were specified in the ChangeStreamOptions.
83+
# The max wire version is >= 7.
84+
# The initial aggregate response had no results.
85+
# The initial aggregate response did not include a postBatchResumeToken.
11886

119-
has_values = stream_opts["startAtOperationTime"] || stream_opts["startAfter"] || stream_opts["resumeAfter"]
120-
op_time = update_operation_time(op_time, has_values, docs, response["postBatchResumeToken"], wire_version)
87+
has_values = stream_opts["startAtOperationTime"] || stream_opts["startAfter"] || stream_opts["resumeAfter"]
88+
op_time = update_operation_time(op_time, has_values, docs, response["postBatchResumeToken"], wire_version)
12189

122-
# When the ChangeStream is started:
123-
# If startAfter is set, cache it.
124-
# Else if resumeAfter is set, cache it.
125-
# Else, resumeToken remains unset.
126-
resume_token = stream_opts["startAfter"] || stream_opts["resumeAfter"]
127-
resume_token = update_resume_token(resume_token, response["postBatchResumeToken"], List.last(docs))
90+
# When the ChangeStream is started:
91+
# If startAfter is set, cache it.
92+
# Else if resumeAfter is set, cache it.
93+
# Else, resumeToken remains unset.
94+
resume_token = stream_opts["startAfter"] || stream_opts["resumeAfter"]
95+
resume_token = update_resume_token(resume_token, response["postBatchResumeToken"], List.last(docs))
12896

129-
fun.(resume_token)
97+
fun.(resume_token)
13098

131-
change_stream = change_stream(resume_token: resume_token, op_time: op_time, cmd: cmd, on_resume_token: fun)
99+
change_stream = change_stream(resume_token: resume_token, op_time: op_time, cmd: cmd, on_resume_token: fun)
132100

133-
{:ok, state(topology_pid: topology_pid, session: session, cursor: cursor_id, coll: coll, change_stream: change_stream, docs: docs)}
101+
{:ok, state(topology_pid: topology_pid, session: session, cursor: cursor_id, coll: coll, change_stream: change_stream, docs: docs)}
134102
end
135103
end
136104

137105
@doc """
138106
Calls the GetCore-Command
139107
See https://github.com/mongodb/specifications/blob/master/source/find_getmore_killcursors_commands.rst
140108
"""
141-
def get_more(_topology_pid, session, coll, cursor, nil, opts) do
142-
143-
cmd = [
144-
getMore: %BSON.LongNumber{value: cursor},
145-
collection: coll,
146-
batchSize: opts[:batch_size],
147-
maxTimeMS: opts[:max_time]
148-
] |> filter_nils()
149-
150-
with {:ok, %{"cursor" => %{ "id" => cursor_id, "nextBatch" => docs}, "ok" => ok}} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do
151-
{:ok, %{cursor_id: cursor_id, docs: docs}}
152-
end
153-
154-
end
155-
156109
def get_more(topology_pid, session, coll, cursor_id,
157-
change_stream(resume_token: resume_token, op_time: op_time, cmd: aggregate_cmd,
158-
on_resume_token: fun) = change_stream, opts) do
110+
change_stream(resume_token: resume_token, op_time: op_time, cmd: aggregate_cmd,
111+
on_resume_token: fun) = change_stream, opts) do
159112

160113
get_more = [
161114
getMore: %BSON.LongNumber{value: cursor_id},
@@ -165,9 +118,9 @@ defmodule Mongo.Cursor do
165118
] |> filter_nils()
166119

167120
with {:ok, %{"operationTime" => op_time,
168-
"cursor" => %{"id" => new_cursor_id,
169-
"nextBatch" => docs} = cursor,
170-
"ok" => ok}} when ok == 1 <- Mongo.exec_command_session(session, get_more, opts) do
121+
"cursor" => %{"id" => new_cursor_id,
122+
"nextBatch" => docs} = cursor,
123+
"ok" => ok}} when ok == 1 <- Mongo.exec_command_session(session, get_more, opts) do
171124

172125
old_token = change_stream(change_stream, :resume_token)
173126
change_stream = update_change_stream(change_stream, cursor["postBatchResumeToken"], op_time, List.last(docs))
@@ -282,14 +235,14 @@ defmodule Mongo.Cursor do
282235
def kill_cursors(session, coll, cursor_ids, opts) do
283236

284237
cmd = [
285-
killCursors: coll,
286-
cursors: cursor_ids |> Enum.map(fn id -> %BSON.LongNumber{value: id} end)
287-
] |> filter_nils()
238+
killCursors: coll,
239+
cursors: cursor_ids |> Enum.map(fn id -> %BSON.LongNumber{value: id} end)
240+
] |> filter_nils()
288241

289242
with {:ok, %{"cursorsAlive" => [],
290-
"cursorsNotFound" => [],
291-
"cursorsUnknown" => [],
292-
"ok" => ok}} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do
243+
"cursorsNotFound" => [],
244+
"cursorsUnknown" => [],
245+
"ok" => ok}} when ok == 1 <- Mongo.exec_command_session(session, cmd, opts) do
293246
:ok
294247
end
295248
end
@@ -320,6 +273,5 @@ defmodule Mongo.Cursor do
320273
def count(_stream), do: {:error, __MODULE__}
321274
def member?(_stream, _term), do: {:error, __MODULE__}
322275

323-
324276
end
325-
end
277+
end

lib/mongo/error.ex

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,11 @@
11
defmodule Mongo.Error do
2-
defexception [:message, :code, :host]
2+
defexception [:message, :code, :host, :error_labels]
33

44
@type t :: %__MODULE__{
55
message: String.t,
66
code: number,
7-
host: String.t
7+
host: String.t,
8+
error_labels: [String.t] | nil
89
}
910

1011
def message(e) do
@@ -22,6 +23,9 @@ defmodule Mongo.Error do
2223
%Mongo.Error{message: "#{host} ssl #{action}: #{formatted_reason} - #{inspect(reason)}", host: host}
2324
end
2425

26+
def exception(%{"code" => code, "errmsg" => msg} = doc) do
27+
%Mongo.Error{message: msg, code: code, error_labels: doc["errorLabels"]}
28+
end
2529
def exception(message: message, code: code) do
2630
%Mongo.Error{message: message, code: code}
2731
end

0 commit comments

Comments
 (0)