Commit eb83a00

feat: add clickhouse telemetry backend and tooling
1 parent cc551ad

File tree

- Makefile
- README.md
- app/core/config.py
- app/telemetry/event_sink.py
- infrastructure/clickhouse/schema.sql
- pyproject.toml
- uv.lock

7 files changed (+129 additions, -3 deletions)

Makefile

Lines changed: 6 additions & 1 deletion

@@ -1,4 +1,4 @@
-.PHONY: setup test compile run docs dashboard
+.PHONY: setup test compile run docs dashboard clickhouse-up
 
 setup:
 	uv sync --all-extras
@@ -17,3 +17,8 @@ docs:
 
 dashboard:
 	uv run --group dashboard -- streamlit run dashboards/agent_dashboard.py
+
+clickhouse-up:
+	docker run --rm -p 8123:8123 -p 9000:9000 \
+		-v $(PWD)/infrastructure/clickhouse/schema.sql:/docker-entrypoint-initdb.d/schema.sql \
+		clickhouse/clickhouse-server:latest
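
The target mounts the schema into the container's init directory, so the tables are created on first boot. Not part of this commit, but a quick readiness probe once the container is up, using only the standard library; the `/ping` endpoint belongs to ClickHouse's HTTP interface, and the host/port assume the defaults published by the target above:

# check_clickhouse.py -- hypothetical readiness probe for the local container.
import urllib.request

def clickhouse_ready(url: str = "http://localhost:8123/ping") -> bool:
    """Return True once the ClickHouse HTTP interface answers its ping endpoint."""
    try:
        with urllib.request.urlopen(url, timeout=5) as response:
            return response.read().strip() == b"Ok."
    except OSError:
        return False

if __name__ == "__main__":
    print("ready" if clickhouse_ready() else "not ready")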

README.md

Lines changed: 11 additions & 1 deletion

@@ -59,7 +59,7 @@ Copy `.env.example` to `.env` and adjust values locally if you prefer dotenv-sty
 | `PROVENANCE_RISK_HIGH_SEVERITY_THRESHOLD` | Number of high-severity findings before issuing a warn | `1` |
 | `PROVENANCE_ANALYTICS_DEFAULT_WINDOW` | Default lookback window for analytics | `7d` |
 | `PROVENANCE_SEMGREP_CONFIG_PATH` | Override path/URL for Semgrep configuration | *(bundled rules)* |
-| `PROVENANCE_TIMESERIES_BACKEND` | Backend for analytics event export (`file`, `bigquery`, `snowflake`, `off`) | `file` |
+| `PROVENANCE_TIMESERIES_BACKEND` | Backend for analytics event export (`file`, `bigquery`, `snowflake`, `clickhouse`, `off`) | `file` |
 | `PROVENANCE_TIMESERIES_PATH` | File path for JSONL events when using `file` backend | `data/timeseries_events.jsonl` |
 | `PROVENANCE_TIMESERIES_PROJECT` | Cloud project/account for warehouse exports | *(unset)* |
 | `PROVENANCE_TIMESERIES_DATABASE` | Warehouse database name (Snowflake only) | *(unset)* |
@@ -86,6 +86,10 @@ Copy `.env.example` to `.env` and adjust values locally if you prefer dotenv-sty
 | `PROVENANCE_GITHUB_REVIEWER_TEAM_MAP` | JSON map of reviewer logins to team names for cohort reporting | `{}` |
 | `PROVENANCE_TEAM_REVIEW_BUDGETS` | JSON map of team names to max expected human review counts per window | `{}` |
 | `PROVENANCE_AGENT_PUBLIC_KEYS` | JSON map of agent IDs to base64 Ed25519 public keys for attestation verification | `{}` |
+| `PROVENANCE_CLICKHOUSE_URL` | ClickHouse HTTP endpoint for analytics export (when backend=`clickhouse`) | *(unset)* |
+| `PROVENANCE_CLICKHOUSE_DATABASE` | ClickHouse database name (optional if table already qualified) | `provenance` |
+| `PROVENANCE_CLICKHOUSE_USER` | ClickHouse user for authenticated writes | *(unset)* |
+| `PROVENANCE_CLICKHOUSE_PASSWORD` | ClickHouse password for authenticated writes | *(unset)* |
 
 ## Detection Pipeline
 
@@ -183,6 +187,12 @@ The same process works against forks or sandboxes—helpful when validating new
 - Set `PROVENANCE_OTEL_ENABLED=true` to emit OpenTelemetry metrics (currently using the console exporter by default).
 - Event payloads include per-agent code volume, churn rates, complexity heuristics, and counts by finding category/severity.
 
+### ClickHouse quickstart
+
+- Run `make clickhouse-up` to launch a local ClickHouse instance with the starter schema from `infrastructure/clickhouse/schema.sql`.
+- Configure `PROVENANCE_TIMESERIES_BACKEND=clickhouse`, `PROVENANCE_CLICKHOUSE_URL=http://localhost:8123`, and `PROVENANCE_TIMESERIES_TABLE=analysis_events` (or point at your own table) to mirror analytics events into ClickHouse.
+- Downstream jobs can query the `provenance` database (tables: `analysis_events`, `findings`, `review_events`) for long-horizon reporting while Redis continues to serve hot state.
+
 ## Dashboard
 
 - Install dashboard dependencies: `uv sync --group dashboard`
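
Not part of this commit, but a handy read-back check after following the quickstart documented above: count the exported rows over the same HTTP interface. This sketch assumes the quickstart defaults (`http://localhost:8123`, database `provenance`, table `analysis_events`) and no authentication:

# verify_events.py -- hypothetical check that exported events are landing.
import requests

response = requests.post(
    "http://localhost:8123",
    params={"database": "provenance"},
    data="SELECT count() FROM analysis_events",
    timeout=10,
)
response.raise_for_status()  # non-2xx means the query (or the server) failed
print(f"analysis_events rows: {response.text.strip()}")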

app/core/config.py

Lines changed: 4 additions & 0 deletions

@@ -44,6 +44,10 @@ class Settings(BaseSettings):
     github_reviewer_team_map: dict[str, str] = Field(default_factory=dict)
     github_team_review_budgets: dict[str, int] = Field(default_factory=dict)
     agent_public_keys: dict[str, str] = Field(default_factory=dict)
+    clickhouse_url: str | None = None
+    clickhouse_database: str | None = None
+    clickhouse_user: str | None = None
+    clickhouse_password: str | None = None
 
     model_config = SettingsConfigDict(env_prefix="provenance_", env_file=".env", extra="ignore")
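
Because `model_config` sets `env_prefix="provenance_"` and pydantic-settings matches environment variables case-insensitively, each new field resolves from the corresponding `PROVENANCE_CLICKHOUSE_*` variable documented in the README. A self-contained sketch of that mapping; `DemoSettings` is an illustrative stand-in, not the app's full `Settings` model:

# env_prefix demo: PROVENANCE_CLICKHOUSE_URL resolves to clickhouse_url.
import os

from pydantic_settings import BaseSettings, SettingsConfigDict

class DemoSettings(BaseSettings):
    clickhouse_url: str | None = None
    clickhouse_user: str | None = None

    model_config = SettingsConfigDict(env_prefix="provenance_", extra="ignore")

os.environ["PROVENANCE_CLICKHOUSE_URL"] = "http://localhost:8123"
print(DemoSettings().clickhouse_url)   # http://localhost:8123
print(DemoSettings().clickhouse_user)  # None (unset variables keep their defaults)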

app/telemetry/event_sink.py

Lines changed: 71 additions & 0 deletions

@@ -8,6 +8,8 @@
 from pathlib import Path
 from typing import Protocol
 
+import requests
+
 from app.core.config import settings
 
 
@@ -180,6 +182,64 @@ def __del__(self):  # pragma: no cover - best effort cleanup
         return
 
 
+class ClickHouseEventSink:
+    """Writes events into ClickHouse via the HTTP interface."""
+
+    def __init__(
+        self,
+        url: str,
+        table: str,
+        *,
+        database: str | None = None,
+        user: str | None = None,
+        password: str | None = None,
+        batch_size: int = 25,
+    ) -> None:
+        self._url = url.rstrip("/")
+        self._table = table
+        self._database = database
+        self._batch_size = max(batch_size, 1)
+        self._auth = (user, password) if user and password else None
+        self._buffer: list[str] = []
+        self._lock = threading.Lock()
+        self._session = requests.Session()
+
+    def _table_reference(self) -> str:
+        if self._database and "." not in self._table:
+            return f"{self._database}.{self._table}"
+        return self._table
+
+    def publish(self, event: dict) -> None:
+        payload = json.dumps(event, separators=(",", ":"), sort_keys=True)
+        with self._lock:
+            self._buffer.append(payload)
+            if len(self._buffer) >= self._batch_size:
+                self._flush_locked()
+
+    def close(self) -> None:
+        with self._lock:
+            self._flush_locked()
+        self._session.close()
+
+    def _flush_locked(self) -> None:
+        if not self._buffer:
+            return
+        data = "\n".join(self._buffer)
+        query = f"INSERT INTO {self._table_reference()} FORMAT JSONEachRow\n{data}\n"
+        params = {"database": self._database} if self._database and "." not in self._table else None
+        response = self._session.post(
+            self._url,
+            params=params,
+            data=query.encode("utf-8"),
+            auth=self._auth,
+            headers={"Content-Type": "application/json"},
+            timeout=30,
+        )
+        if response.status_code >= 400:
+            raise RuntimeError(f"ClickHouse insert failed ({response.status_code}): {response.text}")
+        self._buffer.clear()
+
+
 def sink_from_settings() -> EventSink:
     """Factory to construct an event sink based on app settings."""
 
@@ -219,6 +279,17 @@ def sink_from_settings() -> EventSink:
             role=settings.timeseries_role,
             batch_size=settings.timeseries_batch_size,
         )
+    if backend == "clickhouse":
+        if not (settings.clickhouse_url and settings.timeseries_table):
+            raise ValueError("ClickHouse backend requires PROVENANCE_CLICKHOUSE_URL and PROVENANCE_TIMESERIES_TABLE")
+        return ClickHouseEventSink(
+            url=settings.clickhouse_url,
+            table=settings.timeseries_table,
+            database=settings.clickhouse_database,
+            user=settings.clickhouse_user,
+            password=settings.clickhouse_password,
+            batch_size=settings.timeseries_batch_size,
+        )
     if backend in {"off", "none", "disabled"}:
         return NullEventSink()
     raise ValueError(f"Unsupported timeseries backend: {settings.timeseries_backend}")
infrastructure/clickhouse/schema.sql

Lines changed: 33 additions & 0 deletions

@@ -0,0 +1,33 @@
+CREATE DATABASE IF NOT EXISTS provenance;
+
+CREATE TABLE IF NOT EXISTS provenance.analysis_events
+(
+    event_time DateTime DEFAULT now(),
+    payload JSON
+)
+ENGINE = MergeTree
+ORDER BY (event_time);
+
+CREATE TABLE IF NOT EXISTS provenance.findings
+(
+    analysis_id String,
+    repo_id String,
+    pr_number String,
+    rule_key String,
+    severity String,
+    detected_at DateTime,
+    payload JSON
+)
+ENGINE = MergeTree
+ORDER BY (analysis_id, rule_key, detected_at);
+
+CREATE TABLE IF NOT EXISTS provenance.review_events
+(
+    repo_id String,
+    pr_number String,
+    agent_id String,
+    recorded_at DateTime DEFAULT now(),
+    payload JSON
+)
+ENGINE = MergeTree
+ORDER BY (repo_id, pr_number, recorded_at);
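
With the schema in place, downstream reporting can go straight through the HTTP interface. A query sketch for a seven-day severity rollup; the columns come from `provenance.findings` above, while the endpoint and the aggregation itself are illustrative:

# Hypothetical severity rollup over provenance.findings.
import requests

QUERY = """
SELECT severity, count() AS findings
FROM provenance.findings
WHERE detected_at >= now() - INTERVAL 7 DAY
GROUP BY severity
ORDER BY findings DESC
FORMAT TSVWithNames
"""

response = requests.post("http://localhost:8123", data=QUERY, timeout=10)
response.raise_for_status()
print(response.text)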

pyproject.toml

Lines changed: 2 additions & 1 deletion

@@ -14,7 +14,8 @@ dependencies = [
     "semgrep>=1.64,<2.0",
     "opentelemetry-api>=1.25,<2.0",
     "opentelemetry-sdk>=1.25,<2.0",
-    "PyGithub>=2.4,<3.0"
+    "PyGithub>=2.4,<3.0",
+    "requests>=2.31,<3.0"
 ]
 
 [project.optional-dependencies]

uv.lock

Lines changed: 2 additions & 0 deletions

Some generated files are not rendered by default.
