Skip to content

Commit 50ef200

Browse files
zdevitofacebook-github-bot
authored andcommitted
Cleanup dead python future code (meta-pytorch#917)
Summary: Pull Request resolved: meta-pytorch#917 ghstack-source-id: 304156747 Reviewed By: mariusae Differential Revision: D80132819 fbshipit-source-id: 32faf67511efa541a1990bf19bb4270181b92f87
1 parent dbce7b4 commit 50ef200

File tree

1 file changed

+1
-75
lines changed

1 file changed

+1
-75
lines changed

python/monarch/_src/actor/actor_mesh.py

Lines changed: 1 addition & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,6 @@ class MonarchContext:
124124
mailbox: Mailbox
125125
proc_id: str
126126
point: Point
127-
send_queue: Tuple[Optional["Shared[Any]"], int]
128127
controller_controller: Optional["_ControllerController"]
129128
proc_mesh: Optional["ProcMesh"] # actually this is a ProcMeshRef under the hood
130129

@@ -134,9 +133,7 @@ def get() -> "MonarchContext":
134133
if c is None:
135134
mb = Mailbox.root_client_mailbox()
136135
proc_id = mb.actor_id.proc_id
137-
c = MonarchContext(
138-
mb, proc_id, Point(0, singleton_shape), (None, 0), None, None
139-
)
136+
c = MonarchContext(mb, proc_id, Point(0, singleton_shape), None, None)
140137
_context.set(c)
141138
return c
142139

@@ -353,76 +350,6 @@ async def task():
353350
return PythonTask.from_coroutine(task())
354351

355352

356-
class SharedProtocolAdapter:
357-
def __init__(self, inner: "Shared[ActorMeshProtocol]", supervise: bool):
358-
self._inner = inner
359-
self._supervise = supervise
360-
361-
def cast(
362-
self,
363-
message: PythonMessage,
364-
selection: str,
365-
mailbox: Mailbox,
366-
) -> None:
367-
ctx = MonarchContext.get()
368-
last, count = ctx.send_queue
369-
370-
async def task():
371-
if last is not None:
372-
await last
373-
try:
374-
self._inner.__await__()
375-
inner = await self._inner
376-
inner.cast(message, selection, mailbox)
377-
except Exception as e:
378-
match message.kind:
379-
case CallMethod(response_port=port) if port is not None:
380-
Port(port, mailbox, 0).exception(e)
381-
382-
ctx.send_queue = (PythonTask.from_coroutine(task()).spawn(), count + 1)
383-
384-
def new_with_shape(self, shape: Shape) -> "SharedProtocolAdapter":
385-
async def task():
386-
inner = await self._inner
387-
return inner.new_with_shape(shape)
388-
389-
return SharedProtocolAdapter(PythonTask.from_coroutine(task()).spawn(), False)
390-
391-
def supervision_event(self) -> "Optional[Shared[Exception]]":
392-
if not self._supervise:
393-
return None
394-
395-
async def task():
396-
inner = await self._inner
397-
return await inner.supervision_event()
398-
399-
return PythonTask.from_coroutine(task()).spawn()
400-
401-
def stop(self) -> "PythonTask[None]":
402-
async def task():
403-
await (await self._inner).stop()
404-
405-
return PythonTask.from_coroutine(task())
406-
407-
@staticmethod
408-
def _restore(inner: "ActorMeshProtocol") -> "ActorMeshProtocol":
409-
return inner
410-
411-
def __reduce_ex__(self, protocol):
412-
# blocking here means that we cannot send messages that contain actor
413-
# references from the tokio event loop
414-
if is_tokio_thread():
415-
raise NotImplementedError(
416-
"Cannot send actor references from a coroutine on the tokio event loop."
417-
"To fix this we have to either make it psosible for pickling to defer this work,"
418-
"or resolve the actor id without blocking."
419-
)
420-
return SharedProtocolAdapter._restore, (self._inner.block_on(),)
421-
422-
async def initialized(self):
423-
await self._inner
424-
425-
426353
class ActorEndpoint(Endpoint[P, R]):
427354
def __init__(
428355
self,
@@ -783,7 +710,6 @@ async def handle(
783710
mailbox,
784711
mailbox.actor_id.proc_id,
785712
Point(rank, shape),
786-
(None, 0),
787713
None,
788714
None,
789715
)

0 commit comments

Comments
 (0)