Skip to content

Commit 429c76d

Browse files
committed
Add run_async and run_sync methods
1 parent d90049d commit 429c76d

File tree

1 file changed

+33
-6
lines changed

1 file changed

+33
-6
lines changed

ipykernel/thread.py

Lines changed: 33 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -4,9 +4,10 @@
44
from collections.abc import Awaitable
55
from queue import Queue
66
from threading import Event, Thread
7-
from typing import Callable
7+
from typing import Any, Callable
88

99
from anyio import create_task_group, run, to_thread
10+
from anyio.abc import TaskGroup
1011

1112
CONTROL_THREAD_NAME = "Control"
1213
SHELL_CHANNEL_THREAD_NAME = "Shell channel"
@@ -22,23 +23,49 @@ def __init__(self, **kwargs):
2223
self.stopped = Event()
2324
self.pydev_do_not_trace = True
2425
self.is_pydev_daemon_thread = True
25-
self._tasks: Queue[Callable[[], Awaitable[None]] | None] = Queue()
26+
self._tasks: Queue[tuple[str, Callable[[], Awaitable[Any]]] | None] = Queue()
27+
self._result: Queue[Any] = Queue()
2628

27-
def start_soon(self, task: Callable[[], Awaitable[None]] | None) -> None:
28-
self._tasks.put(task)
29+
@property
30+
def task_group(self) -> TaskGroup:
31+
return self._task_group
32+
33+
def start_soon(self, coro: Callable[[], Awaitable[Any]]) -> None:
34+
self._tasks.put(("start_soon", coro))
35+
36+
def run_async(self, coro: Callable[[], Awaitable[Any]]) -> Any:
37+
self._tasks.put(("run_async", coro))
38+
return self._result.get()
39+
40+
def run_sync(self, func: Callable[..., Any]) -> Any:
41+
self._tasks.put(("run_sync", func))
42+
return self._result.get()
2943

3044
def run(self) -> None:
3145
"""Run the thread."""
32-
run(self._main)
46+
try:
47+
run(self._main)
48+
except Exception:
49+
pass
3350

3451
async def _main(self) -> None:
3552
async with create_task_group() as tg:
53+
self._task_group = tg
3654
self.started.set()
3755
while True:
3856
task = await to_thread.run_sync(self._tasks.get)
3957
if task is None:
4058
break
41-
tg.start_soon(task)
59+
func, arg = task
60+
if func == "start_soon":
61+
tg.start_soon(arg)
62+
elif func == "run_async":
63+
res = await arg
64+
self._result.put(res)
65+
else: # func == "run_sync"
66+
res = arg()
67+
self._result.put(res)
68+
4269
tg.cancel_scope.cancel()
4370

4471
def stop(self) -> None:

0 commit comments

Comments
 (0)