Skip to content

Commit 07c30db

Browse files
committed
WIP: Implement option to allow force kill
Do not peform stepping when force_kill command is send. This avoids getting stuck in the kill callback.
1 parent b3837fc commit 07c30db

File tree

2 files changed

+55
-8
lines changed

2 files changed

+55
-8
lines changed

src/plumpy/process_comms.py

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030

3131
INTENT_KEY = 'intent'
3232
MESSAGE_KEY = 'message'
33+
FORCE_KILL_KEY = 'force_kill'
3334

3435

3536
class Intent:
@@ -196,17 +197,19 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult':
196197
result = await asyncio.wrap_future(future)
197198
return result
198199

199-
async def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'ProcessResult':
200+
async def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None, force_kill: bool = False) -> 'ProcessResult':
200201
"""
201202
Kill the process
202203
203204
:param pid: the pid of the process to kill
204205
:param msg: optional kill message
205206
:return: True if killed, False otherwise
206207
"""
208+
breakpoint()
207209
message = copy.copy(KILL_MSG)
208210
if msg is not None:
209211
message[MESSAGE_KEY] = msg
212+
message[FORCE_KILL_KEY] = force_kill
210213

211214
# Wait for the communication to go through
212215
kill_future = self._communicator.rpc_send(pid, message)
@@ -375,7 +378,7 @@ def play_all(self) -> None:
375378
"""
376379
self._communicator.broadcast_send(None, subject=Intent.PLAY)
377380

378-
def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future:
381+
def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None, force_kill: bool = False) -> kiwipy.Future:
379382
"""
380383
Kill the process
381384
@@ -384,9 +387,11 @@ def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Fut
384387
:return: a response future from the process to be killed
385388
386389
"""
390+
breakpoint()
387391
message = copy.copy(KILL_MSG)
388392
if msg is not None:
389393
message[MESSAGE_KEY] = msg
394+
message[FORCE_KILL_KEY] = force_kill
390395

391396
return self._communicator.rpc_send(pid, message)
392397

@@ -405,6 +410,7 @@ def continue_process(
405410
nowait: bool = False,
406411
no_reply: bool = False
407412
) -> Union[None, PID_TYPE, ProcessResult]:
413+
breakpoint()
408414
message = create_continue_body(pid=pid, tag=tag, nowait=nowait)
409415
return self.task_send(message, no_reply=no_reply)
410416

@@ -479,6 +485,7 @@ def task_send(self, message: Any, no_reply: bool = False) -> Optional[Any]:
479485
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
480486
:return: the response from the remote side (if no_reply=False)
481487
"""
488+
breakpoint()
482489
return self._communicator.task_send(message, no_reply=no_reply)
483490

484491

src/plumpy/processes.py

Lines changed: 46 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -51,7 +51,29 @@
5151

5252
__all__ = ['Process', 'ProcessSpec', 'BundleKeys', 'TransitionFailed']
5353

54+
55+
#file_handler = logging.FileHandler(filename='tmp.log')
56+
#stdout_handler = logging.StreamHandler(stream=sys.stdout)
57+
#handlers = [file_handler, stdout_handler]
58+
#
59+
#logging.basicConfig(
60+
# level=logging.DEBUG,
61+
# format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
62+
# handlers=handlers
63+
#)
64+
65+
#file_handler = logging.FileHandler(filename="/Users/alexgo/code/aiida-core/plumpy2.log")
66+
#stdout_handler = logging.StreamHandler(stream=sys.stdout)
67+
#handlers = [file_handler, stdout_handler]
68+
#
69+
#logging.basicConfig(
70+
# level=logging.DEBUG,
71+
# format='[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s',
72+
# handlers=handlers
73+
#)
5474
_LOGGER = logging.getLogger(__name__)
75+
76+
5577
PROCESS_STACK = ContextVar('process stack', default=[])
5678

5779

@@ -389,8 +411,8 @@ def logger(self) -> logging.Logger:
389411
:return: The logger.
390412
391413
"""
392-
if self._logger is not None:
393-
return self._logger
414+
#if self._logger is not None:
415+
# return self._logger
394416

395417
return _LOGGER
396418

@@ -908,6 +930,7 @@ def message_receive(self, _comm: kiwipy.Communicator, msg: Dict[str, Any]) -> An
908930
:param msg: the message
909931
:return: the outcome of processing the message, the return value will be sent back as a response to the sender
910932
"""
933+
breakpoint()
911934
self.logger.debug("Process<%s>: received RPC message with communicator '%s': %r", self.pid, _comm, msg)
912935

913936
intent = msg[process_comms.INTENT_KEY]
@@ -917,7 +940,11 @@ def message_receive(self, _comm: kiwipy.Communicator, msg: Dict[str, Any]) -> An
917940
if intent == process_comms.Intent.PAUSE:
918941
return self._schedule_rpc(self.pause, msg=msg.get(process_comms.MESSAGE_KEY, None))
919942
if intent == process_comms.Intent.KILL:
920-
return self._schedule_rpc(self.kill, msg=msg.get(process_comms.MESSAGE_KEY, None))
943+
breakpoint()
944+
# have problems to pass new argument get
945+
# Error: failed to kill Process<699>: Process.kill() got an unexpected keyword argument 'force_kill'
946+
#return self._schedule_rpc(self.kill, msg=msg.get(process_comms.MESSAGE_KEY, None), force_kill=msg.get(process_comms.FORCE_KILL_KEY, False))
947+
return self._schedule_rpc(self.kill, msg=msg)
921948
if intent == process_comms.Intent.STATUS:
922949
status_info: Dict[str, Any] = {}
923950
self.get_status_info(status_info)
@@ -934,6 +961,7 @@ def broadcast_receive(self, _comm: kiwipy.Communicator, body: Any, sender: Any,
934961
:param _comm: the communicator that sent the message
935962
:param msg: the message
936963
"""
964+
breakpoint()
937965
# pylint: disable=unused-argument
938966
self.logger.debug(
939967
"Process<%s>: received broadcast message '%s' with communicator '%s': %r", self.pid, subject, _comm, body
@@ -945,6 +973,7 @@ def broadcast_receive(self, _comm: kiwipy.Communicator, body: Any, sender: Any,
945973
if subject == process_comms.Intent.PAUSE:
946974
return self._schedule_rpc(self.pause, msg=body)
947975
if subject == process_comms.Intent.KILL:
976+
# TODO deal with this
948977
return self._schedule_rpc(self.kill, msg=body)
949978
return None
950979

@@ -1126,11 +1155,21 @@ def fail(self, exception: Optional[BaseException], trace_back: Optional[Tracebac
11261155
"""
11271156
self.transition_to(process_states.ProcessState.EXCEPTED, exception, trace_back)
11281157

1129-
def kill(self, msg: Union[str, None] = None) -> Union[bool, asyncio.Future]:
1158+
def kill(self, msg: Union[dict, None] = None, force_kill: bool = False) -> Union[bool, asyncio.Future]:
11301159
"""
11311160
Kill the process
1161+
1162+
# PR_COMMENT have not figured out how to integrate force_kill as argument
1163+
# so I just pass the dict
1164+
11321165
:param msg: An optional kill message
11331166
"""
1167+
breakpoint()
1168+
if msg is None:
1169+
force_kill = False
1170+
else:
1171+
force_kill = msg.get(process_comms.FORCE_KILL_KEY, False)
1172+
11341173
if self.state == process_states.ProcessState.KILLED:
11351174
# Already killed
11361175
return True
@@ -1143,15 +1182,16 @@ def kill(self, msg: Union[str, None] = None) -> Union[bool, asyncio.Future]:
11431182
# Already killing
11441183
return self._killing
11451184

1146-
if self._stepping:
1185+
if self._stepping and not force_kill:
11471186
# Ask the step function to pause by setting this flag and giving the
11481187
# caller back a future
1149-
interrupt_exception = process_states.KillInterruption(msg)
1188+
interrupt_exception = process_states.KillInterruption(msg.get(process_comms.MESSAGE_KEY, None))
11501189
self._set_interrupt_action_from_exception(interrupt_exception)
11511190
self._killing = self._interrupt_action
11521191
self._state.interrupt(interrupt_exception)
11531192
return cast(futures.CancellableAction, self._interrupt_action)
11541193

1194+
breakpoint()
11551195
self.transition_to(process_states.ProcessState.KILLED, msg)
11561196
return True
11571197

0 commit comments

Comments
 (0)