From ecf5863799e7d1cfbeb8fe1d87655f729096663b Mon Sep 17 00:00:00 2001
From: Yuxuan Hu
Date: Fri, 17 Oct 2025 17:16:21 -0700
Subject: [PATCH 1/4] Fix RDMA memory leak by properly deregistering buffers

Problem: ~1TB kernel memory leak caused by RDMA-registered memory regions
not being properly deregistered. When RDMABuffer objects were created, they
registered memory regions with the RDMA hardware, which pinned pages in
kernel memory. Even after the Python objects were freed, the RDMA
registrations remained active, causing the kernel to keep the pages pinned
as Inactive(anon) indefinitely. Memory accumulated across actor
spawns/shutdowns and weight updates.

Solution:
- Add cleanup() method to RDMATransportBuffer that calls drop() on each
  RDMA buffer to deregister memory regions
- Add __del__ destructor to ensure cleanup happens on garbage collection
- Wrap put/get operations in try/finally blocks to guarantee cleanup
- Add base cleanup() method to TransportBuffer for consistency

Impact: This fix prevents the unbounded Inactive(anon) growth observed in
production GRPO training runs, where ~900GB of orphaned RDMA-pinned memory
accumulated in the kernel.
---
 torchstore/transport/buffers.py | 27 +++++++++++++++++++++++
 torchstore/transport/pipe.py    | 54 +++++++++++++++++++++++++++++++++++++++++---------------------
 2 files changed, 60 insertions(+), 21 deletions(-)

diff --git a/torchstore/transport/buffers.py b/torchstore/transport/buffers.py
index debb585..fbf1aa9 100644
--- a/torchstore/transport/buffers.py
+++ b/torchstore/transport/buffers.py
@@ -59,6 +59,10 @@ async def read_into(self, tensor: Optional[torch.Tensor]) -> torch.Tensor:
     async def write_from(self, tensor: Optional[torch.Tensor]) -> None:
         raise NotImplementedError()
 
+    def cleanup(self) -> None:
+        """Clean up any resources held by this buffer. Override in subclasses if needed."""
+        pass
+
 
 class RDMATransportBuffer(TransportBuffer):
     # TODO: when we try this with rdma, I should be able to write rdma directly to the tensor
@@ -72,6 +76,29 @@ def __init__(self) -> None:
         self.shape: Optional[torch.Size] = None
         self.dtype: Optional[torch.dtype] = None
 
+    def cleanup(self) -> None:
+        """Explicitly clean up RDMA buffers to prevent kernel memory leak.
+
+        When RDMA buffers are created, they register memory regions with the RDMA
+        hardware which pins pages in kernel memory. Without explicit cleanup, these
+        pages remain pinned even after the Python objects are garbage collected,
+        leading to a memory leak that manifests as unbounded Inactive(anon) growth.
+ """ + if self.rdma_buffers is not None: + for rdma_buf in self.rdma_buffers: + try: + # Drop the RDMA buffer to deregister the memory region + rdma_buf.drop() + except Exception as e: + # Log but don't raise - cleanup should be best-effort + logging.warning(f"Failed to drop RDMA buffer during cleanup: {e}") + self.rdma_buffers = None + self.tensor_refs = None + + def __del__(self) -> None: + """Destructor that ensures RDMA buffers are cleaned up.""" + self.cleanup() + def __getstate__(self) -> Dict[str, Any]: # Any time that we serialize the transport buffer, the idea is # that tensors will be transported via tensor_enginer.RDMABuffer, so it makes diff --git a/torchstore/transport/pipe.py b/torchstore/transport/pipe.py index f0d94fd..52073fd 100644 --- a/torchstore/transport/pipe.py +++ b/torchstore/transport/pipe.py @@ -151,32 +151,44 @@ async def put_to_storage_volume(self, key, request: Request): # transporting tensors is handled by the buffer, so we don't want to send it # via monarch RPC since that would generate considerable overhead - await self.storage_volume.put.call_one( - key, transport_buffer, request.meta_only() - ) + try: + await self.storage_volume.put.call_one( + key, transport_buffer, request.meta_only() + ) + finally: + # Clean up the transport buffer after the put operation completes + # This is critical for RDMA buffers to deregister memory regions + transport_buffer.cleanup() async def get_from_storage_volume(self, key, request: Request): transport_buffer = self.create_transport_buffer() - # Certain buffers (RDMA) need to know the size of the tensor - # so we can allocate the right amount of memory locally. - # This can be avoided if the request contains a tensor slice. - # Could likely be optimized away in the future. - if transport_buffer.requires_meta and request.tensor_val is None: - meta = await self.storage_volume.get_meta.call_one(key, request.meta_only()) - transport_buffer.allocate(meta) - else: - transport_buffer.allocate(request.tensor_val) - - # TODO: consider placing the buffer inside the request or vice versa - transport_buffer.update( - await self.storage_volume.get.call_one( - key, transport_buffer, request.meta_only() + try: + # Certain buffers (RDMA) need to know the size of the tensor + # so we can allocate the right amount of memory locally. + # This can be avoided if the request contains a tensor slice. + # Could likely be optimized away in the future. 
+            if transport_buffer.requires_meta and request.tensor_val is None:
+                meta = await self.storage_volume.get_meta.call_one(
+                    key, request.meta_only()
+                )
+                transport_buffer.allocate(meta)
+            else:
+                transport_buffer.allocate(request.tensor_val)
 
-        # TODO: consider placing the buffer inside the request or vice versa
-        transport_buffer.update(
-            await self.storage_volume.get.call_one(
-                key, transport_buffer, request.meta_only()
-            )
-        )
+            # TODO: consider placing the buffer inside the request or vice versa
+            transport_buffer.update(
+                await self.storage_volume.get.call_one(
+                    key, transport_buffer, request.meta_only()
+                )
+            )
 
-        if transport_buffer.is_object:
-            return transport_buffer.objects
+            if transport_buffer.is_object:
+                return transport_buffer.objects
 
-        return await transport_buffer.read_into(request.tensor_val)
+            return await transport_buffer.read_into(request.tensor_val)
+        finally:
+            # Clean up the transport buffer after the get operation completes
+            # This is critical for RDMA buffers to deregister memory regions
+            transport_buffer.cleanup()

From 88d65232e0236a99d97b3cb7a2431957beb84602 Mon Sep 17 00:00:00 2001
From: Yuxuan Hu
Date: Sat, 18 Oct 2025 03:09:01 -0700
Subject: [PATCH 2/4] Add debug prints for RDMA buffer lifecycle and remove
 __del__ cleanup

- Add print statements before RDMA buffer creation
- Add print statements before RDMA buffer drop() calls
- Remove cleanup() call from __del__ to make cleanup fully explicit
- Cleanup now only happens in try/finally blocks in pipe.py

This makes it easier to trace the RDMA buffer lifecycle and verify that
buffers are being properly created and dropped.
---
 torchstore/transport/buffers.py | 11 ++++++++++-
 1 file changed, 10 insertions(+), 1 deletion(-)

diff --git a/torchstore/transport/buffers.py b/torchstore/transport/buffers.py
index fbf1aa9..ab58361 100644
--- a/torchstore/transport/buffers.py
+++ b/torchstore/transport/buffers.py
@@ -85,9 +85,13 @@ def cleanup(self) -> None:
         leading to a memory leak that manifests as unbounded Inactive(anon) growth.
         """
         if self.rdma_buffers is not None:
+            print(
+                f"[RDMA] Dropping {len(self.rdma_buffers)} RDMA buffers for tensor {self.shape} {self.dtype}"
+            )
             for rdma_buf in self.rdma_buffers:
                 try:
                     # Drop the RDMA buffer to deregister the memory region
+                    print("[RDMA] Calling drop() on RDMA buffer")
                     rdma_buf.drop()
                 except Exception as e:
                     # Log but don't raise - cleanup should be best-effort
@@ -97,7 +101,9 @@ def cleanup(self) -> None:
 
     def __del__(self) -> None:
         """Destructor that ensures RDMA buffers are cleaned up."""
-        self.cleanup()
+        # Note: Not calling cleanup() here to avoid issues with destructor timing
+        # and to make cleanup explicit only where we control the lifecycle
+        pass
 
     def __getstate__(self) -> Dict[str, Any]:
         # Any time that we serialize the transport buffer, the idea is
@@ -156,6 +162,9 @@ def allocate(self, tensor_like: Union[torch.Tensor, Tuple]) -> None:
             torch.empty_like(chunk, device=torch.device("cpu"))
             for chunk in byte_view_chunks
         ]
+        print(
+            f"[RDMA] Creating {len(self.tensor_refs)} RDMA buffers for tensor {self.shape} {self.dtype}"
+        )
         self.rdma_buffers = [RDMABuffer(chunk) for chunk in self.tensor_refs]
 
         chunk_sizes = set()

From b8623ea578587c1153ab38ad5c034e9f7b5c4e94 Mon Sep 17 00:00:00 2001
From: Yuxuan Hu
Date: Sat, 18 Oct 2025 13:17:59 -0700
Subject: [PATCH 3/4] stash

---
 torchstore/storage_volume.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/torchstore/storage_volume.py b/torchstore/storage_volume.py
index 71fc41e..b6a87c1 100644
--- a/torchstore/storage_volume.py
+++ b/torchstore/storage_volume.py
@@ -299,6 +299,7 @@ async def get_meta(
     async def delete(self, key: str) -> None:
         if key not in self.kv:
             raise KeyError(f"Key '{key}' not found. {list(self.kv.keys())=}")
+        print(f"[storage volume] Deleting key: {key}")
         del self.kv[key]
 
     def reset(self) -> None:

From 61c5f991091dd340a82c38db56f80df7cf02d743 Mon Sep 17 00:00:00 2001
From: Yuxuan Hu
Date: Sat, 18 Oct 2025 14:41:05 -0700
Subject: [PATCH 4/4] Fix RDMA memory leak by properly awaiting rdma_buf.drop()

The previous implementation didn't work because rdma_buf.drop() is an async
method that needs to be awaited. Without the await, the RDMA memory regions
were never actually deregistered, so the kernel memory leak persisted.

Changes:
- Renamed cleanup() to drop() for consistency with the RDMA API
- Made TransportBuffer.drop() and RDMATransportBuffer.drop() async
- Updated pipe.py to properly await transport_buffer.drop()
- Removed debug print statements

This should properly deregister RDMA memory regions and fix the ~1TB
Inactive(anon) memory leak.
---
 torchstore/storage_volume.py    |  1 -
 torchstore/transport/buffers.py | 13 +++----------
 torchstore/transport/pipe.py    |  4 ++--
 3 files changed, 5 insertions(+), 13 deletions(-)

diff --git a/torchstore/storage_volume.py b/torchstore/storage_volume.py
index b6a87c1..71fc41e 100644
--- a/torchstore/storage_volume.py
+++ b/torchstore/storage_volume.py
@@ -299,7 +299,6 @@ async def get_meta(
     async def delete(self, key: str) -> None:
         if key not in self.kv:
             raise KeyError(f"Key '{key}' not found. {list(self.kv.keys())=}")
-        print(f"[storage volume] Deleting key: {key}")
         del self.kv[key]
 
     def reset(self) -> None:
diff --git a/torchstore/transport/buffers.py b/torchstore/transport/buffers.py
index ab58361..32dbb52 100644
--- a/torchstore/transport/buffers.py
+++ b/torchstore/transport/buffers.py
@@ -59,7 +59,7 @@ async def read_into(self, tensor: Optional[torch.Tensor]) -> torch.Tensor:
     async def write_from(self, tensor: Optional[torch.Tensor]) -> None:
         raise NotImplementedError()
 
-    def cleanup(self) -> None:
+    async def drop(self) -> None:
         """Clean up any resources held by this buffer. Override in subclasses if needed."""
         pass
 
@@ -76,7 +76,7 @@ def __init__(self) -> None:
         self.shape: Optional[torch.Size] = None
         self.dtype: Optional[torch.dtype] = None
 
-    def cleanup(self) -> None:
+    async def drop(self) -> None:
         """Explicitly clean up RDMA buffers to prevent kernel memory leak.
 
         When RDMA buffers are created, they register memory regions with the RDMA
@@ -85,14 +85,10 @@ def cleanup(self) -> None:
         hardware which pins pages in kernel memory. Without explicit cleanup, these
         pages remain pinned even after the Python objects are garbage collected,
         leading to a memory leak that manifests as unbounded Inactive(anon) growth.
""" if self.rdma_buffers is not None: - print( - f"[RDMA] Dropping {len(self.rdma_buffers)} RDMA buffers for tensor {self.shape} {self.dtype}" - ) for rdma_buf in self.rdma_buffers: try: # Drop the RDMA buffer to deregister the memory region - print("[RDMA] Calling drop() on RDMA buffer") - rdma_buf.drop() + await rdma_buf.drop() except Exception as e: # Log but don't raise - cleanup should be best-effort logging.warning(f"Failed to drop RDMA buffer during cleanup: {e}") @@ -162,9 +158,6 @@ def allocate(self, tensor_like: Union[torch.Tensor, Tuple]) -> None: torch.empty_like(chunk, device=torch.device("cpu")) for chunk in byte_view_chunks ] - print( - f"[RDMA] Creating {len(self.tensor_refs)} RDMA buffers for tensor {self.shape} {self.dtype}" - ) self.rdma_buffers = [RDMABuffer(chunk) for chunk in self.tensor_refs] chunk_sizes = set() diff --git a/torchstore/transport/pipe.py b/torchstore/transport/pipe.py index 52073fd..d869941 100644 --- a/torchstore/transport/pipe.py +++ b/torchstore/transport/pipe.py @@ -158,7 +158,7 @@ async def put_to_storage_volume(self, key, request: Request): finally: # Clean up the transport buffer after the put operation completes # This is critical for RDMA buffers to deregister memory regions - transport_buffer.cleanup() + await transport_buffer.drop() async def get_from_storage_volume(self, key, request: Request): @@ -191,4 +191,4 @@ async def get_from_storage_volume(self, key, request: Request): finally: # Clean up the transport buffer after the get operation completes # This is critical for RDMA buffers to deregister memory regions - transport_buffer.cleanup() + await transport_buffer.drop()