
[P2P] Integration with Ray #662

Open
zhongjiechen wants to merge 15 commits into main from ray-p2p

Conversation

zhongjiechen (Member) commented Jan 21, 2026

Description

Integration with Ray

Type of Change

  • Bug fix
  • New feature
  • Documentation update

How Has This Been Tested?

Include any tests here.

  • Unit tests
  • Integration tests
  • Manual testing

Checklist

  • My code follows the style guidelines, e.g. format.sh.
  • I have run build_and_install.sh to verify compilation.
  • I have removed redundant variables and comments.
  • I have updated the documentation.
  • I have added tests.
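As an illustration of the integration (not code from this PR), here is a minimal sketch of driving the P2P transfer API from two Ray actors, with Ray method calls serving as the out-of-band channel. The "from uccl import p2p" import and the p2p.Endpoint(local_gpu_idx, num_cpus) constructor are assumptions inferred from the benchmark's flags; the remaining ep.* calls mirror benchmark_ray_p2p.py below.

import ray
import torch
from uccl import p2p  # assumed module path (unverified)


@ray.remote(num_gpus=1)
class Peer:
    def __init__(self, local_gpu_idx=0, num_cpus=4):
        # Assumed constructor, mirroring the benchmark's --local-gpu-idx/--num-cpus flags.
        self.ep = p2p.Endpoint(local_gpu_idx, num_cpus)
        # One 1 MB GPU buffer, registered once for this sketch.
        self.buf = torch.zeros(1 << 20, dtype=torch.uint8, device="cuda")
        self.descs = self.ep.register_memory([self.buf])

    def metadata(self):
        return self.ep.get_metadata()

    def serialized_descs(self):
        return self.ep.get_serialized_descs(self.descs)

    def write_to(self, remote_metadata, remote_descs_serialized):
        # Connect (QP setup happens once per remote endpoint) and issue one RDMA WRITE.
        ok, conn_id = self.ep.add_remote_endpoint(remote_metadata)
        assert ok, "Failed to add remote endpoint"
        remote_descs = self.ep.deserialize_descs(remote_descs_serialized)
        handle = self.ep.transfer(conn_id, "WRITE", self.descs, remote_descs)
        while not self.ep.check_xfer_state(handle):
            pass


if __name__ == "__main__":
    ray.init()
    client, server = Peer.remote(), Peer.remote()
    # Ray object transfer replaces the PyTorch OOB exchange used in the benchmark.
    meta = ray.get(server.metadata.remote())
    descs = ray.get(server.serialized_descs.remote())
    ray.get(client.write_to.remote(meta, descs))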

zhongjiechen (Member, Author) commented Jan 23, 2026

[Client]    256 B :   0.00 Gbps |   0.00 GB/s | 0.000596 s
[Client]   1.0 KB :   0.02 Gbps |   0.00 GB/s | 0.000481 s
[Client]   4.0 KB :   0.07 Gbps |   0.01 GB/s | 0.000479 s
[Client]  16.0 KB :   0.27 Gbps |   0.03 GB/s | 0.000480 s
[Client]  64.0 KB :   0.92 Gbps |   0.12 GB/s | 0.000568 s
[Client] 256.0 KB :   3.26 Gbps |   0.41 GB/s | 0.000642 s
[Client]   1.0 MB :  15.33 Gbps |   1.92 GB/s | 0.000547 s
[Client]  10.0 MB : 109.58 Gbps |  13.70 GB/s | 0.000766 s
[Client]  64.0 MB : 270.33 Gbps |  33.79 GB/s | 0.001986 s
[Client] 100.0 MB : 286.96 Gbps |  35.87 GB/s | 0.002923 s
[Client] Benchmark complete

--raw --num-iovs=2

[Client]    256 B :   0.08 Gbps |   0.01 GB/s | 0.000025 s
[Client]   1.0 KB :   0.40 Gbps |   0.05 GB/s | 0.000021 s
[Client]   4.0 KB :   1.56 Gbps |   0.19 GB/s | 0.000021 s
[Client]  16.0 KB :   6.37 Gbps |   0.80 GB/s | 0.000021 s
[Client]  64.0 KB :  25.87 Gbps |   3.23 GB/s | 0.000020 s
[Client] 256.0 KB :  87.23 Gbps |  10.90 GB/s | 0.000024 s
[Client]   1.0 MB : 210.14 Gbps |  26.27 GB/s | 0.000040 s
[Client]  10.0 MB : 358.89 Gbps |  44.86 GB/s | 0.000234 s
[Client]  64.0 MB : 385.12 Gbps |  48.14 GB/s | 0.001394 s
[Client] 100.0 MB : 386.77 Gbps |  48.35 GB/s | 0.002169 s
[Client] Benchmark complete

uccl_benchmark_readwrite.py --async-api --num-iovs=2

[Client]    256 B :   0.09 Gbps |   0.01 GB/s | 0.000023 s
[Client]   1.0 KB :   0.41 Gbps |   0.05 GB/s | 0.000020 s
[Client]   4.0 KB :   1.68 Gbps |   0.21 GB/s | 0.000020 s
[Client]  16.0 KB :   6.50 Gbps |   0.81 GB/s | 0.000020 s
[Client]  64.0 KB :  25.03 Gbps |   3.13 GB/s | 0.000021 s
[Client] 256.0 KB :  85.90 Gbps |  10.74 GB/s | 0.000024 s
[Client]   1.0 MB : 207.45 Gbps |  25.93 GB/s | 0.000040 s
[Client]  10.0 MB : 359.87 Gbps |  44.98 GB/s | 0.000233 s
[Client]  64.0 MB : 385.18 Gbps |  48.15 GB/s | 0.001394 s
[Client] 100.0 MB : 386.97 Gbps |  48.37 GB/s | 0.002168 s
[Client] Benchmark complete

uccl_benchmark_readwrite.py

[Client]    256 B :   0.12 Gbps |   0.01 GB/s | 0.000017 s
[Client]   1.0 KB :   0.65 Gbps |   0.08 GB/s | 0.000013 s
[Client]   4.0 KB :   2.58 Gbps |   0.32 GB/s | 0.000013 s
[Client]  16.0 KB :  10.11 Gbps |   1.26 GB/s | 0.000013 s
[Client]  64.0 KB :  38.25 Gbps |   4.78 GB/s | 0.000014 s
[Client] 256.0 KB : 118.05 Gbps |  14.76 GB/s | 0.000018 s
[Client]   1.0 MB : 248.78 Gbps |  31.10 GB/s | 0.000034 s
[Client]  10.0 MB : 369.20 Gbps |  46.15 GB/s | 0.000227 s
[Client]  64.0 MB : 386.78 Gbps |  48.35 GB/s | 0.001388 s
[Client] 100.0 MB : 388.01 Gbps |  48.50 GB/s | 0.002162 s
[Client] Benchmark complete

zhongjiechen (Member, Author) commented Jan 23, 2026

# Benchmark iterations
start = time.perf_counter()
total = 0
for iter_idx in range(args.iters):
    # Create new memory buffer for each iteration
    buf_v = []
    for _ in range(args.num_iovs):
        buf = _make_buffer(size_per_block, args.device, args.local_gpu_idx)
        buf_v.append(buf)
    # Register local memory (new memory for each iteration)
    local_descs = ep.register_memory(buf_v)
    # Add remote endpoint
    success, conn_id = ep.add_remote_endpoint(remote_metadata)
    assert success, "Failed to add remote endpoint"
    # Wait for server to send remote descriptors (exchange memory addresses)
    remote_descs_serialized = _recv_bytes(src=peer)
    remote_descs = ep.deserialize_descs(remote_descs_serialized)
    # Start transfer
    xfer_handle = ep.transfer(conn_id, "WRITE", local_descs, remote_descs)
    assert xfer_handle is not None, "Failed to start transfer"
    # Check transfer state until complete
    while not ep.check_xfer_state(xfer_handle):
        pass
    total += sz
    dist.barrier()

For each iteration in benchmark_ray_p2p.py, the client:

  • allocates a new buffer and calls register_memory.
  • calls add_remote_endpoint (the underlying RDMA QP connection is established only once).
  • waits for the server to send its descriptors via the PyTorch OOB channel and calls deserialize_descs.
  • calls transfer to issue the WRITE/READ.
  • polls check_xfer_state until the transfer completes.
  • calls the PyTorch barrier.

The full server loop and client implementation from benchmark_ray_p2p.py are listed below.

for sz in args.sizes:
    # For each size, handle multiple iterations
    for iter_idx in range(args.iters + 1):  # +1 for warmup
        # Create new memory buffer for each iteration
        size_per_block = sz // args.num_iovs
        buf_v = []
        for _ in range(args.num_iovs):
            buf = _make_buffer(size_per_block, args.device, args.local_gpu_idx)
            buf_v.append(buf)
        # Register remote memory (new memory for each iteration)
        remote_descs = ep.register_memory(buf_v)
        # Serialize and send remote descriptors to client
        remote_descs_serialized = ep.get_serialized_descs(remote_descs)
        _send_bytes(remote_descs_serialized, dst=peer)
        # Wait for transfer to complete
        dist.barrier()
    print(f"[Server] Completed {args.iters} iterations for size {_pretty(sz)}")
print("[Server] Benchmark complete")


def _run_client(args, ep):
    """Client side: sends data via WRITE operations."""
    peer = 1
    # Exchange metadata with server (only once at the beginning)
    local_metadata = ep.get_metadata()
    _send_bytes(local_metadata, dst=peer)
    remote_metadata = _recv_bytes(src=peer)
    print(f"[Client] Exchanged metadata with server")
    for sz in args.sizes:
        # Warmup iteration
        # Create new memory buffer
        size_per_block = sz // args.num_iovs
        buf_v = []
        for _ in range(args.num_iovs):
            buf = _make_buffer(size_per_block, args.device, args.local_gpu_idx)
            buf_v.append(buf)
        # Register local memory
        local_descs = ep.register_memory(buf_v)
        # Add remote endpoint
        success, conn_id = ep.add_remote_endpoint(remote_metadata)
        assert success, "Failed to add remote endpoint"
        # Wait for server to send remote descriptors
        remote_descs_serialized = _recv_bytes(src=peer)
        remote_descs = ep.deserialize_descs(remote_descs_serialized)
        # Warmup transfer
        xfer_handle = ep.transfer(conn_id, "READ", local_descs, remote_descs)
        assert xfer_handle is not None, "Failed to start warmup transfer"
        while not ep.check_xfer_state(xfer_handle):
            pass
        dist.barrier()
        # Benchmark iterations
        start = time.perf_counter()
        total = 0
        for iter_idx in range(args.iters):
            # Create new memory buffer for each iteration
            buf_v = []
            for _ in range(args.num_iovs):
                buf = _make_buffer(size_per_block, args.device, args.local_gpu_idx)
                buf_v.append(buf)
            # Register local memory (new memory for each iteration)
            local_descs = ep.register_memory(buf_v)
            # Add remote endpoint
            success, conn_id = ep.add_remote_endpoint(remote_metadata)
            assert success, "Failed to add remote endpoint"
            # Wait for server to send remote descriptors (exchange memory addresses)
            remote_descs_serialized = _recv_bytes(src=peer)
            remote_descs = ep.deserialize_descs(remote_descs_serialized)
            # Start transfer
            xfer_handle = ep.transfer(conn_id, "WRITE", local_descs, remote_descs)
            assert xfer_handle is not None, "Failed to start transfer"
            # Check transfer state until complete
            while not ep.check_xfer_state(xfer_handle):
                pass
            total += sz
            dist.barrier()
        elapsed = time.perf_counter() - start
        print(
            f"[Client] {_pretty(sz):>8} : "
            f"{(total * 8) / elapsed / 1e9:6.2f} Gbps | "
            f"{total / elapsed / 1e9:6.2f} GB/s | "
            f"{elapsed / args.iters:6.6f} s"
        )
    print("[Client] Benchmark complete")


def parse_sizes(v: str) -> List[int]:
    """Parse comma-separated size list."""
    try:
        return [int(x) for x in v.split(",") if x]
    except ValueError:
        raise argparse.ArgumentTypeError("bad --sizes")


def main():
    p = argparse.ArgumentParser("UCCL Ray P2P benchmark using transfer API")
    p.add_argument("--local-gpu-idx", type=int, default=0)
    p.add_argument("--num-cpus", type=int, default=4)
    p.add_argument("--device", choices=["cpu", "gpu"], default="gpu")

For each iteration, the server:

  • allocates a new buffer and calls register_memory.
  • calls get_serialized_descs and sends the serialized descriptors to the client via the PyTorch OOB channel.
  • does nothing while the client issues the RDMA WRITE.
  • calls the PyTorch barrier.
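
For reference, a rough, untested sketch of a client-side variant that would time only the RDMA transfer itself, with register_memory, add_remote_endpoint, and the descriptor exchange hoisted out of the timed region (it reuses the same ep API and helpers as the listing above; the server would register once per size accordingly):

# Untested sketch: per-size setup happens once, outside the timer.
local_descs = ep.register_memory(buf_v)
success, conn_id = ep.add_remote_endpoint(remote_metadata)
assert success, "Failed to add remote endpoint"
remote_descs = ep.deserialize_descs(_recv_bytes(src=peer))

start = time.perf_counter()
for _ in range(args.iters):
    # Timed region covers only the WRITE and its completion polling.
    xfer_handle = ep.transfer(conn_id, "WRITE", local_descs, remote_descs)
    while not ep.check_xfer_state(xfer_handle):
        pass
elapsed = time.perf_counter() - start
print(f"[Client] pure transfer: {(args.iters * sz * 8) / elapsed / 1e9:6.2f} Gbps")

The difference between this number and the table above would roughly bound the per-iteration OOB and registration overhead.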

YangZhou1997 (Member) commented:

Nice work @zhongjiechen! Is this benchmark done on AMD servers? Is the performance gap (compared to 50G) because of out-of-band communication?

zhongjiechen (Member, Author) commented Jan 23, 2026

Nice work @zhongjiechen! Is this benchmark done on AMD servers? Is the performance gap (compared to 50G) because of out-of-band communication?

Yes, AMD servers. The gap comes from the OOB exchange of descriptors and the per-iteration synchronization, and it also includes the overhead of register_memory. These numbers look reasonable to me.

YangZhou1997 (Member) commented:

Got you! Is it possible to also measure the pure RDMA transfer performance in this benchmark?
