Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
7724121
Defer import of aio_pika
unkcpz Dec 11, 2024
97746b4
Explicit future implementation: distinguish concurrent.future.Future …
unkcpz Dec 11, 2024
f5e5ec4
Move communication into rmq module
unkcpz Dec 11, 2024
c2c9a65
Move TaskRejectError as the common exception for task launch
unkcpz Dec 14, 2024
4dae773
Remove useless communicator param passed to ProcessLaunch __call__
unkcpz Dec 14, 2024
56c18d4
Forming Communicator protocol
unkcpz Dec 14, 2024
2d9fdb9
Remove kiwipy/rmq dependencies of process module
unkcpz Dec 14, 2024
9d6655e
Interface change from communicator -> coordinator
unkcpz Dec 17, 2024
f8cc8ed
Remove unnecessary task_send ab from RemoteProcessControl interface
unkcpz Dec 17, 2024
34d3842
Interface for ProcessController
unkcpz Dec 17, 2024
bf99d23
RmqCoordinator example to show how using interface can avoid making c…
unkcpz Dec 17, 2024
15b267b
broadcast subscriber has versatile filters
unkcpz Dec 18, 2024
23d954c
Generic typing for Coordinator
unkcpz Dec 19, 2024
46bd2d3
Adopt new message protocol and changes required for aiida-core support
unkcpz Dec 19, 2024
8e23a88
Simpler create_task_threadsafe implementation
unkcpz Dec 30, 2024
f3f3095
Remove RmqCoordinator to tests/util only
unkcpz Jan 10, 2025
b577f5e
Export plumpy.futures.Future
unkcpz Jan 11, 2025
b0877db
Remove first unnecessary `_comm` argument to subscriber
unkcpz Feb 21, 2025
4b0267c
Protocol fulfill for RemoteProcessThreadController
unkcpz Feb 21, 2025
c4c50e0
Rename interfaces of Coordinator
unkcpz Feb 21, 2025
b1e4e1e
ralex - T -> CommT in generic type at communications.py
unkcpz Feb 21, 2025
14c6348
misc: pre-commit / uv.lock
unkcpz Feb 21, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/source/concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ WorkChains support the use of logical constructs such as `If_` and `While_` to c

A `Controller` can control processes throughout their lifetime, by sending and receiving messages. It can launch, pause, continue, kill and check status of the process.

The {py:class}`~plumpy.process_comms.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.
The {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController` can communicate with the process over the thread communicator provided by {{kiwipy}} which can subscribe and send messages over the {{rabbitmq}} message broker.

The thread communicator runs on a independent thread (event loop) and so will not be blocked by sometimes long waiting times in the process event loop.
Using RabbitMQ means that even if the computer is terminated unexpectedly, messages are persisted and can be run once the computer restarts.
2 changes: 1 addition & 1 deletion docs/source/nitpick-exceptions
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ py:class plumpy.base.state_machine.State
py:class State
py:class Process
py:class plumpy.futures.CancellableAction
py:class plumpy.communications.LoopCommunicator
py:class plumpy.rmq.communications.LoopCommunicator
py:class plumpy.persistence.PersistedPickle
py:class plumpy.utils.AttributesFrozendict
py:class plumpy.workchains._FunctionCall
Expand Down
2 changes: 1 addition & 1 deletion docs/source/tutorial.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@
"The {py:class}`~plumpy.workchains.WorkChain`\n",
": A subclass of `Process` that allows for running a process as a set of discrete steps (also known as instructions), with the ability to save the state of the process after each instruction has completed.\n",
"\n",
"The process `Controller` (principally the {py:class}`~plumpy.process_comms.RemoteProcessThreadController`)\n",
"The process `Controller` (principally the {py:class}`~plumpy.rmq.process_control.RemoteProcessThreadController`)\n",
": To control the process or workchain throughout its lifetime."
]
},
Expand Down
12 changes: 7 additions & 5 deletions src/plumpy/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,21 @@

import logging

from .communications import *
# interfaces
from .controller import ProcessController
from .coordinator import Coordinator
from .events import *
from .exceptions import *
from .futures import *
from .loaders import *
from .message import *
from .mixins import *
from .persistence import *
from .ports import *
from .process_comms import *
from .process_listener import *
from .process_states import *
from .processes import *
from .rmq import *
from .utils import *
from .workchains import *

Expand All @@ -27,14 +30,13 @@
+ futures.__all__
+ mixins.__all__
+ persistence.__all__
+ communications.__all__
+ process_comms.__all__
+ message.__all__
+ process_listener.__all__
+ workchains.__all__
+ loaders.__all__
+ ports.__all__
+ process_states.__all__
)
) + ['ProcessController', 'Coordinator']


# Do this se we don't get the "No handlers could be found..." warnings that will be produced
Expand Down
59 changes: 59 additions & 0 deletions src/plumpy/broadcast_filter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
# -*- coding: utf-8 -*-
# type: ignore
import re
import typing


class BroadcastFilter:
"""A filter that can be used to limit the subjects and/or senders that will be received"""

def __init__(self, subscriber, subject=None, sender=None):
self._subscriber = subscriber
self._subject_filters = []
self._sender_filters = []
if subject is not None:
self.add_subject_filter(subject)
if sender is not None:
self.add_sender_filter(sender)

@property
def __name__(self):
return 'BroadcastFilter'

def __call__(self, body, sender=None, subject=None, correlation_id=None):
if self.is_filtered(sender, subject):
return None
return self._subscriber(body, sender, subject, correlation_id)

def is_filtered(self, sender, subject) -> bool:
if subject is not None and self._subject_filters and not any(check(subject) for check in self._subject_filters):
return True

if sender is not None and self._sender_filters and not any(check(sender) for check in self._sender_filters):
return True

return False

def add_subject_filter(self, subject_filter):
self._subject_filters.append(self._ensure_filter(subject_filter))

def add_sender_filter(self, sender_filter):
self._sender_filters.append(self._ensure_filter(sender_filter))

@classmethod
def _ensure_filter(cls, filter_value):
if isinstance(filter_value, str):
return re.compile(filter_value.replace('.', '[.]').replace('*', '.*')).match
if isinstance(filter_value, typing.Pattern): # pylint: disable=isinstance-second-argument-not-valid-type
return filter_value.match

return lambda val: val == filter_value

@classmethod
def _make_regex(cls, filter_str):
"""
:param filter_str: The filter string
:type filter_str: str
:return: The regular expression object
"""
return re.compile(filter_str.replace('.', '[.]'))
136 changes: 136 additions & 0 deletions src/plumpy/controller.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
# -*- coding: utf-8 -*-
from __future__ import annotations

from collections.abc import Sequence
from typing import Any, Hashable, Optional, Protocol, Union, runtime_checkable

from plumpy import loaders
from plumpy.message import MessageType
from plumpy.utils import PID_TYPE

ProcessResult = Any
ProcessStatus = Any


@runtime_checkable
class ProcessController(Protocol):
"""
Control processes using coroutines that will send messages and wait
(in a non-blocking way) for their response
"""

def get_status(self, pid: 'PID_TYPE') -> ProcessStatus:
"""
Get the status of a process with the given PID
:param pid: the process id
:return: the status response from the process
"""
...

def pause_process(self, pid: 'PID_TYPE', msg_text: str | None = None) -> Any:
"""
Pause the process

:param pid: the pid of the process to pause
:param msg: optional pause message
:return: True if paused, False otherwise
"""
...

def pause_all(self, msg_text: str | None) -> None:
"""Pause all processes that are subscribed to the same coordinator

:param msg_text: an optional pause message text
"""
...

def play_process(self, pid: 'PID_TYPE') -> ProcessResult:
"""Play the process

:param pid: the pid of the process to play
:return: True if played, False otherwise
"""
...

def play_all(self) -> None:
"""Play all processes that are subscribed to the same coordinator"""

def kill_process(self, pid: 'PID_TYPE', msg_text: str | None = None) -> Any:
"""Kill the process

:param pid: the pid of the process to kill
:param msg: optional kill message
:return: True if killed, False otherwise
"""
...

def kill_all(self, msg_text: Optional[str]) -> None:
"""Kill all processes that are subscribed to the same coordinator

:param msg: an optional pause message
"""
...

def notify_msg(self, msg: MessageType, sender: Hashable | None = None, subject: str | None = None) -> None:
"""
Notify all processes by broadcasting of a msg

:param msg: an optional pause message
"""

def continue_process(
self, pid: 'PID_TYPE', tag: Optional[str] = None, nowait: bool = False, no_reply: bool = False
) -> Union[None, PID_TYPE, ProcessResult]:
"""Continue the process

:param _communicator: the communicator
:param pid: the pid of the process to continue
:param tag: the checkpoint tag to continue from
"""
...

def launch_process(
self,
process_class: str,
init_args: Optional[Sequence[Any]] = None,
init_kwargs: Optional[dict[str, Any]] = None,
persist: bool = False,
loader: Optional[loaders.ObjectLoader] = None,
nowait: bool = False,
no_reply: bool = False,
) -> Union[None, PID_TYPE, ProcessResult]:
"""Launch a process given the class and constructor arguments

:param process_class: the class of the process to launch
:param init_args: the constructor positional arguments
:param init_kwargs: the constructor keyword arguments
:param persist: should the process be persisted
:param loader: the classloader to use
:param nowait: if True, don't wait for the process to send a response, just return the pid
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
:return: the result of launching the process
"""
...

def execute_process(
self,
process_class: str,
init_args: Optional[Sequence[Any]] = None,
init_kwargs: Optional[dict[str, Any]] = None,
loader: Optional[loaders.ObjectLoader] = None,
nowait: bool = False,
no_reply: bool = False,
) -> Union[None, PID_TYPE, ProcessResult]:
"""Execute a process. This call will first send a create task and then a continue task over
the communicator. This means that if communicator messages are durable then the process
will run until the end even if this interpreter instance ceases to exist.

:param process_class: the process class to execute
:param init_args: the positional arguments to the class constructor
:param init_kwargs: the keyword arguments to the class constructor
:param loader: the class loader to use
:param nowait: if True, don't wait for the process to send a response
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
:return: the result of executing the process
"""
...
56 changes: 56 additions & 0 deletions src/plumpy/coordinator.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# -*- coding: utf-8 -*-
from __future__ import annotations

from re import Pattern
from typing import TYPE_CHECKING, Any, Callable, Hashable, Protocol, runtime_checkable

if TYPE_CHECKING:
ID_TYPE = Hashable
Receiver = Callable[..., Any]


@runtime_checkable
class Coordinator(Protocol):
def hook_rpc_receiver(
self,
receiver: 'Receiver',
identifier: 'ID_TYPE | None' = None,
) -> Any: ...

def hook_broadcast_receiver(
self,
receiver: 'Receiver',
subject_filters: list[Hashable | Pattern[str]] | None = None,
sender_filters: list[Hashable | Pattern[str]] | None = None,
identifier: 'ID_TYPE | None' = None,
) -> Any: ...

def hook_task_receiver(
self,
receiver: 'Receiver',
identifier: 'ID_TYPE | None' = None,
) -> 'ID_TYPE': ...

def unhook_rpc_receiver(self, identifier: 'ID_TYPE | None') -> None: ...

def unhook_broadcast_receiver(self, identifier: 'ID_TYPE | None') -> None: ...

def unhook_task_receiver(self, identifier: 'ID_TYPE') -> None: ...
Comment on lines +14 to +38
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I rename the interfaces to distinguish the coordinator from rmq communicator.


def rpc_send(
self,
recipient_id: Hashable,
msg: Any,
) -> Any: ...

def broadcast_send(
self,
body: Any | None,
sender: 'ID_TYPE | None' = None,
subject: str | None = None,
correlation_id: 'ID_TYPE | None' = None,
) -> Any: ...

def task_send(self, task: Any, no_reply: bool = False) -> Any: ...

def close(self) -> None: ...
29 changes: 26 additions & 3 deletions src/plumpy/exceptions.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
# -*- coding: utf-8 -*-
from typing import Optional

__all__ = ['ClosedError', 'InvalidStateError', 'KilledError', 'PersistenceError', 'UnsuccessfulResult']
__all__ = [
'ClosedError',
'CoordinatorConnectionError',
'CoordinatorTimeoutError',
'InvalidStateError',
'KilledError',
'PersistenceError',
'UnsuccessfulResult',
]


class KilledError(Exception):
"""The process was killed."""


class InvalidStateError(Exception):
"""
Raised when an operation is attempted that requires the process to be in a state
"""Raised when an operation is attempted that requires the process to be in a state
that is different from the current state
"""

Expand All @@ -33,3 +40,19 @@ class PersistenceError(Exception):

class ClosedError(Exception):
"""Raised when an mutable operation is attempted on a closed process"""


class TaskRejectedError(Exception):
"""A task was rejected by the coordinacor"""


class CoordinatorCommunicationError(Exception):
"""Generic coordinator communication error"""


class CoordinatorConnectionError(ConnectionError):
"""Raised when coordinator cannot be connected"""


class CoordinatorTimeoutError(TimeoutError):
"""Raised when communicate with coordinator timeout"""
Loading