
Commit 04ffbbe

dstaay-fb authored and facebook-github-bot committed
Robustness/Perf improvements - Python Bindings For Drop (#1355)
Summary:
Pull Request resolved: #1355

This diff implements critical RDMA performance optimizations in Monarch by addressing memory region (MR) management bottlenecks, resolving MR binding issues, and adding a strategy for managing the 32-MR hardware limit via custom mkey registration; we now approach hardware limits on large (hundreds of MB) transfers.

### **Key Changes in this Diff**

* Add a Drop function to the Python API

Reviewed By: zdevito

Differential Revision: D83436151

fbshipit-source-id: 7e2d8628416460ddcafbb8d50e1558556065ef03
1 parent b88f462 commit 04ffbbe
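
For callers, the commit boils down to a new `drop()` method on `RDMABuffer` that returns a `Future[None]`, releases the remote-side handles on the registered memory, and (per the test added below) can be called more than once. A minimal sketch of the call pattern, assuming `RDMABuffer` is imported from the module edited in this diff and that the code runs inside a Monarch actor/test context:

```python
# Hedged sketch of the caller-side API added in this commit. Assumes torch and
# monarch are installed; the import path below mirrors the file touched in this
# diff and may differ from the public re-export.
import torch

from monarch._src.tensor_engine.rdma import RDMABuffer


async def publish_then_release(data: torch.Tensor) -> None:
    # Register the tensor's bytes for RDMA access (as in the new test case).
    byte_view = data.view(torch.uint8).flatten()
    buffer = RDMABuffer(byte_view)

    # ... hand `buffer` to a remote actor, which reads via buffer.read_into(...) ...

    # New in this commit: explicitly release the remote handles on this memory.
    await buffer.drop()

    # drop() is intended to be idempotent; a second call is harmless.
    await buffer.drop()
```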

File tree

4 files changed: +118, -7 lines


monarch_rdma/extension/lib.rs

Lines changed: 29 additions & 6 deletions
```diff
@@ -156,14 +156,19 @@ impl PyRdmaBuffer {
                 .request_buffer_deprecated(cx_instance, addr, size)
                 .await?
         });
-        let _result_ = instance_dispatch!(client, |cx_instance| {
+        instance_dispatch!(client, |cx_instance| {
             local_buffer
                 .write_from(cx_instance, buffer, timeout)
                 .await
                 .map_err(|e| {
                     PyException::new_err(format!("failed to read into buffer: {}", e))
                 })?
         });
+        instance_dispatch!(client, |cx_instance| {
+            local_owner_ref
+                .release_buffer_deprecated(cx_instance, local_buffer)
+                .await?
+        });
         Ok(())
     })
 }
@@ -197,14 +202,19 @@ impl PyRdmaBuffer {
                 .request_buffer_deprecated(cx_instance, addr, size)
                 .await?
         });
-        let _result_ = instance_dispatch!(&client, |cx_instance| {
+        instance_dispatch!(&client, |cx_instance| {
             local_buffer
                 .read_into(cx_instance, buffer, timeout)
                 .await
                 .map_err(|e| {
                     PyException::new_err(format!("failed to write from buffer: {}", e))
                 })?
         });
+        instance_dispatch!(client, |cx_instance| {
+            local_owner_ref
+                .release_buffer_deprecated(cx_instance, local_buffer)
+                .await?
+        });
         Ok(())
     })
 }
@@ -232,10 +242,23 @@ impl PyRdmaBuffer {
         Ok(deserialized)
     }
 
-    fn drop<'py>(&self) -> PyResult<PyPythonTask> {
-        // no op with CPUs, currently a stub.
-        // TODO - replace with correct GPU behavior.
-        PyPythonTask::new(async move { Ok(()) })
+    fn drop<'py>(
+        &self,
+        _py: Python<'py>,
+        local_proc_id: String,
+        client: PyInstance,
+    ) -> PyResult<PyPythonTask> {
+        let (_local_owner_ref, buffer) = setup_rdma_context(self, local_proc_id);
+        PyPythonTask::new(async move {
+            // Call the drop method on the buffer to release remote handles
+            instance_dispatch!(client, |cx_instance| {
+                buffer
+                    .drop_buffer(cx_instance)
+                    .await
+                    .map_err(|e| PyException::new_err(format!("Failed to drop buffer: {}", e)))?
+            });
+            Ok(())
+        })
     }
 }
```
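
Because the binding above converts Rust-side failures into `PyException`s, Python callers that might touch a buffer after it has been dropped can treat the failure as an ordinary exception. An illustrative sketch (hypothetical helper, not part of this commit), mirroring the behavior exercised by the new test:

```python
# Illustrative only: defensively reading from a buffer whose remote handles may
# already have been released via drop(). `buffer` is any RDMABuffer-like object
# exposing read_into(); the helper name is hypothetical.
import torch


async def read_if_alive(buffer, out: torch.Tensor) -> bool:
    byte_view = out.view(torch.uint8).flatten()
    try:
        await buffer.read_into(byte_view)  # fails once the buffer has been dropped
        return True
    except Exception as exc:
        print(f"buffer no longer usable: {exc}")
        return False
```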

python/monarch/_rust_bindings/rdma.pyi

Lines changed: 1 addition & 1 deletion
```diff
@@ -35,7 +35,7 @@ class _RdmaBuffer:
     def create_rdma_buffer_nonblocking(
         cls, addr: int, size: int, proc_id: str, client: Any
     ) -> PythonTask[Any]: ...
-    def drop(self, client: Any) -> PythonTask[None]: ...
+    def drop(self, local_proc_id: str, client: Any) -> PythonTask[None]: ...
     def read_into(
         self,
         addr: int,
```

python/monarch/_src/tensor_engine/rdma.py

Lines changed: 24 additions & 0 deletions
```diff
@@ -342,3 +342,27 @@ async def write_from_nonblocking() -> None:
             return res
 
         return Future(coro=write_from_nonblocking())
+
+    def drop(self) -> Future[None]:
+        """
+        Release the handle on the memory that the remote holds to this memory.
+        """
+        local_proc_id = context().actor_instance.proc_id
+        client = context().actor_instance
+
+        async def drop_nonblocking() -> None:
+            await _ensure_init_rdma_manager()
+
+            await self._buffer.drop(
+                local_proc_id=local_proc_id,
+                client=client,
+            )
+
+        return Future(coro=drop_nonblocking())
+
+    @property
+    def owner(self) -> ProcMesh:
+        """
+        The proc that owns this buffer
+        """
+        return context().actor_instance.proc
```

python/tests/test_rdma.py

Lines changed: 64 additions & 0 deletions
```diff
@@ -117,6 +117,70 @@ async def test_proc_mesh_rdma():
     assert torch.allclose(buffer_gpu.cpu(), remote_grad.cpu())
 
 
+@needs_rdma
+async def test_rdma_buffer_drop():
+    """Test the new drop() and owner methods on RDMABuffer with two actors"""
+    proc = this_host().spawn_procs(per_host={"processes": 1})
+
+    class ProducerActor(Actor):
+        def __init__(self):
+            self.data = torch.ones(10, 10, dtype=torch.float32)  # 400 bytes
+            self.buffer = None
+
+        @endpoint
+        async def create_buffer(self) -> RDMABuffer:
+            """Create an RDMABuffer and return it"""
+            byte_tensor = self.data.view(torch.uint8).flatten()
+            self.buffer = RDMABuffer(byte_tensor)
+            return self.buffer
+
+    class ConsumerActor(Actor):
+        def __init__(self):
+            self.received_data = torch.zeros(10, 10, dtype=torch.float32)
+
+        @endpoint
+        async def receive_data(self, buffer: RDMABuffer):
+            """Receive data from the buffer into local storage"""
+            byte_tensor = self.received_data.view(torch.uint8).flatten()
+            await buffer.read_into(byte_tensor)  # Read FROM buffer INTO local tensor
+            return torch.sum(self.received_data).item()  # Should be 100 (10*10*1)
+
+        @endpoint
+        async def test_buffer_after_drop(self, buffer: RDMABuffer):
+            """Try to use buffer after it's been dropped - should fail"""
+            byte_tensor = self.received_data.view(torch.uint8).flatten()
+            try:
+                await buffer.read_into(byte_tensor)  # Try to read from dropped buffer
+                return "SUCCESS"  # This should not happen
+            except Exception as e:
+                return f"EXPECTED_ERROR: {e}"
+
+    # Create both actors
+    producer = proc.spawn("producer", ProducerActor)
+    consumer = proc.spawn("consumer", ConsumerActor)
+
+    # Create an RDMA buffer in the producer
+    buffer = await producer.create_buffer.call_one()
+
+    # Pass buffer to consumer and test write operation
+    result = await consumer.receive_data.call_one(buffer)
+    assert result == 100.0, f"Expected 100.0, got {result}"
+
+    # Now drop the buffer
+    await buffer.drop()
+
+    # Test that we can call drop multiple times (should be idempotent)
+    await buffer.drop()
+
+    # Try to use the buffer after dropping - this should fail
+    error_result = await consumer.test_buffer_after_drop.call_one(buffer)
+    assert error_result.startswith(
+        "EXPECTED_ERROR:"
+    ), f"Expected an error after drop, but got: {error_result}"
+
+    print(f"✓ Buffer operations failed after drop as expected: {error_result}")
+
+
 class TrainerActor(Actor):
     def __init__(self):
         super().__init__()
```
