Skip to content

Commit 2100eac

Browse files
committed
replace with threading
1 parent e414984 commit 2100eac

File tree

2 files changed

+80
-16
lines changed

2 files changed

+80
-16
lines changed

nbs/llm/llm.ipynb

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -27,13 +27,14 @@
2727
"import typing as t\n",
2828
"import asyncio\n",
2929
"import inspect\n",
30+
"import threading\n",
3031
"from pydantic import BaseModel\n",
3132
"import instructor\n",
3233
"\n",
3334
"T = t.TypeVar('T', bound=BaseModel)\n",
3435
"\n",
3536
"class RagasLLM:\n",
36-
" def __init__(self, provider: str, model:str, client: t.Any, **model_args):\n",
37+
" def __init__(self, provider: str, model: str, client: t.Any, **model_args):\n",
3738
" self.provider = provider.lower()\n",
3839
" self.model = model\n",
3940
" self.model_args = model_args or {}\n",
@@ -70,23 +71,54 @@
7071
" def _run_async_in_current_loop(self, coro):\n",
7172
" \"\"\"Run an async coroutine in the current event loop if possible.\n",
7273
" \n",
73-
" This handles Jupyter environments correctly by using the existing loop.\n",
74+
" This handles Jupyter environments correctly by using a separate thread\n",
75+
" when a running event loop is detected.\n",
7476
" \"\"\"\n",
7577
" try:\n",
76-
" # Check if we're in an environment with an existing event loop (like Jupyter)\n",
78+
" # Try to get the current event loop\n",
7779
" loop = asyncio.get_event_loop()\n",
80+
" \n",
7881
" if loop.is_running():\n",
79-
" # We're likely in a Jupyter environment\n",
80-
" import nest_asyncio\n",
81-
" nest_asyncio.apply()\n",
82-
" return loop.run_until_complete(coro)\n",
82+
" # If the loop is already running (like in Jupyter notebooks),\n",
83+
" # we run the coroutine in a separate thread with its own event loop\n",
84+
" result_container = {'result': None, 'exception': None}\n",
85+
" \n",
86+
" def run_in_thread():\n",
87+
" # Create a new event loop for this thread\n",
88+
" new_loop = asyncio.new_event_loop()\n",
89+
" asyncio.set_event_loop(new_loop)\n",
90+
" try:\n",
91+
" # Run the coroutine in this thread's event loop\n",
92+
" result_container['result'] = new_loop.run_until_complete(coro)\n",
93+
" except Exception as e:\n",
94+
" # Capture any exceptions to re-raise in the main thread\n",
95+
" result_container['exception'] = e\n",
96+
" finally:\n",
97+
" # Clean up the event loop\n",
98+
" new_loop.close()\n",
99+
" \n",
100+
" # Start the thread and wait for it to complete\n",
101+
" thread = threading.Thread(target=run_in_thread)\n",
102+
" thread.start()\n",
103+
" thread.join()\n",
104+
" \n",
105+
" # Re-raise any exceptions that occurred in the thread\n",
106+
" if result_container['exception']:\n",
107+
" raise result_container['exception']\n",
108+
" \n",
109+
" return result_container['result']\n",
110+
" else:\n",
111+
" # Standard case - event loop exists but isn't running\n",
112+
" return loop.run_until_complete(coro)\n",
113+
" \n",
83114
" except RuntimeError:\n",
84115
" # If we get a runtime error about no event loop, create a new one\n",
85116
" loop = asyncio.new_event_loop()\n",
86117
" asyncio.set_event_loop(loop)\n",
87118
" try:\n",
88119
" return loop.run_until_complete(coro)\n",
89120
" finally:\n",
121+
" # Clean up\n",
90122
" loop.close()\n",
91123
" asyncio.set_event_loop(None)\n",
92124
" \n",
@@ -129,7 +161,7 @@
129161
" **self.model_args,\n",
130162
" )\n",
131163
"\n",
132-
"def ragas_llm(provider: str,model:str, client: t.Any, **model_args) -> RagasLLM:\n",
164+
"def ragas_llm(provider: str, model: str, client: t.Any, **model_args) -> RagasLLM:\n",
133165
" return RagasLLM(provider=provider, client=client, model=model, **model_args)"
134166
]
135167
},

ragas_annotator/llm/llm.py

Lines changed: 40 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -7,13 +7,14 @@
77
import typing as t
88
import asyncio
99
import inspect
10+
import threading
1011
from pydantic import BaseModel
1112
import instructor
1213

1314
T = t.TypeVar('T', bound=BaseModel)
1415

1516
class RagasLLM:
16-
def __init__(self, provider: str, model:str, client: t.Any, **model_args):
17+
def __init__(self, provider: str, model: str, client: t.Any, **model_args):
1718
self.provider = provider.lower()
1819
self.model = model
1920
self.model_args = model_args or {}
@@ -50,23 +51,54 @@ def _initialize_client(self, provider: str, client: t.Any) -> t.Any:
5051
def _run_async_in_current_loop(self, coro):
5152
"""Run an async coroutine in the current event loop if possible.
5253
53-
This handles Jupyter environments correctly by using the existing loop.
54+
This handles Jupyter environments correctly by using a separate thread
55+
when a running event loop is detected.
5456
"""
5557
try:
56-
# Check if we're in an environment with an existing event loop (like Jupyter)
58+
# Try to get the current event loop
5759
loop = asyncio.get_event_loop()
60+
5861
if loop.is_running():
59-
# We're likely in a Jupyter environment
60-
import nest_asyncio
61-
nest_asyncio.apply()
62-
return loop.run_until_complete(coro)
62+
# If the loop is already running (like in Jupyter notebooks),
63+
# we run the coroutine in a separate thread with its own event loop
64+
result_container = {'result': None, 'exception': None}
65+
66+
def run_in_thread():
67+
# Create a new event loop for this thread
68+
new_loop = asyncio.new_event_loop()
69+
asyncio.set_event_loop(new_loop)
70+
try:
71+
# Run the coroutine in this thread's event loop
72+
result_container['result'] = new_loop.run_until_complete(coro)
73+
except Exception as e:
74+
# Capture any exceptions to re-raise in the main thread
75+
result_container['exception'] = e
76+
finally:
77+
# Clean up the event loop
78+
new_loop.close()
79+
80+
# Start the thread and wait for it to complete
81+
thread = threading.Thread(target=run_in_thread)
82+
thread.start()
83+
thread.join()
84+
85+
# Re-raise any exceptions that occurred in the thread
86+
if result_container['exception']:
87+
raise result_container['exception']
88+
89+
return result_container['result']
90+
else:
91+
# Standard case - event loop exists but isn't running
92+
return loop.run_until_complete(coro)
93+
6394
except RuntimeError:
6495
# If we get a runtime error about no event loop, create a new one
6596
loop = asyncio.new_event_loop()
6697
asyncio.set_event_loop(loop)
6798
try:
6899
return loop.run_until_complete(coro)
69100
finally:
101+
# Clean up
70102
loop.close()
71103
asyncio.set_event_loop(None)
72104

@@ -109,5 +141,5 @@ async def agenerate(self, prompt: str, response_model: t.Type[T]) -> T:
109141
**self.model_args,
110142
)
111143

112-
def ragas_llm(provider: str,model:str, client: t.Any, **model_args) -> RagasLLM:
144+
def ragas_llm(provider: str, model: str, client: t.Any, **model_args) -> RagasLLM:
113145
return RagasLLM(provider=provider, client=client, model=model, **model_args)

0 commit comments

Comments
 (0)