"""This example implements a simple ML backend that trains a
recurrent neural network on labeled time series CSV files
and predicts segments for new tasks.
"""
7+
8+ import os
9+ import io
10+ import logging
611import pickle
712from typing import Dict , List , Optional , Tuple
13+
14+ import numpy as np
815import pandas as pd
16+ import label_studio_sdk
917import tensorflow as tf
1018from tensorflow .keras import layers , models
19+
20+ from label_studio_ml .model import LabelStudioMLBase
21+ from label_studio_ml .response import ModelResponse
22+
logger = logging.getLogger(__name__)

# Process-wide cache of the Keras model. _get_model() populates it and
# fit() resets it to None so the next predict() reloads fresh weights.
_model: Optional[models.Model] = None


class TimeSeriesSegmenter(LabelStudioMLBase):
    """Minimal LSTM-based segmenter for time series."""

    # Connection settings for pulling labeled tasks back from Label Studio.
    LABEL_STUDIO_HOST = os.getenv("LABEL_STUDIO_HOST", "http://localhost:8080")
    LABEL_STUDIO_API_KEY = os.getenv("LABEL_STUDIO_API_KEY")
    # Retrain once every N annotation created/updated events.
    START_TRAINING_EACH_N_UPDATES = int(os.getenv("START_TRAINING_EACH_N_UPDATES", 10))
    # Directory where the trained model ("model.keras") is saved/loaded.
    MODEL_DIR = os.getenv("MODEL_DIR", ".")
35+
36+ def setup (self ):
37+ self .set ("model_version" , f"{ self .__class__ .__name__ } -v0.0.1" )
38+
39+ # ------------------------------------------------------------------
40+ # Utility helpers
41+
1842 def _build_model (self , n_channels : int , n_labels : int ) -> models .Model :
1943 tf .keras .utils .set_random_seed (42 )
2044 model = models .Sequential (
@@ -34,24 +58,62 @@ def _build_model(self, n_channels: int, n_labels: int) -> models.Model:
3458 def _get_model (
3559 self , n_channels : int , n_labels : int , blank : bool = False
3660 ) -> models .Model :
61+ global _model
62+ if _model is not None and not blank :
63+ return _model
3764 model_path = os .path .join (self .MODEL_DIR , "model.keras" )
65+ if not blank and os .path .exists (model_path ):
3866 _model = models .load_model (model_path )
67+ else :
3968 _model = self ._build_model (n_channels , n_labels )
69+ return _model
70+
71+ def _get_labeling_params (self ) -> Dict :
72+ from_name , to_name , value = self .label_interface .get_first_tag_occurence (
73+ "TimeSeriesLabels" , "TimeSeries"
74+ )
75+ tag = self .label_interface .get_tag (from_name )
76+ labels = list (tag .labels )
77+ ts_tag = self .label_interface .get_tag (to_name )
78+ time_col = ts_tag .attr .get ("timeColumn" )
4079
80+ import xml .etree .ElementTree as ET
81+
82+ root = ET .fromstring (self .label_config )
83+ ts_elem = root .find (f".//TimeSeries[@name='{ to_name } ']" )
84+ channels = [ch .attrib ["column" ] for ch in ts_elem .findall ("Channel" )]
85+
86+ return {
4187 "from_name" : from_name ,
4288 "to_name" : to_name ,
4389 "value" : value ,
4490 "labels" : labels ,
4591 "time_col" : time_col ,
4692 "channels" : channels ,
93+ }
94+
95+ def _read_csv (self , task : Dict , path : str ) -> pd .DataFrame :
96+ csv_str = self .preload_task_data (task , path )
97+ return pd .read_csv (io .StringIO (csv_str ))
98+
4799 def _predict_task (self , task : Dict , model : models .Model , params : Dict ) -> Dict :
100+ df = self ._read_csv (task , task ["data" ][params ["value" ]])
48101 X = df [params ["channels" ]].values .reshape (- 1 , 1 , len (params ["channels" ]))
102+ if len (X ) == 0 :
103+ return {}
104+
49105 probs = model .predict (X , verbose = 0 )
106+ labels_idx = np .argmax (probs , axis = 1 )
50107 df ["pred_label" ] = [params ["labels" ][i ] for i in labels_idx ]
51108 df ["score" ] = probs [np .arange (len (probs )), labels_idx ]
52109
53110 segments = self ._group_rows (df , params ["time_col" ])
111+
112+ results = []
113+ avg_score = 0
114+ for seg in segments :
54115 score = float (np .mean (seg ["scores" ]))
116+ avg_score += score
55117 results .append (
56118 {
57119 "from_name" : params ["from_name" ],
@@ -66,104 +128,32 @@ def _predict_task(self, task: Dict, model: models.Model, params: Dict) -> Dict:
66128 "score" : score ,
67129 }
68130 )
69- "result": results,
70- "score": avg_score / len(results),
71- "model_version": self.get("model_version"),
72- label = row["pred_label"]
73- if current and current["label"] == label:
74- current["end"] = row[time_col]
75- current["scores"].append(row["score"])
76- "label": label,
77- "start": row[time_col],
78- "end": row[time_col],
79- "scores": [row["score"]],
80- df = self._read_csv(task, task["data"][params["value"]])
81- annotations = [a for a in task["annotations"] if a.get("result")]
82- for r in ann["result"]:
83- if r["from_name"] != params["from_name"]:
84- start = r["value"]["start"]
85- end = r["value"]["end"]
86- label = r["value"]["timeserieslabels"][0]
87- mask = (df[params["time_col"]] >= start) & (
88- df[params["time_col"]] <= end
89- seg = df.loc[mask, params["channels"]].values
90- def _save_model(self, model: models.Model) -> None:
91- model_path = os.path.join(self.MODEL_DIR, "model.keras")
92- model.save(model_path)
93-
94- def predict(self, tasks: List[Dict], context: Optional[Dict] = None, **kwargs) -> ModelResponse:
95- model = self._get_model(len(params["channels"]), len(params["labels"]))
96- return ModelResponse(predictions=predictions, model_version=self.get("model_version"))
97- ls = label_studio_sdk.Client(self.LABEL_STUDIO_HOST, self.LABEL_STUDIO_API_KEY)
98- if event not in ("ANNOTATION_CREATED", "ANNOTATION_UPDATED", "START_TRAINING"):
99- project_id = data["annotation"]["project"]
100- if len(tasks) % self.START_TRAINING_EACH_N_UPDATES != 0 and event != "START_TRAINING":
101- "Skip training: %s tasks are not multiple of %s",
102- len(tasks),
103- self.START_TRAINING_EACH_N_UPDATES,
104- )
105- label2idx = {l: i for i, l in enumerate(params["labels"])}
106-
107- logger.warning("No data collected for training")
108-
109- model = self._get_model(len(params["channels"]), len(params["labels"]), blank=True)
110- X_arr = np.array(X).reshape(-1, 1, len(params["channels"]))
111- y_arr = np.array(y)
112- model.fit(X_arr, y_arr, epochs=10, verbose=0)
113- _model = None
114- self._get_model(len(params["channels"]), len(params["labels"]))
115- results.append(
116- {
117- 'from_name': params['from_name'],
118- 'to_name': params['to_name'],
119- 'type': 'timeserieslabels',
120- 'value': {
121- 'start': seg['start'],
122- 'end': seg['end'],
123- 'instant': False,
124- 'timeserieslabels': [seg['label']],
125- },
126- 'score': score,
127- }
128- )
129131
130132 if not results :
131133 return {}
132134
133135 return {
134- ' result' : results,
135- ' score' : avg_score / len(results),
136- ' model_version' : self.get(' model_version' ),
136+ " result" : results ,
137+ " score" : avg_score / len (results ),
138+ " model_version" : self .get (" model_version" ),
137139 }
138140
139141 def _group_rows (self , df : pd .DataFrame , time_col : str ) -> List [Dict ]:
140- def _collect_samples(
141- self, tasks: List[Dict], params: Dict, label2idx: Dict[str, int]
142- ) -> Tuple[List, List]:
143- def predict(
144- self, tasks: List[Dict], context: Optional[Dict] = None, **kwargs
145- ) -> ModelResponse:
146- return ModelResponse(
147- predictions=predictions, model_version=self.get("model_version")
148- )
149- if (
150- len(tasks) % self.START_TRAINING_EACH_N_UPDATES != 0
151- and event != "START_TRAINING"
152- ):
153- model = self._get_model(
154- len(params["channels"]), len(params["labels"]), blank=True
155- )
156- if current and current['label'] == label:
157- current['end'] = row[time_col]
158- current['scores'].append(row['score'])
142+ segments = []
143+ current = None
144+ for _ , row in df .iterrows ():
145+ label = row ["pred_label" ]
146+ if current and current ["label" ] == label :
147+ current ["end" ] = row [time_col ]
148+ current ["scores" ].append (row ["score" ])
159149 else :
160- def _save_model(self, model: RandomForestClassifier) -> None :
150+ if current :
161151 segments .append (current )
162152 current = {
163- ' label' : label,
164- ' start' : row[time_col],
165- ' end' : row[time_col],
166- ' scores' : [row[' score' ]],
153+ " label" : label ,
154+ " start" : row [time_col ],
155+ " end" : row [time_col ],
156+ " scores" : [row [" score" ]],
167157 }
168158 if current :
169159 segments .append (current )
@@ -172,91 +162,78 @@ def _save_model(self, model: RandomForestClassifier) -> None:
172162 def _collect_samples (
173163 self , tasks : List [Dict ], params : Dict , label2idx : Dict [str , int ]
174164 ) -> Tuple [List , List ]:
175- """ Return feature matrix and label vector built from all labeled tasks ."""
176165 X , y = [], []
177166 for task in tasks :
178- df = self._read_csv(task, task[' data' ][params[' value' ]])
167+ df = self ._read_csv (task , task [" data" ][params [" value" ]])
179168 if df .empty :
180169 continue
181-
182- annotations = [a for a in task['annotations'] if a.get('result')]
183-
170+ annotations = [a for a in task ["annotations" ] if a .get ("result" )]
184171 for ann in annotations :
185- for r in ann[' result' ]:
186- if r[' from_name' ] != params[' from_name' ]:
172+ for r in ann [" result" ]:
173+ if r [" from_name" ] != params [" from_name" ]:
187174 continue
188- start = r[' value'][' start' ]
189- end = r[' value'][' end' ]
190- label = r[' value'][' timeserieslabels' ][0]
191- mask = (df[params[' time_col' ]] >= start) & (
192- df[params[' time_col' ]] <= end
175+ start = r [" value" ][ " start" ]
176+ end = r [" value" ][ " end" ]
177+ label = r [" value" ][ " timeserieslabels" ][0 ]
178+ mask = (df [params [" time_col" ]] >= start ) & (
179+ df [params [" time_col" ]] <= end
193180 )
194- seg = df.loc[mask, params[' channels' ]].values
181+ seg = df .loc [mask , params [" channels" ]].values
195182 X .extend (seg )
196183 y .extend ([label2idx [label ]] * len (seg ))
197184 return X , y
198185
199- def _save_model(self, model: LogisticRegression) -> None:
200- """ Persist trained model to disk ."""
186+ def _save_model (self , model : models .Model ) -> None :
201187 os .makedirs (self .MODEL_DIR , exist_ok = True )
202- model_path = os.path.join(self.MODEL_DIR, 'model.pkl')
203- with open(model_path, 'wb') as f:
204- pickle.dump(model, f)
188+ model_path = os .path .join (self .MODEL_DIR , "model.keras" )
189+ model .save (model_path )
205190
206191 def predict (
207192 self , tasks : List [Dict ], context : Optional [Dict ] = None , ** kwargs
208193 ) -> ModelResponse :
209- """ Return time series segments predicted for the given tasks ."""
210194 params = self ._get_labeling_params ()
211- model = self._get_model()
212- predictions = [
213- self._predict_task(task, model, params) for task in tasks
214- ]
215-
195+ model = self ._get_model (len (params ["channels" ]), len (params ["labels" ]))
196+ predictions = [self ._predict_task (task , model , params ) for task in tasks ]
216197 return ModelResponse (
217- predictions=predictions, model_version=self.get(' model_version' )
198+ predictions = predictions , model_version = self .get (" model_version" )
218199 )
219200
220201 def _get_tasks (self , project_id : int ) -> List [Dict ]:
221- """ Fetch labeled tasks from Label Studio ."""
222- ls = label_studio_sdk.Client(
223- self.LABEL_STUDIO_HOST, self.LABEL_STUDIO_API_KEY
224- )
202+ ls = label_studio_sdk .Client (self .LABEL_STUDIO_HOST , self .LABEL_STUDIO_API_KEY )
225203 project = ls .get_project (id = project_id )
226204 return project .get_labeled_tasks ()
227205
228206 def fit (self , event , data , ** kwargs ):
229- """ Train the model on all labeled segments ."""
230- if event not in (
231- 'ANNOTATION_CREATED' ,
232- 'ANNOTATION_UPDATED' ,
233- 'START_TRAINING' ,
234- ):
235- logger .info ('Skip training : event % s is not supported ', event )
207+ if event not in ("ANNOTATION_CREATED" , "ANNOTATION_UPDATED" , "START_TRAINING" ):
208+ logger .info ("Skip training: event %s is not supported" , event )
236209 return
237-
238- project_id = data ['annotation' ]['project' ]
210+ project_id = data ["annotation" ]["project" ]
239211 tasks = self ._get_tasks (project_id )
240212 if (
241213 len (tasks ) % self .START_TRAINING_EACH_N_UPDATES != 0
242- and event != ' START_TRAINING'
214+ and event != " START_TRAINING"
243215 ):
244216 logger .info (
245- f'Skip training: { len (tasks )} tasks are not multiple of { self .START_TRAINING_EACH_N_UPDATES } '
217+ "Skip training: %s tasks are not multiple of %s" ,
218+ len (tasks ),
219+ self .START_TRAINING_EACH_N_UPDATES ,
246220 )
247221 return
248-
249222 params = self ._get_labeling_params ()
250- label2idx = {l : i for i , l in enumerate (params [' labels' ])}
223+ label2idx = {l : i for i , l in enumerate (params [" labels" ])}
251224
252225 X , y = self ._collect_samples (tasks , params , label2idx )
253226 if not X :
254- logger .warning (' No data collected for training' )
227+ logger .warning (" No data collected for training" )
255228 return
256229
257- model = self ._get_model (blank = True )
258- model .fit (np .array (X ), np .array (y ))
230+ model = self ._get_model (
231+ len (params ["channels" ]), len (params ["labels" ]), blank = True
232+ )
233+ X_arr = np .array (X ).reshape (- 1 , 1 , len (params ["channels" ]))
234+ y_arr = np .array (y )
235+ model .fit (X_arr , y_arr , epochs = 10 , verbose = 0 )
259236 self ._save_model (model )
260237 global _model
261- _model = None # reload on next predict
262- self ._get_model ()
238+ _model = None
239+ self ._get_model (len ( params [ "channels" ]), len ( params [ "labels" ]) )
0 commit comments