diff --git a/sdk/evaluation/azure-ai-evaluation/CHANGELOG.md b/sdk/evaluation/azure-ai-evaluation/CHANGELOG.md
index be8a038c1c1b..7c537d76eaf0 100644
--- a/sdk/evaluation/azure-ai-evaluation/CHANGELOG.md
+++ b/sdk/evaluation/azure-ai-evaluation/CHANGELOG.md
@@ -4,6 +4,8 @@
 
 ### Features Added
 
+- Added App Insights redaction for agent safety result queries to prevent adversarial prompts from being stored in telemetry
+
 ### Breaking Changes
 
 ### Bugs Fixed
diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/red_team/_red_team.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/red_team/_red_team.py
index a658a7ce9951..1081d4e4ddac 100644
--- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/red_team/_red_team.py
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/red_team/_red_team.py
@@ -1640,9 +1640,11 @@ async def _finalize_results(
         # Extract AOAI summary for passing to MLflow logging
         aoai_summary = red_team_result.scan_result.get("AOAI_Compatible_Summary")
         if self._app_insights_configuration:
-            emit_eval_result_events_to_app_insights(
-                self._app_insights_configuration, aoai_summary["output_items"]["data"]
+            # Get redacted results from the result processor for App Insights logging
+            redacted_results = self.result_processor.get_app_insights_redacted_results(
+                aoai_summary["output_items"]["data"]
             )
+            emit_eval_result_events_to_app_insights(self._app_insights_configuration, redacted_results)
         # Log results to MLFlow if not skipping upload
         if not skip_upload:
             self.logger.info("Logging results to AI Foundry")
diff --git a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/red_team/_result_processor.py b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/red_team/_result_processor.py
index dd9a922fc0f8..6aa03ea2a76e 100644
--- a/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/red_team/_result_processor.py
+++ b/sdk/evaluation/azure-ai-evaluation/azure/ai/evaluation/red_team/_result_processor.py
@@ -7,6 +7,7 @@
 This module handles the processing, aggregation, and formatting of red team evaluation results.
 """
 
+import copy
 import hashlib
 import json
 import math
@@ -18,6 +19,8 @@
 
 import pandas as pd
 
+from azure.ai.evaluation._common.constants import EvaluationMetrics
+
 # Local imports
 from ._red_team_result import (
     RedTeamResult,
@@ -1616,3 +1619,90 @@ def _build_results_payload(
         }
 
         return run_payload
+
+    def get_app_insights_redacted_results(self, results: List[Dict]) -> List[Dict]:
+        """
+        Creates a redacted copy of results specifically for App Insights logging.
+        User messages are redacted for sensitive risk categories to prevent logging
+        of adversarial prompts.
+
+        Args:
+            results: List of evaluation result dictionaries
+
+        Returns:
+            A deep copy of results with user messages redacted for applicable risk categories
+        """
+        # Create a deep copy to avoid modifying the original data
+        redacted_results = copy.deepcopy(results)
+
+        for result in redacted_results:
+            if "results" not in result or not isinstance(result["results"], list):
+                continue
+
+            # Get risk category and attack technique from the first result
+            if len(result["results"]) > 0:
+                first_result = result["results"][0]
+                risk_category = first_result.get("name", "unknown")
+
+                # Only redact if this is a sensitive risk category
+                if self._should_redact_for_risk_category(risk_category):
+                    # Extract additional properties for redaction message
+                    attack_technique = "unknown"
+                    risk_sub_type = None
+
+                    if "properties" in first_result and isinstance(first_result["properties"], dict):
+                        attack_technique = first_result["properties"].get("attack_technique", "unknown")
+                        risk_sub_type = first_result["properties"].get("risk_sub_type", None)
+
+                    # Redact user messages in the sample input
+                    if "sample" in result and "input" in result["sample"]:
+                        sample_input = result["sample"]["input"]
+
+                        if isinstance(sample_input, list):
+                            for message in sample_input:
+                                if isinstance(message, dict) and message.get("role") == "user":
+                                    message["content"] = self._get_redacted_input_message(
+                                        risk_category, attack_technique, risk_sub_type
+                                    )
+
+        return redacted_results
+
+    def _should_redact_for_risk_category(self, risk_category: str) -> bool:
+        """
+        Determines if user messages should be redacted for the given risk category.
+
+        Args:
+            risk_category: The risk category name to check
+
+        Returns:
+            True if the risk category requires redaction, False otherwise
+        """
+        redaction_required_categories = {
+            EvaluationMetrics.PROHIBITED_ACTIONS,
+            EvaluationMetrics.TASK_ADHERENCE,
+            EvaluationMetrics.SENSITIVE_DATA_LEAKAGE,
+        }
+
+        return risk_category in redaction_required_categories
+
+    def _get_redacted_input_message(self, risk_category: str, attack_technique: str, risk_sub_type: str = None) -> str:
+        """
+        Generates a redacted message for adversarial prompts based on risk category and attack technique.
+
+        Args:
+            risk_category: The risk category of the adversarial prompt
+            attack_technique: The attack technique used
+            risk_sub_type: Optional sub-type of the risk category
+
+        Returns:
+            A redacted message string
+        """
+        # Convert snake_case to Title Case for readability
+        risk_category_readable = risk_category.replace("_", " ").replace("-", " ").title()
+        attack_technique_readable = attack_technique.replace("_", " ").replace("-", " ").title()
+
+        if risk_sub_type:
+            risk_sub_type_readable = risk_sub_type.replace("_", " ").replace("-", " ").title()
+            return f"[Redacted adversarial prompt probing for {risk_category_readable} with {risk_sub_type_readable} using {attack_technique_readable} attack strategy.]"
+        else:
+            return f"[Redacted adversarial prompt probing for {risk_category_readable} using {attack_technique_readable} attack strategy.]"
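Illustrative usage sketch (not part of the patch): how the new redaction helper is expected to rewrite an AOAI-compatible output item before it is emitted to App Insights. The `result_processor` handle, the "sensitive_data_leakage" metric string, and the "base64" attack technique are assumptions made purely for illustration; only the redacted message format follows from the code added above.

    sample = {
        "results": [
            {
                # assumed to match EvaluationMetrics.SENSITIVE_DATA_LEAKAGE, one of the categories requiring redaction
                "name": "sensitive_data_leakage",
                "properties": {"attack_technique": "base64"},
            }
        ],
        "sample": {"input": [{"role": "user", "content": "<adversarial prompt>"}]},
    }
    redacted = result_processor.get_app_insights_redacted_results([sample])
    # The original dict is untouched (deep copy); in the returned copy the user message reads:
    # "[Redacted adversarial prompt probing for Sensitive Data Leakage using Base64 attack strategy.]"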