diff --git a/.idea/csv-editor.xml b/.idea/csv-editor.xml
deleted file mode 100644
index cb2fb408..00000000
--- a/.idea/csv-editor.xml
+++ /dev/null
@@ -1,16 +0,0 @@
-
-
-
-
-
-
\ No newline at end of file
diff --git a/.idea/misc.xml b/.idea/misc.xml
index 800e363e..bdf2c791 100644
--- a/.idea/misc.xml
+++ b/.idea/misc.xml
@@ -5,4 +5,7 @@
+
+
+
\ No newline at end of file
diff --git a/CODE/Logicytics.py b/CODE/Logicytics.py
index bd4928ce..bbe11631 100644
--- a/CODE/Logicytics.py
+++ b/CODE/Logicytics.py
@@ -12,7 +12,7 @@
import psutil
from prettytable import PrettyTable
-from logicytics import Log, Execute, Check, Get, FileManagement, Flag, DEBUG, DELETE_LOGS, config
+from logicytics import Log, execute, check, get, file_management, flag, DEBUG, DELETE_LOGS, config
# Initialization
log = Log({"log_level": DEBUG, "delete_log": DELETE_LOGS})
@@ -66,7 +66,7 @@ def __generate_execution_list(self) -> list[str]:
- Logs the final execution list for debugging purposes
- Warns users about potential long execution times for certain actions
"""
- execution_list = Get.list_of_files(".", only_extensions=(".py", ".exe", ".ps1", ".bat"),
+ execution_list = get.list_of_files(".", only_extensions=(".py", ".exe", ".ps1", ".bat"),
exclude_files=["Logicytics.py"],
exclude_dirs=["logicytics", "SysInternal_Suite"])
files_to_remove = {
@@ -101,7 +101,7 @@ def __generate_execution_list(self) -> list[str]:
elif ACTION == "modded":
# Add all files in MODS to execution list
- execution_list = Get.list_of_files("../MODS", only_extensions=(".py", ".exe", ".ps1", ".bat"),
+ execution_list = get.list_of_files("../MODS", only_extensions=(".py", ".exe", ".ps1", ".bat"),
append_file_list=execution_list, exclude_files=["Logicytics.py"],
exclude_dirs=["logicytics", "SysInternal_Suite"])
@@ -144,7 +144,7 @@ def __script_handler(script: str) -> tuple[str, Exception | None]:
"""
log.debug(f"Executing {script}")
try:
- log.execution(Execute.script(script))
+ log.execution(execute.script(script))
log.info(f"{script} executed successfully")
return script, None
except Exception as err:
@@ -207,7 +207,7 @@ def __performance(self):
gc.collect()
start_time = datetime.now()
start_memory = process.memory_full_info().uss / 1024 / 1024 # MB
- log.execution(Execute.script(self.execution_list[file]))
+ log.execution(execute.script(self.execution_list[file]))
end_time = datetime.now()
end_memory = process.memory_full_info().uss / 1024 / 1024 # MB
elapsed_time = end_time - start_time
@@ -352,7 +352,7 @@ def get_flags():
"""
global ACTION, SUB_ACTION
# Get flags_list
- ACTION, SUB_ACTION = Flag.data()
+ ACTION, SUB_ACTION = flag.data()
log.debug(f"Action: {ACTION}")
log.debug(f"Sub-Action: {SUB_ACTION}")
@@ -382,7 +382,7 @@ def handle_special_actions():
log.info("Opening debug menu...")
SpecialAction.execute_new_window("_debug.py")
- messages = Check.sys_internal_zip()
+ messages = check.sys_internal_zip()
if messages:
# If there are messages, log them with debug
log.debug(messages)
@@ -407,7 +407,7 @@ def handle_special_actions():
"Sorry, this feature is yet to be implemented. You can manually Restore your backups, We will open "
"the location for you"
)
- FileManagement.open_file("../ACCESS/BACKUP/")
+ file_management.open_file("../ACCESS/BACKUP/")
input("Press Enter to exit...")
exit(1)
@@ -438,7 +438,7 @@ def check_privileges():
- Depends on global `DEBUG` configuration variable
- Logs warnings or critical messages based on privilege and UAC status
"""
- if not Check.admin():
+ if not check.admin():
if DEBUG == "DEBUG":
log.warning("Running in debug mode, continuing without admin privileges - This may cause issues")
else:
@@ -447,7 +447,7 @@ def check_privileges():
input("Press Enter to exit...")
exit(1)
- if Check.uac():
+ if check.uac():
log.warning("UAC is enabled, this may cause issues - Please disable UAC if possible")
@@ -462,7 +462,7 @@ def files(cls):
@staticmethod
def __and_log(directory: str, name: str):
log.debug(f"Zipping directory '{directory}' with name '{name}' under action '{ACTION}'")
- zip_values = FileManagement.Zip.and_hash(
+ zip_values = file_management.Zip.and_hash(
directory,
name,
ACTION if ACTION is not None else f"ERROR_NO_ACTION_SPECIFIED_{datetime.now().isoformat()}"
@@ -530,9 +530,10 @@ def Logicytics():
try:
Logicytics()
except KeyboardInterrupt:
- log.warning("⚠️ Force shutdown detected! Some temporary files might be left behind.")
- log.warning("💡 Pro tip: Next time, let the program finish naturally.")
- # TODO v3.4.2 -> Cleanup function
+ log.warning("Force shutdown detected! Some temporary files might be left behind.")
+ log.warning("Next time, let the program finish naturally for complete cleanup.")
+ # Emergency cleanup - zip generated files
+ ZIP.files()
exit(0)
else:
log.error("This script cannot be imported!")
diff --git a/CODE/_debug.py b/CODE/_debug.py
index 044a7c12..46c395ea 100644
--- a/CODE/_debug.py
+++ b/CODE/_debug.py
@@ -9,10 +9,11 @@
import psutil
import requests
-from logicytics import Log, DEBUG, VERSION, Check
+from logicytics import Log, DEBUG, VERSION, check, config
log_path = os.path.join(os.path.dirname(os.path.dirname(os.path.abspath(__file__))), "ACCESS\\LOGS\\DEBUG\\DEBUG.log")
log = Log({"log_level": DEBUG, "filename": log_path, "truncate_message": False, "delete_log": True})
+url = config.get("System Settings", "config_url")
class VersionManager:
@@ -149,10 +150,9 @@ def get_online_config() -> dict | None:
Retrieves configuration data from a remote repository.
"""
try:
- url = "https://raw.githubusercontent.com/DefinetlyNotAI/Logicytics/main/CODE/config.ini"
- config = configparser.ConfigParser()
- config.read_string(requests.get(url, timeout=15).text)
- return config
+ _config = configparser.ConfigParser()
+ _config.read_string(requests.get(url, timeout=15).text)
+ return _config
except requests.exceptions.RequestException as e:
log.error(f"Connection error: {e}")
return None
@@ -194,27 +194,27 @@ def debug():
Executes a comprehensive system debug routine, performing various checks and logging system information.
"""
# Online Configuration Check
- config = ConfigManager.get_online_config()
- if config:
- HealthCheck.check_versions(VERSION, config["System Settings"]["version"])
+ _config = ConfigManager.get_online_config()
+ if _config:
+ HealthCheck.check_versions(VERSION, _config["System Settings"]["version"])
# File Integrity Check
- required_files = config["System Settings"].get("files", "").split(",")
+ required_files = _config["System Settings"].get("files", "").split(",")
FileManager.check_required_files(".", required_files)
# SysInternal Binaries Check
SysInternalManager.check_binaries("SysInternal_Suite")
# System Checks
- log.info("Admin privileges found" if Check.admin() else "Admin privileges not found")
- log.info("UAC enabled" if Check.uac() else "UAC disabled")
+ log.info("Admin privileges found" if check.admin() else "Admin privileges not found")
+ log.info("UAC enabled" if check.uac() else "UAC disabled")
log.info(f"Execution path: {psutil.__file__}")
log.info(f"Global execution path: {sys.executable}")
log.info(f"Local execution path: {sys.prefix}")
log.info(
"Running in a virtual environment" if sys.prefix != sys.base_prefix else "Not running in a virtual environment")
log.info(
- "Execution policy is unrestricted" if Check.execution_policy() else "Execution policy is restricted")
+ "Execution policy is unrestricted" if check.execution_policy() else "Execution policy is restricted")
# Python Version Check
SystemInfoManager.python_version()
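
Moving the update URL into config.ini (the new config_url key) means a fork only has to edit one value. The fetch-and-parse step, extracted into a standalone sketch that mirrors the error handling in the diff above:

    import configparser

    import requests

    def fetch_remote_config(url: str, timeout: int = 15) -> configparser.ConfigParser | None:
        """Download an INI file and parse it; return None on any network failure."""
        try:
            response = requests.get(url, timeout=timeout)
            response.raise_for_status()
        except requests.exceptions.RequestException as exc:
            print(f"Connection error: {exc}")
            return None
        parsed = configparser.ConfigParser()
        parsed.read_string(response.text)
        return parsed

    remote = fetch_remote_config(
        "https://raw.githubusercontent.com/DefinetlyNotAI/Logicytics/main/CODE/config.ini")
    if remote:
        print(remote["System Settings"]["version"])
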
diff --git a/CODE/_dev.py b/CODE/_dev.py
index aa671aea..1b9cd3e6 100644
--- a/CODE/_dev.py
+++ b/CODE/_dev.py
@@ -6,7 +6,7 @@
import configobj
-from logicytics import log, Get, FileManagement, CURRENT_FILES, VERSION
+from logicytics import log, get, file_management, CURRENT_FILES, VERSION
def color_print(text, color="reset", is_input=False) -> None | str:
@@ -21,8 +21,8 @@ def color_print(text, color="reset", is_input=False) -> None | str:
color_code = colors.get(color.lower(), colors["reset"])
if is_input:
return input(f"{color_code}{text}{colors['reset']}")
- else:
- print(f"{color_code}{text}{colors['reset']}")
+ print(f"{color_code}{text}{colors['reset']}")
+ return None
def _update_ini_file(filename: str, new_data: list | str, key: str) -> None:
@@ -88,6 +88,7 @@ def _prompt_user(question: str, file_to_open: str = None, special: bool = False)
return True
except Exception as e:
color_print(f"[x] {e}", "red")
+ return None
def _perform_checks() -> bool:
@@ -116,7 +117,8 @@ def _handle_file_operations() -> None:
Handles file operations and logging for added, removed, and normal files.
"""
EXCLUDE_FILES = ["logicytics\\User_History.json.gz", "logicytics\\User_History.json"]
- files = Get.list_of_files(".", exclude_files=EXCLUDE_FILES, exclude_dirs=["SysInternal_Suite"])
+ files = get.list_of_files(".", exclude_files=EXCLUDE_FILES, exclude_dirs=["SysInternal_Suite"],
+ exclude_extensions=[".pyc"])
added_files, removed_files, normal_files = [], [], []
clean_files_list = [file.replace('"', '') for file in CURRENT_FILES]
@@ -187,7 +189,7 @@ def dev_checks() -> None:
- Updates configuration file with current files and version
- Logs warnings or errors during the process
"""
- FileManagement.mkdir()
+ file_management.mkdir()
if not _perform_checks():
return
_handle_file_operations()
diff --git a/CODE/bluetooth_details.py b/CODE/bluetooth_details.py
index f692ed06..fb28dcdb 100644
--- a/CODE/bluetooth_details.py
+++ b/CODE/bluetooth_details.py
@@ -2,6 +2,7 @@
import json
import subprocess
+from typing import TextIO
from logicytics import log
@@ -84,7 +85,7 @@ def _query_bluetooth_devices() -> bool | list[dict[str, str]]:
return device_info_list
-def _write_device_info_to_file(devices, filename):
+def _write_device_info_to_file(devices: list[dict[str, str]], filename: str):
"""
Writes the details of Bluetooth devices to a specified file.
@@ -105,11 +106,12 @@ def _write_device_info_to_file(devices, filename):
with open(filename, "w", encoding="UTF-8") as file:
for device_info in devices:
_write_single_device_info(file, device_info)
+ log.info(f"Successfully wrote device details to '{filename}'")
except Exception as e:
log.error(f"Failed to write device information to file: {e}")
-def _write_single_device_info(file, device_info):
+def _write_single_device_info(file: TextIO, device_info: dict[str, str]):
"""
Writes detailed information for a single Bluetooth device to the specified file.
diff --git a/CODE/bluetooth_logger.py b/CODE/bluetooth_logger.py
index 73609c78..c6d18819 100644
--- a/CODE/bluetooth_logger.py
+++ b/CODE/bluetooth_logger.py
@@ -1,12 +1,13 @@
import datetime
import re
import subprocess
+from typing import LiteralString
from logicytics import log
# Utility function to log data to a file
-def save_to_file(filename, section_title, data):
+def save_to_file(filename: str, section_title: str, data: str | list[str]):
"""
Appends data to a file with a section title.
@@ -35,7 +36,7 @@ def save_to_file(filename, section_title, data):
# Utility function to run PowerShell commands
-def run_powershell_command(command):
+def run_powershell_command(command: str) -> None | list[LiteralString]:
"""
Runs a PowerShell command and returns the output as a list of lines.
@@ -67,7 +68,7 @@ def run_powershell_command(command):
# Unified parsing function for PowerShell output
-def parse_output(lines, regex, group_names):
+def parse_output(lines: list[LiteralString], regex: str, group_names: list[str]):
"""
Parses the output lines using the provided regex and group names.
@@ -101,7 +102,7 @@ def parse_output(lines, regex, group_names):
# Function to get paired Bluetooth devices
-def get_paired_bluetooth_devices():
+def get_paired_bluetooth_devices() -> list[str]:
"""
Retrieves a list of paired Bluetooth devices with their names and MAC addresses.
@@ -168,41 +169,6 @@ def log_bluetooth():
save_to_file(filename, section_title, paired_devices or ["No paired Bluetooth devices found."])
log.debug(f"{section_title}: {paired_devices}")
- # Collect and log event logs
- def collect_logs(title: str, command: str):
- """
- Collects and logs event logs by executing a PowerShell command and saving the results.
-
- Args:
- title (str): The title or description of the log section being collected.
- command (str): The PowerShell command to execute for retrieving event logs.
-
- Behavior:
- - Runs the specified PowerShell command using `run_powershell_command()`
- - Saves the log results to a file using `save_to_file()`
- - Logs an informational message about the log collection
- - If no logs are found, saves a default "No logs found." message
- - Uses the global `filename` variable for log file destination
-
- Raises:
- Potential exceptions from `run_powershell_command()` and `save_to_file()` which are handled internally
- """
- logs = run_powershell_command(command)
- save_to_file(filename, title, logs or ["No logs found."])
- log.info(f"Getting {title}...")
-
- collect_logs(
- "Bluetooth Connection/Disconnection Logs",
- 'Get-WinEvent -LogName "Microsoft-Windows-Bluetooth-BthLEServices/Operational" '
- '| Select-Object TimeCreated, Id, Message | Format-Table -AutoSize'
- )
-
- collect_logs(
- "Bluetooth File Transfer Logs",
- 'Get-WinEvent -LogName "Microsoft-Windows-Bluetooth-BthLEServices/Operational" '
- '| Select-String -Pattern "file.*transferred" | Format-Table -AutoSize'
- )
-
log.info("Finished Bluetooth data logging.")
diff --git a/CODE/cmd_commands.py b/CODE/cmd_commands.py
index 3570002a..3739dca7 100644
--- a/CODE/cmd_commands.py
+++ b/CODE/cmd_commands.py
@@ -1,4 +1,4 @@
-from logicytics import log, Execute
+from logicytics import log, execute
@log.function
@@ -17,7 +17,7 @@ def command(file: str, commands: str, message: str, encoding: str = "UTF-8") ->
"""
log.info(f"Executing {message}")
try:
- output = Execute.command(commands)
+ output = execute.command(commands)
with open(file, "w", encoding=encoding) as f:
f.write(output)
log.info(f"{message} Successful - {file}")
diff --git a/CODE/config.ini b/CODE/config.ini
index a01d2fa1..2f92e51a 100644
--- a/CODE/config.ini
+++ b/CODE/config.ini
@@ -5,7 +5,7 @@
[Settings]
# Would you like to enable debug mode?
# This will print out more information to the console, with prefix DEBUG
-# This will not be logged however
+# This will not be logged, however; it is mainly useful for developers - note this is different from the DEBUGGER itself
log_using_debug = false
# Would you like for new logs to be created every execution?
@@ -26,8 +26,11 @@ save_preferences = true
[System Settings]
# Do not play with these settings unless you know what you are doing
# Dev Mode allows a safe way to modify these settings!!
-version = 3.4.1
+version = 3.4.2
files = "bluetooth_details.py, bluetooth_logger.py, browser_miner.ps1, cmd_commands.py, config.ini, dir_list.py, dump_memory.py, event_log.py, Logicytics.py, log_miner.py, media_backup.py, netadapter.ps1, network_psutil.py, packet_sniffer.py, property_scraper.ps1, registry.py, sensitive_data_miner.py, ssh_miner.py, sys_internal.py, tasklist.py, tree.ps1, vulnscan.py, wifi_stealer.py, window_feature_miner.ps1, wmic.py, logicytics\Checks.py, logicytics\Config.py, logicytics\Execute.py, logicytics\FileManagement.py, logicytics\Flag.py, logicytics\Get.py, logicytics\Logger.py, logicytics\User_History.json.gz, VulnScan\Model SenseMini .3n3.pth, VulnScan\README.md, VulnScan\Vectorizer .3n3.pkl"
+# If you forked the project, change the GitHub username in the URL below to your own to use your fork as the update source.
+# I don't advise doing this, however.
+config_url = https://raw.githubusercontent.com/DefinetlyNotAI/Logicytics/main/CODE/config.ini
########################################################
# The following settings are for specific modules #
@@ -97,6 +100,14 @@ timeout = 10
max_retry_time = 30
###################################################
+[VulnScan Settings]
+# The following extensions are skipped by the model
+# Format: comma-separated list with dots (e.g., .exe, .dll)
+unreadable_extensions = .exe, .dll, .so, .zip, .tar, .gz, .7z, .rar, .jpg, .jpeg, .png, .gif, .bmp, .tiff, .webp, .mp3, .wav, .flac, .aac, .ogg, .mp4, .mkv, .avi, .mov, .wmv, .flv, .pdf, .doc, .docx, .xls, .xlsx, .ppt, .pptx, .odt, .ods, .odp, .bin, .dat, .iso, .class, .pyc, .o, .obj, .sqlite, .db, .ttf, .otf, .woff, .woff2, .lnk, .url
+# Max file size (in MB) that the model is allowed to scan; set to None (or comment the key out) to disable the limit
+max_file_size_mb = None
+# Max workers to use: either an integer, or "auto" to let the module pick a sensible value
+max_workers = auto
[VulnScan.generate Settings]
# The following settings are for the Generate module for fake training data
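
The new [VulnScan Settings] keys mix typed values with the string sentinels None and auto, so whatever reads them has to special-case both. A sketch of one plausible reading side (assumed here, since the consuming code is not part of this diff):

    import configparser
    import os

    config = configparser.ConfigParser()
    config.read("config.ini")
    section = config["VulnScan Settings"]

    # ".exe, .dll, ..." -> {".exe", ".dll", ...}
    skip_extensions = {ext.strip() for ext in section["unreadable_extensions"].split(",")}

    # "None" (or a missing key) disables the size limit.
    raw_limit = section.get("max_file_size_mb", "None")
    max_file_size_mb = None if raw_limit.strip().lower() == "none" else float(raw_limit)

    # "auto" defers to the machine; anything else is taken literally.
    raw_workers = section.get("max_workers", "auto")
    max_workers = os.cpu_count() if raw_workers.strip().lower() == "auto" else int(raw_workers)
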
diff --git a/CODE/dir_list.py b/CODE/dir_list.py
index defe31b4..c4495b79 100644
--- a/CODE/dir_list.py
+++ b/CODE/dir_list.py
@@ -1,7 +1,7 @@
import os
from concurrent.futures import ThreadPoolExecutor
-from logicytics import log, Execute
+from logicytics import log, execute
def run_command_threaded(directory: str, file: str, message: str, encoding: str = "UTF-8") -> None:
@@ -26,7 +26,7 @@ def run_command_threaded(directory: str, file: str, message: str, encoding: str
try:
safe_directory = directory.replace('"', '`"') # Escape quotes
command = f'powershell -NoProfile -Command "Get-ChildItem \\""{safe_directory}\\"" -Recurse"'
- output = Execute.command(command)
+ output = execute.command(command)
open(file, "a", encoding=encoding).write(output)
log.info(f"{message} Successful for {directory} - {file}")
except Exception as e:
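
run_command_threaded is built to be fanned out over directories via the ThreadPoolExecutor imported at the top of the file. The driver loop is not shown in this hunk; a plausible sketch:

    from concurrent.futures import ThreadPoolExecutor

    from dir_list import run_command_threaded  # the function patched above

    directories = ["C:\\Users", "C:\\ProgramData"]  # illustrative targets

    with ThreadPoolExecutor(max_workers=4) as pool:
        for directory in directories:
            pool.submit(run_command_threaded, directory, "dir_list.txt", "Directory listing")
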
diff --git a/CODE/dump_memory.py b/CODE/dump_memory.py
index 11b137af..f4dbaa3f 100644
--- a/CODE/dump_memory.py
+++ b/CODE/dump_memory.py
@@ -7,18 +7,16 @@
from logicytics import log, config
-# TODO v3.4.1
-# psutil.virtual_memory(): used, free, percent, total
-# psutil.swap_memory(): used, free, percent, total
+# Constants from config with validation
+LIMIT_FILE_SIZE = config.getint("DumpMemory Settings", "file_size_limit") # MiB
+SAFETY_MARGIN = config.getfloat("DumpMemory Settings", "file_size_safety")  # multiplier (x times the file-size limit)
+DUMP_DIR = config.get("DumpMemory Settings", "dump_directory", fallback="memory_dumps")
-LIMIT_FILE_SIZE = config.getint("DumpMemory Settings", "file_size_limit") # Always in MiB
-SAFETY_MARGIN = config.getfloat("DumpMemory Settings", "file_size_safety") # Always in MiB
if SAFETY_MARGIN < 1:
log.critical("Invalid Safety Margin Inputted - Cannot proceed with dump memory")
exit(1)
-# Capture RAM Snapshot
def capture_ram_snapshot():
"""
Captures and logs the current system memory statistics to a file.
@@ -39,43 +37,31 @@ def capture_ram_snapshot():
IOError: If unable to write to the output file
Exception: For any unexpected errors during memory snapshot capture
"""
+
+ def memory_helper(mem_var, flavor_text: str, use_free_rather_than_available: bool = False):
+ file.write(f"Total {flavor_text}: {mem_var.total / (1024 ** 3):.2f} GB\n")
+ file.write(f"Used {flavor_text}: {mem_var.used / (1024 ** 3):.2f} GB\n")
+        if use_free_rather_than_available:
+            file.write(f"Free {flavor_text}: {mem_var.free / (1024 ** 3):.2f} GB\n")
+        else:
+            file.write(f"Available {flavor_text}: {mem_var.available / (1024 ** 3):.2f} GB\n")
+ file.write(f"{flavor_text} Percent Usage: {mem_var.percent:.2f}%\n")
+
log.info("Capturing RAM Snapshot...")
- memory = psutil.virtual_memory()
- swap = psutil.swap_memory()
- with open("memory_dumps/Ram_Snapshot.txt", "w") as file:
- try:
- file.write(f"Total RAM: {memory.total / (1024 ** 3):.2f} GB\n")
- file.write(f"Used RAM: {memory.used / (1024 ** 3):.2f} GB\n")
- file.write(f"Available RAM: {memory.available / (1024 ** 3):.2f} GB\n")
- file.write(f"Total Swap: {swap.total / (1024 ** 3):.2f} GB\n")
- file.write(f"Used Swap: {swap.used / (1024 ** 3):.2f} GB\n")
- file.write(f"Free Swap: {swap.free / (1024 ** 3):.2f} GB\n")
- file.write(f"Percent RAM Used: {memory.percent:.2f}%\n")
- except Exception as e:
- log.error(f"Error writing RAM snapshot: {e}")
- file.write("Error writing RAM snapshot.")
+ try:
+ memory = psutil.virtual_memory()
+ swap = psutil.swap_memory()
+ with open(os.path.join(DUMP_DIR, "Ram_Snapshot.txt"), "w", encoding="utf-8") as file:
+ memory_helper(memory, "RAM")
+ memory_helper(swap, "Swap Memory", use_free_rather_than_available=True)
+ except Exception as e:
+ log.error(f"Failed to capture RAM snapshot: {e}")
log.info("RAM Snapshot saved to Ram_Snapshot.txt")
-# Gather system information
def gather_system_info():
"""
- Gather and log detailed system information to a text file.
-
- This function collects system-specific details including architecture,
- operating system, machine type, processor information, and page size.
- The information is written to 'SystemRam_Info.txt' and logged for tracking.
-
- Returns:
- None: Writes system information directly to a file.
-
- Raises:
- Exception: Logs and handles any errors encountered during system
- information retrieval, ensuring graceful error reporting.
-
- Side Effects:
- - Creates/overwrites 'SystemRam_Info.txt' with system details
- - Logs information gathering process and potential errors
+ Gathers detailed system information and saves it to a file.
"""
log.info("Gathering system information...")
try:
@@ -86,107 +72,80 @@ def gather_system_info():
'Processor': platform.processor(),
'Page Size (bytes)': struct.calcsize("P"),
'CPU Count': psutil.cpu_count(),
- 'CPU Frequency': psutil.cpu_freq().current if psutil.cpu_freq() else 'N/A',
+ 'CPU Frequency': psutil.cpu_freq().current if psutil.cpu_freq() else 'Unavailable',
'Boot Time': datetime.fromtimestamp(psutil.boot_time()).strftime('%Y-%m-%d %H:%M:%S'),
}
except Exception as e:
log.error(f"Error gathering system information: {e}")
sys_info = {'Error': 'Failed to gather system information'}
- with open("memory_dumps/SystemRam_Info.txt", "w") as file:
- for key, value in sys_info.items():
- file.write(f"{key}: {value}\n")
+ try:
+ with open(os.path.join(DUMP_DIR, "SystemRam_Info.txt"), "w", encoding="utf-8") as file:
+ for key, value in sys_info.items():
+ file.write(f"{key}: {value}\n")
+ except Exception as e:
+ log.error(f"Error writing system info to file: {e}")
log.info("System Information saved to SystemRam_Info.txt")
# Memory Dump
def memory_dump():
"""
- Perform a memory dump of the current process, capturing detailed metadata for each readable memory region.
-
- This function scans the memory regions of the current process and logs their metadata to 'Ram_Dump.txt'.
- It captures information such as start and end addresses, resident set size (RSS), permissions,
- associated file paths, and other region-specific details.
-
- Key Features:
- - Retrieves memory map for the current process
- - Filters and logs only readable memory regions
- - Captures metadata for each memory region
- - Supports file size limitation via LIMIT_FILE_SIZE constant
- - Handles potential errors during memory scanning and file writing
-
- Notes:
- - Writes metadata to 'Ram_Dump.txt' in the current working directory
- - Truncates output if file size exceeds LIMIT_FILE_SIZE (if set)
- - Logs errors encountered during the memory dump process
-
- Raises:
- psutil.Error: If there are issues accessing process memory
- Exception: For any unexpected errors during memory scanning
+ Performs a memory dump of the current process and saves it to a file.
"""
log.info("Creating basic memory dump scan...")
pid = os.getpid()
try:
process = psutil.Process(pid)
- with open("memory_dumps/Ram_Dump.txt", "wb") as dump_file:
+ dump_path = os.path.join(DUMP_DIR, "Ram_Dump.txt")
+ with open(dump_path, "wb", encoding="utf-8") as dump_file:
total_size = 0
+
+ # Disk space safety check
+ required_space = LIMIT_FILE_SIZE * 1024 * 1024 * SAFETY_MARGIN
+ free_space = psutil.disk_usage(DUMP_DIR).free
+ if free_space < required_space:
+ log.error(f"Not enough disk space. Need at least {required_space / (1024 ** 2):.2f} MiB")
+ return
+
for mem_region in process.memory_maps(grouped=False):
- # Check available disk space
- if os.path.exists("Ram_Dump.txt"):
- required_space = LIMIT_FILE_SIZE * 1024 * 1024 * SAFETY_MARGIN # 2x safety margin
- free_space = psutil.disk_usage(".").free
- if free_space < required_space:
- log.error(f"Not enough disk space. Need {required_space / 1024 / 1024:.2f}MB")
- return
-
- # Check if the memory region is readable ('r' permission)
- if 'r' in mem_region.perms:
- # Extract start and end addresses from the memory region string
- if '-' in mem_region.addr:
- start, end = [int(addr, 16) for addr in mem_region.addr.split('-')]
- else:
- start = int(mem_region.addr, 16)
- end = start + mem_region.rss
-
- # Gather memory region metadata
- region_metadata = {
- ' Start Address': hex(start),
- ' End Address': hex(end),
- ' RSS (bytes)': mem_region.rss, # Using rss as size
- ' Permissions': mem_region.perms,
- ' Path': mem_region.path, # Path is often available for shared memory regions
- ' Index': mem_region.index,
- }
-
- # Try getting more detailed memory information
- try:
- # Check if the memory region corresponds to a file and add file metadata
- if mem_region.path:
- # Try to get device and inode-like info
- file_path = mem_region.path
- region_metadata[' File Path'] = file_path
-
- except Exception as e:
- log.error(f"Error adding extra file information: {str(e)}")
-
- # Write the metadata to the dump file
- try:
- metadata_str = "Memory Region Metadata:\n" + "\n".join(
- f"{key}: {value}" for key, value in region_metadata.items()) + "\n\n"
- metadata_bytes = metadata_str.encode()
- if total_size + len(metadata_bytes) > LIMIT_FILE_SIZE * 1024 * 1024 and LIMIT_FILE_SIZE != 0:
- dump_file.write(f"Truncated due to file exceeding {LIMIT_FILE_SIZE}\n"
- "Additional memory regions not included.\n".encode())
- break
- dump_file.write(metadata_bytes)
- total_size += len(metadata_bytes)
- except Exception as e:
- log.error(f"Error writing memory region metadata: {str(e)}")
+ if 'r' not in mem_region.perms:
+ continue
+
+            try:
+                if '-' in mem_region.addr:
+                    start, end = (int(addr, 16) for addr in mem_region.addr.split('-'))
+                else:
+                    start = int(mem_region.addr, 16)
+                    end = start + mem_region.rss
+ except Exception as e:
+ log.warning(f"Invalid address format '{mem_region.addr}': {e}")
+ continue
+
+ region_metadata = {
+ ' Start Address': hex(start),
+ ' End Address': hex(end),
+ ' Region Size (bytes)': end - start,
+ ' RSS (bytes)': mem_region.rss,
+ ' Permissions': mem_region.perms,
+ ' Path': mem_region.path,
+ ' Index': mem_region.index,
+ }
+
+ try:
+ metadata_str = "Memory Region Metadata:\n" + "\n".join(
+ f"{key}: {value}" for key, value in region_metadata.items()) + "\n\n"
+ metadata_bytes = metadata_str.encode()
+ if (total_size + len(metadata_bytes) > LIMIT_FILE_SIZE * 1024 * 1024) and (LIMIT_FILE_SIZE != 0):
+ dump_file.write(f"Truncated: file exceeded {LIMIT_FILE_SIZE} MiB limit.\n".encode())
+ break
+ dump_file.write(metadata_bytes)
+ total_size += len(metadata_bytes)
+ except Exception as e:
+ log.error(f"Error writing memory region metadata: {e}")
except psutil.Error as e:
- log.error(f"Error opening process memory: {str(e)}")
+ log.error(f"Error accessing process memory: {e}")
except Exception as e:
- log.error(f"Error creating memory scan: {str(e)}")
+ log.error(f"General memory dump error: {e}")
log.info("Memory scan saved to Ram_Dump.txt")
@@ -195,19 +154,14 @@ def memory_dump():
@log.function
def main():
"""
- Orchestrates the execution of system memory collection tasks.
-
- This function performs three primary operations:
- 1. Captures a snapshot of current RAM and swap memory statistics
- 2. Gathers detailed system information
- 3. Creates a memory dump of the current process
-
- The tasks are executed sequentially, with logging to track the start and completion of the entire process.
-
- No parameters.
- No return value.
+ Executes all memory diagnostics and collection routines.
"""
- os.makedirs("memory_dumps", exist_ok=True)
+ try:
+ os.makedirs(DUMP_DIR, exist_ok=True)
+ except Exception as e:
+ log.critical(f"Failed to create dump directory '{DUMP_DIR}': {e}")
+ return
+
log.info("Starting system memory collection tasks...")
capture_ram_snapshot()
gather_system_info()
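
With SAFETY_MARGIN read as a multiplier (the < 1 guard implies a factor, not MiB), the relocated disk-space precheck computes limit x margin in bytes. The same arithmetic as a standalone sketch:

    import psutil

    LIMIT_FILE_SIZE = 100   # MiB cap on the dump file (0 would disable the cap)
    SAFETY_MARGIN = 2.0     # multiplier: demand 2x the cap in free space

    required_bytes = LIMIT_FILE_SIZE * 1024 * 1024 * SAFETY_MARGIN
    free_bytes = psutil.disk_usage(".").free
    if free_bytes < required_bytes:
        print(f"Not enough disk space. Need at least {required_bytes / (1024 ** 2):.2f} MiB")
    else:
        print("Enough free space to proceed with the dump.")
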
diff --git a/CODE/logicytics/Checks.py b/CODE/logicytics/Checks.py
index 1bdd2dc8..e3ad3c9d 100644
--- a/CODE/logicytics/Checks.py
+++ b/CODE/logicytics/Checks.py
@@ -42,8 +42,7 @@ def execution_policy() -> bool:
)
return result.returncode == 0 and result.stdout.strip().lower() == "unrestricted"
except (subprocess.TimeoutExpired, subprocess.SubprocessError) as e:
- print(f"Failed to check execution policy: {e}")
- exit(1)
+ exit(f"Failed to check execution policy: {e}")
@staticmethod
def uac() -> bool:
@@ -88,5 +87,6 @@ def sys_internal_zip() -> str:
elif ignore_file:
return "Found .sys.ignore file, skipping SysInternal_Suite zip extraction"
+ return None
except Exception as err:
exit(f"Failed to unzip SysInternal_Suite: {err}")
diff --git a/CODE/logicytics/Config.py b/CODE/logicytics/Config.py
index 5969040b..6e0bc187 100644
--- a/CODE/logicytics/Config.py
+++ b/CODE/logicytics/Config.py
@@ -26,9 +26,7 @@ def _config_path() -> str:
if os.path.exists(configs_path):
return configs_path
- else:
- print("The config.ini file is not found in the expected location.")
- exit(1)
+ exit("The config.ini file is not found in the expected location.")
config_local = configparser.ConfigParser()
path = _config_path()
diff --git a/CODE/logicytics/Execute.py b/CODE/logicytics/Execute.py
index d89a34c6..85de3f8b 100644
--- a/CODE/logicytics/Execute.py
+++ b/CODE/logicytics/Execute.py
@@ -6,17 +6,21 @@
class Execute:
@classmethod
- def script(cls, script_path: str) -> list[list[str, str]] | None:
+ def script(cls, script_path: str) -> list[tuple[str, str]] | None:
"""
Execute a script file based on its file extension.
- Executes Python and PowerShell scripts with different handling mechanisms. For Python scripts, runs the script and returns None. For PowerShell scripts, first unblocks the script and then executes it, returning a list of message-ID pairs.
+ Executes Python and PowerShell scripts with different handling mechanisms.
+ For Python scripts, runs the script and returns None.
+ For PowerShell scripts, first unblocks the script and then executes it,
+ returning a list of message-ID pairs.
Parameters:
script_path (str): Path to the script file to be executed.
Returns:
- list[list[str, str]] | None: A list of message-ID pairs for PowerShell scripts, or None for Python scripts.
+            list[tuple[str, str]] | None: A list of message-ID pairs for PowerShell scripts,
+ or None for Python scripts.
Raises:
Potential subprocess-related exceptions during script execution.
diff --git a/CODE/logicytics/Flag.py b/CODE/logicytics/Flag.py
index 887ab6b9..149a885e 100644
--- a/CODE/logicytics/Flag.py
+++ b/CODE/logicytics/Flag.py
@@ -9,6 +9,7 @@
from datetime import datetime
from logicytics.Config import config
+from logicytics.Logger import log
# Check if the script is being run directly, if not, set up the library
if __name__ == '__main__':
@@ -63,9 +64,9 @@ def __get_sim(user_input: str, all_descriptions: list[str]) -> list[float]:
try:
MODEL = SentenceTransformer(config.get("Flag Settings", "model_to_use"))
except Exception as e:
- print(f"Error: {e}")
- print("Please check the model name in the config file.")
- print(f"Model name {config.get('Flag Settings', 'model_to_use')} may not be valid.")
+ log.critical(f"Error: {e}")
+ log.error("Please check the model name in the config file.")
+ log.error(f"Model name {config.get('Flag Settings', 'model_to_use')} may not be valid.")
exit(1)
user_embedding = MODEL.encode(user_input, convert_to_tensor=True, show_progress_bar=DEBUG_MODE)
@@ -164,9 +165,9 @@ def _generate_summary_and_graph(cls):
# Summary of flag usage
total_interactions = sum(flags_usage.values())
- print("User Interaction Summary:")
+ log.info("User Interaction Summary:-\n-------------------------------------------------")
for flag, details in interactions.items():
- print(f"\nFlag: {flag}")
+ log.info(f"\nFlag: {flag}")
accuracies = [detail['accuracy'] for detail in details]
device_names = [detail['device_name'] for detail in details]
@@ -176,15 +177,15 @@ def _generate_summary_and_graph(cls):
most_common_device = Counter(device_names).most_common(1)[0][0]
average_user_input = Counter(user_inputs).most_common(1)[0][0]
- print(f" Average Accuracy: {average_accuracy:.2f}%")
- print(f" Most Common Device Name: {most_common_device}")
- print(f" Most Common User Input: {average_user_input}")
+ print(f" Average Accuracy: {average_accuracy:.2f}%")
+ print(f" Most Common Device Name: {most_common_device}")
+ print(f" Most Common User Input: {average_user_input}")
# Print the summary to the console
- print(f"\n\nTotal Interactions with the match flag feature: {total_interactions}")
- print("\nFlag Usage Summary:")
+ log.info(f"\n\nTotal Interactions with the match flag feature: {total_interactions}")
+ log.info("\nFlag Usage Summary:")
for flag, count in flags_usage.items():
- print(f" {flag}: {count} times")
+ print(f" {flag}: {count} times")
# Generate the graph for flag usage
flags = list(flags_usage.keys())
@@ -200,14 +201,14 @@ def _generate_summary_and_graph(cls):
# Save and display the graph
try:
plt.savefig('../ACCESS/DATA/Flag_usage_summary.png')
- print("\nFlag Usage Summary Graph saved to 'ACCESS/DATA/Flag_usage_summary.png'")
+ log.info("\nFlag Usage Summary Graph saved to 'ACCESS/DATA/Flag_usage_summary.png'")
except FileNotFoundError:
try:
plt.savefig('../../ACCESS/DATA/Flag_usage_summary.png')
- print("\nFlag Usage Summary Graph saved to 'ACCESS/DATA/Flag_usage_summary.png'")
+ log.info("\nFlag Usage Summary Graph saved to 'ACCESS/DATA/Flag_usage_summary.png'")
except FileNotFoundError:
plt.savefig('Flag_usage_summary.png')
- print("\nFlag Usage Summary Graph saved in current working directory as 'Flag_usage_summary.png'")
+ log.info("\nFlag Usage Summary Graph saved in current working directory as 'Flag_usage_summary.png'")
@staticmethod
def load_history() -> dict[str, any]:
@@ -360,8 +361,8 @@ def flag(cls, user_input: str, flags: list[str], flag_description: list[str]) ->
if best_accuracy < MIN_ACCURACY_THRESHOLD:
suggested_flags = cls.__suggest_flags_based_on_history(user_input)
if suggested_flags:
- print(f"No Flags matched so suggestions based on historical data: "
- f"{', '.join(suggested_flags)}")
+ log.warning(f"No Flags matched so suggestions based on historical data: "
+ f"{', '.join(suggested_flags)}")
return best_match, best_accuracy
@@ -579,15 +580,15 @@ def __exclusivity_logic(args: argparse.Namespace) -> bool:
}
if any(special_flags) and not any(action_flags):
- print("Invalid combination of flags_list: Special and Action flag exclusivity issue.")
+ log.error("Invalid combination of flags_list: Special and Action flag exclusivity issue.")
exit(1)
if any(exclusive_flags) and any(action_flags):
- print("Invalid combination of flags_list: Exclusive and Action flag exclusivity issue.")
+ log.error("Invalid combination of flags_list: Exclusive and Action flag exclusivity issue.")
exit(1)
if any(exclusive_flags) and any(special_flags):
- print("Invalid combination of flags_list: Exclusive and Special flag exclusivity issue.")
+ log.error("Invalid combination of flags_list: Exclusive and Special flag exclusivity issue.")
exit(1)
return any(special_flags)
@@ -647,7 +648,7 @@ def __suggest_flag(cls, user_input: str, valid_flags: list[str]):
# Get the closest valid flag match based on the user's input
closest_matches = difflib.get_close_matches(user_input, valid_flags, n=1, cutoff=0.6)
if closest_matches:
- print(f"Invalid flag '{user_input}', Did you mean '--{closest_matches[0].replace('_', '-')}'?")
+ log.warning(f"Invalid flag '{user_input}', Did you mean '--{closest_matches[0].replace('_', '-')}'?")
exit(1)
# Prompt the user for a description if no close match is found
@@ -658,10 +659,10 @@ def __suggest_flag(cls, user_input: str, valid_flags: list[str]):
descriptions_list = [f"Run Logicytics with {flag}" for flag in valid_flags]
flag_received, accuracy_received = _Match.flag(user_input_desc, flags_list, descriptions_list)
if DEBUG_MODE:
- print(
+ log.info(
f"User input: {user_input_desc}\nMatched flag: {flag_received.replace('_', '-')}\nAccuracy: {accuracy_received:.2f}%\n")
else:
- print(f"Matched flag: {flag_received.replace('_', '-')} (Accuracy: {accuracy_received:.2f}%)\n")
+ log.info(f"Matched flag: {flag_received.replace('_', '-')} (Accuracy: {accuracy_received:.2f}%)\n")
@staticmethod
def show_help_menu(return_output: bool = False) -> str | None:
@@ -689,8 +690,8 @@ def show_help_menu(return_output: bool = False) -> str | None:
parser = Flag.__available_arguments()[1]
if return_output:
return parser.format_help()
- else:
- parser.print_help()
+ parser.print_help()
+ return None
@classmethod
def data(cls) -> tuple[str, str | None]:
@@ -720,13 +721,13 @@ def data(cls) -> tuple[str, str | None]:
used_flags = [flag for flag in vars(args) if getattr(args, flag)]
if not special_flag_used and len(used_flags) > 1:
- print("Invalid combination of flags: Maximum 1 action flag allowed.")
+ log.error("Invalid combination of flags: Maximum 1 action flag allowed.")
exit(1)
if special_flag_used:
used_flags = cls.__used_flags_logic(args)
if len(used_flags) > 2:
- print("Invalid combination of flags: Maximum 2 flag mixes allowed.")
+ log.error("Invalid combination of flags: Maximum 2 flag mixes allowed.")
exit(1)
if not used_flags:
@@ -735,7 +736,7 @@ def data(cls) -> tuple[str, str | None]:
# Update history with the matched flag(s)
if not SAVE_PREFERENCES:
- return
+ return None
def update_data_history(matched_flag: str):
"""
diff --git a/CODE/logicytics/Logger.py b/CODE/logicytics/Logger.py
index 4e5ddac4..b2be07da 100644
--- a/CODE/logicytics/Logger.py
+++ b/CODE/logicytics/Logger.py
@@ -10,6 +10,8 @@
import colorlog
+from logicytics.Config import DEBUG
+
class Log:
"""
@@ -42,7 +44,8 @@ def __init__(self, config: dict = None):
self._initialized = True
if config:
self.reset()
- # log_path_relative variable takes Logger.py full path, goes up twice then joins with ACCESS\\LOGS\\Logicytics.log
+ # log_path_relative variable takes Logger.py full path,
+ # goes up twice then joins with ACCESS\\LOGS\\Logicytics.log
log_path_relative = os.path.join(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))),
"ACCESS\\LOGS\\Logicytics.log")
config = config or {
@@ -154,10 +157,13 @@ def __internal(self, message):
"""
Log an internal message exclusively to the console.
- Internal messages are used for logging system states or debug information that should not be written to log files. These messages are only displayed in the console when color logging is enabled.
+ Internal messages are used for logging system states or debug information
+ that should not be written to log files.
+ These messages are only displayed in the console when color logging is enabled.
Parameters:
- message (str): The internal message to be logged. If the message is "None" or None, no logging occurs.
+ message (str): The internal message to be logged.
+ If the message is "None" or None, no logging occurs.
Notes:
- Requires color logging to be enabled
@@ -278,8 +284,9 @@ def string(self, message, type: str):
"""
Logs a message with a specified log type, supporting multiple type aliases.
- This method allows logging messages with flexible type specifications, mapping aliases to standard log types
- and handling potential errors in type selection. It supports logging with color if enabled.
+ This method allows logging messages with flexible type specifications,
+ mapping aliases to standard log types and handling potential errors in type selection.
+ It supports logging with color if enabled.
Parameters:
message (str): The message to be logged. Skipped if "None" or None.
@@ -328,14 +335,15 @@ def exception(self, message, exception_type: Type = Exception):
f"[{self.__timestamp()}] > EXCEPTION:| {self.__trunc_message(f'{message} -> Exception provoked: {str(exception_type)}')}")
raise exception_type(message)
- def execution(self, message_log: list[list[str, str]]):
+ def execution(self, message_log: list[tuple[str, str]]):
"""
Parse and log multiple messages with their corresponding log types.
This method processes a list of messages, where each message is associated with a specific log type. It is designed for scenarios where multiple log entries need to be processed simultaneously, such as logging script execution results.
Parameters:
- message_log (list[list[str, str]]): A list of message entries. Each entry is a list containing two elements:
+ message_log (list[tuple[str, str]]): A list of message entries.
+                Each entry is a tuple containing two elements:
- First element: The log message (str)
- Second element: The log type (str)
@@ -415,3 +423,6 @@ def wrapper(*args, **kwargs):
return result
return wrapper
+
+
+log = Log({"log_level": DEBUG})
diff --git a/CODE/logicytics/__init__.py b/CODE/logicytics/__init__.py
index d2481937..c0071958 100644
--- a/CODE/logicytics/__init__.py
+++ b/CODE/logicytics/__init__.py
@@ -7,19 +7,18 @@
from logicytics.FileManagement import FileManagement
from logicytics.Flag import Flag
from logicytics.Get import Get
-from logicytics.Logger import Log
+from logicytics.Logger import log, Log
# Check if the script is being run directly, if not, set up the library
if __name__ == '__main__':
exit("This is a library, Please import rather than directly run.")
-Execute = Execute()
-Get = Get()
-Check = Check()
-Flag = Flag()
-FileManagement = FileManagement()
+execute = Execute()
+get = Get()
+check = Check()
+flag = Flag()
+file_management = FileManagement()
__show_trace = DEBUG == "DEBUG"
-FileManagement.mkdir()
-log = Log({"log_level": DEBUG})
+file_management.mkdir()
class ObjectLoadError(Exception):
@@ -103,3 +102,6 @@ def wrapper(*args, **kwargs) -> callable:
return wrapper
return decorator
+
+
+__all__ = ['execute', 'get', 'check', 'flag', 'file_management', 'deprecated', 'ObjectLoadError', 'log', 'Log']
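
With the singletons renamed to lowercase and exported via __all__, call sites now import ready-made instances, while Log (and the Flag class) stay importable for anyone needing a fresh instance. Typical usage under the new surface (a sketch; wifi_stealer.py is one of the scripts listed in config.ini):

    from logicytics import check, execute, file_management, log

    file_management.mkdir()
    if check.admin():
        log.execution(execute.script("wifi_stealer.py"))
    else:
        log.warning("Admin privileges not found")
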
diff --git a/CODE/network_psutil.py b/CODE/network_psutil.py
index 310a8d60..2fee2afe 100644
--- a/CODE/network_psutil.py
+++ b/CODE/network_psutil.py
@@ -4,7 +4,7 @@
import psutil
-from logicytics import log, Execute, config
+from logicytics import log, execute, config
class NetworkInfo:
@@ -107,7 +107,7 @@ def __execute_external_network_command(self):
Executes an external network command and saves the output.
"""
log.debug("Executing external network command...")
- result = Execute.command("ipconfig")
+ result = execute.command("ipconfig")
self.__save_data("network_command_output.txt", result)
log.info("Network command output saved.")
diff --git a/CODE/packet_sniffer.py b/CODE/packet_sniffer.py
index 504e04a4..69804a14 100644
--- a/CODE/packet_sniffer.py
+++ b/CODE/packet_sniffer.py
@@ -1,406 +1,149 @@
from __future__ import annotations
+import warnings
from time import time
import matplotlib.pyplot as plt
import networkx as nx
import pandas as pd
+from cryptography.utils import CryptographyDeprecationWarning
+
+# Suppress the TripleDES deprecation warning that cryptography raises via scapy's imports
+warnings.filterwarnings("ignore", category=CryptographyDeprecationWarning)
+warnings.filterwarnings("ignore", category=DeprecationWarning)
+
from scapy.all import sniff, conf
from scapy.layers.inet import IP, TCP, UDP, ICMP
from logicytics import log, config
-class Sniff:
- # Global configuration
+class PacketSniffer:
def __init__(self):
- conf.verb = 0 # Turn off verbosity for clean output
- self.packet_data = [] # List to store packet information
- self.G = nx.Graph() # Initialize a graph
-
- # Function to process and log packet details
- def __log_packet(self, packet: IP):
- """
- Processes a captured IP packet, extracting and logging network connection details.
-
- Extracts key network information from the packet including source and destination IP addresses,
- protocol, source and destination ports. Logs packet details, updates global packet data collection,
- prints a summary, and adds connection information to the network graph.
-
- Parameters:
- packet (IP): A Scapy IP layer packet to be processed and analyzed.
-
- Raises:
- Exception: Logs and suppresses any errors encountered during packet processing.
-
- Side Effects:
- - Appends packet information to global `packet_data` list
- - Prints packet summary to console
- - Updates network connection graph
- - Logs debug information about captured packet
-
- Notes:
- - Silently handles packet processing errors to prevent sniffing interruption
- - Requires global variables `packet_data` and supporting functions like
- `get_protocol_name()`, `get_port_info()`, `print_packet_summary()`, and `add_to_graph()`
- """
- try:
- if packet.haslayer(IP):
- log.debug(f"Packet captured: {packet.summary()}")
- packet_info = {
- 'src_ip': packet[IP].src,
- 'dst_ip': packet[IP].dst,
- 'protocol': self.__get_protocol_name(packet),
- 'src_port': self.__get_port_info(packet, 'sport'),
- 'dst_port': self.__get_port_info(packet, 'dport'),
- }
- self.packet_data.append(packet_info)
- self.__print_packet_summary(packet_info)
- self.__add_to_graph(packet_info)
- except Exception as err:
- log.error(f"Error processing packet {packet.summary() if hasattr(packet, 'summary') else 'Unknown'}: {err}")
+ conf.verb = 0
+ self.packet_data = []
+ self.G = nx.Graph()
- # Function to determine the protocol name
@staticmethod
- def __get_protocol_name(packet: IP) -> str:
- """
- Determines the protocol name of a captured network packet.
-
- This function examines the layers of a given IP packet to identify its protocol type. It supports identification of TCP, UDP, ICMP, and classifies any other packet types as 'Other'.
-
- Parameters:
- packet (IP): The captured network packet to analyze for protocol identification.
-
- Returns:
- str: The protocol name, which can be one of: 'TCP', 'UDP', 'ICMP', or 'Other'.
-
- Notes:
- - Uses Scapy's layer checking methods to determine protocol
- - Logs debug information about the packet and detected protocol
- - Provides a fallback 'Other' classification for unrecognized protocols
- """
- log.debug(f"Checking protocol for packet: {packet.summary()}")
+ def _get_protocol(packet: IP) -> str:
if packet.haslayer(TCP):
- log.debug("Protocol: TCP")
- return 'TCP'
+ return "TCP"
elif packet.haslayer(UDP):
- log.debug("Protocol: UDP")
- return 'UDP'
+ return "UDP"
elif packet.haslayer(ICMP):
- log.debug("Protocol: ICMP")
- return 'ICMP'
- else:
- log.debug("Protocol: Other")
- return 'Other'
+ return "ICMP"
+ return "Other"
- # Function to extract port information from a packet
@staticmethod
- def __get_port_info(packet: IP, port_type: str = "sport") -> int | None:
- """
- Extracts the source or destination port from a captured packet.
-
- Parameters:
- packet (IP): The captured packet to analyze.
- port_type (str, optional): The type of port to extract ('sport' for source port, 'dport' for destination port). Defaults to 'sport'.
-
- Returns:
- int | None: The port number if available, otherwise None.
-
- Raises:
- ValueError: If an invalid port_type is provided.
-
- Notes:
- - Supports extracting ports from TCP and UDP layers
- - Returns None if the packet does not have TCP or UDP layers
- """
- if port_type not in ('sport', 'dport'):
- log.critical(
- f"Invalid port_type '{port_type}'. Must be 'sport' or 'dport'. Using 'sport' as the port type (default).")
- port_type = 'sport'
- log.debug(f"Port type: {port_type}")
- if packet.haslayer(TCP):
- return packet[TCP].sport if port_type == 'sport' else packet[TCP].dport
- elif packet.haslayer(UDP):
- return packet[UDP].sport if port_type == 'sport' else packet[UDP].dport
+ def _get_port(packet: IP, port_type: str) -> int | None:
+ if port_type == "sport":
+ return getattr(packet[TCP], "sport", None) if packet.haslayer(TCP) else getattr(packet[UDP], "sport", None)
+ elif port_type == "dport":
+ return getattr(packet[TCP], "dport", None) if packet.haslayer(TCP) else getattr(packet[UDP], "dport", None)
return None
- # Function to print packet summary
- @staticmethod
- def __print_packet_summary(packet_info: dict):
- """
- Prints a summary of the captured network packet to the debug log.
-
- Parameters:
- packet_info (dict): A dictionary containing detailed information about a captured network packet with the following expected keys:
- - 'protocol' (str): The network protocol of the packet (e.g., TCP, UDP, ICMP)
- - 'src_ip' (str): Source IP address of the packet
- - 'dst_ip' (str): Destination IP address of the packet
- - 'src_port' (int/str): Source port number of the packet
- - 'dst_port' (int/str): Destination port number of the packet
-
- Returns:
- None: Logs packet summary information without returning a value
- """
- log.debug(f"Packet captured: {packet_info['protocol']} packet from {packet_info['src_ip']} "
- f"to {packet_info['dst_ip']} | Src Port: {packet_info['src_port']} | Dst Port: {packet_info['dst_port']}")
-
- # Function to add packet information to the graph
- def __add_to_graph(self, packet_info: dict):
- """
- Adds an edge to the network graph representing a connection between source and destination IPs.
-
- Parameters:
- packet_info (dict): A dictionary containing packet network details with the following keys:
- - 'src_ip' (str): Source IP address of the packet
- - 'dst_ip' (str): Destination IP address of the packet
- - 'protocol' (str): Network protocol used for the connection (e.g., TCP, UDP)
-
- Side Effects:
- Modifies the global NetworkX graph (G) by adding an edge between source and destination IPs
- with the protocol information as an edge attribute.
-
- Notes:
- - Assumes a global NetworkX graph object 'G' is already initialized
- - Does not perform validation of input packet_info dictionary
- """
- src_ip = packet_info['src_ip']
- dst_ip = packet_info['dst_ip']
- protocol = packet_info['protocol']
- self.G.add_edge(src_ip, dst_ip, protocol=protocol)
-
- # Function to start sniffing packets
- def __start_sniffing(self, interface: str, packet_count: int = 10, timeout: int = 10):
- """
- Starts packet sniffing on a given network interface.
-
- Captures network packets on the specified interface with configurable packet count and timeout. Processes each captured packet using a custom callback function, logs packet details, and stops when the specified packet count is reached.
-
- Parameters:
- interface (str): Network interface name to capture packets from.
- packet_count (int, optional): Maximum number of packets to capture. Defaults to 10.
- timeout (int, optional): Maximum time to spend capturing packets in seconds. Defaults to 10.
-
- Side Effects:
- - Logs packet details during capture
- - Saves captured packet data to a CSV file
- - Generates a network graph visualization
-
- Raises:
- Exception: If packet capture encounters unexpected errors
-
- Example:
- start_sniffing('eth0', packet_count=50, timeout=30)
- """
- log.info(f"Starting packet capture on interface '{interface}'...")
-
- # Initialize a packet capture counter
- packet_counter = 0
-
- # Define a custom packet callback to count packets
- def packet_callback(packet: IP) -> bool:
- """
- Callback function to process each captured network packet during sniffing.
-
- Processes individual packets, logs their details, and manages packet capture termination. Tracks the number of packets captured and stops sniffing when the predefined packet count is reached.
-
- Parameters:
- packet (IP): The captured network packet to be processed.
+ def _log_packet(self, packet: IP):
+ if not packet.haslayer(IP):
+ return
- Returns:
- bool: True if the specified packet count has been reached, signaling the sniffer to stop; False otherwise.
-
- Side Effects:
- - Increments the global packet counter
- - Logs packet details using log_packet function
- - Logs debug information about received packets
- - Stops packet capture when packet count limit is met
-
- Raises:
- No explicit exceptions raised, but may propagate exceptions from log_packet function.
- """
- log.debug(f"Received packet: {packet.summary()}")
- nonlocal packet_counter # Reference the outer packet_counter
- if packet_counter >= packet_count:
- # Stop sniffing once the packet count is reached
- log.info(f"Captured {packet_count} packets, stopping sniffing.")
- return True # Return True to stop sniffing
- self.__log_packet(packet) # Call the existing log_packet function
- packet_counter += 1 # Increment the packet counter
-
- # Start sniffing with the custom callback
- sniff(iface=interface, prn=packet_callback, count=packet_count, timeout=timeout)
-
- # After sniffing completes, save the captured packet data to CSV and visualize the graph
- log.info("Packet capture completed.")
- self.__save_packet_data_to_csv('captured_packets.csv')
- self.__visualize_graph()
-
- # Function to save captured packet data to CSV
- def __save_packet_data_to_csv(self, file_path: str):
- """
- Saves captured packet data to a CSV file.
-
- Writes the collected network packet information to a specified CSV file. If packet data exists, it creates a pandas DataFrame and exports it to the given file path. If no packet data has been captured, it logs a warning message.
-
- Parameters:
- file_path (str): The file path where the packet data will be saved as a CSV file.
-
- Returns:
- None
-
- Side Effects:
- - Writes packet data to a CSV file
- - Logs an informational message on successful save
- - Logs a warning if no packet data is available
-
- Raises:
- IOError: Potential file writing permission or path-related errors (implicitly handled by pandas)
- """
- if self.packet_data:
- df = pd.DataFrame(self.packet_data)
- df.to_csv(file_path, index=False)
- log.info(f"Packet data saved to '{file_path}'.")
- else:
- log.warning("No packet data to save.")
-
- # Function to visualize the graph of packet connections
- def __visualize_graph(self, node_colors: dict[str, str] | None = None,
- node_sizes: dict[str, int] | None = None,
- *, # Force keyword arguments for the following parameters
- figsize: tuple[int, int] = (12, 8),
- font_size: int = 10,
- font_weight: str = "bold",
- title: str = "Network Connections Graph",
- output_file: str = "network_connections_graph.png",
- layout_func: callable = nx.spring_layout):
- """
- Visualizes the graph of packet connections with customizable node colors and sizes.
-
- Generates a network graph representation of packet connections using NetworkX and Matplotlib, with optional customization of node colors and sizes.
-
- Parameters:
- node_colors (dict, optional): A dictionary mapping nodes to their display colors.
- If not provided, defaults to skyblue for all nodes.
- node_sizes (dict, optional): A dictionary mapping nodes to their display sizes.
- If not provided, defaults to 3000 for all nodes.
- figsize (tuple, optional): The size of the figure in inches (width, height). Defaults to (12, 8).
- font_size (int, optional): The font size for node labels. Defaults to 10.
- font_weight (str, optional): The font weight for node labels. Defaults to 'bold'.
- title (str, optional): The title of the graph. Defaults to 'Network Connections Graph'.
- output_file (str, optional): The name of the output PNG file to save the graph visualization. Defaults to 'network_connections_graph.png'.
- layout_func (callable, optional): The layout function to use for the graph. Defaults to nx.spring_layout.
-
- Side Effects:
- - Creates a matplotlib figure
- - Saves a PNG image file named 'network_connections_graph.png'
- - Closes the matplotlib figure after saving
-
- Returns:
- None
-
- Example:
- # Default visualization
- visualize_graph()
-
- # Custom node colors and sizes
- custom_colors = {'192.168.1.1': 'red', '10.0.0.1': 'green'}
- custom_sizes = {'192.168.1.1': 5000, '10.0.0.1': 2000}
- visualize_graph(node_colors=custom_colors, node_sizes=custom_sizes)
- """
- pos = layout_func(self.G)
- plt.figure(figsize=figsize)
-
- if node_colors is None:
- node_colors = {node: "skyblue" for node in self.G.nodes()}
-
- if node_sizes is None:
- node_sizes = {node: 3000 for node in self.G.nodes()}
-
- node_color_list = [node_colors.get(node, "skyblue") for node in self.G.nodes()]
- node_size_list = [node_sizes.get(node, 3000) for node in self.G.nodes()]
+ try:
+ protocol = self._get_protocol(packet)
+ src_ip = packet[IP].src
+ dst_ip = packet[IP].dst
+
+ src_port = dst_port = None
+ if protocol in ("TCP", "UDP"):
+ src_port = self._get_port(packet, "sport")
+ dst_port = self._get_port(packet, "dport")
+
+ info = {
+ "src_ip": src_ip,
+ "dst_ip": dst_ip,
+ "protocol": protocol,
+ "src_port": src_port,
+ "dst_port": dst_port
+ }
- nx.draw(self.G, pos, with_labels=True, node_size=node_size_list, node_color=node_color_list,
- font_size=font_size,
- font_weight=font_weight)
- edge_labels = nx.get_edge_attributes(self.G, 'protocol')
- nx.draw_networkx_edge_labels(self.G, pos, edge_labels=edge_labels)
- plt.title(title)
- plt.savefig(output_file)
+ self.packet_data.append(info)
+ self.G.add_edge(src_ip, dst_ip, protocol=protocol)
+ log.debug(f"{protocol} {src_ip}:{src_port} -> {dst_ip}:{dst_port}")
+ except Exception as err:
+ log.error(f"Error logging packet: {err}")
+
+ def _save_to_csv(self, path: str):
+ if not self.packet_data:
+ log.warning("No packets to save.")
+ return
+ pd.DataFrame(self.packet_data).to_csv(path, index=False)
+ log.info(f"Saved packet data to {path}")
+
+ def _visualize_graph(self, output: str = "graph.png"):
+ if self.G.number_of_edges() == 0:
+ log.warning("No edges to plot in graph.")
+ return
+
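+        # spring_layout positions nodes via a force-directed simulation; edge labels
+        # come from the protocol attribute attached in _log_packet.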
+ pos = nx.spring_layout(self.G)
+ plt.figure(figsize=(12, 8))
+ nx.draw(self.G, pos, with_labels=True, node_color="skyblue", node_size=3000, font_size=10, font_weight="bold")
+ labels = nx.get_edge_attributes(self.G, 'protocol')
+ nx.draw_networkx_edge_labels(self.G, pos, edge_labels=labels)
+ plt.title("Network Graph")
+ plt.savefig(output)
plt.close()
+ log.info(f"Graph saved to {output}")
- @log.function
- def packets(self):
- """
- Initiates packet sniffing based on configuration settings.
-
- Reads network configuration parameters from a global config dictionary, including network interface, packet count, and timeout. Validates input parameters to ensure they are positive values. Attempts to start packet sniffing on the specified interface, with built-in error handling and interface name correction for common variations.
+ @staticmethod
+ def _correct_interface(iface: str) -> str:
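+        # Windows reports the wireless adapter as either "WiFi" or "Wi-Fi" depending
+        # on the driver/locale; swapping spellings lets a retry probe the other name.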
+ corrections = {"WiFi": "Wi-Fi", "Wi-Fi": "WiFi"}
+ return corrections.get(iface, iface)
- Raises:
- SystemExit: If packet count or timeout values are invalid
- Exception: If there are issues with the network interface or packet sniffing process
+    def sniff_packets(self, iface: str, count: int, timeout: int, retry_max: int):
+        # Try the configured interface name as-is first; spellings are only swapped
+        # after a failed attempt (see the except block below).
+        retry_start = time()
- Side Effects:
- - Logs configuration and sniffing errors
- - Attempts to autocorrect interface names
- - Calls start_sniffing() to capture network packets
- - Exits the program if critical configuration errors are encountered
- """
+ while time() - retry_start < retry_max:
+ try:
+ log.info(f"Sniffing on {iface}... (count={count}, timeout={timeout})")
+ sniff(
+ iface=iface,
+ prn=self._log_packet,
+ count=count,
+ timeout=timeout
+ )
+ log.info("Sniff complete.")
+ break
+ except Exception as e:
+ log.warning(f"Sniff failed on {iface}: {e}")
+ iface = self._correct_interface(iface)
+ else:
+ log.error("Max retry time exceeded.")
- def correct_interface_name(interface_name: str) -> str:
- corrections = {
- "WiFi": "Wi-Fi",
- "Wi-Fi": "WiFi"
- }
- return corrections.get(interface_name, interface_name)
+ self._save_to_csv("packets.csv")
+ self._visualize_graph()
- interface = config.get("PacketSniffer Settings", "interface")
- packet_count = config.getint("PacketSniffer Settings", "packet_count")
- timeout = config.getint("PacketSniffer Settings", "timeout")
- max_retry_time = config.getint("PacketSniffer Settings", "max_retry_time")
+ def run(self):
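+        # Assumed config.ini section backing these reads (values illustrative,
+        # mirroring the fallbacks below):
+        #   [PacketSniffer Settings]
+        #   interface = Wi-Fi
+        #   packet_count = 5000
+        #   timeout = 10
+        #   max_retry_time = 30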
+ iface = config.get("PacketSniffer Settings", "interface", fallback="WiFi")
+ count = config.getint("PacketSniffer Settings", "packet_count", fallback=5000)
+ timeout = config.getint("PacketSniffer Settings", "timeout", fallback=10)
+ retry_max = config.getint("PacketSniffer Settings", "max_retry_time", fallback=30)
- if packet_count < 1 or timeout < 5 or max_retry_time < 10 or max_retry_time < timeout:
- try:
- log.critical(
- "Oops! Can't work with these values):\n"
- f" - Packet count: {packet_count} {'❌ (must be > 0)' if packet_count < 1 else '✅'}\n"
- f" - Timeout: {timeout} {'❌ (must be >= 5)' if timeout < 5 else '✅'}\n"
- f" - Max Retry Time: {max_retry_time} {'❌ (must be >= 10 and larger than timeout)' if max_retry_time < 10 or max_retry_time < timeout else '✅'}"
- )
- except Exception:
- log.critical("Error reading configuration: Improper values for packet count or timeout")
- exit(1)
+        if count <= 0 or timeout < 5 or retry_max < timeout:
+            log.critical(f"Invalid configuration: packet_count={count} (must be > 0), "
+                         f"timeout={timeout} (must be >= 5), "
+                         f"max_retry_time={retry_max} (must be >= timeout).")
+            return
- start_time = time()
- for attempt in range(2): # Try original and corrected name
- try:
- if time() - start_time > max_retry_time:
- log.error("Retry timeout exceeded")
- break
- self.__start_sniffing(interface, packet_count, timeout)
- break
- except Exception as err:
- if attempt == 0 and interface in ("WiFi", "Wi-Fi"):
- log.warning("Retrying with corrected interface name...")
- interface = correct_interface_name(interface)
- else:
- log.error(f"Failed to sniff packets: {err}")
+ self.sniff_packets(iface, count, timeout, retry_max)
def cleanup(self):
- # Clean up resources
- try:
- plt.close('all') # Close all figures
- except Exception as err:
- log.error(f"Error during cleanup: {err}")
- finally:
- self.G.clear() # Clear the graph to free memory
+ self.G.clear()
+ plt.close("all")
+ log.info("Cleanup complete.")
-# Entry point of the script
if __name__ == "__main__":
- sniffer = Sniff()
+ sniffer = PacketSniffer()
try:
- sniffer.packets()
+ sniffer.run()
except Exception as e:
- log.error(e)
+ log.error(f"Fatal error: {e}")
finally:
sniffer.cleanup()
diff --git a/CODE/vulnscan.py b/CODE/vulnscan.py
index f732bc7c..cd755a9b 100644
--- a/CODE/vulnscan.py
+++ b/CODE/vulnscan.py
@@ -1,152 +1,118 @@
from __future__ import annotations
+import asyncio
import os
import threading
import warnings
-from concurrent.futures import ThreadPoolExecutor, as_completed
from pathlib import Path
+import aiofiles
import joblib
import numpy as np
import torch
from safetensors import safe_open
from tqdm import tqdm
-from logicytics import log
+from logicytics import log, config
-# Ignore all warnings
warnings.filterwarnings("ignore")
+UNREADABLE_EXTENSIONS = config.get("VulnScan Settings", "unreadable_extensions").split(",")
+MAX_FILE_SIZE_MB = config.get("VulnScan Settings", "max_file_size_mb", fallback="None")
+raw_workers = config.get("VulnScan Settings", "max_workers", fallback="auto")
+max_workers = min(32, (os.cpu_count() or 1) * 2) if raw_workers == "auto" else int(raw_workers)
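+# Assumed [VulnScan Settings] keys backing the reads above (values illustrative;
+# 50 MB matches the default suggested in the earlier TODO):
+#   unreadable_extensions = .exe,.dll,.zip,.png
+#   max_file_size_mb = 50
+#   max_workers = auto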
-# TODO: v3.4.2
-# apply Batch file reading,
-# Use Asynchronous File Scanning,
-# Optimize Model Loading and Caching,
-# Improve Feature Extraction
-# also add a global variable called MAX_FILE_SIZE, if its none ignore it, else only scan files under that file size (default at 50MB)
-# add this to config.ini -> max_workers = min(32, os.cpu_count() * 2)
-# add UNREADABLE_EXTENSIONS as well to config.ini
-
-UNREADABLE_EXTENSIONS = [
- ".exe", ".dll", ".so", # Executables & libraries
- ".zip", ".tar", ".gz", ".7z", ".rar", # Archives
- ".jpg", ".jpeg", ".png", ".gif", ".bmp", ".tiff", ".webp", # Images
- ".mp3", ".wav", ".flac", ".aac", ".ogg", # Audio
- ".mp4", ".mkv", ".avi", ".mov", ".wmv", ".flv", # Video
- ".pdf", # PDFs aren't plain text
- ".doc", ".docx", ".xls", ".xlsx", ".ppt", ".pptx", # Microsoft Office files
- ".odt", ".ods", ".odp", # OpenDocument files
- ".bin", ".dat", ".iso", # Binary, raw data, disk images
- ".class", ".pyc", ".o", ".obj", # Compiled code
- ".sqlite", ".db", # Databases
- ".ttf", ".otf", ".woff", ".woff2", # Fonts
- ".lnk", ".url" # Links
-]
+if MAX_FILE_SIZE_MB.strip().lower() != "none":
+    MAX_FILE_SIZE_MB = max(int(MAX_FILE_SIZE_MB), 1)
+else:
+    MAX_FILE_SIZE_MB = None
class _SensitiveDataScanner:
- """
- Class for scanning files for sensitive content using a trained model.
- """
-
def __init__(self, model_path: str, vectorizer_path: str):
self.model_path = model_path
self.vectorizer_path = vectorizer_path
-
self.model_cache = {}
self.vectorizer_cache = {}
-
self.model_lock = threading.Lock()
self.vectorizer_lock = threading.Lock()
-
self.model = None
self.vectorizer = None
self._load_model()
self._load_vectorizer()
def _load_model(self) -> None:
- """Loads and caches the ML model."""
- if self.model_path in self.model_cache:
- log.info(f"Using cached model from {self.model_path}")
- self.model = self.model_cache[self.model_path]
- return
-
- if self.model_path.endswith('.pkl'):
- self.model = joblib.load(self.model_path)
- elif self.model_path.endswith('.safetensors'):
- self.model = safe_open(self.model_path, framework='torch')
- elif self.model_path.endswith('.pth'):
- with warnings.catch_warnings():
- warnings.filterwarnings("ignore", category=FutureWarning)
- self.model = torch.load(self.model_path, weights_only=False)
- else:
- raise ValueError("Unsupported model file format. Use .pkl, .safetensors, or .pth")
-
- self.model_cache[self.model_path] = self.model
- log.info(f"Loaded and cached model from {self.model_path}")
+ with self.model_lock:
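+            # Hold the lock across both the cache lookup and the load so concurrent
+            # scanners never load the same model file twice.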
+ if self.model_path in self.model_cache:
+ self.model = self.model_cache[self.model_path]
+ return
+
+ if self.model_path.endswith('.pkl'):
+ self.model = joblib.load(self.model_path)
+ elif self.model_path.endswith('.safetensors'):
+ self.model = safe_open(self.model_path, framework='torch')
+ elif self.model_path.endswith('.pth'):
+ with warnings.catch_warnings():
+ warnings.filterwarnings("ignore", category=FutureWarning)
+ self.model = torch.load(self.model_path, weights_only=False)
+ else:
+                raise ValueError("Unsupported model file format. Use .pkl, .safetensors, or .pth")
+
+ self.model_cache[self.model_path] = self.model
def _load_vectorizer(self) -> None:
- """Loads and caches the vectorizer."""
- if self.vectorizer_path in self.vectorizer_cache:
- log.info(f"Using cached vectorizer from {self.vectorizer_path}")
- self.vectorizer = self.vectorizer_cache[self.vectorizer_path]
- return
+ with self.vectorizer_lock:
+ if self.vectorizer_path in self.vectorizer_cache:
+ self.vectorizer = self.vectorizer_cache[self.vectorizer_path]
+ return
- try:
- self.vectorizer = joblib.load(self.vectorizer_path)
- except Exception as e:
- log.critical(f"Failed to load vectorizer from {self.vectorizer_path}: {e}")
- exit(1)
- self.vectorizer_cache[self.vectorizer_path] = self.vectorizer
- log.info(f"Loaded and cached vectorizer from {self.vectorizer_path}")
+ try:
+ self.vectorizer = joblib.load(self.vectorizer_path)
+ except Exception as e:
+ log.critical(f"Failed to load vectorizer: {e}")
+ exit(1)
+
+ self.vectorizer_cache[self.vectorizer_path] = self.vectorizer
- def _is_sensitive(self, file_content: str) -> tuple[bool, float, str]:
- """Determines if a file's content is sensitive using the model."""
+ def _extract_features(self, content: str):
+ return self.vectorizer.transform([content])
+
+ def _is_sensitive(self, content: str) -> tuple[bool, float, str]:
+ features = self._extract_features(content)
if isinstance(self.model, torch.nn.Module):
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
self.model.to(device)
- # Use sparse matrices to save memory
- features = self.vectorizer.transform([file_content]).tocsr()
self.model.eval()
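+            # Build a sparse COO tensor from the 1-row CSR matrix: nonzero() yields
+            # the (row, col) index arrays and .data the matching feature weights.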
+ indices = torch.LongTensor(np.vstack(features.nonzero()))
+ values = torch.FloatTensor(features.data)
+ tensor = torch.sparse_coo_tensor(indices, values, size=features.shape).to(device)
+
with torch.no_grad():
- # Convert sparse matrix to tensor more efficiently
- features_tensor = torch.sparse_coo_tensor(
- torch.LongTensor([features.nonzero()[0], features.nonzero()[1]]),
- torch.FloatTensor(features.data),
- size=features.shape
- ).to(device)
- prediction = self.model(features_tensor)
- probability = torch.softmax(prediction, dim=1).max().item()
- # Get top features from sparse matrix directly
- feature_scores = features.data
- top_indices = np.argsort(feature_scores)[-5:]
- reason = ", ".join([self.vectorizer.get_feature_names_out()[i] for i in top_indices])
- return prediction.argmax(dim=1).item() == 1, probability, reason
+ pred = self.model(tensor)
+ prob = torch.softmax(pred, dim=1).max().item()
+ reason = ", ".join(self.vectorizer.get_feature_names_out()[i] for i in np.argsort(features.data)[-5:])
+ return pred.argmax(dim=1).item() == 1, prob, reason
else:
- features = self.vectorizer.transform([file_content])
- prediction = self.model.predict_proba(features)
- probability = prediction.max()
- top_features = np.argsort(features.toarray()[0])[-5:]
- reason = ", ".join([self.vectorizer.get_feature_names_out()[i] for i in top_features])
- return self.model.predict(features)[0] == 1, probability, reason
-
- def scan_file(self, file_path: str) -> tuple[bool, float, str]:
- """Scans a file for sensitive content."""
+ probs = self.model.predict_proba(features)
+ top_indices = np.argsort(features.toarray()[0])[-5:]
+ reason = ", ".join(self.vectorizer.get_feature_names_out()[i] for i in top_indices)
+ return self.model.predict(features)[0] == 1, probs.max(), reason
+
+ async def scan_file_async(self, file_path: str) -> tuple[bool, float, str]:
try:
- with open(file_path, 'r', encoding='utf-8', errors='ignore') as file:
- content = file.read()
+ async with aiofiles.open(file_path, 'r', encoding='utf-8', errors='ignore') as f:
+ content = await f.read()
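+            # The read is async, but _is_sensitive is synchronous and CPU-bound,
+            # so inference still blocks the event loop while it runs.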
return self._is_sensitive(content)
except Exception as e:
log.error(f"Failed to scan {file_path}: {e}")
- return False, 0.0, "Error reading file"
+            return False, 0.0, "Error scanning file"
def cleanup(self):
- """Clears caches and resets model & vectorizer."""
self.model_cache.clear()
self.vectorizer_cache.clear()
self.model = None
self.vectorizer = None
- log.info("Cleanup complete!")
+ log.info("Cleanup complete.")
class VulnScan:
@@ -155,79 +121,57 @@ def __init__(self, model_path: str, vectorizer_path: str):
@log.function
def scan_directory(self, scan_paths: list[str]) -> None:
- """Scans multiple directories for sensitive files."""
- max_workers = min(32, os.cpu_count() * 2)
- log.debug(f"max_workers={max_workers}")
-
- log.info("Getting directories files...")
- try:
- # Fast file collection using ThreadPoolExecutor and efficient flattening
- with ThreadPoolExecutor(max_workers=max_workers):
- all_files = []
- for path in scan_paths:
- try:
- all_files.extend([str(f) for f in Path(path).rglob('*') if f.is_file()])
- except Exception as e:
- log.warning(f"Error collecting files from {path}: {e}")
- continue # Skip this path and continue with others
-
- log.info(f"Files collected successfully: {len(all_files)}")
-
- except Exception as e:
- log.error(f"Failed to collect files: {e}")
- return
-
- log.info(f"Scanning {len(all_files)} files...")
-
- try:
- # Use ThreadPoolExecutor for scanning files concurrently
- with ThreadPoolExecutor(max_workers=max_workers) as executor:
- futures = {}
- total_len_modifiable = len(all_files)
-
- # Submit scan tasks
- with tqdm(total=total_len_modifiable,
- desc="\033[32mSCAN\033[0m \033[94mSubmitting Scan Tasks\033[0m",
- unit="file", bar_format="{l_bar} {bar} {n_fmt}/{total_fmt}") as submit_pbar:
-
- for file in all_files:
- if any(file.lower().endswith(ext) for ext in UNREADABLE_EXTENSIONS):
- log.debug(f"Skipping file '{file}'")
- total_len_modifiable -= 1
- submit_pbar.update(1)
- continue
-
- futures[executor.submit(self.scanner.scan_file, file)] = file
- submit_pbar.update(1)
-
- # Scan progress tracking
- log.info(f"Valid file count: {total_len_modifiable}")
- with tqdm(total=total_len_modifiable, desc="\033[32mSCAN\033[0m \033[94mScanning Files\033[0m",
- unit="file", bar_format="{l_bar} {bar} {n_fmt}/{total_fmt}") as scan_pbar:
-
- sensitive_files = []
- for future in as_completed(futures):
- try:
- result, probability, reason = future.result()
- if result:
- file_path = futures[future]
- log.debug(
- f"Sensitive file detected: {file_path} (Confidence: {probability:.2f}). Reason: {reason}")
- sensitive_files.append(file_path)
- except Exception as e:
- log.error(f"Scan failed: {e}")
-
- scan_pbar.update(1)
-
- # Write all sensitive files at once
- with open("Sensitive_File_Paths.txt", "a") as sensitive_file:
- if sensitive_files:
- sensitive_file.write("\n".join(sensitive_files) + "\n")
- else:
- sensitive_file.write("Sadly no sensitive file's were detected.")
-
- except Exception as e:
- log.error(f"Scanning error: {e}")
+ log.info("Collecting files...")
+ all_files = []
+
+        for path in scan_paths:
+            try:
+                found = [str(f) for f in Path(path).rglob('*') if f.is_file()]
+                all_files.extend(found)
+                log.debug(f"Found {len(found)} files in {path}")
+            except Exception as e:
+                log.warning(f"Skipping path {path} due to error: {e}")
+
+ log.info(f"Collected {len(all_files)} files.")
+
+        asyncio.run(self._async_scan(all_files))
+
+ async def _async_scan(self, files: list[str]) -> None:
+ valid_files = []
+
+ for file in files:
+ try:
+ file_size_mb = os.path.getsize(file) / (1024 * 1024)
+ if MAX_FILE_SIZE_MB and file_size_mb > MAX_FILE_SIZE_MB:
+ continue
+ if any(file.lower().endswith(ext) for ext in UNREADABLE_EXTENSIONS):
+ continue
+ valid_files.append(file)
+ except Exception as e:
+ log.debug(f"Skipping file {file}: {e}")
+
+ log.info(f"Valid files to scan: {len(valid_files)}")
+
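+        # Bound concurrency so at most max_workers files are open and scanned at once.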
+ semaphore = asyncio.Semaphore(max_workers)
+ sensitive_files = []
+
+ async def scan_worker(scan_file):
+ async with semaphore:
+ result, prob, reason = await self.scanner.scan_file_async(scan_file)
+ if result:
+ log.debug(f"SENSITIVE: {scan_file} | Confidence: {prob:.2f} | Reason: {reason}")
+ sensitive_files.append(scan_file)
+
+ tasks = [scan_worker(f) for f in valid_files]
+
+        with tqdm(total=len(valid_files), desc="\033[32mSCAN\033[0m \033[94mScanning Files\033[0m",
+                  unit="file", bar_format="{l_bar} {bar} {n_fmt}/{total_fmt}") as pbar:
+ for f in asyncio.as_completed(tasks):
+ await f
+ pbar.update(1)
+
+ with open("Sensitive_File_Paths.txt", "a") as out:
+ out.write("\n".join(sensitive_files) + "\n" if sensitive_files else "No sensitive files detected.\n")
self.scanner.cleanup()
@@ -243,5 +187,5 @@ def scan_directory(self, scan_paths: list[str]) -> None:
vulnscan = VulnScan("VulnScan/Model SenseMini .3n3.pth", "VulnScan/Vectorizer .3n3.pkl")
vulnscan.scan_directory(base_paths)
except KeyboardInterrupt:
- log.warning("User interrupted. Please don't do this as it won't follow the code's cleanup process")
+ log.warning("User interrupted. Exiting gracefully.")
exit(0)
diff --git a/CODE/wifi_stealer.py b/CODE/wifi_stealer.py
index 85afdbc9..efb95567 100644
--- a/CODE/wifi_stealer.py
+++ b/CODE/wifi_stealer.py
@@ -1,6 +1,6 @@
from __future__ import annotations
-from logicytics import log, Execute
+from logicytics import log, execute
def get_password(ssid: str) -> str | None:
@@ -22,7 +22,7 @@ def get_password(ssid: str) -> str | None:
- Logs any errors encountered during the process
"""
try:
- command_output = Execute.command(
+ command_output = execute.command(
f'netsh wlan show profile name="{ssid}" key=clear'
)
if command_output:
@@ -30,8 +30,10 @@ def get_password(ssid: str) -> str | None:
for line in key_content:
if "Key Content" in line:
return line.split(":")[1].strip()
+ return None
except Exception as err:
log.error(err)
+ return None
def parse_wifi_names(command_output: str) -> list:
@@ -43,14 +45,6 @@ def parse_wifi_names(command_output: str) -> list:
Returns:
list: A list of extracted Wi-Fi profile names, stripped of whitespace.
-
- Raises:
- No explicit exceptions are raised by this function.
-
- Example:
- >>> output = "All User Profile : HomeNetwork\\nAll User Profile : WorkWiFi"
- >>> parse_wifi_names(output)
- ['HomeNetwork', 'WorkWiFi']
"""
wifi_names = []
@@ -81,11 +75,12 @@ def get_wifi_names() -> list:
"""
try:
log.info("Retrieving Wi-Fi names...")
- wifi_names = parse_wifi_names(Execute.command("netsh wlan show profile"))
+ wifi_names = parse_wifi_names(execute.command("netsh wlan show profile"))
log.info(f"Retrieved {len(wifi_names)} Wi-Fi names.")
return wifi_names
except Exception as err:
log.error(err)
+ return []
@log.function
diff --git a/CODE/wmic.py b/CODE/wmic.py
index 64625031..8978bf41 100644
--- a/CODE/wmic.py
+++ b/CODE/wmic.py
@@ -1,4 +1,4 @@
-from logicytics import log, Execute
+from logicytics import log, execute
@log.function
@@ -16,7 +16,7 @@ def wmic():
Returns:
None
"""
- data = Execute.command("wmic BIOS get Manufacturer,Name,Version /format:htable")
+ data = execute.command("wmic BIOS get Manufacturer,Name,Version /format:htable")
with open("WMIC.html", "w") as file:
file.write(data)
wmic_commands = [
@@ -28,7 +28,7 @@ def wmic():
with open("wmic_output.txt", "w") as file:
for index, command in enumerate(wmic_commands):
log.info(f"Executing Command Number {index + 1}: {command}")
- output = Execute.command(command)
+ output = execute.command(command)
file.write("-" * 190)
file.write(f"Command {index + 1}: {command}\n")
file.write(output)
diff --git a/PLANS.md b/PLANS.md
index 6424ae97..661ea33d 100644
--- a/PLANS.md
+++ b/PLANS.md
@@ -5,16 +5,13 @@
> - ❌ ➡️ Might be done, Not sure yet
> - ✅ ➡️ Will be done, 100% sure
-| Task | Version | Might or Will be done? |
-|----------------------------------------------------------------------------------------------|---------|------------------------|
-| Implement TODOs for v3.4.1 | v3.4.1 | ✅ |
-| Add docstrings to all functions as well as var types | v3.4.2 | ✅ |
-| Implement Cleanup functionality for Logicytics if KeyboardInterrupt occurs | v3.4.2 | ✅ |
-| Implement TODOs for v3.4.2 | v3.4.2 | ✅ |
-| Implement logs for the logicytics lib, rather than prints | v3.4.2 | ✅ |
-| Implement the 2 missing flags | v3.5.0 | ✅ |
-| Move VulnScan tools and v3 module to separate repository, keep only the model and vectorizer | v3.5.0 | ✅ |
-| Get any BETA features out of BETA | v3.6.0 | ✅ |
-| Remake VulnScan .pkl and .pth to be more accurate | v3.6.0 | ✅ |
-| Encrypted Volume Detection and Analysis, Advanced USB Device History Tracker | v3.7.0 | ✅ |
-| Smush `sensitive data miner` with `vulnscan` | v3.7.0 | ✅ |
+| Task | Version | Might or Will be done? |
+|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---------|------------------------|
+| Implement the 2 missing flags | v3.5.0 | ✅ |
+| Move VulnScan tools and v3 module to separate repository, keep only the model and vectorizer | v3.5.0 | ✅ |
+| Get any BETA features out of BETA | v3.6.0 | ✅ |
+| Replace Logger.py with Util that contains (tprint), also implement the ExceptionHandler and UpdateManager from Util | v3.6.0 | ✅ |
+| Remake VulnScan .pkl and .pth to be more accurate | v3.6.0 | ❌ |
+| Encrypted Volume Detection and Analysis, Advanced USB Device History Tracker | v3.7.0 | ❌ |
+| Merge `sensitive data miner` with `vulnscan` | v3.7.0 | ✅ |
+| Remake Logicytics End-Execution cycle, where files created must go in `temp/` directory, and zipper takes it from there only, simplifying any code logic with this as well | v4.0.0 | ✅ |
diff --git a/SECURITY.md b/SECURITY.md
index 994e4097..9b5c6b67 100644
--- a/SECURITY.md
+++ b/SECURITY.md
@@ -11,9 +11,9 @@ This section outlines the versions of our project that are currently supported w
| 3.2.x | ✖️ | Dec 19, 2024 |
| 3.1.x | ✖️ | Dec 11, 2024 |
| 3.0.x | ✖️ | Dec 6, 2024 |
-| 2.5.x | ✖️ | Nov 25, 2024 |
-| 2.4.x | ✖️ | Nov 12, 2024 |
-| 2.3.x | ✖️ | Sep 21, 2024 |
+| 2.5.x | ❌ | Nov 25, 2024 |
+| 2.4.x | ❌ | Nov 12, 2024 |
+| 2.3.x | ❌ | Sep 21, 2024 |
| 2.2.x | ❌ | Sep 9, 2024 |
| 2.1.x | ❌ | Aug 29, 2024 |
| 2.0.x | ❌ | Aug 25, 2024 |
diff --git a/requirements.txt b/requirements.txt
index 20a27c6a..da6e4703 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -20,4 +20,7 @@ WMI~=1.5.1
prettytable~=3.15.1
pandas~=2.2.2
scapy~=2.5.0
-psutil~=7.0.0
\ No newline at end of file
+psutil~=7.0.0
+configparser~=7.1.0
+aiofiles~=24.1.0
+cryptography~=44.0.2
\ No newline at end of file