kiwipy/rmq related modules into independent module#297
kiwipy/rmq related modules into independent module#297unkcpz merged 22 commits intoaiidateam:devfrom
Conversation
Codecov ReportAttention: Patch coverage is
Additional details and impacted files@@ Coverage Diff @@
## dev #297 +/- ##
======================================
Coverage ? 89.46%
======================================
Files ? 28
Lines ? 3196
Branches ? 0
======================================
Hits ? 2859
Misses ? 337
Partials ? 0 ☔ View full report in Codecov by Sentry. |
fc52fcd to
c0a8bbd
Compare
4960836 to
47ac8e4
Compare
85fc72a to
da644ac
Compare
49991fe to
f698369
Compare
|
Rebase with uv.lock, the lines changed are much less. It is ready for another review @agoscinski |
| ) | ||
| communicator._loop.set_debug(True) | ||
| comm._loop.set_debug(True) | ||
| coordinator = RmqCoordinator(comm) |
There was a problem hiding this comment.
This is a typical example how I wrap the original rmq communicator into the Coordinator interface.
| def hook_rpc_receiver( | ||
| self, | ||
| receiver: 'Receiver', | ||
| identifier: 'ID_TYPE | None' = None, | ||
| ) -> Any: ... | ||
|
|
||
| def hook_broadcast_receiver( | ||
| self, | ||
| receiver: 'Receiver', | ||
| subject_filters: list[Hashable | Pattern[str]] | None = None, | ||
| sender_filters: list[Hashable | Pattern[str]] | None = None, | ||
| identifier: 'ID_TYPE | None' = None, | ||
| ) -> Any: ... | ||
|
|
||
| def hook_task_receiver( | ||
| self, | ||
| receiver: 'Receiver', | ||
| identifier: 'ID_TYPE | None' = None, | ||
| ) -> 'ID_TYPE': ... | ||
|
|
||
| def unhook_rpc_receiver(self, identifier: 'ID_TYPE | None') -> None: ... | ||
|
|
||
| def unhook_broadcast_receiver(self, identifier: 'ID_TYPE | None') -> None: ... | ||
|
|
||
| def unhook_task_receiver(self, identifier: 'ID_TYPE') -> None: ... |
There was a problem hiding this comment.
I rename the interfaces to distinguish the coordinator from rmq communicator.
agoscinski
left a comment
There was a problem hiding this comment.
Need to pause review. Posting what I have so far now.
src/plumpy/rmq/process_control.py
Outdated
|
|
||
| # This class not conform with typing of ProcessController protocol. | ||
| # Does't matter too much, since this controller is not directly used as the controller by downstream. | ||
| class RemoteProcessController: |
There was a problem hiding this comment.
I would inherit from the protocol to make it explicit that we implement it
class RemoteProcessController(ProcessController):It defeats part of the idea Protocol that you do not need inheritance, but from this file it is not clear that this is implemented. Still you get the advantage (or disadvantage, depending how one sees it) of moving the check that the class is correctly implemented to static type checking phase and not runtime is for abstract classes. (EDIT: The same is True for abstract classes I really don't get the benifit)
Further reading: https://peps.python.org/pep-0544/#explicitly-declaring-implementation
There was a problem hiding this comment.
Agree, I inherit for both, these two controllers do not need to be generic. Make a lot sense to just explicitly declaring the implementation.
tests/utils.py
Outdated
| Snapshot = collections.namedtuple('Snapshot', ['state', 'bundle', 'outputs']) | ||
|
|
||
|
|
||
| class MockCoordinator: |
There was a problem hiding this comment.
I think we should have a test that this actually implements the protocol. By inheriting from the Protocol the type checker would pick it up but I am not sure if the type checker is running on the tests.
There was a problem hiding this comment.
Just give it a try. For MockCoordinator, sure, didn't see the power of protocol. When it comes to RmqCoordinator where I want to have a generic typing support for the inner communicator (in aiida, it is LoopCommunicator, in plumpy, there are RmqCommunicator and RmqThreadCommunicator).
I am sure there should be ways to workaround this:
tests/rmq/__init__.py:19: in <module>
class RmqCoordinator(Generic[U], Coordinator):
../../.local/share/uv/python/cpython-3.9.21-linux-x86_64-gnu/lib/python3.9/abc.py:106: in __new__
cls = super().__new__(mcls, name, bases, namespace, **kwargs)
E TypeError: Cannot create a consistent method resolution
E order (MRO) for bases Generic, Coordinator
I believe this can make our life a bit uneasy when it comes to define more generic types that need to work with static type checking.
It is a valid point that the test is better to add, so I add runtime_checkable to Coordinator and have two tests for MockCoordinator and RmqCoordinator respectively. (4242057)
There was a problem hiding this comment.
Okay, that was my skill issue. What I should do is:
class RmqCoordinator(Coordinator, Generic[U]):
...There was a problem hiding this comment.
Was the inheritance order important or what was the issue something else?
There was a problem hiding this comment.
The problem was Coordinator as a protocol is already inherited from Generic. Looks it again, I still don't think it is a good way to put two parent class here.
There was a problem hiding this comment.
A note for my self.
In the current situation, there is no actually benefit to use protocol over abc class. I can just argue it is a bit more "flexible", with the cost of losing the explicity. The advantage only shows up when there is another implementation that has its own way to use its own "Coordinator" then making RmqCoordinator to work with that case without inherit can be only possible with protocol. But we never need to get there.
Therefore, the style will be using protocol but explicitly inherit from it for subclass.
| return new_future | ||
|
|
||
|
|
||
| # XXX: this required in aiida-core, see if really need this unwrap. |
There was a problem hiding this comment.
Did you change the logic somewhere that it does not need to be unwrapped? I am actually not sure anymore why we have these nested Futures.
There was a problem hiding this comment.
The only place that uses this unwrap is in aiida_core/src/aiida/engine/processes/control.py. This piece of shitty workaround was actually add by me :( aiidateam/aiida-core@cd0d15c what a shame.
I said the workaround "should and can be removed". I'll give it a look on the aiida-core side after this PR.
There was a problem hiding this comment.
Added comments for future changes in the dev branch to PR #319
subscriber -> receiver to distinguish interfaces from RMQ communicator.
Explicitly declare protocol implementations
|
It is rebased and merged to |
The refactoring is targeting to decouple the dependencies of using kiwipy+rmq as the communicator for the process control.
By forming a
Coordinatorprotocol contract, the different type of rmq/kiwipy related codes are removed out from plumpy logic. The new contract also pave the way to make it clearly show how a new type coordinator can be implemented (future examples will be thetatzelwurma task broker that has scheduler support and file based task broker require no background service).For the prototype of how a coordinator should look like, the
MockCoordinatorintests/utilsis the coordinator that store things in memory, and can serve as the lightweight ephemeral daemon without persistent functionality.Another major change here is hand write the resolver of future by mimic how tho asyncio does for wrapping
concurrent.futures.Futureintoasyncio.Future. I use the same way to convertasyncio.Futureintoconcurent.futures.Future(which is thekiwipy.Futureas alias).aio_pikaimport lazily by moving the rmq exceptions tormqmodule, this can increase the performance ofimport aiida; aiida.orm.CancellableActionusing composite to behave as a Future like object.asyncio.Futurein favor of aliasplumpy.Futureandconcurrent.futures.Futureinstead of aliaskiwipy.Future._chainand_copy_futuresince we can not just rely on the API of asyncio that is not exposed.coordinator/Communicatorprotocol.coordinator/Coordinatorprotocol and wrap rmq/communicator as a coordinator that not require changs in kiwipy.The API for plumpy process can be more compact instead of using kiwipy/rmq "subscriber" concept. (how to replace rpc pattern??)out of the scope of this PR.message_receiverandProcessLauncher.Rmq move out as an extra module unkcpz/plumpy#6