
Commit 4d8426b
Port of Batch Engine code (Part II) (Azure#40411)

- Adds support for target function calls
- Adds error handling and retries for the OpenAI prompty requests
- Moves to async-first methods in the implementation, simplifying it and letting the caller of the code (`RunSubmitterClient` in this case) decide which thread the async event loop runs on
1 parent 8125ca3 commit 4d8426b

23 files changed (+685 -210 lines)
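The headline change is the async-first execution model described above: `RunSubmitterClient` now builds the coroutine on the caller's thread and hands `asyncio.run` to a thread pool, so the event loop lives on a worker thread and the caller only holds a `Future`. A minimal sketch of that pattern (illustrative names, not the SDK's exact code):

import asyncio
from concurrent.futures import ThreadPoolExecutor


async def submit(inputs: list) -> list:
    # Stand-in for RunSubmitter.submit: do async work per input row.
    await asyncio.sleep(0)
    return [f"processed {item}" for item in inputs]


pool = ThreadPoolExecutor(thread_name_prefix="evaluators_thread")
# The coroutine is created here, but its event loop runs on the pool thread,
# not on the caller's thread; the caller just holds a Future.
future = pool.submit(asyncio.run, submit([1, 2, 3]))
print(future.result())  # ['processed 1', 'processed 2', 'processed 3']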

sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/_run_submitter_client.py

Lines changed: 30 additions & 16 deletions
@@ -2,18 +2,21 @@
 # Copyright (c) Microsoft Corporation. All rights reserved.
 # ---------------------------------------------------------
 
+import asyncio
 import logging
 import pandas as pd
 import sys
 from collections import defaultdict
-from concurrent.futures import Future, ThreadPoolExecutor
+from concurrent.futures import Future
 from os import PathLike
 from typing import Any, Callable, Dict, Final, List, Mapping, Optional, Sequence, Union, cast
 
 from .batch_clients import BatchClientRun, HasAsyncCallable
 from ..._legacy._batch_engine._run_submitter import RunSubmitter
 from ..._legacy._batch_engine._config import BatchEngineConfig
 from ..._legacy._batch_engine._run import Run
+from ..._legacy._adapters._constants import LINE_NUMBER
+from ..._legacy._common._thread_pool_executor_with_context import ThreadPoolExecutorWithContext
 
 
 LOGGER = logging.getLogger(__name__)
@@ -22,7 +25,9 @@
 class RunSubmitterClient:
     def __init__(self, config: Optional[BatchEngineConfig] = None) -> None:
         self._config = config or BatchEngineConfig(LOGGER, use_async=True)
-        self._thread_pool = ThreadPoolExecutor(thread_name_prefix="evaluators_thread")
+        self._thread_pool = ThreadPoolExecutorWithContext(
+            thread_name_prefix="evaluators_thread",
+            max_workers=self._config.max_concurrency)
 
     def run(
         self,
@@ -33,30 +38,36 @@ def run(
         **kwargs: Any,
     ) -> BatchClientRun:
         if not isinstance(data, pd.DataFrame):
-            # Should never get here
             raise ValueError("Data must be a pandas DataFrame")
-        if not column_mapping:
-            raise ValueError("Column mapping must be provided")
 
-        # The column mappings are index by data to indicate they come from the data
+        # The column mappings are indexed by data to indicate they come from the data
         # input. Update the inputs so that each entry is a dictionary with a data key
         # that contains the original input data.
         inputs = [{"data": input_data} for input_data in data.to_dict(orient="records")]
 
-        # always uses async behind the scenes
+        # Pass the correct previous run to the evaluator
+        run: Optional[BatchClientRun] = kwargs.pop("run", None)
+        if run:
+            kwargs["run"] = self._get_run(run)
+
+        # Try to get async function to use
         if isinstance(flow, HasAsyncCallable):
            flow = flow._to_async()  # pylint: disable=protected-access
 
-        run_submitter = RunSubmitter(self._config)
+        # Start an event loop for async execution on a thread pool thread to separate it
+        # from the caller's thread.
+        run_submitter = RunSubmitter(self._config, self._thread_pool)
         run_future = self._thread_pool.submit(
-            run_submitter.submit,
-            dynamic_callable=flow,
-            inputs=inputs,
-            column_mapping=column_mapping,
-            name_prefix=evaluator_name,
-            created_on=kwargs.pop("created_on", None),
-            storage_creator=kwargs.pop("storage_creator", None),
-            **kwargs,
+            asyncio.run,
+            run_submitter.submit(
+                dynamic_callable=flow,
+                inputs=inputs,
+                column_mapping=column_mapping,
+                name_prefix=evaluator_name,
+                created_on=kwargs.pop("created_on", None),
+                storage_creator=kwargs.pop("storage_creator", None),
+                **kwargs,
+            )
         )
 
         return run_future
@@ -75,7 +86,10 @@ def _update(prefix: str, items: Sequence[Mapping[str, Any]]) -> None:
                 key = f"{prefix}.{k}"
                 data[key].append(value)
 
+        # Go from a list of dictionaries (i.e. a row view of the data) to a dictionary of lists
+        # (i.e. a column view of the data)
         _update("inputs", run.inputs)
+        _update("inputs", [{ LINE_NUMBER: i } for i in range(len(run.inputs)) ])
         _update("outputs", run.outputs)
 
         df = pd.DataFrame(data).reindex(columns=[k for k in data.keys()])
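The `get_details` changes above pivot results from a row view (list of dictionaries) to a column view (dictionary of lists), adding a `LINE_NUMBER` input column. A hedged sketch of that reshaping with made-up data:

from collections import defaultdict

rows = [{"question": "q1", "answer": "a1"}, {"question": "q2", "answer": "a2"}]
columns = defaultdict(list)
for index, row in enumerate(rows):
    columns["inputs.line_number"].append(index)  # mirrors the added LINE_NUMBER column
    for key, value in row.items():
        columns[f"inputs.{key}"].append(value)
# columns == {'inputs.line_number': [0, 1],
#             'inputs.question': ['q1', 'q2'],
#             'inputs.answer': ['a1', 'a2']}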

sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/eval_run_context.py

Lines changed: 8 additions & 0 deletions
@@ -8,6 +8,10 @@
 from azure.ai.evaluation._legacy._adapters._constants import PF_FLOW_ENTRY_IN_TMP, PF_FLOW_META_LOAD_IN_SUBPROCESS
 from azure.ai.evaluation._legacy._adapters.utils import ClientUserAgentUtil
 from azure.ai.evaluation._legacy._adapters.tracing import inject_openai_api, recover_openai_api
+from azure.ai.evaluation._legacy._batch_engine._openai_injector import (
+    inject_openai_api as ported_inject_openai_api,
+    recover_openai_api as ported_recover_openai_api,
+)
 
 from azure.ai.evaluation._constants import (
     OTEL_EXPORTER_OTLP_TRACES_TIMEOUT,
@@ -68,6 +72,7 @@ def __enter__(self) -> None:
 
         if isinstance(self.client, RunSubmitterClient):
             set_event_loop_policy()
+            ported_inject_openai_api()
 
     def __exit__(
         self,
@@ -92,3 +97,6 @@ def __exit__(
         if self._is_otel_timeout_set_by_system:
             os.environ.pop(OTEL_EXPORTER_OTLP_TRACES_TIMEOUT, None)
             self._is_otel_timeout_set_by_system = False
+
+        if isinstance(self.client, RunSubmitterClient):
+            ported_recover_openai_api()
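The context manager now pairs the ported injector with its recovery call: inject on `__enter__`, recover on `__exit__`, both guarded by the client type. A standalone sketch of that pairing (the imports come from the diff above; the class name is hypothetical):

from azure.ai.evaluation._evaluate._batch_run import RunSubmitterClient
from azure.ai.evaluation._legacy._batch_engine._openai_injector import (
    inject_openai_api as ported_inject_openai_api,
    recover_openai_api as ported_recover_openai_api,
)


class OpenAIInjectionScope:
    """Hypothetical illustration of inject-on-enter / recover-on-exit."""

    def __init__(self, client) -> None:
        self._client = client
        self._injected = False

    def __enter__(self) -> None:
        if isinstance(self._client, RunSubmitterClient):
            ported_inject_openai_api()
            self._injected = True

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        # Recover even if the body raised, so the patching never leaks.
        if self._injected:
            ported_recover_openai_api()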

sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/proxy_client.py

Lines changed: 5 additions & 0 deletions
@@ -58,6 +58,11 @@ def run(
         if not name:
             name = f"azure_ai_evaluation_evaluators_{evaluator_name}_{datetime.now().strftime('%Y%m%d_%H%M%S_%f')}"
 
+        # Pass the correct previous run to the evaluator
+        run: Optional[BatchClientRun] = kwargs.pop("run", None)
+        if run:
+            kwargs["run"] = self.get_result(run)
+
         batch_use_async = self._should_batch_use_async(flow_to_run)
         eval_future = self._thread_pool.submit(
             self._pf_client.run,

sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_batch_run/target_run_context.py

Lines changed: 17 additions & 1 deletion
@@ -5,8 +5,15 @@
 import types
 from typing import Optional, Type
 
+from azure.ai.evaluation._evaluate._batch_run.batch_clients import BatchClient
+from azure.ai.evaluation._evaluate._batch_run import RunSubmitterClient
 from azure.ai.evaluation._legacy._adapters._constants import PF_FLOW_ENTRY_IN_TMP
+from azure.ai.evaluation._legacy._batch_engine._openai_injector import (
+    inject_openai_api as ported_inject_openai_api,
+    recover_openai_api as ported_recover_openai_api,
+)
 from azure.ai.evaluation._constants import PF_DISABLE_TRACING
+from azure.ai.evaluation._evaluate._utils import set_event_loop_policy
 
 
 class TargetRunContext:
@@ -16,7 +23,8 @@ class TargetRunContext:
     :type upload_snapshot: bool
     """
 
-    def __init__(self, upload_snapshot: bool = False) -> None:
+    def __init__(self, client: BatchClient, upload_snapshot: bool = False) -> None:
+        self._client = client
         self._upload_snapshot = upload_snapshot
         self._original_cwd = os.getcwd()
 
@@ -32,6 +40,11 @@ def __enter__(self) -> None:
 
         os.environ[PF_DISABLE_TRACING] = "true"
 
+        if isinstance(self._client, RunSubmitterClient):
+            ported_inject_openai_api()
+            # For addressing the issue of asyncio event loop closed on Windows
+            set_event_loop_policy()
+
     def __exit__(
         self,
         exc_type: Optional[Type[BaseException]],
@@ -44,3 +57,6 @@ def __exit__(
         os.environ.pop(PF_FLOW_ENTRY_IN_TMP, None)
 
         os.environ.pop(PF_DISABLE_TRACING, None)
+
+        if isinstance(self._client, RunSubmitterClient):
+            ported_recover_openai_api()
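The `set_event_loop_policy()` call above addresses the well-known "event loop is closed" teardown failure on Windows. A sketch of the standard workaround, assuming that is what the helper in `azure.ai.evaluation._evaluate._utils` applies (the actual implementation is not shown in this commit):

import asyncio
import sys


def set_event_loop_policy() -> None:
    # Assumed implementation: the default proactor loop on Windows can raise
    # "Event loop is closed" during teardown; the selector policy avoids it.
    if sys.platform == "win32":
        asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())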

sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_evaluate/_evaluate.py

Lines changed: 29 additions & 38 deletions
@@ -9,6 +9,7 @@
 from typing import Any, Callable, Dict, List, Optional, Set, Tuple, TypedDict, Union, cast
 
 from azure.ai.evaluation._legacy._adapters._constants import LINE_NUMBER
+from azure.ai.evaluation._legacy._adapters._errors import MissingRequiredPackage
 from azure.ai.evaluation._legacy._adapters.entities import Run
 import pandas as pd
 
@@ -40,7 +41,7 @@
     _write_output,
     DataLoaderFactory,
 )
-from ._batch_run.batch_clients import BatchClient
+from ._batch_run.batch_clients import BatchClient, BatchClientRun
 
 LOGGER = logging.getLogger(__name__)
 
@@ -486,12 +487,12 @@ def _validate_and_load_data(target, data, evaluators, output_path, azure_ai_proj
 
 def _apply_target_to_data(
     target: Callable,
-    data: Union[str, os.PathLike],
+    data: Union[str, os.PathLike, pd.DataFrame],
     batch_client: BatchClient,
     initial_data: pd.DataFrame,
     evaluation_name: Optional[str] = None,
     **kwargs,
-) -> Tuple[pd.DataFrame, Set[str], Run]:
+) -> Tuple[pd.DataFrame, Set[str], BatchClientRun]:
     """
     Apply the target function to the data set and return updated data and generated columns.
 
@@ -509,24 +510,18 @@ def _apply_target_to_data(
     :rtype: Tuple[pandas.DataFrame, List[str]]
     """
 
-    if not isinstance(batch_client, ProxyClient):
-        raise ValueError("Only ProxyClient supports target runs for now.")
-
     _run_name = kwargs.get("_run_name")
-    with TargetRunContext():
-        run = cast(
-            ProxyRun,
-            batch_client.run(
-                flow=target,
-                display_name=evaluation_name,
-                data=data,
-                stream=True,
-                name=_run_name,
-            ),
+    with TargetRunContext(batch_client):
+        run: BatchClientRun = batch_client.run(
+            flow=target,
+            display_name=evaluation_name,
+            data=data,
+            stream=True,
+            name=_run_name,
+            evaluator_name=getattr(target, "__qualname__", "TARGET"),
         )
-
-        target_output: pd.DataFrame = batch_client.get_details(run, all_results=True)
-        run_summary = batch_client.get_run_summary(run)
+    target_output: pd.DataFrame = batch_client.get_details(run, all_results=True)
+    run_summary = batch_client.get_run_summary(run)
 
     if run_summary["completed_lines"] == 0:
         msg = (
@@ -557,7 +552,7 @@ def _apply_target_to_data(
     # Concatenate output to input
     target_output = pd.concat([target_output, initial_data], axis=1)
 
-    return target_output, generated_columns, run.run.result()
+    return target_output, generated_columns, run
 
 
 def _process_column_mappings(
@@ -777,19 +772,27 @@ def _evaluate(  # pylint: disable=too-many-locals,too-many-statements
     column_mapping = column_mapping or {}
     column_mapping.setdefault("default", {})
 
-    target_run: Optional[Run] = None
+    target_run: Optional[BatchClientRun] = None
     target_generated_columns: Set[str] = set()
     batch_run_client: BatchClient
     batch_run_data: Union[str, os.PathLike, pd.DataFrame] = data
 
-    # If target is set, apply 1-1 column mapping from target outputs to evaluator inputs
-    if data is not None and target is not None:
-        # Right now, only the ProxyClient that uses Promptflow supports a target function
+    if kwargs.pop("_use_run_submitter_client", False):
+        batch_run_client = RunSubmitterClient()
+        batch_run_data = input_data_df
+    elif kwargs.pop("_use_pf_client", True):
         batch_run_client = ProxyClient(user_agent=USER_AGENT)
+        # Ensure the absolute path is passed to pf.run, as relative path doesn't work with
+        # multiple evaluators. If the path is already absolute, abspath will return the original path.
         batch_run_data = os.path.abspath(data)
+    else:
+        batch_run_client = CodeClient()
+        batch_run_data = input_data_df
 
+    # If target is set, apply 1-1 column mapping from target outputs to evaluator inputs
+    if data is not None and target is not None:
         input_data_df, target_generated_columns, target_run = _apply_target_to_data(
-            target, data, batch_run_client, input_data_df, evaluation_name, **kwargs
+            target, batch_run_data, batch_run_client, input_data_df, evaluation_name, **kwargs
         )
 
     for evaluator_name, mapping in column_mapping.items():
@@ -803,17 +806,6 @@ def _evaluate(  # pylint: disable=too-many-locals,too-many-statements
             # customer did not mapped target output.
             if col not in mapping and run_output not in mapped_to_values:
                 column_mapping[evaluator_name][col] = run_output  # pylint: disable=unnecessary-dict-index-lookup
-    elif kwargs.pop("_use_run_submitter_client", False):
-        batch_run_client = RunSubmitterClient()
-        batch_run_data = input_data_df
-    elif kwargs.pop("_use_pf_client", True):
-        batch_run_client = ProxyClient(user_agent=USER_AGENT)
-        # Ensure the absolute path is passed to pf.run, as relative path doesn't work with
-        # multiple evaluators. If the path is already absolute, abspath will return the original path.
-        batch_run_data = os.path.abspath(data)
-    else:
-        batch_run_client = CodeClient()
-        batch_run_data = input_data_df
 
     # After we have generated all columns, we can check if we have everything we need for evaluators.
     _validate_columns_for_evaluators(input_data_df, evaluators, target, target_generated_columns, column_mapping)
@@ -896,12 +888,11 @@ def _evaluate(  # pylint: disable=too-many-locals,too-many-statements
     metrics.update(evaluators_metric)
 
     # Since tracing is disabled, pass None for target_run so a dummy evaluation run will be created each time.
-    target_run: Optional[Run] = None
     trace_destination = _trace_destination_from_project_scope(azure_ai_project) if azure_ai_project else None
     studio_url = None
     if trace_destination:
         studio_url = _log_metrics_and_instance_results(
-            metrics, result_df, trace_destination, target_run, evaluation_name, **kwargs
+            metrics, result_df, trace_destination, None, evaluation_name, **kwargs
         )
 
     result_df_dict = result_df.to_dict("records")
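The key structural change above is ordering: the batch client is now chosen before `_apply_target_to_data` runs, so any client (not only `ProxyClient`) can execute a target. A simplified sketch of the new selection logic (illustrative, not the SDK's exact code):

def choose_batch_client(kwargs: dict) -> str:
    # Checked first, so the ported engine wins when explicitly requested.
    if kwargs.pop("_use_run_submitter_client", False):
        return "RunSubmitterClient"  # takes the in-memory DataFrame
    if kwargs.pop("_use_pf_client", True):
        return "ProxyClient"  # Promptflow-backed; needs an absolute file path
    return "CodeClient"  # plain in-process execution


assert choose_batch_client({"_use_run_submitter_client": True}) == "RunSubmitterClient"
assert choose_batch_client({}) == "ProxyClient"
assert choose_batch_client({"_use_pf_client": False}) == "CodeClient"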

sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/_legacy/_adapters/__init__.py

Lines changed: 0 additions & 14 deletions
@@ -5,17 +5,3 @@
 # NOTE: This contains adapters that make the Promptflow dependency optional. In the first phase,
 # Promptflow will still be installed as part of the azure-ai-evaluation dependencies. This
 # will be removed in the future once the code migration is complete.
-
-from typing import Final
-
-
-_has_legacy = False
-try:
-    from promptflow.client import PFClient
-
-    _has_legacy = True
-except ImportError:
-    pass
-
-HAS_LEGACY_SDK: Final[bool] = _has_legacy
-MISSING_LEGACY_SDK: Final[bool] = not _has_legacy
Lines changed: 17 additions & 0 deletions
@@ -0,0 +1,17 @@
+# ---------------------------------------------------------
+# Copyright (c) Microsoft Corporation. All rights reserved.
+# ---------------------------------------------------------
+
+from typing import Final
+
+
+_has_legacy = False
+try:
+    from promptflow._constants import FlowType
+
+    _has_legacy = True
+except ImportError:
+    pass
+
+HAS_LEGACY_SDK: Final[bool] = _has_legacy
+MISSING_LEGACY_SDK: Final[bool] = not _has_legacy
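The flags removed from `_adapters/__init__.py` above are re-created in this new module (its path is not shown in this view), now probing `promptflow._constants.FlowType` rather than `promptflow.client.PFClient`, presumably a lighter-weight import. A self-contained usage sketch; the guard itself is an assumption, not code from this commit, though it matches the `MissingRequiredPackage` import added to `_evaluate.py` above:

from typing import Final

# Re-probing here only for illustration; the real flags live in the new module.
try:
    from promptflow._constants import FlowType  # noqa: F401
    _has_legacy = True
except ImportError:
    _has_legacy = False

MISSING_LEGACY_SDK: Final[bool] = not _has_legacy

if MISSING_LEGACY_SDK:
    # In the SDK this would raise MissingRequiredPackage instead of printing.
    print("promptflow is not installed; legacy-only code paths are unavailable")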
