@@ -89,32 +89,33 @@ def _make_session() -> requests.Session:
89
89
return s
90
90
91
91
92
def parse_stream_helper(line: bytes) -> Optional[str]:
    """Extract the payload of one ``data:`` line of a streamed response.

    Args:
        line: One raw line of the response body, as bytes.

    Returns:
        The UTF-8 decoded text that follows the ``data: `` prefix, without
        any trailing line terminator. ``None`` is returned when the line is
        empty, when it is the ``data: [DONE]`` end-of-stream sentinel, or
        when it does not start with ``data: ``.

    Raises:
        UnicodeDecodeError: If the payload is not valid UTF-8.
    """
    if not line:
        return None

    if line.strip() == b"data: [DONE]":
        # return here will cause GeneratorExit exception in urllib3
        # and it will close http connection with TCP Reset
        return None

    prefix = b"data: "
    if not line.startswith(prefix):
        return None

    # The sentinel check above ignores surrounding whitespace; drop a trailing
    # CR/LF here as well so a caller that hands over unstripped lines does not
    # get the line terminator back as part of the payload.
    return line[len(prefix):].rstrip(b"\r\n").decode("utf-8")
104
102
105
103
106
def parse_stream(rbody: Iterator[bytes]) -> Iterator[str]:
    """Lazily yield the decoded payload of every ``data:`` line in ``rbody``.

    Each item of ``rbody`` is passed to ``parse_stream_helper``; items for
    which the helper returns ``None`` are skipped.
    """
    for parsed in map(parse_stream_helper, rbody):
        if parsed is None:
            continue
        yield parsed
111
109
112
110
113
111
async def parse_stream_async(rbody: aiohttp.StreamReader):
    """Asynchronously yield the decoded payload of every ``data:`` line.

    The body is consumed through ``rbody.iter_chunks()``. Bytes are buffered
    across chunks so that a line is only handed to ``parse_stream_helper``
    once its line terminator has arrived (or the body has ended); lines for
    which the helper returns ``None`` are skipped.

    Args:
        rbody: The streamed response body.

    Yields:
        The strings returned by ``parse_stream_helper`` for each complete line.
    """
    # Tail of the data received so far that is not yet terminated by a newline.
    pending = b""
    async for chunk, _ in rbody.iter_chunks():
        # While the `ChunkTupleAsyncStreamIterator` iterator is meant to iterate over chunks (and thus lines) it seems
        # to still sometimes return multiple lines at a time, so let's split the chunk by lines again.
        pieces = (pending + chunk).splitlines(keepends=True)
        pending = b""
        # The data may just as well stop in the middle of a line. Hold that
        # unterminated tail back until the rest of the line arrives instead of
        # emitting a truncated payload and dropping the remainder.
        if pieces and not pieces[-1].endswith((b"\n", b"\r")):
            pending = pieces.pop()
        for piece in pieces:
            _line = parse_stream_helper(piece.rstrip(b"\r\n"))
            if _line is not None:
                yield _line
    # Flush what is left when the body ends without a final line terminator.
    if pending:
        _line = parse_stream_helper(pending)
        if _line is not None:
            yield _line
118
119
119
120
120
121
class APIRequestor :
@@ -296,20 +297,25 @@ async def arequest(
296
297
) -> Tuple [Union [OpenAIResponse , AsyncGenerator [OpenAIResponse , None ]], bool , str ]:
297
298
ctx = aiohttp_session ()
298
299
session = await ctx .__aenter__ ()
299
- result = await self .arequest_raw (
300
- method .lower (),
301
- url ,
302
- session ,
303
- params = params ,
304
- supplied_headers = headers ,
305
- files = files ,
306
- request_id = request_id ,
307
- request_timeout = request_timeout ,
308
- )
309
- resp , got_stream = await self ._interpret_async_response (result , stream )
300
+ try :
301
+ result = await self .arequest_raw (
302
+ method .lower (),
303
+ url ,
304
+ session ,
305
+ params = params ,
306
+ supplied_headers = headers ,
307
+ files = files ,
308
+ request_id = request_id ,
309
+ request_timeout = request_timeout ,
310
+ )
311
+ resp , got_stream = await self ._interpret_async_response (result , stream )
312
+ except Exception :
313
+ await ctx .__aexit__ (None , None , None )
314
+ raise
310
315
if got_stream :
311
316
312
317
async def wrap_resp ():
318
+ assert isinstance (resp , AsyncGenerator )
313
319
try :
314
320
async for r in resp :
315
321
yield r
@@ -612,7 +618,10 @@ def _interpret_response(
612
618
else :
613
619
return (
614
620
self ._interpret_response_line (
615
- result .content , result .status_code , result .headers , stream = False
621
+ result .content .decode ("utf-8" ),
622
+ result .status_code ,
623
+ result .headers ,
624
+ stream = False ,
616
625
),
617
626
False ,
618
627
)
@@ -635,13 +644,16 @@ async def _interpret_async_response(
635
644
util .log_warn (e , body = result .content )
636
645
return (
637
646
self ._interpret_response_line (
638
- await result .read (), result .status , result .headers , stream = False
647
+ (await result .read ()).decode ("utf-8" ),
648
+ result .status ,
649
+ result .headers ,
650
+ stream = False ,
639
651
),
640
652
False ,
641
653
)
642
654
643
655
def _interpret_response_line (
644
- self , rbody , rcode , rheaders , stream : bool
656
+ self , rbody : str , rcode : int , rheaders , stream : bool
645
657
) -> OpenAIResponse :
646
658
# HTTP 204 response code does not have any content in the body.
647
659
if rcode == 204 :
@@ -655,13 +667,11 @@ def _interpret_response_line(
655
667
headers = rheaders ,
656
668
)
657
669
try :
658
- if hasattr (rbody , "decode" ):
659
- rbody = rbody .decode ("utf-8" )
660
670
data = json .loads (rbody )
661
- except (JSONDecodeError , UnicodeDecodeError ):
671
+ except (JSONDecodeError , UnicodeDecodeError ) as e :
662
672
raise error .APIError (
663
673
f"HTTP code { rcode } from API ({ rbody } )" , rbody , rcode , headers = rheaders
664
- )
674
+ ) from e
665
675
resp = OpenAIResponse (data , rheaders )
666
676
# In the future, we might add a "status" parameter to errors
667
677
# to better handle the "error while streaming" case.
0 commit comments