11from mailcom import utils
22import re
3+ from typing import Optional
34
45
56class Pseudonymize :
@@ -48,6 +49,18 @@ def reset(self):
4849 self .ne_sent .clear ()
4950 self .sentences .clear ()
5051
def _get_ne_sent_dict(self) -> dict:
    """Group the stored named entities by the sentence they belong to.

    ``self.ne_sent`` and ``self.ne_list`` are walked in parallel: the n-th
    sentence index is paired with the n-th named entity.

    Note that the entity dictionaries are modified in place: a "pseudonym"
    key, if present, is removed from each of them.

    Returns:
        dict: Maps the sentence index (converted to ``str``) to the list of
            named-entity dictionaries recorded for that sentence.
    """
    grouped = {}
    for sentence_index, entity in zip(self.ne_sent, self.ne_list):
        # remove any pseudonym that is already attached to the entity
        entity.pop("pseudonym", None)
        grouped.setdefault(str(sentence_index), []).append(entity)
    return grouped
63+
5164 def get_sentences (self , input_text , language , model = "default" ):
5265 """Splits a text into sentences using spacy.
5366
@@ -90,6 +103,49 @@ def get_ner(self, sentence, pipeline_info: dict = None):
90103 ner = self .ner_recognizer (sentence )
91104 return ner
92105
def _check_pseudonyms_in_content(self, lang: str = "fr") -> bool:
    """Check whether any configured pseudonym equals a person name in the content.

    The first space-separated token of every "PER" entity in
    ``self.ne_list`` is collected (as written, lower-cased and
    title-cased) and compared with the pseudonyms stored in
    ``self.pseudo_first_names`` for ``lang``. Pseudonyms that match are
    removed from ``self.pseudo_first_names[lang]``.

    Args:
        lang (str): Language context of the data, defaults to "fr".

    Returns:
        bool: True if at least one pseudonym matched a person name and was
            dropped, False otherwise.

    Raises:
        ValueError: If no pseudonyms are available for ``lang``, either
            because none are configured or because all were dropped.
    """
    names: list[str] = []
    for entity in self.ne_list:
        if entity["entity_group"] != "PER":
            continue
        # consider the first name only, i.e. the part of the entity in front
        # of the first space, without the family name that follows it
        first_name = entity["word"].split(" ")[0]
        if first_name not in names:
            names.extend([first_name, first_name.lower(), first_name.title()])

    # set for the membership tests; the list is kept for the printed report
    known_names = set(names)
    pseudonyms = self.pseudo_first_names.get(lang, [])

    # now we have collected all possible names, let's check for a match
    exclude_pseudonym = any(pseudo in known_names for pseudo in pseudonyms)
    if exclude_pseudonym:
        print("Found matching name(s) from pseudonyms to actual person names.")
        print(f"Names found: {names}")
        print(f"Pseudonyms provided: {pseudonyms}")
        # drop the pseudonym from all further processing
        remaining = [pseudo for pseudo in pseudonyms if pseudo not in known_names]
        self.pseudo_first_names[lang] = remaining
        print(f"Updated pseudonyms: {remaining}")

    # raise an exception for the user to restart with other pseudonyms if
    # there are no pseudonyms left in the list; .get() makes a language
    # without any entry end up here as well instead of raising a KeyError
    if not self.pseudo_first_names.get(lang):
        raise ValueError(
            "Please provide a different list of pseudonyms via the "
            "workflow settings file. The current list of pseudonyms "
            "is too short and contains only names that already "
            "exist in the actual data."
        )
    return exclude_pseudonym
148+
93149 def choose_per_pseudonym (self , name , lang = "fr" ):
94150 """Chooses a pseudonym for a PER named entity based on previously used pseudonyms.
95151 If the name has previously been replaced, the same pseudonym is used again.
@@ -328,12 +384,18 @@ def pseudonymize(
328384 if pseudo_numbers :
329385 sent = self .pseudonymize_numbers (sent , detected_dates )
330386 pseudonymized_sentences .append (sent )
331- return self .concatenate (pseudonymized_sentences )
387+ # check that pseudonyms are not the same as actual
388+ # names in the current content
389+ # if they are, the pseudonym is dropped for the present and all future content
390+ exclude_pseudonym = (
391+ self ._check_pseudonyms_in_content (lang = language ) if self .ne_list else False
392+ )
393+ return self .concatenate (pseudonymized_sentences ), exclude_pseudonym
332394
333395 def pseudonymize_with_updated_ne (
334396 self ,
335397 sentences ,
336- ne_sent_dict : dict [list [dict ]],
398+ ne_sent_dict : Optional [ dict [list [dict ] ]],
337399 language = "de" ,
338400 detected_dates : list [str ] = None ,
339401 pseudo_emailaddresses = True ,
@@ -346,7 +408,9 @@ def pseudonymize_with_updated_ne(
346408
347409 Args:
348410 sentences (list[str]): List of sentences to pseudonymize.
349- ne_sent_dict (dict[list[dict]]): Dictionary containing named entities
411+ ne_sent_dict (dict[list[dict]]|None): Dictionary containing named entities.
412+ If set to none, the previous named entities will be used. This is the case
413+ for reprocessing emails with new pseudonyms.
350414 language (str, optional): Language of the email. Defaults to "de".
351415 detected_dates (list[str], optional): Detected dates in the email.
352416 Defaults to None.
@@ -360,7 +424,12 @@ def pseudonymize_with_updated_ne(
360424 Returns:
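361425 tuple[str, bool]: Pseudonymized text, and a flag that is True if a pseudonym matched a person name in the content and was dropped.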
362426 """
427+ if not ne_sent_dict :
428+ # the ne was ok last time, but we need to rerun with new pseudonyms
429+ ne_sent_dict = self ._get_ne_sent_dict ()
430+
363431 self .reset ()
432+ self .sentences = sentences
364433 pseudonymized_sentences = []
365434 for sent_idx , sent in enumerate (sentences ):
366435 if pseudo_emailaddresses :
@@ -378,4 +447,10 @@ def pseudonymize_with_updated_ne(
378447 if pseudo_numbers :
379448 sent = self .pseudonymize_numbers (sent , detected_dates )
380449 pseudonymized_sentences .append (sent )
381- return self .concatenate (pseudonymized_sentences )
450+ # check that pseudonyms are not the same as actual
451+ # names in the current content
452+ # if they are, the pseudonym is dropped for the present and all future content
453+ exclude_pseudonym = (
454+ self ._check_pseudonyms_in_content (lang = language ) if self .ne_list else False
455+ )
456+ return self .concatenate (pseudonymized_sentences ), exclude_pseudonym
0 commit comments