Skip to content

Commit 6f626bf

Browse files
committed
Implementation with minimal changes
1 parent 07c30db commit 6f626bf

File tree

3 files changed

+20
-56
lines changed

3 files changed

+20
-56
lines changed

src/plumpy/process_comms.py

Lines changed: 2 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,6 @@
3030

3131
INTENT_KEY = 'intent'
3232
MESSAGE_KEY = 'message'
33-
FORCE_KILL_KEY = 'force_kill'
3433

3534

3635
class Intent:
@@ -197,19 +196,17 @@ async def play_process(self, pid: 'PID_TYPE') -> 'ProcessResult':
197196
result = await asyncio.wrap_future(future)
198197
return result
199198

200-
async def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None, force_kill: bool = False) -> 'ProcessResult':
199+
async def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> 'ProcessResult':
201200
"""
202201
Kill the process
203202
204203
:param pid: the pid of the process to kill
205204
:param msg: optional kill message
206205
:return: True if killed, False otherwise
207206
"""
208-
breakpoint()
209207
message = copy.copy(KILL_MSG)
210208
if msg is not None:
211209
message[MESSAGE_KEY] = msg
212-
message[FORCE_KILL_KEY] = force_kill
213210

214211
# Wait for the communication to go through
215212
kill_future = self._communicator.rpc_send(pid, message)
@@ -378,7 +375,7 @@ def play_all(self) -> None:
378375
"""
379376
self._communicator.broadcast_send(None, subject=Intent.PLAY)
380377

381-
def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None, force_kill: bool = False) -> kiwipy.Future:
378+
def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None) -> kiwipy.Future:
382379
"""
383380
Kill the process
384381
@@ -387,11 +384,9 @@ def kill_process(self, pid: 'PID_TYPE', msg: Optional[Any] = None, force_kill: b
387384
:return: a response future from the process to be killed
388385
389386
"""
390-
breakpoint()
391387
message = copy.copy(KILL_MSG)
392388
if msg is not None:
393389
message[MESSAGE_KEY] = msg
394-
message[FORCE_KILL_KEY] = force_kill
395390

396391
return self._communicator.rpc_send(pid, message)
397392

@@ -410,7 +405,6 @@ def continue_process(
410405
nowait: bool = False,
411406
no_reply: bool = False
412407
) -> Union[None, PID_TYPE, ProcessResult]:
413-
breakpoint()
414408
message = create_continue_body(pid=pid, tag=tag, nowait=nowait)
415409
return self.task_send(message, no_reply=no_reply)
416410

@@ -485,7 +479,6 @@ def task_send(self, message: Any, no_reply: bool = False) -> Optional[Any]:
485479
:param no_reply: if True, this call will be fire-and-forget, i.e. no return value
486480
:return: the response from the remote side (if no_reply=False)
487481
"""
488-
breakpoint()
489482
return self._communicator.task_send(message, no_reply=no_reply)
490483

491484

src/plumpy/process_states.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
'Continue',
3737
'Interruption',
3838
'KillInterruption',
39+
'ForceKillInterruption',
3940
'PauseInterruption',
4041
]
4142

@@ -50,6 +51,8 @@ class Interruption(Exception):
5051
class KillInterruption(Interruption):
5152
pass
5253

54+
class ForceKillInterruption(Interruption):
55+
pass
5356

5457
class PauseInterruption(Interruption):
5558
pass

src/plumpy/processes.py

Lines changed: 15 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -51,29 +51,7 @@
5151

5252
__all__ = ['Process', 'ProcessSpec', 'BundleKeys', 'TransitionFailed']
5353

54-
55-
#file_handler = logging.FileHandler(filename='tmp.log')
56-
#stdout_handler = logging.StreamHandler(stream=sys.stdout)
57-
#handlers = [file_handler, stdout_handler]
58-
#
59-
#logging.basicConfig(
60-
# level=logging.DEBUG,
61-
# format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
62-
# handlers=handlers
63-
#)
64-
65-
#file_handler = logging.FileHandler(filename="/Users/alexgo/code/aiida-core/plumpy2.log")
66-
#stdout_handler = logging.StreamHandler(stream=sys.stdout)
67-
#handlers = [file_handler, stdout_handler]
68-
#
69-
#logging.basicConfig(
70-
# level=logging.DEBUG,
71-
# format='[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s',
72-
# handlers=handlers
73-
#)
7454
_LOGGER = logging.getLogger(__name__)
75-
76-
7755
PROCESS_STACK = ContextVar('process stack', default=[])
7856

7957

@@ -411,8 +389,8 @@ def logger(self) -> logging.Logger:
411389
:return: The logger.
412390
413391
"""
414-
#if self._logger is not None:
415-
# return self._logger
392+
if self._logger is not None:
393+
return self._logger
416394

417395
return _LOGGER
418396

@@ -930,7 +908,6 @@ def message_receive(self, _comm: kiwipy.Communicator, msg: Dict[str, Any]) -> An
930908
:param msg: the message
931909
:return: the outcome of processing the message, the return value will be sent back as a response to the sender
932910
"""
933-
breakpoint()
934911
self.logger.debug("Process<%s>: received RPC message with communicator '%s': %r", self.pid, _comm, msg)
935912

936913
intent = msg[process_comms.INTENT_KEY]
@@ -940,11 +917,7 @@ def message_receive(self, _comm: kiwipy.Communicator, msg: Dict[str, Any]) -> An
940917
if intent == process_comms.Intent.PAUSE:
941918
return self._schedule_rpc(self.pause, msg=msg.get(process_comms.MESSAGE_KEY, None))
942919
if intent == process_comms.Intent.KILL:
943-
breakpoint()
944-
# have problems to pass new argument get
945-
# Error: failed to kill Process<699>: Process.kill() got an unexpected keyword argument 'force_kill'
946-
#return self._schedule_rpc(self.kill, msg=msg.get(process_comms.MESSAGE_KEY, None), force_kill=msg.get(process_comms.FORCE_KILL_KEY, False))
947-
return self._schedule_rpc(self.kill, msg=msg)
920+
return self._schedule_rpc(self.kill, msg=msg.get(process_comms.MESSAGE_KEY, None))
948921
if intent == process_comms.Intent.STATUS:
949922
status_info: Dict[str, Any] = {}
950923
self.get_status_info(status_info)
@@ -961,7 +934,6 @@ def broadcast_receive(self, _comm: kiwipy.Communicator, body: Any, sender: Any,
961934
:param _comm: the communicator that sent the message
962935
:param msg: the message
963936
"""
964-
breakpoint()
965937
# pylint: disable=unused-argument
966938
self.logger.debug(
967939
"Process<%s>: received broadcast message '%s' with communicator '%s': %r", self.pid, subject, _comm, body
@@ -973,7 +945,6 @@ def broadcast_receive(self, _comm: kiwipy.Communicator, body: Any, sender: Any,
973945
if subject == process_comms.Intent.PAUSE:
974946
return self._schedule_rpc(self.pause, msg=body)
975947
if subject == process_comms.Intent.KILL:
976-
# TODO deal with this
977948
return self._schedule_rpc(self.kill, msg=body)
978949
return None
979950

@@ -1096,7 +1067,7 @@ def _create_interrupt_action(self, exception: process_states.Interruption) -> fu
10961067
do_pause = functools.partial(self._do_pause, str(exception))
10971068
return futures.CancellableAction(do_pause, cookie=exception)
10981069

1099-
if isinstance(exception, process_states.KillInterruption):
1070+
if isinstance(exception, (process_states.KillInterruption, process_states.ForceKillInterruption)):
11001071

11011072
def do_kill(_next_state: process_states.State) -> Any:
11021073
try:
@@ -1155,20 +1126,12 @@ def fail(self, exception: Optional[BaseException], trace_back: Optional[Tracebac
11551126
"""
11561127
self.transition_to(process_states.ProcessState.EXCEPTED, exception, trace_back)
11571128

1158-
def kill(self, msg: Union[dict, None] = None, force_kill: bool = False) -> Union[bool, asyncio.Future]:
1129+
def kill(self, msg: Union[str, None] = None) -> Union[bool, asyncio.Future]:
11591130
"""
11601131
Kill the process
1161-
1162-
# PR_COMMENT have not figured out how to integrate force_kill as argument
1163-
# so I just pass the dict
1164-
11651132
:param msg: An optional kill message
11661133
"""
1167-
breakpoint()
1168-
if msg is None:
1169-
force_kill = False
1170-
else:
1171-
force_kill = msg.get(process_comms.FORCE_KILL_KEY, False)
1134+
force_kill = isinstance(msg, str) and '-F' in msg
11721135

11731136
if self.state == process_states.ProcessState.KILLED:
11741137
# Already killed
@@ -1178,20 +1141,25 @@ def kill(self, msg: Union[dict, None] = None, force_kill: bool = False) -> Union
11781141
# Can't kill
11791142
return False
11801143

1181-
if self._killing:
1144+
if self._killing and not force_kill:
11821145
# Already killing
11831146
return self._killing
11841147

1185-
if self._stepping and not force_kill:
1148+
if force_kill:
1149+
# Skip interrupting the state and go straight to killed
1150+
interrupt_exception = process_states.ForceKillInterruption(msg)
1151+
self._killing = self._interrupt_action
1152+
self._state.interrupt(interrupt_exception)
1153+
1154+
elif self._stepping:
11861155
# Ask the step function to pause by setting this flag and giving the
11871156
# caller back a future
1188-
interrupt_exception = process_states.KillInterruption(msg.get(process_comms.MESSAGE_KEY, None))
1157+
interrupt_exception = process_states.KillInterruption(msg) # type: ignore
11891158
self._set_interrupt_action_from_exception(interrupt_exception)
11901159
self._killing = self._interrupt_action
11911160
self._state.interrupt(interrupt_exception)
11921161
return cast(futures.CancellableAction, self._interrupt_action)
11931162

1194-
breakpoint()
11951163
self.transition_to(process_states.ProcessState.KILLED, msg)
11961164
return True
11971165

0 commit comments

Comments
 (0)