Skip to content

Commit c55d681

Browse files
committed
removed debug code and added a test case for change streams
1 parent a0d243c commit c55d681

File tree

2 files changed

+75
-7
lines changed

2 files changed

+75
-7
lines changed

lib/mongo/cursor.ex

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -155,20 +155,18 @@ defmodule Mongo.Cursor do
155155
maxTimeMS: opts[:max_time]
156156
] |> filter_nils()
157157

158-
kill_cursors(conn, coll, [cursor_id], opts)
159-
160158
with {:ok, %{"operationTime" => op_time,
161159
"cursor" => %{"id" => new_cursor_id,
162-
"nextBatch" => docs} = cursor,
163-
"ok" => ok}} when ok == 1 <- Mongo.direct_command(conn, get_more, opts) do
160+
"nextBatch" => docs} = cursor,
161+
"ok" => ok}} when ok == 1 <- Mongo.direct_command(conn, get_more, opts) do
164162

165163
old_token = change_stream(change_stream, :resume_token)
166164
change_stream = update_change_stream(change_stream, cursor["postBatchResumeToken"], op_time, List.last(docs))
167165
new_token = change_stream(change_stream, :resume_token)
168166

169-
case Map.equal?(old_token, new_token) do
170-
false -> fun.(new_token)
171-
true -> nil
167+
case token_changes(old_token, new_token) do
168+
true -> fun.(new_token)
169+
false -> nil
172170
end
173171

174172
{:ok, %{cursor_id: new_cursor_id, docs: docs, change_stream: change_stream}}
@@ -195,6 +193,11 @@ defmodule Mongo.Cursor do
195193
end
196194
end
197195

196+
defp token_changes(nil, nil), do: false
197+
defp token_changes(nil, _new_token), do: true
198+
defp token_changes(_old_token, nil), do: true
199+
defp token_changes(old_token, new_token), do: not(Map.equal?(old_token, new_token))
200+
198201
##
199202
# we are updating the resume token by matching different cases
200203
#

test/mongo/change_stream_test.exs

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
defmodule Mongo.ChangeStreamTest do
2+
use ExUnit.Case # DO NOT MAKE ASYNCHRONOUS
3+
4+
setup_all do
5+
assert {:ok, top} = Mongo.TestConnection.connect
6+
assert {:ok, %Mongo.InsertOneResult{}} = Mongo.insert_one(top, "users", %{name: "Waldo"})
7+
%{pid: top}
8+
end
9+
10+
def consumer_1(top, monitor) do
11+
cursor = Mongo.watch_collection(top, "users", [], fn doc -> send(monitor, {:token, doc}) end, max_time: 1_000 )
12+
result = cursor |> Enum.take(2) |> Enum.at(0)
13+
send(monitor, {:insert, result})
14+
end
15+
16+
def consumer_2(top, monitor, token) do
17+
cursor = Mongo.watch_collection(top, "users", [], fn doc -> send(monitor, {:token, doc}) end, resume_after: token, max_time: 1_000 )
18+
result = cursor |> Enum.take(1) |> Enum.at(0)
19+
send(monitor, {:insert, result})
20+
end
21+
22+
def consumer_3(top, monitor, token) do
23+
cursor = Mongo.watch_collection(top, "users", [], fn doc -> send(monitor, {:token, doc}) end, resume_after: token, max_time: 1_000 )
24+
result = cursor |> Enum.take(4) |> Enum.map(fn %{"fullDocument" => %{"name" => name}} -> name end)
25+
send(monitor, {:insert, result})
26+
27+
end
28+
29+
def producer(top) do
30+
Process.sleep(100)
31+
assert {:ok, %Mongo.InsertOneResult{}} = Mongo.insert_one(top, "users", %{name: "Greta"})
32+
assert {:ok, %Mongo.InsertOneResult{}} = Mongo.insert_one(top, "users", %{name: "Gustav"})
33+
assert {:ok, %Mongo.InsertOneResult{}} = Mongo.insert_one(top, "users", %{name: "Tom"})
34+
end
35+
36+
test "change stream: watch and resume_after", %{pid: top} do
37+
38+
me = self()
39+
spawn(fn -> consumer_1(top, me) end)
40+
spawn(fn -> producer(top) end)
41+
42+
assert_receive {:token, nil}, 2_000
43+
assert_receive {:token, token}, 2_000
44+
assert_receive {:insert, %{"fullDocument" => %{"name" => "Greta"}}}, 2_000
45+
46+
Process.sleep(500)
47+
48+
assert {:ok, %Mongo.InsertOneResult{}} = Mongo.insert_one(top, "users", %{name: "Liese"})
49+
50+
spawn(fn -> consumer_2(top, me, token) end)
51+
spawn(fn -> producer(top) end)
52+
53+
assert_receive {:token, _}, 2_000
54+
assert_receive {:insert, %{"fullDocument" => %{"name" => "Gustav"}}}, 2_000
55+
56+
Process.sleep(500)
57+
58+
spawn(fn -> consumer_3(top, me, token) end)
59+
spawn(fn -> producer(top) end)
60+
61+
assert_receive {:token, _}, 2_000
62+
assert_receive {:insert, ["Gustav", "Tom", "Liese", "Greta"]}, 2_000
63+
64+
end
65+
end

0 commit comments

Comments
 (0)