diff --git a/jupyter_server_nbmodel/actions.py b/jupyter_server_nbmodel/actions.py index a59eb9a..73b6bbc 100644 --- a/jupyter_server_nbmodel/actions.py +++ b/jupyter_server_nbmodel/actions.py @@ -250,6 +250,50 @@ async def _execute_snippet( } +async def dedup_task_queue(q: asyncio.Queue): + """ + Deduplicate tasks in an asyncio.Queue by keeping only the last + submitted task per cell_id. + + Problem: + After "Restart Kernel and Run All Cells", tasks from the previous + run may still remain in the queue. This causes duplicate tasks + for the same cell_id to exist. When consumed, both the old and + new tasks run, leading to duplicated execution. + + Solution: + This function drains the queue, keeps only the last occurrence + of each cell_id, and puts those tasks back into the queue in the + correct order. That way, each cell_id has only one pending task. + """ + if q.empty(): + return + + # Drain the queue into a list + items = [] + while not q.empty(): + items.append(await q.get()) + + # Track last occurrence index of each cell_id + last_idx = {} + for i, item in enumerate(items): + cell_id = item[2]["cell_id"] + last_idx[cell_id] = i + + # Collect items in the order of last appearances + keep = [] + seen = set() + for i, item in enumerate(items): + cell_id = item[2]["cell_id"] + if i == last_idx[cell_id] and cell_id not in seen: + keep.append(item) + seen.add(cell_id) + + # Put back only the deduplicated items + for item in keep: + await q.put(item) + + async def kernel_worker( kernel_id: str, client: jupyter_client.asynchronous.client.AsyncKernelClient, @@ -264,6 +308,7 @@ async def kernel_worker( while True: try: uid, snippet, metadata = await queue.get() + await dedup_task_queue(queue) get_logger().debug(f"Processing execution request {uid} for kernel {kernel_id}…") get_logger().debug("%s %s %s", uid, snippet, metadata) client.session.session = uid @@ -278,6 +323,12 @@ async def kernel_worker( ) queue.task_done() get_logger().debug(f"Execution request {uid} processed for kernel {kernel_id}.") + + # stop other tasks if one hits error + if results[uid]['status'] == 'error': + while not queue.empty(): + queue.get_nowait() + queue.task_done() except (asyncio.CancelledError, KeyboardInterrupt, RuntimeError) as e: results[uid] = {"error": str(e)} get_logger().debug( @@ -285,6 +336,7 @@ async def kernel_worker( ) # Empty the queue while not queue.empty(): + queue.get_nowait() queue.task_done() to_raise = e break @@ -293,6 +345,7 @@ async def kernel_worker( f"Failed to process execution request {uid} for kernel {kernel_id}.", exc_info=e ) if not queue.empty(): + queue.get_nowait() queue.task_done() if to_raise is not None: raise to_raise