Skip to content

Commit 4b7b4ee

Browse files
authored
feat(sqlmesh): implement builder classifiers (#5449)
* feat(sqlmesh): rank projects within a collection using metrics * feat(sqlmesh): test a clustering algo in production * chore: add scikit-learn to uv
1 parent ce6ecb7 commit 4b7b4ee

File tree

6 files changed

+2148
-1614
lines changed

6 files changed

+2148
-1614
lines changed

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ dependencies = [
7474
"cryo>=0.3.2",
7575
"web3>=7.13.0",
7676
"pynessie>=0.67.0",
77+
"scikit-learn>=1.7.2",
7778
]
7879
name = "oso"
7980
version = "1.0.0"

uv.lock

Lines changed: 1608 additions & 1614 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.
Lines changed: 250 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,250 @@
1+
"""
2+
K-means clustering model for collection performance segmentation.
3+
4+
This Python model performs k-means clustering on collection performance features
5+
to segment collections into performance tiers (e.g., Top, High, Mid, Low, Very Low).
6+
"""
7+
8+
from __future__ import annotations
9+
10+
import typing as t
11+
12+
import pandas as pd
13+
from sklearn.cluster import KMeans
14+
from sklearn.metrics import silhouette_samples, silhouette_score
15+
from sklearn.preprocessing import StandardScaler
16+
from sqlmesh import ExecutionContext, model
17+
18+
19+
@model(
20+
"oso.int_collection_performance_clusters",
21+
kind="full",
22+
columns={
23+
"collection_id": "TEXT",
24+
"collection_source": "TEXT",
25+
"collection_namespace": "TEXT",
26+
"collection_name": "TEXT",
27+
"collection_display_name": "TEXT",
28+
"sample_date": "DATE",
29+
"total_active_developers": "DOUBLE",
30+
"avg_active_developers_per_project": "DOUBLE",
31+
"active_dev_concentration": "DOUBLE",
32+
"total_stars": "DOUBLE",
33+
"avg_stars_per_project": "DOUBLE",
34+
"total_commits": "DOUBLE",
35+
"avg_commits_per_project": "DOUBLE",
36+
"total_forks": "DOUBLE",
37+
"total_contributors": "DOUBLE",
38+
"avg_contributors_per_project": "DOUBLE",
39+
"total_projects": "DOUBLE",
40+
"developer_density": "DOUBLE",
41+
"commits_per_developer": "DOUBLE",
42+
"star_appeal": "DOUBLE",
43+
"cluster": "INTEGER",
44+
"segment": "TEXT",
45+
"silhouette": "DOUBLE",
46+
},
47+
grain="collection_id",
48+
audits=[
49+
("not_null", {"columns": ["collection_id"]}),
50+
],
51+
)
52+
def execute(
53+
context: ExecutionContext,
54+
start: t.Optional[str] = None,
55+
end: t.Optional[str] = None,
56+
execution_time: t.Optional[str] = None,
57+
**kwargs: t.Any,
58+
) -> t.Iterator[pd.DataFrame]:
59+
"""
60+
Execute k-means clustering on collection performance features.
61+
62+
Args:
63+
context: SQLMesh execution context
64+
start: Start date (not used for FULL model)
65+
end: End date (not used for FULL model)
66+
execution_time: Execution time (not used for FULL model)
67+
**kwargs: Additional arguments
68+
69+
Yields:
70+
DataFrame with clustering results
71+
"""
72+
# Resolve the upstream table name (environment-aware)
73+
features_table = context.resolve_table("oso.int_collection_performance_features")
74+
75+
# Read the features
76+
query = f"SELECT * FROM {features_table}"
77+
df = context.fetchdf(query)
78+
79+
# If no data, yield nothing (use empty generator pattern)
80+
if df.empty:
81+
yield from ()
82+
return
83+
84+
# Type enforcement
85+
df["collection_id"] = df["collection_id"].astype(str)
86+
df["collection_source"] = df["collection_source"].astype(str)
87+
df["collection_namespace"] = df["collection_namespace"].astype(str)
88+
df["collection_name"] = df["collection_name"].astype(str)
89+
df["collection_display_name"] = df["collection_display_name"].astype(str)
90+
df["sample_date"] = pd.to_datetime(df["sample_date"])
91+
92+
# Fill nulls for numeric columns
93+
numeric_cols = [
94+
"total_active_developers",
95+
"avg_active_developers_per_project",
96+
"active_dev_concentration",
97+
"total_stars",
98+
"avg_stars_per_project",
99+
"total_commits",
100+
"avg_commits_per_project",
101+
"total_forks",
102+
"total_contributors",
103+
"avg_contributors_per_project",
104+
"total_projects",
105+
"developer_density",
106+
"commits_per_developer",
107+
"star_appeal",
108+
]
109+
110+
for col in numeric_cols:
111+
df[col] = df[col].fillna(0).astype(float)
112+
113+
# Select features for clustering (higher weights = more important)
114+
feature_cols = [
115+
"total_active_developers", # Weight: 2.5
116+
"total_stars", # Weight: 1.5
117+
"total_commits", # Weight: 2.0
118+
"total_contributors", # Weight: 2.0
119+
"avg_active_developers_per_project", # Weight: 1.5
120+
"developer_density", # Weight: 1.8
121+
"commits_per_developer", # Weight: 1.2
122+
"star_appeal", # Weight: 1.0
123+
]
124+
125+
# Feature weights (emphasize developer activity and engagement)
126+
feature_weights = [2.5, 1.5, 2.0, 2.0, 1.5, 1.8, 1.2, 1.0]
127+
128+
# Build feature matrix
129+
X = df[feature_cols].values
130+
131+
# Standardize features (zero mean, unit variance)
132+
scaler = StandardScaler()
133+
X_scaled = scaler.fit_transform(X)
134+
135+
# Apply feature weights
136+
X_weighted = X_scaled * feature_weights
137+
138+
# Determine optimal k using silhouette score
139+
n_samples = len(df)
140+
max_k = min(6, n_samples) # Try up to 6 clusters, but not more than samples
141+
142+
if n_samples < 3:
143+
# Too few samples for meaningful clustering
144+
df["cluster"] = 0
145+
df["segment"] = "Single Cluster"
146+
df["silhouette"] = 0.0
147+
else:
148+
best_score = -1
149+
best_labels = None
150+
151+
# Try different values of k
152+
for k in range(3, max_k + 1):
153+
if k >= n_samples:
154+
break
155+
156+
kmeans = KMeans(
157+
n_clusters=k,
158+
init="k-means++",
159+
n_init="auto",
160+
max_iter=300,
161+
random_state=42,
162+
)
163+
labels = kmeans.fit_predict(X_weighted)
164+
165+
# Calculate silhouette score
166+
score = silhouette_score(X_weighted, labels)
167+
168+
if score > best_score:
169+
best_score = score
170+
best_labels = labels
171+
172+
# Assign cluster labels
173+
if best_labels is not None:
174+
df["cluster"] = best_labels
175+
176+
# Calculate per-sample silhouette scores
177+
silhouettes = silhouette_samples(X_weighted, best_labels)
178+
df["silhouette"] = silhouettes
179+
else:
180+
# Fallback if clustering failed
181+
df["cluster"] = 0
182+
df["silhouette"] = 0.0
183+
184+
# Assign meaningful segment names based on performance
185+
# Sort clusters by mean total_active_developers (proxy for overall performance)
186+
cluster_performance = (
187+
df.groupby("cluster")["total_active_developers"]
188+
.mean()
189+
.sort_values(ascending=True) # type: ignore[call-overload]
190+
)
191+
192+
# Map clusters to segment names
193+
segment_names = ["Very Low", "Low", "Mid", "High", "Top"]
194+
cluster_to_segment: dict[int, str] = {}
195+
196+
for idx, cluster_id in enumerate(cluster_performance.index):
197+
# Assign names based on sorted order
198+
if idx < len(segment_names):
199+
cluster_to_segment[int(cluster_id)] = segment_names[idx]
200+
else:
201+
cluster_to_segment[int(cluster_id)] = f"Tier {idx + 1}"
202+
203+
df["segment"] = df["cluster"].map(cluster_to_segment) # type: ignore[arg-type]
204+
205+
# Ensure output columns match schema and order
206+
output_df = df[
207+
[
208+
"collection_id",
209+
"collection_source",
210+
"collection_namespace",
211+
"collection_name",
212+
"collection_display_name",
213+
"sample_date",
214+
"total_active_developers",
215+
"avg_active_developers_per_project",
216+
"active_dev_concentration",
217+
"total_stars",
218+
"avg_stars_per_project",
219+
"total_commits",
220+
"avg_commits_per_project",
221+
"total_forks",
222+
"total_contributors",
223+
"avg_contributors_per_project",
224+
"total_projects",
225+
"developer_density",
226+
"commits_per_developer",
227+
"star_appeal",
228+
"cluster",
229+
"segment",
230+
"silhouette",
231+
]
232+
].copy()
233+
234+
# Final type casting for safety
235+
output_df["collection_id"] = output_df["collection_id"].astype(str)
236+
output_df["collection_source"] = output_df["collection_source"].astype(str)
237+
output_df["collection_namespace"] = output_df["collection_namespace"].astype(str)
238+
output_df["collection_name"] = output_df["collection_name"].astype(str)
239+
output_df["collection_display_name"] = output_df["collection_display_name"].astype(
240+
str
241+
)
242+
output_df["cluster"] = output_df["cluster"].astype(int)
243+
output_df["segment"] = output_df["segment"].astype(str)
244+
output_df["silhouette"] = output_df["silhouette"].astype(float)
245+
246+
for col in numeric_cols:
247+
output_df[col] = output_df[col].astype(float)
248+
249+
# Yield the final DataFrame
250+
yield output_df # type: ignore[misc]
Lines changed: 118 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,118 @@
1+
MODEL (
2+
name oso.int_collection_performance_features,
3+
description "Performance features for each collection to be used in clustering",
4+
kind FULL,
5+
dialect trino,
6+
grain (collection_id, sample_date),
7+
tags (
8+
'entity_category=collection',
9+
'model_category=clustering'
10+
),
11+
audits (
12+
has_at_least_n_rows(threshold := 0),
13+
unique_values(columns := (collection_id, sample_date)),
14+
not_null(columns := (collection_id, sample_date))
15+
)
16+
);
17+
18+
-- Aggregate performance metrics at the collection level for the most recent date
19+
WITH latest_dates AS (
20+
SELECT
21+
collection_id,
22+
MAX(sample_date) AS max_date
23+
FROM oso.int_ranked_projects_by_collection
24+
GROUP BY collection_id
25+
),
26+
ranked_projects AS (
27+
SELECT
28+
rpc.collection_id,
29+
rpc.collection_source,
30+
rpc.collection_namespace,
31+
rpc.collection_name,
32+
rpc.collection_display_name,
33+
rpc.sample_date,
34+
rpc.metric_id,
35+
rpc.metric_model,
36+
COUNT(DISTINCT rpc.project_id) AS project_count,
37+
SUM(rpc.amount) AS total_amount,
38+
AVG(rpc.amount) AS avg_amount,
39+
MAX(rpc.amount) AS max_amount,
40+
MIN(rpc.amount) AS min_amount,
41+
STDDEV(rpc.amount) AS stddev_amount,
42+
-- Top project concentration: what % of total is from top 20% of projects
43+
SUM(CASE WHEN rpc.percentile_rank >= 0.8 THEN rpc.amount ELSE 0 END) / NULLIF(SUM(rpc.amount), 0) AS top_20_concentration
44+
FROM oso.int_ranked_projects_by_collection AS rpc
45+
INNER JOIN latest_dates AS ld
46+
ON rpc.collection_id = ld.collection_id
47+
AND rpc.sample_date = ld.max_date
48+
GROUP BY
49+
rpc.collection_id,
50+
rpc.collection_source,
51+
rpc.collection_namespace,
52+
rpc.collection_name,
53+
rpc.collection_display_name,
54+
rpc.sample_date,
55+
rpc.metric_id,
56+
rpc.metric_model
57+
),
58+
-- Pivot key metrics into columns for easier feature engineering
59+
pivoted_metrics AS (
60+
SELECT
61+
collection_id,
62+
collection_source,
63+
collection_namespace,
64+
collection_name,
65+
collection_display_name,
66+
sample_date,
67+
-- Active developers metrics
68+
MAX(CASE WHEN metric_model = 'active_developers' THEN total_amount END) AS total_active_developers,
69+
MAX(CASE WHEN metric_model = 'active_developers' THEN avg_amount END) AS avg_active_developers_per_project,
70+
MAX(CASE WHEN metric_model = 'active_developers' THEN top_20_concentration END) AS active_dev_concentration,
71+
-- Stars metrics
72+
MAX(CASE WHEN metric_model = 'stars' THEN total_amount END) AS total_stars,
73+
MAX(CASE WHEN metric_model = 'stars' THEN avg_amount END) AS avg_stars_per_project,
74+
-- Commits metrics
75+
MAX(CASE WHEN metric_model = 'commits' THEN total_amount END) AS total_commits,
76+
MAX(CASE WHEN metric_model = 'commits' THEN avg_amount END) AS avg_commits_per_project,
77+
-- Forks metrics
78+
MAX(CASE WHEN metric_model = 'forks' THEN total_amount END) AS total_forks,
79+
-- Contributors metrics
80+
MAX(CASE WHEN metric_model = 'contributors' THEN total_amount END) AS total_contributors,
81+
MAX(CASE WHEN metric_model = 'contributors' THEN avg_amount END) AS avg_contributors_per_project,
82+
-- Project diversity
83+
MAX(project_count) AS total_projects
84+
FROM ranked_projects
85+
GROUP BY
86+
collection_id,
87+
collection_source,
88+
collection_namespace,
89+
collection_name,
90+
collection_display_name,
91+
sample_date
92+
)
93+
94+
SELECT
95+
collection_id,
96+
collection_source,
97+
collection_namespace,
98+
collection_name,
99+
collection_display_name,
100+
sample_date,
101+
-- Core metrics (coalesce to 0 if null)
102+
COALESCE(total_active_developers, 0) AS total_active_developers,
103+
COALESCE(avg_active_developers_per_project, 0) AS avg_active_developers_per_project,
104+
COALESCE(active_dev_concentration, 0) AS active_dev_concentration,
105+
COALESCE(total_stars, 0) AS total_stars,
106+
COALESCE(avg_stars_per_project, 0) AS avg_stars_per_project,
107+
COALESCE(total_commits, 0) AS total_commits,
108+
COALESCE(avg_commits_per_project, 0) AS avg_commits_per_project,
109+
COALESCE(total_forks, 0) AS total_forks,
110+
COALESCE(total_contributors, 0) AS total_contributors,
111+
COALESCE(avg_contributors_per_project, 0) AS avg_contributors_per_project,
112+
COALESCE(total_projects, 0) AS total_projects,
113+
-- Derived features
114+
COALESCE(total_active_developers, 0) / NULLIF(total_projects, 0) AS developer_density,
115+
COALESCE(total_commits, 0) / NULLIF(total_active_developers, 0) AS commits_per_developer,
116+
COALESCE(total_stars, 0) / NULLIF(total_projects, 0) AS star_appeal
117+
FROM pivoted_metrics
118+

0 commit comments

Comments
 (0)