Merged
1 change: 1 addition & 0 deletions .github/workflows/IntegrationTest.yml
@@ -25,6 +25,7 @@ jobs:
python -m pip install --upgrade pip
pip install pytest
if [ -f requirements/runtime.txt ]; then pip install -r requirements/runtime.txt; fi
pip install pyspark
pip install -e .

- name: Integration Test(local plaintext)
18 changes: 12 additions & 6 deletions dingo/exec/local.py
@@ -113,15 +113,18 @@ def execute(self) -> SummaryModel:
for field_key, eval_detail_list in result_info.eval_details.items():
if field_key not in self.summary.type_ratio:
self.summary.type_ratio[field_key] = {}
# Iterate over List[EvalDetail]

# Iterate over List[EvalDetail], collecting metric scores and labels
for eval_detail in eval_detail_list:
# Get the label list
# Collect metric scores
if eval_detail.score is not None and eval_detail.metric:
self.summary.add_metric_score(eval_detail.metric, eval_detail.score)

# Collect label statistics
label_list = eval_detail.label if eval_detail.label else []
for label in label_list:
if label not in self.summary.type_ratio[field_key]:
self.summary.type_ratio[field_key][label] = 1
else:
self.summary.type_ratio[field_key][label] += 1
self.summary.type_ratio[field_key].setdefault(label, 0)
self.summary.type_ratio[field_key][label] += 1

if result_info.eval_status:
self.summary.num_bad += 1
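
As an aside (not part of this diff), the setdefault rewrite above collapses the explicit if/else branch into an unconditional increment. A minimal sketch of the idiom in isolation, with illustrative label values:

counts = {}
for label in ['QUALITY_BAD_COMPLETENESS', 'QUALITY_BAD_COMPLETENESS', 'QUALITY_BAD_RELEVANCE']:
    counts.setdefault(label, 0)  # ensure the key exists with a 0 count
    counts[label] += 1           # then increment unconditionally
# counts == {'QUALITY_BAD_COMPLETENESS': 2, 'QUALITY_BAD_RELEVANCE': 1}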
@@ -238,6 +241,9 @@ def summarize(self, summary: SummaryModel) -> SummaryModel:
new_summary.type_ratio[field_name][eval_details] / new_summary.total, 6
)

# Compute the average, min, max, standard deviation, etc. of the metric scores
new_summary.calculate_metrics_score_averages()

new_summary.finish_time = time.strftime("%Y%m%d_%H%M%S", time.localtime())
return new_summary

117 changes: 74 additions & 43 deletions dingo/exec/spark.py
@@ -76,6 +76,63 @@ def load_data(self) -> RDD:
"""Load and return the RDD data."""
return self.spark_rdd

@staticmethod
def _aggregate_eval_details(acc, item):
"""聚合单个 item 的 eval_details 到累加器中,同时收集 scores"""
eval_details_dict = item.get('eval_details', {})

# First level: field names; second level: List[EvalDetail] (serialized as a list of dicts)
for field_key, eval_detail_list in eval_details_dict.items():
# Initialize the statistics for this field
if field_key not in acc['label_counts']:
acc['label_counts'][field_key] = {}

# Iterate over List[EvalDetail]
for eval_detail in eval_detail_list:
# Collect metric scores (for RAG and similar evaluation scenarios)
score = eval_detail.get('score') if isinstance(eval_detail, dict) else getattr(eval_detail, 'score', None)
metric = eval_detail.get('metric') if isinstance(eval_detail, dict) else getattr(eval_detail, 'metric', None)

if score is not None and metric:
if metric not in acc['metric_scores']:
acc['metric_scores'][metric] = []
acc['metric_scores'][metric].append(score)

# Collect label statistics
label_list = eval_detail.get('label', []) if isinstance(eval_detail, dict) else getattr(eval_detail, 'label', [])
if label_list:
# Count occurrences of each label
for label in label_list:
if label not in acc['label_counts'][field_key]:
acc['label_counts'][field_key][label] = 1
else:
acc['label_counts'][field_key][label] += 1

return acc

@staticmethod
def _merge_eval_details(acc1, acc2):
"""合并两个累加器"""
# 合并 label 统计
for field_key, label_dict in acc2['label_counts'].items():
if field_key not in acc1['label_counts']:
acc1['label_counts'][field_key] = label_dict.copy()
else:
for label, count in label_dict.items():
if label not in acc1['label_counts'][field_key]:
acc1['label_counts'][field_key][label] = count
else:
acc1['label_counts'][field_key][label] += count

# Merge metric scores
for metric, scores in acc2['metric_scores'].items():
if metric not in acc1['metric_scores']:
acc1['metric_scores'][metric] = scores.copy()
else:
acc1['metric_scores'][metric].extend(scores)

return acc1
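
These two static helpers are designed to plug into RDD.aggregate as its seqOp and combOp. A minimal sketch of that wiring, not part of this diff (the local SparkContext and sample records are hypothetical; assumes SparkExecutor is importable from dingo.exec.spark):

from pyspark import SparkContext
from dingo.exec.spark import SparkExecutor

sc = SparkContext('local[2]', 'eval-details-demo')
items = [
    {'eval_details': {'content': [{'metric': 'LLMRAGFaithfulness', 'score': 0.9, 'label': ['QUALITY_GOOD']}]}},
    {'eval_details': {'content': [{'metric': 'LLMRAGFaithfulness', 'score': 0.7, 'label': ['QUALITY_BAD']}]}},
]
agg = sc.parallelize(items).aggregate(
    {'label_counts': {}, 'metric_scores': {}},  # zero value, copied per partition
    SparkExecutor._aggregate_eval_details,      # seqOp: fold one record into an accumulator
    SparkExecutor._merge_eval_details,          # combOp: merge per-partition accumulators
)
# agg['metric_scores'] == {'LLMRAGFaithfulness': [0.9, 0.7]} (element order may vary)
# agg['label_counts'] == {'content': {'QUALITY_GOOD': 1, 'QUALITY_BAD': 1}}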

def execute(self) -> SummaryModel:
"""Main execution method for Spark evaluation."""
create_time = time.strftime("%Y%m%d_%H%M%S", time.localtime())
@@ -211,59 +268,25 @@ def summarize(self, summary: SummaryModel) -> SummaryModel:

Count the occurrences of each label under each field across all evaluation results,
then divide by the total to get ratios, which populate summary.type_ratio.
Metric scores are also collected for statistics.
"""
new_summary = copy.deepcopy(summary)
if new_summary.total == 0:
return new_summary

# Use a Spark aggregate to tally eval_details
# Use a Spark aggregate to tally eval_details and collect scores
# Each element of data_info_list is a Dict containing an eval_details field
def aggregate_eval_detailss(acc, item):
"""Aggregate a single item's eval_details into the accumulator"""
eval_details_dict = item.get('eval_details', {})

# First level: field names; second level: List[EvalDetail] (serialized as a list of dicts)
for field_key, eval_detail_list in eval_details_dict.items():
if field_key not in acc:
acc[field_key] = {}

# Iterate over List[EvalDetail]
for eval_detail in eval_detail_list:
# Get the error types from the EvalDetail's label list
label_list = eval_detail.get('label', []) if isinstance(eval_detail, dict) else eval_detail.label
if label_list:
# Count occurrences of each label
for label in label_list:
if label not in acc[field_key]:
acc[field_key][label] = 1
else:
acc[field_key][label] += 1

return acc

def merge_eval_detailss(acc1, acc2):
"""合并两个累加器"""
for field_key, label_dict in acc2.items():
if field_key not in acc1:
acc1[field_key] = label_dict.copy()
else:
for label, count in label_dict.items():
if label not in acc1[field_key]:
acc1[field_key][label] = count
else:
acc1[field_key][label] += count
return acc1

# Use aggregate to combine all eval_details
# data_info_list was persist()ed in execute and stored as an instance attribute
if hasattr(self, 'data_info_list') and self.data_info_list:
type_ratio_counts = self.data_info_list.aggregate(
{}, # initial accumulator
aggregate_eval_detailss, # aggregate a single element
merge_eval_detailss # merge accumulators
aggregated_results = self.data_info_list.aggregate(
{'label_counts': {}, 'metric_scores': {}}, # initial accumulator
SparkExecutor._aggregate_eval_details, # aggregate a single element
SparkExecutor._merge_eval_details # merge accumulators
)
type_ratio_counts = aggregated_results['label_counts']
metric_scores = aggregated_results['metric_scores']
else:
type_ratio_counts = {}
metric_scores = {}

# Convert the counts to ratios
new_summary.type_ratio = {}
@@ -274,6 +297,14 @@ def merge_eval_detailss(acc1, acc2):
type_ratio_counts[field_name][eval_details] / new_summary.total, 6
)

# Add the collected metric scores to the summary
for metric_name, scores in metric_scores.items():
for score in scores:
new_summary.add_metric_score(metric_name, score)

# Compute average scores and other statistics for the metrics
new_summary.calculate_metrics_score_averages()

new_summary.finish_time = time.strftime("%Y%m%d_%H%M%S", time.localtime())
return new_summary

86 changes: 84 additions & 2 deletions dingo/io/output/summary_model.py
@@ -1,4 +1,4 @@
from typing import Dict
from typing import Any, Dict, List

from pydantic import BaseModel, Field

@@ -17,8 +17,80 @@ class SummaryModel(BaseModel):
total: int = 0
type_ratio: Dict[str, Dict[str, int]] = {}

def to_dict(self):
# New: metric score statistics (for RAG and similar evaluation scenarios)
metrics_score_stats: Dict[str, Dict[str, Any]] = Field(default_factory=dict)

def add_metric_score(self, metric_name: str, score: float):
"""
Add a metric score to the statistics

Args:
metric_name: the metric name (e.g. LLMRAGFaithfulness)
score: the score value
"""
if metric_name not in self.metrics_score_stats:
self.metrics_score_stats[metric_name] = {
'scores': [],
'score_average': 0.0,
'score_count': 0,
'score_min': None,
'score_max': None
}

self.metrics_score_stats[metric_name]['scores'].append(score)
self.metrics_score_stats[metric_name]['score_count'] += 1

def calculate_metrics_score_averages(self):
"""
Compute the average, min, max, and standard deviation for all metric scores

Note: for precision, the unrounded mean is computed first and used for the
variance; the mean and standard deviation are rounded only at the end
"""
for metric_name, stats in self.metrics_score_stats.items():
scores = stats['scores']
if scores:
# Compute the unrounded mean first (used for the variance)
mean = sum(scores) / len(scores)
stats['score_average'] = round(mean, 2)
stats['score_min'] = round(min(scores), 2)
stats['score_max'] = round(max(scores), 2)
# Compute the standard deviation (using the unrounded mean)
if len(scores) > 1:
variance = sum((x - mean) ** 2 for x in scores) / len(scores)
stats['score_std_dev'] = round(variance ** 0.5, 2)
# Drop the scores list to save space (the aggregate statistics are enough)
del stats['scores']
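
A quick worked example of this rounding order (the scores are illustrative, not from the PR):

scores = [0.8, 0.9, 1.0]
mean = sum(scores) / len(scores)                                # 0.9, kept unrounded for the variance
variance = sum((x - mean) ** 2 for x in scores) / len(scores)   # ~0.00667
round(variance ** 0.5, 2)                                       # 0.08, rounded only at the end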

def get_metrics_score_summary(self) -> Dict[str, float]:
"""
Get a summary of the metric scores (averages only)

Returns:
A mapping from metric name to average score
"""
return {
metric_name: stats.get('score_average', 0.0)
for metric_name, stats in self.metrics_score_stats.items()
}

def get_metrics_score_overall_average(self) -> float:
"""
Compute the overall average across all metric scores

Note: every metric is included (even those with an average of 0), since a
0 score is itself an important evaluation signal

Returns:
The overall average score
"""
averages = [
stats.get('score_average', 0.0)
for stats in self.metrics_score_stats.values()
]
return round(sum(averages) / len(averages), 2) if averages else 0.0
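
To illustrate why zero averages are kept (values hypothetical):

# With averages {'LLMRAGFaithfulness': 0.9, 'LLMRAGRelevance': 0.0}:
# overall == round((0.9 + 0.0) / 2, 2) == 0.45; the zero is counted, not skipped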

def to_dict(self):
result = {
'task_id': self.task_id,
'task_name': self.task_name,
# 'eval_group': self.eval_group,
Expand All @@ -32,3 +104,13 @@ def to_dict(self):
'total': self.total,
'type_ratio': self.type_ratio,
}

# If metric score statistics exist, add them to the output as a nested structure
if self.metrics_score_stats:
result['metrics_score'] = {
'stats': self.metrics_score_stats,
'summary': self.get_metrics_score_summary(),
'overall_average': self.get_metrics_score_overall_average()
}

return result
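
Putting the new API together, a minimal usage sketch (the metric name and scores are illustrative; assumes SummaryModel's remaining fields all have defaults, as the snippet above suggests):

from dingo.io.output.summary_model import SummaryModel

summary = SummaryModel(total=2)
summary.add_metric_score('LLMRAGFaithfulness', 0.8)
summary.add_metric_score('LLMRAGFaithfulness', 1.0)
summary.calculate_metrics_score_averages()  # also drops the raw scores lists
print(summary.to_dict()['metrics_score'])
# {'stats': {'LLMRAGFaithfulness': {'score_average': 0.9, 'score_count': 2,
#            'score_min': 0.8, 'score_max': 1.0, 'score_std_dev': 0.1}},
#  'summary': {'LLMRAGFaithfulness': 0.9}, 'overall_average': 0.9}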