1414 Insight , Recommendation , RiskAssessment
1515)
1616import json
17-
17+ from app . services . pdf_service import PDFExcelService
1818import re
1919import hashlib
2020from typing import Dict , Any , List
2121from dataclasses import dataclass
2222
@dataclass
class SanitizationConfig:
    """Switches selecting which kinds of data the sanitizer rewrites.

    Every flag defaults to ``True``. The ``redact_*`` flags are read by
    ``BankStatementSanitizer``; a ``False`` value makes the matching
    sanitization step return its input untouched.

    NOTE(review): the ``preserve_*`` flags are declared here but nothing in
    the visible sanitizer reads them -- confirm whether they are still
    needed or should be wired in.
    """

    # --- categories that are replaced with placeholder tokens ---
    redact_account_numbers: bool = True
    redact_phone_numbers: bool = True
    redact_emails: bool = True
    redact_addresses: bool = True
    redact_names: bool = True
    redact_ssn: bool = True

    # --- categories meant to be left readable (currently not consulted) ---
    preserve_transaction_amounts: bool = True
    preserve_dates: bool = True
    preserve_merchant_names: bool = True
35-
class BankStatementSanitizer:
    """Replace personally identifying data in statement text and PDFs.

    Each detected value is swapped for a placeholder token of the form
    ``<PREFIX>_<8 hex chars>``. The same original value always maps to the
    same token within one sanitizer instance, and the original -> token
    mapping is kept in ``replacement_map`` for auditing.

    Detection is regex based and deliberately simple; it will both miss
    values and over-match (for example, the address pattern accepts any
    ``<digits> <words>`` run ending in a street suffix).
    """

    # Patterns are compiled once at class level instead of on every call.
    _ACCOUNT_PATTERNS = (
        # 12-17 digits in 4/4/4-9 groups, optionally split by dash or space.
        re.compile(r'\b\d{4}[-\s]?\d{4}[-\s]?\d{4,9}\b'),
    )
    _PHONE_PATTERNS = (
        re.compile(r'\b\d{3}[-.\s]?\d{3}[-.\s]?\d{4}\b'),  # US format
        re.compile(r'\(\d{3}\)\s?\d{3}[-.\s]?\d{4}'),  # (123) 456-7890
        re.compile(r'\+\d{1,3}[-.\s]?\d{3,14}\b'),  # International
    )
    _EMAIL_PATTERNS = (
        # The TLD class is [A-Za-z]; the earlier [A-Z|a-z] also accepted a
        # literal "|" and could swallow text that followed the address.
        re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b'),
    )
    _SSN_PATTERNS = (
        re.compile(r'\b\d{3}[-\s]?\d{2}[-\s]?\d{4}\b'),
    )
    _ADDRESS_PATTERNS = (
        re.compile(
            r'\d+\s+[A-Za-z\s]+(?:Street|St|Avenue|Ave|Road|Rd|Drive|Dr|Lane|Ln|Boulevard|Blvd|Way|Place|Pl)\b',
            re.IGNORECASE,
        ),
        re.compile(r'P\.?O\.?\s+Box\s+\d+', re.IGNORECASE),
    )
    _NAME_PATTERNS = (
        # Only names introduced by a title are detected.
        re.compile(r'\b(?:Mr\.?|Mrs\.?|Ms\.?|Dr\.?|Prof\.?)\s+[A-Z][a-z]+(?:\s+[A-Z][a-z]+)*'),
    )

    def __init__(self, config: "SanitizationConfig | None" = None):
        """Create a sanitizer.

        Args:
            config: Which categories to redact. A default
                ``SanitizationConfig`` (everything enabled) is used when
                omitted.
        """
        self.config = config or SanitizationConfig()
        # original value -> placeholder token, shared by every category
        self.replacement_map: Dict[str, str] = {}
        # Hold an *instance*: with the bare class, ``log_operation`` would
        # receive the operation name in place of ``self``.
        # NOTE(review): assumes LoggerMixin can be constructed with no
        # arguments -- confirm.
        self.logger = LoggerMixin()

    def _log(self, operation: str, **details) -> None:
        """Forward an event to the logger, if one is attached.

        Logging is best-effort: an instance without a ``logger`` attribute
        still sanitizes. Callers must never pass raw sensitive values in
        ``details``.
        """
        logger = getattr(self, "logger", None)
        if logger is not None:
            logger.log_operation(operation, **details)

    def _generate_consistent_replacement(self, original_value: str, prefix: str) -> str:
        """Return the placeholder token for ``original_value``.

        The token is ``f"{prefix}_{first 8 hex chars of md5(original)}"``.
        The map is keyed by the original value only, so a value seen again
        under a different prefix keeps its first token.

        NOTE(review): a truncated, unsalted MD5 is a stable label, not a
        security boundary -- short numeric values could be recovered by
        brute force.
        """
        # Only the category is logged; logging the original value would
        # write the very data being redacted into the log stream.
        self._log("generating_replacement", prefix=prefix)

        known = self.replacement_map.get(original_value)
        if known is not None:
            return known

        digest = hashlib.md5(original_value.encode()).hexdigest()[:8]
        replacement = f"{prefix}_{digest}"
        self.replacement_map[original_value] = replacement
        return replacement

    def _redact(self, text: str, patterns, prefix: str) -> str:
        """Apply each compiled pattern in order, tokenising every match."""

        def substitute(match):
            return self._generate_consistent_replacement(match.group(0), prefix)

        for pattern in patterns:
            text = pattern.sub(substitute, text)
        return text

    def _sanitize_account_numbers(self, text: str) -> str:
        """Replace account numbers with ``ACCT_*`` tokens."""
        self._log("sanitizing_account_numbers")
        if not self.config.redact_account_numbers:
            return text
        return self._redact(text, self._ACCOUNT_PATTERNS, "ACCT")

    def _sanitize_phone_numbers(self, text: str) -> str:
        """Replace phone numbers with ``PHONE_*`` tokens."""
        self._log("sanitizing_phone_numbers")
        if not self.config.redact_phone_numbers:
            return text
        return self._redact(text, self._PHONE_PATTERNS, "PHONE")

    def _sanitize_emails(self, text: str) -> str:
        """Replace email addresses with ``EMAIL_*`` tokens."""
        self._log("sanitizing_emails")
        if not self.config.redact_emails:
            return text
        return self._redact(text, self._EMAIL_PATTERNS, "EMAIL")

    def _sanitize_addresses(self, text: str) -> str:
        """Replace street addresses and PO boxes with ``ADDRESS_*`` tokens.

        Matching is case-insensitive and intentionally basic; a proper NER
        model would be more accurate.
        """
        self._log("sanitizing_addresses")
        if not self.config.redact_addresses:
            return text
        return self._redact(text, self._ADDRESS_PATTERNS, "ADDRESS")

    def _sanitize_ssn(self, text: str) -> str:
        """Replace Social Security Numbers with ``SSN_*`` tokens."""
        self._log("sanitizing_ssn")
        if not self.config.redact_ssn:
            return text
        return self._redact(text, self._SSN_PATTERNS, "SSN")

    def _sanitize_names(self, text: str) -> str:
        """Replace title-prefixed names (``Mr. John Smith``) with ``NAME_*``.

        Names without a title are not detected; consider an NER library
        such as spaCy for real coverage.
        """
        self._log("sanitizing_names")
        if not self.config.redact_names:
            return text
        return self._redact(text, self._NAME_PATTERNS, "NAME")

    def sanitize_text(self, text: str) -> str:
        """Run every enabled sanitization step over ``text``.

        The order matters: account numbers first, then phones, emails,
        SSNs, addresses and names, each step seeing the previous output.

        Args:
            text: Text to sanitize. An empty string is returned unchanged.

        Returns:
            The text with detected values replaced by placeholder tokens.
        """
        if not text:
            return text

        steps = (
            self._sanitize_account_numbers,
            self._sanitize_phone_numbers,
            self._sanitize_emails,
            self._sanitize_ssn,
            self._sanitize_addresses,
            self._sanitize_names,
        )
        sanitized = text
        for step in steps:
            sanitized = step(sanitized)
        return sanitized

    def sanitize_pdf(self, pdf_bytes: bytes) -> bytes:
        """Sanitize a PDF held in memory and return the rewritten PDF.

        Every text span whose sanitized form differs from the original is
        redacted and the sanitized text is drawn in its place, in black, at
        the span's font size.

        Args:
            pdf_bytes: Raw bytes of the source PDF.

        Returns:
            Bytes of the sanitized PDF.
        """
        self._log("sanitizing_pdf")
        doc = fitz.open(stream=pdf_bytes, filetype="pdf")
        try:
            for page in doc:
                pending = []  # (anchor point, replacement text, font size)
                for block in page.get_text("dict")["blocks"]:
                    # Blocks without "lines" (e.g. images) carry no text.
                    for line in block.get("lines", ()):
                        for span in line["spans"]:
                            original_text = span["text"]
                            sanitized_text = self.sanitize_text(original_text)
                            if sanitized_text == original_text:
                                continue

                            rect = fitz.Rect(span["bbox"])
                            page.add_redact_annot(rect)
                            # insert_text() expects the baseline start of
                            # the first character, not the box's top-left;
                            # fall back to the bottom-left corner when the
                            # span carries no "origin".
                            anchor = span.get("origin") or rect.bl
                            pending.append((anchor, sanitized_text, span["size"]))

                if not pending:
                    continue

                # Apply all of the page's redactions in one pass, then draw
                # the replacements so none of them is wiped by a later
                # redaction.
                page.apply_redactions()
                for anchor, replacement, size in pending:
                    page.insert_text(
                        anchor,
                        replacement,
                        fontsize=size,
                        color=(0, 0, 0),
                    )

            # tobytes() is the current name; write() is kept as a fallback
            # for older PyMuPDF releases.
            serialize = getattr(doc, "tobytes", None) or doc.write
            return serialize()
        finally:
            doc.close()

    def get_replacement_map(self) -> Dict[str, str]:
        """Return a copy of the original -> token mapping for auditing.

        The returned dict contains the original sensitive values, so treat
        it with the same care as the unsanitized input.
        """
        return self.replacement_map.copy()
21224
21325class AIAnalysisService (LoggerMixin ):
21426 """Service for AI-powered financial analysis using Google Gemini with direct file upload"""
@@ -422,45 +234,64 @@ async def analyze_financial_document(
422234 file_content : bytes ,
423235 filename : str ,
424236 analysis_type : str = "comprehensive" ,
425- sanitization_config : SanitizationConfig = None
426237 ) -> Dict [str , Any ]:
427238 """Perform comprehensive financial analysis using direct file upload to Gemini"""
239+ patterns = {
240+ "email" : r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Z|a-z]{2,}\b' ,
241+ "phone" : r'\b(?:\+234|0)([789]\d{9})\b' ,
242+ "account_number" : r'\b\d{10,20}\b' ,
243+ "address" : r'\d+\s+\w+(?:\s+\w+)*\s+(Street|St|Avenue|Ave|Close|Rd|Road|Lane|Ln|Crescent|Cres)\b' ,
244+ "name" : r'\b(?:[A-Z][A-Za-z\'\.-]+(?:\s+[A-Z][A-Za-z\'\.-]+){1,3}'
245+ r'|[A-Z]{2,}(?:\s+[A-Z]{2,}){1,3})\b' ,
246+ "bvn" : r'\b\d{11}\b' ,
247+ "ssn" : r'\b\d{3}-\d{2}-\d{4}\b' ,
248+ "credit_card" : r'\b(?:\d{4}[- ]?){3}\d{4}\b' ,
249+ "routing_number" : r'\b\d{9}\b' ,
250+ "iban" : r'\b[A-Za-z]{2}\d{2}[A-Za-z0-9]{11,30}\b' ,
251+ # "swift_code": r'\b[A-Z]{4}[A-Z]{2}[A-Za-z0-9]{2}([A-Za-z0-9]{3})?\b',
252+ # "currency": r'\b[\$€¥₹£]?\d{1,3}(,\d{3})*(\.\d{1,2})?\b',
253+ "tin" : r'\b\d{2}-\d{7}\b' ,
254+ "pan" : r'\b[A-Z]{5}\d{4}[A-Z]\b' ,
255+ "gstin" : r'\b\d{2}[A-Z]{5}\d{4}[A-Z][1-9A-Z][Z][A-Z0-9]\b' ,
256+ # "cheque_number": r'\b\d{6,9}\b',
257+ "reference_number" : r'\b[A-Z]{3,4}-\d{6,10}\b' ,
258+ # "business_reg": r'\b[A-Z0-9]{7,15}\b',
259+ # "sort_code": r'\b\d{9}\b',
260+ "ip_address" : r'\b\d{1,3}(?:\.\d{1,3}){3}\b' ,
261+ "ipv4" : r'\b\d{1,3}(?:\.\d{1,3}){3}\b' ,
262+ "url" : r'\bhttps?://[^\s]+\b' ,
263+ "merchant_id" : r'\b(M|C)-[A-Za-z0-9]{6,12}\b' ,
264+ "nin" : r'\b\d{11}\b' ,
265+ }
266+
428267 try :
429268 self .log_operation ("ai_analysis_start" , filename = filename , analysis_type = analysis_type )
430269
431- # sanitizer = BankStatementSanitizer(sanitization_config)
432- #
433- # if filename.lower().endswith('.pdf'):
434- # sanitized_content = sanitizer.sanitize_pdf(file_content)
435- #
436- # else:
437- # text_content = file_content.decode('utf-8', errors='ignore')
438- # sanitized_text = sanitizer.sanitize_text(text_content)
439- # sanitized_content = sanitized_text.encode('utf-8')
440- #
441- # replacement_count = len(sanitizer.get_replacement_map())
442- # self.log_operation("sanitization_complete",
443- # replacements_made=replacement_count)
444270
445271 with tempfile .NamedTemporaryFile (delete = False , suffix = '.pdf' ) as temp_file :
446272 temp_file .write (file_content )
447273 temp_file .flush ()
448-
274+ tmp_path = temp_file .name
275+
276+ text = PDFExcelService .extract_text_from_pdf (tmp_path )
277+ anonymized_text = PDFExcelService .anonymize_text (text , patterns )
278+
279+
449280 try :
450281 self .log_operation ("uploading_file_to_gemini" , filename = filename )
451- uploaded_file = self .client .files .upload (
452- file = temp_file .name ,
453-
454- )
282+ # uploaded_file = self.client.files.upload(
283+ # file=temp_file.name,
284+ #
285+ # )
455286
456287 import time
457- while uploaded_file .state .name == "PROCESSING" :
458- self .log_operation ("waiting_for_file_processing" )
459- time .sleep (2 )
460- uploaded_file = self .client .files .get (uploaded_file .name )
461-
462- if uploaded_file .state .name == "FAILED" :
463- raise ExternalServiceError ("File processing failed in Gemini" )
288+ # while uploaded_file.state.name == "PROCESSING":
289+ # self.log_operation("waiting_for_file_processing")
290+ # time.sleep(2)
291+ # # uploaded_file = self.client.files.get(uploaded_file.name)
292+ #
293+ # if uploaded_file.state.name == "FAILED":
294+ # raise ExternalServiceError("File processing failed in Gemini")
464295
465296 prompt = self ._create_analysis_prompt (analysis_type )
466297
@@ -469,11 +300,11 @@ async def analyze_financial_document(
469300 response = self .client .models .generate_content (
470301 model = settings .GEMINI_MODEL ,
471302 contents = [
472- uploaded_file ,
303+ anonymized_text ,
473304 prompt
474305 ])
475306
476- if not response or len ( response . strip ()) == 0 :
307+ if not response :
477308 raise ExternalServiceError ("Gemini returned empty content." )
478309
479310 if hasattr (response , 'text' ):
@@ -510,7 +341,7 @@ async def analyze_financial_document(
510341
511342
512343 try :
513- self .client .files .delete (name = uploaded_file .name )
344+ # self.client.files.delete(name=uploaded_file.name)
514345 self .log_operation ("gemini_file_cleanup_successful" )
515346 except Exception as e :
516347 self .log_error (e , "gemini_file_cleanup_failed" )
0 commit comments