@@ -8,26 +8,29 @@ defmodule Mongo.ChangeStreamTest do
8
8
end
9
9
10
10
def consumer_1 ( top , monitor ) do
11
+ Process . sleep ( 1000 )
11
12
cursor = Mongo . watch_collection ( top , "users" , [ ] , fn doc -> IO . puts ( "Token #{ inspect doc } " ) ; send ( monitor , { :token , doc } ) end , max_time: 1_000 , debug: true )
12
13
result = cursor |> Enum . take ( 2 ) |> Enum . at ( 0 )
13
14
send ( monitor , { :insert , result } )
14
15
end
15
16
16
17
def consumer_2 ( top , monitor , token ) do
18
+ Process . sleep ( 1000 )
17
19
cursor = Mongo . watch_collection ( top , "users" , [ ] , fn doc -> IO . puts ( "Token #{ inspect doc } " ) ; send ( monitor , { :token , doc } ) end , resume_after: token , max_time: 1_000 )
18
20
result = cursor |> Enum . take ( 1 ) |> Enum . at ( 0 )
19
21
send ( monitor , { :insert , result } )
20
22
end
21
23
22
24
def consumer_3 ( top , monitor , token ) do
25
+ Process . sleep ( 1000 )
23
26
cursor = Mongo . watch_collection ( top , "users" , [ ] , fn doc -> IO . puts ( "Token #{ inspect doc } " ) ; send ( monitor , { :token , doc } ) end , resume_after: token , max_time: 1_000 )
24
27
result = cursor |> Enum . take ( 4 ) |> Enum . map ( fn % { "fullDocument" => % { "name" => name } } -> name end )
25
28
send ( monitor , { :insert , result } )
26
29
27
30
end
28
31
29
32
def producer ( top ) do
30
- Process . sleep ( 300 )
33
+ Process . sleep ( 2000 )
31
34
assert { :ok , % Mongo.InsertOneResult { } } = Mongo . insert_one ( top , "users" , % { name: "Greta" } )
32
35
assert { :ok , % Mongo.InsertOneResult { } } = Mongo . insert_one ( top , "users" , % { name: "Gustav" } )
33
36
assert { :ok , % Mongo.InsertOneResult { } } = Mongo . insert_one ( top , "users" , % { name: "Tom" } )
@@ -40,9 +43,9 @@ defmodule Mongo.ChangeStreamTest do
40
43
spawn ( fn -> consumer_1 ( top , me ) end )
41
44
spawn ( fn -> producer ( top ) end )
42
45
43
- assert_receive { :token , nil } , 2_000
44
- assert_receive { :token , token } , 2_000
45
- assert_receive { :insert , % { "fullDocument" => % { "name" => "Greta" } } } , 2_000
46
+ assert_receive { :token , nil } , 5_000
47
+ assert_receive { :token , token } , 5_000
48
+ assert_receive { :insert , % { "fullDocument" => % { "name" => "Greta" } } } , 5_000
46
49
47
50
Process . sleep ( 500 )
48
51
@@ -51,16 +54,18 @@ defmodule Mongo.ChangeStreamTest do
51
54
spawn ( fn -> consumer_2 ( top , me , token ) end )
52
55
spawn ( fn -> producer ( top ) end )
53
56
54
- assert_receive { :token , _ } , 2_000
55
- assert_receive { :insert , % { "fullDocument" => % { "name" => "Gustav" } } } , 2_000
57
+ assert_receive { :token , _ } , 5_000
58
+ assert_receive { :insert , % { "fullDocument" => % { "name" => "Gustav" } } } , 5_000
56
59
57
60
Process . sleep ( 500 )
58
61
59
62
spawn ( fn -> consumer_3 ( top , me , token ) end )
60
63
spawn ( fn -> producer ( top ) end )
61
64
62
- assert_receive { :token , _ } , 2_000
63
- assert_receive { :insert , [ "Gustav" , "Tom" , "Liese" , "Greta" ] } , 2_000
65
+ assert_receive { :token , _ } , 5_000
66
+ assert_receive { :insert , [ "Gustav" , "Tom" , "Liese" , "Greta" ] } , 5_000
67
+
68
+ Process . sleep ( 1000 )
64
69
65
70
end
66
71
end
0 commit comments