Skip to content

Commit 9b83471

Browse files
telemetry with posthog
1 parent 7626ac9 commit 9b83471

File tree

8 files changed

+186
-1
lines changed

8 files changed

+186
-1
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ dependencies = [
3838
"fastmcp>=2.10.5",
3939
"mcp-use[search]>=1.3.7",
4040
"onnxruntime==1.19.2 ; sys_platform == 'darwin' and platform_machine == 'x86_64'",
41+
"posthog>=6.5.0",
4142
]
4243

4344
[project.urls]

src/datu/app_config.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
from datu.integrations.config import IntegrationConfigs
1818
from datu.mcp.config import MCPConfig
1919
from datu.services.config import SchemaRAGConfig
20+
from datu.telemetry.config import TelemetryConfig
2021

2122

2223
class Environment(Enum):
@@ -110,7 +111,8 @@ class DatuConfig(BaseSettings):
110111
description="Configuration settings for schema RAG (Retrieval-Augmented Generation).",
111112
)
112113
enable_anonymization: bool = False
113-
114+
enable_anonymized_telemetry: bool = True
115+
telemetry: TelemetryConfig | None = Field(default_factory=TelemetryConfig)
114116
model_config = SettingsConfigDict(
115117
env_prefix="datu_",
116118
env_nested_delimiter="__",

src/datu/main.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313
from datu.app_config import get_logger, settings
1414
from datu.routers import chat, metadata, transformations
1515
from datu.schema_extractor.schema_cache import load_schema_cache
16+
from datu.telemetry.product.events import OpenAIEvents
17+
from datu.telemetry.product.posthog import get_posthog_client
1618

1719
logger = get_logger(__name__)
1820

@@ -56,6 +58,8 @@ def start_app() -> None:
5658
It also sets the logging level based on the configuration settings.
5759
"""
5860
logger.info("Starting the FastAPI application...")
61+
posthog_client = get_posthog_client()
62+
posthog_client.capture(OpenAIEvents())
5963
uvicorn.run(app, host=settings.host, port=settings.port)
6064

6165

src/datu/telemetry/__init__.py

Whitespace-only changes.

src/datu/telemetry/config.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
"""Telemetry configuration settings."""
2+
3+
from pydantic_settings import BaseSettings, SettingsConfigDict
4+
5+
6+
class TelemetryConfig(BaseSettings):
    """Settings that control product telemetry.

    The fields are loaded by pydantic-settings; ``model_config`` declares the
    ``telemetry_`` environment-variable prefix and ``__`` as the nested
    delimiter.
    """

    model_config = SettingsConfigDict(
        env_nested_delimiter="__",
        env_prefix="telemetry_",
    )

    # Key handed to the PostHog library when telemetry is enabled.
    api_key: str = "phc_m74dfR9nLpm2nipvkL2swyFDtNuQNC9o2FL2CSbh6Je"
    # Distribution name used to look up the installed package version.
    package_name: str = "datu-core"
16+
17+
18+
def get_telemetry_settings() -> TelemetryConfig:
    """Build and return a fresh ``TelemetryConfig`` instance."""
    telemetry_settings = TelemetryConfig()
    return telemetry_settings

src/datu/telemetry/product/__init__.py

Whitespace-only changes.
src/datu/telemetry/product/events.py

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,50 @@
1+
from typing import Any, ClassVar, Dict
2+
3+
from datu.app_config import get_app_settings, get_logger
4+
5+
app_settings = get_app_settings()
6+
7+
8+
class ProductTelemetryEvent:
    """Common parent of every product telemetry event.

    An event carries a free-form property mapping (the keyword arguments it
    was built with) and a ``batch_size`` counter that grows when events of
    the same type are merged with :meth:`batch`.
    """

    # Number of merged events at which a batch counts as full; with the
    # default of 1 the telemetry client sends each event immediately.
    max_batch_size: ClassVar[int] = 1

    def __init__(self, **kwargs):
        self.batch_size = 1
        self._props = kwargs

    @property
    def name(self) -> str:
        """Event name, taken from the concrete class name."""
        return type(self).__name__

    @property
    def properties(self) -> Dict[str, Any]:
        """Property mapping attached to the event (the internal dict itself)."""
        return self._props

    @property
    def batch_key(self) -> str:
        """Key under which events are grouped for batching (the event name)."""
        return self.name

    def batch(self, other: "ProductTelemetryEvent") -> "ProductTelemetryEvent":
        """Merge ``other`` into this event by adding its batch size.

        Returns:
            This event, mutated in place.

        Raises:
            ValueError: If the two events have different names.
        """
        if other.name != self.name:
            raise ValueError("Cannot batch different event types")
        self.batch_size = self.batch_size + other.batch_size
        return self
35+
36+
37+
class MCPClientStartEvent(ProductTelemetryEvent):
    """Telemetry event emitted when the MCP client starts.

    The ``mcp_client_enabled`` application setting is written into the event
    properties after the caller's keyword arguments, so a caller-supplied
    value under the same key is overwritten.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._props.update(mcp_client_enabled=app_settings.mcp_client_enabled)
43+
44+
45+
class OpenAIEvents(ProductTelemetryEvent):
    """Telemetry event carrying OpenAI-related information.

    The ``openai_model`` application setting is written into the event
    properties after the caller's keyword arguments, so a caller-supplied
    value under the same key is overwritten.
    """

    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self._props.update(openai_model=app_settings.openai_model)
src/datu/telemetry/product/posthog.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,109 @@
1+
import importlib
2+
import logging
3+
import platform
4+
import sys
5+
import uuid
6+
from functools import lru_cache
7+
from pathlib import Path
8+
from typing import Any, Dict, Optional
9+
10+
import posthog
11+
12+
from datu.app_config import get_app_settings, get_logger
13+
from datu.telemetry.config import TelemetryConfig as TelemetrySettings
14+
from datu.telemetry.product.events import ProductTelemetryEvent
15+
16+
app_settings = get_app_settings()
17+
logger = get_logger(__name__)
18+
19+
POSTHOG_EVENT_SETTINGS = {"$process_person_profile": False}
20+
21+
22+
class PostHogClient:
    """Telemetry client that forwards product events to PostHog.

    Events are sent through the module-level ``posthog`` API. Events whose
    ``max_batch_size`` is 1 are sent immediately; others are merged per
    ``batch_key`` and sent once the merged ``batch_size`` reaches
    ``max_batch_size``.

    Telemetry is switched off when ``app_settings.enable_anonymized_telemetry``
    is false or when ``pytest`` is loaded in the current process.
    """

    UNKNOWN_USER_ID = "UNKNOWN"
    USER_ID_PATH = Path.home() / ".cache" / "datu-core" / "telemetry_user_id"

    def __init__(self, settings: Optional["TelemetrySettings"]) -> None:
        """Configure the PostHog library from ``settings``.

        Args:
            settings: Telemetry settings; a default ``TelemetrySettings()`` is
                created when ``None`` (or another falsy value) is passed.
        """
        self.settings = settings or TelemetrySettings()
        self._batched_events: Dict[str, "ProductTelemetryEvent"] = {}
        self._user_id: str = ""
        self._user_id_path: Path = self.USER_ID_PATH

        if not app_settings.enable_anonymized_telemetry or "pytest" in sys.modules:
            posthog.disabled = True
        else:
            logger.info("Enabled anonymized telemetry. See https://docs.datu.fi for more information.")
            posthog.api_key = self.settings.api_key
            # Silence the PostHog library's own logger.
            posthog_logger = logging.getLogger("posthog")
            posthog_logger.disabled = True

    @property
    def user_id(self) -> str:
        """Return the anonymous id used as PostHog ``distinct_id``.

        The id is a random UUID persisted at ``self._user_id_path`` and cached
        on the instance. A missing or blank id file is (re)created with a new
        UUID. If the file cannot be read or written, ``UNKNOWN_USER_ID`` is
        returned instead.
        """
        if self._user_id:
            return self._user_id

        try:
            stored = ""
            if self._user_id_path.exists():
                stored = self._user_id_path.read_text(encoding="utf-8").strip()
            if not stored:
                # A blank file would otherwise yield an empty distinct id on
                # every call; treat it like a missing file.
                stored = str(uuid.uuid4())
                self._user_id_path.parent.mkdir(parents=True, exist_ok=True)
                self._user_id_path.write_text(stored, encoding="utf-8")
            self._user_id = stored
        except (OSError, ValueError):
            # Best effort: an unreadable/unwritable or undecodable id file
            # must never break the application.
            self._user_id = self.UNKNOWN_USER_ID

        return self._user_id

    def _base_context(self) -> Dict[str, Any]:
        """Return the environment properties attached to every event.

        Contains the Python version, OS name and release, and the installed
        version of ``settings.package_name`` (``"unknown"`` when the package
        is not installed).
        """
        # ``import importlib`` alone does not guarantee that the
        # ``importlib.metadata`` submodule is loaded, so import it explicitly.
        from importlib import metadata

        try:
            pkg_version = metadata.version(self.settings.package_name)
        except metadata.PackageNotFoundError:
            pkg_version = "unknown"

        return {
            "python_version": sys.version.split()[0],
            "os": platform.system(),
            "os_version": platform.release(),
            "package_version": pkg_version,
        }

    def capture(self, event: "ProductTelemetryEvent") -> None:
        """Capture an event, sending it now or merging it into a batch.

        Does nothing when telemetry is disabled or no API key is configured.

        Args:
            event: The event to record.
        """
        if (
            getattr(posthog, "disabled", False)
            or not app_settings.enable_anonymized_telemetry
            or not self.settings.api_key
        ):
            # Returning early also avoids creating the user-id file when the
            # PostHog library has been disabled (e.g. under pytest).
            return

        if event.max_batch_size == 1:
            self._send(event)
            return

        batch_key = event.batch_key
        if batch_key not in self._batched_events:
            self._batched_events[batch_key] = event
            return

        batched = self._batched_events[batch_key].batch(event)
        self._batched_events[batch_key] = batched

        if batched.batch_size >= batched.max_batch_size:
            self._send(batched)
            del self._batched_events[batch_key]

    def _send(self, event: "ProductTelemetryEvent") -> None:
        """Send ``event`` to PostHog; failures are logged at debug level only."""
        try:
            posthog.capture(
                distinct_id=self.user_id,
                event=event.name,
                properties={**self._base_context(), **POSTHOG_EVENT_SETTINGS, **event.properties},
            )
        except Exception:
            # Telemetry is best effort and must never interrupt the caller.
            logger.debug("Failed to send telemetry event", exc_info=True)
104+
105+
106+
@lru_cache(maxsize=1)
def get_posthog_client() -> PostHogClient:
    """Return the shared PostHog telemetry client.

    The function is memoized with ``lru_cache``, so the client is built from
    ``app_settings.telemetry`` on the first call and reused afterwards.
    """
    telemetry_settings = app_settings.telemetry
    return PostHogClient(telemetry_settings)

0 commit comments

Comments
 (0)