Skip to content

Commit fc52fcd

Browse files
committed
Remove kiwipy/rmq dependencies of process module
1 parent a753f90 commit fc52fcd

File tree

9 files changed

+28
-28
lines changed

9 files changed

+28
-28
lines changed

src/plumpy/coordinator.py

Lines changed: 10 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -1,21 +1,19 @@
1-
from typing import Any, Callable, Protocol
1+
# -*- coding: utf-8 -*-
2+
from typing import Any, Callable, Pattern, Protocol
23

34
RpcSubscriber = Callable[['Communicator', Any], Any]
45
BroadcastSubscriber = Callable[['Communicator', Any, Any, Any, Any], Any]
56

6-
class Communicator(Protocol):
77

8-
def add_rpc_subscriber(self, subscriber: RpcSubscriber, identifier=None) -> Any:
9-
...
8+
class Communicator(Protocol):
9+
def add_rpc_subscriber(self, subscriber: RpcSubscriber, identifier=None) -> Any: ...
1010

11-
def add_broadcast_subscriber(self, subscriber: BroadcastSubscriber, identifier=None) -> Any:
12-
...
11+
def add_broadcast_subscriber(
12+
self, subscriber: BroadcastSubscriber, subject_filter: str | Pattern[str] | None = None, identifier=None
13+
) -> Any: ...
1314

14-
def remove_rpc_subscriber(self, identifier):
15-
...
15+
def remove_rpc_subscriber(self, identifier): ...
1616

17-
def remove_broadcast_subscriber(self, identifier):
18-
...
17+
def remove_broadcast_subscriber(self, identifier): ...
1918

20-
def broadcast_send(self, body, sender=None, subject=None, correlation_id=None) -> bool:
21-
...
19+
def broadcast_send(self, body, sender=None, subject=None, correlation_id=None) -> bool: ...

src/plumpy/exceptions.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,5 +39,6 @@ class PersistenceError(Exception):
3939
class ClosedError(Exception):
4040
"""Raised when an mutable operation is attempted on a closed process"""
4141

42+
4243
class TaskRejectedError(Exception):
43-
""" A task was rejected by the coordinacor"""
44+
"""A task was rejected by the coordinacor"""

src/plumpy/futures.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import contextlib
88
from typing import Any, Awaitable, Callable, Generator, Optional
99

10-
__all__ = ['CancellableAction', 'create_task', 'create_task', 'capture_exceptions']
10+
__all__ = ['CancellableAction', 'capture_exceptions', 'create_task', 'create_task']
1111

1212

1313
class InvalidFutureError(Exception):

src/plumpy/processes.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -38,15 +38,14 @@
3838
except ModuleNotFoundError:
3939
from contextvars import ContextVar
4040

41-
import kiwipy
4241
import yaml
4342

4443
from . import events, exceptions, message, persistence, ports, process_states, utils
45-
from .futures import capture_exceptions, CancellableAction
4644
from .base import state_machine
4745
from .base.state_machine import StateEntryFailed, StateMachine, TransitionFailed, event
4846
from .base.utils import call_with_super_check, super_check
4947
from .event_helper import EventHelper
48+
from .futures import CancellableAction, capture_exceptions
5049
from .process_listener import ProcessListener
5150
from .process_spec import ProcessSpec
5251
from .utils import PID_TYPE, SAVED_STATE_TYPE, protected
@@ -313,9 +312,9 @@ def init(self) -> None:
313312

314313
try:
315314
# filter out state change broadcasts
316-
# TODO: pattern filter should be moved to add_broadcast_subscriber.
317-
subscriber = kiwipy.BroadcastFilter(self.broadcast_receive, subject=re.compile(r'^(?!state_changed).*'))
318-
identifier = self._communicator.add_broadcast_subscriber(subscriber, identifier=str(self.pid))
315+
identifier = self._communicator.add_broadcast_subscriber(
316+
self.broadcast_receive, subject_filter=re.compile(r'^(?!state_changed).*'), identifier=str(self.pid)
317+
)
319318
self.add_cleanup(functools.partial(self._communicator.remove_broadcast_subscriber, identifier))
320319
except concurrent.futures.TimeoutError:
321320
self.logger.exception('Process<%s>: failed to register as a broadcast subscriber', self.pid)
@@ -715,6 +714,7 @@ def on_entered(self, from_state: Optional[process_states.State]) -> None:
715714
call_with_super_check(self.on_killed)
716715

717716
if self._communicator and isinstance(self.state, enum.Enum):
717+
# FIXME: move all to `coordinator.broadcast()` call and in rmq implement coordinator
718718
from plumpy.rmq.exceptions import CommunicatorChannelInvalidStateError, CommunicatorConnectionClosed
719719

720720
from_label = cast(enum.Enum, from_state.LABEL).value if from_state is not None else None

src/plumpy/rmq/communications.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -131,10 +131,10 @@ def remove_task_subscriber(self, identifier: 'ID_TYPE') -> None:
131131
return self._communicator.remove_task_subscriber(identifier)
132132

133133
def add_broadcast_subscriber(
134-
self, subscriber: 'BroadcastSubscriber', identifier: Optional['ID_TYPE'] = None
134+
self, subscriber: 'BroadcastSubscriber', subject_filter=None, identifier: Optional['ID_TYPE'] = None
135135
) -> 'ID_TYPE':
136136
converted = convert_to_comm(subscriber, self._loop)
137-
return self._communicator.add_broadcast_subscriber(converted, identifier)
137+
return self._communicator.add_broadcast_subscriber(converted, subject_filter, identifier)
138138

139139
def remove_broadcast_subscriber(self, identifier: 'ID_TYPE') -> None:
140140
return self._communicator.remove_broadcast_subscriber(identifier)

src/plumpy/rmq/process_comms.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,23 +3,22 @@
33

44
import asyncio
55
import copy
6-
from typing import TYPE_CHECKING, Any, Dict, Optional, Sequence, Union, cast
6+
from typing import Any, Dict, Optional, Sequence, Union
77

88
import kiwipy
99

10+
from plumpy import loaders
1011
from plumpy.message import (
12+
KILL_MSG,
1113
MESSAGE_KEY,
1214
PAUSE_MSG,
1315
PLAY_MSG,
1416
STATUS_MSG,
15-
KILL_MSG,
1617
Intent,
1718
create_continue_body,
1819
create_create_body,
1920
create_launch_body,
2021
)
21-
22-
from plumpy import loaders
2322
from plumpy.utils import PID_TYPE
2423

2524
__all__ = [

tests/rmq/test_communications.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ def test_add_broadcast_subscriber(loop_communicator, subscriber):
5656
assert loop_communicator.add_broadcast_subscriber(subscriber) is not None
5757

5858
identifier = 'identifier'
59-
assert loop_communicator.add_broadcast_subscriber(subscriber, identifier) == identifier
59+
assert loop_communicator.add_broadcast_subscriber(subscriber, identifier=identifier) == identifier
6060

6161

6262
def test_remove_broadcast_subscriber(loop_communicator, subscriber):

tests/rmq/test_communicator.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import tempfile
88
import uuid
99

10+
from kiwipy.rmq.communicator import kiwipy
1011
import pytest
1112
import shortuuid
1213
import yaml
@@ -81,7 +82,7 @@ def get_broadcast(_comm, body, sender, subject, correlation_id):
8182
assert result == BROADCAST
8283

8384
@pytest.mark.asyncio
84-
async def test_broadcast_filter(self, loop_communicator):
85+
async def test_broadcast_filter(self, loop_communicator: kiwipy.Communicator):
8586
broadcast_future = asyncio.Future()
8687

8788
def ignore_broadcast(_comm, body, sender, subject, correlation_id):
@@ -90,7 +91,7 @@ def ignore_broadcast(_comm, body, sender, subject, correlation_id):
9091
def get_broadcast(_comm, body, sender, subject, correlation_id):
9192
broadcast_future.set_result(True)
9293

93-
loop_communicator.add_broadcast_subscriber(BroadcastFilter(ignore_broadcast, subject='other'))
94+
loop_communicator.add_broadcast_subscriber(ignore_broadcast, subject_filter='other')
9495
loop_communicator.add_broadcast_subscriber(get_broadcast)
9596
loop_communicator.broadcast_send(
9697
**{'body': 'present', 'sender': 'Martin', 'subject': 'sup', 'correlation_id': 420}

tests/test_processes.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1068,6 +1068,7 @@ def test_paused(self):
10681068
self.assertSetEqual(events_tester.called, events_tester.expected_events)
10691069

10701070
def test_broadcast(self):
1071+
# FIXME: here I need a mock test
10711072
communicator = kiwipy.LocalCommunicator()
10721073

10731074
messages = []

0 commit comments

Comments
 (0)