@@ -2313,6 +2313,251 @@ def eval(cls, input_data: Data) -> EvalDetail:
23132313 return res
23142314
23152315
@Model.rule_register("QUALITY_BAD_SECURITY", ["default", "pretrain", "benchmark"])
class RulePIIDetection(BaseRule):
    """Detect personally identifiable information (PII) in text.

    The pattern set follows NIST SP 800-122 and China's Personal Information
    Protection Law. Each regex hit is optionally confirmed by a validator
    (Luhn checksum, IPv4 octet range) and is masked before it is written
    into the evaluation result, so the raw PII value is never echoed back
    in the reasons.
    """

    # Metadata for documentation generation
    _metric_info = {
        "category": "Rule-Based TEXT Quality Metrics",
        "quality_dimension": "SECURITY",
        "metric_name": "RulePIIDetection",
        "description": "Detects Personal Identifiable Information (PII) including ID cards, phone numbers, emails, and credit cards",
        "standard": "NIST SP 800-122, China Personal Information Protection Law",
        "reference_url": "https://nvlpubs.nist.gov/nistpubs/Legacy/SP/nistspecialpublication800-122.pdf",
        "evaluation_results": ""
    }

    # PII detection patterns. Each entry holds:
    #   pattern        - regex applied to the content
    #   description    - English label
    #   description_zh - Chinese label (preferred when building reasons)
    #   severity       - "high" / "medium" / "low"
    #   validator      - optional name of a classmethod that must return
    #                    True for a regex hit to be reported
    PII_PATTERNS = {
        # 1. Chinese resident ID card number (18 characters) - high risk
        "cn_id_card": {
            "pattern": r"\b[1-9]\d{5}(18|19|20)\d{2}(0[1-9]|1[0-2])(0[1-9]|[12]\d|3[01])\d{3}[0-9Xx]\b",
            "description": "Chinese ID Card",
            "description_zh": "中国身份证号",
            "severity": "high"
        },

        # 2. Credit card number (13-19 digits, optional separators) - high risk
        "credit_card": {
            "pattern": r"\b\d{4}(?:[-\s]?\d{4}){2}[-\s]?\d{1,7}\b",
            "description": "Credit Card Number",
            "description_zh": "信用卡号",
            "severity": "high",
            "validator": "_validate_luhn"
        },

        # 3. Chinese mobile phone number (11 digits) - medium risk
        "cn_phone": {
            "pattern": r"\b1[3-9]\d{9}\b",
            "description": "Chinese Mobile Phone",
            "description_zh": "中国手机号",
            "severity": "medium"
        },

        # 4. Email address - medium risk
        # The TLD class is [A-Za-z]; the previous [A-Z|a-z] also accepted a
        # literal '|' character inside the top-level domain.
        "email": {
            "pattern": r"\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b",
            "description": "Email Address",
            "description_zh": "电子邮件",
            "severity": "medium"
        },

        # 5. US Social Security Number (SSN) - high risk
        "ssn": {
            "pattern": r"\b\d{3}-\d{2}-\d{4}\b",
            "description": "US Social Security Number",
            "description_zh": "美国社会安全号",
            "severity": "high"
        },

        # 6. Chinese passport number (E/G/P followed by 8 digits) - high risk
        "cn_passport": {
            "pattern": r"\b[EGP]\d{8}\b",
            "description": "Chinese Passport Number",
            "description_zh": "中国护照号",
            "severity": "high"
        },

        # 7. IPv4 address - low risk
        "ip_address": {
            "pattern": r"\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b",
            "description": "IP Address",
            "description_zh": "IP地址",
            "severity": "low",
            "validator": "_validate_ip"
        }
    }

    @classmethod
    def _validate_luhn(cls, number: str) -> bool:
        """Return True if *number* has 13-19 digits and passes the Luhn check.

        Non-digit characters (spaces, hyphens) are ignored.
        """
        digits = [int(d) for d in number if d.isdigit()]

        if len(digits) < 13 or len(digits) > 19:
            return False

        checksum = 0
        # Walk from the rightmost digit; every second digit is doubled and
        # reduced by 9 when the doubled value exceeds 9.
        for i, digit in enumerate(reversed(digits)):
            if i % 2 == 1:
                digit *= 2
                if digit > 9:
                    digit -= 9
            checksum += digit

        return checksum % 10 == 0

    @classmethod
    def _validate_ip(cls, ip: str) -> bool:
        """Return True if *ip* is four dot-separated integers in 0..255."""
        parts = ip.split('.')
        if len(parts) != 4:
            return False

        try:
            return all(0 <= int(part) <= 255 for part in parts)
        except ValueError:
            return False

    @classmethod
    def _mask_email(cls, value: str) -> str:
        """Mask an email: keep the first character of the user name and the domain.

        User names of one or two characters are masked entirely.
        """
        if "@" in value:
            username, domain = value.split("@", 1)
            if len(username) <= 2:
                masked_username = "*" * len(username)
            else:
                masked_username = username[0] + "*" * (len(username) - 1)
            return f"{masked_username}@{domain}"
        return cls._mask_default(value)

    @classmethod
    def _mask_cn_phone(cls, value: str) -> str:
        """Mask an 11-character phone number: keep the first 3 and last 4."""
        if len(value) == 11:
            return value[:3] + "****" + value[-4:]
        return cls._mask_default(value)

    @classmethod
    def _mask_cn_id_card(cls, value: str) -> str:
        """Mask an 18-character ID card number: keep the first 6 and last 4."""
        if len(value) == 18:
            return value[:6] + "********" + value[-4:]
        return cls._mask_default(value)

    @classmethod
    def _mask_credit_card(cls, value: str) -> str:
        """Mask a credit card number: drop separators, keep only the last 4 digits."""
        digits = ''.join(c for c in value if c.isdigit())
        if len(digits) >= 4:
            return "*" * (len(digits) - 4) + digits[-4:]
        return "*" * len(digits)

    @classmethod
    def _mask_ip_address(cls, value: str) -> str:
        """Mask an IPv4 address: keep the first and last octets."""
        parts = value.split('.')
        if len(parts) == 4:
            return f"{parts[0]}.***.***.{parts[3]}"
        return cls._mask_default(value)

    @classmethod
    def _mask_default(cls, value: str) -> str:
        """Default masking: keep the first 3 and last 4 characters.

        Values of 7 characters or fewer are masked entirely.
        """
        if len(value) <= 7:
            return "*" * len(value)
        return value[:3] + "*" * (len(value) - 7) + value[-4:]

    @classmethod
    def _mask_pii(cls, value: str, pii_type: str) -> str:
        """Mask *value* with the strategy registered for *pii_type*.

        Args:
            value: The raw matched PII text.
            pii_type: A key of ``PII_PATTERNS``; types without a dedicated
                strategy fall back to ``_mask_default``.

        Returns:
            The masked value.
        """
        # Dispatch table: PII type -> masking function
        strategies = {
            "email": cls._mask_email,
            "cn_phone": cls._mask_cn_phone,
            "cn_id_card": cls._mask_cn_id_card,
            "credit_card": cls._mask_credit_card,
            "ip_address": cls._mask_ip_address,
        }

        mask_func = strategies.get(pii_type, cls._mask_default)
        return mask_func(value)

    @classmethod
    def eval(cls, input_data: Data) -> EvalDetail:
        """Scan ``input_data.content`` for PII and report masked findings.

        Args:
            input_data: The sample whose ``content`` is scanned.

        Returns:
            An ``EvalDetail``. When PII is found, ``status`` is set to True,
            ``label`` names this rule, and ``reason`` holds one line per
            severity level (high, medium, low) listing the masked hits.
            Otherwise ``label`` is ``[QualityLabel.QUALITY_GOOD]``.
        """
        res = EvalDetail(metric=cls.__name__)
        content = input_data.content

        detected_pii = []

        # Run every configured PII pattern over the content
        for pii_type, config in cls.PII_PATTERNS.items():
            # finditer + group(0) always yields the full matched text.
            # re.findall would return tuples of capture groups for patterns
            # that contain groups (e.g. cn_id_card), which broke masking.
            for found in re.finditer(config["pattern"], content):
                match = found.group(0)

                # Apply the extra validator, if one is configured
                if "validator" in config:
                    validator_method = getattr(cls, config["validator"], None)
                    if validator_method and not validator_method(match):
                        continue  # validation failed, skip this hit

                # Mask before storing so raw PII never reaches the result
                masked_value = cls._mask_pii(match, pii_type)

                detected_pii.append({
                    "type": pii_type,
                    "value": masked_value,
                    "description": config.get("description_zh", config["description"]),
                    "severity": config["severity"]
                })

        # Any detected PII marks the sample as bad quality
        if detected_pii:
            res.status = True
            res.label = [f"{cls.metric_type}.{cls.__name__}"]

            # Group findings by severity in a single pass
            from collections import defaultdict
            pii_by_severity = defaultdict(list)
            for item in detected_pii:
                pii_by_severity[item["severity"]].append(item)

            # Build the detailed reasons, highest severity first
            reasons = []
            severity_labels = {"high": "High Risk PII", "medium": "Medium Risk PII", "low": "Low Risk PII"}

            for severity in ["high", "medium", "low"]:
                if severity in pii_by_severity:
                    items = ', '.join([
                        "{desc}({val})".format(desc=item["description"], val=item["value"])
                        for item in pii_by_severity[severity]
                    ])
                    reasons.append(f"{severity_labels[severity]}: {items}")

            res.reason = reasons
        else:
            res.label = [QualityLabel.QUALITY_GOOD]

        return res
2559+
2560+
23162561if __name__ == "__main__" :
23172562 data = Data (data_id = "" , prompt = "" , content = "\n \n \n \n hello \n \n " )
23182563 tmp = RuleEnterAndSpace ().eval (data )
0 commit comments