|
| 1 | +""" Core redactor engine class implementation """ |
| 2 | +from pyredactkit.common_jobs import CommonJobs |
| 3 | +from pyredactkit.identifiers import Identifier |
| 4 | +import os |
| 5 | +import sys |
| 6 | +import re |
| 7 | +import uuid |
| 8 | + |
# Coreredactor library: module-level Identifier and CommonJobs instances.
# The CoreRedactorEngine methods below read the regex list from id_object
# and delegate hashmap/report writing to cj_object.
id_object = Identifier()
cj_object = CommonJobs()
| 13 | + |
| 14 | + |
class CoreRedactorEngine:
    """Engine that redacts the core sensitive data types.

    Every pattern listed in the module-level ``id_object.regexes`` is applied
    to the supplied text. Each match is replaced by a random UUID and the
    UUID -> original value pairs are handed to ``cj_object.write_hashmap``.

    Static variables:
        block (unicode string): To redact sensitive data
        dir_create (str): Message suffix printed when the save directory
            has to be created.
    """

    block = "\u2588" * 15
    dir_create = " directory does not exist, creating it."

    def __init__(self) -> None:
        """Class initialization; the engine keeps no per-instance state."""
        return None

    def _prepare_savedir(self, savedir: str) -> str:
        """Return ``savedir`` with a trailing "/" and create it if missing.

        Args:
            savedir (str): Directory in which results are placed. An empty
                value falls back to "./".

        Returns:
            str: The directory path, always ending in "/".
        """
        if not savedir:
            # Guard against "" (the original indexed savedir[-1] and crashed).
            savedir = "./"
        elif not savedir.endswith("/"):
            savedir += "/"

        directory = os.path.dirname(savedir)
        if not os.path.exists(directory):
            print("[+] " + directory + f"{self.dir_create}")
            os.makedirs(directory, exist_ok=True)
        return savedir

    def redact_all(self, line: str) -> tuple:
        """Redact every match of every known pattern in ``line``.

        Args:
            line (str): Line to redact.

        Returns:
            tuple: ``(redacted_line, kv_pair)`` where ``redacted_line`` is
            ``line`` with each match replaced by a UUID string, and
            ``kv_pair`` maps each UUID to the text it replaced.
        """
        hash_map = {}

        def mask(match):
            # One fresh UUID per match, so that several different values on
            # the same line each keep their own entry in the map.
            token = str(uuid.uuid4())
            hash_map[token] = match.group(0)
            return token

        for identifier in id_object.regexes:
            line = re.sub(identifier['pattern'], mask, line)
        return line, hash_map

    def process_text(self, text: str, savedir="./"):
        """Redact text supplied from the cli and write it to a new file.

        Args:
            text (str): Text to redact. A single string is split into lines;
                any other iterable is treated as a sequence of lines.
            savedir (str): [Optional] directory to place results

        Returns:
            None. Creates ``redacted_file_<uuid>.txt`` inside ``savedir`` and
            passes the UUID map to ``cj_object.write_hashmap``.
        """
        # Iterating a plain str would yield single characters, so split it.
        lines = text.splitlines() if isinstance(text, str) else text

        # Same normalisation as process_core_file, so both entry points hand
        # the helper a directory in the same form.
        savedir = self._prepare_savedir(savedir)
        hash_map = {}
        generated_file = f"redacted_file_{str(uuid.uuid1())}.txt"
        output_path = f"{savedir}{generated_file}"

        with open(output_path, "w", encoding="utf-8") as result:
            for line in lines:
                redacted_line, kv_pairs = self.redact_all(line)
                hash_map.update(kv_pairs)
                result.write(f"{redacted_line}\n")
            cj_object.write_hashmap(hash_map, generated_file, savedir)
            print(
                f"[+] .hashshadow_{os.path.basename(generated_file)}.json file generated. Keep this safe if you need to undo the redaction.")
            print(
                f"[+] Redacted and results saved to {output_path}")

    def process_core_file(self, filename: str, savedir="./"):
        """Redact a file supplied from the cli.

        Args:
            filename (str): File to redact
            savedir (str): [Optional] directory to place results

        Returns:
            None. Creates ``redacted_<basename>`` inside ``savedir``, then
            calls ``cj_object.write_hashmap`` and ``cj_object.process_report``.

        Raises:
            SystemExit: If the file cannot be decoded as UTF-8; the partial
                output file is removed first.
        """
        count = 0
        hash_map = {}
        output_path = None
        try:
            # Open a file read pointer as target_file
            with open(filename, encoding="utf-8") as target_file:
                savedir = self._prepare_savedir(savedir)
                output_path = f"{savedir}redacted_{os.path.basename(filename)}"

                print(
                    "[+] Processing starts now. This may take some time "
                    "depending on the file size. Monitor the redacted file "
                    "size to monitor progress"
                )

                # Open a file write pointer as result
                with open(output_path, "w", encoding="utf-8") as result:
                    print("[+] No custom regex pattern supplied, will be redacting all the core sensitive data supported")
                    for line in target_file:
                        # Count one target per pattern that matches this line.
                        count += sum(
                            1
                            for identifier in id_object.regexes
                            if re.search(identifier['pattern'], line)
                        )
                        # Lines keep their own newline, so write them as-is.
                        redacted_line, kv_pairs = self.redact_all(line)
                        hash_map.update(kv_pairs)
                        result.write(redacted_line)
                    cj_object.write_hashmap(hash_map, filename, savedir)
                    print(
                        f"[+] .hashshadow_{os.path.basename(filename)}.json file generated. Keep this safe if you need to undo the redaction.")
                    print(f"[+] Redacted {count} targets...")
                    print(
                        f"[+] Redacted results saved to {output_path}")
                    cj_object.process_report(filename)

        except UnicodeDecodeError:
            # Only remove the output if it was actually created.
            if output_path is not None and os.path.exists(output_path):
                os.remove(output_path)
                print("[-] Removed incomplete redact file")
            sys.exit("[-] Unable to read file")
0 commit comments