@Model.rule_register("QUALITY_BAD_SECURITY", ["default", "pretrain", "benchmark"])
class RulePIIDetection(BaseRule):
    """Detect personally identifiable information (PII) in text.

    The rule scans ``input_data.content`` with the regular expressions in
    ``PII_PATTERNS``, optionally confirms a candidate with a validator
    (Luhn checksum for card numbers, octet range for IPv4), masks every hit
    and reports the masked values grouped by severity.

    References: NIST SP 800-122 and China's Personal Information
    Protection Law.
    """

    # Metadata for documentation generation
    _metric_info = {
        "category": "Rule-Based TEXT Quality Metrics",
        "quality_dimension": "SECURITY",
        "metric_name": "RulePIIDetection",
        "description": "Detects Personal Identifiable Information (PII) including ID cards, phone numbers, emails, and credit cards",
        "standard": "NIST SP 800-122, China Personal Information Protection Law",
        "reference_url": "https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-122.pdf",
        "evaluation_results": ""
    }

    # Detection patterns. Iteration order matters: when two patterns match the
    # exact same span, the pattern listed first wins (see ``eval``), so the more
    # specific ID-card pattern is placed before the generic card-number one.
    # Each entry may name a classmethod in "validator" that receives the full
    # matched text and returns False to reject the candidate.
    PII_PATTERNS = {
        # 1. Chinese resident ID card (18 characters) - high risk
        "cn_id_card": {
            "pattern": r"\b[1-9]\d{5}(18|19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[0-9Xx]\b",
            "description": "Chinese ID Card",
            "description_zh": "中国身份证号",
            "severity": "high"
        },

        # 2. Payment card number (13-19 digits, optional space/hyphen
        #    separators) - high risk
        "credit_card": {
            "pattern": r"\b\d{4}(?:[-\s]?\d{4}){2}[-\s]?\d{1,7}\b",
            "description": "Credit Card Number",
            "description_zh": "信用卡号",
            "severity": "high",
            "validator": "_validate_luhn"
        },

        # 3. Chinese mobile phone number (11 digits) - medium risk
        "cn_phone": {
            "pattern": r"\b1[3-9]\d{9}\b",
            "description": "Chinese Mobile Phone",
            "description_zh": "中国手机号",
            "severity": "medium"
        },

        # 4. E-mail address - medium risk
        #    The TLD class is [A-Za-z]; a "|" inside a character class is a
        #    literal pipe, not alternation.
        "email": {
            "pattern": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            "description": "Email Address",
            "description_zh": "电子邮件",
            "severity": "medium"
        },

        # 5. US Social Security Number - high risk
        "ssn": {
            "pattern": r"\b\d{3}-\d{2}-\d{4}\b",
            "description": "US Social Security Number",
            "description_zh": "美国社会安全号",
            "severity": "high"
        },

        # 6. Chinese passport number (E/G/P followed by 8 digits) - high risk
        "cn_passport": {
            "pattern": r"\b[EGP]\d{8}\b",
            "description": "Chinese Passport Number",
            "description_zh": "中国护照号",
            "severity": "high"
        },

        # 7. IPv4 address - low risk
        "ip_address": {
            "pattern": r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b",
            "description": "IP Address",
            "description_zh": "IP地址",
            "severity": "low",
            "validator": "_validate_ip"
        }
    }

    @classmethod
    def _validate_luhn(cls, number: str) -> bool:
        """Return True if the digits of ``number`` pass the Luhn checksum.

        Non-digit characters (spaces, hyphens) are ignored. Anything with
        fewer than 13 or more than 19 digits is rejected.
        """
        digits = [int(ch) for ch in number if ch.isdigit()]
        if not 13 <= len(digits) <= 19:
            return False

        checksum = 0
        # Walk from the rightmost digit; every second digit is doubled and,
        # if the result exceeds 9, reduced by 9 (equivalent to digit-summing).
        for position, digit in enumerate(reversed(digits)):
            if position % 2 == 1:
                digit *= 2
                if digit > 9:
                    digit -= 9
            checksum += digit

        return checksum % 10 == 0

    @classmethod
    def _validate_ip(cls, ip: str) -> bool:
        """Return True if ``ip`` is four dot-separated integers in 0..255."""
        parts = ip.split('.')
        if len(parts) != 4:
            return False

        try:
            for part in parts:
                num = int(part)
                if num < 0 or num > 255:
                    return False
            return True
        except ValueError:
            return False

    @classmethod
    def _mask_email(cls, value: str) -> str:
        """Mask an e-mail address, keeping the first character and the domain.

        Local parts of two characters or fewer are masked completely. A value
        without "@" falls back to the default masking strategy.
        """
        if "@" in value:
            username, domain = value.split("@", 1)
            if len(username) <= 2:
                masked_username = "*" * len(username)
            else:
                masked_username = username[0] + "*" * (len(username) - 1)
            return f"{masked_username}@{domain}"
        return cls._mask_default(value)

    @classmethod
    def _mask_cn_phone(cls, value: str) -> str:
        """Mask an 11-character phone number, keeping the first 3 and last 4."""
        if len(value) == 11:
            return value[:3] + "****" + value[-4:]
        return cls._mask_default(value)

    @classmethod
    def _mask_cn_id_card(cls, value: str) -> str:
        """Mask an 18-character ID number, keeping the first 6 and last 4."""
        if len(value) == 18:
            return value[:6] + "********" + value[-4:]
        return cls._mask_default(value)

    @classmethod
    def _mask_credit_card(cls, value: str) -> str:
        """Mask a card number so that only its last 4 digits remain.

        Separators are dropped; the result contains digits and "*" only.
        """
        digits = ''.join(c for c in value if c.isdigit())
        if len(digits) >= 4:
            return "*" * (len(digits) - 4) + digits[-4:]
        return "*" * len(digits)

    @classmethod
    def _mask_ip_address(cls, value: str) -> str:
        """Mask an IPv4 address, keeping the first and last octet."""
        parts = value.split('.')
        if len(parts) == 4:
            return f"{parts[0]}.***.***.{parts[3]}"
        return cls._mask_default(value)

    @classmethod
    def _mask_default(cls, value: str) -> str:
        """Default masking: keep the first 3 and last 4 characters.

        Values of 7 characters or fewer are masked completely.
        """
        if len(value) <= 7:
            return "*" * len(value)
        return value[:3] + "*" * (len(value) - 7) + value[-4:]

    @classmethod
    def _mask_pii(cls, value: str, pii_type: str) -> str:
        """Mask ``value`` with the strategy registered for ``pii_type``.

        Args:
            value: The raw matched PII text.
            pii_type: A key of ``PII_PATTERNS``; unknown types use the
                default strategy.

        Returns:
            The masked text.
        """
        strategies = {
            "email": cls._mask_email,
            "cn_phone": cls._mask_cn_phone,
            "cn_id_card": cls._mask_cn_id_card,
            "credit_card": cls._mask_credit_card,
            "ip_address": cls._mask_ip_address,
        }

        mask_func = strategies.get(pii_type, cls._mask_default)
        return mask_func(value)

    @classmethod
    def eval(cls, input_data: Data) -> EvalDetail:
        """Scan ``input_data.content`` for PII and report masked findings.

        Args:
            input_data: The record to check; only ``content`` is read.

        Returns:
            An ``EvalDetail`` whose ``status`` is True and whose ``reason``
            lists the masked findings per severity when PII was found;
            otherwise its label is ``QualityLabel.QUALITY_GOOD``.
        """
        res = EvalDetail(metric=cls.__name__)
        # Missing content simply contains no PII; do not hand None to ``re``.
        content = input_data.content or ""

        detected_pii = []
        # Spans already reported, so the same text is not flagged twice (an
        # 18-digit ID number also fits the 13-19 digit card pattern and can
        # pass the Luhn check by coincidence).
        reported_spans = set()

        for pii_type, config in cls.PII_PATTERNS.items():
            validator = None
            if "validator" in config:
                validator = getattr(cls, config["validator"], None)

            # finditer + group(0) always yields the full match. findall would
            # return a tuple of sub-groups for patterns that contain capturing
            # groups (the ID-card pattern does), which breaks masking.
            # re.ASCII keeps \b and \d ASCII-only so a number directly
            # adjacent to CJK characters still has a word boundary.
            for found in re.finditer(config["pattern"], content, flags=re.ASCII):
                raw_value = found.group(0)

                if validator and not validator(raw_value):
                    continue  # candidate rejected by the extra validation

                span = found.span()
                if span in reported_spans:
                    continue  # identical text already claimed by an earlier pattern
                reported_spans.add(span)

                detected_pii.append({
                    "type": pii_type,
                    "value": cls._mask_pii(raw_value, pii_type),
                    "description": config.get("description_zh", config["description"]),
                    "severity": config["severity"]
                })

        if not detected_pii:
            res.label = [QualityLabel.QUALITY_GOOD]
            return res

        # PII found: mark the record as bad and explain why.
        res.status = True
        res.label = [f"{cls.metric_type}.{cls.__name__}"]

        pii_by_severity = {}
        for item in detected_pii:
            pii_by_severity.setdefault(item["severity"], []).append(item)

        severity_labels = {"high": "High Risk PII", "medium": "Medium Risk PII", "low": "Low Risk PII"}
        reasons = []
        # Fixed order so the most severe findings are listed first.
        for severity in ("high", "medium", "low"):
            group = pii_by_severity.get(severity)
            if not group:
                continue
            items = ', '.join(
                "{desc}({val})".format(desc=item["description"], val=item["value"])
                for item in group
            )
            reasons.append(f"{severity_labels[severity]}: {items}")

        res.reason = reasons
        return res
**低风险 PII** 🟢 + +| PII 类型 | 正则模式 | 额外验证 | 示例 | +|---------|---------|---------|------| +| **IP地址** | IPv4格式 | ✅ 范围验证 | 192.168.1.100 | + +--- + +## 🛡️ 脱敏策略 + +### 脱敏规则 + +```python +# 身份证号:保留前6位和后4位 +110101199001011234 → 110101********1234 + +# 手机号:保留前3位和后4位 +13812345678 → 138****5678 + +# 邮箱:保留用户名首字母和域名 +user@example.com → u***@example.com + +# 信用卡:只保留后4位 +4532148803436464 → ************6464 + +# IP地址:保留第一段和最后一段 +192.168.1.100 → 192.***.***.100 +``` + +--- + +## 🔍 验证算法 + +### 1. **Luhn 算法(信用卡验证)** + +用于验证信用卡号的合法性,防止误报。 + +```python +def _validate_luhn(cls, number: str) -> bool: + """Luhn 算法验证信用卡号""" + digits = [int(d) for d in number if d.isdigit()] + + if len(digits) < 13 or len(digits) > 19: + return False + + checksum = 0 + reverse_digits = digits[::-1] + + for i, digit in enumerate(reverse_digits): + if i % 2 == 1: + digit *= 2 + if digit > 9: + digit -= 9 + checksum += digit + + return checksum % 10 == 0 +``` + +**优势**: +- ✅ 过滤掉无效的卡号组合 +- ✅ 减少误报率 +- ✅ 支持带空格和连字符的格式 + +### 2. **IP 地址验证** + +验证 IP 地址每段数字是否在 0-255 范围内。 + +```python +def _validate_ip(cls, ip: str) -> bool: + """验证 IP 地址合法性""" + parts = ip.split('.') + if len(parts) != 4: + return False + + try: + for part in parts: + num = int(part) + if num < 0 or num > 255: + return False + return True + except ValueError: + return False +``` + +--- + +## 📝 使用示例 + +### 基础使用 + +```python +from dingo.io import Data +from dingo.model.rule.rule_common import RulePIIDetection + +# 创建测试数据 +data = Data( + data_id="1", + content="张三,身份证 110101199001011234,手机 13812345678" +) + +# 执行检测 +result = RulePIIDetection.eval(data) + +# 查看结果 +print(f"检测状态: {result.status}") # True(检测到PII) +print(f"标签: {result.label}") # ['QUALITY_BAD_SECURITY.RulePIIDetection'] +print(f"原因: {result.reason}") +# ['High Risk PII: 中国身份证号(110101********1234)', +# 'Medium Risk PII: 中国手机号(138****5678)'] +``` + +### 集成到评测流程 + +```python +from dingo.config import InputArgs +from dingo.exec import Executor + +input_data = { + "task_name": "pii_detection", 
+ "input_path": "data.jsonl", + "output_path": "outputs/", + "evaluator": [ + { + "fields": { + "content": "text" + }, + "evals": [ + { + "name": "RulePIIDetection" + } + ] + } + ] +} + +input_args = InputArgs(**input_data) +executor = Executor.exec_map["local"](input_args) +summary = executor.execute() +``` + +--- + +## 📚 标准依据 + +### 1. **NIST SP 800-122** ⭐ +**美国国家标准与技术研究院 - PII 保护指南** + +- **文档**: https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-122.pdf +- **适用**: 通用PII识别和分类 +- **分类**: 直接标识符、间接标识符、敏感PII + + +### 2. **GDPR(参考)** +- **文档**: https://gdpr-info.eu/art-4-gdpr/ +- **适用**: 欧盟业务 +- **特点**: 最严格的数据保护标准 + +--- + +## 📊 输出格式 + +### EvalDetail 结构 + +```python +EvalDetail( + metric="RulePIIDetection", + status=True, # True表示检测到PII + label=["QUALITY_BAD_SECURITY.RulePIIDetection"], + reason=[ + "High Risk PII: 中国身份证号(110101********1234), 信用卡号(************6464)", + "Medium Risk PII: 中国手机号(138****5678), 电子邮件(u***@example.com)", + "Low Risk PII: IP地址(192.***.***.100)" + ] +) +``` + + + +## 📖 相关文档 + +- [NIST SP 800-122: Guide to Protecting PII](https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-122.pdf) +- [GDPR Article 4](https://gdpr-info.eu/art-4-gdpr/) +- [Microsoft Presidio](https://github.com/microsoft/presidio) diff --git a/docs/assets/architeture.png b/docs/assets/architeture.png index d1399fa3..1b7bdf08 100644 Binary files a/docs/assets/architeture.png and b/docs/assets/architeture.png differ diff --git a/docs/metrics.md b/docs/metrics.md index ed811264..1718888f 100644 --- a/docs/metrics.md +++ b/docs/metrics.md @@ -54,10 +54,10 @@ This document provides comprehensive information about all quality metrics used | Type | Metric | Description | Paper Source | Evaluation Results | |------|--------|-------------|--------------|-------------------| | `QUALITY_BAD_COMPLETENESS` | RuleLineEndWithEllipsis, RuleLineEndWithTerminal, RuleSentenceNumber, RuleWordNumber | Checks whether the ratio of lines ending with ellipsis is 
below threshold; Checks whether the ratio of lines ending w... | [RedPajama: an Open Dataset for Training Large Language Models](https://github.com/togethercomputer/RedPajama-Data) (Together Computer, 2023) | [📊 See Results](eval/rule/slimpajama_data_evaluated_by_rule.md) | -| `QUALITY_BAD_EFFECTIVENESS` | RuleDoi, RuleIsbn, RuleAbnormalChar, RuleAbnormalHtml, RuleAlphaWords, RuleAudioDataFormat, RuleCharNumber, RuleColonEnd, RuleContentNull, RuleContentShort, RuleContentShortMultiLan, RuleEnterAndSpace, RuleEnterMore, RuleEnterRatioMore, RuleHtmlEntity, RuleHtmlTag, RuleInvisibleChar, RuleImageDataFormat, RuleLatexSpecialChar, RuleLineJavascriptCount, RuleLoremIpsum, RuleMeanWordLength, RuleNlpDataFormat, RuleSftDataFormat, RuleSpaceMore, RuleSpecialCharacter, RuleStopWord, RuleSymbolWordRatio, RuleVedioDataFormat, RuleOnlyUrl | Check whether the string is in the correct format of the doi; Check whether the string is in the correct format of th... | Internal Implementation | N/A | +| `QUALITY_BAD_EFFECTIVENESS` | RuleAbnormalChar, RuleAbnormalHtml, RuleAlphaWords, RuleAudioDataFormat, RuleCharNumber, RuleColonEnd, RuleContentNull, RuleContentShort, RuleContentShortMultiLan, RuleEnterAndSpace, RuleEnterMore, RuleEnterRatioMore, RuleHtmlEntity, RuleHtmlTag, RuleInvisibleChar, RuleImageDataFormat, RuleLatexSpecialChar, RuleLineJavascriptCount, RuleLoremIpsum, RuleMeanWordLength, RuleNlpDataFormat, RuleSftDataFormat, RuleSpaceMore, RuleSpecialCharacter, RuleStopWord, RuleSymbolWordRatio, RuleVedioDataFormat, RuleOnlyUrl, RuleDoi, RuleIsbn | Detects garbled text and anti-crawling characters by combining special character and invisible character detection; D... 
| [RedPajama: an Open Dataset for Training Large Language Models](https://github.com/togethercomputer/RedPajama-Data) (Together Computer, 2023) | [📊 See Results](eval/rule/slimpajama_data_evaluated_by_rule.md) | | `QUALITY_BAD_FLUENCY` | RuleAbnormalNumber, RuleCharSplit, RuleNoPunc, RuleWordSplit, RuleWordStuck | Checks PDF content for abnormal book page or index numbers that disrupt text flow; Checks PDF content for abnormal ch... | [RedPajama: an Open Dataset for Training Large Language Models](https://github.com/togethercomputer/RedPajama-Data) (Together Computer, 2023) | [📊 See Results](eval/rule/slimpajama_data_evaluated_by_rule.md) | | `QUALITY_BAD_RELEVANCE` | RuleHeadWordAr, RuleHeadWordCs, RuleHeadWordHu, RuleHeadWordKo, RuleHeadWordRu, RuleHeadWordSr, RuleHeadWordTh, RuleHeadWordVi, RulePatternSearch, RuleWatermark | Checks whether Arabic content contains irrelevant tail source information; Checks whether Czech content contains irre... | [RedPajama: an Open Dataset for Training Large Language Models](https://github.com/togethercomputer/RedPajama-Data) (Together Computer, 2023) | [📊 See Results](eval/rule/slimpajama_data_evaluated_by_rule.md) | -| `QUALITY_BAD_SECURITY` | RuleIDCard, RuleUnsafeWords | Checks whether content contains ID card information; Checks whether content contains unsafe words | [RedPajama: an Open Dataset for Training Large Language Models](https://github.com/togethercomputer/RedPajama-Data) (Together Computer, 2023) | [📊 See Results](eval/rule/slimpajama_data_evaluated_by_rule.md) | +| `QUALITY_BAD_SECURITY` | RuleIDCard, RuleUnsafeWords, RulePIIDetection | Checks whether content contains ID card information; Checks whether content contains unsafe words; Detects Personal I... 
class TestRulePIIDetection:
    """Tests for the RulePIIDetection rule."""

    # Label expected on every record in which PII is found.
    BAD_LABEL = ["QUALITY_BAD_SECURITY.RulePIIDetection"]

    def test_no_pii_content(self):
        """Plain text without PII is labelled good."""
        data = Data(data_id="1", content="这是一段普通的文本,没有任何敏感信息。")
        res = RulePIIDetection.eval(data)
        assert res.status is False
        assert res.label == [QualityLabel.QUALITY_GOOD]
        assert res.metric == "RulePIIDetection"

    def test_chinese_id_card(self):
        """A Chinese ID card number is detected and never echoed in clear."""
        data = Data(data_id="2", content="我的身份证号是 110101199001011234。")
        res = RulePIIDetection.eval(data)
        assert res.status is True
        assert res.label == self.BAD_LABEL
        assert res.metric == "RulePIIDetection"
        assert res.reason is not None
        assert len(res.reason) > 0
        reason_text = str(res.reason)
        assert "High Risk" in reason_text
        # The value must be masked and the raw number must not leak.
        assert "***" in reason_text
        assert "110101199001011234" not in reason_text

    def test_chinese_phone(self):
        """A Chinese mobile number is detected and masked."""
        data = Data(data_id="3", content="请联系我:13812345678")
        res = RulePIIDetection.eval(data)
        assert res.status is True
        assert res.label == self.BAD_LABEL
        assert "138****5678" in str(res.reason)

    def test_email_address(self):
        """An e-mail address is detected; its domain stays visible."""
        data = Data(data_id="4", content="我的邮箱是 user@example.com")
        res = RulePIIDetection.eval(data)
        assert res.status is True
        assert res.label == self.BAD_LABEL
        assert "@example.com" in str(res.reason)

    def test_credit_card_valid(self):
        """A 16-digit card number that passes the Luhn check is detected."""
        # 4532148803436464 is a test number that passes the Luhn check.
        data = Data(data_id="5", content="信用卡号:4532 1488 0343 6464")
        res = RulePIIDetection.eval(data)
        assert res.status is True
        assert res.label == self.BAD_LABEL
        assert "6464" in str(res.reason)

    def test_credit_card_15_digits(self):
        """A 15-digit (Amex-style) card number is detected."""
        # 378282246310005 is a valid 15-digit Amex test number.
        data = Data(data_id="5b", content="Card: 378282246310005")
        res = RulePIIDetection.eval(data)
        assert res.status is True
        assert res.label == self.BAD_LABEL
        assert "0005" in str(res.reason)

    def test_credit_card_invalid_luhn(self):
        """A card-shaped number failing the Luhn check is not reported."""
        data = Data(data_id="6", content="卡号:1234 5678 9012 3456")
        res = RulePIIDetection.eval(data)
        assert res.status is False
        assert res.label == [QualityLabel.QUALITY_GOOD]

    def test_us_ssn(self):
        """A US Social Security Number is detected."""
        data = Data(data_id="7", content="SSN: 123-45-6789")
        res = RulePIIDetection.eval(data)
        assert res.status is True
        assert res.label == self.BAD_LABEL

    def test_chinese_passport(self):
        """A Chinese passport number is detected."""
        data = Data(data_id="8", content="护照号码:E12345678")
        res = RulePIIDetection.eval(data)
        assert res.status is True
        assert res.label == self.BAD_LABEL

    def test_ip_address_valid(self):
        """A valid IPv4 address is detected."""
        data = Data(data_id="9", content="服务器 IP:192.168.1.100")
        res = RulePIIDetection.eval(data)
        assert res.status is True
        assert res.label == self.BAD_LABEL
        # The first octet survives masking, so it appears in the reason.
        assert "192" in str(res.reason)

    def test_ip_address_invalid(self):
        """An out-of-range dotted quad is not reported."""
        data = Data(data_id="10", content="IP: 300.400.500.600")
        res = RulePIIDetection.eval(data)
        assert res.status is False
        assert res.label == [QualityLabel.QUALITY_GOOD]

    def test_multiple_pii_types(self):
        """Mixed PII yields both a high-risk and a medium-risk reason line."""
        data = Data(
            data_id="11",
            content="张三,身份证 110101199001011234,手机 13812345678,邮箱 zhangsan@qq.com"
        )
        res = RulePIIDetection.eval(data)
        assert res.status is True
        assert res.label == self.BAD_LABEL
        assert res.reason is not None
        assert len(res.reason) > 0
        reason_str = str(res.reason)
        # The ID card is high risk; the phone and e-mail are medium risk.
        assert "High Risk" in reason_str
        assert "Medium Risk" in reason_str

    def test_pii_masking_id_card(self):
        """ID card masking keeps the first 6 and last 4 characters."""
        masked = RulePIIDetection._mask_pii("110101199001011234", "cn_id_card")
        assert masked == "110101********1234"
        assert "199001011234" not in masked  # the middle part is hidden

    def test_pii_masking_phone(self):
        """Phone masking keeps the first 3 and last 4 digits."""
        masked = RulePIIDetection._mask_pii("13812345678", "cn_phone")
        assert masked == "138****5678"
        assert "1234" not in masked  # the middle part is hidden

    def test_pii_masking_email(self):
        """E-mail masking keeps the first character and the domain."""
        masked = RulePIIDetection._mask_pii("user@example.com", "email")
        assert masked == "u***@example.com"

    def test_pii_masking_credit_card(self):
        """Card masking keeps only the last 4 digits."""
        masked = RulePIIDetection._mask_pii("4532148803436464", "credit_card")
        assert masked == "*" * 12 + "6464"

    def test_luhn_validation_valid(self):
        """The Luhn check accepts a valid number."""
        assert RulePIIDetection._validate_luhn("4532148803436464") is True

    def test_luhn_validation_invalid(self):
        """The Luhn check rejects an invalid number."""
        assert RulePIIDetection._validate_luhn("1234567890123456") is False

    def test_luhn_validation_with_spaces(self):
        """The Luhn check ignores space separators."""
        assert RulePIIDetection._validate_luhn("4532 1488 0343 6464") is True

    def test_ip_validation_valid(self):
        """The IP validator accepts in-range dotted quads."""
        assert RulePIIDetection._validate_ip("192.168.1.1") is True
        assert RulePIIDetection._validate_ip("10.0.0.1") is True

    def test_ip_validation_invalid(self):
        """The IP validator rejects out-of-range or short addresses."""
        assert RulePIIDetection._validate_ip("300.400.500.600") is False
        assert RulePIIDetection._validate_ip("256.1.1.1") is False
        assert RulePIIDetection._validate_ip("1.1.1") is False

    def test_severity_levels(self):
        """Each PII type is reported under its configured severity."""
        # High risk: ID card
        data_high = Data(data_id="12", content="身份证:110101199001011234")
        res_high = RulePIIDetection.eval(data_high)
        assert "High Risk" in str(res_high.reason)

        # Medium risk: mobile phone
        data_medium = Data(data_id="13", content="手机:13812345678")
        res_medium = RulePIIDetection.eval(data_medium)
        assert "Medium Risk" in str(res_medium.reason)

        # Low risk: IP address
        data_low = Data(data_id="14", content="IP:192.168.1.1")
        res_low = RulePIIDetection.eval(data_low)
        assert "Low Risk" in str(res_low.reason)