7 | 7 | This module handles the processing, aggregation, and formatting of red team evaluation results.
8 | 8 | """
9 | 9 |
| 10 | +import copy |
10 | 11 | import hashlib |
11 | 12 | import json |
12 | 13 | import math |
18 | 19 |
19 | 20 | import pandas as pd |
20 | 21 |
| 22 | +from azure.ai.evaluation._common.constants import EvaluationMetrics |
| 23 | + |
21 | 24 | # Local imports |
22 | 25 | from ._red_team_result import ( |
23 | 26 | RedTeamResult, |
@@ -1616,3 +1619,90 @@ def _build_results_payload( |
1616 | 1619 | } |
1617 | 1620 |
1618 | 1621 | return run_payload |
| 1622 | + |
| 1623 | +    def get_app_insights_redacted_results(self, results: List[Dict]) -> List[Dict]:
| 1624 | +        """
| 1625 | +        Creates a redacted copy of results specifically for App Insights logging.
| 1626 | +        User messages are redacted for sensitive risk categories to prevent logging
| 1627 | +        of adversarial prompts.
| 1628 | +
| 1629 | +        Args:
| 1630 | +            results: List of evaluation result dictionaries.
| 1631 | +
| 1632 | +        Returns:
| 1633 | +            A deep copy of results with user messages redacted for applicable risk categories.
| 1634 | +        """
| 1635 | +        # Create a deep copy to avoid modifying the original data
| 1636 | +        redacted_results = copy.deepcopy(results)
| 1637 | +
| 1638 | +        for result in redacted_results:
| 1639 | +            if "results" not in result or not isinstance(result["results"], list):
| 1640 | +                continue
| 1641 | +
| 1642 | +            # Get the risk category from the first result
| 1643 | +            if len(result["results"]) > 0:
| 1644 | +                first_result = result["results"][0]
| 1645 | +                risk_category = first_result.get("name", "unknown")
| 1646 | +
| 1647 | +                # Only redact if this is a sensitive risk category
| 1648 | +                if self._should_redact_for_risk_category(risk_category):
| 1649 | +                    # Extract additional properties for the redaction message
| 1650 | +                    attack_technique = "unknown"
| 1651 | +                    risk_sub_type = None
| 1652 | +
| 1653 | +                    if "properties" in first_result and isinstance(first_result["properties"], dict):
| 1654 | +                        attack_technique = first_result["properties"].get("attack_technique", "unknown")
| 1655 | +                        risk_sub_type = first_result["properties"].get("risk_sub_type", None)
| 1656 | +
| 1657 | +                    # Redact user messages in the sample input
| 1658 | +                    if "sample" in result and "input" in result["sample"]:
| 1659 | +                        sample_input = result["sample"]["input"]
| 1660 | +
| 1661 | +                        if isinstance(sample_input, list):
| 1662 | +                            for message in sample_input:
| 1663 | +                                if isinstance(message, dict) and message.get("role") == "user":
| 1664 | +                                    message["content"] = self._get_redacted_input_message(
| 1665 | +                                        risk_category, attack_technique, risk_sub_type
| 1666 | +                                    )
| 1667 | +
| 1668 | +        return redacted_results
| 1669 | +
| 1670 | +    def _should_redact_for_risk_category(self, risk_category: str) -> bool:
| 1671 | +        """
| 1672 | +        Determines whether user messages should be redacted for the given risk category.
| 1673 | +
| 1674 | +        Args:
| 1675 | +            risk_category: The risk category name to check.
| 1676 | +
| 1677 | +        Returns:
| 1678 | +            True if the risk category requires redaction, False otherwise.
| 1679 | +        """
| 1680 | +        redaction_required_categories = {
| 1681 | +            EvaluationMetrics.PROHIBITED_ACTIONS,
| 1682 | +            EvaluationMetrics.TASK_ADHERENCE,
| 1683 | +            EvaluationMetrics.SENSITIVE_DATA_LEAKAGE,
| 1684 | +        }
| 1685 | +
| 1686 | +        return risk_category in redaction_required_categories
| 1687 | +
| 1688 | +    def _get_redacted_input_message(self, risk_category: str, attack_technique: str, risk_sub_type: Optional[str] = None) -> str:
| 1689 | +        """
| 1690 | +        Generates a redacted message for adversarial prompts based on risk category and attack technique.
| 1691 | +
| 1692 | +        Args:
| 1693 | +            risk_category: The risk category of the adversarial prompt.
| 1694 | +            attack_technique: The attack technique used.
| 1695 | +            risk_sub_type: Optional sub-type of the risk category.
| 1696 | +
| 1697 | +        Returns:
| 1698 | +            A redacted message string.
| 1699 | +        """
| 1700 | +        # Convert snake_case to Title Case for readability
| 1701 | +        risk_category_readable = risk_category.replace("_", " ").replace("-", " ").title()
| 1702 | +        attack_technique_readable = attack_technique.replace("_", " ").replace("-", " ").title()
| 1703 | +
| 1704 | +        if risk_sub_type:
| 1705 | +            risk_sub_type_readable = risk_sub_type.replace("_", " ").replace("-", " ").title()
| 1706 | +            return f"[Redacted adversarial prompt probing for {risk_category_readable} with {risk_sub_type_readable} using {attack_technique_readable} attack strategy.]"
| 1707 | +        else:
| 1708 | +            return f"[Redacted adversarial prompt probing for {risk_category_readable} using {attack_technique_readable} attack strategy.]"
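
As a quick illustration of the flow this change adds, here is a minimal standalone sketch. The category strings, payload shape, and function names below are assumptions made for the demo (the real code resolves categories through the `EvaluationMetrics` constants and runs as methods on the result processor); it is not part of the change itself.

```python
# Minimal, self-contained sketch of the App Insights redaction flow.
# Assumptions: EvaluationMetrics.PROHIBITED_ACTIONS / TASK_ADHERENCE /
# SENSITIVE_DATA_LEAKAGE resolve to snake_case strings (stand-ins below), and
# results follow the {"results": [...], "sample": {"input": [...]}} shape the
# new methods expect. All names here are hypothetical, not the module's API.
import copy
from typing import Dict, List, Optional

# Stand-ins for the EvaluationMetrics constants used in _should_redact_for_risk_category
REDACTION_REQUIRED_CATEGORIES = {"prohibited_actions", "task_adherence", "sensitive_data_leakage"}


def redacted_input_message(risk_category: str, attack_technique: str, risk_sub_type: Optional[str] = None) -> str:
    # Mirrors _get_redacted_input_message: snake_case -> Title Case, optional sub-type.
    category = risk_category.replace("_", " ").replace("-", " ").title()
    technique = attack_technique.replace("_", " ").replace("-", " ").title()
    if risk_sub_type:
        sub_type = risk_sub_type.replace("_", " ").replace("-", " ").title()
        return f"[Redacted adversarial prompt probing for {category} with {sub_type} using {technique} attack strategy.]"
    return f"[Redacted adversarial prompt probing for {category} using {technique} attack strategy.]"


def redact_for_app_insights(results: List[Dict]) -> List[Dict]:
    # Mirrors get_app_insights_redacted_results: deep-copy the payload, then
    # overwrite user-message content only for sensitive risk categories.
    redacted = copy.deepcopy(results)
    for result in redacted:
        rows = result.get("results")
        if not isinstance(rows, list) or not rows:
            continue
        first = rows[0]
        risk_category = first.get("name", "unknown")
        if risk_category not in REDACTION_REQUIRED_CATEGORIES:
            continue
        props = first.get("properties") if isinstance(first.get("properties"), dict) else {}
        technique = props.get("attack_technique", "unknown")
        sub_type = props.get("risk_sub_type")
        for message in result.get("sample", {}).get("input", []):
            if isinstance(message, dict) and message.get("role") == "user":
                message["content"] = redacted_input_message(risk_category, technique, sub_type)
    return redacted


# Fabricated example payload:
sample = [
    {
        "results": [{"name": "prohibited_actions", "properties": {"attack_technique": "crescendo"}}],
        "sample": {"input": [{"role": "user", "content": "<adversarial prompt>"}]},
    }
]
print(redact_for_app_insights(sample)[0]["sample"]["input"][0]["content"])
# -> [Redacted adversarial prompt probing for Prohibited Actions using Crescendo attack strategy.]
```

Note the deep copy is what keeps this safe to bolt on at logging time: the original results continue to flow, unredacted, to the normal output paths, while only the telemetry copy loses the adversarial prompt text.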