
Commit ae08623

Merge pull request #244 from Portkey-AI/feat/stream-runs
feat: stream for threads and runs
2 parents fdf0885 + 36ea6d9 commit ae08623

File tree

1 file changed: +151 −8 lines


portkey_ai/api_resources/apis/threads.py

Lines changed: 151 additions & 8 deletions
@@ -1,7 +1,10 @@
 import json
-from typing import Any, Iterable, Literal, Optional, Union
+from typing import Any, AsyncIterator, Iterable, Iterator, Literal, Optional, Union
 import typing
 
+from portkey_ai._vendor.openai.types.beta.assistant_stream_event import (
+    AssistantStreamEvent,
+)
 from portkey_ai.api_resources.apis.api_resource import APIResource, AsyncAPIResource
 from portkey_ai.api_resources.client import AsyncPortkey, Portkey
 from portkey_ai.api_resources.types.thread_message_type import (
@@ -105,14 +108,40 @@ def delete(
 
         return data
 
-    def create_and_run(self, assistant_id, **kwargs) -> Run:
+    def stream_create_and_run(  # type: ignore[return]
+        self, assistant_id, **kwargs
+    ) -> Union[Run, Iterator[AssistantStreamEvent]]:
+        with self.openai_client.with_streaming_response.beta.threads.create_and_run(
+            assistant_id=assistant_id, stream=True, extra_body=kwargs
+        ) as streaming:
+            for line in streaming.iter_lines():
+                json_string = line.replace("data: ", "")
+                json_string = json_string.strip().rstrip("\n")
+                if json_string == "[DONE]":
+                    break
+                elif json_string == "":
+                    continue
+                elif json_string != "":
+                    yield json_string
+                else:
+                    return ""
+
+    def normal_create_and_run(self, assistant_id, **kwargs) -> Run:
         response = self.openai_client.with_raw_response.beta.threads.create_and_run(
             assistant_id=assistant_id, extra_body=kwargs
         )
         data = Run(**json.loads(response.text))
         data._headers = response.headers
         return data
 
+    def create_and_run(
+        self, assistant_id, stream: Union[bool, NotGiven] = NOT_GIVEN, **kwargs
+    ) -> Union[Run, Iterator[AssistantStreamEvent]]:
+        if stream is True:
+            return self.stream_create_and_run(assistant_id, **kwargs)
+        else:
+            return self.normal_create_and_run(assistant_id, **kwargs)
+
     def create_and_run_poll(
         self,
         *,
@@ -307,15 +336,60 @@ def __init__(self, client: Portkey) -> None:
         self.openai_client = client.openai_client
         self.steps = Steps(client)
 
-    def create(self, thread_id: str, *, assistant_id: str, **kwargs) -> Run:
+    def stream_create(  # type: ignore[return]
+        self,
+        thread_id,
+        assistant_id,
+        **kwargs,
+    ) -> Union[Run, Iterator[AssistantStreamEvent]]:
+        with self.openai_client.with_streaming_response.beta.threads.runs.create(
+            thread_id=thread_id,
+            assistant_id=assistant_id,
+            stream=True,
+            extra_body=kwargs,
+        ) as streaming:
+            for line in streaming.iter_lines():
+                json_string = line.replace("data: ", "")
+                json_string = json_string.strip().rstrip("\n")
+                if json_string == "[DONE]":
+                    break
+                elif json_string == "":
+                    continue
+                elif json_string != "":
+                    yield json_string
+                else:
+                    return ""
+
+    def normal_create(
+        self,
+        thread_id,
+        assistant_id,
+        **kwargs,
+    ) -> Run:
         response = self.openai_client.with_raw_response.beta.threads.runs.create(
             thread_id=thread_id, assistant_id=assistant_id, extra_body=kwargs
         )
         data = Run(**json.loads(response.text))
         data._headers = response.headers
-
         return data
 
+    def create(
+        self,
+        thread_id: str,
+        *,
+        assistant_id: str,
+        stream: Union[bool, NotGiven] = NOT_GIVEN,
+        **kwargs,
+    ) -> Union[Run, Iterator[AssistantStreamEvent]]:
+        if stream is True:
+            return self.stream_create(
+                thread_id=thread_id, assistant_id=assistant_id, **kwargs
+            )
+        else:
+            return self.normal_create(
+                thread_id=thread_id, assistant_id=assistant_id, **kwargs
+            )
+
     def retrieve(self, thread_id, run_id, **kwargs) -> Run:
         response = self.openai_client.with_raw_response.beta.threads.runs.retrieve(
             thread_id=thread_id, run_id=run_id, extra_body=kwargs
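
For reference, a minimal usage sketch of the synchronous surface added above (not part of this diff): the api_key / virtual_key values and the asst_/thread_ IDs are placeholders, and the beta.threads access path is assumed to mirror the OpenAI client layout that Portkey wraps.

    from portkey_ai import Portkey

    # Placeholder credentials; substitute real keys.
    portkey = Portkey(api_key="PORTKEY_API_KEY", virtual_key="PROVIDER_VIRTUAL_KEY")

    # stream omitted (NOT_GIVEN) -> normal_create_and_run(), returns a Run object.
    run = portkey.beta.threads.create_and_run(assistant_id="asst_123")

    # stream=True -> stream_create_and_run(), a generator that yields the raw SSE
    # payloads (one JSON string per "data:" line) until the "[DONE]" sentinel.
    for event in portkey.beta.threads.create_and_run(assistant_id="asst_123", stream=True):
        print(event)

    # The same flag exists on runs.create for an existing thread.
    for event in portkey.beta.threads.runs.create(
        thread_id="thread_123", assistant_id="asst_123", stream=True
    ):
        print(event)

Note that, despite the AssistantStreamEvent annotation, these generators yield the stripped JSON strings read from each SSE line rather than parsed event objects.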
@@ -681,7 +755,25 @@ async def delete(
 
         return data
 
-    async def create_and_run(self, assistant_id, **kwargs) -> Run:
+    async def stream_create_and_run(
+        self, assistant_id, **kwargs
+    ) -> Union[Run, AsyncIterator[AssistantStreamEvent]]:
+        async with self.openai_client.with_streaming_response.beta.threads.create_and_run(  # noqa: E501
+            assistant_id=assistant_id, stream=True, extra_body=kwargs
+        ) as streaming:
+            async for line in streaming.iter_lines():
+                json_string = line.replace("data: ", "")
+                json_string = json_string.strip().rstrip("\n")
+                if json_string == "[DONE]":
+                    break
+                elif json_string == "":
+                    continue
+                elif json_string != "":
+                    yield json_string
+                else:
+                    pass
+
+    async def normal_create_and_run(self, assistant_id, **kwargs) -> Run:
         response = (
             await self.openai_client.with_raw_response.beta.threads.create_and_run(
                 assistant_id=assistant_id, extra_body=kwargs
@@ -691,6 +783,14 @@ async def create_and_run(self, assistant_id, **kwargs) -> Run:
         data._headers = response.headers
         return data
 
+    async def create_and_run(
+        self, assistant_id, stream: Union[bool, NotGiven] = NOT_GIVEN, **kwargs
+    ) -> Union[Run, AsyncIterator[AssistantStreamEvent]]:
+        if stream is True:
+            return self.stream_create_and_run(assistant_id=assistant_id, **kwargs)
+        else:
+            return await self.normal_create_and_run(assistant_id=assistant_id, **kwargs)
+
     async def create_and_run_poll(
         self,
         *,
@@ -897,15 +997,58 @@ def __init__(self, client: AsyncPortkey) -> None:
         self.openai_client = client.openai_client
         self.steps = AsyncSteps(client)
 
-    async def create(self, thread_id: str, *, assistant_id: str, **kwargs) -> Run:
+    async def stream_create(
+        self,
+        thread_id,
+        assistant_id,
+        **kwargs,
+    ) -> Union[Run, AsyncIterator[AssistantStreamEvent]]:
+        async with self.openai_client.with_streaming_response.beta.threads.runs.create(  # noqa: E501
+            thread_id=thread_id,
+            assistant_id=assistant_id,
+            stream=True,
+            extra_body=kwargs,
+        ) as response:
+            async for line in response.iter_lines():
+                json_string = line.replace("data: ", "")
+                json_string = json_string.strip().rstrip("\n")
+                if json_string == "[DONE]":
+                    break
+                elif json_string == "":
+                    continue
+                elif json_string != "":
+                    yield json_string
+                else:
+                    pass
+
+    async def normal_create(
+        self,
+        thread_id,
+        assistant_id,
+        **kwargs,
+    ) -> Run:
         response = await self.openai_client.with_raw_response.beta.threads.runs.create(
-            thread_id=thread_id, assistant_id=assistant_id, extra_body=kwargs
+            thread_id=thread_id,
+            assistant_id=assistant_id,
+            extra_body=kwargs,
         )
         data = Run(**json.loads(response.text))
         data._headers = response.headers
-
         return data
 
+    async def create(
+        self,
+        thread_id: str,
+        *,
+        assistant_id: str,
+        stream: Union[bool, NotGiven] = NOT_GIVEN,
+        **kwargs,
+    ) -> Union[Run, AsyncIterator[AssistantStreamEvent]]:
+        if stream is True:
+            return self.stream_create(thread_id, assistant_id, **kwargs)
+        else:
+            return await self.normal_create(thread_id, assistant_id, **kwargs)
+
     async def retrieve(self, thread_id, run_id, **kwargs) -> Run:
         response = (
             await self.openai_client.with_raw_response.beta.threads.runs.retrieve(
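
The async classes mirror the synchronous ones; a minimal sketch under the same assumptions (placeholder credentials and IDs, AsyncPortkey assumed to be exported at the package root, beta.threads assumed to mirror the OpenAI layout). Because create / create_and_run are coroutines here, the call is awaited first, and with stream=True the awaited result is an async generator:

    import asyncio

    from portkey_ai import AsyncPortkey

    async def main() -> None:
        # Placeholder credentials; substitute real keys.
        portkey = AsyncPortkey(
            api_key="PORTKEY_API_KEY", virtual_key="PROVIDER_VIRTUAL_KEY"
        )

        # stream omitted -> normal_create_and_run(), returns a Run object.
        run = await portkey.beta.threads.create_and_run(assistant_id="asst_123")
        print(run.status)

        # stream=True -> stream_create(), an async generator of raw SSE payload strings.
        stream = await portkey.beta.threads.runs.create(
            thread_id="thread_123", assistant_id="asst_123", stream=True
        )
        async for event in stream:
            print(event)

    asyncio.run(main())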
