Skip to content

Commit 9efb3f4

Browse files
author
Rossdan Craig rossdan@lastmileai.dev
committed
Get basic streaming to work for run
Building upon what Ryan investigated in #651. All the frontend changes are from him; I just rebased onto his PR. This is a bit tricky because we have to support: 1. streaming models --> use a queue iterator for passing the output text 2. non-streaming models --> still yield, just don't use a queue iterator and wait for run command to finish General flow: 1. Parse prompts from client 2. Define stream callback with queue iterator 3. Start thread to run aiconfig without blocking main thread from accessing queue iterator 4. Create a copy of the original AIConfig so we can write partially streamed outputs, yield and display it without risk of race conditions 5. Wait for queue iterator to start containing data, or wait until max timeout (because model may not support streaming) 6. Iterate through queue iterator, saving output to display config, yield display config 7. Once output is complete, wait for the original `config.run()` thread and display the output from that Open questions/TODOs 1. [solved - use `while output_text_queue.isEmpty() and t.is_alive()`] ~~How can we check whether model supports streaming or not? Right now we just default to having a max timeout of 5s, but long-term it would be better for people to explicitly mark this as a boolean flag in their model parser class~~ 2. I need to update the output format for streaming. I thought it was fine but guess not, will verify again. A bit annoying but also not a crazy blocker for now 3. Client needs to also support streaming, but that's fine; Ryan can get unblocked with this diff now 4. Pretty complex, but streaming will break for `run_with_dependencies`. 
I've got a proposal to fix forward in https://github.com/lastmile-ai/gradio-workbook/pull/64 and really want people to take a look and give feedback ## Test plan ```bash alias aiconfig="python -m 'aiconfig.scripts.aiconfig_cli'" aiconfig edit --aiconfig-path="/Users/rossdancraig/Projects/aiconfig/cookbooks/Getting-Started/travel.aiconfig.json" --server-port=8080 --server-mode=debug_servers # Now run this from another terminal curl http://localhost:8080/api/run -d '{"prompt_name":"get_activities"}' -X POST -H 'Content-Type: application/json' ``` I also added this line to print output: ``` print(accumulated_output_text) ``` Streaming https://github.com/lastmile-ai/aiconfig/assets/151060367/d8930ea6-3143-49a3-89c6-4a2668c2e9e1 Non-streaming (same as before) https://github.com/lastmile-ai/aiconfig/assets/151060367/5aae7c7f-c273-4be7-bcb9-e96199a04076
1 parent a50fa41 commit 9efb3f4

File tree

5 files changed

+245
-20
lines changed

5 files changed

+245
-20
lines changed

python/src/aiconfig/editor/client/package.json

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
"aiconfig": "../../../../../typescript",
3939
"lodash": "^4.17.21",
4040
"node-fetch": "^3.3.2",
41+
"oboe": "^2.1.5",
4142
"react": "^18",
4243
"react-dom": "^18",
4344
"react-markdown": "^8.0.6",
@@ -49,6 +50,7 @@
4950
"devDependencies": {
5051
"@types/lodash": "^4.14.202",
5152
"@types/node": "^20",
53+
"@types/oboe": "^2.1.4",
5254
"@types/react": "^18",
5355
"@types/react-dom": "^18",
5456
"@typescript-eslint/eslint-plugin": "^6.16.0",
Lines changed: 37 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,37 @@
1+
import oboe, { Options } from "oboe";
2+
3+
// Promisify Oboe - similar to this: https://stackoverflow.com/questions/54855494/rewrite-fetch-call-to-oboe-for-json-streams-with-typescript
4+
// Except it allows to use .node('*', fn) & only resolves on done
5+
// See https://medium.com/@amberlamps84/oboe-js-mongodb-express-node-js-and-the-beauty-of-streams-4a90fad5414 on using oboe vs raw streams
6+
// (multiple chunks can be sent in single response & we only want valid json ones)
7+
export async function streamingApi<T>(
8+
headers: Options,
9+
on: string = "*",
10+
fn: (data: any) => void,
11+
on2?: string,
12+
fn2?: (data: any) => void,
13+
on3?: string,
14+
fn3?: (data: any) => void
15+
): Promise<T> {
16+
return new Promise((resolve, reject) => {
17+
if (fn2 && on2 && fn3 && on3) {
18+
oboe(headers)
19+
.node(on, fn)
20+
.node(on2, fn2)
21+
.node(on3, fn3)
22+
.done((data) => resolve(data))
23+
.fail((err) => reject(err.jsonBody));
24+
} else if (fn2 && on2) {
25+
oboe(headers)
26+
.node(on, fn)
27+
.node(on2, fn2)
28+
.done((data) => resolve(data))
29+
.fail((err) => reject(err.jsonBody));
30+
} else {
31+
oboe(headers)
32+
.node(on, fn)
33+
.done((data) => resolve(data))
34+
.fail((err) => reject(err.jsonBody));
35+
}
36+
});
37+
}

python/src/aiconfig/editor/client/yarn.lock

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2539,6 +2539,13 @@
25392539
dependencies:
25402540
undici-types "~5.26.4"
25412541

2542+
"@types/oboe@^2.1.4":
2543+
version "2.1.4"
2544+
resolved "https://registry.yarnpkg.com/@types/oboe/-/oboe-2.1.4.tgz#d92c4636d0b7737803e4361e10e8dad488f39634"
2545+
integrity sha512-bXt4BXSQy0N/buSIak1o0TjYAk2SAeK1aZV9xKcb+xVGWYP8NcMOFy2T7Um3kIvEcQJzrdgJ8R6fpbRcp/LEww==
2546+
dependencies:
2547+
"@types/node" "*"
2548+
25422549
"@types/parse-json@^4.0.0":
25432550
version "4.0.2"
25442551
resolved "https://registry.yarnpkg.com/@types/parse-json/-/parse-json-4.0.2.tgz#5950e50960793055845e956c427fc2b0d70c5239"
@@ -6083,6 +6090,11 @@ http-errors@~1.6.2:
60836090
setprototypeof "1.1.0"
60846091
statuses ">= 1.4.0 < 2"
60856092

6093+
http-https@^1.0.0:
6094+
version "1.0.0"
6095+
resolved "https://registry.yarnpkg.com/http-https/-/http-https-1.0.0.tgz#2f908dd5f1db4068c058cd6e6d4ce392c913389b"
6096+
integrity sha512-o0PWwVCSp3O0wS6FvNr6xfBCHgt0m1tvPLFOCc2iFDKTRAXhB7m8klDf7ErowFH8POa6dVdGatKU5I1YYwzUyg==
6097+
60866098
http-parser-js@>=0.5.1:
60876099
version "0.5.8"
60886100
resolved "https://registry.yarnpkg.com/http-parser-js/-/http-parser-js-0.5.8.tgz#af23090d9ac4e24573de6f6aecc9d84a48bf20e3"
@@ -8524,6 +8536,13 @@ object.values@^1.1.0, object.values@^1.1.6, object.values@^1.1.7:
85248536
define-properties "^1.2.0"
85258537
es-abstract "^1.22.1"
85268538

8539+
oboe@^2.1.5:
8540+
version "2.1.5"
8541+
resolved "https://registry.yarnpkg.com/oboe/-/oboe-2.1.5.tgz#5554284c543a2266d7a38f17e073821fbde393cd"
8542+
integrity sha512-zRFWiF+FoicxEs3jNI/WYUrVEgA7DeET/InK0XQuudGHRg8iIob3cNPrJTKaz4004uaA9Pbe+Dwa8iluhjLZWA==
8543+
dependencies:
8544+
http-https "^1.0.0"
8545+
85278546
obuf@^1.0.0, obuf@^1.1.2:
85288547
version "1.1.2"
85298548
resolved "https://registry.yarnpkg.com/obuf/-/obuf-1.1.2.tgz#09bea3343d41859ebd446292d11c9d4db619084e"
Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,44 @@
1+
"""Queue iterator for streaming. Can only process strings for now
2+
but in future will try to make it generic"""
3+
# import asyncio
4+
from queue import Queue
5+
6+
# from typing import Generic, TypeVar
7+
8+
# # TODO: Add generic typing for queue items
9+
# # (couldn't get sentinel value to work with generics)
10+
# T = TypeVar('T')
11+
STOP_STREAMING_SIGNAL = object() # sentinel value to indicate end of stream
12+
13+
14+
class QueueIterator:
15+
"""In order to support text streaming, we need to store
16+
the output in a queue and iterate over those values. A lot of this was
17+
inspired by HuggingFace's TextIteratorStreamer object:
18+
19+
I know I can just use a queue directly in the callsite with
20+
`iter(queue.get, None)`, but having a class makes it easier to manage
21+
and abstracts it a bit more.
22+
"""
23+
24+
def __init__(self):
25+
self.q = Queue()
26+
self.stop_signal = STOP_STREAMING_SIGNAL
27+
self.timeout = None
28+
29+
def __iter__(self):
30+
return self
31+
32+
def __next__(self):
33+
value = self.q.get(block=True, timeout=self.timeout)
34+
if value == self.stop_signal:
35+
raise StopIteration()
36+
return value
37+
38+
def put(self, text: str, stream_end: bool = False):
39+
self.q.put(text, timeout=self.timeout)
40+
if stream_end:
41+
self.q.put(self.stop_signal, timeout=self.timeout)
42+
43+
def isEmpty(self) -> bool:
44+
return self.q.empty()

python/src/aiconfig/editor/server/server.py

Lines changed: 143 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import asyncio
2+
import copy
3+
import json
14
import logging
5+
import threading
6+
import time
27
import webbrowser
38
from typing import Any, Dict, Type, Union
49

510
import lastmile_utils.lib.core.api as core_utils
611
import result
712
from aiconfig.Config import AIConfigRuntime
13+
from aiconfig.editor.server.queue_iterator import STOP_STREAMING_SIGNAL, QueueIterator
814
from aiconfig.editor.server.server_utils import (
915
EditServerConfig,
1016
FlaskResponse,
@@ -26,11 +32,11 @@
2632
)
2733
from aiconfig.model_parser import InferenceOptions
2834
from aiconfig.registry import ModelParserRegistry
29-
from flask import Flask, request
35+
from flask import Flask, Response, request, stream_with_context
3036
from flask_cors import CORS
3137
from result import Err, Ok, Result
3238

33-
from aiconfig.schema import Prompt
39+
from aiconfig.schema import ExecuteResult, Output, Prompt
3440

3541
logging.getLogger("werkzeug").disabled = True
3642

@@ -173,35 +179,152 @@ def create() -> FlaskResponse:
173179

174180

175181
@app.route("/api/run", methods=["POST"])
176-
async def run() -> FlaskResponse:
182+
def run() -> FlaskResponse:
183+
EXCLUDE_OPTIONS = {
184+
"prompt_index": True,
185+
"file_path": True,
186+
"callback_manager": True,
187+
}
177188
state = get_server_state(app)
178189
aiconfig = state.aiconfig
179190
request_json = request.get_json()
180191

192+
prompt_name: Union[str, None] = request_json.get("prompt_name")
193+
if prompt_name is None:
194+
return HttpResponseWithAIConfig(
195+
message="No prompt name provided, cannot execute `run` command",
196+
code=400,
197+
aiconfig=None,
198+
).to_flask_format()
199+
200+
# TODO (rossdanlm): Refactor aiconfig.run() to not take in `params`
201+
# as a function arg since we can now just call
202+
# aiconfig.get_parameters(prompt_name) directly inside of run. See:
203+
# https://github.com/lastmile-ai/aiconfig/issues/671
204+
params = request_json.get("params", aiconfig.get_parameters(prompt_name)) # type: ignore
205+
stream = request_json.get("stream", False) # TODO: set this automatically to True after client supports stream output
206+
207+
# Define stream callback and queue object for streaming results
208+
output_text_queue = QueueIterator()
209+
210+
def update_output_queue(data, _accumulated_data, _index) -> None:
211+
should_end_stream = data == STOP_STREAMING_SIGNAL
212+
output_text_queue.put(data, should_end_stream)
213+
214+
inference_options = InferenceOptions(
215+
stream=stream,
216+
stream_callback=update_output_queue,
217+
)
218+
219+
def generate():
220+
# Use multi-threading so that we don't block run command from
221+
# displaying the streamed output (if streaming is supported)
222+
def run_async_config_in_thread():
223+
asyncio.run(
224+
aiconfig.run(
225+
prompt_name=prompt_name,
226+
params=params,
227+
run_with_dependencies=False,
228+
options=inference_options,
229+
)
230+
)
231+
output_text_queue.put(STOP_STREAMING_SIGNAL)
232+
233+
t = threading.Thread(target=run_async_config_in_thread)
234+
t.start()
235+
236+
# Create a deep copy of the state aiconfig so we can yield an AIConfig
237+
# with streaming partial outputs in the meantime. This probably isn't
238+
# necessary, but just getting unblocked for now
239+
displaying_config = copy.deepcopy(aiconfig)
240+
241+
# If model supports streaming, need to wait until streamer has at
242+
# least 1 item to display. If model does not support streaming,
243+
# need to wait until the aiconfig.run() thread is complete
244+
SLEEP_DELAY_SECONDS = 0.1
245+
wait_time_in_seconds = 0.0
246+
while output_text_queue.isEmpty() and t.is_alive():
247+
# Yea I know time.sleep() isn't super accurate, but it's fine,
248+
# we can fix later
249+
time.sleep(0.1)
250+
wait_time_in_seconds += SLEEP_DELAY_SECONDS
251+
print(f"Output queue is currently empty. Waiting for {wait_time_in_seconds:.1f}s...")
252+
253+
# Yield in flask is weird and you either need to send responses as a
254+
# string, or artificially wrap them around "[" and "]"
255+
# yield "["
256+
if not output_text_queue.isEmpty():
257+
accumulated_output_text = ""
258+
for text in output_text_queue:
259+
if isinstance(text, str):
260+
accumulated_output_text += text
261+
elif isinstance(text, dict) and "content" in text:
262+
# TODO: Fix streaming output format so that it returns text
263+
accumulated_output_text += text["content"]
264+
elif isinstance(text, dict) and "generated_text" in text:
265+
# TODO: Fix streaming output format so that it returns text
266+
accumulated_output_text += text["generated_text"]
267+
268+
accumulated_output: Output = ExecuteResult(
269+
**{
270+
"output_type": "execute_result",
271+
"data": accumulated_output_text,
272+
# Assume streaming only supports single output
273+
# I think this actually may be wrong for PaLM or OpenAI
274+
# TODO: Need to sync with Ankush but can fix forward
275+
"execution_count": 0,
276+
"metadata": {},
277+
}
278+
)
279+
280+
displaying_config.add_output(prompt_name, accumulated_output, overwrite=True)
281+
aiconfig_json = displaying_config.model_dump(exclude=EXCLUDE_OPTIONS)
282+
yield "["
283+
yield json.dumps({"aiconfig": aiconfig_json})
284+
yield "]"
285+
286+
# Ensure that the run process is complete to yield final output
287+
t.join()
288+
aiconfig_json = aiconfig.model_dump(exclude=EXCLUDE_OPTIONS)
289+
yield "["
290+
yield json.dumps({"aiconfig": aiconfig_json})
291+
yield "]"
292+
181293
try:
182-
prompt_name: Union[str, None] = request_json.get("prompt_name")
183-
if prompt_name is None:
184-
return HttpResponseWithAIConfig(
185-
message="No prompt name provided, cannot execute `run` command",
186-
code=400,
187-
aiconfig=None,
188-
).to_flask_format()
189-
190-
# TODO (rossdanlm): Refactor aiconfig.run() to not take in `params`
191-
# as a function arg since we can now just call
192-
# aiconfig.get_parameters(prompt_name) directly inside of run. See:
193-
# https://github.com/lastmile-ai/aiconfig/issues/671
194-
params = request_json.get("params", aiconfig.get_parameters(prompt_name)) # type: ignore
195-
stream = request_json.get("stream", False)
196-
options = InferenceOptions(stream=stream)
197-
run_output = await aiconfig.run(prompt_name, params, options) # type: ignore
198-
LOGGER.debug(f"run_output: {run_output}")
294+
if stream:
295+
LOGGER.info(f"Running `aiconfig.run()` command with request: {request_json}")
296+
# Streaming based on
297+
# https://stackoverflow.com/questions/73275517/flask-not-streaming-json-response
298+
return Response(
299+
stream_with_context(generate()),
300+
status=200,
301+
content_type="application/json",
302+
)
303+
304+
# Run without streaming
305+
inference_options = InferenceOptions(stream=stream)
306+
def run_async_config_in_thread():
307+
asyncio.run(
308+
aiconfig.run(
309+
prompt_name=prompt_name,
310+
params=params,
311+
run_with_dependencies=False,
312+
options=inference_options,
313+
)
314+
)
315+
output_text_queue.put(STOP_STREAMING_SIGNAL)
316+
317+
t = threading.Thread(target=run_async_config_in_thread)
318+
t.start()
319+
LOGGER.info(f"Running `aiconfig.run()` command with request: {request_json}")
320+
t.join()
199321
return HttpResponseWithAIConfig(
200322
#
201323
message="Ran prompt",
202324
code=200,
203325
aiconfig=aiconfig,
204326
).to_flask_format()
327+
205328
except Exception as e:
206329
return HttpResponseWithAIConfig(
207330
#

0 commit comments

Comments
 (0)