
Commit 7df0da2

Async Interpreter
1 parent 9ad4513 commit 7df0da2

File tree

11 files changed: +2014, -1748 lines changed


interpreter/__init__.py

Lines changed: 1 addition & 0 deletions
@@ -1,3 +1,4 @@
+from .core.async_core import AsyncInterpreter
 from .core.computer.terminal.base_language import BaseLanguage
 from .core.core import OpenInterpreter
 

File renamed without changes.
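
With this export, the new async interpreter and its bundled server (defined in async_core.py below) can be started in a couple of lines. A minimal sketch, assuming the optional server dependencies (fastapi, uvicorn, janus) are installed:

# Sketch: start the bundled FastAPI/uvicorn server for AsyncInterpreter.
# Requires the optional server dependencies: fastapi, uvicorn, janus.
from interpreter import AsyncInterpreter

async_interpreter = AsyncInterpreter()
async_interpreter.server.run()  # websocket at "/" plus REST routes, on 0.0.0.0:8000 by default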

interpreter/core/async_core.py

Lines changed: 176 additions & 21 deletions
@@ -1,35 +1,190 @@
+import asyncio
 import json
 import threading
-import time
+import traceback
+from typing import Any, Dict
 
-from core import OpenInterpreter
+from .core import OpenInterpreter
 
+try:
+    import janus
+    import uvicorn
+    from fastapi import APIRouter, FastAPI, WebSocket
+except:
+    # Server dependencies are not required by the main package.
+    pass
 
-class AsyncOpenInterpreter(OpenInterpreter):
+
+class AsyncInterpreter(OpenInterpreter):
     def __init__(self, *args, **kwargs):
         super().__init__(*args, **kwargs)
-        self.async_thread = None
-        self.input_queue
-        self.output_queue
+
+        self.respond_thread = None
+        self.stop_event = threading.Event()
+        self.output_queue = None
+
+        self.server = Server(self)
 
     async def input(self, chunk):
         """
-        Expects a chunk in streaming LMC format.
+        Accumulates LMC chunks onto interpreter.messages.
+        When it hits an "end" flag, calls interpreter.respond().
         """
-        try:
-            chunk = json.loads(chunk)
-        except:
-            pass
 
         if "start" in chunk:
-            self.async_thread.join()
+            # If the user is starting something, the interpreter should stop.
+            if self.respond_thread is not None and self.respond_thread.is_alive():
+                self.stop_event.set()
+                self.respond_thread.join()
+            self.accumulate(chunk)
+        elif "content" in chunk:
+            self.accumulate(chunk)
         elif "end" in chunk:
-            if self.async_thread is None or not self.async_thread.is_alive():
-                self.async_thread = threading.Thread(target=self.complete)
-                self.async_thread.start()
-        else:
-            await self._add_to_queue(self._input_queue, chunk)
-
-    async def output(self, *args, **kwargs):
-        # Your async output code here
-        pass
+            # If the user is done talking, the interpreter should respond.
+            self.stop_event.clear()
+            print("Responding.")
+            self.respond_thread = threading.Thread(target=self.respond)
+            self.respond_thread.start()
+
+    async def output(self):
+        if self.output_queue == None:
+            self.output_queue = janus.Queue()
+        return await self.output_queue.async_q.get()
+
+    def respond(self):
+        for chunk in self._respond_and_store():
+            print(chunk.get("content", ""), end="")
+            if self.stop_event.is_set():
+                return
+            self.output_queue.sync_q.put(chunk)
+
+        self.output_queue.sync_q.put(
+            {"role": "server", "type": "status", "content": "complete"}
+        )
+
+    def accumulate(self, chunk):
+        """
+        Accumulates LMC chunks onto interpreter.messages.
+        """
+        if type(chunk) == dict:
+            if chunk.get("format") == "active_line":
+                # We don't do anything with these.
+                pass
+
+            elif "start" in chunk:
+                chunk_copy = (
+                    chunk.copy()
+                )  # So we don't modify the original chunk, which feels wrong.
+                chunk_copy.pop("start")
+                chunk_copy["content"] = ""
+                self.messages.append(chunk_copy)
+
+            elif "content" in chunk:
+                self.messages[-1]["content"] += chunk["content"]
+
+        elif type(chunk) == bytes:
+            if self.messages[-1]["content"] == "":  # We initialize as an empty string ^
+                self.messages[-1]["content"] = b""  # But it actually should be bytes
+            self.messages[-1]["content"] += chunk
+
+
+def create_router(async_interpreter):
+    router = APIRouter()
+
+    @router.get("/heartbeat")
+    async def heartbeat():
+        return {"status": "alive"}
+
+    @router.websocket("/")
+    async def websocket_endpoint(websocket: WebSocket):
+        await websocket.accept()
+        try:
+
+            async def receive_input():
+                while True:
+                    try:
+                        data = await websocket.receive()
+
+                        if data.get("type") == "websocket.receive" and "text" in data:
+                            data = json.loads(data["text"])
+                            await async_interpreter.input(data)
+                        elif (
+                            data.get("type") == "websocket.disconnect"
+                            and data.get("code") == 1000
+                        ):
+                            print("Disconnecting.")
+                            return
+                        else:
+                            print("Invalid data:", data)
+                            continue
+
+                    except Exception as e:
+                        error_message = {
+                            "role": "server",
+                            "type": "error",
+                            "content": traceback.format_exc() + "\n" + str(e),
+                        }
+                        await websocket.send_text(json.dumps(error_message))
+
+            async def send_output():
+                while True:
+                    try:
+                        output = await async_interpreter.output()
+
+                        if isinstance(output, bytes):
+                            await websocket.send_bytes(output)
+                        else:
+                            await websocket.send_text(json.dumps(output))
+                    except Exception as e:
+                        traceback.print_exc()
+                        error_message = {
+                            "role": "server",
+                            "type": "error",
+                            "content": traceback.format_exc() + "\n" + str(e),
+                        }
+                        await websocket.send_text(json.dumps(error_message))
+
+            await asyncio.gather(receive_input(), send_output())
+        except Exception as e:
+            traceback.print_exc()
+            try:
+                error_message = {
+                    "role": "server",
+                    "type": "error",
+                    "content": traceback.format_exc() + "\n" + str(e),
+                }
+                await websocket.send_text(json.dumps(error_message))
+            except:
+                # If we can't send it, that's fine.
+                pass
+        finally:
+            await websocket.close()
+
+    @router.post("/settings")
+    async def settings(payload: Dict[str, Any]):
+        for key, value in payload.items():
+            print(f"Updating settings: {key} = {value}")
+            if key in ["llm", "computer"] and isinstance(value, dict):
+                for sub_key, sub_value in value.items():
+                    setattr(getattr(async_interpreter, key), sub_key, sub_value)
+            else:
+                setattr(async_interpreter, key, value)
+
+        return {"status": "success"}
+
+    return router
+
+
+class Server:
+    def __init__(self, async_interpreter, host="0.0.0.0", port=8000):
+        self.app = FastAPI()
+        router = create_router(async_interpreter)
+        self.app.include_router(router)
+        self.host = host
+        self.port = port
+        self.uvicorn_server = uvicorn.Server(
+            config=uvicorn.Config(app=self.app, host=self.host, port=self.port)
+        )
+
+    def run(self):
+        uvicorn.run(self.app, host=self.host, port=self.port)
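
The websocket route drives the two async methods on AsyncInterpreter: input() accumulates LMC chunks and starts respond() in a thread when it sees an "end" flag, while output() awaits chunks from the janus queue that respond() fills. Settings can also be changed at runtime by POSTing a JSON body to /settings. A rough client sketch, assuming the server above is running locally and using the third-party websockets package; the exact "role"/"type" values follow the LMC convention and are illustrative:

# Sketch of a client for the websocket route above (pip install websockets).
import asyncio
import json

import websockets


async def chat(prompt):
    async with websockets.connect("ws://localhost:8000/") as ws:
        # Stream one user message as LMC chunks: start -> content -> end.
        await ws.send(json.dumps({"role": "user", "type": "message", "start": True}))
        await ws.send(json.dumps({"role": "user", "type": "message", "content": prompt}))
        await ws.send(json.dumps({"role": "user", "type": "message", "end": True}))

        # Read chunks until the server reports completion.
        while True:
            chunk = json.loads(await ws.recv())
            if chunk.get("role") == "server" and chunk.get("content") == "complete":
                break
            print(chunk.get("content", ""), end="", flush=True)


asyncio.run(chat("What operating system am I on?"))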

interpreter/core/computer/terminal/languages/jupyter_language.py

Lines changed: 7 additions & 0 deletions
@@ -236,6 +236,13 @@ def detect_active_line(self, line):
 
     def _capture_output(self, message_queue):
         while True:
+            # For async usage
+            if (
+                hasattr(self.computer.interpreter, "stop_event")
+                and self.computer.interpreter.stop_event.is_set()
+            ):
+                break
+
             if self.listener_thread:
                 try:
                     output = message_queue.get(timeout=0.1)
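
This is the same cooperative-cancellation pattern used in AsyncInterpreter.respond() above and in _respond_and_store() in core.py below: long-running loops poll a shared threading.Event instead of being killed, so a new "start" chunk can interrupt generation cleanly. The pattern in isolation (a hypothetical worker, not code from this commit):

# Generic cooperative-cancellation sketch (hypothetical worker, not from this commit).
import threading
import time

stop_event = threading.Event()


def worker():
    while True:
        if stop_event.is_set():  # polled every iteration, like _capture_output above
            print("worker stopped")
            return
        time.sleep(0.1)  # stand-in for reading from a message queue


t = threading.Thread(target=worker)
t.start()
stop_event.set()  # e.g. set when a new "start" chunk arrives in AsyncInterpreter.input()
t.join()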

interpreter/core/core.py

Lines changed: 9 additions & 15 deletions
@@ -20,12 +20,6 @@
 from .utils.telemetry import send_telemetry
 from .utils.truncate_output import truncate_output
 
-try:
-    from .server import server
-except:
-    # Dependencies for server are not generally required
-    pass
-
 
 class OpenInterpreter:
     """
@@ -141,14 +135,6 @@ def __init__(
         self.empty_code_output_template = empty_code_output_template
         self.code_output_sender = code_output_sender
 
-    def server(self, *args, **kwargs):
-        try:
-            server(self)
-        except ImportError:
-            display_markdown_message(
-                "Missing dependencies for the server, please run `pip install open-interpreter[server]` and try again."
-            )
-
     def local_setup(self):
         """
         Opens a wizard that lets terminal users pick a local model.
@@ -313,6 +299,7 @@ def _respond_and_store(self):
         Pulls from the respond stream, adding delimiters. Some things, like active_line, console, confirmation... these act specially.
         Also assembles new messages and adds them to `self.messages`.
         """
+        self.verbose = False
 
         # Utility function
        def is_active_line_chunk(chunk):
@@ -321,6 +308,10 @@ def is_active_line_chunk(chunk):
         last_flag_base = None
 
         for chunk in respond(self):
+            # For async usage
+            if hasattr(self, "stop_event") and self.stop_event.is_set():
+                break
+
             if chunk["content"] == "":
                 continue
 
@@ -330,7 +321,10 @@ def is_active_line_chunk(chunk):
                if last_flag_base:
                    yield {**last_flag_base, "end": True}
                    last_flag_base = None
-               yield chunk
+
+               if self.auto_run == False:
+                   yield chunk
+
               # We want to append this now, so even if content is never filled, we know that the execution didn't produce output.
               # ... rethink this though.
               self.messages.append(

interpreter/core/llm/llm.py

Lines changed: 21 additions & 5 deletions
@@ -4,10 +4,12 @@
 import litellm
 
 litellm.suppress_debug_info = True
+import json
 import subprocess
 import time
 import uuid
 
+import requests
 import tokentrim as tt
 
 from ...terminal_interface.utils.display_markdown_message import (
@@ -289,8 +291,6 @@ def load(self):
             return
 
         if self.model.startswith("ollama/"):
-            # WOAH we should also hit up ollama and set max_tokens and context_window based on the LLM. I think they let u do that
-
             model_name = self.model.replace("ollama/", "")
             try:
                 # List out all downloaded ollama models. Will fail if ollama isn't installed
@@ -315,20 +315,36 @@ def load(self):
                 self.interpreter.display_message(f"\nDownloading {model_name}...\n")
                 subprocess.run(["ollama", "pull", model_name], check=True)
 
+            # Get context window if not set
+            if self.context_window == None:
+                response = requests.post(
+                    "http://localhost:11434/api/show", json={"name": model_name}
+                )
+                model_info = response.json().get("model_info", {})
+                context_length = None
+                for key in model_info:
+                    if "context_length" in key:
+                        context_length = model_info[key]
+                        break
+                if context_length is not None:
+                    self.context_window = context_length
+            if self.max_tokens == None:
+                if self.context_window != None:
+                    self.max_tokens = int(self.context_window * 0.8)
+
             # Send a ping, which will actually load the model
-            # print(f"\nLoading {model_name}...\n")
+            print(f"Loading {model_name}...\n")
 
             old_max_tokens = self.max_tokens
             self.max_tokens = 1
             self.interpreter.computer.ai.chat("ping")
             self.max_tokens = old_max_tokens
 
-            # self.interpreter.display_message("\n*Model loaded.*\n")
+            self.interpreter.display_message("*Model loaded.*\n")
 
         # Validate LLM should be moved here!!
 
         self._is_loaded = True
-        return
 
 
 def fixed_litellm_completions(**params):
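
The new block asks a local Ollama instance for the model's context length and, if max_tokens is unset, derives it as 80% of that window. The same lookup in isolation, assuming Ollama is running on its default port 11434; the model name is a hypothetical example, and the model_info key is architecture-specific (e.g. "llama.context_length"), which is why the code matches any key containing "context_length":

# Standalone sketch of the Ollama context-window lookup used above.
import requests

model_name = "llama3"  # hypothetical example model
response = requests.post("http://localhost:11434/api/show", json={"name": model_name})
model_info = response.json().get("model_info", {})

# Keys look like "<architecture>.context_length", so match on the substring.
context_window = next(
    (value for key, value in model_info.items() if "context_length" in key), None
)
if context_window is not None:
    max_tokens = int(context_window * 0.8)  # same 80% heuristic as llm.py
    print(f"context_window={context_window}, max_tokens={max_tokens}")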

interpreter/terminal_interface/profiles/defaults/local.py

Lines changed: 0 additions & 5 deletions
@@ -27,8 +27,3 @@
 # Misc settings
 interpreter.auto_run = False
 interpreter.offline = True
-
-# Final message
-interpreter.display_message(
-    f"> Model set to `{interpreter.llm.model}`\n\n**Open Interpreter** will require approval before running code.\n\nUse `interpreter -y` to bypass this.\n\nPress `CTRL-C` to exit.\n"
-)
