PYTHON-5219 - Avoid awaiting coroutines when holding locks #2250

Merged (3 commits, Apr 3, 2025)
Changes from 2 commits
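
All of the diffs below apply the same pattern: collect the connections (or servers) that need closing while holding the lock, and only await their close coroutines after the lock has been released, so that a slow close cannot stall every other task waiting on that lock. Here is a minimal, self-contained sketch of the pattern, using hypothetical names (FakeConn, ResourcePool) rather than PyMongo's real classes:

import asyncio


class FakeConn:
    def __init__(self, stale: bool) -> None:
        self.stale = stale

    async def close(self) -> None:
        # A slow close no longer blocks other tasks waiting on the pool lock.
        await asyncio.sleep(0.01)


class ResourcePool:
    def __init__(self, conns: list[FakeConn]) -> None:
        self.lock = asyncio.Lock()
        self.conns = conns

    async def remove_stale(self) -> None:
        to_close: list[FakeConn] = []
        # Only mutate shared state while the lock is held.
        async with self.lock:
            keep: list[FakeConn] = []
            for conn in self.conns:
                (to_close if conn.stale else keep).append(conn)
            self.conns = keep
        # Await the potentially slow coroutines after releasing the lock.
        for conn in to_close:
            await conn.close()


async def main() -> None:
    pool = ResourcePool([FakeConn(True), FakeConn(False), FakeConn(True)])
    await pool.remove_stale()
    print(f"{len(pool.conns)} connection(s) kept")


asyncio.run(main())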
23 changes: 16 additions & 7 deletions pymongo/asynchronous/pool.py
@@ -931,13 +931,15 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
                 return

         if self.opts.max_idle_time_seconds is not None:
+            close_conns = []
             async with self.lock:
                 while (
                     self.conns
                     and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds
                 ):
-                    conn = self.conns.pop()
-                    await conn.close_conn(ConnectionClosedReason.IDLE)
+                    close_conns.append(self.conns.pop())
+            for conn in close_conns:
+                await conn.close_conn(ConnectionClosedReason.IDLE)

         while True:
             async with self.size_cond:
@@ -957,14 +959,18 @@ async def remove_stale_sockets(self, reference_generation: int) -> None:
                     self._pending += 1
                     incremented = True
                 conn = await self.connect()
+                close_conn = False
                 async with self.lock:
                     # Close connection and return if the pool was reset during
                     # socket creation or while acquiring the pool lock.
                     if self.gen.get_overall() != reference_generation:
-                        await conn.close_conn(ConnectionClosedReason.STALE)
-                        return
-                    self.conns.appendleft(conn)
-                    self.active_contexts.discard(conn.cancel_context)
+                        close_conn = True
+                    if not close_conn:
+                        self.conns.appendleft(conn)
+                        self.active_contexts.discard(conn.cancel_context)
+                if close_conn:
+                    await conn.close_conn(ConnectionClosedReason.STALE)
+                    return
             finally:
                 if incremented:
                     # Notify after adding the socket to the pool.
@@ -1343,17 +1349,20 @@ async def checkin(self, conn: AsyncConnection) -> None:
                     error=ConnectionClosedReason.ERROR,

             else:
+                close_conn = False
                 async with self.lock:
                     # Hold the lock to ensure this section does not race with
                     # Pool.reset().
                     if self.stale_generation(conn.generation, conn.service_id):
-                        await conn.close_conn(ConnectionClosedReason.STALE)
+                        close_conn = True
                     else:
                         conn.update_last_checkin_time()
                         conn.update_is_writable(bool(self.is_writable))
                         self.conns.appendleft(conn)
                         # Notify any threads waiting to create a connection.
                         self._max_connecting_cond.notify()
+                if close_conn:
+                    await conn.close_conn(ConnectionClosedReason.STALE)

         async with self.size_cond:
             if txn:
6 changes: 5 additions & 1 deletion pymongo/asynchronous/topology.py
@@ -240,16 +240,20 @@ async def open(self) -> None:
                 "https://dochub.mongodb.org/core/pymongo-fork-deadlock",
                 **kwargs,
             )
+            close_servers = []
             async with self._lock:
                 # Close servers and clear the pools.
                 for server in self._servers.values():
-                    await server.close()
+                    close_servers.append(server)
Member:

We close the servers here, but we leave self._servers untouched? Is using a client post-fork broken right now? I don't see where the Server gets recreated.

Contributor Author:

Our test_fork.py tests are all passing. We re-open each server in self._servers in _ensure_opened, called at the end of open here.

Member:

Here's the race I'm concerned about:

  1. app forks with an open client
  2. child process starts 2 threads that both call find_one()
  3. both threads see a different PID and enter this if-block.
  4. T1 acquires the lock first, resets the servers, reopens and then proceeds to server selection.
  5. T2 then closes the server selected by T1 which causes a PoolClosedError in T1.

It could be that this is already possible with the current code. What do you think?
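
For reference, here is a hypothetical repro sketch of that scenario (not part of this PR). It assumes a mongod reachable on localhost:27017 and a POSIX platform, and whether it actually hits the race depends on timing:

import os
import threading

from pymongo import MongoClient

client = MongoClient()
client.admin.command("ping")  # open the topology in the parent

pid = os.fork()
if pid == 0:
    # Child: both threads observe the changed PID and race through the
    # fork-reset branch of Topology.open() before selecting a server.
    def query() -> None:
        client.db.test.find_one()

    threads = [threading.Thread(target=query) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    os._exit(0)

os.waitpid(pid, 0)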

Contributor Author:

The addition of async hasn't changed the structure of this code, only the async/await syntax, so if this race condition does exist, it's existed for some time. Here's the identical PyMongo 4.8 version, before we added async support:

def open(self) -> None:

Looking at the code, I agree that scenario certainly seems possible. We could add a flag set post-fork to prevent that race condition?

Member:

I'm not concerned about async here. I'm concerned that delaying the server.close() call until after we release the lock will make this race more likely.

Contributor Author (@NoahStapp, Apr 2, 2025):

I just realized: since we don't support fork + async at all, this change is also non-functional and can be reverted.

I mention that this code hasn't changed besides the addition of async to show that this race condition has either been present for quite some time or doesn't exist.

Member:

Yeah, let's undo the forking changes. Holding the lock while calling close() in the sync version isn't so bad in this case because it only happens once, post-fork().

                     if not _IS_SYNC:
                         self._monitor_tasks.append(server._monitor)
Member:

Unrelated to this PR but why do we append to _monitor_tasks here?

Contributor Author:

To clean up all the monitor tasks owned by the closed servers.

Member:

Since asyncio doesn't support forking at all, should we just remove this? It seems like non-functional code.

Contributor Author:

What if we replace the entire fork branch here with a warning on async?

Member:

Before we make any code changes someone should test out the current behavior of fork+asyncio. Depending on the behavior we might need to reopen https://jira.mongodb.org/browse/PYTHON-5249.

Contributor Author:

This example:

import os

from pymongo import AsyncMongoClient
import asyncio

async def test_func():
    client = AsyncMongoClient()
    await client.aconnect()

    pid = os.fork()
    if pid == 0:
        await client.db.test.insert_one({'a': 1})
        exit()
    print("Done!")


asyncio.run(test_func())

Produces the following error multiple times, each with a different traceback:

Traceback (most recent call last):
  File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/asyncio/runners.py", line 194, in run
    return runner.run(main)
           ~~~~~~~~~~^^^^^^
  File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/asyncio/runners.py", line 118, in run
    return self._loop.run_until_complete(task)
           ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~^^^^^^
  File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/asyncio/base_events.py", line 708, in run_until_complete
    self.run_forever()
    ~~~~~~~~~~~~~~~~^^
  File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/asyncio/base_events.py", line 679, in run_forever
    self._run_once()
    ~~~~~~~~~~~~~~^^
  File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/asyncio/base_events.py", line 1989, in _run_once
    event_list = self._selector.select(timeout)
  File "/Users/nstapp/.pyenv/versions/3.13.0/lib/python3.13/selectors.py", line 548, in select
    kev_list = self._selector.control(None, max_ev, timeout)
ValueError: I/O operation on closed kqueue object

Member:

The same error happens without using pymongo at all:

import asyncio
import os

async def test_func():
    pid = os.fork()
    if pid == 0:
        await asyncio.sleep(.01)
        print("Done child!")
        exit()
    await asyncio.sleep(.1)
    print("Done parent!")


asyncio.run(test_func())

                 # Reset the session pool to avoid duplicate sessions in
                 # the child process.
                 self._session_pool.reset()

+            for server in close_servers:
+                await server.close()
+
         async with self._lock:
             await self._ensure_opened()

23 changes: 16 additions & 7 deletions pymongo/synchronous/pool.py
@@ -927,13 +927,15 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
                 return

         if self.opts.max_idle_time_seconds is not None:
+            close_conns = []
             with self.lock:
                 while (
                     self.conns
                     and self.conns[-1].idle_time_seconds() > self.opts.max_idle_time_seconds
                 ):
-                    conn = self.conns.pop()
-                    conn.close_conn(ConnectionClosedReason.IDLE)
+                    close_conns.append(self.conns.pop())
+            for conn in close_conns:
+                conn.close_conn(ConnectionClosedReason.IDLE)

         while True:
             with self.size_cond:
@@ -953,14 +955,18 @@ def remove_stale_sockets(self, reference_generation: int) -> None:
                     self._pending += 1
                     incremented = True
                 conn = self.connect()
+                close_conn = False
                 with self.lock:
                     # Close connection and return if the pool was reset during
                     # socket creation or while acquiring the pool lock.
                     if self.gen.get_overall() != reference_generation:
-                        conn.close_conn(ConnectionClosedReason.STALE)
-                        return
-                    self.conns.appendleft(conn)
-                    self.active_contexts.discard(conn.cancel_context)
+                        close_conn = True
+                    if not close_conn:
+                        self.conns.appendleft(conn)
+                        self.active_contexts.discard(conn.cancel_context)
+                if close_conn:
+                    conn.close_conn(ConnectionClosedReason.STALE)
+                    return
             finally:
                 if incremented:
                     # Notify after adding the socket to the pool.
@@ -1339,17 +1345,20 @@ def checkin(self, conn: Connection) -> None:
                     error=ConnectionClosedReason.ERROR,

             else:
+                close_conn = False
                 with self.lock:
                     # Hold the lock to ensure this section does not race with
                     # Pool.reset().
                     if self.stale_generation(conn.generation, conn.service_id):
-                        conn.close_conn(ConnectionClosedReason.STALE)
+                        close_conn = True
                     else:
                         conn.update_last_checkin_time()
                         conn.update_is_writable(bool(self.is_writable))
                         self.conns.appendleft(conn)
                         # Notify any threads waiting to create a connection.
                         self._max_connecting_cond.notify()
+                if close_conn:
+                    conn.close_conn(ConnectionClosedReason.STALE)

         with self.size_cond:
             if txn:
6 changes: 5 additions & 1 deletion pymongo/synchronous/topology.py
@@ -240,16 +240,20 @@ def open(self) -> None:
                 "https://dochub.mongodb.org/core/pymongo-fork-deadlock",
                 **kwargs,
             )
+            close_servers = []
             with self._lock:
                 # Close servers and clear the pools.
                 for server in self._servers.values():
-                    server.close()
+                    close_servers.append(server)
                     if not _IS_SYNC:
                         self._monitor_tasks.append(server._monitor)
                 # Reset the session pool to avoid duplicate sessions in
                 # the child process.
                 self._session_pool.reset()

+            for server in close_servers:
+                server.close()
+
         with self._lock:
             self._ensure_opened()
