| 
52 | 52 | from aiida.common.links import LinkType  | 
53 | 53 | from aiida.common.log import LOG_LEVEL_REPORT  | 
54 | 54 | from aiida.orm.implementation.utils import clean_value  | 
 | 55 | +from aiida.orm.nodes.process.calculation.calcjob import CalcJobNode  | 
55 | 56 | from aiida.orm.utils import serialize  | 
56 | 57 | 
 
  | 
57 | 58 | from .builder import ProcessBuilder  | 
@@ -329,50 +330,162 @@ def load_instance_state(  | 
329 | 330 | 
 
  | 
330 | 331 |         self.node.logger.info(f'Loaded process<{self.node.pk}> from saved state')  | 
331 | 332 | 
 
  | 
 | 333 | +    async def _launch_task(self, coro, *args, **kwargs):  | 
 | 334 | +        """Launch a coroutine as a task, making sure it can be interrupted."""  | 
 | 335 | +        import functools  | 
 | 336 | + | 
 | 337 | +        from aiida.engine.utils import interruptable_task  | 
 | 338 | + | 
 | 339 | +        task_fn = functools.partial(coro, *args, **kwargs)  | 
 | 340 | +        try:  | 
 | 341 | +            self._task = interruptable_task(task_fn)  | 
 | 342 | +            result = await self._task  | 
 | 343 | +            return result  | 
 | 344 | +        finally:  | 
 | 345 | +            self._task = None  | 
 | 346 | + | 
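For orientation, a minimal sketch of how such an interruptable-task wrapper is used: `interruptable_task` and the `InterruptableFuture` it returns come from `aiida.engine.utils` (the same helper imported above), while the task function `fetch_result`, its arguments and the timing are made up for the example.

```python
import asyncio
import functools

from aiida.engine.utils import interruptable_task  # returns an InterruptableFuture


async def fetch_result(value, cancellable):
    """Hypothetical task: `interruptable_task` passes the cancellable future as the last argument."""
    await asyncio.sleep(0.1)  # stand-in for e.g. talking to a transport or scheduler
    return value * 2


async def main():
    # mirror `_launch_task`: bind the leading arguments and let the future be appended on call
    task = interruptable_task(functools.partial(fetch_result, 21))
    # while the task is pending, `task.interrupt(SomeException())` would make the await below raise
    print(await task)  # -> 42


asyncio.run(main())
```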
332 | 347 |     def kill(self, msg_text: str | None = None, force_kill: bool = False) -> Union[bool, plumpy.futures.Future]:  | 
333 | 348 |         """Kill the process and all the child calculations it called  | 
334 | 349 | 
  | 
335 | 350 |         :param msg_text: the kill message  | 
336 | 351 |         """  | 
337 |  | -        self.node.logger.info(f'Request to kill Process<{self.node.pk}>')  | 
338 |  | - | 
339 |  | -        had_been_terminated = self.has_terminated()  | 
340 |  | - | 
341 |  | -        result = super().kill(msg_text, force_kill)  | 
 | 353 | +        if self.killed():  | 
 | 354 | +            self.node.logger.info(f'Request to kill Process<{self.node.pk}> but process has already been killed.')  | 
 | 355 | +            return True  | 
 | 356 | +        elif self.has_terminated():  | 
 | 357 | +            self.node.logger.info(f'Request to kill Process<{self.node.pk}> but process has already terminated.')  | 
 | 358 | +            return False  | 
 | 359 | +        self.node.logger.info(f'Request to kill Process<{self.node.pk}>.')  | 
 | 360 | + | 
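For context, `killed()` and `has_terminated()` are inherited from plumpy's `Process`. Roughly (a sketch of the semantics only, not plumpy's actual implementation): a killed process is one particular kind of terminated process, which is why the `killed()` check has to come first so that it returns `True` instead of falling through to `False`.

```python
from plumpy.process_states import ProcessState

# Terminal states in plumpy: once reached, the process cannot be killed again.
TERMINAL_STATES = (ProcessState.FINISHED, ProcessState.EXCEPTED, ProcessState.KILLED)


def is_killed(state: ProcessState) -> bool:
    return state == ProcessState.KILLED


def has_terminated(state: ProcessState) -> bool:
    return state in TERMINAL_STATES


assert is_killed(ProcessState.KILLED) and has_terminated(ProcessState.KILLED)
assert not is_killed(ProcessState.FINISHED) and has_terminated(ProcessState.FINISHED)
```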
 | 361 | +        # PR_COMMENT We need to kill the children first, because this process transitions to the killed state  | 
 | 362 | +        #            after the first kill request. This became buggy in the last PR, which allowed the user to  | 
 | 363 | +        #            reissue kill commands (before, the guard was: if _killing, do nothing). Since we now want to  | 
 | 364 | +        #            allow the user to resend kill commands with different options, we have to kill the children  | 
 | 365 | +        #            first, so that the children are still killed even when this process has already been killed.  | 
 | 366 | +        #            Otherwise we get the problematic scenario: the process is killed but has not killed its  | 
 | 367 | +        #            children yet, the kill times out, we kill again, but the parent process is already killed so  | 
 | 368 | +        #            it never enters this code again.  | 
 | 369 | +        # TODO If the tests pass unchanged, this path may not be well covered; check whether a test exercises it.  | 
 | 370 | + | 
 | 371 | +        # TODO This currently blocks the worker and cannot be unblocked; it probably needs to become async/awaitable.  | 
 | 374 | + | 
 | 375 | +        killing = []  | 
 | 377 | +        for child in self.node.called:  | 
 | 378 | +            if self.runner.controller is None:  | 
 | 379 | +                self.logger.info('no controller available to kill child<%s>', child.pk)  | 
 | 380 | +                continue  | 
 | 381 | +            try:  | 
 | 382 | +                # we block while sending the kill message to the child  | 
342 | 383 | 
 
  | 
343 |  | -        # Only kill children if we could be killed ourselves  | 
344 |  | -        if result is not False and not had_been_terminated:  | 
345 |  | -            killing = []  | 
346 |  | -            for child in self.node.called:  | 
347 |  | -                if self.runner.controller is None:  | 
348 |  | -                    self.logger.info('no controller available to kill child<%s>', child.pk)  | 
349 |  | -                    continue  | 
350 |  | -                try:  | 
351 |  | -                    result = self.runner.controller.kill_process(child.pk, msg_text=f'Killed by parent<{self.node.pk}>')  | 
352 |  | -                    result = asyncio.wrap_future(result)  # type: ignore[arg-type]  | 
353 |  | -                    if asyncio.isfuture(result):  | 
354 |  | -                        killing.append(result)  | 
355 |  | -                except ConnectionClosed:  | 
356 |  | -                    self.logger.info('no connection available to kill child<%s>', child.pk)  | 
357 |  | -                except UnroutableError:  | 
358 |  | -                    self.logger.info('kill signal was unable to reach child<%s>', child.pk)  | 
359 |  | - | 
360 |  | -            if asyncio.isfuture(result):  | 
361 |  | -                # We ourselves are waiting to be killed so add it to the list  | 
362 |  | -                killing.append(result)  | 
363 |  | - | 
364 |  | -            if killing:  | 
 | 384 | +                from plumpy.futures import unwrap_kiwi_future  | 
 | 385 | + | 
 | 386 | +                result = self.runner.controller.kill_process(  | 
 | 387 | +                    child.pk, msg_text=f'Killed by parent<{self.node.pk}>', force_kill=force_kill  | 
 | 388 | +                )  | 
 | 389 | +                # flatten the nested kiwipy future returned by the controller (see the sketch after this loop)  | 
 | 390 | +                killing.append(unwrap_kiwi_future(result))  | 
 | 391 | +                # PR_COMMENT Removed the previous `asyncio.wrap_future`/`asyncio.isfuture` dance: the result here  | 
 | 392 | +                #            is always a future, so the extra check was redundant.  | 
 | 398 | +            except ConnectionClosed:  | 
 | 399 | +                self.logger.info('no connection available to kill child<%s>', child.pk)  | 
 | 400 | +            except UnroutableError:  | 
 | 401 | +                self.logger.info('kill signal was unable to reach child<%s>', child.pk)  | 
 | 402 | + | 
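`kill_process` returns a kiwipy future that may itself resolve to another future (an initial acknowledgement first, the final kill outcome later), and `unwrap_kiwi_future` flattens that nesting so a single `.result()` call further below yields the final outcome. A minimal sketch of that behaviour; the two-level nesting is constructed by hand here purely for illustration.

```python
import kiwipy
from plumpy.futures import unwrap_kiwi_future

# Build a nested future by hand: the outer future resolves to an inner future,
# and only the inner future carries the real value.
inner = kiwipy.Future()
outer = kiwipy.Future()
outer.set_result(inner)

flat = unwrap_kiwi_future(outer)
assert not flat.done()        # still waiting for the innermost result

inner.set_result(True)        # e.g. the child acknowledges that it was killed
assert flat.result() is True  # the flattened future now holds the final outcome
```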
 | 403 | +        # TODO Check whether this part is still needed; it may be over-engineered. Previously, if this process was  | 
 | 404 | +        #      itself waiting to be killed, its own future was appended to the list as well:  | 
 | 405 | +        # if asyncio.isfuture(result):  | 
 | 406 | +        #    killing.append(result)  | 
 | 407 | + | 
 | 408 | +        # Second phase of the kill: wait for the children to be killed.  | 
 | 409 | +        if not force_kill:  | 
 | 410 | +            for pending_future in killing:  | 
 | 411 | +                result = pending_future.result()  | 
365 | 415 |                 # We are waiting for things to be killed, so return the 'gathered' future  | 
366 |  | -                kill_future = plumpy.futures.gather(*killing)  | 
367 |  | -                result = self.loop.create_future()  | 
368 | 416 | 
 
  | 
369 |  | -                def done(done_future: plumpy.futures.Future):  | 
370 |  | -                    is_all_killed = all(done_future.result())  | 
371 |  | -                    result.set_result(is_all_killed)  | 
372 |  | - | 
373 |  | -                kill_future.add_done_callback(done)  | 
374 |  | - | 
375 |  | -        return result  | 
 | 427 | +        # PR_COMMENT We do not do this anymore. The original idea was to resend the kill interruption so the state  | 
 | 428 | +        #            could continue freeing its resources using an EBM with new parameters, since the user can change  | 
 | 429 | +        #            these between kills via the config parameters. However, this did not work properly, because the  | 
 | 430 | +        #            process state only enters the EBM the first time it receives a KillInterruption. This is because  | 
 | 431 | +        #            the EBM is activated within a try/except block (a toy illustration follows below):  | 
 | 432 | +        #            try:  | 
 | 433 | +        #                do_work()  # <-- this is where the interrupt exception is raised  | 
 | 434 | +        #            except KillInterruption:  | 
 | 435 | +        #                cancel_scheduler_job_in_ebm()  # <-- interrupting again would only cancel this cleanup  | 
 | 436 | +        #  | 
 | 437 | +        #            Not sure why I did not detect this in my earlier attempts. We could also send interrupts in a  | 
 | 438 | +        #            while loop, but I think it is generally bad design for the process state to cancel the scheduler  | 
 | 439 | +        #            job while the kill is handled here: it adds another actor responsible for killing the process  | 
 | 440 | +        #            correctly, making it more complex than necessary.  | 
 | 441 | +        #  | 
 | 442 | +        # Cancel any old killing command to send a new one  | 
 | 443 | +        # if self._killing:  | 
 | 444 | +        #    self._killing.cancel()  | 
 | 445 | + | 
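A toy, self-contained illustration of the problem described in the PR_COMMENT above (plain asyncio, not AiiDA or plumpy code; `KillInterruption` is stood in by a local exception class): once the first interruption has been delivered and caught, the coroutine is already inside the `except` handler, so a second interruption cannot re-enter the handler with new parameters, it can only abort the cleanup that is already running.

```python
import asyncio


class KillInterruption(Exception):
    """Stand-in for plumpy's KillInterruption."""


async def step(waiting: asyncio.Future):
    try:
        await waiting                  # <- the first interruption lands here
    except KillInterruption:
        print('cleanup started (this is where the EBM would cancel the scheduler job)')
        await asyncio.sleep(3600)      # <- a later interruption can only abort this cleanup


async def main():
    loop = asyncio.get_running_loop()
    waiting = loop.create_future()
    task = asyncio.create_task(step(waiting))
    await asyncio.sleep(0)                     # let `step` start waiting
    waiting.set_exception(KillInterruption())  # first kill: the except handler is entered
    await asyncio.sleep(0)
    # There is no way left to make `step` re-enter the handler with new parameters;
    # cancelling the task now simply aborts the cleanup.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        print('cleanup was cancelled, not restarted')


asyncio.run(main())
```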
 | 446 | +        # Send the kill interruption to the tasks in the event loop so they stop.  | 
 | 447 | +        # This is not blocking, so the interruption happens concurrently.  | 
 | 448 | +        if self._stepping:  | 
 | 449 | +            # Interrupt the current state with a KillInterruption so the running step is aborted  | 
 | 450 | +            interrupt_exception = plumpy.process_states.KillInterruption(msg_text, force_kill)  | 
 | 451 | +            # PR_COMMENT We do not set the interrupt action, because plumpy uses the interrupt action to decide the  | 
 | 452 | +            #            next state while stepping; we do not want to step to the next state through the plumpy state  | 
 | 453 | +            #            machine, we want to control that here and only here.  | 
 | 454 | +            # self._set_interrupt_action_from_exception(interrupt_exception)  | 
 | 455 | +            # self._killing = self._interrupt_action  | 
 | 457 | +            self._state.interrupt(interrupt_exception)  | 
 | 458 | +            # return cast(plumpy.futures.CancellableAction, self._interrupt_action)  | 
 | 459 | + | 
 | 460 | +        # Kill jobs from scheduler associated with this process.  | 
 | 461 | +        # This is blocking so we only continue when the scheduler job has been killed.  | 
 | 462 | +        if not force_kill and isinstance(self.node, CalcJobNode):  | 
 | 463 | +            # TODO move this function to a more common place  | 
 | 464 | +            from .calcjobs.tasks import task_kill_job  | 
 | 465 | + | 
 | 466 | +            # if we are already killing, the interruption has already been triggered  | 
 | 467 | +            coro = self._launch_task(task_kill_job, self.node, self.runner.transport)  | 
 | 468 | +            task = asyncio.create_task(coro)  | 
 | 469 | +            # task_kill_job raises an error if it is not successful, e.g. if the EBM fails.  | 
 | 470 | +            # PR_COMMENT We just return False and log why the kill failed; it does not make sense to me to put the  | 
 | 471 | +            #            process into the excepted state. Maybe you fix your internet connection and want to try  | 
 | 472 | +            #            again. We now have force-kill if the user wants to enforce the kill.  | 
 | 473 | +            try:  | 
 | 474 | +                self.loop.run_until_complete(task)  | 
 | 475 | +            except Exception as exc:  | 
 | 476 | +                self.node.logger.error(f'Error raised while cancelling the scheduler job: {exc!s}')  | 
 | 477 | +                return False  | 
 | 481 | + | 
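The TODO near the top of this method already notes that blocking the worker here is problematic ("need async await maybe"). Purely as a sketch of that idea, and not something this diff implements, the scheduler-job cancellation above could be factored into an awaitable helper; it reuses `task_kill_job`, `_launch_task` and `CalcJobNode` from this file, while the method name `_kill_scheduler_job` is made up.

```python
async def _kill_scheduler_job(self, force_kill: bool) -> bool:
    """Hypothetical awaitable variant of the blocking scheduler-job cancellation above."""
    from .calcjobs.tasks import task_kill_job

    if force_kill or not isinstance(self.node, CalcJobNode):
        # nothing to cancel, or the user asked to skip the cancellation entirely
        return True
    try:
        # awaiting instead of `self.loop.run_until_complete(...)` keeps the worker's event loop free
        await self._launch_task(task_kill_job, self.node, self.runner.transport)
    except Exception as exc:
        self.node.logger.error(f'Error raised while cancelling the scheduler job: {exc!s}')
        return False
    return True
```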
 | 482 | +        # Transition to killed process state  | 
 | 483 | +        # This is blocking so we only continue when we are in killed state  | 
 | 484 | +        msg = plumpy.process_comms.MessageBuilder.kill(text=msg_text, force_kill=force_kill)  | 
 | 485 | +        new_state = self._create_state_instance(plumpy.process_states.ProcessState.KILLED, msg=msg)  | 
 | 486 | +        self.transition_to(new_state)  | 
 | 487 | + | 
 | 488 | +        return True  | 
376 | 489 | 
 
  | 
377 | 490 |     @override  | 
378 | 491 |     def out(self, output_port: str, value: Any = None) -> None:  | 
 | 