|
| 1 | +try: |
| 2 | + import anthropic |
| 3 | + from anthropic.resources import AsyncMessages |
| 4 | +except ImportError: |
| 5 | + raise ModuleNotFoundError("Please install the Anthropic SDK to use this feature: 'pip install anthropic'") |
| 6 | + |
| 7 | +from posthog.ai.utils import call_llm_and_track_usage_async, get_model_params, with_privacy_mode |
| 8 | +from posthog.client import Client as PostHogClient |
| 9 | +from typing import Any, Dict, Optional |
| 10 | +import uuid |
| 11 | +import time |
| 12 | + |
| 13 | + |
| 14 | +class AsyncAnthropic(anthropic.AsyncAnthropic): |
| 15 | + """ |
| 16 | + An async wrapper around the Anthropic SDK that automatically sends LLM usage events to PostHog. |
| 17 | + """ |
| 18 | + |
| 19 | + _ph_client: PostHogClient |
| 20 | + |
| 21 | + def __init__(self, posthog_client: PostHogClient, **kwargs): |
| 22 | + """ |
| 23 | + Args: |
| 24 | + posthog_client: PostHog client for tracking usage |
| 25 | + **kwargs: Additional arguments passed to the Anthropic client |
| 26 | + """ |
| 27 | + super().__init__(**kwargs) |
| 28 | + self._ph_client = posthog_client |
| 29 | + self.messages = AsyncWrappedMessages(self) |
| 30 | + |
| 31 | + |
| 32 | +class AsyncWrappedMessages(AsyncMessages): |
| 33 | + _client: AsyncAnthropic |
| 34 | + |
| 35 | + async def create( |
| 36 | + self, |
| 37 | + posthog_distinct_id: Optional[str] = None, |
| 38 | + posthog_trace_id: Optional[str] = None, |
| 39 | + posthog_properties: Optional[Dict[str, Any]] = None, |
| 40 | + posthog_privacy_mode: bool = False, |
| 41 | + posthog_groups: Optional[Dict[str, Any]] = None, |
| 42 | + **kwargs: Any, |
| 43 | + ): |
| 44 | + """ |
| 45 | + Create a message using Anthropic's API while tracking usage in PostHog. |
| 46 | + |
| 47 | + Args: |
| 48 | + posthog_distinct_id: Optional ID to associate with the usage event |
| 49 | + posthog_trace_id: Optional trace UUID for linking events |
| 50 | + posthog_properties: Optional dictionary of extra properties to include in the event |
| 51 | + posthog_privacy_mode: Whether to redact sensitive information in tracking |
| 52 | + posthog_groups: Optional group analytics properties |
| 53 | + **kwargs: Arguments passed to Anthropic's messages.create |
| 54 | + """ |
| 55 | + if posthog_trace_id is None: |
| 56 | + posthog_trace_id = uuid.uuid4() |
| 57 | + |
| 58 | + if kwargs.get("stream", False): |
| 59 | + return await self._create_streaming( |
| 60 | + posthog_distinct_id, |
| 61 | + posthog_trace_id, |
| 62 | + posthog_properties, |
| 63 | + posthog_privacy_mode, |
| 64 | + posthog_groups, |
| 65 | + **kwargs, |
| 66 | + ) |
| 67 | + |
| 68 | + return await call_llm_and_track_usage_async( |
| 69 | + posthog_distinct_id, |
| 70 | + self._client._ph_client, |
| 71 | + "anthropic", |
| 72 | + posthog_trace_id, |
| 73 | + posthog_properties, |
| 74 | + posthog_privacy_mode, |
| 75 | + posthog_groups, |
| 76 | + self._client.base_url, |
| 77 | + super().create, |
| 78 | + **kwargs, |
| 79 | + ) |
| 80 | + |
| 81 | + async def stream( |
| 82 | + self, |
| 83 | + posthog_distinct_id: Optional[str] = None, |
| 84 | + posthog_trace_id: Optional[str] = None, |
| 85 | + posthog_properties: Optional[Dict[str, Any]] = None, |
| 86 | + posthog_privacy_mode: bool = False, |
| 87 | + posthog_groups: Optional[Dict[str, Any]] = None, |
| 88 | + **kwargs: Any, |
| 89 | + ): |
| 90 | + if posthog_trace_id is None: |
| 91 | + posthog_trace_id = uuid.uuid4() |
| 92 | + |
| 93 | + return await self._create_streaming( |
| 94 | + posthog_distinct_id, |
| 95 | + posthog_trace_id, |
| 96 | + posthog_properties, |
| 97 | + posthog_privacy_mode, |
| 98 | + posthog_groups, |
| 99 | + **kwargs, |
| 100 | + ) |
| 101 | + |
| 102 | + async def _create_streaming( |
| 103 | + self, |
| 104 | + posthog_distinct_id: Optional[str], |
| 105 | + posthog_trace_id: Optional[str], |
| 106 | + posthog_properties: Optional[Dict[str, Any]], |
| 107 | + posthog_privacy_mode: bool, |
| 108 | + posthog_groups: Optional[Dict[str, Any]], |
| 109 | + **kwargs: Any, |
| 110 | + ): |
| 111 | + start_time = time.time() |
| 112 | + usage_stats: Dict[str, int] = {"input_tokens": 0, "output_tokens": 0} |
| 113 | + accumulated_content = [] |
| 114 | + response = await super().create(**kwargs) |
| 115 | + |
| 116 | + async def generator(): |
| 117 | + nonlocal usage_stats |
| 118 | + nonlocal accumulated_content |
| 119 | + try: |
| 120 | + async for event in response: |
| 121 | + if hasattr(event, "usage") and event.usage: |
| 122 | + usage_stats = { |
| 123 | + k: getattr(event.usage, k, 0) |
| 124 | + for k in [ |
| 125 | + "input_tokens", |
| 126 | + "output_tokens", |
| 127 | + ] |
| 128 | + } |
| 129 | + |
| 130 | + if hasattr(event, "content") and event.content: |
| 131 | + accumulated_content.append(event.content) |
| 132 | + |
| 133 | + yield event |
| 134 | + |
| 135 | + finally: |
| 136 | + end_time = time.time() |
| 137 | + latency = end_time - start_time |
| 138 | + output = "".join(accumulated_content) |
| 139 | + |
| 140 | + await self._capture_streaming_event( |
| 141 | + posthog_distinct_id, |
| 142 | + posthog_trace_id, |
| 143 | + posthog_properties, |
| 144 | + posthog_privacy_mode, |
| 145 | + posthog_groups, |
| 146 | + kwargs, |
| 147 | + usage_stats, |
| 148 | + latency, |
| 149 | + output, |
| 150 | + ) |
| 151 | + |
| 152 | + return generator() |
| 153 | + |
| 154 | + async def _capture_streaming_event( |
| 155 | + self, |
| 156 | + posthog_distinct_id: Optional[str], |
| 157 | + posthog_trace_id: Optional[str], |
| 158 | + posthog_properties: Optional[Dict[str, Any]], |
| 159 | + posthog_privacy_mode: bool, |
| 160 | + posthog_groups: Optional[Dict[str, Any]], |
| 161 | + kwargs: Dict[str, Any], |
| 162 | + usage_stats: Dict[str, int], |
| 163 | + latency: float, |
| 164 | + output: str, |
| 165 | + ): |
| 166 | + if posthog_trace_id is None: |
| 167 | + posthog_trace_id = uuid.uuid4() |
| 168 | + |
| 169 | + event_properties = { |
| 170 | + "$ai_provider": "anthropic", |
| 171 | + "$ai_model": kwargs.get("model"), |
| 172 | + "$ai_model_parameters": get_model_params(kwargs), |
| 173 | + "$ai_input": with_privacy_mode(self._client._ph_client, posthog_privacy_mode, kwargs.get("messages")), |
| 174 | + "$ai_output_choices": with_privacy_mode( |
| 175 | + self._client._ph_client, |
| 176 | + posthog_privacy_mode, |
| 177 | + [{"content": output, "role": "assistant"}], |
| 178 | + ), |
| 179 | + "$ai_http_status": 200, |
| 180 | + "$ai_input_tokens": usage_stats.get("input_tokens", 0), |
| 181 | + "$ai_output_tokens": usage_stats.get("output_tokens", 0), |
| 182 | + "$ai_latency": latency, |
| 183 | + "$ai_trace_id": posthog_trace_id, |
| 184 | + "$ai_base_url": str(self._client.base_url), |
| 185 | + **(posthog_properties or {}), |
| 186 | + } |
| 187 | + |
| 188 | + if posthog_distinct_id is None: |
| 189 | + event_properties["$process_person_profile"] = False |
| 190 | + |
| 191 | + if hasattr(self._client._ph_client, "capture"): |
| 192 | + self._client._ph_client.capture( |
| 193 | + distinct_id=posthog_distinct_id or posthog_trace_id, |
| 194 | + event="$ai_generation", |
| 195 | + properties=event_properties, |
| 196 | + groups=posthog_groups, |
| 197 | + ) |
| 198 | + |
0 commit comments