Skip to content

Commit e31a0ef

Browse files
committed
PYTHON-1911 Implement missing changeStream prose tests
1 parent d0423d2 commit e31a0ef

File tree

2 files changed

+80
-7
lines changed

2 files changed

+80
-7
lines changed

test/__init__.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
except ImportError:
3030
HAVE_IPADDRESS = False
3131

32+
from contextlib import contextmanager
3233
from functools import wraps
3334
from unittest import SkipTest
3435

@@ -585,6 +586,13 @@ def require_test_commands(self, func):
585586
"Test commands must be enabled",
586587
func=func)
587588

589+
def require_failCommand_fail_point(self, func):
590+
"""Run a test only if the server supports the failCommand fail
591+
point."""
592+
return self._require(lambda: self.supports_failCommand_fail_point,
593+
"failCommand fail point must be supported",
594+
func=func)
595+
588596
def require_ssl(self, func):
589597
"""Run a test only if the client can connect over SSL."""
590598
return self._require(lambda: self.ssl,
@@ -656,6 +664,17 @@ def supports_getpreverror(self):
656664
"""Does the connected server support getpreverror?"""
657665
return not (self.version.at_least(4, 1, 0) or self.is_mongos)
658666

667+
@property
668+
def supports_failCommand_fail_point(self):
669+
"""Does the server support the failCommand fail point?"""
670+
if self.is_mongos:
671+
return (self.version.at_least(4, 1, 5) and
672+
self.test_commands_enabled)
673+
else:
674+
return (self.version.at_least(4, 0) and
675+
self.test_commands_enabled)
676+
677+
659678
@property
660679
def requires_hint_with_min_max_queries(self):
661680
"""Does the server require a hint with min/max queries."""
@@ -713,6 +732,17 @@ def setUpClass(cls):
713732
else:
714733
cls.credentials = {}
715734

735+
@contextmanager
736+
def fail_point(self, command_args):
737+
cmd_on = SON([('configureFailPoint', 'failCommand')])
738+
cmd_on.update(command_args)
739+
self.client.admin.command(cmd_on)
740+
try:
741+
yield
742+
finally:
743+
cmd_off = {'configureFailPoint': cmd_on['configureFailPoint'],
744+
'mode': 'off'}
745+
self.client.admin.command(cmd_off)
716746

717747
# Use assertRaisesRegex if available, otherwise use Python 2.7's
718748
# deprecated assertRaisesRegexp, with a 'p'.

test/test_change_stream.py

Lines changed: 50 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import time
2424
import uuid
2525

26+
from contextlib import contextmanager
2627
from itertools import product
2728

2829
sys.path[0:0] = ['']
@@ -535,26 +536,65 @@ def test_resume_on_error(self):
535536
self.kill_change_stream_cursor(change_stream)
536537
self.insert_one_and_check(change_stream, {'_id': 2})
537538

539+
# Prose test no. 4
540+
@client_context.require_failCommand_fail_point
541+
def test_no_resume_attempt_if_aggregate_command_fails(self):
542+
# Set non-retryable error on aggregate command.
543+
fail_point = {'mode': {'times': 1},
544+
'data': {'errorCode': 2, 'failCommands': ['aggregate']}}
545+
client, listener = self._client_with_listener("aggregate", "getMore")
546+
with self.fail_point(fail_point):
547+
try:
548+
_ = self.change_stream_with_client(client)
549+
except OperationFailure:
550+
pass
551+
552+
# Driver should have attempted aggregate command only once.
553+
self.assertEqual(len(listener.results['started']), 1)
554+
self.assertEqual(listener.results['started'][0].command_name,
555+
'aggregate')
556+
538557
# Prose test no. 5
539558
def test_does_not_resume_fatal_errors(self):
540559
"""ChangeStream will not attempt to resume fatal server errors."""
541-
for code in _NON_RESUMABLE_GETMORE_ERRORS:
542-
with self.change_stream() as change_stream:
543-
self.watched_collection().insert_one({})
544-
560+
if client_context.supports_failCommand_fail_point:
561+
# failCommand does not support returning no errorCode.
562+
TEST_ERROR_CODES = _NON_RESUMABLE_GETMORE_ERRORS - {None}
563+
@contextmanager
564+
def generate_error(change_stream, code):
565+
fail_point = {'mode': {'times': 1}, 'data': {
566+
'errorCode': code, 'failCommands': ['getMore']}}
567+
with self.fail_point(fail_point):
568+
yield
569+
else:
570+
TEST_ERROR_CODES = _NON_RESUMABLE_GETMORE_ERRORS
571+
@contextmanager
572+
def generate_error(change_stream, code):
545573
def mock_try_next(*args, **kwargs):
546574
change_stream._cursor.close()
547575
raise OperationFailure('Mock server error', code=code)
548576

549577
original_try_next = change_stream._cursor._try_next
550578
change_stream._cursor._try_next = mock_try_next
579+
try:
580+
yield
581+
finally:
582+
change_stream._cursor._try_next = original_try_next
551583

552-
with self.assertRaises(OperationFailure):
553-
next(change_stream)
554-
change_stream._cursor._try_next = original_try_next
584+
for code in TEST_ERROR_CODES:
585+
with self.change_stream() as change_stream:
586+
self.watched_collection().insert_one({})
587+
with generate_error(change_stream, code):
588+
with self.assertRaises(OperationFailure):
589+
next(change_stream)
555590
with self.assertRaises(StopIteration):
556591
next(change_stream)
557592

593+
# Prose test no. 6 - SKIPPED
594+
# readPreference is not configurable using the watch() helpers so we can
595+
# skip this test. Also, PyMongo performs server selection for each
596+
# operation which ensure compliance with this prose test.
597+
558598
# Prose test no. 7
559599
def test_initial_empty_batch(self):
560600
with self.change_stream() as change_stream:
@@ -603,6 +643,9 @@ def test_start_at_operation_time_caching(self):
603643
"startAtOperationTime"), optime, str([k.command for k in
604644
listener.results['started']]))
605645

646+
# Prose test no. 10 - SKIPPED
647+
# This test is identical to prose test no. 3.
648+
606649
# Prose test no. 11
607650
@client_context.require_version_min(4, 0, 7)
608651
def test_resumetoken_empty_batch(self):

0 commit comments

Comments
 (0)