-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathpii_scan.py
More file actions
138 lines (117 loc) · 5.31 KB
/
pii_scan.py
File metadata and controls
138 lines (117 loc) · 5.31 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
"""PII Scan"""
import re
import logging
import spacy
from presidio_analyzer import AnalyzerEngine, RecognizerRegistry, RecognizerResult
from presidio_analyzer.predefined_recognizers import (ItDriverLicenseRecognizer,
ItVatCodeRecognizer,
ItFiscalCodeRecognizer,
ItIdentityCardRecognizer,
ItPassportRecognizer,
EsNieRecognizer,
EsNifRecognizer,
PlPeselRecognizer,
FiPersonalIdentityCodeRecognizer,
AuTfnRecognizer,
AbaRoutingRecognizer,
AuAcnRecognizer)
from presidio_anonymizer import AnonymizerEngine
import requests
# Ensure the spaCy model en_core_web_lg is installed before it is needed.
# The same can be done ahead of time from the shell with
#     python -m spacy download en_core_web_lg
_SPACY_MODEL = "en_core_web_lg"
try:
    nlp = spacy.load(_SPACY_MODEL)
except OSError:
    # Loading raised OSError: download the model, then load it again.
    from spacy.cli import download

    download(_SPACY_MODEL)
    nlp = spacy.load(_SPACY_MODEL)
# Root logger configuration: only CRITICAL records are printed by default.
# Change the level below to logging.INFO or logging.DEBUG when more detail
# is needed while troubleshooting.
logging.basicConfig(level=logging.CRITICAL)
# Build the recognizer registry, starting from the predefined recognizers
registry = RecognizerRegistry()
registry.load_predefined_recognizers()
# Register these language specific recognizers for 'en' instead of their
# default language. AbaRoutingRecognizer adds support for ABA_ROUTING_NUMBER.
for _recognizer_class in (
    ItDriverLicenseRecognizer,
    ItVatCodeRecognizer,
    ItFiscalCodeRecognizer,
    ItIdentityCardRecognizer,
    ItPassportRecognizer,
    EsNieRecognizer,
    EsNifRecognizer,
    PlPeselRecognizer,
    FiPersonalIdentityCodeRecognizer,
    AuTfnRecognizer,
    AuAcnRecognizer,
    AbaRoutingRecognizer,
):
    registry.add_recognizer(_recognizer_class(supported_language='en'))
del _recognizer_class  # keep the loop variable out of the module namespace
# Create the analyzer on top of the registry; set log_decision_process=True
# to log the decision process while debugging
analyzer = AnalyzerEngine(registry=registry, log_decision_process=False)
# Create the anonymizer that consumes the analyzer results
anonymizer = AnonymizerEngine()
def show_aggie_pride():
    """Return the Aggie Pride slogan"""
    slogan = "Aggie Pride - Worldwide"
    return slogan
def anonymize_text(text: str, entity_list: list) -> str:
    """
    Analyze the text and return it with the detected entities anonymized
    :param text: the text to be anonymized
    :param entity_list: the list of entities to be anonymized
    https://microsoft.github.io/presidio/supported_entities/
    :return: the text attribute of the AnonymizerEngine result
    """
    # Detect the entities first, then hand the findings to the anonymizer
    findings = analyze_text(text=text, entity_list=entity_list)
    return anonymizer.anonymize(text=text, analyzer_results=findings).text
def anonymize_data(data: list) -> None:
    """
    Print every item of the data together with its anonymized version
    Falsy items are skipped and items starting with '#' are printed as they are
    :param data: the data to be anonymized
    """
    for idx, entry in enumerate(data):
        # Nothing to show for empty items
        if not entry:
            continue
        # Lines starting with '#' are echoed without being anonymized
        if entry.startswith('#'):
            print(entry)
            continue
        print(f'ID:{idx}:Original : {entry}')
        # An empty entity list is passed to anonymize_text for every item
        redacted = anonymize_text(entry, [])
        print(f'ID:{idx}:Anonymized: {redacted}')
def analyze_text(text: str, entity_list: list) -> list[RecognizerResult]:
    """
    Run the analyzer on the text for the given entities
    :param text: the text to be analyzed
    :param entity_list: the list of entities to be analyzed
    https://microsoft.github.io/presidio/supported_entities/
    :return: the results returned by the analyzer
    """
    return analyzer.analyze(
        text=text,
        entities=entity_list,
        language='en',
        return_decision_process=True,  # return decision process details
    )
def read_data() -> list:
    """
    Reads data from a secure file using a secret key stored in .env
    :return: list of lines from the file
    :raises RuntimeError: if no line of .env matches SECRET="..."
    :raises requests.HTTPError: if the download answers with an error status
    """
    secret_pattern = re.compile(r'SECRET="(\w+)"')
    secret = None
    # Load SECRET from .env file; the first matching line wins
    with open('.env', encoding='utf-8') as env_file:
        for line in env_file:
            match = secret_pattern.search(line)
            if match:
                secret = match.group(1)
                break
    if secret is None:
        raise RuntimeError("SECRET not found in .env file")
    # The secret is the tail of the file id in the download URL
    response = requests.get(
        'https://drive.google.com/uc?export=download&id=1Madj8otKjwwOO353nL_' + secret,
        timeout=10)
    # Fail loudly instead of treating an HTTP error page as data
    response.raise_for_status()
    # Return the data as a list of lines
    return response.text.split('\n')
if __name__ == '__main__':
    # show_aggie_pride() only returns the slogan, so print it to make it visible
    print(show_aggie_pride())
    # Fetch the data and print each item next to its anonymized version
    anonymize_data(read_data())