Commit 040616a

chore: updated tracking for single scores (#1716)
1 parent cc31f65 commit 040616a

5 files changed: +244 -28 lines changed

src/ragas/_analytics.py

Lines changed: 81 additions & 2 deletions
@@ -3,14 +3,18 @@
 import json
 import logging
 import os
+import time
 import typing as t
 import uuid
 from functools import lru_cache, wraps
+from threading import Lock
+from typing import List
 
 import requests
 from appdirs import user_data_dir
 from pydantic import BaseModel, Field
 
+from ragas._version import __version__
 from ragas.utils import get_debug_mode
 
 if t.TYPE_CHECKING:
@@ -81,14 +85,15 @@ def get_userid() -> str:
 class BaseEvent(BaseModel):
     event_type: str
     user_id: str = Field(default_factory=get_userid)
+    ragas_version: str = Field(default=__version__)
 
 
 class EvaluationEvent(BaseEvent):
     metrics: t.List[str]
-    evaluation_mode: str
     num_rows: int
+    evaluation_type: t.Literal["SINGLE_TURN", "MULTI_TURN"]
     language: str
-    in_ci: bool
+    event_type: str = "evaluation"
 
 
 class TestsetGenerationEvent(BaseEvent):
@@ -100,6 +105,76 @@ class TestsetGenerationEvent(BaseEvent):
     version: str = "3"  # the version of testset generation pipeline
 
 
+class AnalyticsBatcher:
+    def __init__(self, batch_size: int = 50, flush_interval: float = 120):
+        """
+        Initialize an AnalyticsBatcher instance.
+
+        Args:
+            batch_size (int, optional): Maximum number of events to batch before flushing. Defaults to 50.
+            flush_interval (float, optional): Maximum time in seconds between flushes. Defaults to 120.
+        """
+        self.buffer: List[EvaluationEvent] = []
+        self.lock = Lock()
+        self.last_flush_time = time.time()
+        self.BATCH_SIZE = batch_size
+        self.FLUSH_INTERVAL = flush_interval  # seconds
+
+    def add_evaluation(self, evaluation_event: EvaluationEvent) -> None:
+        with self.lock:
+            self.buffer.append(evaluation_event)
+
+        if (
+            len(self.buffer) >= self.BATCH_SIZE
+            or (time.time() - self.last_flush_time) > self.FLUSH_INTERVAL
+        ):
+            self.flush()
+
+    def _join_evaluation_events(
+        self, events: List[EvaluationEvent]
+    ) -> List[EvaluationEvent]:
+        """
+        Join evaluation events that share all properties except num_rows
+        into a single event, summing their num_rows.
+        """
+        if not events:
+            return []
+
+        # Group events by their properties (except num_rows)
+        grouped_events = {}
+        for event in events:
+            key = (
+                event.event_type,
+                tuple(event.metrics),
+                event.evaluation_type,
+            )
+            if key not in grouped_events:
+                grouped_events[key] = event
+            else:
+                grouped_events[key].num_rows += event.num_rows
+
+        # Convert grouped events back to a list
+        return list(grouped_events.values())
+
+    def flush(self) -> None:
+        # if no events to send, do nothing
+        if not self.buffer:
+            return
+
+        try:
+            # join all the EvaluationEvents into merged events and send them
+            events_to_send = self._join_evaluation_events(self.buffer)
+            for event in events_to_send:
+                track(event)
+        except Exception as err:
+            if _usage_event_debugging():
+                logger.error("Tracking Error: %s", err, stack_info=True, stacklevel=3)
+        finally:
+            with self.lock:
+                self.buffer = []
+                self.last_flush_time = time.time()
+
+
 @silent
 def track(event_properties: BaseEvent):
     if do_not_track():
@@ -133,3 +208,7 @@ def wrapper(*args: P.args, **kwargs: P.kwargs) -> t.Any:
         return result
 
     return wrapper
+
+
+# Create a global batcher instance
+_analytics_batcher = AnalyticsBatcher(batch_size=10, flush_interval=10)
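To make the batching above concrete, here is a minimal standalone sketch of the idea (Event and TinyBatcher are illustrative stand-ins, not the ragas API): events that share (event_type, metrics, evaluation_type) merge into one event whose num_rows is the sum, and a flush fires once either the size or the time threshold is crossed.

import time
from dataclasses import dataclass
from typing import Dict, List, Tuple


@dataclass
class Event:
    event_type: str
    metrics: List[str]
    evaluation_type: str
    num_rows: int = 1


class TinyBatcher:
    def __init__(self, batch_size: int = 10, flush_interval: float = 10.0):
        self.buffer: List[Event] = []
        self.batch_size = batch_size
        self.flush_interval = flush_interval
        self.last_flush = time.time()

    def add(self, event: Event) -> None:
        self.buffer.append(event)
        if len(self.buffer) >= self.batch_size or (
            time.time() - self.last_flush > self.flush_interval
        ):
            self.flush()

    def flush(self) -> None:
        # Merge events that share a key instead of sending one request each.
        grouped: Dict[Tuple[str, Tuple[str, ...], str], Event] = {}
        for e in self.buffer:
            key = (e.event_type, tuple(e.metrics), e.evaluation_type)
            if key in grouped:
                grouped[key].num_rows += e.num_rows
            else:
                grouped[key] = e
        for e in grouped.values():
            print("send", e)  # stand-in for track(event)
        self.buffer = []
        self.last_flush = time.time()


batcher = TinyBatcher(batch_size=3)
for _ in range(3):  # three one-row events with an identical key...
    batcher.add(Event("evaluation", ["faithfulness"], "SINGLE_TURN"))
# ...produce exactly one merged "send" with num_rows == 3

Unlike this sketch, the committed AnalyticsBatcher guards its buffer with a threading.Lock, since scoring calls can arrive from multiple threads.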

src/ragas/evaluation.py

Lines changed: 2 additions & 17 deletions
@@ -2,13 +2,12 @@
 
 import typing as t
 
-import numpy as np
 from datasets import Dataset
 from langchain_core.callbacks import BaseCallbackHandler, BaseCallbackManager
 from langchain_core.embeddings import Embeddings as LangchainEmbeddings
 from langchain_core.language_models import BaseLanguageModel as LangchainLLM
 
-from ragas._analytics import EvaluationEvent, track, track_was_completed
+from ragas._analytics import track_was_completed
 from ragas.callbacks import ChainType, RagasTracer, new_group
 from ragas.dataset_schema import (
     EvaluationDataset,
@@ -37,7 +36,7 @@
     is_reproducable,
 )
 from ragas.run_config import RunConfig
-from ragas.utils import convert_v1_to_v2_dataset, get_feature_language
+from ragas.utils import convert_v1_to_v2_dataset
 from ragas.validation import (
     remap_column_names,
     validate_required_columns,
@@ -351,18 +350,4 @@ def evaluate(
     for i in reproducable_metrics:
         metrics[i].reproducibility = 1  # type: ignore
 
-    # log the evaluation event
-    metrics_names = [m.name for m in metrics]
-    metric_lang = [get_feature_language(m) for m in metrics]
-    metric_lang = np.unique([m for m in metric_lang if m is not None])
-    track(
-        EvaluationEvent(
-            event_type="evaluation",
-            metrics=metrics_names,
-            evaluation_mode="",
-            num_rows=len(dataset),
-            language=metric_lang[0] if len(metric_lang) > 0 else "",
-            in_ci=in_ci,
-        )
-    )
     return result

src/ragas/metrics/base.py

Lines changed: 48 additions & 1 deletion
@@ -10,12 +10,18 @@
 
 from pysbd import Segmenter
 
+from ragas._analytics import EvaluationEvent, _analytics_batcher
 from ragas.callbacks import ChainType, new_group
 from ragas.dataset_schema import MultiTurnSample, SingleTurnSample
 from ragas.executor import is_event_loop_running
 from ragas.prompt import PromptMixin
 from ragas.run_config import RunConfig
-from ragas.utils import RAGAS_SUPPORTED_LANGUAGE_CODES, camel_to_snake, deprecated
+from ragas.utils import (
+    RAGAS_SUPPORTED_LANGUAGE_CODES,
+    camel_to_snake,
+    deprecated,
+    get_metric_language,
+)
 
 if t.TYPE_CHECKING:
     from langchain_core.callbacks import Callbacks
@@ -286,6 +292,16 @@ def single_turn_score(
         else:
             if not group_cm.ended:
                 rm.on_chain_end({"output": score})
+
+        # track the evaluation event
+        _analytics_batcher.add_evaluation(
+            EvaluationEvent(
+                metrics=[self.name],
+                num_rows=1,
+                evaluation_type=MetricType.SINGLE_TURN.name,
+                language=get_metric_language(self),
+            )
+        )
         return score
 
     async def single_turn_ascore(
@@ -320,6 +336,16 @@ async def single_turn_ascore(
         else:
             if not group_cm.ended:
                 rm.on_chain_end({"output": score})
+
+        # track the evaluation event
+        _analytics_batcher.add_evaluation(
+            EvaluationEvent(
+                metrics=[self.name],
+                num_rows=1,
+                evaluation_type=MetricType.SINGLE_TURN.name,
+                language=get_metric_language(self),
+            )
+        )
         return score
 
     @abstractmethod
@@ -394,6 +420,16 @@ def multi_turn_score(
         else:
             if not group_cm.ended:
                 rm.on_chain_end({"output": score})
+
+        # track the evaluation event
+        _analytics_batcher.add_evaluation(
+            EvaluationEvent(
+                metrics=[self.name],
+                num_rows=1,
+                evaluation_type=MetricType.MULTI_TURN.name,
+                language=get_metric_language(self),
+            )
+        )
         return score
 
     async def multi_turn_ascore(
@@ -428,6 +464,17 @@ async def multi_turn_ascore(
         else:
             if not group_cm.ended:
                 rm.on_chain_end({"output": score})
+
+        # track the evaluation event
+        _analytics_batcher.add_evaluation(
+            EvaluationEvent(
+                metrics=[self.name],
+                num_rows=1,
+                evaluation_type=MetricType.MULTI_TURN.name,
+                language=get_metric_language(self),
+            )
+        )
+
         return score
 
     @abstractmethod
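Taken together, the effect of these hooks is that every scoring call enqueues a one-row EvaluationEvent into the shared batcher instead of issuing its own tracking request. As a hedged sketch (field values are illustrative; a real call derives them from the metric instance), one single_turn_score call amounts to:

from ragas._analytics import EvaluationEvent, _analytics_batcher

_analytics_batcher.add_evaluation(
    EvaluationEvent(
        metrics=["faithfulness"],       # [self.name] in the real call
        num_rows=1,                     # one sample per scoring call
        evaluation_type="SINGLE_TURN",  # MetricType.SINGLE_TURN.name
        language="english",             # get_metric_language(self)
    )
)

With the global batcher configured as AnalyticsBatcher(batch_size=10, flush_interval=10), ten such calls (or the first call after ten seconds) trigger a flush that merges identical events and sums num_rows.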

src/ragas/utils.py

Lines changed: 3 additions & 3 deletions
@@ -85,15 +85,15 @@ def is_nan(x):
     return False
 
 
-def get_feature_language(feature: Metric) -> t.Optional[str]:
+def get_metric_language(metric: Metric) -> str:
     from ragas.prompt import BasePrompt
 
     languages = [
         value.language
-        for _, value in vars(feature).items()
+        for _, value in vars(metric).items()
         if isinstance(value, BasePrompt)
     ]
-    return languages[0] if len(languages) > 0 else None
+    return languages[0] if len(languages) > 0 else ""
 
 
 def deprecated(
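For clarity, a minimal standalone sketch of the contract get_metric_language depends on (FakePrompt and FakeMetric are illustrative stand-ins for BasePrompt and a real metric): the first prompt-valued attribute found on the metric supplies the language, and the empty string is now the fallback instead of None.

# Illustrative stand-ins; the real lookup lives in src/ragas/utils.py.
class FakePrompt:
    def __init__(self, language: str):
        self.language = language


class FakeMetric:
    def __init__(self):
        self.prompt = FakePrompt("english")  # prompts carry their own language


def metric_language(metric: object, prompt_type: type = FakePrompt) -> str:
    # Scan the instance's attributes for prompt objects; first language wins.
    languages = [
        value.language
        for value in vars(metric).values()
        if isinstance(value, prompt_type)
    ]
    return languages[0] if languages else ""  # "" keeps EvaluationEvent.language a str


class BareMetric:
    """A metric with no prompts at all."""


assert metric_language(FakeMetric()) == "english"
assert metric_language(BareMetric()) == ""  # empty string, not None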
