diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py
index 0714bc6bde7f..ae94fcc53eb8 100644
--- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py
@@ -88,6 +88,7 @@ class __ValidatedData(TypedDict):
     target_run: Optional[BatchClientRun]
     batch_run_client: BatchClient
     batch_run_data: Union[str, os.PathLike, pd.DataFrame]
+    temp_files_to_cleanup: List[str]
 
 
 def _aggregate_other_metrics(df: pd.DataFrame) -> Tuple[List[str], Dict[str, float]]:
@@ -888,100 +889,113 @@ def _evaluate(  # pylint: disable=too-many-locals,too-many-statements
         **kwargs,
     )
 
-    # extract relevant info from validated data
-    column_mapping = validated_data["column_mapping"]
-    evaluators = validated_data["evaluators"]
-    graders = validated_data["graders"]
-    input_data_df = validated_data["input_data_df"]
-    results_df = pd.DataFrame()
-    metrics: Dict[str, float] = {}
-    eval_run_info_list: List[OAIEvalRunCreationInfo] = []
-
-    # Start OAI eval runs if any graders are present.
-    need_oai_run = len(graders) > 0
-    need_local_run = len(evaluators) > 0
-    need_get_oai_results = False
-    got_local_results = False
-    if need_oai_run:
-        try:
-            aoi_name = evaluation_name if evaluation_name else DEFAULT_OAI_EVAL_RUN_NAME
-            eval_run_info_list = _begin_aoai_evaluation(graders, column_mapping, input_data_df, aoi_name)
-            need_get_oai_results = len(eval_run_info_list) > 0
-        except EvaluationException as e:
-            if need_local_run:
-                # If there are normal evaluators, don't stop execution and try to run
-                # those.
-                LOGGER.warning(
-                    "Remote Azure Open AI grader evaluations failed during run creation."
-                    + " Continuing with local evaluators."
-                )
-                LOGGER.warning(e)
-            else:
-                raise e
+    # Extract temporary files to ensure cleanup
+    temp_files_to_cleanup = validated_data["temp_files_to_cleanup"]
 
-    # Evaluate 'normal' evaluators. This includes built-in evaluators and any user-supplied callables.
-    if need_local_run:
-        try:
-            eval_result_df, eval_metrics, per_evaluator_results = _run_callable_evaluators(
-                validated_data=validated_data, fail_on_evaluator_errors=fail_on_evaluator_errors
-            )
-            results_df = eval_result_df
-            metrics = eval_metrics
-            got_local_results = True
-            # TODO figure out how to update this printing to include OAI results?
-            _print_summary(per_evaluator_results)
-        except EvaluationException as e:
-            if need_get_oai_results:
-                # If there are OAI graders, we only print a warning on local failures.
-                LOGGER.warning("Local evaluations failed. Will still attempt to retrieve online grader results.")
-                LOGGER.warning(e)
-            else:
-                raise e
-
-    # Retrieve OAI eval run results if needed.
-    if need_get_oai_results:
-        try:
-            aoai_results, aoai_metrics = _get_evaluation_run_results(eval_run_info_list)  # type: ignore
-            # Post build TODO: add equivalent of _print_summary(per_evaluator_results) here
-
-            # Combine results if both evaluators and graders are present
-            if len(evaluators) > 0:
-                results_df = pd.concat([results_df, aoai_results], axis=1)
-                metrics.update(aoai_metrics)
-            else:
-                # Otherwise combine aoai results with input data df to include input columns in outputs.
-                results_df = pd.concat([input_data_df, aoai_results], axis=1)
-                metrics = aoai_metrics
-        except EvaluationException as e:
-            if got_local_results:
-                # If there are local eval results, we only print a warning on OAI failure.
-                LOGGER.warning("Remote Azure Open AI grader evaluations failed. Still returning local results.")
-                LOGGER.warning(e)
-            else:
-                raise e
-
-    # Done with all evaluations, message outputs into final forms, and log results if needed.
-    name_map = _map_names_to_builtins(evaluators, graders)
-    if is_onedp_project(azure_ai_project):
-        studio_url = _log_metrics_and_instance_results_onedp(
-            metrics, results_df, azure_ai_project, evaluation_name, name_map, tags=tags, **kwargs
-        )
-    else:
-        # Since tracing is disabled, pass None for target_run so a dummy evaluation run will be created each time.
-        trace_destination = _trace_destination_from_project_scope(azure_ai_project) if azure_ai_project else None
-        studio_url = None
-        if trace_destination:
-            studio_url = _log_metrics_and_instance_results(
-                metrics, results_df, trace_destination, None, evaluation_name, name_map, tags=tags, **kwargs
+    try:
+        # extract relevant info from validated data
+        column_mapping = validated_data["column_mapping"]
+        evaluators = validated_data["evaluators"]
+        graders = validated_data["graders"]
+        input_data_df = validated_data["input_data_df"]
+        results_df = pd.DataFrame()
+        metrics: Dict[str, float] = {}
+        eval_run_info_list: List[OAIEvalRunCreationInfo] = []
+
+        # Start OAI eval runs if any graders are present.
+        need_oai_run = len(graders) > 0
+        need_local_run = len(evaluators) > 0
+        need_get_oai_results = False
+        got_local_results = False
+        if need_oai_run:
+            try:
+                aoi_name = evaluation_name if evaluation_name else DEFAULT_OAI_EVAL_RUN_NAME
+                eval_run_info_list = _begin_aoai_evaluation(graders, column_mapping, input_data_df, aoi_name)
+                need_get_oai_results = len(eval_run_info_list) > 0
+            except EvaluationException as e:
+                if need_local_run:
+                    # If there are normal evaluators, don't stop execution and try to run
+                    # those.
+                    LOGGER.warning(
+                        "Remote Azure Open AI grader evaluations failed during run creation."
+                        + " Continuing with local evaluators."
+                    )
+                    LOGGER.warning(e)
+                else:
+                    raise e
+
+        # Evaluate 'normal' evaluators. This includes built-in evaluators and any user-supplied callables.
+        if need_local_run:
+            try:
+                eval_result_df, eval_metrics, per_evaluator_results = _run_callable_evaluators(
+                    validated_data=validated_data, fail_on_evaluator_errors=fail_on_evaluator_errors
+                )
+                results_df = eval_result_df
+                metrics = eval_metrics
+                got_local_results = True
+                # TODO figure out how to update this printing to include OAI results?
+                _print_summary(per_evaluator_results)
+            except EvaluationException as e:
+                if need_get_oai_results:
+                    # If there are OAI graders, we only print a warning on local failures.
+                    LOGGER.warning("Local evaluations failed. Will still attempt to retrieve online grader results.")
+                    LOGGER.warning(e)
+                else:
+                    raise e
+
+        # Retrieve OAI eval run results if needed.
+        if need_get_oai_results:
+            try:
+                aoai_results, aoai_metrics = _get_evaluation_run_results(eval_run_info_list)  # type: ignore
+                # Post build TODO: add equivalent of _print_summary(per_evaluator_results) here
+
+                # Combine results if both evaluators and graders are present
+                if len(evaluators) > 0:
+                    results_df = pd.concat([results_df, aoai_results], axis=1)
+                    metrics.update(aoai_metrics)
+                else:
+                    # Otherwise combine aoai results with input data df to include input columns in outputs.
+                    results_df = pd.concat([input_data_df, aoai_results], axis=1)
+                    metrics = aoai_metrics
+            except EvaluationException as e:
+                if got_local_results:
+                    # If there are local eval results, we only print a warning on OAI failure.
+                    LOGGER.warning("Remote Azure Open AI grader evaluations failed. Still returning local results.")
+                    LOGGER.warning(e)
+                else:
+                    raise e
+
+        # Done with all evaluations, message outputs into final forms, and log results if needed.
+        name_map = _map_names_to_builtins(evaluators, graders)
+        if is_onedp_project(azure_ai_project):
+            studio_url = _log_metrics_and_instance_results_onedp(
+                metrics, results_df, azure_ai_project, evaluation_name, name_map, tags=tags, **kwargs
             )
+        else:
+            # Since tracing is disabled, pass None for target_run so a dummy evaluation run will be created each time.
+            trace_destination = _trace_destination_from_project_scope(azure_ai_project) if azure_ai_project else None
+            studio_url = None
+            if trace_destination:
+                studio_url = _log_metrics_and_instance_results(
+                    metrics, results_df, trace_destination, None, evaluation_name, name_map, tags=tags, **kwargs
+                )
 
-    result_df_dict = results_df.to_dict("records")
-    result: EvaluationResult = {"rows": result_df_dict, "metrics": metrics, "studio_url": studio_url}  # type: ignore
+        result_df_dict = results_df.to_dict("records")
+        result: EvaluationResult = {"rows": result_df_dict, "metrics": metrics, "studio_url": studio_url}  # type: ignore
 
-    if output_path:
-        _write_output(output_path, result)
+        if output_path:
+            _write_output(output_path, result)
 
-    return result
+        return result
+
+    finally:
+        # Clean up any temporary files created during evaluation
+        for temp_file_path in temp_files_to_cleanup:
+            if os.path.exists(temp_file_path):
+                try:
+                    os.unlink(temp_file_path)
+                except Exception as e:
+                    LOGGER.warning(f"Failed to clean up temporary file {temp_file_path}: {e}")
 
 
 def _preprocess_data(
@@ -1026,6 +1040,7 @@ def _preprocess_data(
     target_generated_columns: Set[str] = set()
     batch_run_client: BatchClient
     batch_run_data: Union[str, os.PathLike, pd.DataFrame] = data
+    temp_files_to_cleanup: List[str] = []
 
     def get_client_type(evaluate_kwargs: Dict[str, Any]) -> Literal["run_submitter", "pf_client", "code_client"]:
        """Determines the BatchClient to use from provided kwargs (_use_run_submitter_client and _use_pf_client)"""
@@ -1090,6 +1105,8 @@ def get_client_type(evaluate_kwargs: Dict[str, Any]) -> Literal["run_submitter",
                temp_file.write(json.dumps(row_dict) + "\n")
            temp_file.close()
            batch_run_data = temp_file.name
+            # Track the temporary file for cleanup
+            temp_files_to_cleanup.append(temp_file.name)
 
            # Update column mappings to use data references instead of run outputs
            for evaluator_name, mapping in column_mapping.items():
@@ -1146,6 +1163,7 @@ def get_client_type(evaluate_kwargs: Dict[str, Any]) -> Literal["run_submitter",
        target_run=target_run,
        batch_run_client=batch_run_client,
        batch_run_data=batch_run_data,
+        temp_files_to_cleanup=temp_files_to_cleanup,
    )
 
 
@@ -1162,49 +1180,30 @@ def _run_callable_evaluators(
    column_mapping = validated_data["column_mapping"]
    evaluators = validated_data["evaluators"]
 
-    # Clean up temporary file after evaluation if it was created
-    temp_file_to_cleanup = None
-    if (
-        isinstance(batch_run_client, ProxyClient)
-        and isinstance(batch_run_data, str)
-        and batch_run_data.endswith(".jsonl")
-    ):
-        # Check if it's a temporary file (contains temp directory path)
-        if tempfile.gettempdir() in batch_run_data:
-            temp_file_to_cleanup = batch_run_data
-
-    try:
-        with EvalRunContext(batch_run_client):
-            runs = {
-                evaluator_name: batch_run_client.run(
-                    flow=evaluator,
-                    data=batch_run_data,
-                    # Don't pass target_run when using complete dataframe
-                    run=target_run,
-                    evaluator_name=evaluator_name,
-                    column_mapping=column_mapping.get(evaluator_name, column_mapping.get("default", None)),
-                    stream=True,
-                    name=kwargs.get("_run_name"),
-                )
-                for evaluator_name, evaluator in evaluators.items()
-            }
+    with EvalRunContext(batch_run_client):
+        runs = {
+            evaluator_name: batch_run_client.run(
+                flow=evaluator,
+                data=batch_run_data,
+                # Don't pass target_run when using complete dataframe
+                run=target_run,
+                evaluator_name=evaluator_name,
+                column_mapping=column_mapping.get(evaluator_name, column_mapping.get("default", None)),
+                stream=True,
+                name=kwargs.get("_run_name"),
+            )
+            for evaluator_name, evaluator in evaluators.items()
+        }
 
-            # get_details needs to be called within EvalRunContext scope in order to have user agent populated
-            per_evaluator_results: Dict[str, __EvaluatorInfo] = {
-                evaluator_name: {
-                    "result": batch_run_client.get_details(run, all_results=True),
-                    "metrics": batch_run_client.get_metrics(run),
-                    "run_summary": batch_run_client.get_run_summary(run),
-                }
-                for evaluator_name, run in runs.items()
+        # get_details needs to be called within EvalRunContext scope in order to have user agent populated
+        per_evaluator_results: Dict[str, __EvaluatorInfo] = {
+            evaluator_name: {
+                "result": batch_run_client.get_details(run, all_results=True),
+                "metrics": batch_run_client.get_metrics(run),
+                "run_summary": batch_run_client.get_run_summary(run),
            }
-    finally:
-        # Clean up temporary file if it was created
-        if temp_file_to_cleanup and os.path.exists(temp_file_to_cleanup):
-            try:
-                os.unlink(temp_file_to_cleanup)
-            except Exception as e:
-                LOGGER.warning(f"Failed to clean up temporary file {temp_file_to_cleanup}: {e}")
+            for evaluator_name, run in runs.items()
+        }
 
    # Concatenate all results
    evaluators_result_df = pd.DataFrame()
    evaluators_metric = {}
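
A minimal, self-contained sketch of the pattern this patch adopts, assuming nothing beyond the standard library: preprocessing records every temporary file it creates in a list, and a finally block deletes them after evaluation, rather than inferring later from the data path whether a file was temporary. The helper names (prepare_inputs, run_evaluators) are illustrative only, not azure-ai-evaluation APIs.

import json
import logging
import os
import tempfile
from typing import Any, Dict, List, Tuple

LOGGER = logging.getLogger(__name__)


def prepare_inputs(rows: List[Dict[str, Any]]) -> Tuple[str, List[str]]:
    """Write rows to a temporary JSONL file and report it for later cleanup."""
    temp_files_to_cleanup: List[str] = []
    temp_file = tempfile.NamedTemporaryFile(mode="w", suffix=".jsonl", delete=False)
    for row in rows:
        temp_file.write(json.dumps(row) + "\n")
    temp_file.close()
    # Track the file explicitly instead of guessing later from its path.
    temp_files_to_cleanup.append(temp_file.name)
    return temp_file.name, temp_files_to_cleanup


def run_evaluators(rows: List[Dict[str, Any]]) -> int:
    """Stand-in for an evaluation pass; returns the number of input rows."""
    data_path, temp_files_to_cleanup = prepare_inputs(rows)
    try:
        # ... run evaluators against data_path here ...
        with open(data_path, encoding="utf-8") as f:
            return sum(1 for _ in f)
    finally:
        # Cleanup always runs, even if the evaluation above raises.
        for temp_file_path in temp_files_to_cleanup:
            if os.path.exists(temp_file_path):
                try:
                    os.unlink(temp_file_path)
                except Exception as e:  # pylint: disable=broad-except
                    LOGGER.warning(f"Failed to clean up temporary file {temp_file_path}: {e}")


if __name__ == "__main__":
    print(run_evaluators([{"query": "hi"}, {"query": "bye"}]))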