Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
245 changes: 245 additions & 0 deletions dingo/model/rule/rule_common.py
Original file line number Diff line number Diff line change
Expand Up @@ -2313,6 +2313,251 @@ def eval(cls, input_data: Data) -> EvalDetail:
return res


@Model.rule_register("QUALITY_BAD_SECURITY", ["default", "pretrain", "benchmark"])
class RulePIIDetection(BaseRule):
"""检测文本中的个人身份信息(PII)- 基于 NIST SP 800-122 和中国《个人信息保护法》"""

# Metadata for documentation generation
_metric_info = {
"category": "Rule-Based TEXT Quality Metrics",
"quality_dimension": "SECURITY",
"metric_name": "RulePIIDetection",
"description": "Detects Personal Identifiable Information (PII) including ID cards, phone numbers, emails, and credit cards",
"standard": "NIST SP 800-122, China Personal Information Protection Law",
"reference_url": "https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-122.pdf",
"evaluation_results": ""
}

# PII 检测模式配置(按严重程度排序)
PII_PATTERNS = {
# 1. 中国身份证号(18位)- 高风险
"cn_id_card": {
"pattern": r"\b[1-9]\d{5}(18|19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[0-9Xx]\b",
"description": "Chinese ID Card",
"description_zh": "中国身份证号",
"severity": "high"
},

# 2. 信用卡号(13-19位,支持分隔符)- 高风险
"credit_card": {
"pattern": r"\b\d{4}(?:[-\s]?\d{4}){2}[-\s]?\d{1,7}\b",
"description": "Credit Card Number",
"description_zh": "信用卡号",
"severity": "high",
"validator": "_validate_luhn"
},

# 3. 中国手机号(11位)- 中风险
"cn_phone": {
"pattern": r"\b1[3-9]\d{9}\b",
"description": "Chinese Mobile Phone",
"description_zh": "中国手机号",
"severity": "medium"
},

# 4. 电子邮件 - 中风险
"email": {
"pattern": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b",
"description": "Email Address",
"description_zh": "电子邮件",
"severity": "medium"
},

# 5. 美国社会安全号(SSN)- 高风险
"ssn": {
"pattern": r"\b\d{3}-\d{2}-\d{4}\b",
"description": "US Social Security Number",
"description_zh": "美国社会安全号",
"severity": "high"
},

# 6. 中国护照号(E/G/P开头+8位数字)- 高风险
"cn_passport": {
"pattern": r"\b[EGP]\d{8}\b",
"description": "Chinese Passport Number",
"description_zh": "中国护照号",
"severity": "high"
},

# 7. IP 地址(IPv4)- 低风险
"ip_address": {
"pattern": r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b",
"description": "IP Address",
"description_zh": "IP地址",
"severity": "low",
"validator": "_validate_ip"
}
}

@classmethod
def _validate_luhn(cls, number: str) -> bool:
"""Luhn 算法验证信用卡号"""
# 移除空格和连字符
digits = [int(d) for d in number if d.isdigit()]

if len(digits) < 13 or len(digits) > 19:
return False

checksum = 0
reverse_digits = digits[::-1]

for i, digit in enumerate(reverse_digits):
if i % 2 == 1:
digit *= 2
if digit > 9:
digit -= 9
checksum += digit

return checksum % 10 == 0

@classmethod
def _validate_ip(cls, ip: str) -> bool:
"""验证 IP 地址合法性"""
parts = ip.split('.')
if len(parts) != 4:
return False

try:
for part in parts:
num = int(part)
if num < 0 or num > 255:
return False
return True
except ValueError:
return False

@classmethod
def _mask_email(cls, value: str) -> str:
"""邮箱脱敏:保留用户名首字母和域名"""
if "@" in value:
username, domain = value.split("@", 1)
if len(username) <= 2:
masked_username = "*" * len(username)
else:
masked_username = username[0] + "*" * (len(username) - 1)
return f"{masked_username}@{domain}"
return cls._mask_default(value)

@classmethod
def _mask_cn_phone(cls, value: str) -> str:
"""手机号脱敏:保留前3位和后4位"""
if len(value) == 11:
return value[:3] + "****" + value[-4:]
return cls._mask_default(value)

@classmethod
def _mask_cn_id_card(cls, value: str) -> str:
"""身份证脱敏:保留前6位和后4位"""
if len(value) == 18:
return value[:6] + "********" + value[-4:]
return cls._mask_default(value)

@classmethod
def _mask_credit_card(cls, value: str) -> str:
"""信用卡脱敏:只保留后4位"""
digits = ''.join(c for c in value if c.isdigit())
if len(digits) >= 4:
return "*" * (len(digits) - 4) + digits[-4:]
return "*" * len(digits)

@classmethod
def _mask_ip_address(cls, value: str) -> str:
"""IP地址脱敏:保留第一段和最后一段"""
parts = value.split('.')
if len(parts) == 4:
return f"{parts[0]}.***.***.{parts[3]}"
return cls._mask_default(value)

@classmethod
def _mask_default(cls, value: str) -> str:
"""默认脱敏策略:保留前3位和后4位"""
if len(value) <= 7:
return "*" * len(value)
return value[:3] + "*" * (len(value) - 7) + value[-4:]

@classmethod
def _mask_pii(cls, value: str, pii_type: str) -> str:
"""
脱敏处理:根据不同类型的 PII 采用不同的脱敏策略

Args:
value: 原始 PII 值
pii_type: PII 类型

Returns:
脱敏后的值
"""
# 使用字典分发策略
strategies = {
"email": cls._mask_email,
"cn_phone": cls._mask_cn_phone,
"cn_id_card": cls._mask_cn_id_card,
"credit_card": cls._mask_credit_card,
"ip_address": cls._mask_ip_address,
}

mask_func = strategies.get(pii_type, cls._mask_default)
return mask_func(value)

@classmethod
def eval(cls, input_data: Data) -> EvalDetail:
res = EvalDetail(metric=cls.__name__)
content = input_data.content

detected_pii = []

# 遍历所有 PII 模式进行检测
for pii_type, config in cls.PII_PATTERNS.items():
pattern = config["pattern"]
matches = re.findall(pattern, content)

for match in matches:
# 如果有自定义验证器,进行额外验证
if "validator" in config:
validator_method = getattr(cls, config["validator"], None)
if validator_method and not validator_method(match):
continue # 验证失败,跳过

# 脱敏处理
masked_value = cls._mask_pii(match, pii_type)

detected_pii.append({
"type": pii_type,
"value": masked_value,
"description": config.get("description_zh", config["description"]),
"severity": config["severity"]
})

# 如果检测到 PII,标记为 QUALITY_BAD
if detected_pii:
res.status = True
res.label = [f"{cls.metric_type}.{cls.__name__}"]

# 使用 defaultdict 按严重程度分组(一次遍历)
from collections import defaultdict
pii_by_severity = defaultdict(list)
for item in detected_pii:
pii_by_severity[item["severity"]].append(item)

# 构建详细原因
reasons = []
severity_labels = {"high": "High Risk PII", "medium": "Medium Risk PII", "low": "Low Risk PII"}

for severity in ["high", "medium", "low"]:
if severity in pii_by_severity:
items = ', '.join([
"{desc}({val})".format(desc=item["description"], val=item["value"])
for item in pii_by_severity[severity]
])
reasons.append(f"{severity_labels[severity]}: {items}")

res.reason = reasons
else:
res.label = [QualityLabel.QUALITY_GOOD]

return res


if __name__ == "__main__":
data = Data(data_id="", prompt="", content="\n \n \n \n hello \n \n ")
tmp = RuleEnterAndSpace().eval(data)
Loading