2 changes: 2 additions & 0 deletions sdk/evaluation/azure-ai-evaluation/CHANGELOG.md
@@ -4,6 +4,8 @@
 
 ### Features Added
 
+- Added App Insights redaction for agent safety result queries to prevent adversarial prompts from being stored in telemetry
+
 ### Breaking Changes
 
 ### Bugs Fixed
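The entry above corresponds to the code changes below: before red-team evaluation events are emitted to App Insights, user turns for sensitive risk categories are replaced by a placeholder that keeps only the risk category, optional sub-type, and attack technique. A minimal sketch of the placeholder format, mirroring `_get_redacted_input_message` further down (the concrete category and technique values here are hypothetical):

    # Placeholder stored in telemetry instead of the raw adversarial prompt.
    risk_category = "Sensitive Data Leakage"  # hypothetical; taken from the result's "name" field
    attack_technique = "Base64"               # hypothetical; taken from the result's properties
    print(f"[Redacted adversarial prompt probing for {risk_category} using {attack_technique} attack strategy.]")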
@@ -1640,9 +1640,11 @@ async def _finalize_results(
         # Extract AOAI summary for passing to MLflow logging
         aoai_summary = red_team_result.scan_result.get("AOAI_Compatible_Summary")
         if self._app_insights_configuration:
-            emit_eval_result_events_to_app_insights(
-                self._app_insights_configuration, aoai_summary["output_items"]["data"]
+            # Get redacted results from the result processor for App Insights logging
+            redacted_results = self.result_processor.get_app_insights_redacted_results(
+                aoai_summary["output_items"]["data"]
             )
+            emit_eval_result_events_to_app_insights(self._app_insights_configuration, redacted_results)
         # Log results to MLFlow if not skipping upload
         if not skip_upload:
             self.logger.info("Logging results to AI Foundry")
@@ -7,6 +7,7 @@
 This module handles the processing, aggregation, and formatting of red team evaluation results.
 """
 
+import copy
 import hashlib
 import json
 import math
@@ -18,6 +19,8 @@
 
 import pandas as pd
 
+from azure.ai.evaluation._common.constants import EvaluationMetrics
+
 # Local imports
 from ._red_team_result import (
     RedTeamResult,
@@ -1616,3 +1619,90 @@ def _build_results_payload(
         }
 
         return run_payload

    def get_app_insights_redacted_results(self, results: List[Dict]) -> List[Dict]:
        """
        Creates a redacted copy of results specifically for App Insights logging.
        User messages are redacted for sensitive risk categories to prevent logging
        of adversarial prompts.

        Args:
            results: List of evaluation result dictionaries

        Returns:
            A deep copy of results with user messages redacted for applicable risk categories
        """
        # Create a deep copy to avoid modifying the original data
        redacted_results = copy.deepcopy(results)

        for result in redacted_results:
            if "results" not in result or not isinstance(result["results"], list):
                continue

            # Get risk category and attack technique from the first result
            if len(result["results"]) > 0:
                first_result = result["results"][0]
                risk_category = first_result.get("name", "unknown")
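                # Redaction is keyed off this first entry: all results in an
                # output item are assumed to share a single risk category.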

                # Only redact if this is a sensitive risk category
                if self._should_redact_for_risk_category(risk_category):
                    # Extract additional properties for redaction message
                    attack_technique = "unknown"
                    risk_sub_type = None

                    if "properties" in first_result and isinstance(first_result["properties"], dict):
                        attack_technique = first_result["properties"].get("attack_technique", "unknown")
                        risk_sub_type = first_result["properties"].get("risk_sub_type", None)

                    # Redact user messages in the sample input
                    if "sample" in result and "input" in result["sample"]:
                        sample_input = result["sample"]["input"]

                        if isinstance(sample_input, list):
                            for message in sample_input:
                                if isinstance(message, dict) and message.get("role") == "user":
                                    message["content"] = self._get_redacted_input_message(
                                        risk_category, attack_technique, risk_sub_type
                                    )

        return redacted_results

    def _should_redact_for_risk_category(self, risk_category: str) -> bool:
        """
        Determines if user messages should be redacted for the given risk category.

        Args:
            risk_category: The risk category name to check

        Returns:
            True if the risk category requires redaction, False otherwise
        """
        redaction_required_categories = {
            EvaluationMetrics.PROHIBITED_ACTIONS,
            EvaluationMetrics.TASK_ADHERENCE,
            EvaluationMetrics.SENSITIVE_DATA_LEAKAGE,
        }

        return risk_category in redaction_required_categories

    def _get_redacted_input_message(self, risk_category: str, attack_technique: str, risk_sub_type: Optional[str] = None) -> str:
        """
        Generates a redacted message for adversarial prompts based on risk category and attack technique.

        Args:
            risk_category: The risk category of the adversarial prompt
            attack_technique: The attack technique used
            risk_sub_type: Optional sub-type of the risk category

        Returns:
            A redacted message string
        """
        # Convert snake_case/kebab-case identifiers to Title Case for readability
        risk_category_readable = risk_category.replace("_", " ").replace("-", " ").title()
        attack_technique_readable = attack_technique.replace("_", " ").replace("-", " ").title()

        if risk_sub_type:
            risk_sub_type_readable = risk_sub_type.replace("_", " ").replace("-", " ").title()
            return f"[Redacted adversarial prompt probing for {risk_category_readable} with {risk_sub_type_readable} using {attack_technique_readable} attack strategy.]"
        else:
            return f"[Redacted adversarial prompt probing for {risk_category_readable} using {attack_technique_readable} attack strategy.]"