Commit d8e4557

fix: streaming output event
1 parent 3a36f6a commit d8e4557

4 files changed: +202 additions, −114 deletions


llm_observability_examples.py

Lines changed: 27 additions & 8 deletions
@@ -25,23 +25,30 @@
 def main_sync():
     trace_id = str(uuid.uuid4())
     print("Trace ID:", trace_id)
+    distinct_id = "test_distinct_id"
+    properties = {"test_property": "test_value"}

     try:
-        basic_openai_call()
-        streaming_openai_call()
+        basic_openai_call(distinct_id, trace_id, properties)
+        # streaming_openai_call(distinct_id, trace_id, properties)
     except Exception as e:
         print("Error during OpenAI call:", str(e))


 async def main_async():
+    trace_id = str(uuid.uuid4())
+    print("Trace ID:", trace_id)
+    distinct_id = "test_distinct_id"
+    properties = {"test_property": "test_value"}
+
     try:
-        await basic_async_openai_call()
-        await streaming_async_openai_call()
+        await basic_async_openai_call(distinct_id, trace_id, properties)
+        await streaming_async_openai_call(distinct_id, trace_id, properties)
     except Exception as e:
         print("Error during OpenAI call:", str(e))


-def basic_openai_call():
+def basic_openai_call(distinct_id, trace_id, properties):
     response = openai_client.chat.completions.create(
         model="gpt-4o-mini",
         messages=[
@@ -50,6 +57,9 @@ def basic_openai_call():
         ],
         max_tokens=100,
         temperature=0.7,
+        posthog_distinct_id=distinct_id,
+        posthog_trace_id=trace_id,
+        posthog_properties=properties,
     )
     if response and response.choices:
         print("OpenAI response:", response.choices[0].message.content)
@@ -58,7 +68,7 @@ def basic_openai_call():
     return response


-async def basic_async_openai_call():
+async def basic_async_openai_call(distinct_id, trace_id, properties):
     response = await async_openai_client.chat.completions.create(
         model="gpt-4o-mini",
         messages=[
@@ -67,6 +77,9 @@ async def basic_async_openai_call():
         ],
         max_tokens=100,
         temperature=0.7,
+        posthog_distinct_id=distinct_id,
+        posthog_trace_id=trace_id,
+        posthog_properties=properties,
     )
     if response and hasattr(response, "choices"):
         print("OpenAI response:", response.choices[0].message.content)
@@ -75,7 +88,7 @@ async def basic_async_openai_call():
     return response


-def streaming_openai_call():
+def streaming_openai_call(distinct_id, trace_id, properties):
     response = openai_client.chat.completions.create(
         model="gpt-4o-mini",
         messages=[
@@ -85,6 +98,9 @@ def streaming_openai_call():
         max_tokens=100,
         temperature=0.7,
         stream=True,
+        posthog_distinct_id=distinct_id,
+        posthog_trace_id=trace_id,
+        posthog_properties=properties,
     )

     for chunk in response:
@@ -93,7 +109,7 @@ def streaming_openai_call():
     return response


-async def streaming_async_openai_call():
+async def streaming_async_openai_call(distinct_id, trace_id, properties):
     response = await async_openai_client.chat.completions.create(
         model="gpt-4o-mini",
         messages=[
@@ -103,6 +119,9 @@ async def streaming_async_openai_call():
         max_tokens=100,
         temperature=0.7,
         stream=True,
+        posthog_distinct_id=distinct_id,
+        posthog_trace_id=trace_id,
+        posthog_properties=properties,
     )

     async for chunk in response:

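For context, the example functions above are driven through a PostHog-wrapped OpenAI client. A minimal sketch of the wiring follows; the wrapper's import path and constructor arguments are assumptions (they are not shown in this commit), inferred only from the `_ph_client` and `_openai_client` attributes used in openai.py below. Note that the posthog_* arguments are declared as named parameters of `create`, so the wrapper consumes them rather than forwarding them to OpenAI.

import uuid

from posthog import Posthog
from posthog.ai import OpenAI  # import path assumed, not shown in this commit

# Constructor arguments are assumptions based on the wrapper's attributes.
posthog_client = Posthog("<project_api_key>", host="https://us.i.posthog.com")
openai_client = OpenAI(api_key="<openai_api_key>", posthog_client=posthog_client)

trace_id = str(uuid.uuid4())
response = openai_client.chat.completions.create(
    model="gpt-4o-mini",
    messages=[{"role": "user", "content": "Say hello."}],
    max_tokens=100,
    # posthog_* kwargs are named parameters of create(), not sent to OpenAI
    posthog_distinct_id="test_distinct_id",
    posthog_trace_id=trace_id,
    posthog_properties={"test_property": "test_value"},
)
print(response.choices[0].message.content)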
posthog/ai/providers/openai/openai.py

Lines changed: 92 additions & 21 deletions
@@ -1,3 +1,4 @@
+import time
 from typing import Any, Dict, Optional, Union

 try:
@@ -6,7 +7,10 @@
     raise ModuleNotFoundError("Please install OpenAI to use this feature: 'pip install openai'")

 from posthog.client import Client as PostHogClient
-from posthog.ai.utils import process_sync_streaming_response, track_usage
+from posthog.ai.utils import (
+    track_usage,
+    get_model_params,
+)


 class OpenAI:
@@ -56,31 +60,98 @@ def create(
         posthog_properties: Optional[Dict[str, Any]] = None,
         **kwargs: Any,
     ):
-        """
-        Wraps openai chat completions and captures a $ai_generation event in PostHog.
-
-        PostHog-specific parameters:
-        - posthog_distinct_id: Ties the resulting event to a user in PostHog.
-        - posthog_trace_id: For grouping multiple calls into a single trace.
-        - posthog_properties: Additional custom properties for PostHog analytics.
-        """
         distinct_id = posthog_distinct_id or "anonymous_ai_user"
-
-        # If streaming, handle it separately
+
         if kwargs.get("stream", False):
-            response = self._openai_client.chat.completions.create(**kwargs)
-            return process_sync_streaming_response(
-                response=response,
-                ph_client=self._ph_client,
-                event_properties={},
-                distinct_id=distinct_id,
+            return self._create_streaming(
+                distinct_id,
+                posthog_trace_id,
+                posthog_properties,
+                **kwargs,
             )

-        # Non-streaming: let track_usage handle the request and analytics
+
         def call_method(**call_kwargs):
             return self._openai_client.chat.completions.create(**call_kwargs)

-        response = track_usage(
-            distinct_id, self._ph_client, posthog_trace_id, posthog_properties, call_method, **kwargs
+        return track_usage(
+            distinct_id,
+            self._ph_client,
+            posthog_trace_id,
+            posthog_properties,
+            call_method,
+            **kwargs,
         )
-        return response
+
+    def _create_streaming(
+        self,
+        distinct_id: str,
+        posthog_trace_id: Optional[str],
+        posthog_properties: Optional[Dict[str, Any]],
+        **kwargs: Any,
+    ):
+        start_time = time.time()
+        usage_stats: Dict[str, int] = {}
+        accumulated_content = []
+        stream_options = {"include_usage": True}
+        response = self._openai_client.chat.completions.create(**kwargs, stream_options=stream_options)
+
+        def generator():
+            nonlocal usage_stats
+            nonlocal accumulated_content
+            try:
+                for chunk in response:
+                    if hasattr(chunk, "usage") and chunk.usage:
+                        usage_stats = {
+                            k: getattr(chunk.usage, k, 0)
+                            for k in ["prompt_tokens", "completion_tokens", "total_tokens"]
+                        }
+                    if chunk.choices[0].delta.content:
+                        accumulated_content.append(chunk.choices[0].delta.content)
+                    yield chunk
+            finally:
+                end_time = time.time()
+                latency = end_time - start_time
+                output = "".join(accumulated_content)
+                self._capture_streaming_event(distinct_id, posthog_trace_id, posthog_properties, kwargs, usage_stats, latency, output)
+
+        return generator()
+
+    def _capture_streaming_event(
+        self,
+        distinct_id: str,
+        posthog_trace_id: Optional[str],
+        posthog_properties: Optional[Dict[str, Any]],
+        kwargs: Dict[str, Any],
+        usage_stats: Dict[str, int],
+        latency: float,
+        output: str,
+    ):
+
+        event_properties = {
+            "$ai_provider": "openai",
+            "$ai_model": kwargs.get("model"),
+            "$ai_model_parameters": get_model_params(kwargs),
+            "$ai_input": kwargs.get("messages"),
+            "$ai_output": {
+                "choices": [
+                    {
+                        "content": output,
+                        "role": "assistant",
+                    }
+                ]
+            },
+            "$ai_http_status": 200,
+            "$ai_input_tokens": usage_stats.get("prompt_tokens", 0),
+            "$ai_output_tokens": usage_stats.get("completion_tokens", 0),
+            "$ai_latency": latency,
+            "$ai_trace_id": posthog_trace_id,
+            "$ai_posthog_properties": posthog_properties,
+        }
+
+        if hasattr(self._ph_client, "capture"):
+            self._ph_client.capture(
+                distinct_id=distinct_id,
+                event="$ai_generation",
+                properties=event_properties,
+            )

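The substance of the fix is in `_create_streaming`: instead of capturing analytics before the caller has consumed anything, the wrapper returns a generator whose finally block fires the $ai_generation event exactly once when iteration ends, whether the stream is exhausted, raises, or is abandoned early (closing a generator raises GeneratorExit inside it, which still reaches finally). A self-contained sketch of that pattern, with a print standing in for `self._ph_client.capture`:

import time

def instrumented(stream):
    # Wraps any iterable so a capture-style side effect runs once at the end.
    start = time.time()
    pieces = []

    def gen():
        try:
            for chunk in stream:
                pieces.append(chunk)
                yield chunk
        finally:
            # Runs on normal exhaustion, on error, and on early close.
            print(f"captured {''.join(pieces)!r} after {time.time() - start:.3f}s")

    return gen()

for token in instrumented(["Hello", ", ", "world"]):
    print(token, end="")
print()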
posthog/ai/providers/openai/openai_async.py

Lines changed: 81 additions & 14 deletions
@@ -1,3 +1,4 @@
+import time
 from typing import Any, Dict, Optional, Union

 try:
@@ -6,7 +7,7 @@
     raise ModuleNotFoundError("Please install OpenAI to use this feature: 'pip install openai'")

 from posthog.client import Client as PostHogClient
-from posthog.ai.utils import process_async_streaming_response, track_usage_async
+from posthog.ai.utils import track_usage_async, get_model_params


 class AsyncOpenAI:
@@ -55,23 +56,15 @@ async def create(
         posthog_properties: Optional[Dict[str, Any]] = None,
         **kwargs: Any,
     ):
-        """
-        Wraps openai chat completions (async) and captures a $ai_generation event in PostHog.
-
-        To use streaming in async mode:
-        async for chunk in async_openai.chat.completions.create(stream=True, ...):
-            ...
-        """
         distinct_id = posthog_distinct_id or "anonymous_ai_user"

         # If streaming, handle streaming specifically
         if kwargs.get("stream", False):
-            response = await self._openai_client.chat.completions.create(**kwargs)
-            return process_async_streaming_response(
-                response=response,
-                ph_client=self._ph_client,
-                event_properties={},
-                distinct_id=distinct_id,
+            return await self._create_streaming(
+                distinct_id,
+                posthog_trace_id,
+                posthog_properties,
+                **kwargs,
             )

         # Non-streaming: let track_usage_async handle request and analytics
@@ -82,3 +75,77 @@ async def call_async_method(**call_kwargs):
             distinct_id, self._ph_client, posthog_trace_id, posthog_properties, call_async_method, **kwargs
         )
         return response
+
+
+    async def _create_streaming(
+        self,
+        distinct_id: str,
+        posthog_trace_id: Optional[str],
+        posthog_properties: Optional[Dict[str, Any]],
+        **kwargs: Any,
+    ):
+        start_time = time.time()
+        usage_stats: Dict[str, int] = {}
+        accumulated_content = []
+        stream_options = {"include_usage": True}
+        response = await self._openai_client.chat.completions.create(**kwargs, stream_options=stream_options)
+
+        async def async_generator():
+            nonlocal usage_stats, accumulated_content
+            try:
+                async for chunk in response:
+                    if hasattr(chunk, "usage") and chunk.usage:
+                        usage_stats = {
+                            k: getattr(chunk.usage, k, 0)
+                            for k in ["prompt_tokens", "completion_tokens", "total_tokens"]
+                        }
+                    if chunk.choices[0].delta.content:
+                        accumulated_content.append(chunk.choices[0].delta.content)
+                    yield chunk
+            finally:
+                end_time = time.time()
+                latency = end_time - start_time
+                output = "".join(accumulated_content)
+                self._capture_streaming_event(distinct_id, posthog_trace_id, posthog_properties, kwargs, usage_stats, latency, output)
+
+        return async_generator()
+
+    def _capture_streaming_event(
+        self,
+        distinct_id: str,
+        posthog_trace_id: Optional[str],
+        posthog_properties: Optional[Dict[str, Any]],
+        kwargs: Dict[str, Any],
+        usage_stats: Dict[str, int],
+        latency: float,
+        output: str,
+    ):
+
+        event_properties = {
+            "$ai_provider": "openai",
+            "$ai_model": kwargs.get("model"),
+            "$ai_model_parameters": get_model_params(kwargs),
+            "$ai_input": kwargs.get("messages"),
+            "$ai_output": {
+                "choices": [
+                    {
+                        "content": output,
+                        "role": "assistant",
+                    }
+                ]
+            },
+            "$ai_http_status": 200,
+            "$ai_input_tokens": usage_stats.get("prompt_tokens", 0),
+            "$ai_output_tokens": usage_stats.get("completion_tokens", 0),
+            "$ai_latency": latency,
+            "$ai_trace_id": posthog_trace_id,
+            "$ai_posthog_properties": posthog_properties,
+        }
+
+        if hasattr(self._ph_client, "capture"):
+            self._ph_client.capture(
+                distinct_id=distinct_id,
+                event="$ai_generation",
+                properties=event_properties,
+            )

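For reference, consuming the wrapped async stream could look like the sketch below, assuming `async_openai_client` is an instance of the AsyncOpenAI wrapper above. One caveat: because the wrapper forces stream_options={"include_usage": True}, OpenAI appends a final chunk whose usage field is populated and whose choices list is empty, so it is safest to guard before indexing choices[0].

import asyncio

async def consume():
    # async_openai_client is assumed to be the AsyncOpenAI wrapper defined above
    stream = await async_openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Stream a short greeting."}],
        stream=True,
        posthog_distinct_id="test_distinct_id",
    )
    async for chunk in stream:
        # The trailing usage-only chunk has no choices; guard before indexing.
        if chunk.choices and chunk.choices[0].delta.content:
            print(chunk.choices[0].delta.content, end="", flush=True)
    print()

asyncio.run(consume())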