Skip to content

Commit cf51769

Browse files
Fede Kamelhar (fede-kamel)
authored and committed
fix(streaming-performance) - Optimize streaming event processing and resource cleanup
Commit message: fix(streaming-performance) - Optimize streaming event processing and resource cleanup
1 parent 4022488 commit cf51769

File tree

2 files changed

+793
-24
lines changed

2 files changed

+793
-24
lines changed

src/anthropic/_streaming.py

Lines changed: 25 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,19 @@
1919

2020
_T = TypeVar("_T")
2121

22+
# Centralized event type definitions for O(1) lookup performance
23+
# frozenset prevents accidental modification and is slightly faster than set
24+
MESSAGE_EVENTS = frozenset(
25+
{
26+
"message_start",
27+
"message_delta",
28+
"message_stop",
29+
"content_block_start",
30+
"content_block_delta",
31+
"content_block_stop",
32+
}
33+
)
34+
2235

2336
class _SyncStreamMeta(abc.ABCMeta):
2437
@override
@@ -80,15 +93,8 @@ def __stream__(self) -> Iterator[_T]:
8093
for sse in iterator:
8194
if sse.event == "completion":
8295
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
83-
84-
if (
85-
sse.event == "message_start"
86-
or sse.event == "message_delta"
87-
or sse.event == "message_stop"
88-
or sse.event == "content_block_start"
89-
or sse.event == "content_block_delta"
90-
or sse.event == "content_block_stop"
91-
):
96+
# Single, fast membership test instead of multiple string comparisons
97+
if sse.event in MESSAGE_EVENTS:
9298
data = sse.json()
9399
if is_dict(data) and "type" not in data:
94100
data["type"] = sse.event
@@ -101,6 +107,7 @@ def __stream__(self) -> Iterator[_T]:
101107
if sse.event == "error":
102108
body = sse.data
103109

110+
# Extract meaningful error messages and use specific exception handling
104111
try:
105112
body = sse.json()
106113
err_msg = f"{body}"
@@ -113,9 +120,10 @@ def __stream__(self) -> Iterator[_T]:
113120
response=self.response,
114121
)
115122

116-
# Ensure the entire stream is consumed
117-
for _sse in iterator:
118-
...
123+
# Explicitly closes decoder resources if available
124+
# Immediately releases connection instead of consuming remaining stream
125+
if hasattr(self._decoder, "close"):
126+
self._decoder.close() # Properly closes decoder resources without unnecessary iteration
119127

120128
def __enter__(self) -> Self:
121129
return self
@@ -198,15 +206,8 @@ async def __stream__(self) -> AsyncIterator[_T]:
198206
async for sse in iterator:
199207
if sse.event == "completion":
200208
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
201-
202-
if (
203-
sse.event == "message_start"
204-
or sse.event == "message_delta"
205-
or sse.event == "message_stop"
206-
or sse.event == "content_block_start"
207-
or sse.event == "content_block_delta"
208-
or sse.event == "content_block_stop"
209-
):
209+
# same O(1) lookup for consistency between sync/async versions
210+
if sse.event in MESSAGE_EVENTS:
210211
data = sse.json()
211212
if is_dict(data) and "type" not in data:
212213
data["type"] = sse.event
@@ -231,9 +232,9 @@ async def __stream__(self) -> AsyncIterator[_T]:
231232
response=self.response,
232233
)
233234

234-
# Ensure the entire stream is consumed
235-
async for _sse in iterator:
236-
...
235+
# Immediately releases connection instead of consuming remaining stream
236+
if hasattr(self._decoder, "close"):
237+
self._decoder.close() # Properly closes decoder resources without unnecessary iteration
237238

238239
async def __aenter__(self) -> Self:
239240
return self

0 commit comments

Comments (0)