
Commit 4d19bd1

ZanSara and masci authored
refactor: consolidate telemetry events (#4275)
* add specific Ray event
* group evaluation and training events
* consolidate pipeline run events
* fix send_event import
* review feedback
* typo
* send uptime
* track embeddingRetriever openai encoder
* pylint

Co-authored-by: Massimiliano Pippi <[email protected]>
1 parent 4624844 commit 4d19bd1

File tree

10 files changed: +79 -40 lines
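In short, the commit replaces per-call-site event names such as send_event("FARMReader.train()") with a small set of shared event names ("Training", "Evaluation", "Pipeline run") plus structured properties identifying the caller. A minimal before/after sketch of the call pattern taken from the hunks below, assuming the haystack.telemetry_2 module introduced by this commit is importable; the FakeReader class is only a stand-in so the snippet is self-contained:

from haystack.telemetry_2 import send_event

class FakeReader:
    def train(self):
        # Old style (removed in this commit): one free-form event name per call site
        # send_event("FARMReader.train()")

        # New style: a shared event name plus properties identifying class and function
        send_event(
            event_name="Training",
            event_properties={"class": self.__class__.__name__, "function_name": "train"},
        )

FakeReader().train()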

haystack/modeling/evaluation/eval.py

Lines changed: 3 additions & 1 deletion
@@ -58,7 +58,9 @@ def eval(
        :return: all_results: A list of dictionaries, one for each prediction head. Each dictionary contains the metrics
                 and reports generated during evaluation.
        """
-        send_event("Evaluator.eval()")
+        send_event(
+            event_name="Evaluation", event_properties={"class": self.__class__.__name__, "function_name": "eval"}
+        )
        model.prediction_heads[0].use_confidence_scores_for_ranking = use_confidence_scores_for_ranking
        model.prediction_heads[0].use_no_answer_legacy_confidence = use_no_answer_legacy_confidence
        model.eval()

haystack/modeling/training/base.py

Lines changed: 2 additions & 2 deletions
@@ -22,7 +22,7 @@
from haystack.modeling.utils import GracefulKiller
from haystack.utils.experiment_tracking import Tracker as tracker
from haystack.utils.early_stopping import EarlyStopping
-from haystack.telemetry import send_event
+from haystack.telemetry_2 import send_event


logger = logging.getLogger(__name__)
@@ -164,7 +164,7 @@ def train(self):
        :return: Returns the model after training. When you do ``early_stopping``
            with a ``save_dir`` the best model is loaded and returned.
        """
-        send_event("Trainer.train()")
+        send_event(event_name="Training", event_properties={"class": self.__class__.__name__, "function_name": "train"})
        # connect the prediction heads with the right output from processor
        self.model.connect_heads_with_processor(self.data_silo.processor.tasks, require_labels=True)
        # Check that the tokenizer(s) fits the language model(s)

haystack/nodes/reader/farm.py

Lines changed: 16 additions & 5 deletions
@@ -435,7 +435,7 @@ def train(
        :param max_query_length: Maximum length of the question in number of tokens.
        :return: None
        """
-        send_event("FARMReader.train()")
+        send_event(event_name="Training", event_properties={"class": self.__class__.__name__, "function_name": "train"})
        return self._training_procedure(
            data_dir=data_dir,
            train_filename=train_filename,
@@ -557,7 +557,10 @@ def distil_prediction_layer_from(
        :param early_stopping: An initialized EarlyStopping object to control early stopping and saving of the best models.
        :return: None
        """
-        send_event("FARMReader.distil_prediction_layer_from()")
+        send_event(
+            event_name="Training",
+            event_properties={"class": self.__class__.__name__, "function_name": "distil_prediction_layer_from"},
+        )
        return self._training_procedure(
            data_dir=data_dir,
            train_filename=train_filename,
@@ -680,7 +683,10 @@ def distil_intermediate_layers_from(
        :param early_stopping: An initialized EarlyStopping object to control early stopping and saving of the best models.
        :return: None
        """
-        send_event("FARMReader.distil_intermediate_layers_from()")
+        send_event(
+            event_name="Training",
+            event_properties={"class": self.__class__.__name__, "function_name": "distil_intermediate_layers_from"},
+        )
        return self._training_procedure(
            data_dir=data_dir,
            train_filename=train_filename,
@@ -942,7 +948,10 @@ def eval_on_file(
            "Hence, results might slightly differ from those of `Pipeline.eval()`\n."
            "If you are just about starting to evaluate your model consider using `Pipeline.eval()` instead."
        )
-        send_event("FARMReader.eval_on_file()")
+        send_event(
+            event_name="Evaluation",
+            event_properties={"class": self.__class__.__name__, "function_name": "eval_on_file"},
+        )
        if device is None:
            device = self.devices[0]
        else:
@@ -1020,7 +1029,9 @@ def eval(
            "Hence, results might slightly differ from those of `Pipeline.eval()`\n."
            "If you are just about starting to evaluate your model consider using `Pipeline.eval()` instead."
        )
-        send_event("FARMReader.eval()")
+        send_event(
+            event_name="Evaluation", event_properties={"class": self.__class__.__name__, "function_name": "eval"}
+        )
        if device is None:
            device = self.devices[0]
        else:

haystack/nodes/retriever/_embedding_encoder.py

Lines changed: 1 addition & 1 deletion
@@ -200,7 +200,7 @@ def train(
        reference the Sentence-Transformers [documentation](https://www.sbert.net/docs/training/overview.html#sentence_transformers.SentenceTransformer.fit)
        for a full list of keyword arguments.
        """
-        send_event("SentenceTransformersEmbeddingEncoder.train()")
+        send_event(event_name="Training", event_properties={"class": self.__class__.__name__, "function_name": "train"})

        if train_loss not in _TRAINING_LOSSES:
            raise ValueError(f"Unrecognized train_loss {train_loss}. Should be one of: {_TRAINING_LOSSES.keys()}")

haystack/nodes/retriever/_openai_encoder.py

Lines changed: 2 additions & 0 deletions
@@ -12,6 +12,7 @@
from haystack.nodes.retriever._base_embedding_encoder import _BaseEmbeddingEncoder
from haystack.schema import Document
from haystack.utils.openai_utils import USE_TIKTOKEN, count_openai_tokens, load_openai_tokenizer, openai_request
+from haystack.telemetry_2 import send_event

if TYPE_CHECKING:
    from haystack.nodes.retriever import EmbeddingRetriever
@@ -23,6 +24,7 @@

class _OpenAIEmbeddingEncoder(_BaseEmbeddingEncoder):
    def __init__(self, retriever: "EmbeddingRetriever"):
+        send_event("OpenAIEmbeddingEncoder initialized", event_properties={"model": retriever.embedding_model})
        # See https://platform.openai.com/docs/guides/embeddings and
        # https://learn.microsoft.com/en-us/azure/cognitive-services/openai/how-to/embeddings?tabs=console for more details
        self.using_azure = (
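For context, this covers the "track embeddingRetriever openai encoder" bullet: simply constructing an EmbeddingRetriever backed by an OpenAI embedding model now emits the initialization event above. A hedged usage sketch; the constructor arguments shown here are illustrative and may differ between Haystack versions:

import os
from haystack.nodes import EmbeddingRetriever

# Illustrative only: building the retriever instantiates _OpenAIEmbeddingEncoder,
# which now sends "OpenAIEmbeddingEncoder initialized" with {"model": <embedding_model>}.
retriever = EmbeddingRetriever(
    embedding_model="text-embedding-ada-002",
    api_key=os.environ.get("OPENAI_API_KEY", ""),
)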

haystack/nodes/retriever/base.py

Lines changed: 3 additions & 1 deletion
@@ -154,7 +154,9 @@ def eval(
        contains the keys "predictions" and "metrics".
        :param headers: Custom HTTP headers to pass to document store client if supported (e.g. {'Authorization': 'Basic YWRtaW46cm9vdA=='} for basic authentication)
        """
-        send_event("BaseRetriever.eval()")
+        send_event(
+            event_name="Evaluation", event_properties={"class": self.__class__.__name__, "function_name": "eval"}
+        )
        # Extract all questions for evaluation
        filters: Dict = {"origin": [label_origin]}

haystack/nodes/retriever/dense.py

Lines changed: 3 additions & 3 deletions
@@ -655,7 +655,7 @@ def train(
        Checkpoints can be stored via setting `checkpoint_every` to a custom number of steps.
        If any checkpoints are stored, a subsequent run of train() will resume training from the latest available checkpoint.
        """
-        send_event("DensePassageRetriever.train()")
+        send_event(event_name="Training", event_properties={"class": self.__class__.__name__, "function_name": "train"})
        self.processor.embed_title = embed_title
        self.processor.data_dir = Path(data_dir)
        self.processor.train_filename = train_filename
@@ -1307,7 +1307,7 @@ def train(
        :param checkpoints_to_keep: The maximum number of train checkpoints to save.
        :param early_stopping: An initialized EarlyStopping object to control early stopping and saving of the best models.
        """
-        send_event("TableTextRetriever.train()")
+        send_event(event_name="Training", event_properties={"class": self.__class__.__name__, "function_name": "train"})
        if embed_meta_fields is None:
            embed_meta_fields = ["page_title", "section_title", "caption"]

@@ -1923,7 +1923,7 @@ def train(
        reference the Sentence-Transformers [documentation](https://www.sbert.net/docs/training/overview.html#sentence_transformers.SentenceTransformer.fit)
        for a full list of keyword arguments.
        """
-        send_event("EmbeddingRetriever.train()")
+        send_event(event_name="Training", event_properties={"class": self.__class__.__name__, "function_name": "train"})
        self.embedding_encoder.train(
            training_data,
            learning_rate=learning_rate,

haystack/pipelines/base.py

Lines changed: 8 additions & 5 deletions
@@ -83,6 +83,7 @@ def __init__(self):
        self.run_total = 0
        self.sent_event_in_window = False
        self.yaml_hash = False
+        self.last_run = None

    @property
    def root_node(self) -> Optional[str]:
@@ -496,7 +497,8 @@ def run(  # type: ignore
        """
        send_pipeline_run_event(
            pipeline=self,
-            event_name="Pipeline.run()",
+            classname=self.__class__.__name__,
+            function_name="run",
            query=query,
            file_paths=file_paths,
            labels=labels,
@@ -647,7 +649,8 @@ def run_batch(  # type: ignore
        """
        send_pipeline_run_event(
            pipeline=self,
-            event_name="Pipeline.run_batch()",
+            classname=self.__class__.__name__,
+            function_name="run_batch",
            queries=queries,
            file_paths=file_paths,
            labels=labels,
@@ -813,7 +816,7 @@ def eval_beir(
        Each metric is represented by a dictionary containing the scores for each top_k value.
        """
        send_event_2(
-            event_name="Pipeline.eval_beir()",
+            event_name=f"{cls.__name__}.eval_beir()",
            event_properties={
                "dataset": dataset,
                "index_pipeline": index_pipeline.yaml_hash,
@@ -1261,7 +1264,7 @@ def eval(
        Additional information can be found here
        https://huggingface.co/transformers/main_classes/model.html#transformers.PreTrainedModel.from_pretrained
        """
-        send_pipeline_event(pipeline=self, event_name="Pipeline.eval()")
+        send_pipeline_event(pipeline=self, event_name="Evaluation", event_properties={"function_name": "eval"})

        eval_result = EvaluationResult()
        if add_isolated_node_eval:
@@ -1380,7 +1383,7 @@ def eval_batch(
        Additional information can be found here
        https://huggingface.co/transformers/main_classes/model.html#transformers.PreTrainedModel.from_pretrained
        """
-        send_pipeline_event(pipeline=self, event_name="Pipeline.eval_batch()")
+        send_pipeline_event(pipeline=self, event_name=f"{self.__class__.__name__}.eval_batch()")

        eval_result = EvaluationResult()
        if add_isolated_node_eval:

haystack/pipelines/ray.py

Lines changed: 14 additions & 0 deletions
@@ -23,6 +23,7 @@
from haystack.nodes.base import BaseComponent, RootNode
from haystack.pipelines.base import Pipeline
from haystack.schema import Document, MultiLabel
+from haystack.telemetry_2 import send_pipeline_run_event


logger = logging.getLogger(__name__)
@@ -311,6 +312,19 @@ async def run_async(  # type: ignore
            about their execution. By default, this information includes the input parameters
            the Nodes received and the output they generated. You can then find all debug information in the dictionary returned by this method under the key `_debug`.
        """
+        send_pipeline_run_event(
+            pipeline=self,
+            classname=self.__class__.__name__,
+            function_name="run_async",
+            query=query,
+            file_paths=file_paths,
+            labels=labels,
+            documents=documents,
+            meta=meta,
+            params=params,
+            debug=debug,
+        )
+
        # validate the node names
        self._validate_node_names_in_params(params=params)


haystack/telemetry_2.py

Lines changed: 27 additions & 22 deletions
@@ -4,8 +4,9 @@
import logging
from pathlib import Path
import json
-import yaml
+import datetime

+import yaml
import posthog

from haystack.environment import collect_static_system_specs, collect_dynamic_system_specs
@@ -103,7 +104,8 @@ def send_event(self, event_name: str, event_properties: Optional[Dict[str, Any]]


def send_pipeline_run_event(  # type: ignore
-    event_name: str,
+    classname: str,
+    function_name: str,
    pipeline: "Pipeline",  # type: ignore
    query: Optional[str] = None,
    queries: Optional[List[str]] = None,
@@ -117,7 +119,8 @@ def send_pipeline_run_event(  # type: ignore
    """
    Sends a telemetry event about the execution of a pipeline, if telemetry is enabled.

-    :param event_name: The name of the event to show in PostHog.
+    :param classname: The name of the Pipeline class (Pipeline, RayPipeline, ...)
+    :param function_name: The name of the function that was invoked (run, run_batch, async_run, ...).
    :param pipeline: the pipeline that is running
    :param query: the value of the `query` input of the pipeline, if any
    :param queries: the value of the `queries` input of the pipeline, if any
@@ -130,15 +133,18 @@ def send_pipeline_run_event(  # type: ignore
    """
    try:
        if telemetry:
-            event_properties: Dict[str, Optional[Union[str, bool, int, Dict[str, Any]]]] = {}
+            event_properties: Dict[str, Optional[Union[str, bool, int, Dict[str, Any]]]] = {
+                "class": classname,
+                "function_name": function_name,
+            }

            # Check if it's the public demo
            exec_context = os.environ.get(HAYSTACK_EXECUTION_CONTEXT, "")
            if exec_context == "public_demo":
                event_properties["pipeline.is_public_demo"] = True
                event_properties["pipeline.run_parameters.query"] = query
                event_properties["pipeline.run_parameters.params"] = params
-                telemetry.send_event(event_name=event_name, event_properties=event_properties)
+                telemetry.send_event(event_name=function_name, event_properties=event_properties)
                return

            # Collect pipeline profile
@@ -185,26 +191,35 @@ def send_pipeline_run_event(  # type: ignore
            event_properties["pipeline.run_parameters.params"] = bool(params)
            event_properties["pipeline.run_parameters.debug"] = bool(debug)

-            telemetry.send_event(event_name=event_name, event_properties=event_properties)
+            telemetry.send_event(event_name="Pipeline run", event_properties=event_properties)
    except Exception as e:
        # Never let telemetry break things
-        logger.debug("There was an issue sending a %s telemetry event", event_name, exc_info=e)
+        logger.debug("There was an issue sending a '%s' telemetry event", function_name, exc_info=e)


-def send_pipeline_event(pipeline: "Pipeline", event_name: str):  # type: ignore
+def send_pipeline_event(pipeline: "Pipeline", event_name: str, event_properties: Optional[Dict[str, Any]] = None):  # type: ignore
    """
    Send a telemetry event related to a pipeline which is not a call to run(), if telemetry is enabled.
    """
    try:
        if telemetry:
-            telemetry.send_event(
-                event_name=event_name,
-                event_properties={
+            if not event_properties:
+                event_properties = {}
+            event_properties.update(
+                {
                    "pipeline.classname": pipeline.__class__.__name__,
                    "pipeline.fingerprint": pipeline.fingerprint,
                    "pipeline.yaml_hash": pipeline.yaml_hash,
-                },
+                }
            )
+            now = datetime.datetime.now()
+            if pipeline.last_run:
+                event_properties["pipeline.since_last_run"] = (now - pipeline.last_run).total_seconds()
+            else:
+                event_properties["pipeline.since_last_run"] = 0
+            pipeline.last_run = now
+
+            telemetry.send_event(event_name=event_name, event_properties=event_properties)
    except Exception as e:
        # Never let telemetry break things
        logger.debug("There was an issue sending a '%s' telemetry event", event_name, exc_info=e)
@@ -222,16 +237,6 @@ def send_event(event_name: str, event_properties: Optional[Dict[str, Any]] = Non
        logger.debug("There was an issue sending a '%s' telemetry event", event_name, exc_info=e)


-def _serializer(obj):
-    """
-    Small function used to build pipeline fingerprints and safely serialize any object.
-    """
-    try:
-        return str(obj)
-    except:
-        return "~ non serializable object ~"
-
-
if os.environ.get("HAYSTACK_TELEMETRY_VERSION", "2") == "2":
    telemetry = Telemetry()
else:
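Taken together with the "send uptime" bullet, a non-run pipeline event such as the one Pipeline.eval() now sends via send_pipeline_event(pipeline=self, event_name="Evaluation", event_properties={"function_name": "eval"}) ends up combining the caller-supplied fields with the pipeline bookkeeping added above. A hedged sketch of the resulting property payload, with placeholder values rather than real data:

{
    "function_name": "eval",                  # supplied by the caller
    "pipeline.classname": "Pipeline",         # added by send_pipeline_event
    "pipeline.fingerprint": "<fingerprint>",  # placeholder
    "pipeline.yaml_hash": "<yaml hash>",      # placeholder
    "pipeline.since_last_run": 0,             # seconds since the previous run; 0 if the pipeline has not run yet
}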
