36
36
from bson .py3compat import iteritems
37
37
from bson .raw_bson import DEFAULT_RAW_BSON_OPTIONS , RawBSONDocument
38
38
39
- from pymongo import monitoring
40
39
from pymongo .change_stream import _NON_RESUMABLE_GETMORE_ERRORS
41
40
from pymongo .command_cursor import CommandCursor
42
41
from pymongo .errors import (InvalidOperation , OperationFailure ,
43
42
ServerSelectionTimeoutError )
44
43
from pymongo .message import _CursorAddress
45
44
from pymongo .read_concern import ReadConcern
45
+ from pymongo .write_concern import WriteConcern
46
46
47
47
from test import client_context , unittest , IntegrationTest
48
48
from test .utils import (
@@ -70,10 +70,13 @@ def kill_change_stream_cursor(self, change_stream):
70
70
client ._close_cursor_now (cursor .cursor_id , address )
71
71
72
72
def test_try_next (self ):
73
- coll = self .watched_collection ()
73
+ # ChangeStreams only read majority committed data so use w:majority.
74
+ coll = self .watched_collection ().with_options (
75
+ write_concern = WriteConcern ("majority" ))
76
+ coll .drop ()
74
77
coll .insert_one ({})
75
78
self .addCleanup (coll .drop )
76
- with self .change_stream (max_await_time_ms = 100 ) as stream :
79
+ with self .change_stream (max_await_time_ms = 250 ) as stream :
77
80
self .assertIsNone (stream .try_next ())
78
81
self .assertIsNone (stream ._resume_token )
79
82
coll .insert_one ({})
@@ -88,14 +91,16 @@ def test_try_next_runs_one_getmore(self):
88
91
# Connect to the cluster.
89
92
client .admin .command ('ping' )
90
93
listener .results .clear ()
91
- coll = self .watched_collection ()
94
+ # ChangeStreams only read majority committed data so use w:majority.
95
+ coll = self .watched_collection ().with_options (
96
+ write_concern = WriteConcern ("majority" ))
92
97
coll .drop ()
93
98
# Create the watched collection before starting the change stream to
94
99
# skip any "create" events.
95
100
coll .insert_one ({'_id' : 1 })
96
101
self .addCleanup (coll .drop )
97
102
with self .change_stream_with_client (
98
- client , max_await_time_ms = 100 ) as stream :
103
+ client , max_await_time_ms = 250 ) as stream :
99
104
self .assertEqual (listener .started_command_names (), ["aggregate" ])
100
105
listener .results .clear ()
101
106
0 commit comments