Skip to content

Commit 601eeca

Browse files
dstaay-fbmeta-codesync[bot]
authored and committed
remove asyncio.lock (#1391)
Summary: Pull Request resolved: #1391. Some things to know: - Actors are run in an asyncio event loop, so it is OK to assume the loop is running. - However, any lock is blocking; instead, we can use the monarch Future and call await multiple times on a common future. The results will be: (1) enqueue the Future on the event loop; (2) return a reference to the enqueued Future; (3) return the Future's result (immediately) if it has already completed. Reviewed By: allenwang28 Differential Revision: D83666787 fbshipit-source-id: 2d6fd4d2873e31b05235cce9c3bb736848240f54
1 parent 1c57e04 commit 601eeca

File tree

1 file changed

+11
-14
lines changed

1 file changed

+11
-14
lines changed

python/monarch/_src/rdma/rdma.py

Lines changed: 11 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,6 @@
55
# LICENSE file in the root directory of this source tree.
66

77
# pyre-unsafe
8-
import asyncio
98
import ctypes
109
import functools
1110
import logging
@@ -118,27 +117,25 @@ def _get_addr_and_size(buf: torch.Tensor | memoryview) -> tuple[int, int]:
118117

119118
class RdmaController(Actor):
120119
def __init__(self) -> None:
121-
self._managers: Dict[ProcMesh, _RdmaManager] = {}
122-
self._lock = asyncio.Lock()
120+
self._manager_futures: Dict[ProcMesh, Future[_RdmaManager]] = {}
123121

124122
@endpoint
125123
async def init_rdma_on_mesh(self, proc_mesh: ProcMesh) -> None:
126124
# Note: RdmaController acts as coordinator and can run on any node
127125
# The RDMA support check should happen on the target proc_mesh nodes, not on RdmaController's node
128126

129-
if proc_mesh in self._managers:
130-
return
131-
132-
async with self._lock:
133-
if proc_mesh not in self._managers:
134-
self._managers[proc_mesh] = none_throws(
135-
await Future(
136-
coro=_RdmaManager.create_rdma_manager_nonblocking(
137-
await Future(coro=proc_mesh._proc_mesh.task())
138-
)
139-
)
127+
if proc_mesh not in self._manager_futures:
128+
129+
async def create_manager() -> _RdmaManager:
130+
proc_mesh_result = await Future(coro=proc_mesh._proc_mesh.task())
131+
return none_throws(
132+
await _RdmaManager.create_rdma_manager_nonblocking(proc_mesh_result)
140133
)
141134

135+
self._manager_futures[proc_mesh] = Future(coro=create_manager())
136+
137+
await self._manager_futures[proc_mesh]
138+
142139

143140
@functools.cache
144141
def _check_cuda_expandable_segments_enabled() -> bool:

0 commit comments

Comments
 (0)