Skip to content

Commit 8d5eea1

Browse files
committed
Refactor timeseries segmenter
1 parent 0567de6 commit 8d5eea1

File tree

2 files changed

+161
-91
lines changed

2 files changed

+161
-91
lines changed

label_studio_ml/examples/timeseries_segmenter/README.md

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,12 +51,52 @@ collects all labeled segments, extracts sensor values inside each segment and
5151
fits a logistic regression classifier. Model artifacts are stored in the directory set by
5252
`MODEL_DIR` (defaults to the current directory).
5353

54+
Steps performed by `fit()`:
55+
56+
1. Fetch all labeled tasks from Label Studio.
57+
2. Convert labeled ranges to per-row training samples.
58+
3. Fit a logistic regression model.
59+
4. Save the trained model to disk.
60+
5461
## Prediction
5562

5663
For each task, the backend loads the CSV, applies the trained classifier to each
5764
row and groups consecutive predictions into labeled segments. Prediction scores
5865
are averaged per segment and returned to Label Studio.
5966

67+
The `predict()` method:
68+
69+
1. Loads the stored model.
70+
2. Reads the task CSV and builds a feature matrix.
71+
3. Predicts a label for each row.
72+
4. Merges consecutive rows with the same label into a segment.
73+
5. Returns the segments in Label Studio JSON format.
74+
75+
## How it works
76+
77+
### Training pipeline
78+
79+
```mermaid
80+
flowchart TD
81+
A[Webhook event] --> B{Enough tasks?}
82+
B -- no --> C[Skip]
83+
B -- yes --> D[Load labeled tasks]
84+
D --> E[Collect per-row samples]
85+
E --> F[Fit logistic regression]
86+
F --> G[Save model]
87+
```
88+
89+
### Prediction pipeline
90+
91+
```mermaid
92+
flowchart TD
93+
T[Predict request] --> U[Load model]
94+
U --> V[Read task CSV]
95+
V --> W[Predict label per row]
96+
W --> X[Group consecutive labels]
97+
X --> Y[Return segments]
98+
```
99+
60100
## Customize
61101

62102
Edit `docker-compose.yml` to set environment variables such as `LABEL_STUDIO_HOST`
Lines changed: 121 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1-
"""Logistic regression based time series segmenter."""
1+
"""Logistic regression based time series segmenter.
2+
3+
This example shows a very small yet functional ML backend that trains a
4+
classifier on labeled time series CSV files and predicts segments for new
5+
tasks. The logic is intentionally simple so that it can serve as a starting
6+
point for your own experiments.
7+
"""
28

39
import os
410
import io
511
import pickle
612
import logging
7-
from typing import List, Dict, Optional
13+
from typing import List, Dict, Optional, Tuple
814

915
import pandas as pd
1016
import numpy as np
@@ -32,9 +38,11 @@ def setup(self):
3238
"""Initialize model metadata."""
3339
self.set("model_version", f"{self.__class__.__name__}-v0.0.1")
3440

35-
# util functions
41+
# ------------------------------------------------------------------
42+
# Utility helpers
43+
3644
def _get_model(self, blank: bool = False) -> LogisticRegression:
37-
"""Return a trained model or create a new one."""
45+
"""Return a trained model or create a fresh one if needed."""
3846
global _model
3947
if _model is not None and not blank:
4048
return _model
@@ -47,20 +55,22 @@ def _get_model(self, blank: bool = False) -> LogisticRegression:
4755
return _model
4856

4957
def _get_labeling_params(self) -> Dict:
50-
"""Extract tag names and channel info from the labeling config."""
58+
"""Return tag names and channel information from the labeling config."""
5159
from_name, to_name, value = self.label_interface.get_first_tag_occurence(
5260
"TimeSeriesLabels", "TimeSeries"
5361
)
5462
tag = self.label_interface.get_tag(from_name)
5563
labels = list(tag.labels)
5664
ts_tag = self.label_interface.get_tag(to_name)
5765
time_col = ts_tag.attr.get("timeColumn")
58-
# Parse channels from the original XML because the tag does not expose its children
66+
# Parse channel names from the original XML because TimeSeries tag
67+
# does not expose its children via label-studio's interface
5968
import xml.etree.ElementTree as ET
6069

6170
root = ET.fromstring(self.label_config)
6271
ts_elem = root.find(f".//TimeSeries[@name='{to_name}']")
6372
channels = [ch.attrib["column"] for ch in ts_elem.findall("Channel")]
73+
6474
return {
6575
'from_name': from_name,
6676
'to_name': to_name,
@@ -71,71 +81,115 @@ def _get_labeling_params(self) -> Dict:
7181
}
7282

7383
def _read_csv(self, task: Dict, path: str) -> pd.DataFrame:
    """Fetch the CSV referenced by ``path`` for ``task`` and parse it.

    The raw text is obtained through ``self.preload_task_data`` and then
    handed to pandas via an in-memory text buffer.
    """
    raw_text = self.preload_task_data(task, path)
    buffer = io.StringIO(raw_text)
    return pd.read_csv(buffer)
7787

88+
def _predict_task(self, task: Dict, model: LogisticRegression, params: Dict) -> Dict:
    """Return a Label Studio-style prediction for a single task.

    Args:
        task: Label Studio task; ``task["data"][params["value"]]`` is the
            reference passed to ``_read_csv``.
        model: Fitted classifier exposing ``predict_proba`` and ``classes_``.
        params: Labeling parameters. Keys read here: ``value``,
            ``channels``, ``labels``, ``time_col``, ``from_name`` and
            ``to_name``.

    Returns:
        A dict with ``result`` (one region per segment), ``score`` (mean of
        the per-segment scores) and ``model_version``; an empty dict when
        the CSV has no rows or no segment was produced.
    """
    df = self._read_csv(task, task["data"][params["value"]])

    # One feature vector (the selected channel values) per CSV row.
    X = df[params['channels']].values
    if len(X) == 0:
        return {}

    probs = model.predict_proba(X)
    best_col = np.argmax(probs, axis=1)
    # Columns of predict_proba are ordered like model.classes_, which only
    # holds the label indices that were present in the training data. Map
    # the winning column back through classes_ before looking up the label
    # name; indexing params['labels'] with the raw column position picks the
    # wrong name whenever some label was never seen during training.
    class_ids = np.asarray(model.classes_)[best_col]
    df['pred_label'] = [params['labels'][int(i)] for i in class_ids]
    df['score'] = probs[np.arange(len(probs)), best_col]

    segments = self._group_rows(df, params['time_col'])
    if not segments:
        return {}

    results = []
    segment_scores = []
    for seg in segments:
        # A segment's score is the mean winning probability of its rows.
        score = float(np.mean(seg['scores']))
        segment_scores.append(score)
        results.append({
            'from_name': params['from_name'],
            'to_name': params['to_name'],
            'type': 'timeserieslabels',
            'value': {
                'start': seg['start'],
                'end': seg['end'],
                'instant': False,
                'timeserieslabels': [seg['label']]
            },
            'score': score
        })

    return {
        'result': results,
        'score': sum(segment_scores) / len(segment_scores),
        'model_version': self.get('model_version')
    }
131+
132+
def _group_rows(self, df: pd.DataFrame, time_col: str) -> List[Dict]:
    """Group consecutive rows that share the same predicted label.

    Args:
        df: Frame containing ``pred_label``, ``score`` and ``time_col``
            columns, in the row order the segments should follow.
        time_col: Name of the column whose values become each segment's
            ``start`` and ``end``.

    Returns:
        A list of dicts with keys ``label``, ``start``, ``end`` and
        ``scores`` (the per-row scores of the segment), in row order.
        Empty when ``df`` has no rows.
    """
    segments: List[Dict] = []
    current = None
    # Walk the three needed columns in lockstep instead of df.iterrows():
    # iterrows builds a Series per row, which is slow on long recordings,
    # and tolist() already yields plain Python scalars.
    rows = zip(
        df['pred_label'].tolist(),
        df[time_col].tolist(),
        df['score'].tolist(),
    )
    for label, timestamp, score in rows:
        if current is not None and current['label'] == label:
            # Same label as the previous row: extend the open segment.
            current['end'] = timestamp
            current['scores'].append(score)
            continue
        if current is not None:
            segments.append(current)
        current = {
            'label': label,
            'start': timestamp,
            'end': timestamp,
            'scores': [score]
        }
    if current is not None:
        segments.append(current)
    return segments
153+
154+
def _collect_samples(self, tasks: List[Dict], params: Dict, label2idx: Dict[str, int]) -> Tuple[List, List]:
    """Build per-row training samples from all labeled tasks.

    Every CSV row whose time value lies inside a labeled range (both ends
    inclusive) becomes one sample carrying that range's label index.

    Args:
        tasks: Label Studio tasks, each with ``data`` and optionally
            ``annotations``.
        params: Labeling parameters. Keys read here: ``value``,
            ``from_name``, ``time_col`` and ``channels``.
        label2idx: Mapping from label name to class index.

    Returns:
        ``(X, y)`` where ``X`` is a list of per-row channel value arrays and
        ``y`` the matching list of class indices. Both are empty when
        nothing usable was labeled.
    """
    # Obtained locally so the helper does not depend on a module-level logger.
    log = logging.getLogger(__name__)
    time_col = params['time_col']
    channels = params['channels']

    X, y = [], []
    for task in tasks:
        df = self._read_csv(task, task['data'][params['value']])
        if df.empty:
            continue
        times = df[time_col]
        for ann in task.get('annotations') or []:
            for region in ann.get('result') or []:
                if region.get('from_name') != params['from_name']:
                    continue
                value = region.get('value') or {}
                names = value.get('timeserieslabels') or []
                # A region with no label, or with a label that is not in
                # label2idx, must not abort the whole training run: skip it.
                if not names or names[0] not in label2idx:
                    log.warning(
                        'Skipping region with unknown or missing label: %s', names
                    )
                    continue
                mask = (times >= value['start']) & (times <= value['end'])
                seg = df.loc[mask, channels].values
                X.extend(seg)
                y.extend([label2idx[names[0]]] * len(seg))
    return X, y
177+
178+
def _save_model(self, model: LogisticRegression) -> None:
    """Persist the trained model to ``MODEL_DIR/model.pkl``.

    The pickle is first written to a temporary file in the same directory
    and then moved over the final path with ``os.replace``. If the dump
    fails part-way, the previously saved model is left untouched instead of
    being truncated.

    Args:
        model: The trained model to pickle.
    """
    os.makedirs(self.MODEL_DIR, exist_ok=True)
    model_path = os.path.join(self.MODEL_DIR, 'model.pkl')
    # Per-process temp name, kept in MODEL_DIR so the replace stays on one
    # filesystem.
    tmp_path = f'{model_path}.{os.getpid()}.tmp'
    try:
        with open(tmp_path, 'wb') as f:
            pickle.dump(model, f)
        os.replace(tmp_path, model_path)
    finally:
        # Only present when the dump or the replace failed.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
184+
78185
def predict(
    self, tasks: List[Dict], context: Optional[Dict] = None, **kwargs
) -> ModelResponse:
    """Predict time series segments for every task in ``tasks``.

    The labeling parameters and the model are fetched once, then each task
    is delegated to ``_predict_task``. The resulting predictions are wrapped
    in a ``ModelResponse`` tagged with the current model version.
    """
    labeling_params = self._get_labeling_params()
    classifier = self._get_model()

    predictions = []
    for task in tasks:
        predictions.append(self._predict_task(task, classifier, labeling_params))

    version = self.get('model_version')
    return ModelResponse(predictions=predictions, model_version=version)
140194

141195
def _get_tasks(self, project_id: int) -> List[Dict]:
@@ -163,40 +217,16 @@ def fit(self, event, data, **kwargs):
163217
return
164218
params = self._get_labeling_params()
165219
label2idx = {l: i for i, l in enumerate(params['labels'])}
166-
X, y = [], [] # features and labels for classifier
167-
for task in tasks:
168-
df = self._read_csv(task, task['data'][params['value']])
169-
if df.empty:
170-
continue
171-
# convert labeled segments into per-row samples
172-
annotations = [a for a in task['annotations'] if a.get('result')]
173-
for ann in annotations:
174-
for r in ann['result']:
175-
if r['from_name'] != params['from_name']:
176-
continue
177-
start = r['value']['start']
178-
end = r['value']['end']
179-
label = r['value']['timeserieslabels'][0]
180-
mask = (
181-
(df[params['time_col']] >= start)
182-
& (df[params['time_col']] <= end)
183-
)
184-
seg = df.loc[mask, params['channels']].values
185-
# each row inside the labeled range belongs to the segment
186-
X.extend(seg)
187-
y.extend([label2idx[label]] * len(seg))
220+
221+
X, y = self._collect_samples(tasks, params, label2idx)
188222
if not X:
189223
logger.warning('No data collected for training')
190224
return
225+
191226
model = self._get_model(blank=True)
192227
model.fit(np.array(X), np.array(y))
193-
os.makedirs(self.MODEL_DIR, exist_ok=True)
194-
model_path = os.path.join(self.MODEL_DIR, 'model.pkl')
195-
# save trained model to disk
196-
with open(model_path, 'wb') as f:
197-
pickle.dump(model, f)
228+
self._save_model(model)
198229
global _model
199-
# reload the cached model on next prediction
200-
_model = None
230+
_model = None # reload on next predict
201231
self._get_model()
202232

0 commit comments

Comments
 (0)