-
Notifications
You must be signed in to change notification settings - Fork 448
Expand file tree
/
Copy path_monarch.py
More file actions
54 lines (44 loc) · 1.86 KB
/
_monarch.py
File metadata and controls
54 lines (44 loc) · 1.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
# Copyright (c) Meta Platforms, Inc. and affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import annotations
from torchrl.modules.inference_server._queue_transport import (
_QueueInferenceClient,
QueueBasedTransport,
)
class MonarchTransport(QueueBasedTransport):
    """Transport using Monarch for distributed inference on GPU clusters.

    Uses Monarch's actor model and RDMA-capable channels for efficient
    cross-node communication. Monarch is imported lazily at instantiation
    time; importing the class itself does not require Monarch.

    .. note::
        This transport requires ``monarch`` to be installed. It is designed
        for large-scale GPU clusters where Monarch is the preferred
        communication layer.

    Keyword Args:
        max_queue_size (int): maximum size of the request queue.
            Default: ``1000``.

    Raises:
        ImportError: at instantiation, if ``monarch`` is not importable.
            The original import error is chained as the ``__cause__``.
    """

    def __init__(self, *, max_queue_size: int = 1000):
        super().__init__()
        try:
            # Lazy import so that importing this module (and the class
            # object itself) never requires Monarch to be installed.
            import monarch  # noqa: F401
            from monarch.tools.queue import MonarchQueue
        except ImportError as err:
            # Chain the underlying error so users can distinguish a
            # missing package from a broken Monarch installation.
            raise ImportError(
                "Monarch is required for MonarchTransport. "
                "Install it following the Monarch documentation."
            ) from err
        self._request_queue = MonarchQueue(maxsize=max_queue_size)
        # Per-client response queues, keyed by int ids (presumably client
        # ids assigned by the base transport — confirm against
        # QueueBasedTransport).
        self._response_queues: dict[int, MonarchQueue] = {}
        # Keep a handle on the queue class so response queues can be
        # created later without re-importing monarch.
        self._MonarchQueue = MonarchQueue

    def _make_response_queue(self):
        # NOTE(review): the response-queue bound is hard-coded at 1000 and
        # does not follow ``max_queue_size`` — confirm this is intended.
        return self._MonarchQueue(maxsize=1000)

    def client(self) -> _QueueInferenceClient:
        """Create an actor-side client with a dedicated response queue.

        Returns:
            A :class:`_QueueInferenceClient` that can be passed to a Monarch
            actor.
        """
        return super().client()