Skip to content

Commit 33c9322

Browse files
committed
PYTHON-1651 Publish CommandFailedEvent when bulk write fails with a network error
1 parent fb7533e commit 33c9322

File tree

3 files changed

+103
-20
lines changed

3 files changed

+103
-20
lines changed

pymongo/message.py

Lines changed: 16 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ def _convert_exception(exception):
117117

118118

119119
def _convert_write_result(operation, command, result):
120-
"""Convert a legacy write result to write commmand format."""
120+
"""Convert a legacy write result to write command format."""
121121

122122
# Based on _merge_legacy from bulk.py
123123
affected = result.get("n", 0)
@@ -971,14 +971,17 @@ def legacy_write(self, request_id, msg, max_doc_size, acknowledged, docs):
971971
# Comply with APM spec.
972972
reply = {'ok': 1}
973973
self._succeed(request_id, reply, duration)
974-
except OperationFailure as exc:
974+
except Exception as exc:
975975
if self.publish:
976976
duration = (datetime.datetime.now() - start) + duration
977-
self._fail(
978-
request_id,
979-
_convert_write_result(
980-
self.name, cmd, exc.details),
981-
duration)
977+
if isinstance(exc, OperationFailure):
978+
failure = _convert_write_result(
979+
self.name, cmd, exc.details)
980+
elif isinstance(exc, NotMasterError):
981+
failure = exc.details
982+
else:
983+
failure = _convert_exception(exc)
984+
self._fail(request_id, failure, duration)
982985
raise
983986
finally:
984987
self.start_time = datetime.datetime.now()
@@ -996,10 +999,14 @@ def write_command(self, request_id, msg, docs):
996999
if self.publish:
9971000
duration = (datetime.datetime.now() - start) + duration
9981001
self._succeed(request_id, reply, duration)
999-
except OperationFailure as exc:
1002+
except Exception as exc:
10001003
if self.publish:
10011004
duration = (datetime.datetime.now() - start) + duration
1002-
self._fail(request_id, exc.details, duration)
1005+
if isinstance(exc, (NotMasterError, OperationFailure)):
1006+
failure = exc.details
1007+
else:
1008+
failure = _convert_exception(exc)
1009+
self._fail(request_id, failure, duration)
10031010
raise
10041011
finally:
10051012
self.start_time = datetime.datetime.now()

test/__init__.py

Lines changed: 11 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -734,6 +734,17 @@ def assertEqualCommand(self, expected, actual, msg=None):
734734
def assertEqualReply(self, expected, actual, msg=None):
735735
self.assertEqual(sanitize_reply(expected), sanitize_reply(actual), msg)
736736

737+
@contextmanager
738+
def fail_point(self, command_args):
739+
cmd_on = SON([('configureFailPoint', 'failCommand')])
740+
cmd_on.update(command_args)
741+
client_context.client.admin.command(cmd_on)
742+
try:
743+
yield
744+
finally:
745+
client_context.client.admin.command(
746+
'configureFailPoint', cmd_on['configureFailPoint'], mode='off')
747+
737748

738749
class IntegrationTest(PyMongoTestCase):
739750
"""Base class for TestCases that need a connection to MongoDB to pass."""
@@ -748,16 +759,6 @@ def setUpClass(cls):
748759
else:
749760
cls.credentials = {}
750761

751-
@contextmanager
752-
def fail_point(self, command_args):
753-
cmd_on = SON([('configureFailPoint', 'failCommand')])
754-
cmd_on.update(command_args)
755-
self.client.admin.command(cmd_on)
756-
try:
757-
yield
758-
finally:
759-
self.client.admin.command(
760-
'configureFailPoint', cmd_on['configureFailPoint'], mode='off')
761762

762763
# Use assertRaisesRegex if available, otherwise use Python 2.7's
763764
# deprecated assertRaisesRegexp, with a 'p'.

test/test_monitoring.py

Lines changed: 76 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
from bson.son import SON
2626
from pymongo import CursorType, monitoring, InsertOne, UpdateOne, DeleteOne
2727
from pymongo.command_cursor import CommandCursor
28-
from pymongo.errors import NotMasterError, OperationFailure
28+
from pymongo.errors import (AutoReconnect,
29+
NotMasterError,
30+
OperationFailure)
2931
from pymongo.read_preferences import ReadPreference
3032
from pymongo.write_concern import WriteConcern
3133
from test import (client_context,
@@ -34,6 +36,7 @@
3436
sanitize_cmd,
3537
unittest)
3638
from test.utils import (EventListener,
39+
get_pool,
3740
rs_or_single_client,
3841
single_client,
3942
wait_until)
@@ -1172,6 +1175,78 @@ def test_bulk_write(self):
11721175
('limit', 1)])])])
11731176
self.assertEqualCommand(expected, started[2].command)
11741177

1178+
@client_context.require_failCommand_fail_point
1179+
def test_bulk_write_command_network_error(self):
1180+
coll = self.client.pymongo_test.test
1181+
self.listener.results.clear()
1182+
1183+
insert_network_error = {
1184+
'configureFailPoint': 'failCommand',
1185+
'mode': {'times': 1},
1186+
'data': {
1187+
'failCommands': ['insert'],
1188+
'closeConnection': True,
1189+
},
1190+
}
1191+
with self.fail_point(insert_network_error):
1192+
with self.assertRaises(AutoReconnect):
1193+
coll.bulk_write([InsertOne({'_id': 1})])
1194+
failed = self.listener.results['failed']
1195+
self.assertEqual(1, len(failed))
1196+
event = failed[0]
1197+
self.assertEqual(event.command_name, 'insert')
1198+
self.assertIsInstance(event.failure, dict)
1199+
self.assertEqual(event.failure['errtype'], 'AutoReconnect')
1200+
self.assertTrue(event.failure['errmsg'])
1201+
1202+
@client_context.require_failCommand_fail_point
1203+
def test_bulk_write_command_error(self):
1204+
coll = self.client.pymongo_test.test
1205+
self.listener.results.clear()
1206+
1207+
insert_command_error = {
1208+
'configureFailPoint': 'failCommand',
1209+
'mode': {'times': 1},
1210+
'data': {
1211+
'failCommands': ['insert'],
1212+
'closeConnection': False,
1213+
'errorCode': 10107, # NotMaster
1214+
},
1215+
}
1216+
with self.fail_point(insert_command_error):
1217+
with self.assertRaises(NotMasterError):
1218+
coll.bulk_write([InsertOne({'_id': 1})])
1219+
failed = self.listener.results['failed']
1220+
self.assertEqual(1, len(failed))
1221+
event = failed[0]
1222+
self.assertEqual(event.command_name, 'insert')
1223+
self.assertIsInstance(event.failure, dict)
1224+
self.assertEqual(event.failure['code'], 10107)
1225+
self.assertTrue(event.failure['errmsg'])
1226+
1227+
@client_context.require_version_max(3, 4, 99)
1228+
def test_bulk_write_legacy_network_error(self):
1229+
self.listener.results.clear()
1230+
1231+
# Make the delete operation run on a closed connection.
1232+
self.client.admin.command('ping')
1233+
pool = get_pool(self.client)
1234+
sock_info = pool.sockets[0]
1235+
sock_info.sock.close()
1236+
1237+
# Test legacy unacknowledged write network error.
1238+
coll = self.client.pymongo_test.get_collection(
1239+
'test', write_concern=WriteConcern(w=0))
1240+
with self.assertRaises(AutoReconnect):
1241+
coll.bulk_write([InsertOne({'_id': 1})], ordered=False)
1242+
failed = self.listener.results['failed']
1243+
self.assertEqual(1, len(failed))
1244+
event = failed[0]
1245+
self.assertEqual(event.command_name, 'insert')
1246+
self.assertIsInstance(event.failure, dict)
1247+
self.assertEqual(event.failure['errtype'], 'AutoReconnect')
1248+
self.assertTrue(event.failure['errmsg'])
1249+
11751250
def test_write_errors(self):
11761251
coll = self.client.pymongo_test.test
11771252
coll.drop()

0 commit comments

Comments
 (0)