Skip to content

Commit 6acc9f6

Browse files
authored
PYTHON-3333 Fix bug where non-cursor read operations fail in a transaction with directConnection=True on primary (#991)
1 parent 02de2c9 commit 6acc9f6

File tree

3 files changed

+41
-1
lines changed

3 files changed

+41
-1
lines changed

pymongo/message.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -367,6 +367,8 @@ def as_command(self, sock_info, apply_timeout=False):
367367

368368
def get_message(self, read_preference, sock_info, use_cmd=False):
369369
"""Get a query message, possibly setting the secondaryOk bit."""
370+
# Use the read_preference decided by _socket_from_server.
371+
self.read_preference = read_preference
370372
if read_preference.mode:
371373
# Set the secondaryOk bit.
372374
flags = self.flags | 4

pymongo/mongo_client.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1251,7 +1251,7 @@ def _socket_from_server(self, read_preference, server, session):
12511251

12521252
with self._get_socket(server, session) as sock_info:
12531253
if single:
1254-
if sock_info.is_repl:
1254+
if sock_info.is_repl and not (session and session.in_transaction):
12551255
# Use primary preferred to ensure any repl set member
12561256
# can handle the request.
12571257
read_preference = ReadPreference.PRIMARY_PREFERRED

test/test_transactions.py

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,8 @@
3535
from gridfs import GridFS, GridFSBucket
3636
from pymongo import WriteConcern, client_session
3737
from pymongo.client_session import TransactionOptions
38+
from pymongo.command_cursor import CommandCursor
39+
from pymongo.cursor import Cursor
3840
from pymongo.errors import (
3941
CollectionInvalid,
4042
ConfigurationError,
@@ -351,6 +353,42 @@ def test_transaction_starts_with_batched_write(self):
351353
self.assertEqual(txn_number, event.command["txnNumber"])
352354
self.assertEqual(48, coll.count_documents({}))
353355

356+
@client_context.require_transactions
357+
def test_transaction_direct_connection(self):
358+
client = single_client()
359+
self.addCleanup(client.close)
360+
coll = client.pymongo_test.test
361+
362+
# Make sure the collection exists.
363+
coll.insert_one({})
364+
self.assertEqual(client.topology_description.topology_type_name, "Single")
365+
ops = [
366+
(coll.bulk_write, [[InsertOne({})]]),
367+
(coll.insert_one, [{}]),
368+
(coll.insert_many, [[{}, {}]]),
369+
(coll.replace_one, [{}, {}]),
370+
(coll.update_one, [{}, {"$set": {"a": 1}}]),
371+
(coll.update_many, [{}, {"$set": {"a": 1}}]),
372+
(coll.delete_one, [{}]),
373+
(coll.delete_many, [{}]),
374+
(coll.find_one_and_replace, [{}, {}]),
375+
(coll.find_one_and_update, [{}, {"$set": {"a": 1}}]),
376+
(coll.find_one_and_delete, [{}, {}]),
377+
(coll.find_one, [{}]),
378+
(coll.count_documents, [{}]),
379+
(coll.distinct, ["foo"]),
380+
(coll.aggregate, [[]]),
381+
(coll.find, [{}]),
382+
(coll.aggregate_raw_batches, [[]]),
383+
(coll.find_raw_batches, [{}]),
384+
(coll.database.command, ["find", coll.name]),
385+
]
386+
for f, args in ops:
387+
with client.start_session() as s, s.start_transaction():
388+
res = f(*args, session=s)
389+
if isinstance(res, (CommandCursor, Cursor)):
390+
list(res)
391+
354392

355393
class PatchSessionTimeout(object):
356394
"""Patches the client_session's with_transaction timeout for testing."""

0 commit comments

Comments
 (0)