Skip to content

Commit 8ec73fd

Browse files
feat: product metrics build (#751)
* feat: product metrics build * feat: add initial posthog product metrics (#744) * feat: add posthog product metrics * add rest of metrics * add tests * feat: add staging table for metrics (#796) --------- Co-authored-by: Mohammad Alisafaee <[email protected]> * feat: get product metrics through k8s watcher (#802) * feat: product metrics using k8s cache * feat: only track member adding metrics and add user requesting session state change * feat: add metrics for seach (#824) * address comments --------- Co-authored-by: Mohammad Alisafaee <[email protected]>
1 parent 68df15c commit 8ec73fd

File tree

49 files changed

+1032
-72
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

49 files changed

+1032
-72
lines changed

DEVELOPING.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@ Within components, there are the following modules:
3434
* *k8s*: Kubernetes client code
3535
* *message_queue*: Redis streams messaging code
3636
* *migrations*: Database migrations
37+
* *metrics*: Store metrics data in a staging table
3738
* *namespace*: Code for handling namespaces (user/groups)
3839
* *platform*: Renku platform configuration code
3940
* *project*: Code for Project entities

bases/renku_data_services/data_api/app.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
134134
session_repo=config.session_repo,
135135
data_connector_repo=config.data_connector_repo,
136136
project_migration_repo=config.project_migration_repo,
137+
metrics=config.metrics,
137138
)
138139
project_session_secrets = ProjectSessionSecretBP(
139140
name="project_session_secrets",
@@ -146,6 +147,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
146147
url_prefix=url_prefix,
147148
authenticator=config.authenticator,
148149
group_repo=config.group_repo,
150+
metrics=config.metrics,
149151
)
150152
session_environments = EnvironmentsBP(
151153
name="session_environments",
@@ -158,6 +160,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
158160
url_prefix=url_prefix,
159161
session_repo=config.session_repo,
160162
authenticator=config.authenticator,
163+
metrics=config.metrics,
161164
)
162165
builds = (
163166
BuildsBP(
@@ -214,6 +217,7 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
214217
data_connector_repo=config.data_connector_repo,
215218
data_connector_secret_repo=config.data_connector_secret_repo,
216219
internal_gitlab_authenticator=config.gitlab_authenticator,
220+
metrics=config.metrics,
217221
)
218222
platform_config = PlatformConfigBP(
219223
name="platform_config",
@@ -247,13 +251,15 @@ def register_all_handlers(app: Sanic, config: Config) -> Sanic:
247251
),
248252
solr_config=config.solr_config,
249253
authz=config.authz,
254+
metrics=config.metrics,
250255
)
251256
data_connectors = DataConnectorsBP(
252257
name="data_connectors",
253258
url_prefix=url_prefix,
254259
data_connector_repo=config.data_connector_repo,
255260
data_connector_secret_repo=config.data_connector_secret_repo,
256261
authenticator=config.authenticator,
262+
metrics=config.metrics,
257263
)
258264
app.blueprint(
259265
[

bases/renku_data_services/data_tasks/config.py

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -10,12 +10,33 @@
1010
from renku_data_services.solr.solr_client import SolrClientConfig
1111

1212

13+
@dataclass
14+
class PosthogConfig:
15+
"""Configuration for posthog."""
16+
17+
enabled: bool
18+
api_key: str
19+
host: str
20+
environment: str
21+
22+
@classmethod
23+
def from_env(cls, prefix: str = "") -> PosthogConfig:
24+
"""Create posthog config from environment variables."""
25+
enabled = os.environ.get(f"{prefix}POSTHOG_ENABLED", "false").lower() == "true"
26+
api_key = os.environ.get(f"{prefix}POSTHOG_API_KEY", "")
27+
host = os.environ.get(f"{prefix}POSTHOG_HOST", "")
28+
environment = os.environ.get(f"{prefix}POSTHOG_ENVIRONMENT", "development")
29+
30+
return cls(enabled, api_key, host, environment)
31+
32+
1333
@dataclass
1434
class Config:
1535
"""Configuration for data tasks."""
1636

1737
db_config: DBConfig
1838
solr_config: SolrClientConfig
39+
posthog_config: PosthogConfig
1940
redis_config: RedisConfig
2041
max_retry_wait_seconds: int
2142
main_log_interval_seconds: int
@@ -34,6 +55,7 @@ def env(key: str, default: str) -> str:
3455
max_retry = int(env("MAX_RETRY_WAIT_SECONDS", "120"))
3556
main_tick = int(env("MAIN_LOG_INTERVAL_SECONDS", "300"))
3657
solr_config = SolrClientConfig.from_env(prefix)
58+
posthog_config = PosthogConfig.from_env(prefix)
3759
tcp_host = env("TCP_HOST", "127.0.0.1")
3860
tcp_port = int(env("TCP_PORT", "8001"))
3961

@@ -43,6 +65,7 @@ def env(key: str, default: str) -> str:
4365
max_retry_wait_seconds=max_retry,
4466
main_log_interval_seconds=main_tick,
4567
solr_config=solr_config,
68+
posthog_config=posthog_config,
4669
redis_config=redis,
4770
tcp_host=tcp_host,
4871
tcp_port=tcp_port,

bases/renku_data_services/data_tasks/task_defs.py

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,19 @@
11
"""The task definitions in form of coroutines."""
22

33
import asyncio
4+
import logging
45

56
import renku_data_services.search.core as search_core
67
from renku_data_services.data_tasks.config import Config
78
from renku_data_services.data_tasks.taskman import TaskDefininions
89
from renku_data_services.message_queue.db import EventRepository
910
from renku_data_services.message_queue.redis_queue import RedisQueue
11+
from renku_data_services.metrics.db import MetricsRepository
1012
from renku_data_services.search.db import SearchUpdatesRepo
1113
from renku_data_services.solr.solr_client import DefaultSolrClient
1214

15+
logger = logging.getLogger(__name__)
16+
1317

1418
async def update_search(cfg: Config) -> None:
1519
"""Update the SOLR with data from the search staging table."""
@@ -28,6 +32,49 @@ async def send_pending_redis_events(cfg: Config) -> None:
2832
await asyncio.sleep(1)
2933

3034

35+
async def send_metrics_to_posthog(cfg: Config) -> None:
36+
"""Send pending product metrics to posthog."""
37+
from posthog import Posthog
38+
39+
posthog = Posthog(
40+
api_key=cfg.posthog_config.api_key,
41+
host=cfg.posthog_config.host,
42+
sync_mode=True,
43+
super_properties={"environment": cfg.posthog_config.environment},
44+
)
45+
repo = MetricsRepository(cfg.db_config.async_session_maker)
46+
47+
while True:
48+
try:
49+
metrics = repo.get_unprocessed_metrics()
50+
51+
processed_ids = []
52+
async for metric in metrics:
53+
try:
54+
posthog.capture(
55+
distinct_id=metric.anonymous_user_id,
56+
timestamp=metric.timestamp,
57+
event=metric.event,
58+
properties=metric.metadata_ or {},
59+
# This is sent to avoid duplicate events if multiple instances of data service are running.
60+
# Posthog deduplicates events with the same timestamp, distinct_id, event, and uuid fields:
61+
# https://github.com/PostHog/posthog/issues/17211#issuecomment-1723136534
62+
uuid=metric.id.to_uuid4(),
63+
)
64+
except Exception as e:
65+
logger.error(f"Failed to process metrics event {metric.id}: {e}")
66+
else:
67+
processed_ids.append(metric.id)
68+
69+
await repo.delete_processed_metrics(processed_ids)
70+
except (asyncio.CancelledError, KeyboardInterrupt) as e:
71+
logger.warning(f"Exiting: {e}")
72+
return
73+
else:
74+
# NOTE: Sleep 10 seconds between processing cycles
75+
await asyncio.sleep(10)
76+
77+
3178
def all_tasks(cfg: Config) -> TaskDefininions:
3279
"""A dict of task factories to be managed in main."""
3380
# Impl. note: We pass the entire config to the coroutines, because
@@ -41,5 +88,6 @@ def all_tasks(cfg: Config) -> TaskDefininions:
4188
{
4289
"update_search": lambda: update_search(cfg),
4390
"send_pending_events": lambda: send_pending_redis_events(cfg),
91+
"send_product_metrics": lambda: send_metrics_to_posthog(cfg),
4492
}
4593
)

bases/renku_data_services/k8s_cache/config.py

Lines changed: 56 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,17 @@
11
"""K8s cache config."""
22

3-
from dataclasses import dataclass
3+
from dataclasses import dataclass, field
44
from typing import Self
55

66
from kubernetes.client.api_client import os
77

8+
from renku_data_services.crc.db import ResourcePoolRepository
89
from renku_data_services.db_config.config import DBConfig
10+
from renku_data_services.k8s.clients import DummyCoreClient, DummySchedulingClient
11+
from renku_data_services.k8s.quota import QuotaRepository
912
from renku_data_services.k8s_watcher.db import K8sDbCache
13+
from renku_data_services.metrics.core import StagingMetricsService
14+
from renku_data_services.metrics.db import MetricsRepository
1015

1116

1217
@dataclass
@@ -20,13 +25,54 @@ def from_env(cls) -> Self:
2025
return cls(renku_namespace=os.environ.get("KUBERNETES_NAMESPACE", "default"))
2126

2227

28+
@dataclass
29+
class _MetricsConfig:
30+
"""Configuration for metrics."""
31+
32+
enabled: bool
33+
34+
@classmethod
35+
def from_env(cls, prefix: str = "") -> "_MetricsConfig":
36+
"""Create metrics config from environment variables."""
37+
enabled = os.environ.get(f"{prefix}POSTHOG_ENABLED", "false").lower() == "true"
38+
return cls(enabled)
39+
40+
2341
@dataclass
2442
class Config:
2543
"""K8s cache config."""
2644

2745
db: DBConfig
2846
k8s: _K8sConfig
47+
metrics_config: _MetricsConfig
48+
quota_repo: QuotaRepository
2949
_k8s_cache: K8sDbCache | None = None
50+
_metrics_repo: MetricsRepository | None = field(default=None, repr=False, init=False)
51+
_metrics: StagingMetricsService | None = field(default=None, repr=False, init=False)
52+
_rp_repo: ResourcePoolRepository | None = field(default=None, repr=False, init=False)
53+
54+
@property
55+
def metrics_repo(self) -> MetricsRepository:
56+
"""The DB adapter for metrics."""
57+
if not self._metrics_repo:
58+
self._metrics_repo = MetricsRepository(session_maker=self.db.async_session_maker)
59+
return self._metrics_repo
60+
61+
@property
62+
def metrics(self) -> StagingMetricsService:
63+
"""The metrics service interface."""
64+
if not self._metrics:
65+
self._metrics = StagingMetricsService(enabled=self.metrics_config.enabled, metrics_repo=self.metrics_repo)
66+
return self._metrics
67+
68+
@property
69+
def rp_repo(self) -> ResourcePoolRepository:
70+
"""The resource pool repository."""
71+
if not self._rp_repo:
72+
self._rp_repo = ResourcePoolRepository(
73+
session_maker=self.db.async_session_maker, quotas_repo=self.quota_repo
74+
)
75+
return self._rp_repo
3076

3177
@property
3278
def k8s_cache(self) -> K8sDbCache:
@@ -42,5 +88,13 @@ def from_env(cls, prefix: str = "") -> "Config":
4288
"""Create a config from environment variables."""
4389
db = DBConfig.from_env(prefix)
4490
k8s_config = _K8sConfig.from_env()
91+
metrics_config = _MetricsConfig.from_env(prefix)
92+
93+
# NOTE: We only need the QuotaRepository to instantiate the ResourcePoolRepository which is used to get
94+
# the resource class and pool information for metrics. We don't need quota information for metrics at all
95+
# so we use the dummy client for quotas here as we don't actually access k8s, just the db.
96+
quota_repo = QuotaRepository(
97+
DummyCoreClient({}, {}), DummySchedulingClient({}), namespace=k8s_config.renku_namespace
98+
)
4599

46-
return cls(db=db, k8s=k8s_config)
100+
return cls(db=db, k8s=k8s_config, metrics_config=metrics_config, quota_repo=quota_repo)

bases/renku_data_services/k8s_cache/main.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,7 @@ async def main() -> None:
2121
clusters = [Cluster(id=ClusterId("renkulab"), namespace=config.k8s.renku_namespace, api=kr8s_api)]
2222

2323
watcher = K8sWatcher(
24-
handler=k8s_object_handler(config.k8s_cache),
24+
handler=k8s_object_handler(config.k8s_cache, config.metrics, rp_repo=config.rp_repo),
2525
clusters={c.id: c for c in clusters},
2626
kinds=[AMALTHEA_SESSION_KIND, JUPYTER_SESSION_KIND],
2727
)

components/renku_data_services/app_config/config.py

Lines changed: 37 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,6 +61,8 @@
6161
from renku_data_services.message_queue.db import EventRepository, ReprovisioningRepository
6262
from renku_data_services.message_queue.interface import IMessageQueue
6363
from renku_data_services.message_queue.redis_queue import RedisQueue
64+
from renku_data_services.metrics.core import StagingMetricsService
65+
from renku_data_services.metrics.db import MetricsRepository
6466
from renku_data_services.namespace.db import GroupRepository
6567
from renku_data_services.notebooks.config import NotebooksConfig
6668
from renku_data_services.platform.db import PlatformRepository
@@ -133,6 +135,20 @@ def from_env(cls, prefix: str = "") -> "SentryConfig":
133135
return cls(enabled, dsn=dsn, environment=environment, sample_rate=sample_rate)
134136

135137

138+
@dataclass
139+
class PosthogConfig:
140+
"""Configuration for posthog."""
141+
142+
enabled: bool
143+
144+
@classmethod
145+
def from_env(cls, prefix: str = "") -> "PosthogConfig":
146+
"""Create posthog config from environment variables."""
147+
enabled = os.environ.get(f"{prefix}POSTHOG_ENABLED", "false").lower() == "true"
148+
149+
return cls(enabled)
150+
151+
136152
@dataclass
137153
class TrustedProxiesConfig:
138154
"""Configuration for trusted reverse proxies."""
@@ -261,6 +277,7 @@ class Config:
261277
gitlab_url: str | None
262278
nb_config: NotebooksConfig
263279
builds_config: BuildsConfig
280+
posthog: PosthogConfig
264281

265282
secrets_service_public_key: rsa.RSAPublicKey
266283
"""The public key of the secrets service, used to encrypt user secrets that only it can decrypt."""
@@ -300,6 +317,8 @@ class Config:
300317
_data_connector_repo: DataConnectorRepository | None = field(default=None, repr=False, init=False)
301318
_data_connector_secret_repo: DataConnectorSecretRepository | None = field(default=None, repr=False, init=False)
302319
_cluster_repo: ClusterRepository | None = field(default=None, repr=False, init=False)
320+
_metrics_repo: MetricsRepository | None = field(default=None, repr=False, init=False)
321+
_metrics: StagingMetricsService | None = field(default=None, repr=False, init=False)
303322

304323
@staticmethod
305324
@functools.cache
@@ -352,7 +371,7 @@ def __post_init__(self) -> None:
352371

353372
@property
354373
def user_repo(self) -> UserRepository:
355-
"""The DB adapter for users of resoure pools and classes."""
374+
"""The DB adapter for users of resource pools and classes."""
356375
if not self._user_repo:
357376
self._user_repo = UserRepository(
358377
session_maker=self.db.async_session_maker, quotas_repo=self.quota_repo, user_repo=self.kc_user_repo
@@ -602,6 +621,20 @@ def cluster_repo(self) -> ClusterRepository:
602621

603622
return self._cluster_repo
604623

624+
@property
625+
def metrics_repo(self) -> MetricsRepository:
626+
"""The DB adapter for metrics."""
627+
if not self._metrics_repo:
628+
self._metrics_repo = MetricsRepository(session_maker=self.db.async_session_maker)
629+
return self._metrics_repo
630+
631+
@property
632+
def metrics(self) -> StagingMetricsService:
633+
"""The metrics service interface."""
634+
if not self._metrics:
635+
self._metrics = StagingMetricsService(enabled=self.posthog.enabled, metrics_repo=self.metrics_repo)
636+
return self._metrics
637+
605638
@classmethod
606639
def from_env(cls, prefix: str = "") -> "Config":
607640
"""Create a config from environment variables."""
@@ -644,7 +677,7 @@ def from_env(cls, prefix: str = "") -> "Config":
644677
UnsavedUserInfo(id="user1", first_name="user1", last_name="doe", email="[email protected]"),
645678
UnsavedUserInfo(id="user2", first_name="user2", last_name="doe", email="[email protected]"),
646679
]
647-
kc_api = DummyKeycloakAPI(users=[i._to_keycloak_dict() for i in dummy_users])
680+
kc_api = DummyKeycloakAPI(users=[i.to_keycloak_dict() for i in dummy_users])
648681
redis = RedisConfig.fake()
649682
gitlab_url = None
650683
else:
@@ -698,6 +731,7 @@ def from_env(cls, prefix: str = "") -> "Config":
698731
message_queue = RedisQueue(redis)
699732
nb_config = NotebooksConfig.from_env(db)
700733
builds_config = BuildsConfig.from_env(prefix, k8s_namespace)
734+
posthog = PosthogConfig.from_env(prefix)
701735

702736
return cls(
703737
version=version,
@@ -721,4 +755,5 @@ def from_env(cls, prefix: str = "") -> "Config":
721755
gitlab_url=gitlab_url,
722756
nb_config=nb_config,
723757
builds_config=builds_config,
758+
posthog=posthog,
724759
)

components/renku_data_services/base_models/core.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -77,6 +77,7 @@ class ServiceAdminId(StrEnum):
7777

7878
migrations = "migrations"
7979
secrets_rotation = "secrets_rotation"
80+
k8s_watcher = "k8s_watcher"
8081

8182

8283
@dataclass(kw_only=True, frozen=True)

0 commit comments

Comments
 (0)