
Commit f1cd1b2

[evaluation] refactor: Make RunSubmitterClient default batch client (remove promptflow) (Azure#42243)
* chore: Remove promptflow dependencies
* fix: Filter inputs before calling function in Batch Engine
  Added to match the behavior of promptflow; see also: https://github.com/microsoft/promptflow/blob/5e6c183474c0a2575bb416d18201e4f9fd562b2e/src/promptflow-core/promptflow/executor/_script_executor.py#L162
* refactor: Use enumerate instead of manually keeping track of line number
* fix,refactor: Unconditionally inject default column mapping from data -> params
  In promptflow's logic for applying a column mapping to data, it unconditionally injects a mapping from each function parameter to data of the same name: https://github.com/microsoft/promptflow/blob/3e297112a2c142caf7c185bcba644d0f66422539/src/promptflow-devkit/promptflow/batch/_batch_inputs_processor.py#L110-L141
  This deviated from the existing logic in this SDK, where that mapping was generated only when the user did not provide a column mapping: https://github.com/Azure/azure-sdk-for-python/blob/f3740540eb5b3d22dc1bccba0eb00b652b124d5f/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_run_submitter.py#L51-L52
  The deviation caused one of the parameterized cases of `test_evaluate_another_questions` to fail: the user-provided column mapping referenced a parameter not present in the evaluator, and without the default mapping the evaluation failed because the required parameter was missing: https://github.com/Azure/azure-sdk-for-python/blob/f3740540eb5b3d22dc1bccba0eb00b652b124d5f/sdk/evaluation/azure-ai-evaluation/tests/e2etests/test_evaluate.py#L216
  This commit aligns the application of the column mapping in the SDK more closely with the promptflow implementation (see the sketch after this list).
* refactor: Don't shadow `value` variable in apply_column_mapping_to_lines
* feat: Add support for running aggregations in RunSubmitterClient
* tests,fix: Don't log duration as a metric
  Logging it breaks a test that checks for strict equality of metrics.
* refactor: Rewrite RunSubmitterClient.get_details
* fix: Correct the typing of is_onedp_project
* fix,tests: Don't log tokens as metrics
  Removed to match the behavior of the other clients.
* fix: Set error message without depending on run storage
  Promptflow surfaces exceptions by reading them from its "Storage" abstraction, which has not been ported to this SDK.
* tests,fix: Fix test_evaluate_invalid_column_mapping
  PR 40556 accidentally indented the assertion in test_evaluate_invalid_column_mapping into the `pytest.raises` block. That made the test useless, since the `evaluate` call would always raise an exception, which skips over the assertion as it unwinds the stack. This commit unindents the assertion so that it runs.
  Additionally, PR 41919 updated our validation logic to allow column mapping references of arbitrary length, e.g. `${target.foo.bar.baz}`, so this commit also removes the test case that explicitly guarded against `${target.response.one}`.
* fix,tests: Force PFClient-specific tests to use PFClient
* fix,tests: Force CodeClient-specific tests to use CodeClient
* fix: Improve the ergonomics for picking which client is used
  Except for the CodeClient, you only need to set at most one of `_use_pf_client` and `_use_run_submitter_client`.
* feat: Show exception message in run logs
* fix: Make safety evaluation use CodeClient as originally intended
* fix: Don't wrap EvaluationException in BatchEngineError
* refactor: Refactor BatchConfig
* fix: Make raising on error configurable for RunSubmitterClient
* chore: Update changelog
* fix: Uncomment log_path
* chore: Add promptflow to dev-requirements.txt
  Some tests have explicit dependencies on the promptflow implementation. Since we aren't removing that code path yet, allow them to run by installing promptflow for those tests.
* fix: Initialize error_message
* fix: Get exception instead of BatchRunResult
* chore,docs: Clarify changelog
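The default column-mapping injection described in the "fix,refactor" item above can be illustrated with a small, self-contained sketch. The helper below is hypothetical (it is not the SDK's implementation, and `apply_default_column_mapping` is a made-up name); it only demonstrates the behavior of unconditionally adding a `${data.<param>}` mapping for every evaluator parameter the user did not map explicitly.

# Minimal sketch of the default column-mapping behavior described above; the
# function and variable names here are illustrative, not the SDK's actual API.
import inspect
from typing import Any, Callable, Dict


def apply_default_column_mapping(
    evaluator: Callable[..., Any], column_mapping: Dict[str, str]
) -> Dict[str, str]:
    """Unconditionally inject a ${data.<param>} mapping for every evaluator
    parameter the user did not map explicitly (mirroring promptflow's logic)."""
    mapping = dict(column_mapping)
    for param in inspect.signature(evaluator).parameters:
        # Inject defaults even when the user supplied a (possibly partial) mapping.
        mapping.setdefault(param, f"${{data.{param}}}")
    return mapping


def my_evaluator(query: str, response: str) -> Dict[str, float]:
    return {"length_ratio": len(response) / max(len(query), 1)}


# A user mapping that only covers "query" still yields a usable mapping for "response".
print(apply_default_column_mapping(my_evaluator, {"query": "${data.question}"}))
# {'query': '${data.question}', 'response': '${data.response}'}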
1 parent 1f0488c commit f1cd1b2

14 files changed: +299 −117 lines changed


sdk/evaluation/azure-ai-evaluation/CHANGELOG.md

Lines changed: 7 additions & 0 deletions
@@ -21,6 +21,13 @@ tolerance for harmful responses).
 - Significant improvements to Relevance evaluator. New version has more concrete rubrics and has less variance, is much faster and consumes fewer tokens.
 
 
+### Other Changes
+
+- The default engine for evaluation was changed from `promptflow` (PFClient) to an in-SDK batch client (RunSubmitterClient)
+  - Note: We've temporarily kept an escape hatch to fall back to the legacy `promptflow` implementation by setting `_use_pf_client=True` when invoking `evaluate()`.
+    This is due to be removed in a future release.
+
+
 ## 1.9.0 (2025-07-02)
 
 ### Features Added
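A hypothetical invocation of the escape hatch mentioned in the changelog entry above is sketched below. The data file, model configuration values, and choice of evaluator are placeholders; only the `_use_pf_client=True` keyword is the documented flag.

# Hypothetical usage of the escape hatch described in the changelog entry above.
from azure.ai.evaluation import RelevanceEvaluator, evaluate

model_config = {
    "azure_endpoint": "https://<your-resource>.openai.azure.com",
    "azure_deployment": "<your-deployment>",
    "api_key": "<your-api-key>",
}

# Default: the in-SDK batch client (RunSubmitterClient) runs the evaluation.
result = evaluate(
    data="evaluation_data.jsonl",
    evaluators={"relevance": RelevanceEvaluator(model_config)},
)

# Temporary escape hatch: fall back to the legacy promptflow-based engine.
legacy_result = evaluate(
    data="evaluation_data.jsonl",
    evaluators={"relevance": RelevanceEvaluator(model_config)},
    _use_pf_client=True,
)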

sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_common/utils.py

Lines changed: 5 additions & 7 deletions
@@ -6,11 +6,11 @@
 import re
 import math
 import threading
-from typing import Any, List, Literal, Mapping, Type, TypeVar, Tuple, Union, cast, get_args, get_origin
+from typing import Any, List, Literal, Mapping, Optional, Type, TypeVar, Tuple, Union, cast, get_args, get_origin
 
 import nltk
 from azure.storage.blob import ContainerClient
-from typing_extensions import NotRequired, Required, TypeGuard
+from typing_extensions import NotRequired, Required, TypeGuard, TypeIs
 from azure.ai.evaluation._legacy._adapters._errors import MissingRequiredPackage
 from azure.ai.evaluation._constants import AZURE_OPENAI_TYPE, OPENAI_TYPE
 from azure.ai.evaluation._exceptions import ErrorMessage, ErrorBlame, ErrorCategory, ErrorTarget, EvaluationException
@@ -127,17 +127,15 @@ def construct_prompty_model_config(
     return prompty_model_config
 
 
-def is_onedp_project(azure_ai_project: AzureAIProject) -> bool:
+def is_onedp_project(azure_ai_project: Optional[Union[str, AzureAIProject]]) -> TypeIs[str]:
     """Check if the Azure AI project is an OneDP project.
 
     :param azure_ai_project: The scope of the Azure AI project.
-    :type azure_ai_project: ~azure.ai.evaluation.AzureAIProject
+    :type azure_ai_project: Optional[Union[str, ~azure.ai.evaluation.AzureAIProject]]
    :return: True if the Azure AI project is an OneDP project, False otherwise.
    :rtype: bool
    """
-    if isinstance(azure_ai_project, str):
-        return True
-    return False
+    return isinstance(azure_ai_project, str)
 
 
 def validate_azure_ai_project(o: object) -> AzureAIProject:
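The switch from `bool` to `TypeIs[str]` lets type checkers narrow the argument in both branches of the check. A minimal, self-contained sketch (assuming `typing_extensions` >= 4.10 and using a stand-in for the real `AzureAIProject` TypedDict; the placeholder values are illustrative):

# Sketch of the TypeIs narrowing this change enables.
from typing import Optional, Union

from typing_extensions import TypeIs

# Stand-in for the real AzureAIProject TypedDict; for illustration only.
AzureAIProject = dict


def is_onedp_project(azure_ai_project: Optional[Union[str, AzureAIProject]]) -> TypeIs[str]:
    return isinstance(azure_ai_project, str)


def describe(project: Optional[Union[str, AzureAIProject]]) -> None:
    if is_onedp_project(project):
        # Type checkers narrow `project` to `str` here...
        print(project.upper())
    else:
        # ...and to Optional[AzureAIProject] in this branch (unlike TypeGuard).
        print(project)


describe("https://<endpoint>/api/projects/<project-name>")  # str -> treated as a OneDP project
describe({"subscription_id": "<sub>", "resource_group_name": "<rg>", "project_name": "<name>"})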

sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/_run_submitter_client.py

Lines changed: 70 additions & 22 deletions
@@ -6,6 +6,7 @@
 import logging
 import pandas as pd
 import sys
+import itertools
 from collections import defaultdict
 from concurrent.futures import Future
 from os import PathLike
@@ -16,15 +17,34 @@
 from ..._legacy._batch_engine._config import BatchEngineConfig
 from ..._legacy._batch_engine._run import Run
 from ..._legacy._adapters._constants import LINE_NUMBER
+from ..._legacy._adapters.types import AttrDict
 from ..._legacy._common._thread_pool_executor_with_context import ThreadPoolExecutorWithContext
+from ..._evaluate._utils import _has_aggregator
+from ..._constants import Prefixes, PF_BATCH_TIMEOUT_SEC
 
+from .._utils import get_int_env_var as get_int
 
-LOGGER = logging.getLogger(__name__)
+
+LOGGER = logging.getLogger("run")
+MISSING_VALUE: Final[int] = sys.maxsize
 
 
 class RunSubmitterClient:
-    def __init__(self, config: Optional[BatchEngineConfig] = None) -> None:
-        self._config = config or BatchEngineConfig(LOGGER, use_async=True)
+    def __init__(self, *, raise_on_errors: bool = False, config: Optional[BatchEngineConfig] = None) -> None:
+        if config:
+            self._config = config
+        else:
+            # Generate default config and apply any overrides to the configuration from environment variables
+            self._config = BatchEngineConfig(LOGGER, use_async=True)
+            if (val := get_int(PF_BATCH_TIMEOUT_SEC, MISSING_VALUE)) != MISSING_VALUE:
+                self._config.batch_timeout_seconds = val
+            if (val := get_int("PF_LINE_TIMEOUT_SEC", MISSING_VALUE)) != MISSING_VALUE:
+                self._config.line_timeout_seconds = val
+            if (val := get_int("PF_WORKER_COUNT", MISSING_VALUE)) != MISSING_VALUE:
+                self._config.max_concurrency = val
+
+        self._config.raise_on_error = raise_on_errors
+
         self._thread_pool = ThreadPoolExecutorWithContext(
             thread_name_prefix="evaluators_thread", max_workers=self._config.max_concurrency
         )
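The constructor above reads optional overrides from environment variables, using a sentinel to distinguish "variable not set" from any real value. A minimal sketch of that pattern, with a stand-in `get_int` helper and placeholder defaults (the SDK itself uses `get_int_env_var` and `BatchEngineConfig`):

# Sketch of the environment-variable override pattern used above; helper and
# defaults below are illustrative stand-ins, not the SDK's implementation.
import os
import sys
from dataclasses import dataclass

MISSING_VALUE = sys.maxsize


def get_int(name: str, default: int) -> int:
    raw = os.environ.get(name)
    try:
        return int(raw) if raw is not None else default
    except ValueError:
        return default


@dataclass
class Config:
    batch_timeout_seconds: int = 3600  # placeholder default
    line_timeout_seconds: int = 600    # placeholder default
    max_concurrency: int = 10          # placeholder default


config = Config()
if (val := get_int("PF_BATCH_TIMEOUT_SEC", MISSING_VALUE)) != MISSING_VALUE:
    config.batch_timeout_seconds = val
if (val := get_int("PF_LINE_TIMEOUT_SEC", MISSING_VALUE)) != MISSING_VALUE:
    config.line_timeout_seconds = val
if (val := get_int("PF_WORKER_COUNT", MISSING_VALUE)) != MISSING_VALUE:
    config.max_concurrency = val

print(config)  # reflects whichever of the three variables are set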
@@ -72,32 +92,60 @@ def run(
         return run_future
 
     def get_details(self, client_run: BatchClientRun, all_results: bool = False) -> pd.DataFrame:
-
         run = self._get_run(client_run)
 
-        data: Dict[str, List[Any]] = defaultdict(list)
-        stop_at: Final[int] = self._config.default_num_results if not all_results else sys.maxsize
+        def concat(*dataframes: pd.DataFrame) -> pd.DataFrame:
+            return pd.concat(dataframes, axis=1, verify_integrity=True)
 
-        def _update(prefix: str, items: Sequence[Mapping[str, Any]]) -> None:
-            for i, line in enumerate(items):
-                if i >= stop_at:
-                    break
-                for k, value in line.items():
-                    key = f"{prefix}.{k}"
-                    data[key].append(value)
+        def to_dataframe(items: Sequence[Mapping[str, Any]], *, max_length: Optional[int] = None) -> pd.DataFrame:
+            """Convert a sequence of dictionaries to a DataFrame.
 
-        # Go from a list of dictionaries (i.e. a row view of the data) to a dictionary of lists
-        # (i.e. a column view of the data)
-        _update("inputs", run.inputs)
-        _update("inputs", [{LINE_NUMBER: i} for i in range(len(run.inputs))])
-        _update("outputs", run.outputs)
+            :param items: Sequence of dictionaries to convert.
+            :type items: Sequence[Mapping[str, Any]]
+            :param max_length: Maximum number of items to include in the DataFrame. If None, include all items.
+            :type max_length: Optional[int]
+            :return: DataFrame containing the items.
+            :rtype: pd.DataFrame
+            """
+            max_length = None if all_results else self._config.default_num_results
+            return pd.DataFrame(data=items if all_results else itertools.islice(items, max_length))
 
-        df = pd.DataFrame(data).reindex(columns=[k for k in data.keys()])
-        return df
+        inputs = concat(
+            to_dataframe(run.inputs), to_dataframe([{LINE_NUMBER: i} for i in range(len(run.inputs))])
+        ).add_prefix(Prefixes.INPUTS)
+
+        outputs = to_dataframe(run.outputs).add_prefix(Prefixes.OUTPUTS)
+
+        return concat(inputs, outputs)
 
     def get_metrics(self, client_run: BatchClientRun) -> Dict[str, Any]:
         run = self._get_run(client_run)
-        return dict(run.metrics)
+        return {**run.metrics, **self._get_aggregated_metrics(client_run)}
+
+    def _get_aggregated_metrics(self, client_run: BatchClientRun) -> Dict[str, Any]:
+        aggregated_metrics = None
+        run = self._get_run(client_run)
+        try:
+            if _has_aggregator(run.dynamic_callable):
+                result_df = pd.DataFrame(run.outputs)
+                if len(result_df.columns) == 1 and result_df.columns[0] == "output":
+                    aggregate_input = result_df["output"].tolist()
+                else:
+                    aggregate_input = [AttrDict(item) for item in result_df.to_dict("records")]
+
+                aggr_func = getattr(run.dynamic_callable, "__aggregate__")
+                aggregated_metrics = aggr_func(aggregate_input)
+
+        except Exception as ex:  # pylint: disable=broad-exception-caught
+            LOGGER.warning("Error calculating aggregations for evaluator, failed with error %s", ex)
+
+        if not isinstance(aggregated_metrics, dict):
+            LOGGER.warning(
+                "Aggregated metrics for evaluator is not a dictionary will not be logged as metrics",
+            )
+            return {}
+
+        return aggregated_metrics
 
     def get_run_summary(self, client_run: BatchClientRun) -> Dict[str, Any]:
         run = self._get_run(client_run)
@@ -110,7 +158,7 @@ def get_run_summary(self, client_run: BatchClientRun) -> Dict[str, Any]:
             "duration": str(run.duration),
             "completed_lines": total_lines - failed_lines,
             "failed_lines": failed_lines,
-            # "log_path": "",
+            "log_path": None,
         }
 
    @staticmethod
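The aggregation support added above relies on an evaluator exposing an `__aggregate__` attribute: when present, it is called with the per-line outputs and its dictionary result is merged into the run's metrics. A hypothetical evaluator following this protocol (a made-up example, not one shipped by the SDK):

# Sketch of the evaluator aggregation protocol that _get_aggregated_metrics looks for.
from typing import Dict, List


class ExactMatchEvaluator:
    def __call__(self, *, response: str, ground_truth: str) -> Dict[str, float]:
        # Per-line result: 1.0 for an exact match, 0.0 otherwise.
        return {"exact_match": float(response.strip() == ground_truth.strip())}

    def __aggregate__(self, line_results: List[Dict[str, float]]) -> Dict[str, float]:
        # Called once with all per-line outputs; the returned dict is logged as metrics.
        scores = [line["exact_match"] for line in line_results]
        return {"exact_match_rate": sum(scores) / len(scores) if scores else 0.0}


evaluator = ExactMatchEvaluator()
per_line = [
    evaluator(response="Paris", ground_truth="Paris"),
    evaluator(response="Lyon", ground_truth="Paris"),
]
print(evaluator.__aggregate__(per_line))  # {'exact_match_rate': 0.5}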

sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py

Lines changed: 41 additions & 5 deletions
@@ -9,7 +9,7 @@
 import re
 import tempfile
 import json
-from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypedDict, Union, cast
+from typing import Any, Callable, Dict, List, Literal, Optional, Set, Tuple, TypedDict, Union, cast
 
 from openai import OpenAI, AzureOpenAI
 from azure.ai.evaluation._legacy._adapters._constants import LINE_NUMBER
@@ -876,6 +876,7 @@ def _evaluate(  # pylint: disable=too-many-locals,too-many-statements
         output_path=output_path,
         azure_ai_project=azure_ai_project,
         evaluation_name=evaluation_name,
+        fail_on_evaluator_errors=fail_on_evaluator_errors,
         **kwargs,
     )
 
@@ -983,6 +984,7 @@ def _preprocess_data(
     output_path: Optional[Union[str, os.PathLike]] = None,
     azure_ai_project: Optional[Union[str, AzureAIProject]] = None,
     evaluation_name: Optional[str] = None,
+    fail_on_evaluator_errors: bool = False,
     **kwargs,
 ) -> __ValidatedData:
     # Process evaluator config to replace ${target.} with ${data.}
@@ -1016,15 +1018,49 @@
     batch_run_client: BatchClient
     batch_run_data: Union[str, os.PathLike, pd.DataFrame] = data
 
-    if kwargs.pop("_use_run_submitter_client", False):
-        batch_run_client = RunSubmitterClient()
+    def get_client_type(evaluate_kwargs: Dict[str, Any]) -> Literal["run_submitter", "pf_client", "code_client"]:
+        """Determines the BatchClient to use from provided kwargs (_use_run_submitter_client and _use_pf_client)"""
+        _use_run_submitter_client = cast(Optional[bool], kwargs.pop("_use_run_submitter_client", None))
+        _use_pf_client = cast(Optional[bool], kwargs.pop("_use_pf_client", None))
+
+        if _use_run_submitter_client is None and _use_pf_client is None:
+            # If both are unset, return default
+            return "run_submitter"
+
+        if _use_run_submitter_client and _use_pf_client:
+            raise EvaluationException(
+                message="Only one of _use_pf_client and _use_run_submitter_client should be set to True.",
+                target=ErrorTarget.EVALUATE,
+                category=ErrorCategory.INVALID_VALUE,
+                blame=ErrorBlame.USER_ERROR,
+            )
+
+        if _use_run_submitter_client == False and _use_pf_client == False:
+            return "code_client"
+
+        if _use_run_submitter_client:
+            return "run_submitter"
+        if _use_pf_client:
+            return "pf_client"
+
+        if _use_run_submitter_client is None and _use_pf_client == False:
+            return "run_submitter"
+        if _use_run_submitter_client == False and _use_pf_client is None:
+            return "pf_client"
+
+        assert False, "This should be impossible"
+
+    client_type: Literal["run_submitter", "pf_client", "code_client"] = get_client_type(kwargs)
+
+    if client_type == "run_submitter":
+        batch_run_client = RunSubmitterClient(raise_on_errors=fail_on_evaluator_errors)
         batch_run_data = input_data_df
-    elif kwargs.pop("_use_pf_client", True):
+    elif client_type == "pf_client":
         batch_run_client = ProxyClient(user_agent=UserAgentSingleton().value)
         # Ensure the absolute path is passed to pf.run, as relative path doesn't work with
         # multiple evaluators. If the path is already absolute, abspath will return the original path.
         batch_run_data = os.path.abspath(data)
-    else:
+    elif client_type == "code_client":
         batch_run_client = CodeClient()
         batch_run_data = input_data_df
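The flag handling in `get_client_type` above reduces to a small decision table. The standalone restatement below is a simplified sketch for readability, not the SDK's code; the asserts spell out the notable cases.

# Simplified restatement of the _use_run_submitter_client / _use_pf_client decision table.
from typing import Literal, Optional

Client = Literal["run_submitter", "pf_client", "code_client"]


def pick_client(use_run_submitter: Optional[bool], use_pf: Optional[bool]) -> Client:
    if use_run_submitter is None and use_pf is None:
        return "run_submitter"                # new default
    if use_run_submitter and use_pf:
        raise ValueError("Only one of the two flags may be set to True.")
    if use_run_submitter is False and use_pf is False:
        return "code_client"                  # explicitly opting out of both
    if use_run_submitter:
        return "run_submitter"
    if use_pf:
        return "pf_client"
    # Exactly one flag is False and the other is unset: fall back to the other client.
    return "run_submitter" if use_pf is False else "pf_client"


assert pick_client(None, None) == "run_submitter"   # default engine
assert pick_client(None, True) == "pf_client"       # legacy escape hatch
assert pick_client(True, None) == "run_submitter"
assert pick_client(False, False) == "code_client"   # the only way to get CodeClient
assert pick_client(False, None) == "pf_client"      # opting out of the default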

sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_batch_engine/_config.py

Lines changed: 6 additions & 3 deletions
@@ -19,7 +19,7 @@ class BatchEngineConfig:
     batch_timeout_seconds: int = PF_BATCH_TIMEOUT_SEC_DEFAULT
     """The maximum amount of time to wait for all evaluations in the batch to complete."""
 
-    run_timeout_seconds: int = 600
+    line_timeout_seconds: int = 600
     """The maximum amount of time to wait for an evaluation to run against a single entry
     in the data input to complete."""
 
@@ -32,13 +32,16 @@
     default_num_results: int = 100
     """The default number of results to return if you don't ask for all results."""
 
+    raise_on_error: bool = True
+    """Whether to raise an error if an evaluation fails."""
+
     def __post_init__(self):
         if self.logger is None:
             raise ValueError("logger cannot be None")
         if self.batch_timeout_seconds <= 0:
             raise ValueError("batch_timeout_seconds must be greater than 0")
-        if self.run_timeout_seconds <= 0:
-            raise ValueError("run_timeout_seconds must be greater than 0")
+        if self.line_timeout_seconds <= 0:
+            raise ValueError("line_timeout_seconds must be greater than 0")
         if self.max_concurrency <= 0:
             raise ValueError("max_concurrency must be greater than 0")
        if self.default_num_results <= 0: