5151
5252__all__ = ['Process' , 'ProcessSpec' , 'BundleKeys' , 'TransitionFailed' ]
5353
54-
55- #file_handler = logging.FileHandler(filename='tmp.log')
56- #stdout_handler = logging.StreamHandler(stream=sys.stdout)
57- #handlers = [file_handler, stdout_handler]
58- #
59- #logging.basicConfig(
60- # level=logging.DEBUG,
61- # format='[%(asctime)s] {%(filename)s:%(lineno)d} %(levelname)s - %(message)s',
62- # handlers=handlers
63- #)
64-
65- #file_handler = logging.FileHandler(filename="/Users/alexgo/code/aiida-core/plumpy2.log")
66- #stdout_handler = logging.StreamHandler(stream=sys.stdout)
67- #handlers = [file_handler, stdout_handler]
68- #
69- #logging.basicConfig(
70- # level=logging.DEBUG,
71- # format='[%(asctime)s] {%(pathname)s:%(lineno)d} %(levelname)s - %(message)s',
72- # handlers=handlers
73- #)
7454_LOGGER = logging .getLogger (__name__ )
75-
76-
7755PROCESS_STACK = ContextVar ('process stack' , default = [])
7856
7957
@@ -411,8 +389,8 @@ def logger(self) -> logging.Logger:
411389 :return: The logger.
412390
413391 """
414- # if self._logger is not None:
415- # return self._logger
392+ if self ._logger is not None :
393+ return self ._logger
416394
417395 return _LOGGER
418396
@@ -930,7 +908,6 @@ def message_receive(self, _comm: kiwipy.Communicator, msg: Dict[str, Any]) -> An
930908 :param msg: the message
931909 :return: the outcome of processing the message, the return value will be sent back as a response to the sender
932910 """
933- breakpoint ()
934911 self .logger .debug ("Process<%s>: received RPC message with communicator '%s': %r" , self .pid , _comm , msg )
935912
936913 intent = msg [process_comms .INTENT_KEY ]
@@ -940,11 +917,7 @@ def message_receive(self, _comm: kiwipy.Communicator, msg: Dict[str, Any]) -> An
940917 if intent == process_comms .Intent .PAUSE :
941918 return self ._schedule_rpc (self .pause , msg = msg .get (process_comms .MESSAGE_KEY , None ))
942919 if intent == process_comms .Intent .KILL :
943- breakpoint ()
944- # have problems to pass new argument get
945- # Error: failed to kill Process<699>: Process.kill() got an unexpected keyword argument 'force_kill'
946- #return self._schedule_rpc(self.kill, msg=msg.get(process_comms.MESSAGE_KEY, None), force_kill=msg.get(process_comms.FORCE_KILL_KEY, False))
947- return self ._schedule_rpc (self .kill , msg = msg )
920+ return self ._schedule_rpc (self .kill , msg = msg .get (process_comms .MESSAGE_KEY , None ))
948921 if intent == process_comms .Intent .STATUS :
949922 status_info : Dict [str , Any ] = {}
950923 self .get_status_info (status_info )
@@ -961,7 +934,6 @@ def broadcast_receive(self, _comm: kiwipy.Communicator, body: Any, sender: Any,
961934 :param _comm: the communicator that sent the message
962935 :param msg: the message
963936 """
964- breakpoint ()
965937 # pylint: disable=unused-argument
966938 self .logger .debug (
967939 "Process<%s>: received broadcast message '%s' with communicator '%s': %r" , self .pid , subject , _comm , body
@@ -973,7 +945,6 @@ def broadcast_receive(self, _comm: kiwipy.Communicator, body: Any, sender: Any,
973945 if subject == process_comms .Intent .PAUSE :
974946 return self ._schedule_rpc (self .pause , msg = body )
975947 if subject == process_comms .Intent .KILL :
976- # TODO deal with this
977948 return self ._schedule_rpc (self .kill , msg = body )
978949 return None
979950
@@ -1096,7 +1067,7 @@ def _create_interrupt_action(self, exception: process_states.Interruption) -> fu
10961067 do_pause = functools .partial (self ._do_pause , str (exception ))
10971068 return futures .CancellableAction (do_pause , cookie = exception )
10981069
1099- if isinstance (exception , process_states .KillInterruption ):
1070+ if isinstance (exception , ( process_states .KillInterruption , process_states . ForceKillInterruption ) ):
11001071
11011072 def do_kill (_next_state : process_states .State ) -> Any :
11021073 try :
@@ -1155,20 +1126,12 @@ def fail(self, exception: Optional[BaseException], trace_back: Optional[Tracebac
11551126 """
11561127 self .transition_to (process_states .ProcessState .EXCEPTED , exception , trace_back )
11571128
1158- def kill (self , msg : Union [dict , None ] = None , force_kill : bool = False ) -> Union [bool , asyncio .Future ]:
1129+ def kill (self , msg : Union [str , None ] = None ) -> Union [bool , asyncio .Future ]:
11591130 """
11601131 Kill the process
1161-
1162- # PR_COMMENT have not figured out how to integrate force_kill as argument
1163- # so I just pass the dict
1164-
11651132 :param msg: An optional kill message
11661133 """
1167- breakpoint ()
1168- if msg is None :
1169- force_kill = False
1170- else :
1171- force_kill = msg .get (process_comms .FORCE_KILL_KEY , False )
1134+ force_kill = isinstance (msg , str ) and '-F' in msg
11721135
11731136 if self .state == process_states .ProcessState .KILLED :
11741137 # Already killed
@@ -1178,20 +1141,25 @@ def kill(self, msg: Union[dict, None] = None, force_kill: bool = False) -> Union
11781141 # Can't kill
11791142 return False
11801143
1181- if self ._killing :
1144+ if self ._killing and not force_kill :
11821145 # Already killing
11831146 return self ._killing
11841147
1185- if self ._stepping and not force_kill :
1148+ if force_kill :
1149+ # Skip interrupting the state and go straight to killed
1150+ interrupt_exception = process_states .ForceKillInterruption (msg )
1151+ self ._killing = self ._interrupt_action
1152+ self ._state .interrupt (interrupt_exception )
1153+
1154+ elif self ._stepping :
11861155 # Ask the step function to pause by setting this flag and giving the
11871156 # caller back a future
1188- interrupt_exception = process_states .KillInterruption (msg . get ( process_comms . MESSAGE_KEY , None ))
1157+ interrupt_exception = process_states .KillInterruption (msg ) # type: ignore
11891158 self ._set_interrupt_action_from_exception (interrupt_exception )
11901159 self ._killing = self ._interrupt_action
11911160 self ._state .interrupt (interrupt_exception )
11921161 return cast (futures .CancellableAction , self ._interrupt_action )
11931162
1194- breakpoint ()
11951163 self .transition_to (process_states .ProcessState .KILLED , msg )
11961164 return True
11971165
0 commit comments