Skip to content

Commit 568417a

Browse files
Fede Kamelharfede-kamel
authored andcommitted
fix(streaming-performance) - Optimize streaming event processing and resource cleanup
fix(streaming-performance) - Optimize streaming event processing and resource cleanup fix(streaming-performance) - Optimize streaming event processing and resource cleanup
1 parent 109b771 commit 568417a

File tree

2 files changed

+782
-28
lines changed

2 files changed

+782
-28
lines changed

src/anthropic/_streaming.py

Lines changed: 14 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -81,20 +81,6 @@ def __stream__(self) -> Iterator[_T]:
8181
if sse.event == "completion":
8282
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
8383

84-
if (
85-
sse.event == "message_start"
86-
or sse.event == "message_delta"
87-
or sse.event == "message_stop"
88-
or sse.event == "content_block_start"
89-
or sse.event == "content_block_delta"
90-
or sse.event == "content_block_stop"
91-
):
92-
data = sse.json()
93-
if is_dict(data) and "type" not in data:
94-
data["type"] = sse.event
95-
96-
yield process_data(data=data, cast_to=cast_to, response=response)
97-
9884
if sse.event == "ping":
9985
continue
10086

@@ -113,6 +99,13 @@ def __stream__(self) -> Iterator[_T]:
11399
response=self.response,
114100
)
115101

102+
# Process any other event for forward compatibility
103+
data = sse.json()
104+
if is_dict(data) and "type" not in data:
105+
data["type"] = sse.event
106+
107+
yield process_data(data=data, cast_to=cast_to, response=response)
108+
116109
# As we might not fully consume the response stream, we need to close it explicitly
117110
response.close()
118111

@@ -198,20 +191,6 @@ async def __stream__(self) -> AsyncIterator[_T]:
198191
if sse.event == "completion":
199192
yield process_data(data=sse.json(), cast_to=cast_to, response=response)
200193

201-
if (
202-
sse.event == "message_start"
203-
or sse.event == "message_delta"
204-
or sse.event == "message_stop"
205-
or sse.event == "content_block_start"
206-
or sse.event == "content_block_delta"
207-
or sse.event == "content_block_stop"
208-
):
209-
data = sse.json()
210-
if is_dict(data) and "type" not in data:
211-
data["type"] = sse.event
212-
213-
yield process_data(data=data, cast_to=cast_to, response=response)
214-
215194
if sse.event == "ping":
216195
continue
217196

@@ -230,6 +209,13 @@ async def __stream__(self) -> AsyncIterator[_T]:
230209
response=self.response,
231210
)
232211

212+
# Process any other event for forward compatibility
213+
data = sse.json()
214+
if is_dict(data) and "type" not in data:
215+
data["type"] = sse.event
216+
217+
yield process_data(data=data, cast_to=cast_to, response=response)
218+
233219
# As we might not fully consume the response stream, we need to close it explicitly
234220
await response.aclose()
235221

0 commit comments

Comments
 (0)