Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
# 6.7.3 - 2025-09-04

- fix: missing usage tokens in Gemini

# 6.7.2 - 2025-09-03

- fix: tool call results in streaming providers
Expand Down
9 changes: 4 additions & 5 deletions posthog/ai/anthropic/anthropic.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
import uuid
from typing import Any, Dict, List, Optional

from posthog.ai.types import StreamingContentBlock, ToolInProgress
from posthog.ai.types import StreamingContentBlock, TokenUsage, ToolInProgress
from posthog.ai.utils import (
call_llm_and_track_usage,
merge_usage_stats,
Expand Down Expand Up @@ -126,7 +126,7 @@ def _create_streaming(
**kwargs: Any,
):
start_time = time.time()
usage_stats: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0}
usage_stats: TokenUsage = TokenUsage(input_tokens=0, output_tokens=0)
accumulated_content = ""
content_blocks: List[StreamingContentBlock] = []
tools_in_progress: Dict[str, ToolInProgress] = {}
Expand Down Expand Up @@ -210,14 +210,13 @@ def _capture_streaming_event(
posthog_privacy_mode: bool,
posthog_groups: Optional[Dict[str, Any]],
kwargs: Dict[str, Any],
usage_stats: Dict[str, int],
usage_stats: TokenUsage,
latency: float,
content_blocks: List[StreamingContentBlock],
accumulated_content: str,
):
from posthog.ai.types import StreamingEventData
from posthog.ai.anthropic.anthropic_converter import (
standardize_anthropic_usage,
format_anthropic_streaming_input,
format_anthropic_streaming_output_complete,
)
Expand All @@ -236,7 +235,7 @@ def _capture_streaming_event(
formatted_output=format_anthropic_streaming_output_complete(
content_blocks, accumulated_content
),
usage_stats=standardize_anthropic_usage(usage_stats),
usage_stats=usage_stats,
latency=latency,
distinct_id=posthog_distinct_id,
trace_id=posthog_trace_id,
Expand Down
6 changes: 3 additions & 3 deletions posthog/ai/anthropic/anthropic_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
from typing import Any, Dict, List, Optional

from posthog import setup
from posthog.ai.types import StreamingContentBlock, ToolInProgress
from posthog.ai.types import StreamingContentBlock, TokenUsage, ToolInProgress
from posthog.ai.utils import (
call_llm_and_track_usage_async,
extract_available_tool_calls,
Expand Down Expand Up @@ -131,7 +131,7 @@ async def _create_streaming(
**kwargs: Any,
):
start_time = time.time()
usage_stats: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0}
usage_stats: TokenUsage = TokenUsage(input_tokens=0, output_tokens=0)
accumulated_content = ""
content_blocks: List[StreamingContentBlock] = []
tools_in_progress: Dict[str, ToolInProgress] = {}
Expand Down Expand Up @@ -215,7 +215,7 @@ async def _capture_streaming_event(
posthog_privacy_mode: bool,
posthog_groups: Optional[Dict[str, Any]],
kwargs: Dict[str, Any],
usage_stats: Dict[str, int],
usage_stats: TokenUsage,
latency: float,
content_blocks: List[StreamingContentBlock],
accumulated_content: str,
Expand Down
56 changes: 33 additions & 23 deletions posthog/ai/anthropic/anthropic_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
FormattedMessage,
FormattedTextContent,
StreamingContentBlock,
StreamingUsageStats,
TokenUsage,
ToolInProgress,
)
Expand Down Expand Up @@ -164,7 +163,38 @@ def format_anthropic_streaming_content(
return formatted


def extract_anthropic_usage_from_event(event: Any) -> StreamingUsageStats:
def extract_anthropic_usage_from_response(response: Any) -> TokenUsage:
    """Extract standardized token usage from a non-streaming Anthropic response.

    Args:
        response: The complete response object returned by the Anthropic API.

    Returns:
        A TokenUsage dict with input/output counters; the optional cache
        counters are included only when the response reports a positive value.
    """
    if not hasattr(response, "usage"):
        # No usage payload at all -> zeroed required counters.
        return TokenUsage(input_tokens=0, output_tokens=0)

    raw = response.usage
    extracted = TokenUsage(
        input_tokens=getattr(raw, "input_tokens", 0),
        output_tokens=getattr(raw, "output_tokens", 0),
    )

    # Copy the optional cache counters only when present and strictly positive,
    # so zero/absent values never appear in the result.
    for field in ("cache_read_input_tokens", "cache_creation_input_tokens"):
        if hasattr(raw, field):
            value = getattr(raw, field)
            if value and value > 0:
                extracted[field] = value

    return extracted


def extract_anthropic_usage_from_event(event: Any) -> TokenUsage:
"""
Extract usage statistics from an Anthropic streaming event.

Expand All @@ -175,7 +205,7 @@ def extract_anthropic_usage_from_event(event: Any) -> StreamingUsageStats:
Dictionary of usage statistics
"""

usage: StreamingUsageStats = {}
usage: TokenUsage = TokenUsage()

# Handle usage stats from message_start event
if hasattr(event, "type") and event.type == "message_start":
Expand Down Expand Up @@ -329,26 +359,6 @@ def finalize_anthropic_tool_input(
del tools_in_progress[block["id"]]


def standardize_anthropic_usage(usage: Dict[str, Any]) -> TokenUsage:
    """Restructure raw Anthropic usage stats into the common TokenUsage shape.

    Anthropic already reports standard field names, so this only repackages the
    dict: required counters default to 0, optional cache counters are passed
    through unchanged (possibly None).

    Args:
        usage: Raw usage statistics from Anthropic.

    Returns:
        Standardized TokenUsage dict.
    """
    read = usage.get
    return TokenUsage(
        input_tokens=read("input_tokens", 0),
        output_tokens=read("output_tokens", 0),
        cache_read_input_tokens=read("cache_read_input_tokens"),
        cache_creation_input_tokens=read("cache_creation_input_tokens"),
    )


def format_anthropic_streaming_input(kwargs: Dict[str, Any]) -> Any:
"""
Format Anthropic streaming input using system prompt merging.
Expand Down
9 changes: 5 additions & 4 deletions posthog/ai/gemini/gemini.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import uuid
from typing import Any, Dict, Optional

from posthog.ai.types import TokenUsage

try:
from google import genai
except ImportError:
Expand Down Expand Up @@ -294,7 +296,7 @@ def _generate_content_streaming(
**kwargs: Any,
):
start_time = time.time()
usage_stats: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0}
usage_stats: TokenUsage = TokenUsage(input_tokens=0, output_tokens=0)
accumulated_content = []

kwargs_without_stream = {"model": model, "contents": contents, **kwargs}
Expand Down Expand Up @@ -350,12 +352,11 @@ def _capture_streaming_event(
privacy_mode: bool,
groups: Optional[Dict[str, Any]],
kwargs: Dict[str, Any],
usage_stats: Dict[str, int],
usage_stats: TokenUsage,
latency: float,
output: Any,
):
from posthog.ai.types import StreamingEventData
from posthog.ai.gemini.gemini_converter import standardize_gemini_usage

# Prepare standardized event data
formatted_input = self._format_input(contents)
Expand All @@ -368,7 +369,7 @@ def _capture_streaming_event(
kwargs=kwargs,
formatted_input=sanitized_input,
formatted_output=format_gemini_streaming_output(output),
usage_stats=standardize_gemini_usage(usage_stats),
usage_stats=usage_stats,
latency=latency,
distinct_id=distinct_id,
trace_id=trace_id,
Expand Down
82 changes: 52 additions & 30 deletions posthog/ai/gemini/gemini_converter.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
from posthog.ai.types import (
FormattedContentItem,
FormattedMessage,
StreamingUsageStats,
TokenUsage,
)

Expand Down Expand Up @@ -283,29 +282,71 @@ def format_gemini_input(contents: Any) -> List[FormattedMessage]:
return [_format_object_message(contents)]


def extract_gemini_usage_from_chunk(chunk: Any) -> StreamingUsageStats:
def _extract_usage_from_metadata(metadata: Any) -> TokenUsage:
    """Build a TokenUsage from a Gemini usage_metadata object.

    Shared by the streaming and non-streaming extraction paths.

    Args:
        metadata: usage_metadata from a Gemini response or streaming chunk.

    Returns:
        TokenUsage with prompt/candidate counts; cache and reasoning fields
        are added only when reported and strictly positive.
    """
    extracted = TokenUsage(
        input_tokens=getattr(metadata, "prompt_token_count", 0),
        output_tokens=getattr(metadata, "candidates_token_count", 0),
    )

    # Cached-content tokens map onto the cache-read counter; skip absent/zero.
    cached = getattr(metadata, "cached_content_token_count", None)
    if cached and cached > 0:
        extracted["cache_read_input_tokens"] = cached

    # Thinking tokens map onto the reasoning counter; skip absent/zero.
    thoughts = getattr(metadata, "thoughts_token_count", None)
    if thoughts and thoughts > 0:
        extracted["reasoning_tokens"] = thoughts

    return extracted


def extract_gemini_usage_from_response(response: Any) -> TokenUsage:
    """Extract standardized token usage from a full (non-streaming) Gemini response.

    Args:
        response: The complete response from the Gemini API.

    Returns:
        TokenUsage with standardized usage statistics; zeroed counters when
        the response carries no usage_metadata.
    """
    metadata = getattr(response, "usage_metadata", None)
    if not metadata:
        # Missing or falsy metadata -> zeroed required counters.
        return TokenUsage(input_tokens=0, output_tokens=0)

    return _extract_usage_from_metadata(metadata)


def extract_gemini_usage_from_chunk(chunk: Any) -> TokenUsage:
"""
Extract usage statistics from a Gemini streaming chunk.

Args:
chunk: Streaming chunk from Gemini API

Returns:
Dictionary of usage statistics
TokenUsage with standardized usage statistics
"""

usage: StreamingUsageStats = {}
usage: TokenUsage = TokenUsage()

if not hasattr(chunk, "usage_metadata") or not chunk.usage_metadata:
return usage

# Gemini uses prompt_token_count and candidates_token_count
usage["input_tokens"] = getattr(chunk.usage_metadata, "prompt_token_count", 0)
usage["output_tokens"] = getattr(chunk.usage_metadata, "candidates_token_count", 0)

# Calculate total if both values are defined (including 0)
if "input_tokens" in usage and "output_tokens" in usage:
usage["total_tokens"] = usage["input_tokens"] + usage["output_tokens"]
# Use the shared helper to extract usage
usage = _extract_usage_from_metadata(chunk.usage_metadata)

return usage

Expand Down Expand Up @@ -417,22 +458,3 @@ def format_gemini_streaming_output(

# Fallback for empty or unexpected input
return [{"role": "assistant", "content": [{"type": "text", "text": ""}]}]


def standardize_gemini_usage(usage: Dict[str, Any]) -> TokenUsage:
    """Restructure raw Gemini usage stats into the common TokenUsage shape.

    Gemini already reports the standard field names
    (input_tokens/output_tokens) and exposes no cache or reasoning counters
    through this path.

    Args:
        usage: Raw usage statistics from Gemini.

    Returns:
        Standardized TokenUsage dict.
    """
    return TokenUsage(
        input_tokens=usage.get("input_tokens", 0),
        output_tokens=usage.get("output_tokens", 0),
    )
16 changes: 8 additions & 8 deletions posthog/ai/openai/openai.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@
import uuid
from typing import Any, Dict, List, Optional

from posthog.ai.types import TokenUsage

try:
import openai
except ImportError:
Expand Down Expand Up @@ -120,7 +122,7 @@ def _create_streaming(
**kwargs: Any,
):
start_time = time.time()
usage_stats: Dict[str, int] = {}
usage_stats: TokenUsage = TokenUsage()
final_content = []
response = self._original.create(**kwargs)

Expand Down Expand Up @@ -171,14 +173,13 @@ def _capture_streaming_event(
posthog_privacy_mode: bool,
posthog_groups: Optional[Dict[str, Any]],
kwargs: Dict[str, Any],
usage_stats: Dict[str, int],
usage_stats: TokenUsage,
latency: float,
output: Any,
available_tool_calls: Optional[List[Dict[str, Any]]] = None,
):
from posthog.ai.types import StreamingEventData
from posthog.ai.openai.openai_converter import (
standardize_openai_usage,
format_openai_streaming_input,
format_openai_streaming_output,
)
Expand All @@ -195,7 +196,7 @@ def _capture_streaming_event(
kwargs=kwargs,
formatted_input=sanitized_input,
formatted_output=format_openai_streaming_output(output, "responses"),
usage_stats=standardize_openai_usage(usage_stats, "responses"),
usage_stats=usage_stats,
latency=latency,
distinct_id=posthog_distinct_id,
trace_id=posthog_trace_id,
Expand Down Expand Up @@ -316,7 +317,7 @@ def _create_streaming(
**kwargs: Any,
):
start_time = time.time()
usage_stats: Dict[str, int] = {}
usage_stats: TokenUsage = TokenUsage()
accumulated_content = []
accumulated_tool_calls: Dict[int, Dict[str, Any]] = {}
if "stream_options" not in kwargs:
Expand Down Expand Up @@ -387,15 +388,14 @@ def _capture_streaming_event(
posthog_privacy_mode: bool,
posthog_groups: Optional[Dict[str, Any]],
kwargs: Dict[str, Any],
usage_stats: Dict[str, int],
usage_stats: TokenUsage,
latency: float,
output: Any,
tool_calls: Optional[List[Dict[str, Any]]] = None,
available_tool_calls: Optional[List[Dict[str, Any]]] = None,
):
from posthog.ai.types import StreamingEventData
from posthog.ai.openai.openai_converter import (
standardize_openai_usage,
format_openai_streaming_input,
format_openai_streaming_output,
)
Expand All @@ -412,7 +412,7 @@ def _capture_streaming_event(
kwargs=kwargs,
formatted_input=sanitized_input,
formatted_output=format_openai_streaming_output(output, "chat", tool_calls),
usage_stats=standardize_openai_usage(usage_stats, "chat"),
usage_stats=usage_stats,
latency=latency,
distinct_id=posthog_distinct_id,
trace_id=posthog_trace_id,
Expand Down
Loading
Loading