Fix file handle leaks in azure-ai-evaluation when exceptions occur during evaluation #42480

Draft · wants to merge 2 commits into base: main
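
The change this PR makes is to hoist temporary-file cleanup up into `_evaluate` and guard it with `try`/`finally`, so the `.jsonl` files written during preprocessing are removed even when an evaluator or grader raises. A minimal sketch of that control flow (`evaluate_with_cleanup` and `run_evaluators` are hypothetical stand-ins for `_evaluate` and `_run_callable_evaluators`, not the SDK's actual API):

```python
import logging
import os

LOGGER = logging.getLogger(__name__)


def evaluate_with_cleanup(validated_data, run_evaluators):
    """Sketch of the new control flow: whatever happens inside the try
    block, every recorded temp file is removed in the finally block."""
    temp_files_to_cleanup = validated_data["temp_files_to_cleanup"]
    try:
        # Run local evaluators / remote graders; any of these may raise.
        return run_evaluators(validated_data)
    finally:
        for temp_file_path in temp_files_to_cleanup:
            if os.path.exists(temp_file_path):
                try:
                    os.unlink(temp_file_path)
                except OSError as exc:
                    # Log and continue: a cleanup failure must never mask
                    # the real evaluation result or exception.
                    LOGGER.warning("Failed to clean up %s: %s", temp_file_path, exc)
```

To make this work across functions, the diff below threads a `temp_files_to_cleanup` list through `__ValidatedData`, so the function that creates the files (`_preprocess_data`) and the function that owns their cleanup (`_evaluate`) can be different.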
@@ -88,6 +88,7 @@ class __ValidatedData(TypedDict):
target_run: Optional[BatchClientRun]
batch_run_client: BatchClient
batch_run_data: Union[str, os.PathLike, pd.DataFrame]
temp_files_to_cleanup: List[str]


def _aggregate_other_metrics(df: pd.DataFrame) -> Tuple[List[str], Dict[str, float]]:
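
The new field is the hand-off point between the producer and the consumer of the temp files. A compressed sketch of the idea (simplified names, not the real module layout):

```python
from typing import List, TypedDict


class ValidatedData(TypedDict):  # simplified stand-in for __ValidatedData
    batch_run_data: str
    temp_files_to_cleanup: List[str]  # paths the caller must unlink when done


# _preprocess_data populates the list as it creates files; _evaluate owns the
# cleanup, since only it knows when both local evaluators and OAI graders
# have finished reading the data.
```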
@@ -888,100 +889,113 @@ def _evaluate( # pylint: disable=too-many-locals,too-many-statements
**kwargs,
)

# extract relevant info from validated data
column_mapping = validated_data["column_mapping"]
evaluators = validated_data["evaluators"]
graders = validated_data["graders"]
input_data_df = validated_data["input_data_df"]
results_df = pd.DataFrame()
metrics: Dict[str, float] = {}
eval_run_info_list: List[OAIEvalRunCreationInfo] = []

# Start OAI eval runs if any graders are present.
need_oai_run = len(graders) > 0
need_local_run = len(evaluators) > 0
need_get_oai_results = False
got_local_results = False
if need_oai_run:
try:
aoi_name = evaluation_name if evaluation_name else DEFAULT_OAI_EVAL_RUN_NAME
eval_run_info_list = _begin_aoai_evaluation(graders, column_mapping, input_data_df, aoi_name)
need_get_oai_results = len(eval_run_info_list) > 0
except EvaluationException as e:
if need_local_run:
# If there are normal evaluators, don't stop execution and try to run
# those.
LOGGER.warning(
"Remote Azure Open AI grader evaluations failed during run creation."
+ " Continuing with local evaluators."
)
LOGGER.warning(e)
else:
raise e
# Extract temporary files to ensure cleanup
temp_files_to_cleanup = validated_data["temp_files_to_cleanup"]

# Evaluate 'normal' evaluators. This includes built-in evaluators and any user-supplied callables.
if need_local_run:
try:
eval_result_df, eval_metrics, per_evaluator_results = _run_callable_evaluators(
validated_data=validated_data, fail_on_evaluator_errors=fail_on_evaluator_errors
)
results_df = eval_result_df
metrics = eval_metrics
got_local_results = True
# TODO figure out how to update this printing to include OAI results?
_print_summary(per_evaluator_results)
except EvaluationException as e:
if need_get_oai_results:
# If there are OAI graders, we only print a warning on local failures.
LOGGER.warning("Local evaluations failed. Will still attempt to retrieve online grader results.")
LOGGER.warning(e)
else:
raise e

# Retrieve OAI eval run results if needed.
if need_get_oai_results:
try:
aoai_results, aoai_metrics = _get_evaluation_run_results(eval_run_info_list) # type: ignore
# Post build TODO: add equivalent of _print_summary(per_evaluator_results) here

# Combine results if both evaluators and graders are present
if len(evaluators) > 0:
results_df = pd.concat([results_df, aoai_results], axis=1)
metrics.update(aoai_metrics)
else:
# Otherwise combine aoai results with input data df to include input columns in outputs.
results_df = pd.concat([input_data_df, aoai_results], axis=1)
metrics = aoai_metrics
except EvaluationException as e:
if got_local_results:
# If there are local eval results, we only print a warning on OAI failure.
LOGGER.warning("Remote Azure Open AI grader evaluations failed. Still returning local results.")
LOGGER.warning(e)
else:
raise e

# Done with all evaluations, message outputs into final forms, and log results if needed.
name_map = _map_names_to_builtins(evaluators, graders)
if is_onedp_project(azure_ai_project):
studio_url = _log_metrics_and_instance_results_onedp(
metrics, results_df, azure_ai_project, evaluation_name, name_map, tags=tags, **kwargs
)
else:
# Since tracing is disabled, pass None for target_run so a dummy evaluation run will be created each time.
trace_destination = _trace_destination_from_project_scope(azure_ai_project) if azure_ai_project else None
studio_url = None
if trace_destination:
studio_url = _log_metrics_and_instance_results(
metrics, results_df, trace_destination, None, evaluation_name, name_map, tags=tags, **kwargs
try:
# extract relevant info from validated data
column_mapping = validated_data["column_mapping"]
evaluators = validated_data["evaluators"]
graders = validated_data["graders"]
input_data_df = validated_data["input_data_df"]
results_df = pd.DataFrame()
metrics: Dict[str, float] = {}
eval_run_info_list: List[OAIEvalRunCreationInfo] = []

# Start OAI eval runs if any graders are present.
need_oai_run = len(graders) > 0
need_local_run = len(evaluators) > 0
need_get_oai_results = False
got_local_results = False
if need_oai_run:
try:
aoi_name = evaluation_name if evaluation_name else DEFAULT_OAI_EVAL_RUN_NAME
eval_run_info_list = _begin_aoai_evaluation(graders, column_mapping, input_data_df, aoi_name)
need_get_oai_results = len(eval_run_info_list) > 0
except EvaluationException as e:
if need_local_run:
# If there are normal evaluators, don't stop execution and try to run
# those.
LOGGER.warning(
"Remote Azure Open AI grader evaluations failed during run creation."
+ " Continuing with local evaluators."
)
LOGGER.warning(e)
else:
raise e

# Evaluate 'normal' evaluators. This includes built-in evaluators and any user-supplied callables.
if need_local_run:
try:
eval_result_df, eval_metrics, per_evaluator_results = _run_callable_evaluators(
validated_data=validated_data, fail_on_evaluator_errors=fail_on_evaluator_errors
)
results_df = eval_result_df
metrics = eval_metrics
got_local_results = True
# TODO figure out how to update this printing to include OAI results?
_print_summary(per_evaluator_results)
except EvaluationException as e:
if need_get_oai_results:
# If there are OAI graders, we only print a warning on local failures.
LOGGER.warning("Local evaluations failed. Will still attempt to retrieve online grader results.")
LOGGER.warning(e)
else:
raise e

# Retrieve OAI eval run results if needed.
if need_get_oai_results:
try:
aoai_results, aoai_metrics = _get_evaluation_run_results(eval_run_info_list) # type: ignore
# Post build TODO: add equivalent of _print_summary(per_evaluator_results) here

# Combine results if both evaluators and graders are present
if len(evaluators) > 0:
results_df = pd.concat([results_df, aoai_results], axis=1)
metrics.update(aoai_metrics)
else:
# Otherwise combine aoai results with input data df to include input columns in outputs.
results_df = pd.concat([input_data_df, aoai_results], axis=1)
metrics = aoai_metrics
except EvaluationException as e:
if got_local_results:
# If there are local eval results, we only print a warning on OAI failure.
LOGGER.warning("Remote Azure Open AI grader evaluations failed. Still returning local results.")
LOGGER.warning(e)
else:
raise e

# Done with all evaluations, message outputs into final forms, and log results if needed.
name_map = _map_names_to_builtins(evaluators, graders)
if is_onedp_project(azure_ai_project):
studio_url = _log_metrics_and_instance_results_onedp(
metrics, results_df, azure_ai_project, evaluation_name, name_map, tags=tags, **kwargs
)
else:
# Since tracing is disabled, pass None for target_run so a dummy evaluation run will be created each time.
trace_destination = _trace_destination_from_project_scope(azure_ai_project) if azure_ai_project else None
studio_url = None
if trace_destination:
studio_url = _log_metrics_and_instance_results(
metrics, results_df, trace_destination, None, evaluation_name, name_map, tags=tags, **kwargs
)

result_df_dict = results_df.to_dict("records")
result: EvaluationResult = {"rows": result_df_dict, "metrics": metrics, "studio_url": studio_url} # type: ignore
result_df_dict = results_df.to_dict("records")
result: EvaluationResult = {"rows": result_df_dict, "metrics": metrics, "studio_url": studio_url} # type: ignore

if output_path:
_write_output(output_path, result)
if output_path:
_write_output(output_path, result)

return result
return result

finally:
# Clean up any temporary files created during evaluation
for temp_file_path in temp_files_to_cleanup:
if os.path.exists(temp_file_path):
try:
os.unlink(temp_file_path)
except Exception as e:
LOGGER.warning(f"Failed to clean up temporary file {temp_file_path}: {e}")


@@ -1026,6 +1040,7 @@ def _preprocess_data(
target_generated_columns: Set[str] = set()
batch_run_client: BatchClient
batch_run_data: Union[str, os.PathLike, pd.DataFrame] = data
temp_files_to_cleanup: List[str] = []

def get_client_type(evaluate_kwargs: Dict[str, Any]) -> Literal["run_submitter", "pf_client", "code_client"]:
"""Determines the BatchClient to use from provided kwargs (_use_run_submitter_client and _use_pf_client)"""
@@ -1090,6 +1105,8 @@ def get_client_type(evaluate_kwargs: Dict[str, Any]) -> Literal["run_submitter",
temp_file.write(json.dumps(row_dict) + "\n")
temp_file.close()
batch_run_data = temp_file.name
# Track the temporary file for cleanup
temp_files_to_cleanup.append(temp_file.name)

# Update column mappings to use data references instead of run outputs
for evaluator_name, mapping in column_mapping.items():
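
The `NamedTemporaryFile` creation sits just above this hunk, so its arguments are not visible here; assuming it uses `delete=False` (which is what makes explicit tracking necessary, since the file survives `close()`), a standalone sketch of the write, close, and track sequence could look like this. Closing before handing the path to the batch client is also the portable choice, because on Windows a still-open `NamedTemporaryFile` generally cannot be reopened by name:

```python
import json
import tempfile
from typing import Iterable, List, Tuple


def write_jsonl_for_batch(rows: Iterable[dict]) -> Tuple[str, List[str]]:
    """Write rows to a temp .jsonl file and return (path, paths_to_cleanup).

    delete=False keeps the file on disk after close() so the batch client can
    open it by path; the caller becomes responsible for unlinking it later.
    """
    temp_file = tempfile.NamedTemporaryFile(
        mode="w", suffix=".jsonl", delete=False, encoding="utf-8"
    )
    try:
        for row in rows:
            temp_file.write(json.dumps(row) + "\n")
    finally:
        temp_file.close()
    return temp_file.name, [temp_file.name]
```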
@@ -1146,6 +1163,7 @@ def get_client_type(evaluate_kwargs: Dict[str, Any]) -> Literal["run_submitter",
target_run=target_run,
batch_run_client=batch_run_client,
batch_run_data=batch_run_data,
temp_files_to_cleanup=temp_files_to_cleanup,
)


@@ -1162,49 +1180,30 @@ def _run_callable_evaluators(
column_mapping = validated_data["column_mapping"]
evaluators = validated_data["evaluators"]

# Clean up temporary file after evaluation if it was created
temp_file_to_cleanup = None
if (
isinstance(batch_run_client, ProxyClient)
and isinstance(batch_run_data, str)
and batch_run_data.endswith(".jsonl")
):
# Check if it's a temporary file (contains temp directory path)
if tempfile.gettempdir() in batch_run_data:
temp_file_to_cleanup = batch_run_data

try:
with EvalRunContext(batch_run_client):
runs = {
evaluator_name: batch_run_client.run(
flow=evaluator,
data=batch_run_data,
# Don't pass target_run when using complete dataframe
run=target_run,
evaluator_name=evaluator_name,
column_mapping=column_mapping.get(evaluator_name, column_mapping.get("default", None)),
stream=True,
name=kwargs.get("_run_name"),
)
for evaluator_name, evaluator in evaluators.items()
}
with EvalRunContext(batch_run_client):
runs = {
evaluator_name: batch_run_client.run(
flow=evaluator,
data=batch_run_data,
# Don't pass target_run when using complete dataframe
run=target_run,
evaluator_name=evaluator_name,
column_mapping=column_mapping.get(evaluator_name, column_mapping.get("default", None)),
stream=True,
name=kwargs.get("_run_name"),
)
for evaluator_name, evaluator in evaluators.items()
}

# get_details needs to be called within EvalRunContext scope in order to have user agent populated
per_evaluator_results: Dict[str, __EvaluatorInfo] = {
evaluator_name: {
"result": batch_run_client.get_details(run, all_results=True),
"metrics": batch_run_client.get_metrics(run),
"run_summary": batch_run_client.get_run_summary(run),
}
for evaluator_name, run in runs.items()
# get_details needs to be called within EvalRunContext scope in order to have user agent populated
per_evaluator_results: Dict[str, __EvaluatorInfo] = {
evaluator_name: {
"result": batch_run_client.get_details(run, all_results=True),
"metrics": batch_run_client.get_metrics(run),
"run_summary": batch_run_client.get_run_summary(run),
}
finally:
# Clean up temporary file if it was created
if temp_file_to_cleanup and os.path.exists(temp_file_to_cleanup):
try:
os.unlink(temp_file_to_cleanup)
except Exception as e:
LOGGER.warning(f"Failed to clean up temporary file {temp_file_to_cleanup}: {e}")
for evaluator_name, run in runs.items()
}
# Concatenate all results
evaluators_result_df = pd.DataFrame()
evaluators_metric = {}
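
A hypothetical regression test for the behaviour this PR targets, exercising the `evaluate_with_cleanup` sketch from the top of this page rather than the SDK's private functions (so all names here are assumptions): force the evaluator callable to raise and assert the tracked file is gone afterwards.

```python
import os

import pytest  # assumed test dependency; not part of this PR

# evaluate_with_cleanup is the sketch defined near the top of this page.


def test_temp_files_removed_when_evaluation_raises(tmp_path):
    temp_file = tmp_path / "batch_input.jsonl"
    temp_file.write_text('{"query": "hi"}\n')
    validated_data = {"temp_files_to_cleanup": [str(temp_file)]}

    def failing_evaluators(_data):
        raise RuntimeError("evaluator blew up")

    with pytest.raises(RuntimeError):
        evaluate_with_cleanup(validated_data, failing_evaluators)

    # The exception still propagates, but no temporary file is left behind.
    assert not os.path.exists(str(temp_file))
```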