1919
2020_T = TypeVar ("_T" )
2121
22+ # Centralized event type definitions for O(1) lookup performance
23+ # frozenset prevents accidental modification and is slightly faster than set
24+ MESSAGE_EVENTS = frozenset (
25+ {
26+ "message_start" ,
27+ "message_delta" ,
28+ "message_stop" ,
29+ "content_block_start" ,
30+ "content_block_delta" ,
31+ "content_block_stop" ,
32+ }
33+ )
34+
2235
2336class _SyncStreamMeta (abc .ABCMeta ):
2437 @override
@@ -80,15 +93,8 @@ def __stream__(self) -> Iterator[_T]:
8093 for sse in iterator :
8194 if sse .event == "completion" :
8295 yield process_data (data = sse .json (), cast_to = cast_to , response = response )
83-
84- if (
85- sse .event == "message_start"
86- or sse .event == "message_delta"
87- or sse .event == "message_stop"
88- or sse .event == "content_block_start"
89- or sse .event == "content_block_delta"
90- or sse .event == "content_block_stop"
91- ):
96+ # Single, fast membership test instead of multiple string comparisons
97+ if sse .event in MESSAGE_EVENTS :
9298 data = sse .json ()
9399 if is_dict (data ) and "type" not in data :
94100 data ["type" ] = sse .event
@@ -101,6 +107,7 @@ def __stream__(self) -> Iterator[_T]:
101107 if sse .event == "error" :
102108 body = sse .data
103109
110+ # Extract meaningful error messages and use specific exception handling
104111 try :
105112 body = sse .json ()
106113 err_msg = f"{ body } "
@@ -113,9 +120,10 @@ def __stream__(self) -> Iterator[_T]:
113120 response = self .response ,
114121 )
115122
116- # Ensure the entire stream is consumed
117- for _sse in iterator :
118- ...
123+ # Explicitly closes decoder resources if available
124+ # Immediately releases connection instead of consuming remaining stream
125+ if hasattr (self ._decoder , "close" ):
126+ self ._decoder .close () # Properly closes decoder resources without unnecessary iteration
119127
120128 def __enter__ (self ) -> Self :
121129 return self
@@ -198,15 +206,8 @@ async def __stream__(self) -> AsyncIterator[_T]:
198206 async for sse in iterator :
199207 if sse .event == "completion" :
200208 yield process_data (data = sse .json (), cast_to = cast_to , response = response )
201-
202- if (
203- sse .event == "message_start"
204- or sse .event == "message_delta"
205- or sse .event == "message_stop"
206- or sse .event == "content_block_start"
207- or sse .event == "content_block_delta"
208- or sse .event == "content_block_stop"
209- ):
209+ # same O(1) lookup for consistency between sync/async versions
210+ if sse .event in MESSAGE_EVENTS :
210211 data = sse .json ()
211212 if is_dict (data ) and "type" not in data :
212213 data ["type" ] = sse .event
@@ -231,9 +232,9 @@ async def __stream__(self) -> AsyncIterator[_T]:
231232 response = self .response ,
232233 )
233234
234- # Ensure the entire stream is consumed
235- async for _sse in iterator :
236- ...
235+ # Immediately releases connection instead of consuming remaining stream
236+ if hasattr ( self . _decoder , "close" ) :
237+ self . _decoder . close () # Properly closes decoder resources without unnecessary iteration
237238
238239 async def __aenter__ (self ) -> Self :
239240 return self
0 commit comments