77import logging
88import os
99import re
10+ import tempfile
11+ import json
1012from typing import Any , Callable , Dict , List , Optional , Set , Tuple , TypedDict , Union , cast
1113
1214from openai import OpenAI , AzureOpenAI
@@ -611,13 +613,32 @@ def _apply_target_to_data(
611613 category = ErrorCategory .FAILED_EXECUTION ,
612614 blame = ErrorBlame .USER_ERROR ,
613615 )
616+
617+ # Log a warning if some rows failed
618+ failed_lines = run_summary .get ("failed_lines" , 0 )
619+ completed_lines = run_summary ["completed_lines" ]
620+ total_lines = failed_lines + completed_lines
621+
622+ if failed_lines > 0 :
623+ LOGGER .warning (
624+ f"Target function completed { completed_lines } out of { total_lines } rows. "
625+ f"{ failed_lines } rows failed and will be filled with NaN values."
626+ )
627+
614628 # Remove input and output prefix
615629 generated_columns = {
616630 col [len (Prefixes .OUTPUTS ) :] for col in target_output .columns if col .startswith (Prefixes .OUTPUTS )
617631 }
618632 # Sort output by line numbers
619633 target_output .set_index (f"inputs.{ LINE_NUMBER } " , inplace = True )
620634 target_output .sort_index (inplace = True )
635+
636+ initial_data_with_line_numbers = initial_data .copy ()
637+ initial_data_with_line_numbers [LINE_NUMBER ] = range (len (initial_data ))
638+
639+ complete_index = initial_data_with_line_numbers [LINE_NUMBER ]
640+ target_output = target_output .reindex (complete_index )
641+
621642 target_output .reset_index (inplace = True , drop = False )
622643 # target_output contains only input columns, taken by function,
623644 # so we need to concatenate it to the input data frame.
@@ -626,8 +647,8 @@ def _apply_target_to_data(
626647 # Rename outputs columns to __outputs
627648 rename_dict = {col : col .replace (Prefixes .OUTPUTS , Prefixes .TSG_OUTPUTS ) for col in target_output .columns }
628649 target_output .rename (columns = rename_dict , inplace = True )
629- # Concatenate output to input
630- target_output = pd .concat ([target_output , initial_data ], axis = 1 )
650+ # Concatenate output to input - now both dataframes have the same number of rows
651+ target_output = pd .concat ([initial_data , target_output ], axis = 1 )
631652
632653 return target_output , generated_columns , run
633654
@@ -645,7 +666,7 @@ def _process_column_mappings(
645666
646667 processed_config : Dict [str , Dict [str , str ]] = {}
647668
648- expected_references = re .compile (r"^\$\{(target|data)\.[a-zA-Z0-9_]+\}$" )
669+ expected_references = re .compile (r"^\$\{(target|data)\.( [a-zA-Z0-9_]+(?:\.[a-zA-Z0-9_]+)*) \}$" )
649670
650671 if column_mapping :
651672 for evaluator , mapping_config in column_mapping .items ():
@@ -1013,17 +1034,50 @@ def _preprocess_data(
10131034 target , batch_run_data , batch_run_client , input_data_df , evaluation_name , ** kwargs
10141035 )
10151036
1016- for evaluator_name , mapping in column_mapping .items ():
1017- mapped_to_values = set (mapping .values ())
1018- for col in target_generated_columns :
1019- # If user defined mapping differently, do not change it.
1020- # If it was mapped to target, we have already changed it
1021- # in _process_column_mappings
1022- run_output = f"${{run.outputs.{ col } }}"
1023- # We will add our mapping only if
1024- # customer did not mapped target output.
1025- if col not in mapping and run_output not in mapped_to_values :
1026- column_mapping [evaluator_name ][col ] = run_output # pylint: disable=unnecessary-dict-index-lookup
1037+ # IMPORTANT FIX: For ProxyClient, create a temporary file with the complete dataframe
1038+ # This ensures that evaluators get all rows (including failed ones with NaN values)
1039+ if isinstance (batch_run_client , ProxyClient ):
1040+ # Create a temporary JSONL file with the complete dataframe
1041+ temp_file = tempfile .NamedTemporaryFile (mode = "w" , suffix = ".jsonl" , delete = False )
1042+ try :
1043+ for _ , row in input_data_df .iterrows ():
1044+ row_dict = row .to_dict ()
1045+ temp_file .write (json .dumps (row_dict ) + "\n " )
1046+ temp_file .close ()
1047+ batch_run_data = temp_file .name
1048+
1049+ # Update column mappings to use data references instead of run outputs
1050+ for evaluator_name , mapping in column_mapping .items ():
1051+ mapped_to_values = set (mapping .values ())
1052+ for col in target_generated_columns :
1053+ # Use data reference instead of run output to ensure we get all rows
1054+ target_reference = f"${{data.{ Prefixes .TSG_OUTPUTS } { col } }}"
1055+
1056+ # We will add our mapping only if customer did not map target output.
1057+ if col not in mapping and target_reference not in mapped_to_values :
1058+ column_mapping [evaluator_name ][col ] = target_reference
1059+
1060+ # Don't pass the target_run since we're now using the complete dataframe
1061+ target_run = None
1062+
1063+ except Exception as e :
1064+ # Clean up the temp file if something goes wrong
1065+ if os .path .exists (temp_file .name ):
1066+ os .unlink (temp_file .name )
1067+ raise e
1068+ else :
1069+ # For DataFrame-based clients, update batch_run_data to use the updated input_data_df
1070+ batch_run_data = input_data_df
1071+
1072+ # Update column mappings for DataFrame clients
1073+ for evaluator_name , mapping in column_mapping .items ():
1074+ mapped_to_values = set (mapping .values ())
1075+ for col in target_generated_columns :
1076+ target_reference = f"${{data.{ Prefixes .TSG_OUTPUTS } { col } }}"
1077+
1078+ # We will add our mapping only if customer did not map target output.
1079+ if col not in mapping and target_reference not in mapped_to_values :
1080+ column_mapping [evaluator_name ][col ] = target_reference
10271081
10281082 # After we have generated all columns, we can check if we have everything we need for evaluators.
10291083 _validate_columns_for_evaluators (input_data_df , evaluators , target , target_generated_columns , column_mapping )
@@ -1062,30 +1116,50 @@ def _run_callable_evaluators(
10621116 batch_run_data = validated_data ["batch_run_data" ]
10631117 column_mapping = validated_data ["column_mapping" ]
10641118 evaluators = validated_data ["evaluators" ]
1065- with EvalRunContext (batch_run_client ):
1066- runs = {
1067- evaluator_name : batch_run_client .run (
1068- flow = evaluator ,
1069- data = batch_run_data ,
1070- run = target_run ,
1071- evaluator_name = evaluator_name ,
1072- column_mapping = column_mapping .get (evaluator_name , column_mapping .get ("default" , None )),
1073- stream = True ,
1074- name = kwargs .get ("_run_name" ),
1075- )
1076- for evaluator_name , evaluator in evaluators .items ()
1077- }
10781119
1079- # get_details needs to be called within EvalRunContext scope in order to have user agent populated
1080- per_evaluator_results : Dict [str , __EvaluatorInfo ] = {
1081- evaluator_name : {
1082- "result" : batch_run_client .get_details (run , all_results = True ),
1083- "metrics" : batch_run_client .get_metrics (run ),
1084- "run_summary" : batch_run_client .get_run_summary (run ),
1120+ # Clean up temporary file after evaluation if it was created
1121+ temp_file_to_cleanup = None
1122+ if (
1123+ isinstance (batch_run_client , ProxyClient )
1124+ and isinstance (batch_run_data , str )
1125+ and batch_run_data .endswith (".jsonl" )
1126+ ):
1127+ # Check if it's a temporary file (contains temp directory path)
1128+ if tempfile .gettempdir () in batch_run_data :
1129+ temp_file_to_cleanup = batch_run_data
1130+
1131+ try :
1132+ with EvalRunContext (batch_run_client ):
1133+ runs = {
1134+ evaluator_name : batch_run_client .run (
1135+ flow = evaluator ,
1136+ data = batch_run_data ,
1137+ # Don't pass target_run when using complete dataframe
1138+ run = target_run ,
1139+ evaluator_name = evaluator_name ,
1140+ column_mapping = column_mapping .get (evaluator_name , column_mapping .get ("default" , None )),
1141+ stream = True ,
1142+ name = kwargs .get ("_run_name" ),
1143+ )
1144+ for evaluator_name , evaluator in evaluators .items ()
10851145 }
1086- for evaluator_name , run in runs .items ()
1087- }
10881146
1147+ # get_details needs to be called within EvalRunContext scope in order to have user agent populated
1148+ per_evaluator_results : Dict [str , __EvaluatorInfo ] = {
1149+ evaluator_name : {
1150+ "result" : batch_run_client .get_details (run , all_results = True ),
1151+ "metrics" : batch_run_client .get_metrics (run ),
1152+ "run_summary" : batch_run_client .get_run_summary (run ),
1153+ }
1154+ for evaluator_name , run in runs .items ()
1155+ }
1156+ finally :
1157+ # Clean up temporary file if it was created
1158+ if temp_file_to_cleanup and os .path .exists (temp_file_to_cleanup ):
1159+ try :
1160+ os .unlink (temp_file_to_cleanup )
1161+ except Exception as e :
1162+ LOGGER .warning (f"Failed to clean up temporary file { temp_file_to_cleanup } : { e } " )
10891163 # Concatenate all results
10901164 evaluators_result_df = pd .DataFrame ()
10911165 evaluators_metric = {}
0 commit comments