Skip to content

Calling AsyncCommandHandle.kill() blocks and does not kill the process #1034

@fsqinghuayu

Description

@fsqinghuayu

Describe the bug
When I start a task with asyncio.create_task(run_command()) and stream its output in real time, I want to abort the task when an exception is caught (for example, when the client request disconnects), but kill() has no effect. After I pause the sandbox, get_info() reports the sandbox status as paused, yet the process still appears to be running. (I may have misjudged this: Claude model token costs were still being incurred after the sandbox was paused, but that might be because a request had already been sent before I paused the sandbox.)

Expected behavior

  1. Be able to kill the process successfully.
  2. When the sandbox is paused, the process should stop executing.
  3. I need a reliable way to kill processes. Currently, when I reconnect to the sandbox, the previous task starts running again, which causes a lot of unnecessary token costs.

Main code

 async def run_command():
                nonlocal command_handle
                try:
                    logger.info(f"[COMMAND] Starting command with background=True...")
                    command_handle = await sdx.commands.run(
                        command,
                        timeout=600,
                        on_stdout=on_stdout,
                        on_stderr=on_stderr,
                        cwd=work_path,
                        envs=envs,
                        user=user,
                        background=True,
                    )
                    logger.info(
                        f"[COMMAND] Got command handle: {command_handle}, pid: {getattr(command_handle, 'pid', 'N/A')}")
                    logger.info(f"[COMMAND] Waiting for command to complete...")
                    result = await command_handle.wait()
                    logger.info(f"[COMMAND] Command completed with exit_code: {getattr(result, 'exit_code', 'N/A')}")
                except asyncio.CancelledError:
                    logger.warning(f"[COMMAND] run_command task was cancelled")
                    raise
                except Exception as e:
                    logger.error(f"[COMMAND] Command error: {e}")
                finally:
                    logger.info(f"[COMMAND] run_command finally block, setting done_event")
                    done_event.set()

        try:

            run_command_task = asyncio.create_task(run_command())
            logger.info(f"[STREAM] Started run_command_task: {run_command_task}")

            while not done_event.is_set() or not output_queue.empty():
                if await request.is_disconnected():
                    logger.warning("[STREAM] Client disconnected detected!")
                    logger.info(f"[STREAM] command_handle exists: {command_handle is not None}")
                    logger.info(
                        f"[STREAM] run_command_task done: {run_command_task.done() if run_command_task else 'N/A'}")

                    if command_handle:
                        try:
                            logger.info(
                                f"[CLEANUP] Attempting to kill command, pid: {getattr(command_handle, 'pid', 'N/A')}")
                            kill_result = await command_handle.kill()
                            logger.info(f"[CLEANUP] Kill result: {kill_result}")
                        except Exception as e:
                            logger.error(f"[CLEANUP] Error killing command: {type(e).__name__}: {e}")

                        try:
                            logger.info(f"[CLEANUP] Attempting to disconnect command handle...")
                            await command_handle.disconnect()
                            logger.info(f"[CLEANUP] Disconnect completed successfully")
                        except Exception as e:
                            logger.error(f"[CLEANUP] Error disconnecting command: {type(e).__name__}: {e}")
                    else:
                        logger.warning(f"[CLEANUP] No command_handle available to kill/disconnect")

                    if run_command_task and not run_command_task.done():
                        logger.info(f"[CLEANUP] Cancelling run_command_task...")
                        run_command_task.cancel()
                        try:
                            await run_command_task
                            logger.info(f"[CLEANUP] run_command_task awaited after cancel")
                        except asyncio.CancelledError:
                            logger.info(f"[CLEANUP] run_command_task CancelledError caught (expected)")
                    else:
                        logger.info(f"[CLEANUP] run_command_task already done or None")

                    logger.info(f"[STREAM] Breaking out of main loop due to client disconnect")
                    break  

                try:
                    data = await asyncio.wait_for(output_queue.get(), timeout=0.01)
                    # Some data processing operations ... 
                     yield processed_data

                except asyncio.TimeoutError:
                    continue

            logger.info(
                f"[STREAM] Exited main while loop. done_event.is_set(): {done_event.is_set()}, queue empty: {output_queue.empty()}")

        except Exception as e:
            logger.info(f"[EXCEPTION] command_handle exists: {command_handle is not None}")
            if command_handle:

                try:
                    await command_handle.disconnect()
                    logger.info(f"[EXCEPTION-CLEANUP] Disconnect completed")
                except Exception as e:
                    logger.error(f"[EXCEPTION-CLEANUP] Error disconnecting command: {type(e).__name__}: {e}")

                try:
                    await command_handle.kill()
                    logger.info(f"[EXCEPTION-CLEANUP] Kill completed")
                except Exception as e:
                    logger.error(f"[EXCEPTION-CLEANUP] Error killing command: {type(e).__name__}: {e}")
             # Some data processing operations ... 
              yield processed_data

        finally:
            logger.info(f"[FINALLY] Entering finally block. cleanup_done: {cleanup_done}")
            logger.info(
                f"[FINALLY] command_handle: {command_handle is not None}, run_command_task: {run_command_task is not None}")

            if not cleanup_done:
                try:
                    logger.info("[FINALLY] Executing cleanup with shield protection...")
                    await asyncio.shield(
                        cleanup_sandbox(sdx, sandbox_info, body, time_start) # pause sandbox and update db data
                    )
                    cleanup_done = True
                    logger.info("[FINALLY] Cleanup completed successfully")
                except Exception as e:
                    logger.error(f"[FINALLY] Critical error during cleanup: {type(e).__name__}: {e}")
            else:
                logger.info("[FINALLY] Cleanup already done, skipping")

Metadata

Metadata

Assignees

No one assigned

    Labels

    bug — Something isn't working

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions