diff --git a/.github/ISSUE_TEMPLATE/bug_report.yml b/.github/ISSUE_TEMPLATE/bug_report.yml
index 16554f39..7029f67c 100644
--- a/.github/ISSUE_TEMPLATE/bug_report.yml
+++ b/.github/ISSUE_TEMPLATE/bug_report.yml
@@ -44,9 +44,9 @@ body:
validations:
required: false
- type: dropdown
- id: flags
+ id: flags_list
attributes:
- label: What flags were you using to run Logicytics?
+      label: What flags were you using to run Logicytics?
multiple: false
options:
- Threading
diff --git a/.gitignore b/.gitignore
index add49568..65c57571 100644
--- a/.gitignore
+++ b/.gitignore
@@ -320,3 +320,5 @@ $RECYCLE.BIN/
/CODE/SysInternal_Suite/.sys.ignore
/ACCESS/
/CODE/VulnScan/tools/NN features/
+/CODE/logicytics/User_History.json.gz
+/CODE/logicytics/User_History.json
diff --git a/.idea/csv-editor.xml b/.idea/csv-editor.xml
new file mode 100644
index 00000000..cb2fb408
--- /dev/null
+++ b/.idea/csv-editor.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/CODE/Logicytics.py b/CODE/Logicytics.py
index 1c6617e7..e97b2b8c 100644
--- a/CODE/Logicytics.py
+++ b/CODE/Logicytics.py
@@ -3,8 +3,8 @@
import os
import shutil
import subprocess
-import threading
import zipfile
+from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime
from typing import Any
@@ -75,35 +75,42 @@ def update() -> tuple[str, str]:
def get_flags():
"""
- Retrieves the command-line flags and sub-actions.
-
- This function checks if the flags are provided as a tuple. If so, it attempts to unpack
- the tuple into ACTION and SUB_ACTION. If an exception occurs, it sets SUB_ACTION to None.
- If the flags are not a tuple, it prints the help message and exits the program.
-
+ Retrieves action and sub-action flags from the Flag module and sets global variables.
+
+ This function extracts the current action and sub-action from the Flag module, setting global
+ ACTION and SUB_ACTION variables. It logs the retrieved values for debugging and tracing purposes.
+
+ No parameters.
+
+ Side effects:
+ - Sets global variables ACTION and SUB_ACTION
+ - Logs debug information about current action and sub-action
"""
global ACTION, SUB_ACTION
- if isinstance(Flag.data(), tuple):
- try:
- # Get flags
- ACTION, SUB_ACTION = Flag.data()
- except ValueError:
- actions = Flag.data()
- ACTION = actions[0]
- SUB_ACTION = None
- else:
- parser = Flag.data()
- parser.print_help()
- input("Press Enter to exit...")
- exit(1)
+    # Get flags
+ ACTION, SUB_ACTION = Flag.data()
+ log.debug(f"Action: {ACTION}")
+ log.debug(f"Sub-Action: {SUB_ACTION}")
def special_execute(file_path: str):
"""
- Executes a Python script in a new command prompt window.
-
- Args:
- file_path (str): The relative path to the Python script to be executed.
+ Execute a Python script in a new command prompt window.
+
+ This function launches the specified Python script in a separate command prompt window, waits for its completion, and then exits the current process.
+
+ Parameters:
+ file_path (str): The relative path to the Python script to be executed,
+ which will be resolved relative to the current script's directory.
+
+ Side Effects:
+ - Opens a new command prompt window
+ - Runs the specified Python script
+ - Terminates the current process after script execution
+
+ Raises:
+ FileNotFoundError: If the specified script path does not exist
+ subprocess.SubprocessError: If there are issues launching the subprocess
"""
sr_current_dir = os.path.dirname(os.path.abspath(__file__))
sr_script_path = os.path.join(sr_current_dir, file_path)
@@ -114,12 +121,23 @@ def special_execute(file_path: str):
def handle_special_actions():
"""
- Handles special actions based on the provided action flag.
-
- This function checks the value of the `action` variable and performs
- corresponding special actions such as opening debug, developer, or extra
- tools menus, updating the repository, restoring backups, creating backups,
- or unzipping extra files.
+ Handles special actions based on the current action flag.
+
+ This function performs specific actions depending on the global `ACTION` variable:
+ - For "debug": Opens the debug menu by executing '_debug.py'
+ - For "dev": Opens the developer menu by executing '_dev.py'
+ - For "update": Updates the repository using Health.update() method
+ - For "restore": Displays a warning and opens the backup location
+ - For "backup": Creates backups of the CODE and MODS directories
+
+ Side Effects:
+ - Logs informational, debug, warning, or error messages
+ - May execute external Python scripts
+ - May open file locations
+ - May terminate the program after completing special actions
+
+ Raises:
+ SystemExit: Exits the program after completing certain special actions
"""
# Special actions -> Quit
if ACTION == "debug":
@@ -135,14 +153,6 @@ def handle_special_actions():
log.info("Opening developer menu...")
special_execute("_dev.py")
- # Deprecated, remove in v3.3.0
- if ACTION == "extra":
- print("\033[91mDeprecationWarning: The `extra` feature has been removed! 🚫\n"
- "Why? It didn't match our code quality standards.\n"
- "What to use instead? Check out our new features with --help\033[0m")
- input("Press Enter to exit...")
- exit(0)
-
if ACTION == "update":
log.info("Updating...")
message, log_type = Health.update()
@@ -173,22 +183,22 @@ def handle_special_actions():
input("Press Enter to exit...")
exit(0)
- # Deprecated, remove in v3.3.0
- if ACTION == "unzip_extra":
- print("\033[91mDeprecationWarning: The `unzip_extra` feature has been removed! 🚫\n"
- "Why? It didn't match our code quality standards.\n"
- "What to use instead? Check out our new features with --help\033[0m")
- input("Press Enter to exit...")
- exit(0)
-
def check_privileges():
"""
Checks if the script is running with administrative privileges and handles UAC (User Account Control) settings.
-
+
This function verifies if the script has admin privileges. If not, it either logs a warning (in debug mode) or
prompts the user to run the script with admin privileges and exits. It also checks if UAC is enabled and logs
warnings accordingly.
+
+ Raises:
+ SystemExit: If the script is not running with admin privileges and not in debug mode.
+
+ Notes:
+ - Requires the `Check` module with `admin()` and `uac()` methods
+ - Depends on global `DEBUG` configuration variable
+ - Logs warnings or critical messages based on privilege and UAC status
"""
if not Check.admin():
if DEBUG == "DEBUG":
@@ -205,16 +215,33 @@ def check_privileges():
def generate_execution_list() -> list | list[str] | list[str | Any]:
"""
- Creates an execution list based on the provided action.
-
+ Generate an execution list of scripts based on the specified action.
+
+ This function dynamically creates a list of scripts to be executed by filtering and selecting
+ scripts based on the global ACTION variable. It supports different execution modes:
+ - 'minimal': A predefined set of lightweight scripts
+ - 'nopy': PowerShell and script-based scripts without Python
+ - 'modded': Includes scripts from the MODS directory
+ - 'depth': Comprehensive script execution with data mining and logging scripts
+ - 'vulnscan_ai': Vulnerability scanning script only
+
Returns:
- list: The execution list of scripts to be executed.
+ list[str]: A list of script file paths to be executed, filtered and modified based on the current action.
+
+ Raises:
+        ValueError: Raised implicitly by list.remove() if an expected script is missing from the initial file list.
+
+ Notes:
+ - Removes sensitive or unnecessary scripts from the initial file list
+ - Logs the final execution list for debugging purposes
+ - Warns users about potential long execution times for certain actions
"""
execution_list = Get.list_of_files(".", extensions=(".py", ".exe", ".ps1", ".bat"))
execution_list.remove("sensitive_data_miner.py")
execution_list.remove("dir_list.py")
execution_list.remove("tree.ps1")
execution_list.remove("vulnscan.py")
+ execution_list.remove("event_log.py")
if ACTION == "minimal":
execution_list = [
@@ -248,6 +275,7 @@ def generate_execution_list() -> list | list[str] | list[str | Any]:
execution_list.append("sensitive_data_miner.py")
execution_list.append("dir_list.py")
execution_list.append("tree.ps1")
+ execution_list.append("event_log.py")
log.warning("This flag will use threading!")
if ACTION == "vulnscan_ai":
@@ -262,34 +290,42 @@ def execute_scripts():
"""Executes the scripts in the execution list based on the action."""
# Check whether to use threading or not, as well as execute the code
log.info("Starting Logicytics...")
+
if ACTION == "threaded" or ACTION == "depth":
- def threaded_execution(execution_list_thread, index_thread):
- log.debug(f"Thread {index_thread} started")
+
+ def execute_single_script(script: str) -> tuple[str, Exception | None]:
+ """
+ Executes a single script and logs the result.
+
+ This function executes a single script and logs the result,
+ capturing any exceptions that occur during execution
+
+ Parameters:
+ script (str): The path to the script to be executed
+ """
+ log.debug(f"Executing {script}")
try:
- log.parse_execution(Execute.script(execution_list_thread[index_thread]))
- log.info(f"{execution_list_thread[index_thread]} executed")
- except UnicodeDecodeError as err:
- log.error(f"Error in thread: {err}")
+ log.parse_execution(Execute.script(script))
+ log.info(f"{script} executed")
+ return script, None
except Exception as err:
- log.error(f"Error in thread: {err}")
- log.debug(f"Thread {index_thread} finished")
+ log.error(f"Error executing {script}: {err}")
+ return script, err
log.debug("Using threading")
- threads = []
execution_list = generate_execution_list()
- for index, _ in enumerate(execution_list):
- thread = threading.Thread(
- target=threaded_execution,
- args=(
- execution_list,
- index,
- ),
- )
- threads.append(thread)
- thread.start()
-
- for thread in threads:
- thread.join()
+ with ThreadPoolExecutor() as executor:
+ futures = {executor.submit(execute_single_script, script): script
+ for script in execution_list}
+
+ for future in as_completed(futures):
+ script = futures[future]
+ result, error = future.result()
+ if error:
+ log.error(f"Failed to execute {script}")
+ else:
+ log.debug(f"Completed {script}")
+
elif ACTION == "performance_check":
execution_times = []
execution_list = generate_execution_list()
@@ -367,8 +403,23 @@ def handle_sub_action():
# log.warning("This feature is not implemented yet! Sorry")
-if __name__ == "__main__":
- # Get flags and configs
+@log.function
+def Logicytics():
+ """
+ Orchestrates the complete Logicytics workflow, managing script execution, system actions, and user interactions.
+
+ This function serves as the primary entry point for the Logicytics utility, coordinating a series of system-level operations:
+ - Retrieves command-line configuration flags
+ - Processes special actions
+ - Verifies system privileges
+ - Executes targeted scripts
+ - Compresses generated output files
+ - Handles final system sub-actions
+ - Provides a graceful exit mechanism
+
+ Performs actions sequentially without returning a value, designed to be the main execution flow of the Logicytics utility.
+ """
+    # Get flags and configs
get_flags()
# Check for special actions
handle_special_actions()
@@ -382,6 +433,10 @@ def handle_sub_action():
handle_sub_action()
# Finish
input("Press Enter to exit...")
+
+
+if __name__ == "__main__":
+ Logicytics()
else:
log.error("This script cannot be imported!")
exit(1)
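
For context on the refactor above: a minimal, self-contained sketch of the
submit/as_completed idiom that replaces the manual threading.Thread list
(run_one is a hypothetical stand-in for Execute.script; the script names are
illustrative):

    from concurrent.futures import ThreadPoolExecutor, as_completed

    def run_one(script: str) -> tuple[str, Exception | None]:
        # Hypothetical stand-in for Execute.script(); mirrors the
        # (script, error) contract of execute_single_script above.
        try:
            print(f"running {script}")
            return script, None
        except Exception as err:
            return script, err

    scripts = ["a.py", "b.ps1", "c.bat"]
    with ThreadPoolExecutor() as executor:
        # Map each future back to its script so failures can be attributed.
        futures = {executor.submit(run_one, s): s for s in scripts}
        for future in as_completed(futures):
            script, error = future.result()
            print(f"{'failed' if error else 'ok'}: {script}")

Unlike bare threads, the executor caps concurrency, and as_completed yields
results as they finish, so one slow script does not delay error reporting.
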
diff --git a/CODE/_debug.py b/CODE/_debug.py
index 6d79a68e..7ca58dcc 100644
--- a/CODE/_debug.py
+++ b/CODE/_debug.py
@@ -178,6 +178,27 @@ def cpu_info() -> tuple[str, str, str]:
def python_version():
+ """
+ Checks the current Python version against recommended version ranges and logs the result.
+
+ This function determines the compatibility of the current Python runtime by comparing its version
+ against predefined minimum and maximum version thresholds. It provides informative logging about
+ the Python version status.
+
+ Logs:
+ - Info: When Python version is within the recommended range (3.11.x to 3.12.x)
+ - Warning: When Python version is below the minimum recommended version (< 3.11)
+ - Error: When Python version is above the maximum supported version (>= 3.13) or parsing fails
+
+ Raises:
+ No explicit exceptions are raised; errors are logged internally
+
+ Example:
+ Typical log outputs might include:
+ - "Python Version: 3.11.5 - Perfect"
+ - "Python Version: 3.10.2 - Recommended: 3.11.x"
+ - "Python Version: 3.13.0 - Incompatible"
+ """
version = sys.version.split()[0]
MIN_VERSION = (3, 11)
MAX_VERSION = (3, 13)
@@ -211,9 +232,28 @@ def get_online_config() -> dict | None:
return None
+@log_debug.function
def debug():
"""
- Executes system checks and logs results.
+ Executes a comprehensive system debug routine, performing various checks and logging system information.
+
+ This function performs the following tasks:
+ - Clears the existing debug log file
+ - Retrieves and validates online configuration
+ - Checks system version compatibility
+ - Verifies required file integrity
+ - Checks SysInternal binaries
+ - Logs system privileges and environment details
+ - Checks Python version compatibility
+ - Retrieves and logs CPU information
+
+ Logs are written to the debug log file, capturing system state, configuration, and potential issues.
+
+ Notes:
+ - Requires admin privileges for full system checks
+ - Logs information about execution environment
+ - Checks system and Python version compatibility
+ - Provides insights into system configuration and potential security settings
"""
# Clear Debug Log
log_path = "../ACCESS/LOGS/DEBUG/DEBUG.log"
diff --git a/CODE/_dev.py b/CODE/_dev.py
index 302fb22d..8be3ab92 100644
--- a/CODE/_dev.py
+++ b/CODE/_dev.py
@@ -1,8 +1,9 @@
from __future__ import annotations
-import configobj
import subprocess
+import configobj
+
from logicytics import Log, DEBUG, Get, FileManagement, CURRENT_FILES, VERSION
if __name__ == "__main__":
@@ -39,15 +40,26 @@ def _update_ini_file(filename: str, new_data: list | str, key: str) -> None:
def _prompt_user(question: str, file_to_open: str = None, special: bool = False) -> bool:
"""
- Prompts the user with a question and optionally opens a file if the answer is not 'yes'.
- Args:
- question (str): The question to ask the user.
- file_to_open (str, optional): The file to open if the user doesn't answer 'yes'.
- Returns:
- bool: True if the user's answer is 'yes', otherwise False.
- """
+ Prompts the user with a yes/no question and optionally opens a file.
+
+ Parameters:
+ question (str): The question to be presented to the user.
+ file_to_open (str, optional): Path to a file that will be opened if the user does not respond affirmatively.
+ special (bool, optional): Flag to suppress the default reminder message when the user responds negatively.
+
+ Returns:
+        bool: True if the user responds with 'yes' or 'y' (case-insensitive), False otherwise.
+
+ Notes:
+ - Uses subprocess to open files on Windows systems
+ - Case-insensitive input handling for 'yes' responses
+ - Provides optional file opening and reminder messaging
+ """
try:
- answer = input(question + " (yes or no):- ")
+ answer = input(question + " (Y)es or (N)o:- ")
if not (answer.lower() == "yes" or answer.lower() == "y"):
if file_to_open:
subprocess.run(["start", file_to_open], shell=True)
@@ -61,11 +73,35 @@ def _prompt_user(question: str, file_to_open: str = None, special: bool = False)
log_dev.error(e)
+@log_dev.function
def dev_checks() -> None:
"""
- Performs a series of checks to ensure that the developer has followed the required guidelines and best practices.
- Returns:
- bool: True if all checks pass, otherwise False.
+ Performs comprehensive developer checks to ensure code quality and project guidelines compliance.
+
+ This function guides developers through a series of predefined checks, validates file additions,
+ and updates project configuration. It performs the following key steps:
+ - Verify adherence to contributing guidelines
+ - Check file naming conventions
+ - Validate file placement
+ - Confirm docstring and comment coverage
+ - Assess feature modularity
+ - Categorize and display file changes
+ - Update project configuration file
+
+    Returns:
+        None: The function returns early if any check fails or an error occurs during the process.
+
+ Side Effects:
+ - Creates necessary directories
+ - Prompts user for multiple confirmations
+ - Prints file change lists with color coding
+ - Updates configuration file with current files and version
+ - Logs warnings or errors during the process
+
+ Example:
+ Typical usage is during project development to ensure consistent practices:
+ >>> dev_checks()
+ # Interactively guides developer through project checks
"""
# Create the necessary directories if they do not exist
FileManagement.mkdir()
@@ -87,20 +123,21 @@ def dev_checks() -> None:
return None
# Get the list of files in the current directory
- files = Get.list_of_files(".", True)
+ EXCLUDE_FILES = ["logicytics\\User_History.json.gz", "logicytics\\User_History.json"]
+ files = Get.list_of_files(".", True, exclude_files=EXCLUDE_FILES)
added_files, removed_files, normal_files = [], [], []
clean_files_list = [file.replace('"', '') for file in CURRENT_FILES]
for f in files:
clean_f = f.replace('"', '')
- if clean_f in clean_files_list:
+ if clean_f in clean_files_list and clean_f not in EXCLUDE_FILES:
normal_files.append(clean_f)
- else:
+ elif clean_f not in EXCLUDE_FILES:
added_files.append(clean_f)
for f in clean_files_list:
clean_f = f.replace('"', '')
- if clean_f not in files:
+ if clean_f not in files and clean_f not in EXCLUDE_FILES:
removed_files.append(clean_f)
# Print the list of added, removed, and normal files in color
@@ -124,9 +161,10 @@ def dev_checks() -> None:
print("\nGreat Job! Please tick the box in the GitHub PR request for completing steps in --dev")
except Exception as e:
# Log any exceptions that occur during the process
- log_dev.exception(str(e))
+ log_dev.error(f"An error occurred: {e}")
-dev_checks()
-# Wait for the user to press Enter to exit the program
-input("\nPress Enter to exit the program... ")
+if __name__ == "__main__":
+ dev_checks()
+ # Wait for the user to press Enter to exit the program
+ input("\nPress Enter to exit the program... ")
diff --git a/CODE/bluetooth_details.py b/CODE/bluetooth_details.py
index d257485b..f15b3749 100644
--- a/CODE/bluetooth_details.py
+++ b/CODE/bluetooth_details.py
@@ -1,46 +1,54 @@
-import subprocess
+from __future__ import annotations
+
import json
+import subprocess
+
from logicytics import Log, DEBUG
if __name__ == "__main__":
log = Log({"log_level": DEBUG})
+@log.function
def get_bluetooth_device_details():
"""
Retrieves and logs detailed information about Bluetooth devices on the system.
-
- The function runs a PowerShell command to query devices whose names contain the term 'Bluetooth'.
- It writes the information to a text file named 'Bluetooth Info.txt'.
-
- Information for each device includes:
- - Name
- - Device ID
- - Description
- - Manufacturer
- - Status
- - PNP Device ID
-
- Logs errors if any issues are encountered during the process.
-
+
+ Executes a PowerShell query to collect Bluetooth device details and writes the information to a text file.
+ The function performs the following key actions:
+ - Logs the start of the device information retrieval process
+ - Queries Bluetooth devices using an internal helper function
+ - Writes device details to 'Bluetooth Info.txt' if devices are found
+
Returns:
- None
+ None: No return value; results are written to a file and logged
"""
log.info("Fetching detailed info for Bluetooth devices...")
- try:
- devices = _query_bluetooth_devices()
+ devices = _query_bluetooth_devices()
+ if devices:
_write_device_info_to_file(devices, "Bluetooth Info.txt")
- except Exception as e:
- log.error(f"Error: {e}")
- exit(1)
-def _query_bluetooth_devices():
+def _query_bluetooth_devices() -> bool | list[dict[str, str]]:
"""
Queries the system for Bluetooth devices using PowerShell commands.
-
+
+ Executes a PowerShell command to retrieve detailed information about Bluetooth devices connected to the system.
+ The function handles potential errors during command execution and JSON parsing, providing fallback values
+ for device information.
+
Returns:
- list: A list of device information dictionaries.
+ bool | list[dict[str, str]]: A list of device information dictionaries or False if an error occurs.
+ Each dictionary contains details such as Name, Device ID, Description, Manufacturer, Status, and PNP Device ID.
+
+ Raises:
+ No direct exceptions are raised. Errors are logged and the function returns False.
+
+ Example:
+ devices = _query_bluetooth_devices()
+ if devices:
+ for device in devices:
+ print(device['Name'])
"""
try:
# Run PowerShell command to get Bluetooth devices
@@ -49,27 +57,29 @@ def _query_bluetooth_devices():
"Select-Object FriendlyName, DeviceID, Description, Manufacturer, Status, PnpDeviceID | "
"ConvertTo-Json -Depth 3"
)
- result = subprocess.run(["powershell", "-Command", command], capture_output=True, text=True, check=True)
+ result = subprocess.run(["powershell", "-Command", command],
+ capture_output=True, text=True, check=True)
devices = json.loads(result.stdout)
except subprocess.CalledProcessError as e:
- log.error(f"Failed to query Bluetooth devices: {e}")
- exit(1)
+ log.error(f"Failed to query Bluetooth devices with command '{command}': {e}")
+ return False
except json.JSONDecodeError as e:
log.error(f"Failed to parse device information: {e}")
- exit(1)
+ return False
if isinstance(devices, dict):
devices = [devices] # Handle single result case
device_info_list = []
for device in devices:
+ FALLBACK_MSG = 'Unknown (Fallback due to failed Get request)'
device_info = {
- 'Name': device.get('FriendlyName', 'Unknown'),
- 'Device ID': device.get('DeviceID', 'Unknown'),
- 'Description': device.get('Description', 'Unknown'),
- 'Manufacturer': device.get('Manufacturer', 'Unknown'),
- 'Status': device.get('Status', 'Unknown'),
- 'PNP Device ID': device.get('PnpDeviceID', 'Unknown')
+ 'Name': device.get('FriendlyName', FALLBACK_MSG),
+ 'Device ID': device.get('DeviceID', FALLBACK_MSG),
+ 'Description': device.get('Description', FALLBACK_MSG),
+ 'Manufacturer': device.get('Manufacturer', FALLBACK_MSG),
+ 'Status': device.get('Status', FALLBACK_MSG),
+ 'PNP Device ID': device.get('PnpDeviceID', FALLBACK_MSG)
}
log.debug(f"Retrieved device: {device_info['Name']}")
device_info_list.append(device_info)
@@ -79,14 +89,20 @@ def _query_bluetooth_devices():
def _write_device_info_to_file(devices, filename):
"""
- Writes the details of the Bluetooth devices to a file.
-
+ Writes the details of Bluetooth devices to a specified file.
+
Args:
- devices (list): List of device information dictionaries.
- filename (str): Name of the file to write to.
-
- Returns:
- None
+ devices (list): A list of dictionaries containing Bluetooth device information.
+ filename (str): The path and name of the file where device details will be written.
+
+ Notes:
+ - Uses UTF-8 encoding for file writing
+ - Logs an error if file writing fails
+ - Calls _write_single_device_info() for each device in the list
"""
try:
with open(filename, "w", encoding="UTF-8") as file:
@@ -94,19 +110,29 @@ def _write_device_info_to_file(devices, filename):
_write_single_device_info(file, device_info)
except Exception as e:
log.error(f"Failed to write device information to file: {e}")
- exit(1)
def _write_single_device_info(file, device_info):
"""
- Writes information for a single Bluetooth device to the file.
-
- Args:
- file (TextIO): File object to write to.
- device_info (dict): Dictionary containing device information.
-
- Returns:
- None
+ Writes detailed information for a single Bluetooth device to the specified file.
+
+ Parameters:
+ file (TextIO): An open file object to which device information will be written.
+ device_info (dict): A dictionary containing key-value pairs of Bluetooth device attributes.
+
+ Writes the device name followed by all other device attributes, with each device's information separated by a blank line. Uses `.get()` method to provide a fallback 'Unknown' value if the device name is missing.
+
+ Example:
+ If device_info is {'Name': 'Wireless Headset', 'Address': '00:11:22:33:44:55', 'Connected': 'True'}
+ The file will contain:
+ Name: Wireless Headset
+ Address: 00:11:22:33:44:55
+ Connected: True
+
+ If no name is provided:
+ Name: Unknown
+ Address: 00:11:22:33:44:55
+ Connected: True
"""
file.write(f"Name: {device_info.get('Name', 'Unknown')}\n")
for key, value in device_info.items():
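
A note on the dict-vs-list normalization above: PowerShell's ConvertTo-Json
serializes a single match as a bare object and several matches as an array,
which is why _query_bluetooth_devices() wraps a lone dict in a list. A minimal
illustration (the JSON literals are made up):

    import json

    single = '{"FriendlyName": "Headset", "Status": "OK"}'
    several = '[{"FriendlyName": "Headset"}, {"FriendlyName": "Mouse"}]'

    for raw in (single, several):
        devices = json.loads(raw)
        if isinstance(devices, dict):
            devices = [devices]  # normalize the single-result case
        for device in devices:
            print(device.get("FriendlyName", "Unknown"))
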
diff --git a/CODE/bluetooth_logger.py b/CODE/bluetooth_logger.py
index 39afa0ec..f94fb21f 100644
--- a/CODE/bluetooth_logger.py
+++ b/CODE/bluetooth_logger.py
@@ -1,6 +1,7 @@
-import subprocess
-import re
import datetime
+import re
+import subprocess
+
from logicytics import Log, DEBUG
if __name__ == "__main__":
@@ -9,46 +10,119 @@
# Utility function to log data to a file
def save_to_file(filename, section_title, data):
- """Logs data to a text file with a section title."""
+ """
+ Appends data to a file with a section title.
+
+ Args:
+ filename (str): Path to the file where data will be written. Must be a valid file path.
+ section_title (str): Title describing the section being added to the file.
+ data (str or list): Content to be written. Accepts either a single string or a list of strings.
+
+ Notes:
+ - Uses UTF-8 encoding for file writing
+ - Adds decorative section separators around the content
+ - Automatically handles single string or list of strings input
+ - Logs any errors encountered during file writing
+ """
try:
with open(filename, 'a', encoding='utf-8') as file:
file.write(f"\n{'=' * 50}\n{section_title}\n{'=' * 50}\n")
file.write(f"{data}\n" if isinstance(data, str) else "\n".join(data) + "\n")
file.write(f"{'=' * 50}\n")
- except Exception as e:
- log.error(f"Error writing to file {filename}: {e}")
+ except Exception as err:
+ log.error(f"Error writing to file {filename}: {err}")
# Utility function to run PowerShell commands
def run_powershell_command(command):
- """Runs a PowerShell command and returns the output."""
+ """
+ Runs a PowerShell command and returns the output as a list of lines.
+
+ Args:
+ command (str): The PowerShell command to execute.
+
+ Returns:
+ list: A list of strings representing each line of the command output.
+ Returns an empty list if the command execution fails or an exception occurs.
+
+ Notes:
+ - Uses subprocess.run() with capture_output=True to capture command output
+ - Logs errors for failed commands or exceptions
+ - Splits command output into lines for easier processing
+ """
try:
result = subprocess.run(["powershell", "-Command", command], capture_output=True, text=True)
if result.returncode != 0:
log.error(f"PowerShell command failed with return code {result.returncode}")
return []
return result.stdout.splitlines()
- except Exception as e:
- log.error(f"Error running PowerShell command: {e}")
+ except Exception as err:
+ log.error(f"Error running PowerShell command: {err}")
return []
# Unified parsing function for PowerShell output
def parse_output(lines, regex, group_names):
- """Parses command output using a regex and extracts specified groups."""
+ """
+ Parses the output lines using the provided regex and group names.
+
+ Parameters:
+ lines (list): A list of strings representing command output lines.
+ regex (str): Regular expression pattern to match each line.
+ group_names (list): List of group names to extract from matched regex.
+
+ Returns:
+        list: Dictionaries containing the extracted group names and their values.
+              An empty list is returned if parsing fails unexpectedly.
+
+ Notes:
+ - Skips lines that do not match the provided regex pattern
+ - Logs debug messages for unrecognized lines
+ - Logs error if parsing fails completely
+ """
results = []
- for line in lines:
- match = re.match(regex, line)
- if match:
- results.append({name: match.group(name) for name in group_names})
- else:
- log.debug(f"Skipping unrecognized line: {line}")
- return results
+ try:
+ for line in lines:
+ match = re.match(regex, line)
+ if match:
+ results.append({name: match.group(name) for name in group_names})
+ else:
+ log.debug(f"Skipping unrecognized line: {line}")
+ return results
+    except Exception as err:
+        log.error(f"Parsing output failed: {err}")
+        return []
# Function to get paired Bluetooth devices
def get_paired_bluetooth_devices():
- """Retrieves paired Bluetooth devices with names and MAC addresses."""
+ """
+ Retrieves a list of paired Bluetooth devices with their names and MAC addresses.
+
+ This function executes a PowerShell command to fetch Bluetooth devices with an "OK" status,
+ parses the output to extract device details, and attempts to retrieve MAC addresses from device IDs.
+
+ Returns:
+ list: A list of formatted strings containing device names and MAC addresses.
+               Each string follows the format "Name: <device_name>, MAC: <mac_address>".
+
+ Raises:
+ Exception: If there are issues running the PowerShell command or parsing the output.
+
+ Example:
+ >>> devices = get_paired_bluetooth_devices()
+ >>> print(devices)
+ ['Name: Wireless Headphones, MAC: 001122334455', 'Name: Bluetooth Speaker, MAC: 667788990011']
+ """
command = (
'Get-PnpDevice -Class Bluetooth | Where-Object { $_.Status -eq "OK" } | Select-Object Name, DeviceID'
)
@@ -70,8 +144,26 @@ def get_paired_bluetooth_devices():
# Function to log all Bluetooth data
+@log.function
def log_bluetooth():
- """Logs Bluetooth device data and event logs."""
+ """
+ Logs comprehensive Bluetooth data including paired devices and system event logs.
+
+ This function performs the following actions:
+ - Captures the current timestamp
+ - Retrieves and logs paired Bluetooth devices
+ - Collects Bluetooth connection/disconnection event logs
+ - Captures Bluetooth file transfer logs
+ - Saves all collected data to 'bluetooth_data.txt'
+
+ The function uses internal utility functions to run PowerShell commands, parse outputs, and save results to a file. It provides a systematic approach to logging Bluetooth-related system information.
+
+ Logs are saved with descriptive section titles, making the output easily readable and organized. If no data is found for a specific section, a default "No logs found" message is recorded.
+
+ Note:
+ - Requires administrative or sufficient system permissions to access Windows event logs
+ - Logs are appended to the file, allowing historical tracking of Bluetooth events
+ """
log.info("Starting Bluetooth data logging...")
filename = "bluetooth_data.txt"
timestamp = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
@@ -85,7 +177,24 @@ def log_bluetooth():
log.debug(f"{section_title}: {paired_devices}")
# Collect and log event logs
- def collect_logs(title, command):
+ def collect_logs(title: str, command: str):
+ """
+ Collects and logs event logs by executing a PowerShell command and saving the results.
+
+ Args:
+ title (str): The title or description of the log section being collected.
+ command (str): The PowerShell command to execute for retrieving event logs.
+
+ Behavior:
+ - Runs the specified PowerShell command using `run_powershell_command()`
+ - Saves the log results to a file using `save_to_file()`
+ - Logs an informational message about the log collection
+ - If no logs are found, saves a default "No logs found." message
+          - Uses the `filename` variable from the enclosing log_bluetooth() scope as the log file destination
+
+ Raises:
+            None: exceptions from `run_powershell_command()` and `save_to_file()` are handled internally
+ """
logs = run_powershell_command(command)
save_to_file(filename, title, logs or ["No logs found."])
log.info(f"Getting {title}...")
@@ -106,4 +215,7 @@ def collect_logs(title, command):
if __name__ == "__main__":
- log_bluetooth()
+ try:
+ log_bluetooth()
+ except Exception as e:
+ log.error(f"Failed to log Bluetooth data: {e}")
diff --git a/CODE/cmd_commands.py b/CODE/cmd_commands.py
index 9ec5d7a0..0d172ba1 100644
--- a/CODE/cmd_commands.py
+++ b/CODE/cmd_commands.py
@@ -27,6 +27,7 @@ def command(file: str, commands: str, message: str, encoding: str = "UTF-8") ->
log.error(f"Error while getting {message}: {e}")
-command("Drivers.txt", "driverquery /v", "Driver Query")
-command("SysInfo.txt", "systeminfo", "System Info")
-command("GPResult.txt", "GPResult /r", "GPResult", "windows-1252")
+if __name__ == "__main__":
+ command("Drivers.txt", "driverquery /v", "Driver Query")
+ command("SysInfo.txt", "systeminfo", "System Info")
+ command("GPResult.txt", "GPResult /r", "GPResult", "windows-1252")
diff --git a/CODE/config.ini b/CODE/config.ini
index 8182315a..32e01ddd 100644
--- a/CODE/config.ini
+++ b/CODE/config.ini
@@ -6,16 +6,44 @@ log_using_debug = false
# Would you like for new logs to be created every execution?
# Or would you like to append to the same log file?
delete_old_logs = false
+# Logicytics will save preferences and history in a file,
+# This is used by Flag.py, to suggest better flags
+# Would you like this to happen?
+# This is recommended, as it will improve the suggestions - Data will never be shared
+save_preferences = true
[System Settings]
# Do not play with these settings unless you know what you are doing
-version = 3.2.0
-files = "bluetooth_details.py, bluetooth_logger.py, browser_miner.ps1, cmd_commands.py, config.ini, dir_list.py, dump_memory.py, event_log.py, Logicytics.py, log_miner.py, media_backup.py, netadapter.ps1, packet_sniffer.py, property_scraper.ps1, registry.py, sensitive_data_miner.py, ssh_miner.py, sys_internal.py, tasklist.py, tree.ps1, vulnscan.py, wifi_stealer.py, window_feature_miner.ps1, wmic.py, _debug.py, _dev.py, logicytics\Checks.py, logicytics\Execute.py, logicytics\FileManagement.py, logicytics\Flag.py, logicytics\Get.py, logicytics\Logger.py, logicytics\__init__.py, SysInternal_Suite\.sys.ignore, SysInternal_Suite\SysInternal_Suite.zip, VulnScan\Model SenseMini .3n3.pth, VulnScan\README.md, VulnScan\Vectorizer .3n3.pkl, VulnScan\tools\_study_network.py, VulnScan\tools\_test_gpu_acceleration.py, VulnScan\tools\_vectorizer.py, VulnScan\v2-deprecated\_generate_data.py, VulnScan\v3\_generate_data.py, VulnScan\v3\_train.py"
+version = 3.3.0
+files = "bluetooth_details.py, bluetooth_logger.py, browser_miner.ps1, cmd_commands.py, config.ini, dir_list.py, dump_memory.py, event_log.py, Logicytics.py, log_miner.py, media_backup.py, netadapter.ps1, packet_sniffer.py, property_scraper.ps1, registry.py, sensitive_data_miner.py, ssh_miner.py, sys_internal.py, tasklist.py, tree.ps1, vulnscan.py, wifi_stealer.py, window_feature_miner.ps1, wmic.py, _debug.py, _dev.py, logicytics\Checks.py, logicytics\Execute.py, logicytics\FileManagement.py, logicytics\Flag.py, logicytics\Get.py, logicytics\Logger.py, logicytics\User_History.json.gz, logicytics\__init__.py, SysInternal_Suite\.sys.ignore, SysInternal_Suite\SysInternal_Suite.zip, VulnScan\Model SenseMini .3n3.pth, VulnScan\README.md, VulnScan\Vectorizer .3n3.pkl, VulnScan\tools\_study_network.py, VulnScan\tools\_test_gpu_acceleration.py, VulnScan\tools\_vectorizer.py, VulnScan\v2-deprecated\_generate_data.py, VulnScan\v3\_generate_data.py, VulnScan\v3\_train.py"
###################################################
# The following settings are for specific modules #
###################################################
+[Flag Settings]
+# The minimum accuracy to suggest a flag,
+# This is a percentage, and must be a float
+# The default is 30.0, and is what we advise
+# If the accuracy is below this, the flag will move to the next suggestion process
+# The process is: difflib, then model, then history suggestions
+# Make sure to keep between 0.0 and 100.0
+accuracy_min = 30.0
+
+# This is the model to use to suggest flags,
+# I advise to keep it as all-MiniLM-L6-v2
+# This is the best model for this task, and is lightweight
+# The model MUST be a Sentence Transformer model
+model_to_use = all-MiniLM-L6-v2
+
+# Finally, should debug mode be enabled for the flag module?
+# This will print out more information to the console,
+# This is for the model itself, and is based on tqdm, it shows extra info on batches
+# As well as more information on behind the scenes
+model_debug = false
+
+###################################################
+
[PacketSniffer Settings]
# The interface to sniff packets on, keep it as WiFi for most cases
# Autocorrects between WiFi and Wi-Fi
@@ -101,3 +129,5 @@ vectorizer_path = PATH
# Number of features to visualise in the SVG Bar graph, maximum is 3000 due to limitations
# Placing -1 will visualise first 3000 features. Bar will be a color gradient heatmap.
number_of_features = -1
+
+##################################################
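
The new [Flag Settings] keys can be read with the stdlib configparser (the
project itself may consume them through configobj, as _dev.py does; this is
only a sketch of loading and validating the three options):

    import configparser

    config = configparser.ConfigParser()
    config.read("config.ini")
    flag_cfg = config["Flag Settings"]

    # Typed getters, with fallbacks matching the documented defaults.
    accuracy_min = flag_cfg.getfloat("accuracy_min", fallback=30.0)
    model_to_use = flag_cfg.get("model_to_use", fallback="all-MiniLM-L6-v2")
    model_debug = flag_cfg.getboolean("model_debug", fallback=False)

    # The comments above require accuracy_min to stay within 0.0 to 100.0.
    if not 0.0 <= accuracy_min <= 100.0:
        raise ValueError("accuracy_min must be between 0.0 and 100.0")
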
diff --git a/CODE/dir_list.py b/CODE/dir_list.py
index 2eb6b122..754a9851 100644
--- a/CODE/dir_list.py
+++ b/CODE/dir_list.py
@@ -7,23 +7,28 @@
log = Log({"log_level": DEBUG})
-@log.function
def run_command_threaded(directory: str, file: str, message: str, encoding: str = "UTF-8") -> None:
"""
- Executes a command for a specific directory and writes the output to a file.
-
+ Executes a PowerShell command to recursively list directory contents and writes the output to a specified file.
+
Args:
- directory (str): The directory to run the command on.
- file (str): The name of the file to write the command output to.
- message (str): A message to be logged.
- encoding (str): The encoding to write the file in.
-
- Returns:
- None
+ directory (str): The target directory path to list contents from.
+ file (str): The output file path where directory contents will be appended.
+ message (str): A descriptive message for logging the operation.
+ encoding (str, optional): File writing encoding. Defaults to "UTF-8".
+
+ Raises:
+ Exception: If command execution or file writing fails.
+
+ Notes:
+ - Uses PowerShell's Get-ChildItem with recursive flag
+ - Appends output to the specified file
+ - Logs operation start and result/error
"""
log.info(f"Executing {message} for {directory}")
try:
- command = f"powershell -Command Get-ChildItem {directory} -Recurse"
+ safe_directory = directory.replace('"', '`"') # Escape quotes
+ command = f'powershell -NoProfile -Command "Get-ChildItem \\""{safe_directory}\\"" -Recurse"'
output = Execute.command(command)
open(file, "a", encoding=encoding).write(output)
log.info(f"{message} Successful for {directory} - {file}")
@@ -34,24 +39,35 @@ def run_command_threaded(directory: str, file: str, message: str, encoding: str
@log.function
def command_threaded(base_directory: str, file: str, message: str, encoding: str = "UTF-8") -> None:
"""
- Splits the base directory into subdirectories and runs the command concurrently.
-
+ Concurrently lists contents of subdirectories within a base directory using thread pooling.
+
Args:
- base_directory (str): The base directory to split and run the command on.
- file (str): The name of the file to write the command output to.
- message (str): A message to be logged.
- encoding (str): The encoding to write the file in.
-
- Returns:
- None
+ base_directory (str): Root directory to explore and list subdirectories from.
+ file (str): Output file path to write directory listing results.
+ message (str): Descriptive logging message for the operation.
+ encoding (str, optional): File writing character encoding. Defaults to "UTF-8".
+
+ Raises:
+ Exception: Logs and captures any errors during thread pool execution.
+
+ Notes:
+ - Uses ThreadPoolExecutor for parallel directory content listing
+ - Processes each subdirectory concurrently
+ - Writes results to the specified file
+ - Handles potential errors during thread execution
"""
- with ThreadPoolExecutor() as executor:
- subdirectories = [os.path.join(base_directory, d) for d in os.listdir(base_directory) if
- os.path.isdir(os.path.join(base_directory, d))]
- futures = [executor.submit(run_command_threaded, subdir, file, message, encoding) for subdir in subdirectories]
- for future in futures:
- future.result()
+ try:
+        with ThreadPoolExecutor(max_workers=min(32, (os.cpu_count() or 1) * 4)) as executor:
+ subdirectories = [os.path.join(base_directory, d) for d in os.listdir(base_directory) if
+ os.path.isdir(os.path.join(base_directory, d))]
+ futures = [executor.submit(run_command_threaded, subdir, file, message, encoding) for subdir in
+ subdirectories]
+ for future in futures:
+ future.result()
+ except Exception as e:
+ log.error(f"Thread Pool Error: {e}")
-log.warning("Running command_threaded() - This is very slow")
-command_threaded("C:\\", "Dir_Root.txt", "Root Directory Listing")
+if __name__ == "__main__":
+ log.warning("Running dir_list.py - This is very slow - We will use threading to speed it up")
+ command_threaded("C:\\", "Dir_Root.txt", "Root Directory Listing")
diff --git a/CODE/dump_memory.py b/CODE/dump_memory.py
index 927c40dd..238d7d4d 100644
--- a/CODE/dump_memory.py
+++ b/CODE/dump_memory.py
@@ -1,218 +1,220 @@
-import datetime
-import platform
-import ctypes
import os
+import platform
+import struct
+from datetime import datetime
+
import psutil
+
from logicytics import Log, DEBUG
if __name__ == "__main__":
log = Log({"log_level": DEBUG})
- # Constants
- PROCESS_QUERY_INFORMATION = 0x0400
- PROCESS_VM_READ = 0x0010
- MEM_COMMIT = 0x1000
- PAGE_READWRITE = 0x04
-
-
-# Function to save RAM content snapshot to a file
-@log.function
-def dump_ram_content():
- """
- Capture the current state of the system's RAM and write it to a file.
- This function gathers memory statistics, system-specific details, and writes
- the information to a file named 'Ram_Snapshot.txt'.
- """
- try:
- # Generate a timestamp for the file
- dump_file = "Ram_Snapshot.txt"
-
- # Gather memory statistics using psutil
- memory_info = psutil.virtual_memory()
- swap_info = psutil.swap_memory()
-
- # Get system-specific details
- system_info = (
- "System Information:\n"
- "===================================\n"
- f"OS: {platform.system()} {platform.release()}\n"
- f"Architecture: {platform.architecture()[0]}\n"
- f"Processor: {platform.processor()}\n"
- f"Machine: {platform.machine()}\n\n"
- )
-
- # Prepare content to dump
- dump_content = (
- f"RAM Snapshot - {datetime.datetime.now().strftime('%Y-%m-%d_%H-%M-%S')}\n"
- "===================================\n"
- f"{system_info}"
- f"Total Memory: {memory_info.total / (1024 ** 3):.2f} GB\n"
- f"Available Memory: {memory_info.available / (1024 ** 3):.2f} GB\n"
- f"Used Memory: {memory_info.used / (1024 ** 3):.2f} GB\n"
- f"Memory Usage: {memory_info.percent}%\n\n"
- f"Swap Total: {swap_info.total / (1024 ** 3):.2f} GB\n"
- f"Swap Used: {swap_info.used / (1024 ** 3):.2f} GB\n"
- f"Swap Free: {swap_info.free / (1024 ** 3):.2f} GB\n"
- f"Swap Usage: {swap_info.percent}%\n"
- )
-
- # Write the content to the file
- with open(dump_file, "w", encoding="utf-8") as file:
- file.write(dump_content)
-
- log.info(f"RAM snapshot saved to: {dump_file}")
+# TODO v3.3.1
+# psutil.virtual_memory(): used, free, percent, total
+# psutil.swap_memory(): used, free, percent, total
- except Exception as e:
- log.error(f"Error capturing RAM snapshot: {e}")
+# If the file size exceeds this limit, the file will be truncated with a message
+# Put 0 to disable the limit
+# TODO v3.3.1: Make this take from config.ini
+LIMIT_FILE_SIZE = 20 # Always in MiB
-# Define structures for SystemInfo
-class SystemInfo(ctypes.Structure):
- # noinspection PyUnresolvedReferences
+# Capture RAM Snapshot
+def capture_ram_snapshot():
"""
- A ctypes Structure to hold system information.
-
- Attributes:
- wProcessorArchitecture (ctypes.c_ushort): Processor architecture.
- wReserved (ctypes.c_ushort): Reserved.
- dwPageSize (ctypes.c_ulong): Page size.
- lpMinimumApplicationAddress (ctypes.c_void_p): Minimum application address.
- lpMaximumApplicationAddress (ctypes.c_void_p): Maximum application address.
- dwActiveProcessorMask (ctypes.POINTER(ctypes.c_ulong)): Active processor mask.
- dwNumberOfProcessors (ctypes.c_ulong): Number of processors.
- dwProcessorType (ctypes.c_ulong): Processor type.
- dwAllocationGranularity (ctypes.c_ulong): Allocation granularity.
- wProcessorLevel (ctypes.c_ushort): Processor level.
- wProcessorRevision (ctypes.c_ushort): Processor revision.
+ Captures and logs the current system memory statistics to a file.
+
+ Retrieves detailed information about RAM and swap memory usage using psutil.
+ Writes memory statistics in gigabytes to 'Ram_Snapshot.txt', including:
+ - Total RAM
+ - Used RAM
+ - Available RAM
+ - Total Swap memory
+ - Used Swap memory
+ - Free Swap memory
+ - Percentage of RAM used
+
+ Logs the process and handles potential file writing errors.
+
+ Raises:
+ IOError: If unable to write to the output file
+ Exception: For any unexpected errors during memory snapshot capture
"""
- _fields_ = [
- ("wProcessorArchitecture", ctypes.c_ushort),
- ("wReserved", ctypes.c_ushort),
- ("dwPageSize", ctypes.c_ulong),
- ("lpMinimumApplicationAddress", ctypes.c_void_p),
- ("lpMaximumApplicationAddress", ctypes.c_void_p),
- ("dwActiveProcessorMask", ctypes.POINTER(ctypes.c_ulong)),
- ("dwNumberOfProcessors", ctypes.c_ulong),
- ("dwProcessorType", ctypes.c_ulong),
- ("dwAllocationGranularity", ctypes.c_ulong),
- ("wProcessorLevel", ctypes.c_ushort),
- ("wProcessorRevision", ctypes.c_ushort),
- ]
-
-
-# Define BasicMemInfo
-class BasicMemInfo(ctypes.Structure):
- # noinspection PyUnresolvedReferences
+ log.info("Capturing RAM Snapshot...")
+ memory = psutil.virtual_memory()
+ swap = psutil.swap_memory()
+ with open("Ram_Snapshot.txt", "w") as file:
+ try:
+ file.write(f"Total RAM: {memory.total / (1024 ** 3):.2f} GB\n")
+ file.write(f"Used RAM: {memory.used / (1024 ** 3):.2f} GB\n")
+ file.write(f"Available RAM: {memory.available / (1024 ** 3):.2f} GB\n")
+ file.write(f"Total Swap: {swap.total / (1024 ** 3):.2f} GB\n")
+ file.write(f"Used Swap: {swap.used / (1024 ** 3):.2f} GB\n")
+ file.write(f"Free Swap: {swap.free / (1024 ** 3):.2f} GB\n")
+ file.write(f"Percent RAM Used: {memory.percent:.2f}%\n")
+ except Exception as e:
+ log.error(f"Error writing RAM snapshot: {e}")
+ file.write("Error writing RAM snapshot.")
+ log.info("RAM Snapshot saved to Ram_Snapshot.txt")
+
+
+# Gather system information
+def gather_system_info():
"""
- A ctypes Structure to hold basic memory information.
-
- Attributes:
- BaseAddress (ctypes.c_void_p): Base address.
- AllocationBase (ctypes.c_void_p): Allocation base.
- AllocationProtect (ctypes.c_ulong): Allocation protection.
- RegionSize (ctypes.c_size_t): Region size.
- State (ctypes.c_ulong): State.
- Protect (ctypes.c_ulong): Protection.
- Type (ctypes.c_ulong): Type.
+ Gather and log detailed system information to a text file.
+
+ This function collects system-specific details including architecture,
+ operating system, machine type, processor information, and page size.
+ The information is written to 'SystemRam_Info.txt' and logged for tracking.
+
+ Returns:
+ None: Writes system information directly to a file.
+
+ Raises:
+ Exception: Logs and handles any errors encountered during system
+ information retrieval, ensuring graceful error reporting.
+
+ Side Effects:
+ - Creates/overwrites 'SystemRam_Info.txt' with system details
+ - Logs information gathering process and potential errors
"""
- _fields_ = [
- ("BaseAddress", ctypes.c_void_p),
- ("AllocationBase", ctypes.c_void_p),
- ("AllocationProtect", ctypes.c_ulong),
- ("RegionSize", ctypes.c_size_t),
- ("State", ctypes.c_ulong),
- ("Protect", ctypes.c_ulong),
- ("Type", ctypes.c_ulong),
- ]
+ log.info("Gathering system information...")
+ try:
+ sys_info = {
+ 'Architecture': platform.architecture(),
+ 'System': platform.system(),
+ 'Machine': platform.machine(),
+ 'Processor': platform.processor(),
+ 'Page Size (bytes)': struct.calcsize("P"),
+ 'CPU Count': psutil.cpu_count(),
+ 'CPU Frequency': psutil.cpu_freq().current if psutil.cpu_freq() else 'N/A',
+ 'Boot Time': datetime.fromtimestamp(psutil.boot_time()).strftime('%Y-%m-%d %H:%M:%S'),
+ }
+ except Exception as e:
+ log.error(f"Error gathering system information: {e}")
+ sys_info = {'Error': 'Failed to gather system information'}
+ with open("SystemRam_Info.txt", "w") as file:
+ for key, value in sys_info.items():
+ file.write(f"{key}: {value}\n")
+ log.info("System Information saved to SystemRam_Info.txt")
-@log.function
-def get_system_info() -> SystemInfo:
+# Memory Dump (Windows-specific, using psutil)
+def memory_dump():
"""
- Retrieve and return system information using the `GetSystemInfo` function from the Windows API.
-
- Returns:
- SystemInfo: A `SystemInfo` structure containing details about the system's architecture,
- processor, memory, and other attributes.
+ Perform a memory dump of the current process, capturing detailed metadata for each readable memory region.
+
+ This function scans the memory regions of the current process and logs their metadata to 'Ram_Dump.txt'.
+ It captures information such as start and end addresses, resident set size (RSS), permissions,
+ associated file paths, and other region-specific details.
+
+ Key Features:
+ - Retrieves memory map for the current process
+ - Filters and logs only readable memory regions
+ - Captures metadata for each memory region
+ - Supports file size limitation via LIMIT_FILE_SIZE constant
+ - Handles potential errors during memory scanning and file writing
+
+ Notes:
+ - Writes metadata to 'Ram_Dump.txt' in the current working directory
+ - Truncates output if file size exceeds LIMIT_FILE_SIZE (if set)
+ - Logs errors encountered during the memory dump process
+
+ Raises:
+ psutil.Error: If there are issues accessing process memory
+ Exception: For any unexpected errors during memory scanning
"""
- system_info = SystemInfo()
- ctypes.windll.kernel32.GetSystemInfo(ctypes.byref(system_info))
- return system_info
+ log.info("Creating basic memory dump scan...")
+ pid = os.getpid()
+
+ try:
+ process = psutil.Process(pid)
+ with open("Ram_Dump.txt", "wb") as dump_file:
+ total_size = 0
+ for mem_region in process.memory_maps(grouped=False):
+ # Check available disk space
+ if os.path.exists("Ram_Dump.txt"):
+                    required_space = LIMIT_FILE_SIZE * 1024 * 1024 * 1.5  # 1.5x safety margin
+ free_space = psutil.disk_usage(".").free
+ if free_space < required_space:
+ log.error(f"Not enough disk space. Need {required_space / 1024 / 1024:.2f}MB")
+ return
+
+ # Check if the memory region is readable ('r' permission)
+ if 'r' in mem_region.perms:
+ # Extract start and end addresses from the memory region string
+ if '-' in mem_region.addr:
+ start, end = [int(addr, 16) for addr in mem_region.addr.split('-')]
+ else:
+ start = int(mem_region.addr, 16)
+ end = start + mem_region.rss
+
+ # Gather memory region metadata
+ region_metadata = {
+ ' Start Address': hex(start),
+ ' End Address': hex(end),
+ ' RSS (bytes)': mem_region.rss, # Using rss as size
+ ' Permissions': mem_region.perms,
+ ' Path': mem_region.path, # Path is often available for shared memory regions
+ ' Index': mem_region.index,
+ }
+
+ # Try getting more detailed memory information
+ try:
+ # Check if the memory region corresponds to a file and add file metadata
+ if mem_region.path:
+ # Try to get device and inode-like info
+ file_path = mem_region.path
+ region_metadata[' File Path'] = file_path
+
+ except Exception as e:
+ log.error(f"Error adding extra file information: {str(e)}")
+
+ # Write the metadata to the dump file
+ try:
+ metadata_str = "Memory Region Metadata:\n" + "\n".join(
+ f"{key}: {value}" for key, value in region_metadata.items()) + "\n\n"
+ metadata_bytes = metadata_str.encode()
+ if total_size + len(metadata_bytes) > LIMIT_FILE_SIZE * 1024 * 1024 and LIMIT_FILE_SIZE != 0:
+                            dump_file.write(f"Truncated due to file exceeding {LIMIT_FILE_SIZE} MiB\n"
+ "Additional memory regions not included.\n".encode())
+ break
+ dump_file.write(metadata_bytes)
+ total_size += len(metadata_bytes)
+ except Exception as e:
+ log.error(f"Error writing memory region metadata: {str(e)}")
+
+ except psutil.Error as e:
+ log.error(f"Error opening process memory: {str(e)}")
+ except Exception as e:
+ log.error(f"Error creating memory scan: {str(e)}")
+
+ log.info("Memory scan saved to Ram_Dump.txt")
+# Main function to run all tasks
@log.function
-def read_memory():
+def main():
"""
- Read the memory of the current process and write the content to a file.
-
- This function opens the current process with the necessary permissions,
- retrieves system information, and iterates through memory pages to read
+ Orchestrates the execution of system memory collection tasks.
+
+ This function performs three primary operations:
+ 1. Captures a snapshot of current RAM and swap memory statistics
+ 2. Gathers detailed system information
+ 3. Creates a memory dump of the current process
+
+ The tasks are executed sequentially, with logging to track the start and completion of the entire process.
+
+ No parameters.
+ No return value.
"""
- # Open current process with permissions
- process = ctypes.windll.kernel32.OpenProcess(PROCESS_QUERY_INFORMATION | PROCESS_VM_READ, False, os.getpid())
- if not process:
- log.error("Unable to open process for reading.")
- return
-
- # Get system info
- system_info = get_system_info()
- min_address = system_info.lpMinimumApplicationAddress
- max_address = system_info.lpMaximumApplicationAddress
- with open("SystemRam_Info.txt", "w") as sys_file:
- sys_file.write("System Information:\n")
- sys_file.write("===================================\n")
- sys_file.write(f"Minimum Address: {min_address}\n")
- sys_file.write(f"Maximum Address: {max_address}\n")
- sys_file.write(f"Allocation Granularity: {system_info.dwAllocationGranularity}\n")
- sys_file.write(f"Processor Architecture: {system_info.wProcessorArchitecture}\n")
- sys_file.write(f"Number of Processors: {system_info.dwNumberOfProcessors}\n")
- sys_file.write(f"Processor Type: {system_info.dwProcessorType}\n")
- sys_file.write(f"Processor Level: {system_info.wProcessorLevel}\n")
- sys_file.write(f"Processor Revision: {system_info.wProcessorRevision}\n")
- sys_file.write(f"Page Size: {system_info.dwPageSize}\n")
- sys_file.write(f"Active Processor Mask: {system_info.dwActiveProcessorMask.contents}\n")
- sys_file.write(f"Reserved: {system_info.wReserved}\n")
- sys_file.write("===================================\n")
- sys_file.write(f"Raw SystemInfo: {system_info}\n")
- sys_file.write("===================================\n")
- log.debug(f"Memory Range: {min_address:#x} - {max_address:#x}")
-
- # Iterate through memory pages
- memory_info = BasicMemInfo()
- address = min_address
- with open("Ram_Dump.txt", "w") as dump_file:
- while address < max_address:
- result = ctypes.windll.kernel32.VirtualQueryEx(
- process, ctypes.c_void_p(address), ctypes.byref(memory_info), ctypes.sizeof(memory_info)
- )
- if not result:
- break
-
- # Check if the memory is committed and readable
- if memory_info.State == MEM_COMMIT and memory_info.Protect == PAGE_READWRITE:
- buffer = ctypes.create_string_buffer(memory_info.RegionSize)
- bytes_read = ctypes.c_size_t()
- ctypes.windll.kernel32.ReadProcessMemory(
- process,
- ctypes.c_void_p(memory_info.BaseAddress),
- buffer,
- memory_info.RegionSize,
- ctypes.byref(bytes_read),
- )
- dump_file.write(str(buffer.raw[: bytes_read.value]))
-
- address += memory_info.RegionSize
-
- # Close the process handle
- ctypes.windll.kernel32.CloseHandle(process)
- log.info("Memory dump complete. Saved to 'ram_dump.txt'.")
- log.warning("Encoding is in HEX")
+ log.info("Starting system memory collection tasks...")
+ capture_ram_snapshot()
+ gather_system_info()
+ memory_dump()
+ log.info("All tasks completed [dump_memory.py].")
if __name__ == "__main__":
- try:
- log.info("Starting memory dump process...")
- dump_ram_content()
- read_memory()
- except Exception as err:
- log.error(f"Error during memory dump: {err}")
+ main()
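
For reviewers of memory_dump() above: psutil.Process.memory_maps(grouped=False)
returns one namedtuple per mapped region, carrying the addr/perms/rss/path
fields the hunk reads (exact field sets vary by platform; this sketch assumes
the fields used above and inspects only the current process):

    import os

    import psutil

    proc = psutil.Process(os.getpid())
    # grouped=False: one entry per region, not one aggregate per mapped file.
    for region in proc.memory_maps(grouped=False)[:5]:
        print(region.addr, region.perms, region.rss, region.path or "<anonymous>")
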
diff --git a/CODE/event_log.py b/CODE/event_log.py
index cb39df7b..07d62dfb 100644
--- a/CODE/event_log.py
+++ b/CODE/event_log.py
@@ -1,6 +1,8 @@
-from os import mkdir
+import os
+import shutil
+import threading
-import win32evtlog
+import wmi # Import the wmi library
from logicytics import Log, DEBUG
@@ -11,46 +13,76 @@
@log.function
-def parse_event_logs(log_type: str, output_file: str, server: str = 'localhost'):
+def parse_event_logs(log_type: str, output_file: str):
"""
- Parses Windows event logs of a specified type and writes them to an output file.
-
+ Parses Windows event logs of a specified type and writes them to an output file using WMI.
+
Args:
- log_type (str): The type of event log to parse (e.g., 'Security', 'Application').
+ log_type (str): The type of event log to parse (e.g., 'Security', 'Application', 'System').
output_file (str): The file path where the parsed event logs will be written.
- server (str): The name of the server to connect to. Default is 'localhost'.
-
+
Raises:
- Exception: If there is an error opening or reading the event log, or writing to the output file.
+ wmi.x_wmi: If there is a WMI-specific error during event log retrieval.
+ Exception: If there is a general error during file operations or log parsing.
+
+ Notes:
+ - Requires administrative privileges to access Windows event logs.
+ - Retrieves all events for the specified log type using a WMI query.
+ - Writes event details including category, timestamp, source, event ID, type, and data.
+ - Logs informational and debug messages during the parsing process.
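+
+    Example (illustrative; mirrors this module's own __main__ usage):
+        parse_event_logs('System', 'event_logs/System_events.txt')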
"""
+ log.info(f"Parsing {log_type} events (Windows Events) and writing to {output_file}, this may take a while...")
try:
- hand = win32evtlog.OpenEventLog(server, log_type)
- flags = win32evtlog.EVENTLOG_BACKWARDS_READ | win32evtlog.EVENTLOG_SEQUENTIAL_READ
- total = win32evtlog.GetNumberOfEventLogRecords(hand)
+ # Initialize WMI connection
+ c = wmi.WMI()
+ # Query based on log_type ('Security', 'Application', or 'System')
+ query = f"SELECT * FROM Win32_NTLogEvent WHERE Logfile = '{log_type}'"
+ log.debug(f"Executing WMI query: {query}")
+
+ # Open the output file for writing
with open(output_file, 'w') as f:
- f.write(f"Total records: {total}\n\n")
- events = win32evtlog.ReadEventLog(hand, flags, 0)
- while events:
- for event in events:
- event_data = {
- 'Event Category': event.EventCategory,
- 'Time Generated': event.TimeGenerated.Format(),
- 'Source Name': event.SourceName,
- 'Event ID': event.EventID,
- 'Event Type': event.EventType,
- 'Event Data': event.StringInserts
- }
- f.write(str(event_data) + '\n\n')
- events = win32evtlog.ReadEventLog(hand, flags, 0)
-
- win32evtlog.CloseEventLog(hand)
+ events = c.query(query)
+ f.write(f"Total records: {len(events)}\n\n")
+ log.debug(f"Number of events retrieved: {len(events)}")
+ for event in events:
+ event_data = {
+ 'Event Category': event.Category,
+ 'Time Generated': event.TimeGenerated,
+ 'Source Name': event.SourceName,
+ 'Event ID': event.EventCode,
+ 'Event Type': event.Type,
+ 'Event Data': event.InsertionStrings
+ }
+ f.write(str(event_data) + '\n\n')
+
log.info(f"{log_type} events (Windows Events) have been written to {output_file}")
+ except wmi.x_wmi as err:
+ log.error(f"Error opening or reading the event log: {err}")
+ except Exception as err:
+        log.error(f"Unexpected error while parsing {log_type} events: {err}")
+
+
+if __name__ == "__main__":
+ try:
+ if os.path.exists('event_logs'):
+ shutil.rmtree('event_logs')
+ os.mkdir('event_logs')
except Exception as e:
- log.error(f"(Most likely) Permission Error: {e}")
+        log.error(f"Failed to prepare the event_logs directory: {e}")
+ exit(1)
+ threads = []
+ threads_items = [('Security', 'event_logs/Security_events.txt'),
+ ('Application', 'event_logs/App_events.txt'),
+ ('System', 'event_logs/System_events.txt')]
-mkdir('event_logs')
-parse_event_logs('Security', 'event_logs/Security_events.txt')
-parse_event_logs('Application', 'event_logs/App_events.txt')
-parse_event_logs('System', 'event_logs/System_events.txt')
+ for log_type_main, output_file_main in threads_items:
+        thread = threading.Thread(target=parse_event_logs, args=(log_type_main, output_file_main),
+                                  name=log_type_main)  # Name threads so the timeout log names the log type
+ thread.daemon = True # Don't hang if main thread exits
+ threads.append(thread)
+ thread.start()
+ for thread in threads:
+ thread.join(timeout=600) # Wait max 10 minutes per thread
+ if thread.is_alive():
+ log.error(f"Thread for {thread.name} timed out (10 minutes)!")
diff --git a/CODE/log_miner.py b/CODE/log_miner.py
index a9641027..efe5531d 100644
--- a/CODE/log_miner.py
+++ b/CODE/log_miner.py
@@ -9,14 +9,23 @@
@log.function
def backup_windows_logs():
"""
- Backs up Windows system logs to a CSV file.
-
- This function constructs a PowerShell command to retrieve system logs and export them to a CSV file.
- It then executes the command using subprocess.Popen and handles any errors that may occur.
- The function logs the result of the backup operation and any errors that occur.
-
+ Backs up Windows system logs to a CSV file using PowerShell.
+
+ This function retrieves system logs and exports them to a CSV file named 'Logs_backup.csv'.
+ It uses PowerShell's Get-EventLog cmdlet to collect system logs and Export-Csv to save them.
+
+ The function handles potential errors during log backup and logs the operation's outcome.
+ If the backup fails, an error message is logged without raising an exception.
+
Returns:
None
+
+ Raises:
+ No explicit exceptions are raised; errors are logged instead.
+
+ Example:
+ When called, this function will create a 'Logs_backup.csv' file
+ containing all system event log entries.
"""
try:
log_type = "System"
@@ -35,7 +44,7 @@ def backup_windows_logs():
stdout, stderr = process.communicate(input=cmd)
if process.returncode != 0:
- raise Exception(f"Failed to backup logs: {stderr.strip()}")
+            log.error(f"Failed to backup logs: {stderr.strip()}")
+            return
log.info(f"Windows logs backed up to {backup_file}")
except Exception as e:
@@ -44,4 +53,5 @@ def backup_windows_logs():
log.info("Log Miner completed.")
-backup_windows_logs()
+if __name__ == "__main__":
+ backup_windows_logs()
diff --git a/CODE/logicytics/Execute.py b/CODE/logicytics/Execute.py
index e2eed6a3..d89a34c6 100644
--- a/CODE/logicytics/Execute.py
+++ b/CODE/logicytics/Execute.py
@@ -6,11 +6,20 @@
class Execute:
@classmethod
- def script(cls, script_path: str) -> list[list[str]] | None:
+ def script(cls, script_path: str) -> list[list[str, str]] | None:
"""
- Executes a script file and handles its output based on the file extension.
+ Execute a script file based on its file extension.
+
+ Executes Python and PowerShell scripts with different handling mechanisms. For Python scripts, runs the script and returns None. For PowerShell scripts, first unblocks the script and then executes it, returning a list of message-ID pairs.
+
Parameters:
- script_path (str): The path of the script file to be executed.
+ script_path (str): Path to the script file to be executed.
+
+ Returns:
+ list[list[str, str]] | None: A list of message-ID pairs for PowerShell scripts, or None for Python scripts.
+
+ Raises:
+        subprocess.SubprocessError: May propagate from the underlying subprocess while the script runs.
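+
+    Example (an illustrative sketch; the script name is hypothetical):
+        results = Execute.script("example_collector.ps1")
+        # PowerShell scripts yield a list of message-ID pairs; Python scripts yield None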
"""
if script_path.endswith(".py"):
cls.__run_python_script(script_path)
diff --git a/CODE/logicytics/FileManagement.py b/CODE/logicytics/FileManagement.py
index 07f9fc3c..bc72b8b1 100644
--- a/CODE/logicytics/FileManagement.py
+++ b/CODE/logicytics/FileManagement.py
@@ -7,7 +7,6 @@
import subprocess
import zipfile
from datetime import datetime
-from pathlib import Path
class FileManagement:
@@ -36,9 +35,16 @@ def open_file(file: str, use_full_path: bool = False) -> str | None:
def mkdir():
"""
Creates the necessary directories for storing logs, backups, and data.
-
+
+ This method ensures the existence of specific directory structures used by the application, including:
+ - Log directories for general, debug, and performance logs
+ - Backup directory
+ - Data directories for storing hashes and zip files
+
+ The method uses `os.makedirs()` with `exist_ok=True` to create directories without raising an error if they already exist.
+
Returns:
- None
+ None: No return value. Directories are created as a side effect.
"""
os.makedirs("../ACCESS/LOGS/", exist_ok=True)
os.makedirs("../ACCESS/LOGS/DEBUG", exist_ok=True)
@@ -47,27 +53,6 @@ def mkdir():
os.makedirs("../ACCESS/DATA/Hashes", exist_ok=True)
os.makedirs("../ACCESS/DATA/Zip", exist_ok=True)
- @staticmethod
- def unzip(zip_path: Path):
- """
- Unzips a given zip file to a new directory with the same name.
-
- Args:
- zip_path (str): The path to the zip file to be unzipped.
-
- Returns:
- None
- """
- # Get the base name of the zip file
- base_name = os.path.splitext(os.path.basename(zip_path))[0]
-
- # Create a new directory with the same name as the zip file
- output_dir = os.path.join(os.path.dirname(zip_path), base_name)
- os.makedirs(output_dir, exist_ok=True)
-
- with zipfile.ZipFile(zip_path, "r") as z:
- z.extractall(path=str(output_dir))
-
class Zip:
"""
A class to handle zipping files, generating SHA256 hashes, and moving files.
diff --git a/CODE/logicytics/Flag.py b/CODE/logicytics/Flag.py
index 659adbd5..95f4cd75 100644
--- a/CODE/logicytics/Flag.py
+++ b/CODE/logicytics/Flag.py
@@ -1,21 +1,390 @@
from __future__ import annotations
import argparse
-from argparse import ArgumentParser
+import configparser
+import difflib
+import gzip
+import json
+import os
+from collections import Counter
+from datetime import datetime
+from typing import Any
+
+# Check if the script is being run directly, if not, set up the library
+if __name__ == '__main__':
+ exit("This is a library, Please import rather than directly run.")
+else:
+ # Set up constants and configurations
+ config = configparser.ConfigParser()
+    # ConfigParser.read() returns the list of files it successfully parsed and
+    # silently skips missing ones, so check its return value instead of catching
+    # FileNotFoundError (which read() never raises for a missing path).
+    if not config.read('config.ini') and not config.read('../config.ini'):
+        exit("No configuration file found.")
+ # Save user preferences?
+ SAVE_PREFERENCES = config.getboolean("Settings", "save_preferences")
+    # Debug mode for Sentence Transformer
+    DEBUG_MODE = config.getboolean("Flag Settings", "model_debug")
+    # File for storing user history data
+    HISTORY_FILE = 'logicytics/User_History.json.gz'
+    # Minimum accuracy threshold for flag suggestions
+    MIN_ACCURACY_THRESHOLD = float(config.get("Flag Settings", "accuracy_min"))
+ if not 0 <= MIN_ACCURACY_THRESHOLD <= 100:
+ raise ValueError("accuracy_min must be between 0 and 100")
+
+
+class Match:
+ @staticmethod
+ def __get_sim(user_input: str, all_descriptions: list[str]) -> list[float]:
+ """
+ Compute cosine similarity between user input and flag descriptions using a Sentence Transformer model.
+
+ This method encodes the user input and historical flag descriptions into embeddings and calculates their cosine similarities. It handles model loading, logging configuration, and error handling for the embedding process.
+
+ Parameters:
+ user_input (str): The current user input to match against historical descriptions
+ all_descriptions (list[str]): A list of historical flag descriptions to compare
+
+ Returns:
+ list[float]: A list of similarity scores between the user input and each historical description
+
+ Raises:
+ SystemExit: If there is an error loading the specified Sentence Transformer model
+
+ Notes:
+ - Uses the model specified in the configuration file
+ - Configures logging based on the global DEBUG_MODE setting
+ - Converts embeddings to tensors for efficient similarity computation
+ """
+ # Encode the current user input and historical inputs
+ from sentence_transformers import SentenceTransformer, util
+
+ import logging # Suppress logging messages from Sentence Transformer due to verbosity
+ # Set the logging level based on the debug mode, either DEBUG or ERROR (aka only important messages)
+ if DEBUG_MODE:
+ logging.getLogger("sentence_transformers").setLevel(logging.DEBUG)
+ else:
+ logging.getLogger("sentence_transformers").setLevel(logging.ERROR)
+
+ try:
+ MODEL = SentenceTransformer(config.get("Flag Settings", "model_to_use"))
+ except Exception as e:
+ print(f"Error: {e}")
+ print("Please check the model name in the config file.")
+ print(f"Model name {config.get('Flag Settings', 'model_to_use')} may not be valid.")
+ exit(1)
+
+ user_embedding = MODEL.encode(user_input, convert_to_tensor=True, show_progress_bar=DEBUG_MODE)
+ historical_embeddings = MODEL.encode(all_descriptions, convert_to_tensor=True, show_progress_bar=DEBUG_MODE)
+
+ # Compute cosine similarities
+ similarities = util.pytorch_cos_sim(user_embedding, historical_embeddings).squeeze(0).tolist()
+ return similarities
+
+ @classmethod
+ def __suggest_flags_based_on_history(cls, user_input: str) -> list[str]:
+ """
+ Suggests flags based on historical data and similarity to the current input.
+
+ This method analyzes historical user interactions to recommend relevant flags when preferences for saving history are enabled. It uses semantic similarity to find the most contextually related flags from past interactions.
+
+ Parameters:
+ user_input (str): The current input for which suggestions are needed.
+
+ Returns:
+ list[str]: A list of suggested flags derived from historical interactions, filtered by similarity threshold.
+
+ Notes:
+ - Returns an empty list if history saving is disabled or no interaction history exists
+ - Uses cosine similarity with a minimum threshold of 0.3 to filter suggestions
+ - Limits suggestions to top 3 most similar historical inputs
+ - Removes duplicate flag suggestions
+ """
+ if not SAVE_PREFERENCES:
+ return []
+ history_data = cls.load_history()
+ if not history_data or 'interactions' not in history_data:
+ return []
+
+ interactions = history_data['interactions']
+ all_descriptions = []
+ all_flags = []
+
+ # Combine all flags and their respective user inputs
+ for flag, details in interactions.items():
+ all_flags.extend([flag] * len(details))
+ all_descriptions.extend([detail['user_input'] for detail in details])
+
+        # Encode the current input and the history, then compute cosine similarities
+ similarities = cls.__get_sim(user_input, all_descriptions)
+
+ # Find the top 3 most similar historical inputs
+ top_indices = sorted(range(len(similarities)), key=lambda i: similarities[i], reverse=True)[:3]
+ suggested_flags = [all_flags[i] for i in top_indices if similarities[i] > 0.3]
+
+ # Remove duplicates and return suggestions
+ return list(dict.fromkeys(suggested_flags))
+
+ @classmethod
+ def _generate_summary_and_graph(cls):
+ """
+ Generates a comprehensive summary and visualization of user interaction history with command-line flags.
+
+ This method processes historical interaction data, computes statistical insights, and creates a bar graph representing flag usage frequency. It performs the following key tasks:
+ - Loads historical interaction data from a compressed file
+ - Calculates and prints detailed statistics for each flag
+ - Generates a horizontal bar graph of flag usage counts
+ - Saves the graph visualization to a PNG file
+
+ Parameters:
+ cls (Match): The class instance containing historical data methods
+
+ Raises:
+ SystemExit: If no history data file is found
+ FileNotFoundError: If unable to save the graph in default locations
+
+ Side Effects:
+ - Prints detailed interaction summary to console
+ - Saves flag usage graph as a PNG image
+ - Uses matplotlib to create visualization
+
+ Notes:
+ - Currently in beta stage of development
+ - Requires matplotlib for graph generation
+ - Attempts to save graph in multiple predefined directory paths
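+
+        Example (illustrative; this feature is still in beta):
+            Match._generate_summary_and_graph()  # prints the summary and saves the PNG graph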
+ """
+ # TODO Yet in beta
+ # Load the decompressed history data using the load_history function
+ import matplotlib.pyplot as plt
+
+ if not os.path.exists(HISTORY_FILE):
+ exit("No history data found.")
+
+ history_data = cls.load_history()
+
+ # Extract interactions and flag usage count
+ interactions = history_data['interactions']
+ flags_usage = history_data['flags_usage']
+
+ # Summary of flag usage
+ total_interactions = sum(flags_usage.values())
+
+ print("User Interaction Summary:")
+ for flag, details in interactions.items():
+ print(f"\nFlag: {flag}")
+
+ accuracies = [detail['accuracy'] for detail in details]
+ device_names = [detail['device_name'] for detail in details]
+ user_inputs = [detail['user_input'] for detail in details]
+
+ average_accuracy = sum(accuracies) / len(accuracies)
+ most_common_device = Counter(device_names).most_common(1)[0][0]
+            most_common_input = Counter(user_inputs).most_common(1)[0][0]
+
+ print(f" Average Accuracy: {average_accuracy:.2f}%")
+ print(f" Most Common Device Name: {most_common_device}")
+            print(f"  Most Common User Input: {most_common_input}")
+
+ # Print the summary to the console
+ print(f"\n\nTotal Interactions with the match flag feature: {total_interactions}")
+ print("\nFlag Usage Summary:")
+ for flag, count in flags_usage.items():
+ print(f" {flag}: {count} times")
+
+ # Generate the graph for flag usage
+ flags = list(flags_usage.keys())
+ counts = list(flags_usage.values())
+
+ plt.figure(figsize=(10, 6))
+ plt.barh(flags, counts, color='skyblue')
+ plt.xlabel('Usage Count')
+ plt.title('Flag Usage Frequency')
+ plt.gca().invert_yaxis() # Invert y-axis for better readability
+ plt.subplots_adjust(left=0.2, right=0.8, top=0.9, bottom=0.1) # Adjust layout
+
+ # Save and display the graph
+ try:
+ plt.savefig('../ACCESS/DATA/Flag_usage_summary.png')
+ print("\nFlag Usage Summary Graph saved to 'ACCESS/DATA/Flag_usage_summary.png'")
+ except FileNotFoundError:
+ try:
+ plt.savefig('../../ACCESS/DATA/Flag_usage_summary.png')
+ print("\nFlag Usage Summary Graph saved to 'ACCESS/DATA/Flag_usage_summary.png'")
+ except FileNotFoundError:
+ plt.savefig('Flag_usage_summary.png')
+ print("\nFlag Usage Summary Graph saved in current working directory as 'Flag_usage_summary.png'")
+
+ @staticmethod
+    def load_history() -> dict[str, Any]:
+ """
+ Load user interaction history from a gzipped JSON file.
+
+ This method attempts to read and parse historical interaction data from a compressed JSON file. If the file is not found, it returns an empty history structure with an empty interactions dictionary and a zero-initialized flags usage counter.
+
+ Returns:
+            dict[str, Any]: A dictionary containing:
+ - 'interactions': A dictionary of past user interactions
+ - 'flags_usage': A Counter object tracking flag usage frequencies
+
+ Raises:
+ json.JSONDecodeError: If the JSON file is malformed
+ gzip.BadGzipFile: If the gzipped file is corrupted
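+
+        Example (illustrative; the flag key shown is hypothetical):
+            history = Match.load_history()
+            default_uses = history['flags_usage'].get('default', 0)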
+ """
+ try:
+ with gzip.open(HISTORY_FILE, 'rt', encoding='utf-8') as f: # Use 'rt' mode for text read
+ return json.load(f)
+ except FileNotFoundError:
+ return {'interactions': {}, 'flags_usage': Counter()}
+
+ @staticmethod
+    def save_history(history_data: dict[str, Any]):
+ """
+ Save user interaction history to a gzipped JSON file.
+
+ This method writes the user history to a compressed JSON file only if saving preferences are enabled.
+ The history is saved with an indentation of 4 spaces for readability.
+
+ Parameters:
+            history_data (dict[str, Any]): A dictionary containing user interaction history data to be saved.
+
+ Notes:
+ - Saves only if SAVE_PREFERENCES is True
+ - Uses gzip compression to reduce file size
+ - Writes in UTF-8 encoding
+ - Indents JSON for human-readable format
+ """
+ if SAVE_PREFERENCES:
+ with gzip.open(HISTORY_FILE, 'wt', encoding='utf-8') as f: # Use 'wt' mode for text write
+ json.dump(history_data, f, indent=4)
+
+ @classmethod
+ def update_history(cls, user_input: str, matched_flag: str, accuracy: float):
+ """
+ Update the user interaction history with details of a matched flag.
+
+ This method records user interactions with flags, including timestamp, input, match accuracy,
+ and device information. It only updates history if save preferences are enabled.
+
+ Parameters:
+ user_input (str): The original input text provided by the user.
+ matched_flag (str): The flag that was successfully matched to the user input.
+ accuracy (float): The similarity/match accuracy score for the flag.
+
+ Side Effects:
+ - Modifies the history JSON file by adding a new interaction entry
+ - Increments the usage count for the matched flag
+ - Requires write access to the history file
+
+ Notes:
+ - Skips history update if SAVE_PREFERENCES is False
+ - Creates new flag entries in history if they do not exist
+ - Uses current timestamp and logged-in user's device name
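+
+        Example (illustrative values):
+            Match.update_history("back up my files", "--backup", 92.3)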
+ """
+ if not SAVE_PREFERENCES:
+ return
+ history_data = cls.load_history()
+
+ # Ensure that interactions is a dictionary (not a list)
+ if not isinstance(history_data['interactions'], dict):
+ history_data['interactions'] = {}
+
+ # Create a new interaction dictionary
+ interaction = {
+ 'timestamp': datetime.now().strftime('%H:%M:%S - %d/%m/%Y'),
+ 'user_input': user_input,
+ 'accuracy': accuracy,
+ 'device_name': os.getlogin()
+ }
+
+ # Ensure the flag exists in the interactions dictionary
+ if matched_flag not in history_data['interactions']:
+ history_data['interactions'][matched_flag] = []
+
+ # Append the new interaction to the flag's list of interactions
+ history_data['interactions'][matched_flag].append(interaction)
+
+ # Ensure the flag exists in the flags_usage counter and increment it
+ if matched_flag not in history_data['flags_usage']:
+ history_data['flags_usage'][matched_flag] = 0
+ history_data['flags_usage'][matched_flag] += 1
+
+ cls.save_history(history_data)
+
+ @classmethod
+ def flag(cls, user_input: str, flags: list[str], flag_description: list[str]) -> tuple[str, float]:
+ """
+ Matches user input to flag descriptions using advanced semantic similarity.
+
+ Computes the best matching flag based on cosine similarity between the user input and flag descriptions.
+ Handles matching with a minimum accuracy threshold and provides flag suggestions from historical data
+ if no direct match is found.
+
+ Parameters:
+ user_input (str): The input string to match against available flags.
+ flags (list[str]): List of available command flags.
+ flag_description (list[str]): Corresponding descriptions for each flag.
+
+ Returns:
+ tuple[str, float]: A tuple containing:
+ - The best matched flag (or 'Nothing matched')
+ - Accuracy percentage of the match (0.0-100.0)
+
+ Raises:
+ ValueError: If the number of flags and descriptions do not match.
+
+ Side Effects:
+ - Updates user interaction history
+ - Prints flag suggestions if no direct match is found
+ - Requires a global MIN_ACCURACY_THRESHOLD to be defined
+
+ Example:
+ matched_flag, accuracy = Flag.flag("show help",
+ ["-h", "--verbose"],
+ ["Display help", "Enable verbose output"])
+ """
+ if len(flags) != len(flag_description):
+ raise ValueError("flags and flag_description lists must be of the same length")
+
+ # Combine flags and descriptions for better matching context
+ combined_descriptions = [f"{flag} {desc}" for flag, desc in zip(flags, flag_description)]
+
+        # Encode the user input and all descriptions, then compute cosine similarities
+ similarities = cls.__get_sim(user_input, combined_descriptions)
+
+ # Find the best match
+ best_index = max(range(len(similarities)), key=lambda i: similarities[i])
+ best_accuracy = similarities[best_index] * 100
+ best_match = flags[best_index] if best_accuracy > MIN_ACCURACY_THRESHOLD else "Nothing matched"
+
+ # Update history
+ cls.update_history(user_input, best_match, best_accuracy)
+
+ # Suggest flags if accuracy is low
+ if best_accuracy < MIN_ACCURACY_THRESHOLD:
+ suggested_flags = cls.__suggest_flags_based_on_history(user_input)
+ if suggested_flags:
+ print(f"No Flags matched so suggestions based on historical data: "
+ f"{', '.join(suggested_flags)}")
+
+ return best_match, best_accuracy
class Flag:
@classmethod
- def colorify(cls, text: str, color: str) -> str:
+ def __colorify(cls, text: str, color: str) -> str:
"""
- Adds color to the given text based on the specified color code.
+ Colorize text with ANSI color codes.
Args:
- text (str): The text to be colorized.
- color (str): The color code ('y' for yellow, 'r' for red, 'b' for blue).
+ text (str): The text to colorize
+ color (str): The color code ('y' for yellow, 'r' for red, 'b' for blue)
Returns:
- str: The colorized text if the color code is valid, otherwise the original text.
+ str: The colorized text with ANSI escape codes
"""
colors = {
"y": "\033[93m",
@@ -28,22 +397,23 @@ def colorify(cls, text: str, color: str) -> str:
@classmethod
def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentParser]:
"""
- A static method used to parse command-line arguments for the Logicytics application.
-
- It defines various flags that can be used to customize the behavior of the application,
- including options for running in default or minimal mode, unzipping extra files,
- backing up or restoring data, updating from GitHub, and more.
-
- The method returns a tuple containing the parsed arguments and the argument parser object.
+ Defines and parses command-line arguments for the Logicytics application.
+
+ This method creates an ArgumentParser with a comprehensive set of flags for customizing the application's behavior. It supports various execution modes, debugging options, system management flags, and post-execution actions.
+
+ The method handles argument parsing, provides helpful descriptions for each flag, and includes color-coded hints for user guidance. It also supports suggesting valid flags if an unknown flag is provided.
Returns:
- tuple[argparse.Namespace, argparse.ArgumentParser]: A tuple containing the parsed arguments and the argument parser object.
+ tuple[argparse.Namespace, argparse.ArgumentParser]: A tuple containing:
+ - Parsed command-line arguments (Namespace)
+ - The configured argument parser object
"""
# Define the argument parser
parser = argparse.ArgumentParser(
description="Logicytics, The most powerful tool for system data analysis. "
"This tool provides a comprehensive suite of features for analyzing system data, "
- "including various modes for different levels of detail and customization."
+ "including various modes for different levels of detail and customization.",
+ allow_abbrev=False
)
# Define Actions Flags
@@ -51,14 +421,14 @@ def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentPar
"--default",
action="store_true",
help="Runs Logicytics with its default settings and scripts. "
- f"{cls.colorify('- Recommended for most users -', 'b')}",
+ f"{cls.__colorify('- Recommended for most users -', 'b')}",
)
parser.add_argument(
"--threaded",
action="store_true",
help="Runs Logicytics using threads, where it runs in parallel, default settings though"
- f"{cls.colorify('- Recommended for some users -', 'b')}",
+ f"{cls.__colorify('- Recommended for some users -', 'b')}",
)
parser.add_argument(
@@ -73,14 +443,14 @@ def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentPar
action="store_true",
         help="This flag will run all default scripts in threading mode, "
"as well as any clunky and huge code, which produces a lot of data "
- f"{cls.colorify('- Will take a long time -', 'y')}",
+ f"{cls.__colorify('- Will take a long time -', 'y')}",
)
parser.add_argument(
"--nopy",
action="store_true",
help="Run Logicytics using all non-python scripts, "
- f"These may be {cls.colorify('outdated', 'y')} "
+        f"These may be {cls.__colorify('outdated', 'y')} "
         "and not the best, use only if the device doesn't have Python installed.",
)
@@ -89,8 +459,8 @@ def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentPar
action="store_true",
         help="Runs Logicytics' new Sensitive data Detection AI, it's a new feature that will "
         "detect any files that are out of the ordinary and log their paths. Runs threaded."
- f"{cls.colorify('- Beta Mode -', 'y')} "
- f"{cls.colorify('- Will take a long time -', 'y')}",
+ f"{cls.__colorify('- Beta Mode -', 'y')} "
+ f"{cls.__colorify('- Will take a long time -', 'y')}",
)
parser.add_argument(
@@ -104,7 +474,7 @@ def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentPar
action="store_true",
         help="Runs Logicytics' default mode while testing its performance and time, "
         "then shows a table with the file names and their execution times. "
- f"{cls.colorify('- Beta Mode -', 'y')}"
+ f"{cls.__colorify('- Beta Mode -', 'y')}"
)
# Define Side Flags
@@ -113,14 +483,14 @@ def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentPar
action="store_true",
help="Runs the Debugger, Will check for any issues, "
"warning etc, useful for debugging and issue reporting "
- f"{cls.colorify('- Use to get a special log file to report the bug -', 'b')}.",
+ f"{cls.__colorify('- Use to get a special log file to report the bug -', 'b')}.",
)
parser.add_argument(
"--backup",
action="store_true",
help="Backup Logicytics files to the ACCESS/BACKUPS directory "
- f"{cls.colorify('- Use on your own device only -', 'y')}.",
+ f"{cls.__colorify('- Use on your own device only -', 'y')}.",
)
parser.add_argument(
@@ -128,7 +498,7 @@ def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentPar
action="store_true",
help="Update Logicytics from GitHub, only if you have git properly installed "
"and the project was downloaded via git "
- f"{cls.colorify('- Use on your own device only -', 'y')}.",
+ f"{cls.__colorify('- Use on your own device only -', 'y')}.",
)
parser.add_argument(
@@ -136,7 +506,7 @@ def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentPar
action="store_true",
help="Run Logicytics developer mod, this is only for people who want to "
"register their contributions properly. "
- f"{cls.colorify('- Use on your own device only -', 'y')}.",
+ f"{cls.__colorify('- Use on your own device only -', 'y')}.",
)
# Define After-Execution Flags
@@ -157,42 +527,43 @@ def __available_arguments(cls) -> tuple[argparse.Namespace, argparse.ArgumentPar
"--webhook",
action="store_true",
help="Execute Flag that will send zip File via webhook "
- f"{cls.colorify('- Not yet Implemented -', 'r')}",
+ f"{cls.__colorify('- Not yet Implemented -', 'r')}",
)
parser.add_argument(
"--restore",
action="store_true",
help="Restore Logicytics files from the ACCESS/BACKUPS directory "
- f"{cls.colorify('- Use on your own device only -', 'y')} "
- f"{cls.colorify('- Not yet Implemented -', 'r')}",
- )
-
- # Deprecated Flags - v3.3
- parser.add_argument(
- "--unzip-extra",
- action="store_true",
- help=f"{cls.colorify('[REMOVED]', 'r')} Unzip the extra directory zip File "
- )
-
- parser.add_argument(
- "--extra",
- action="store_true",
- help=f"{cls.colorify('[REMOVED]', 'r')} Open's the extra directory menu to use more tools. "
+ f"{cls.__colorify('- Use on your own device only -', 'y')} "
+ f"{cls.__colorify('- Not yet Implemented -', 'r')}",
)
- return parser.parse_args(), parser
+ # Parse the arguments
+ args, unknown = parser.parse_known_args()
+ valid_flags = [action.dest for action in parser._actions if action.dest != 'help']
+ if unknown:
+ cls.__suggest_flag(unknown[0], valid_flags)
+ exit(1)
+ return args, parser
@staticmethod
def __exclusivity_logic(args: argparse.Namespace) -> bool:
"""
- Checks if exclusive flags are used in the provided arguments.
-
- Args:
- args (argparse.Namespace): The arguments to be checked.
-
+ Validates the mutual exclusivity of command-line flags to prevent invalid flag combinations.
+
+ This method checks for conflicting or mutually exclusive flags across three flag categories:
+ - Special flags (reboot, shutdown, webhook)
+ - Action flags (default, threaded, modded, minimal, nopy, depth, performance_check)
+ - Exclusive flags (vulnscan_ai)
+
+ Parameters:
+ args (argparse.Namespace): Parsed command-line arguments to validate.
+
Returns:
- bool: True if exclusive flags are used, False otherwise.
+ bool: True if any special flags are set, False otherwise.
+
+ Raises:
+ SystemExit: If incompatible flag combinations are detected, with an error message describing the conflict.
"""
special_flags = {
args.reboot,
@@ -213,15 +584,15 @@ def __exclusivity_logic(args: argparse.Namespace) -> bool:
}
if any(special_flags) and not any(action_flags):
- print("Invalid combination of flags: Special and Action flag exclusivity issue.")
+            print("Invalid combination of flags: Special and Action flag exclusivity issue.")
exit(1)
if any(exclusive_flags) and any(action_flags):
- print("Invalid combination of flags: Exclusive and Action flag exclusivity issue.")
+            print("Invalid combination of flags: Exclusive and Action flag exclusivity issue.")
exit(1)
if any(exclusive_flags) and any(special_flags):
- print("Invalid combination of flags: Exclusive and Special flag exclusivity issue.")
+            print("Invalid combination of flags: Exclusive and Special flag exclusivity issue.")
exit(1)
return any(special_flags)
@@ -229,13 +600,22 @@ def __exclusivity_logic(args: argparse.Namespace) -> bool:
@staticmethod
def __used_flags_logic(args: argparse.Namespace) -> tuple[str, ...]:
"""
- Sets flags based on the provided arguments.
-
- Args:
- args (argparse.Namespace): The arguments to be checked for flags.
-
+ Determines the flags that are set to True in the provided command-line arguments.
+
+ This method examines the arguments namespace and returns a tuple of flag names
+ that have been activated. It limits the returned flags to a maximum of two to
+ prevent excessive flag usage.
+
+ Parameters:
+ args (argparse.Namespace): Parsed command-line arguments to be analyzed.
+
Returns:
- tuple[str, ...]: A tuple of flag names that are set to True.
+ tuple[str, ...]: A tuple containing the names of flags set to True,
+ with a maximum of two flags.
+
+ Notes:
+ - If no flags are set, returns an empty tuple.
+ - Stops collecting flags after finding two True flags to limit complexity.
"""
flags = {key: getattr(args, key) for key in vars(args)}
true_keys = []
@@ -247,20 +627,104 @@ def __used_flags_logic(args: argparse.Namespace) -> tuple[str, ...]:
return tuple(true_keys)
@classmethod
- def data(cls) -> ArgumentParser | tuple[str]:
+ def __suggest_flag(cls, user_input: str, valid_flags: list[str]):
+ """
+ Suggests the closest valid flag based on the user's input and provides interactive flag matching.
+
+ This method handles flag suggestion through two mechanisms:
+ 1. Using difflib to find close flag matches
+ 2. Prompting user for a description to find the most relevant flag
+
+ Args:
+ user_input (str): The flag input by the user.
+ valid_flags (list[str]): The list of valid flags.
+
+ Behavior:
+ - If a close flag match exists, suggests the closest match
+ - If no close match, prompts user for a description
+ - Uses the Match.flag method to find the most accurate flag based on description
+ - Prints matching results, with optional detailed output in debug mode
+
+ Side Effects:
+ - Prints suggestions and matched flags to console
+ - Prompts user for additional input if no direct match is found
+ """
+ # Get the closest valid flag match based on the user's input
+ closest_matches = difflib.get_close_matches(user_input, valid_flags, n=1, cutoff=0.6)
+ if closest_matches:
+            print(f"Invalid flag '{user_input}', did you mean '--{closest_matches[0]}'?")
+            return
+
+        # Prompt the user for a description, since no close match was found
+        user_input_desc = input("We can't find a match, please provide a description: ").lower()
+
+ # Map the user-provided description to the closest valid flag
+ flags_list = [f"--{flag}" for flag in valid_flags]
+ descriptions_list = [f"Run Logicytics with {flag}" for flag in valid_flags]
+ flag_received, accuracy_received = Match.flag(user_input_desc, flags_list, descriptions_list)
+ if DEBUG_MODE:
+ print(f"User input: {user_input_desc}\nMatched flag: {flag_received}\nAccuracy: {accuracy_received:.2f}%\n")
+ else:
+ print(f"Matched flag: {flag_received} (Accuracy: {accuracy_received:.2f}%)\n")
+
+ @staticmethod
+ def show_help_menu(return_output: bool = False):
+ """
+ Display the help menu for the Logicytics application.
+
+ This method retrieves the argument parser from the Flag class and either prints or returns the help text based on the input parameter.
+
+ Args:
+ return_output (bool, optional): Controls the method's behavior.
+ - If True, returns the formatted help text as a string.
+ - If False (default), prints the help text directly to the console.
+
+ Returns:
+ str or None: Help text as a string if return_output is True, otherwise None.
+
+ Example:
+ # Print help menu to console
+ Flag.show_help_menu()
+
+ # Get help menu as a string
+ help_text = Flag.show_help_menu(return_output=True)
+ print(help_text)
+ """
+ parser = Flag.__available_arguments()[1]
+ if return_output:
+ return parser.format_help()
+ else:
+ parser.print_help()
+
+ @classmethod
+ def data(cls) -> tuple[str, str | None]:
"""
Handles the parsing and validation of command-line flags.
+
+ This method processes command-line arguments, validates their usage, and manages flag interactions. It ensures that:
+ - Only one primary action flag is used at a time
+ - Special flags are handled with specific logic
+ - No invalid flag combinations are permitted
+ - User history is optionally updated based on preferences
- Returns either a tuple of used flag names or an ArgumentParser instance.
+ Returns:
+ tuple[str, str | None]: A tuple containing:
+ - The primary matched flag
+ - An optional secondary flag (None if not applicable)
+
+ Raises:
+ SystemExit: Terminates the program with an error message for:
+ - Invalid flag combinations
+ - No flags specified
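+
+        Example (illustrative):
+            action, sub_action = Flag.data()  # e.g. ('default', None)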
"""
args, parser = cls.__available_arguments()
special_flag_used = cls.__exclusivity_logic(args)
- if not special_flag_used:
- used_flags = [flag for flag in vars(args) if getattr(args, flag)]
- if len(used_flags) > 1:
- print("Invalid combination of flags: Maximum 1 action flag allowed.")
- exit(1)
+ used_flags = [flag for flag in vars(args) if getattr(args, flag)]
+
+ if not special_flag_used and len(used_flags) > 1:
+ print("Invalid combination of flags: Maximum 1 action flag allowed.")
+ exit(1)
if special_flag_used:
used_flags = cls.__used_flags_logic(args)
@@ -268,20 +732,41 @@ def data(cls) -> ArgumentParser | tuple[str]:
print("Invalid combination of flags: Maximum 2 flag mixes allowed.")
exit(1)
- if not tuple(used_flags):
- return parser
- return tuple(used_flags)
-
- @staticmethod
- def show_help_menu(format_output: bool = False):
- """
- Displays the help menu for the Logicytics application.
-
- Args:
- format_output (bool): If True, returns the help text instead of printing it
- """
- _, parser = Flag.__available_arguments()
- if format_output:
- return parser.format_help()
- else:
- parser.print_help()
+ if not used_flags:
+ cls.show_help_menu()
+ exit(0)
+
+        # Update history with the matched flag(s); if preferences are disabled,
+        # skip the history write but still return the parsed flags so callers
+        # can unpack the (action, sub_action) tuple.
+        if not SAVE_PREFERENCES:
+            if len(used_flags) == 2:
+                return tuple(used_flags)
+            return used_flags[0], None
+
+ def update_data_history(matched_flag: str):
+ """
+ Update the usage count for a specific flag in the user's interaction history.
+
+ This method increments the usage count for a given flag in the historical data. If the flag
+ does not exist in the history, it initializes its count to 0 before incrementing.
+
+ Parameters:
+ matched_flag (str): The flag whose usage count needs to be updated.
+
+ Side Effects:
+ - Modifies the 'flags_usage' dictionary in the user's history file
+ - Saves the updated history data to a persistent storage
+
+ Example:
+ update_data_history('--verbose') # Increments usage count for '--verbose' flag
+ """
+ history_data = Match.load_history()
+ # Ensure the flag exists in the flags_usage counter and increment it
+ if matched_flag not in history_data['flags_usage']:
+ history_data['flags_usage'][matched_flag] = 0
+ history_data['flags_usage'][matched_flag] += 1
+ Match.save_history(history_data)
+
+ if len(used_flags) == 2:
+ for flag in used_flags:
+ update_data_history(flag)
+ return tuple(used_flags)
+ update_data_history(used_flags[0])
+ return used_flags[0], None
diff --git a/CODE/logicytics/Get.py b/CODE/logicytics/Get.py
index 9a6a8f84..92a72896 100644
--- a/CODE/logicytics/Get.py
+++ b/CODE/logicytics/Get.py
@@ -7,26 +7,28 @@
class Get:
@staticmethod
- def list_of_files(directory: str,
- extensions: tuple | bool = True,
- append_file_list: list = None
- ) -> list:
+ def list_of_files(directory: str, extensions: tuple | bool = True, append_file_list: list[str] = None,
+ exclude_files: list[str] = None) -> list:
"""
- Retrieves a list of files in the specified directory that have the specified extensions.
-
- If the extensions parameter is set to 'all',
- all files in the directory are returned.
-
- Else, only files with the specified extensions are returned.
- Files starting with an underscore (_) and the file Logicytics.py
- are excluded from the list.
-
+ Retrieves a list of files in the specified directory based on given extensions and exclusion criteria.
+
+ Supports two modes of file retrieval:
+ 1. When `extensions` is `True`, retrieves all files recursively from the directory.
+ 2. When `extensions` is a tuple, retrieves files matching specific extensions while applying exclusion rules.
+
Parameters:
- directory (str): The path of the directory to search.
- append_file_list (list): The list to append the filenames to.
- extensions (tuple): The extensions of the files to search for.
+ directory (str): Path of the directory to search for files.
+ extensions (tuple | bool, optional): File extensions to filter or True to retrieve all files. Defaults to True.
+ append_file_list (list, optional): Existing list to append found filenames to. Creates a new list if not provided. Defaults to None.
+ exclude_files (list, optional): List of filenames to exclude from results. Defaults to None.
+
Returns:
- list: The list of filenames with the specified extensions.
+ list: A list of filenames matching the specified criteria.
+
+ Exclusion rules:
+ - Ignores files starting with an underscore (_)
+ - Excludes "Logicytics.py"
+ - Skips files specified in `exclude_files`
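+
+        Example (illustrative; the directory and file names are hypothetical):
+            scripts = Get.list_of_files(".", extensions=(".py", ".ps1"), exclude_files=["_dev.py"])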
"""
append_file_list = [] if not append_file_list else append_file_list
@@ -42,40 +44,32 @@ def list_of_files(directory: str,
filename.endswith(extensions)
and not filename.startswith("_")
and filename != "Logicytics.py"
+ and (exclude_files is None or filename not in exclude_files)
):
append_file_list.append(filename)
return append_file_list
- @staticmethod
- def list_of_code_files(directory: str) -> list:
- """
- Retrieves a list of files with specific extensions within a specified directory and its subdirectories.
-
- Args:
- directory (str): The path to the directory to search for files.
-
- Returns:
- list: A list of file paths with the following extensions: .py, .exe, .ps1, .bat, .vbs.
- """
- file = []
- for root, _, filenames in os.walk(directory):
- for filename in filenames:
- if filename.endswith((".py", ".exe", ".ps1", ".bat", ".vbs")):
- files_path = os.path.join(root, filename)
- file.append(files_path.removeprefix(".\\"))
- return file
-
@staticmethod
def config_data() -> tuple[str, str, list[str], bool]:
"""
Retrieves configuration data from the 'config.ini' file.
-
- This method attempts to read the 'config.ini' file located in the current directory.
- If the file is not found, it attempts to read the 'config.ini' file from the parent directory.
- If neither file is found, the program exits with an error message.
-
+
+ This method attempts to read the 'config.ini' file from multiple potential locations:
+ 1. Current directory
+ 2. Parent directory
+ 3. Grandparent directory
+
+ If the configuration file is not found in any of these locations, the program exits with an error message.
+
Returns:
- tuple[str, str, list[str], bool]: A tuple containing the log level, version, and a list of files.
+ tuple[str, str, list[str], bool]: A tuple containing:
+ - Log level (str): Either "DEBUG" or "INFO"
+ - Version (str): System version from configuration
+ - Files (list[str]): List of files specified in configuration
+ - Delete old logs (bool): Flag indicating whether to delete old log files
+
+ Raises:
+ SystemExit: If the 'config.ini' file cannot be found in any of the attempted locations
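+
+    Example (illustrative; mirrors how the package __init__ unpacks it):
+        log_level, version, files, delete_logs = Get.config_data()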
"""
def get_config_data(config_file_name: str) -> tuple[str, str, list[str], bool]:
diff --git a/CODE/logicytics/Logger.py b/CODE/logicytics/Logger.py
index 23c34195..36f23bda 100644
--- a/CODE/logicytics/Logger.py
+++ b/CODE/logicytics/Logger.py
@@ -3,10 +3,12 @@
import inspect
import logging
import os
+import time
from datetime import datetime
-import colorlog
from typing import Type
+import colorlog
+
class Log:
"""
@@ -118,9 +120,17 @@ def __trunc_message(self, message: str) -> str:
def __internal(self, message):
"""
- Logs an internal message.
-
- :param message: The internal message to be logged.
+ Log an internal message exclusively to the console.
+
+ Internal messages are used for logging system states or debug information that should not be written to log files. These messages are only displayed in the console when color logging is enabled.
+
+ Parameters:
+ message (str): The internal message to be logged. If the message is "None" or None, no logging occurs.
+
+ Notes:
+ - Requires color logging to be enabled
+ - Uses a custom internal log level
+ - Converts the message to a string before logging
"""
if self.color and message != "None" and message is not None:
colorlog.log(self.INTERNAL_LOG_LEVEL, str(message))
@@ -136,9 +146,27 @@ def debug(self, message):
def raw(self, message):
"""
- Logs a raw message directly to the log file.
-
- :param message: The raw message to be logged.
+ Log a raw message directly to the log file.
+
+ This method writes a message directly to the log file without any additional formatting
+ or logging levels.
+
+ WARNING: This method is for internal use only! Using it directly can mess up
+ your log file format and make it hard to read. Use info(), debug(), or
+ other public methods instead.
+
+ Parameters:
+ message (str): The raw message to be written to the log file.
+
+ Notes:
+ - Checks the calling context to warn about non-function calls
+ - Handles potential Unicode encoding errors
+ - Skips logging if message is None or "None"
+ - Writes message with a newline character
+ - Logs internal errors if file writing fails
+
+ Raises:
+ Logs internal errors for Unicode or file writing issues without stopping execution
"""
frame = inspect.stack()[1]
         if frame.function == "<module>":
@@ -146,12 +174,26 @@ def raw(self, message):
f"Raw message called from a non-function - This is not recommended"
)
if message != "None" and message is not None:
- with open(self.filename, "a") as f:
- f.write(f"{str(message)}\n")
+ try:
+ with open(self.filename, "a", encoding="utf-8") as f:
+ f.write(f"{str(message)}\n")
+ except (UnicodeDecodeError, UnicodeEncodeError) as UDE:
+ self.__internal(
+ f"UnicodeDecodeError: {UDE} - Message: {str(message)}"
+ )
+ except Exception as E:
+ self.__internal(f"Error: {E} - Message: {str(message)}")
def newline(self):
"""
- Logs a newline separator in the log file.
+ Write a newline separator to the log file, creating a visual divider between log entries.
+
+ This method writes a formatted horizontal line to the log file using ASCII characters,
+ which helps visually separate different sections or log entries.
+ The line consists of vertical bars and dashes creating a structured tabular-like separator.
+
+ Side Effects:
+ Appends a newline separator to the log file specified by `self.filename`.
"""
with open(self.filename, "a") as f:
f.write("|" + "-" * 19 + "|" + "-" * 13 + "|" + "-" * 154 + "|" + "\n")
@@ -206,14 +248,28 @@ def critical(self, message):
def string(self, message, type: str):
"""
- Logs a message with a specified type. Supported types are 'debug', 'info', 'warning', 'error', 'critical'
- as well as the aliases 'err', 'warn', and 'crit'.
-
- :param message: The message to be logged.
- :param type: The type of the log message.
+ Logs a message with a specified log type, supporting multiple type aliases.
+
+ This method allows logging messages with flexible type specifications, mapping aliases to standard log types
+ and handling potential errors in type selection. It supports logging with color if enabled.
+
+ Parameters:
+ message (str): The message to be logged. Skipped if "None" or None.
+ type (str): The log type, which can be one of:
+ - Standard types: 'debug', 'info', 'warning', 'error', 'critical'
+ - Aliases: 'err' (error), 'warn' (warning), 'crit' (critical), 'except' (exception)
+
+ Behavior:
+ - Converts type to lowercase and maps aliases to standard log types
+ - Logs message using the corresponding log method
+ - Falls back to debug logging if an invalid type is provided
+ - Only logs if color is enabled and message is not "None"
+
+ Raises:
+ AttributeError: If no matching log method is found (internally handled)
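+
+        Example (illustrative):
+            log.string("Scan finished", "info")
+            log.string("Registry hive unreadable", "err")  # 'err' maps to 'error'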
"""
if self.color and message != "None" and message is not None:
- type_map = {"err": "error", "warn": "warning", "crit": "critical"}
+ type_map = {"err": "error", "warn": "warning", "crit": "critical", "except": "exception"}
type = type_map.get(type.lower(), type)
try:
getattr(self, type.lower())(str(message))
@@ -223,10 +279,21 @@ def string(self, message, type: str):
def exception(self, message, exception_type: Type = Exception):
"""
- Logs an exception message.
-
- :param message: The exception message to be logged.
- :param exception_type: The type of exception to raise.
+ Log an exception message and raise the specified exception.
+
+ Warning: Not recommended for production use. Prefer Log().error() for logging exceptions.
+
+ Args:
+ message (str): The exception message to be logged.
+ exception_type (Type, optional): The type of exception to raise. Defaults to Exception.
+
+ Raises:
+ The specified exception type with the provided message.
+
+ Note:
+ - Only logs the exception if color logging is enabled and message is not None
+ - Logs the exception with a timestamp and truncated message
+ - Includes both the original message and the exception type in the log
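+
+        Example (illustrative; logs the message, then raises FileNotFoundError):
+            log.exception("Config file missing", FileNotFoundError)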
"""
if self.color and message != "None" and message is not None:
self.raw(
@@ -234,22 +301,90 @@ def exception(self, message, exception_type: Type = Exception):
)
raise exception_type(message)
- def parse_execution(self, message_log: list[list[str]]):
+ def parse_execution(self, message_log: list[list[str, str]]):
+ """
+ Parse and log multiple messages with their corresponding log types.
+
+ This method processes a list of messages, where each message is associated with a specific log type. It is designed for scenarios where multiple log entries need to be processed simultaneously, such as logging script execution results.
+
+ Parameters:
+ message_log (list[list[str, str]]): A list of message entries. Each entry is a list containing two elements:
+ - First element: The log message (str)
+ - Second element: The log type (str)
+
+ Behavior:
+ - Iterates through the provided message log
+ - Logs each message using the specified log type via `self.string()`
+ - Logs an internal warning if a message list does not contain exactly two elements
+
+ Example:
+ log = Log()
+ log.parse_execution([
+ ['Operation started', 'info'],
+ ['Processing data', 'debug'],
+ ['Completed successfully', 'info']
+ ])
+ """
if message_log:
for message_list in message_log:
if len(message_list) == 2:
self.string(message_list[0], message_list[1])
+ else:
+ self.__internal(
+ f"Message List is not in the correct format: {message_list}"
+ )
def function(self, func: callable):
+ """
+ A decorator that logs the execution details of a function, tracking its performance and providing runtime insights.
+
+ Parameters:
+ func (callable): The function to be decorated and monitored.
+
+ Returns:
+ callable: A wrapper function that logs execution metrics.
+
+ Raises:
+ TypeError: If the provided function is not callable.
+
+ Example:
+ @log.function
+ def example_function():
+ # Function implementation
+ pass
+ """
+ if not callable(func):
+ self.exception(f"Function {func.__name__} is not callable.", TypeError)
+
def wrapper(*args, **kwargs):
- if not callable(func):
- self.exception(f"Function {func.__name__} is not callable.",
- TypeError)
- start_time = datetime.now()
- self.debug(f"Running the function {func.__name__}().")
+ """
+ Wrapper function that logs the execution of the decorated function.
+
+ Tracks and logs the start, execution, and completion of a function with performance timing.
+
+ Parameters:
+ *args (tuple): Positional arguments passed to the decorated function.
+ **kwargs (dict): Keyword arguments passed to the decorated function.
+
+ Returns:
+ Any: The original result of the decorated function.
+
+ Raises:
+ TypeError: If the decorated function is not callable.
+
+ Notes:
+ - Logs debug messages before and after function execution
+ - Measures and logs the total execution time with microsecond precision
+ - Preserves the original function's return value
+ """
+ start_time = time.perf_counter()
+ func_args = ", ".join([str(arg) for arg in args] +
+ [f"{k}={v}" for k, v in kwargs.items()])
+ self.debug(f"Running the function {func.__name__}({func_args}).")
result = func(*args, **kwargs)
- end_time = datetime.now()
+ end_time = time.perf_counter()
elapsed_time = end_time - start_time
- self.debug(f"Function {func.__name__}() executed in {elapsed_time}.")
+            self.debug(f"{func.__name__}({func_args}) executed in {elapsed_time:.6f}s -> returned {type(result).__name__}")
return result
+
return wrapper
diff --git a/CODE/logicytics/__init__.py b/CODE/logicytics/__init__.py
index f1ea295e..71c52177 100644
--- a/CODE/logicytics/__init__.py
+++ b/CODE/logicytics/__init__.py
@@ -1,29 +1,83 @@
-from logicytics.Execute import Execute
-from logicytics.Get import Get
-from logicytics.Logger import Log
+import functools
+import traceback
+
from logicytics.Checks import Check
+from logicytics.Execute import Execute
from logicytics.FileManagement import FileManagement
from logicytics.Flag import Flag
-import functools
+from logicytics.Get import Get
+from logicytics.Logger import Log
+
+Execute = Execute()
+Get = Get()
+Check = Check()
+FileManagement = FileManagement()
+Flag = Flag()
+DEBUG, VERSION, CURRENT_FILES, DELETE_LOGS = Get.config_data()
+
+
+def deprecated(removal_version: str, reason: str, show_trace: bool = (DEBUG == "DEBUG")) -> callable:
+ """
+ Decorator function that marks a function as deprecated
+ and provides a warning when the function is called.
+
+ Args:
+ removal_version (str): The version when the function will be removed.
+ reason (str): The reason for deprecation.
+        show_trace (bool): Whether to include the call stack in the warning. Defaults to True when DEBUG is "DEBUG".
+
+ Returns:
+ callable: A decorator that marks a function as deprecated.
+
+ Notes:
+ - Uses a nested decorator function to preserve the original function's metadata
+ - Prints a colorized deprecation warning
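+
+    Example (an illustrative sketch; the version and reason are hypothetical):
+        @deprecated("3.5.0", "use FileManagement.mkdir() instead")
+        def old_helper():
+            ...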
+ """
-def deprecated(removal_version: str, reason: str) -> callable:
def decorator(func: callable) -> callable:
+ """
+ Decorator function that marks a function as deprecated and provides a warning when the function is called.
+
+ Args:
+ func (callable): The function to be decorated with a deprecation warning.
+
+ Returns:
+ callable: A wrapper function that preserves the original function's metadata and prints a deprecation warning.
+
+ Notes:
+ - Uses functools.wraps to preserve the original function's metadata
+ - Prints a colorized deprecation warning to stderr
+ - Allows the original function to continue executing
+ """
+
@functools.wraps(func)
def wrapper(*args, **kwargs) -> callable:
- print(
- f"\033[91mDeprecationWarning: A call to the deprecated function {func.__name__}() has been called, {reason}. Function will be removed at version {removal_version}\033[0m")
+ """
+ Wraps a deprecated function to print a warning message before execution.
+
+ Args:
+ *args: Positional arguments passed to the original function.
+ **kwargs: Keyword arguments passed to the original function.
+
+ Returns:
+ The return value of the original function after printing a deprecation warning.
+
+ Warns:
+ Prints a colored deprecation warning to stderr with details about:
+ - Function name being deprecated
+ - Reason for deprecation
+ - Version when the function will be removed
+ """
+            message = f"\033[91mDeprecationWarning: Deprecated function {func.__name__}() was called, {reason}. It will be removed in version {removal_version}\n"
+ if show_trace:
+ stack = ''.join(traceback.format_stack()[:-1])
+ message += f"Called from:\n{stack}\033[0m"
+ else:
+ message += "\033[0m"
+ print(message)
return func(*args, **kwargs)
return wrapper
return decorator
-
-
-Execute = Execute()
-Get = Get()
-Check = Check()
-FileManagement = FileManagement()
-Flag = Flag()
-
-DEBUG, VERSION, CURRENT_FILES, DELETE_LOGS = Get.config_data()
diff --git a/CODE/media_backup.py b/CODE/media_backup.py
index b10fa3d7..9e5c380c 100644
--- a/CODE/media_backup.py
+++ b/CODE/media_backup.py
@@ -10,9 +10,28 @@
class Media:
+ """
+ A class to handle media backup operations.
+ """
+
@staticmethod
def __get_default_paths() -> list:
- """Returns the default paths for photos and videos based on the Windows username."""
+ """
+ Returns the default paths for photos and videos based on the Windows username.
+
+ This method retrieves the current Windows user's default media directories for photos and videos
+ by using the current username and standard Windows file system paths.
+
+ Returns:
+ list: A list containing two paths:
+ - First element: Default photo directory path
+ - Second element: Default video directory path
+
+ Notes:
+ - Uses `getpass.getuser()` to dynamically retrieve the current Windows username
+ - Expands the user path using `os.path.expanduser()` to handle potential path variations
+ - Assumes standard Windows user directory structure
+ """
username = getpass.getuser()
default_photo_path = os.path.expanduser(f"C:\\Users\\{username}\\Pictures")
default_video_path = os.path.expanduser(f"C:\\Users\\{username}\\Videos")
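+        # e.g. for a user named "alice" (illustrative): C:\Users\alice\Pictures and C:\Users\alice\Videos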
@@ -20,13 +39,34 @@ def __get_default_paths() -> list:
@staticmethod
def __ensure_backup_directory_exists(backup_directory: str):
- """Ensures the backup directory exists; creates it if not."""
+ """
+ Ensures the backup directory exists, creating it if necessary.
+
+ Args:
+ backup_directory (str): The full path to the directory where media files will be backed up.
+
+ Raises:
+ OSError: If the directory cannot be created due to permission issues or other system constraints.
+ """
if not os.path.exists(backup_directory):
os.makedirs(backup_directory)
@staticmethod
def __collect_media_files(source_dirs: list) -> list:
- """Collects all media files from the source directories."""
+ """
+ Collects media files from specified source directories.
+
+ Recursively searches through the provided source directories to find image and video files with extensions .jpg, .jpeg, .png, and .mp4.
+
+ Args:
+ source_dirs (list): List of directory paths to search for media files.
+
+ Returns:
+ list: Absolute file paths of all discovered media files, including those in subdirectories.
+
+ Raises:
+ OSError: If any of the source directories are inaccessible or cannot be traversed.
+ """
media_files = []
for source_dir in source_dirs:
for root, _, files in os.walk(source_dir):
@@ -37,7 +77,24 @@ def __collect_media_files(source_dirs: list) -> list:
@staticmethod
def __backup_files(media_files: list, backup_directory: str):
- """Backs up media files to the backup directory."""
+ """
+ Copies media files to a backup directory with timestamped filenames.
+
+ Parameters:
+ media_files (list): A list of file paths for media files to be backed up.
+ backup_directory (str): Destination directory path for storing backup files.
+
+ Behavior:
+ - Iterates through each media file in the input list
+ - Generates a new filename with current timestamp
+ - Attempts to copy each file to the backup directory
+ - Logs successful copy operations
+ - Logs any errors encountered during file copying
+
+ Exceptions:
+        Handles and logs any exceptions that occur during the file copy process;
+        a single failed copy does not interrupt the rest of the backup
+ """
for src_file in media_files:
dst_file = os.path.join(
backup_directory,
@@ -51,15 +108,34 @@ def __backup_files(media_files: list, backup_directory: str):
except Exception as e:
log.error(f"Failed to copy {src_file}: {str(e)}")
+ @classmethod
@log.function
- def backup(self):
- """Backs up media files from the default Windows photo and video directories."""
- source_dirs = self.__get_default_paths()
+ def backup(cls):
+ """
+ Orchestrates the complete media backup process by performing sequential backup operations.
+
+ This class method coordinates the backup workflow:
+ 1. Retrieves default media source directories
+ 2. Sets a standard backup directory
+ 3. Ensures the backup directory exists
+ 4. Collects media files from source directories
+ 5. Copies media files to the backup directory
+ 6. Logs the completion of the backup process
+
+ Returns:
+ None: Performs backup operations without returning a value
+
+ Raises:
+ OSError: If directory creation or file operations fail
+ PermissionError: If insufficient permissions for file/directory operations
+ """
+ source_dirs = cls.__get_default_paths()
backup_directory = "MediaBackup"
- self.__ensure_backup_directory_exists(backup_directory)
- media_files = self.__collect_media_files(source_dirs)
- self.__backup_files(media_files, backup_directory)
+ cls.__ensure_backup_directory_exists(backup_directory)
+ media_files = cls.__collect_media_files(source_dirs)
+ cls.__backup_files(media_files, backup_directory)
log.info("Media backup script completed.")
-Media().backup()
+if __name__ == "__main__":
+ Media.backup()
diff --git a/CODE/packet_sniffer.py b/CODE/packet_sniffer.py
index d016d025..1473b4d7 100644
--- a/CODE/packet_sniffer.py
+++ b/CODE/packet_sniffer.py
@@ -1,17 +1,18 @@
from __future__ import annotations
-import pandas as pd
-import networkx as nx
+from configparser import ConfigParser
+
import matplotlib.pyplot as plt
+import networkx as nx
+import pandas as pd
from scapy.all import sniff, conf
from scapy.layers.inet import IP, TCP, UDP, ICMP
-from configparser import ConfigParser
+
from logicytics import Log, DEBUG
if __name__ == "__main__":
log = Log({"log_level": DEBUG})
-
# Read configuration from config.ini
config = ConfigParser()
config.read('config.ini')
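+
+# Illustrative config.ini shape these reads assume (the section name is not
+# visible in this diff and is a guess):
+# [Settings]
+# interface = Wi-Fi
+# packet_count = 10
+# timeout = 10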
@@ -24,8 +25,31 @@
# Function to process and log packet details
-@log.function
def log_packet(packet: IP):
+ """
+ Processes a captured IP packet, extracting and logging network connection details.
+
+ Extracts key network information from the packet including source and destination IP addresses,
+ protocol, source and destination ports. Logs packet details, updates global packet data collection,
+ prints a summary, and adds connection information to the network graph.
+
+ Parameters:
+ packet (IP): A Scapy IP layer packet to be processed and analyzed.
+
+ Side Effects:
+ - Appends packet information to global `packet_data` list
+ - Prints packet summary to console
+ - Updates network connection graph
+ - Logs debug information about captured packet
+
+ Notes:
+        - Silently handles packet processing errors to prevent sniffing interruption
+        - Requires the global `packet_data` list and the helpers `get_protocol_name()`,
+          `get_port_info()`, `print_packet_summary()`, and `add_to_graph()`
+ """
try:
if packet.haslayer(IP):
log.debug(f"Packet captured: {packet.summary()}")
@@ -44,9 +68,23 @@ def log_packet(packet: IP):
# Function to determine the protocol name
-@log.function
def get_protocol_name(packet: IP) -> str:
- """Returns the name of the protocol."""
+ """
+ Determines the protocol name of a captured network packet.
+
+ This function examines the layers of a given IP packet to identify its protocol type. It supports identification of TCP, UDP, ICMP, and classifies any other packet types as 'Other'.
+
+ Parameters:
+ packet (IP): The captured network packet to analyze for protocol identification.
+
+ Returns:
+ str: The protocol name, which can be one of: 'TCP', 'UDP', 'ICMP', or 'Other'.
+
+ Notes:
+        - Uses Scapy's layer checking methods to determine protocol
+        - Logs debug information about the packet and detected protocol
+        - Provides a fallback 'Other' classification for unrecognized protocols
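+
+    Example (illustrative; assumes this module's Scapy imports):
+        >>> get_protocol_name(IP()/TCP())
+        'TCP'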
+ """
log.debug(f"Checking protocol for packet: {packet.summary()}")
if packet.haslayer(TCP):
log.debug("Protocol: TCP")
@@ -63,9 +101,24 @@ def get_protocol_name(packet: IP) -> str:
# Function to extract port information from a packet
-@log.function
def get_port_info(packet: IP, port_type: str) -> int | None:
- """Extracts the source or destination port from a packet."""
+ """
+ Extracts the source or destination port from a captured packet.
+
+ Parameters:
+ packet (IP): The captured packet to analyze.
+ port_type (str): The type of port to extract ('sport' for source port, 'dport' for destination port).
+
+ Returns:
+ int | None: The port number if available, otherwise None.
+
+    Notes:
+        - Supports extracting ports from TCP and UDP layers
+        - Returns None if the packet has neither a TCP nor a UDP layer
+        - Any port_type other than 'sport' returns the destination port
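+
+    Example (illustrative; builds a minimal Scapy packet):
+        >>> get_port_info(IP()/TCP(sport=1234, dport=80), 'sport')
+        1234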
+ """
log.debug(f"Port type: {port_type}")
if packet.haslayer(TCP):
return packet[TCP].sport if port_type == 'sport' else packet[TCP].dport
@@ -75,17 +128,44 @@ def get_port_info(packet: IP, port_type: str) -> int | None:
# Function to print packet summary
-@log.function
def print_packet_summary(packet_info: dict):
- """Prints a summary of the captured packet."""
+ """
+ Prints a summary of the captured network packet to the debug log.
+
+ Parameters:
+ packet_info (dict): A dictionary containing detailed information about a captured network packet with the following expected keys:
+ - 'protocol' (str): The network protocol of the packet (e.g., TCP, UDP, ICMP)
+ - 'src_ip' (str): Source IP address of the packet
+ - 'dst_ip' (str): Destination IP address of the packet
+ - 'src_port' (int/str): Source port number of the packet
+ - 'dst_port' (int/str): Destination port number of the packet
+
+ Returns:
+ None: Logs packet summary information without returning a value
+ """
log.debug(f"Packet captured: {packet_info['protocol']} packet from {packet_info['src_ip']} "
- f"to {packet_info['dst_ip']} | Src Port: {packet_info['src_port']} | Dst Port: {packet_info['dst_port']}")
+ f"to {packet_info['dst_ip']} | Src Port: {packet_info['src_port']} | Dst Port: {packet_info['dst_port']}")
# Function to add packet information to the graph
-@log.function
def add_to_graph(packet_info: dict):
- """Adds the packet information to the graph."""
+ """
+ Adds an edge to the network graph representing a connection between source and destination IPs.
+
+ Parameters:
+ packet_info (dict): A dictionary containing packet network details with the following keys:
+ - 'src_ip' (str): Source IP address of the packet
+ - 'dst_ip' (str): Destination IP address of the packet
+ - 'protocol' (str): Network protocol used for the connection (e.g., TCP, UDP)
+
+ Side Effects:
+ Modifies the global NetworkX graph (G) by adding an edge between source and destination IPs
+ with the protocol information as an edge attribute.
+
+ Notes:
+        - Assumes a global NetworkX graph object 'G' is already initialized
+        - Does not perform validation of the input packet_info dictionary
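+
+    Example (illustrative; IPs are hypothetical):
+        add_to_graph({'src_ip': '10.0.0.1', 'dst_ip': '10.0.0.2', 'protocol': 'TCP'})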
+ """
src_ip = packet_info['src_ip']
dst_ip = packet_info['dst_ip']
protocol = packet_info['protocol']
@@ -93,9 +173,28 @@ def add_to_graph(packet_info: dict):
# Function to start sniffing packets
-@log.function
def start_sniffing(interface: str, packet_count: int = 10, timeout: int = 10):
- """Starts packet sniffing on a given network interface."""
+ """
+ Starts packet sniffing on a given network interface.
+
+ Captures network packets on the specified interface with configurable packet count and timeout. Processes each captured packet using a custom callback function, logs packet details, and stops when the specified packet count is reached.
+
+ Parameters:
+ interface (str): Network interface name to capture packets from.
+ packet_count (int, optional): Maximum number of packets to capture. Defaults to 10.
+ timeout (int, optional): Maximum time to spend capturing packets in seconds. Defaults to 10.
+
+ Side Effects:
+ - Logs packet details during capture
+ - Saves captured packet data to a CSV file
+ - Generates a network graph visualization
+
+ Raises:
+ Exception: If packet capture encounters unexpected errors
+
+ Example:
+        start_sniffing('Wi-Fi', packet_count=50, timeout=30)
+ """
log.info(f"Starting packet capture on interface '{interface}'...")
# Initialize a packet capture counter
@@ -103,6 +202,26 @@ def start_sniffing(interface: str, packet_count: int = 10, timeout: int = 10):
# Define a custom packet callback to count packets
def packet_callback(packet: IP) -> bool:
+ """
+ Callback function to process each captured network packet during sniffing.
+
+ Processes individual packets, logs their details, and manages packet capture termination. Tracks the number of packets captured and stops sniffing when the predefined packet count is reached.
+
+ Parameters:
+ packet (IP): The captured network packet to be processed.
+
+ Returns:
+ bool: True if the specified packet count has been reached, signaling the sniffer to stop; False otherwise.
+
+ Side Effects:
+ - Increments the global packet counter
+ - Logs packet details using log_packet function
+ - Logs debug information about received packets
+ - Stops packet capture when packet count limit is met
+
+ Raises:
+ No explicit exceptions raised, but may propagate exceptions from log_packet function.
+ """
log.debug(f"Received packet: {packet.summary()}")
nonlocal packet_counter # Reference the outer packet_counter
if packet_counter >= packet_count:
@@ -122,9 +241,26 @@ def packet_callback(packet: IP) -> bool:
# Function to save captured packet data to CSV
-@log.function
def save_packet_data_to_csv(file_path: str):
- """Saves captured packet data to a CSV file."""
+ """
+ Saves captured packet data to a CSV file.
+
+ Writes the collected network packet information to a specified CSV file. If packet data exists, it creates a pandas DataFrame and exports it to the given file path. If no packet data has been captured, it logs a warning message.
+
+ Parameters:
+ file_path (str): The file path where the packet data will be saved as a CSV file.
+
+ Returns:
+ None
+
+ Side Effects:
+ - Writes packet data to a CSV file
+ - Logs an informational message on successful save
+ - Logs a warning if no packet data is available
+
+    Raises:
+        OSError: If the CSV file cannot be written (permission or path errors propagate from pandas)
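+
+    Example (illustrative; the filename is arbitrary):
+        save_packet_data_to_csv("network_packets.csv")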
+ """
global packet_data
if packet_data:
df = pd.DataFrame(packet_data)
@@ -134,18 +270,52 @@ def save_packet_data_to_csv(file_path: str):
log.warning("No packet data to save.")
-# Function to visualize the graph
-@log.function
-def visualize_graph(node_colors: str = None, node_sizes: str = None):
+# Function to visualize the graph of packet connections
+def visualize_graph(node_colors: dict[str, str] | None = None,
+ node_sizes: dict[str, int] | None = None,
+ *, # Force keyword arguments for the following parameters
+ figsize: tuple[int, int] = (12, 8),
+ font_size: int = 10,
+ font_weight: str = "bold",
+ title: str = "Network Connections Graph",
+ output_file: str = "network_connections_graph.png",
+ layout_func: callable = nx.spring_layout):
"""
Visualizes the graph of packet connections with customizable node colors and sizes.
-
+
+ Generates a network graph representation of packet connections using NetworkX and Matplotlib, with optional customization of node colors and sizes.
+
Parameters:
- node_colors (dict): A dictionary mapping node to color.
- node_sizes (dict): A dictionary mapping node to size.
+ node_colors (dict, optional): A dictionary mapping nodes to their display colors.
+ If not provided, defaults to skyblue for all nodes.
+ node_sizes (dict, optional): A dictionary mapping nodes to their display sizes.
+ If not provided, defaults to 3000 for all nodes.
+ figsize (tuple, optional): The size of the figure in inches (width, height). Defaults to (12, 8).
+ font_size (int, optional): The font size for node labels. Defaults to 10.
+ font_weight (str, optional): The font weight for node labels. Defaults to 'bold'.
+ title (str, optional): The title of the graph. Defaults to 'Network Connections Graph'.
+ output_file (str, optional): The name of the output PNG file to save the graph visualization. Defaults to 'network_connections_graph.png'.
+ layout_func (callable, optional): The layout function to use for the graph. Defaults to nx.spring_layout.
+
+ Side Effects:
+ - Creates a matplotlib figure
+        - Saves the visualization as a PNG to the path given by output_file
+ - Closes the matplotlib figure after saving
+
+ Returns:
+ None
+
+ Example:
+ # Default visualization
+ visualize_graph()
+
+ # Custom node colors and sizes
+ custom_colors = {'192.168.1.1': 'red', '10.0.0.1': 'green'}
+ custom_sizes = {'192.168.1.1': 5000, '10.0.0.1': 2000}
+ visualize_graph(node_colors=custom_colors, node_sizes=custom_sizes)
"""
- pos = nx.spring_layout(G)
- plt.figure(figsize=(12, 8))
+ pos = layout_func(G)
+ plt.figure(figsize=figsize)
if node_colors is None:
node_colors = {node: "skyblue" for node in G.nodes()}
@@ -156,43 +326,73 @@ def visualize_graph(node_colors: str = None, node_sizes: str = None):
node_color_list = [node_colors.get(node, "skyblue") for node in G.nodes()]
node_size_list = [node_sizes.get(node, 3000) for node in G.nodes()]
- nx.draw(G, pos, with_labels=True, node_size=node_size_list, node_color=node_color_list, font_size=10,
- font_weight="bold")
+ nx.draw(G, pos, with_labels=True, node_size=node_size_list, node_color=node_color_list, font_size=font_size,
+ font_weight=font_weight)
edge_labels = nx.get_edge_attributes(G, 'protocol')
nx.draw_networkx_edge_labels(G, pos, edge_labels=edge_labels)
- plt.title("Network Connections Graph")
- plt.savefig("network_connections_graph.png")
+ plt.title(title)
+ plt.savefig(output_file)
plt.close()
@log.function
-def main():
+def packet_sniffer():
+ """
+ Initiates packet sniffing based on configuration settings.
+
+ Reads network configuration parameters from a global config dictionary, including network interface, packet count, and timeout. Validates input parameters to ensure they are positive values. Attempts to start packet sniffing on the specified interface, with built-in error handling and interface name correction for common variations.
+
+ Raises:
+ SystemExit: If packet count or timeout values are invalid
+ Exception: If there are issues with the network interface or packet sniffing process
+
+ Side Effects:
+ - Logs configuration and sniffing errors
+ - Attempts to autocorrect interface names
+ - Calls start_sniffing() to capture network packets
+ - Exits the program if critical configuration errors are encountered
+ """
+
+ def correct_interface_name(interface_name: str) -> str:
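+        """Swap between the two spellings Windows commonly uses for the Wi-Fi interface."""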
+ corrections = {
+ "WiFi": "Wi-Fi",
+ "Wi-Fi": "WiFi"
+ }
+ return corrections.get(interface_name, interface_name)
+
interface = config['interface']
packet_count = int(config['packet_count'])
timeout = int(config['timeout'])
if packet_count <= 0 or timeout <= 0:
- log.error(
- f"Oops! Can't work with these values:\n"
- f"- Packet count: {packet_count} {'❌ (must be > 0)' if packet_count <= 0 else '✅'}\n"
- f"- Timeout: {timeout} {'❌ (must be > 0)' if timeout <= 0 else '✅'}"
- )
+ try:
+ log.error(
+ "Oops! Can't work with these values:\n"
+ f"- Packet count: {packet_count} {'❌ (must be > 0)' if packet_count <= 0 else '✅'}\n"
+ f"- Timeout: {timeout} {'❌ (must be > 0)' if timeout <= 0 else '✅'}"
+ )
+ except Exception:
+ log.error("Error reading configuration: Improper values for packet count or timeout")
exit(1)
- try:
- start_sniffing(interface, packet_count, timeout)
- except Exception as err:
- log.error(f"Invalid interface '{interface}'. Please check the configuration: {err}")
- if interface == "WiFi" or interface == "Wi-Fi":
- log.warning("Attempting to correct the interface name...")
- interface = "Wi-Fi" if interface == "WiFi" else "WiFi"
- log.info(f"Interface name corrected to '{interface}'.")
+ for attempt in range(2): # Try original and corrected name
+ try:
+            start_sniffing(interface, packet_count, timeout)
+ break
+ except Exception as err:
+ if attempt == 0 and interface in ("WiFi", "Wi-Fi"):
+ log.warning("Retrying with corrected interface name...")
+ interface = correct_interface_name(interface)
+            else:
+                log.error(f"Failed to sniff packets: {err}")
+                break  # no correction left to try; avoid a duplicate attempt
# Entry point of the script
if __name__ == "__main__":
try:
- main()
+ packet_sniffer()
except Exception as e:
log.error(e)
+ finally:
+ if G:
+ plt.close()
diff --git a/CODE/registry.py b/CODE/registry.py
index 9539b240..41c4282b 100644
--- a/CODE/registry.py
+++ b/CODE/registry.py
@@ -28,4 +28,5 @@ def backup_registry():
log.error(f"Failed to back up the registry: {e}")
-backup_registry()
+if __name__ == "__main__":
+ backup_registry()
diff --git a/CODE/sensitive_data_miner.py b/CODE/sensitive_data_miner.py
index e6692f99..eda42944 100644
--- a/CODE/sensitive_data_miner.py
+++ b/CODE/sensitive_data_miner.py
@@ -12,7 +12,13 @@
allowed_extensions = [
".png", ".txt", ".md", ".json", ".yaml", ".secret", ".jpg", ".jpeg",
".password", ".text", ".docx", ".doc", ".xls", ".xlsx", ".csv",
- ".xml", ".config", ".log", ".pdf", ".zip", ".rar", ".7z", ".tar"
+ ".xml", ".config", ".log", ".pdf", ".zip", ".rar", ".7z", ".tar",
+ ".gz", ".tgz", ".tar.gz", ".tar.bz2", ".tar.xz", ".tar.zst",
+ ".sql", ".db", ".dbf", ".sqlite", ".sqlite3", ".bak", ".dbx",
+ ".mdb", ".accdb", ".pst", ".ost", ".msg", ".eml", ".vsd",
+ ".vsdx", ".vsdm", ".vss", ".vssx", ".vssm", ".vst", ".vstx",
+ ".vstm", ".vdx", ".vsx", ".vtx", ".vdw", ".vsw", ".vst",
+ ".mpp", ".mppx", ".mpt", ".mpd", ".mpx", ".mpd", ".mdf",
]
@@ -20,15 +26,35 @@ class Mine:
@staticmethod
def __search_files_by_keyword(root: Path, keyword: str) -> list:
"""
- Searches for files containing the specified keyword in their names.
- Args:
- root (Path): The root directory to search in.
- keyword (str): The keyword to search for in file names.
+ Searches for files containing a specified keyword in their names within a given directory.
+
+ Parameters:
+ root (Path): The root directory to search in for files.
+ keyword (str): The keyword to search for in file names (case-insensitive).
+
Returns:
- list: List of files that match the search criteria.
+ list: A list of file paths matching the search criteria, which:
+ - Contain the keyword in their filename (case-insensitive)
+ - Are files (not directories)
+ - Have file extensions in the allowed_extensions list
+
+        Notes:
+            - Permission errors (WindowsError) are caught and logged as warnings in debug mode, not raised
+            - Skips files with unsupported extensions, logging debug information
+            - Uses case-insensitive keyword matching
"""
matching_files = []
- for filename in os.listdir(root):
+ path_list = []
+ try:
+ path_list = os.listdir(root)
+ except WindowsError as e:
+            if DEBUG == "DEBUG":
+ # Log the error if in debug mode, as it is a common occurrence.
+ log.warning(f"Permission Denied: {e}")
+
+ for filename in path_list:
file_path = root / filename
if (
keyword.lower() in filename.lower()
@@ -36,33 +62,60 @@ def __search_files_by_keyword(root: Path, keyword: str) -> list:
and file_path.suffix in allowed_extensions
):
matching_files.append(file_path)
+ else:
+ log.debug(f"Skipped {file_path}, Unsupported due to {file_path.suffix} extension")
return matching_files
@staticmethod
def __copy_file(src_file_path: Path, dst_file_path: Path):
"""
- Copies a file to the destination directory.
- Args:
- src_file_path (Path): Source file path.
- dst_file_path (Path): Destination file path.
- Returns:
- None
+ Copy a file from the source path to the destination path.
+
+ Parameters:
+ src_file_path (Path): The full path of the source file to be copied.
+ dst_file_path (Path): The full path where the file will be copied.
+
+        Notes:
+            - Skips files larger than 10MB with a warning
+            - Uses shutil.copy() for file copying
+            - Logs a debug message on successful copy
+            - Logs a warning if the file already exists at the destination
+            - Logs an error for any other copy failure; exceptions are caught, not raised
"""
try:
+            # Skip files over the 10MB size limit before copying
+            if src_file_path.stat().st_size > 10_000_000:
+                log.warning(f"Skipped {src_file_path}: exceeds the 10MB size limit")
+ return
shutil.copy(src_file_path, dst_file_path)
log.debug(f"Copied {src_file_path} to {dst_file_path}")
except FileExistsError as e:
- log.warning(f"Failed to copy file due to it already existing: {e}")
+ log.warning(f"File already exists in destination: {e}")
except Exception as e:
log.error(f"Failed to copy file: {e}")
- def __search_and_copy_files(self, keyword: str):
+ @classmethod
+ def __search_and_copy_files(cls, keyword: str):
"""
Searches for files containing the specified keyword in their names and copies them to a destination directory.
- Args:
- keyword (str): The keyword to search for in file names.
- Returns:
- None
+
+ This method performs a comprehensive file search across the C: drive, identifying files that match a given keyword and concurrently copying them to a dedicated "Password_Files" directory.
+
+ Parameters:
+ keyword (str): The keyword to search for in file names. Used to filter and identify potentially sensitive files.
+
+ Side Effects:
+ - Creates a "Password_Files" directory if it does not exist
+ - Logs informational messages about the search and copy process
+ - Utilizes multithreading to efficiently search and copy files
+
+ Notes:
+            - Searches recursively through all directories starting from C:\\
+ - Uses ThreadPoolExecutor for concurrent file searching and copying
+ - Handles potential permission and file access errors during search
"""
log.info(f"Searching/Copying file's with keyword: {keyword}")
drives_root = Path("C:\\")
@@ -71,23 +124,44 @@ def __search_and_copy_files(self, keyword: str):
destination.mkdir()
with ThreadPoolExecutor() as executor:
- for root, dirs, files in os.walk(drives_root):
- future_to_file = {executor.submit(self.__search_files_by_keyword, Path(root), keyword): root_path for
- root_path in dirs}
+ for root, dirs, _ in os.walk(drives_root):
+ future_to_file = {
+ executor.submit(cls.__search_files_by_keyword, Path(root) / sub_dir, keyword): sub_dir
+ for sub_dir in dirs
+ }
for future in future_to_file:
for file_path in future.result():
dst_file_path = destination / file_path.name
- executor.submit(self.__copy_file, file_path, dst_file_path)
+ executor.submit(cls.__copy_file, file_path, dst_file_path)
+ @classmethod
@log.function
- def passwords(self):
+ def passwords(cls):
"""
- Searches for files containing sensitive data keywords in their filenames,
- copies them to a "Password Files" directory, and logs the completion of the task.
- Returns:
- None
+ Searches for and copies files containing sensitive data keywords to a dedicated directory.
+
+ This method performs a comprehensive search for files with predefined sensitive keywords in their names,
+ copying matching files to a "Password_Files" directory. It handles directory cleanup and uses predefined
+ keywords related to sensitive information.
+
+ Side Effects:
+ - Creates or recreates the "Password_Files" directory
+ - Copies files matching sensitive keywords to the destination directory
+ - Logs the completion of the sensitive data mining process
+
+        Keywords Searched:
+            "password", "secret", "code", "login", "api", "key", "token", "auth",
+            "credentials", "private", "cert", "ssh", "pgp", "wallet"
+
+ Logging:
+ - Logs an informational message upon completion of the search and copy process
"""
- keywords = ["password", "secret", "code", "login", "api", "key"]
+ keywords = ["password", "secret", "code", "login", "api", "key",
+ "token", "auth", "credentials", "private", "cert", "ssh", "pgp", "wallet"]
# Ensure the destination directory is clean
destination = Path("Password_Files")
@@ -96,10 +170,12 @@ def passwords(self):
destination.mkdir()
for word in keywords:
- self.__search_and_copy_files(word)
+ cls.__search_and_copy_files(word)
log.info("Sensitive Data Miner Completed")
-log.warning("Sensitive Data Miner Started, This may take a while... (aka touch some grass)")
-Mine().passwords()
+if __name__ == "__main__":
+ log.warning(
+ "Sensitive Data Miner Initialized. Processing may take a while... (Consider a break: coffee or fresh air recommended!)")
+ Mine.passwords()
diff --git a/CODE/ssh_miner.py b/CODE/ssh_miner.py
index 5214b7f3..1588784b 100644
--- a/CODE/ssh_miner.py
+++ b/CODE/ssh_miner.py
@@ -46,4 +46,5 @@ def ssh_miner():
log.info("SSH Miner completed.")
-ssh_miner()
+if __name__ == "__main__":
+ ssh_miner()
diff --git a/CODE/sys_internal.py b/CODE/sys_internal.py
index a8205128..2f3cfe01 100644
--- a/CODE/sys_internal.py
+++ b/CODE/sys_internal.py
@@ -60,7 +60,6 @@ def sys_internal():
log.info("SysInternal Suite fully executed")
-@log.function
def check_sys_internal_dir() -> tuple[bool, bool]:
"""
Checks the existence of the 'SysInternal_Suite' directory and its contents.
@@ -82,13 +81,11 @@ def check_sys_internal_dir() -> tuple[bool, bool]:
return False, False
-if check_sys_internal_dir()[0]:
- sys_internal()
-elif check_sys_internal_dir()[0] is False and check_sys_internal_dir()[1] is True:
- log.warning(
- "Files are not found, They are still zipped, most likely due to a .ignore file being present, continuing Logicytics"
- )
-elif check_sys_internal_dir()[0] is False and check_sys_internal_dir()[1] is False:
- log.error(
- "Files are not found, The zip file is also missing!, continuing Logicytics"
- )
+if __name__ == "__main__":
+ if check_sys_internal_dir()[0]:
+ sys_internal()
+ elif check_sys_internal_dir()[1]:
+ log.warning(
+ "Files are not found, They are still zipped, most likely due to a .ignore file being present, continuing Logicytics")
+ else:
+ log.error("Files are not found, The zip file is also missing!, continuing Logicytics")
diff --git a/CODE/tasklist.py b/CODE/tasklist.py
index f9664fbb..c8cc289a 100644
--- a/CODE/tasklist.py
+++ b/CODE/tasklist.py
@@ -32,4 +32,5 @@ def tasklist():
log.error(f"Error: {e}")
-tasklist()
+if __name__ == "__main__":
+ tasklist()
diff --git a/CODE/vulnscan.py b/CODE/vulnscan.py
index e9cf5fea..6c75f8df 100644
--- a/CODE/vulnscan.py
+++ b/CODE/vulnscan.py
@@ -4,6 +4,7 @@
import os
import threading
import warnings
+from concurrent.futures import ThreadPoolExecutor
import joblib
import numpy as np
@@ -17,26 +18,26 @@
if __name__ == "__main__":
log = Log({"log_level": DEBUG})
-log.info("Locking threads - Model and Vectorizer")
-model_lock = threading.Lock()
-vectorizer_lock = threading.Lock()
+# TODO v3.1.0: Load models and then use caching to avoid reloading models
-model_to_use = None
-vectorizer_to_use = None
+# Ignore all warnings
+warnings.filterwarnings("ignore")
def load_model(model_path_to_load: str) -> safe_open | torch.nn.Module:
"""
Load a machine learning model from the specified file path.
-
- Args:
- model_path_to_load (str): Path to the model file.
-
+
+ Supports loading models from three different file formats: Pickle (.pkl), SafeTensors (.safetensors), and PyTorch (.pth) files.
+
+ Parameters:
+ model_path_to_load (str): Full file path to the model file to be loaded.
+
Returns:
- model: Loaded model object.
-
+ safe_open | torch.nn.Module: Loaded model object, which can be a joblib, safetensors, or torch model.
+
Raises:
- ValueError: If the model file format is unsupported.
+ ValueError: If the model file does not have a supported extension (.pkl, .safetensors, or .pth).
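+
+    Example (illustrative; path taken from this script's __main__ block):
+        model = load_model("VulnScan/Model SenseMini .3n3.pth")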
"""
if model_path_to_load.endswith('.pkl'):
return joblib.load(model_path_to_load)
@@ -50,7 +51,27 @@ def load_model(model_path_to_load: str) -> safe_open | torch.nn.Module:
raise ValueError("Unsupported model file format. Use .pkl, .safetensors, or .pth")
+@log.function
def scan_path(model_path: str, scan_paths: str, vectorizer_path: str):
+ """
+ Scan a specified path for sensitive content using a pre-trained machine learning model and vectorizer.
+
+ This function handles loading the model and vectorizer if they are not already initialized, and then performs a vulnerability scan on the given path. It ensures thread-safe model and vectorizer loading using global locks.
+
+ Args:
+ model_path (str): Filesystem path to the machine learning model file to be used for scanning.
+ scan_paths (str): Filesystem path to the file or directory that will be scanned for sensitive content.
+ vectorizer_path (str): Filesystem path to the vectorizer file used for text feature extraction.
+
+    Error handling:
+        Errors raised while scanning (missing files, permission issues, model loading
+        failures) are caught and logged rather than propagated, so one bad path does
+        not halt the overall scan.
+
+ Side Effects:
+ - Loads global model and vectorizer if not already initialized
+ - Logs information about model and vectorizer loading
+ - Calls vulnscan() to perform actual file scanning
+ - Logs any errors encountered during scanning
+ """
global model_to_use, vectorizer_to_use
try:
with model_lock:
@@ -62,8 +83,14 @@ def scan_path(model_path: str, scan_paths: str, vectorizer_path: str):
log.info(f"Loading vectorizer from {vectorizer_path}")
vectorizer_to_use = joblib.load(vectorizer_path)
vulnscan(model_to_use, scan_paths, vectorizer_to_use)
- except Exception as e:
- log.error(f"Error scanning path {scan_paths}: {e}")
+ except FileNotFoundError as err:
+ log.error(f"File not found while scanning {scan_paths}: {err}")
+ except PermissionError as err:
+ log.error(f"Permission denied while scanning {scan_paths}: {err}")
+ except (torch.serialization.pickle.UnpicklingError, RuntimeError) as err:
+ log.error(f"Model loading failed for {scan_paths}: {err}")
+ except Exception as err:
+ log.error(f"Unexpected error scanning {scan_paths}: {err}")
def is_sensitive(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_content: str) -> tuple[bool, float, str]:
@@ -122,11 +149,31 @@ def scan_file(model: torch.nn.Module, vectorizer: TfidfVectorizer, file_path: st
return is_sensitive(model, vectorizer, content)
+@log.function
def vulnscan(model, SCAN_PATH, vectorizer):
- log.info(f"Scanning {SCAN_PATH}")
+ """
+ Scan a file to determine if it contains sensitive content and log the results.
+
+ Args:
+ model (object): Machine learning model used for content sensitivity classification.
+ SCAN_PATH (str): Absolute or relative file path to be scanned for sensitive content.
+ vectorizer (object): Text vectorization model to transform file content into feature representation.
+
+ Returns:
+ None: Logs sensitive file details and appends file path to 'Sensitive_File_Paths.txt' if sensitive content is detected.
+
+ Side Effects:
+ - Logs scanning information using the configured logger
+ - Creates or appends to 'Sensitive_File_Paths.txt' when sensitive content is found
+ - Writes sensitive file paths to the log file
+
+ Raises:
+ IOError: If there are issues writing to the 'Sensitive_File_Paths.txt' file
+ """
+ log.debug(f"Scanning {SCAN_PATH}")
result, probability, reason = scan_file(model, vectorizer, SCAN_PATH)
if result:
- log.info(f"File {SCAN_PATH} is sensitive with probability {probability:.2f}. Reason: {reason}")
+ log.debug(f"File {SCAN_PATH} is sensitive with probability {probability:.2f}. Reason: {reason}")
if not os.path.exists("Sensitive_File_Paths.txt"):
with open("Sensitive_File_Paths.txt", "w") as sensitive_file:
sensitive_file.write(f"{SCAN_PATH}\n\n")
@@ -134,31 +181,57 @@ def vulnscan(model, SCAN_PATH, vectorizer):
sensitive_file.write(f"{SCAN_PATH}\n")
-# Start scanning
-log.info("Getting paths to scan - This may take some time!!")
-
-threads = []
-paths = []
-base_paths = [
- "C:\\Users\\",
- "C:\\Windows\\Logs",
- "C:\\Program Files",
- "C:\\Program Files (x86)"
-]
-
-for base_path in base_paths:
- for root, dirs, files_main in os.walk(base_path):
- for file_main in files_main:
- paths.append(os.path.join(root, file_main))
-
-# Start scanning
-log.warning("Starting scan - This may take hours and consume memory!!")
-
-for path in paths:
- thread = threading.Thread(target=scan_path,
- args=("VulnScan/Model SenseMini .3n3.pth", path, "VulnScan/Vectorizer .3n3.pkl"))
- threads.append(thread)
- thread.start()
-
-for thread in threads:
- thread.join()
+if __name__ == "__main__":
+ # Locks for model and vectorizer
+ log.info("Locking threads - Model and Vectorizer")
+ model_lock = threading.Lock()
+ vectorizer_lock = threading.Lock()
+
+ model_to_use = None
+ vectorizer_to_use = None
+
+ # Start scanning
+ log.info("Getting paths to scan - This may take some time!!")
+
+ paths = []
+ base_paths = [
+ "C:\\Users\\",
+ "C:\\Windows\\Logs",
+ "C:\\Program Files",
+ "C:\\Program Files (x86)"
+ ]
+
+ # Use max_workers based on CPU count but cap it at a reasonable number
+    max_workers = min(32, (os.cpu_count() or 4) * 2)
+
+    # os.walk runs on this thread either way, so build the path list directly;
+    # submitting trivial os.path.join calls to an executor adds overhead without parallelism
+    for base_path in base_paths:
+        for root, _, files_main in os.walk(base_path):
+            for file_main in files_main:
+                paths.append(os.path.join(root, file_main))
+
+ # Start scanning
+ log.warning("Starting scan - This may take hours and consume memory!!")
+
+ with ThreadPoolExecutor(max_workers=max_workers) as executor:
+ total_paths = len(paths)
+ completed = 0
+ futures = [
+ executor.submit(
+ scan_path,
+ "VulnScan/Model SenseMini .3n3.pth",
+ path,
+ "VulnScan/Vectorizer .3n3.pkl"
+ )
+ for path in paths
+ ]
+ for future in futures:
+            try:
+                future.result()
+            except Exception as e:
+                log.error(f"Scan failed: {e}")
+            finally:
+                completed += 1  # count failures too, so progress still reaches 100%
+                if completed % 100 == 0:
+                    progress = (completed / total_paths) * 100
+                    log.info(f"Scan progress: {progress:.1f}% ({completed}/{total_paths})")
diff --git a/CODE/wifi_stealer.py b/CODE/wifi_stealer.py
index 8189b94f..ddd563ab 100644
--- a/CODE/wifi_stealer.py
+++ b/CODE/wifi_stealer.py
@@ -6,19 +6,23 @@
log = Log({"log_level": DEBUG})
-@log.function
def get_password(ssid: str) -> str | None:
"""
- Retrieves the password associated with a given Wi-Fi SSID.
-
+ Retrieves the password for a specified Wi-Fi network.
+
Args:
- ssid (str): The SSID of the Wi-Fi network.
-
+ ssid (str): The name (SSID) of the Wi-Fi network to retrieve the password for.
+
Returns:
- str or None: The password associated with the SSID, or None if no password is found.
-
+ str or None: The Wi-Fi network password if found, otherwise None.
+
Raises:
- Exception: If an error occurs while executing the command.
+ Exception: If an error occurs during command execution or password retrieval.
+
+ Notes:
+        - Uses the Windows `netsh` command to extract network profile details
+        - Searches command output for "Key Content" to find the password
+        - Logs any errors encountered during the process
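+
+    Example (illustrative; SSID and password are hypothetical):
+        >>> get_password("HomeNetwork")
+        'hunter2'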
"""
try:
command_output = Execute.command(
@@ -33,16 +37,23 @@ def get_password(ssid: str) -> str | None:
log.error(err)
-@log.function
def parse_wifi_names(command_output: str) -> list:
"""
Parses the output of the command to extract Wi-Fi profile names.
-
+
Args:
- command_output (str): The output of the command "netsh wlan show profile".
-
+ command_output (str): The output of the command "netsh wlan show profile" containing Wi-Fi profile information.
+
Returns:
- list: A list of Wi-Fi profile names.
+ list: A list of extracted Wi-Fi profile names, stripped of whitespace.
+
+ Example:
+ >>> output = "All User Profile : HomeNetwork\\nAll User Profile : WorkWiFi"
+ >>> parse_wifi_names(output)
+ ['HomeNetwork', 'WorkWiFi']
"""
wifi_names = []
@@ -55,17 +66,21 @@ def parse_wifi_names(command_output: str) -> list:
return wifi_names
-@log.function
def get_wifi_names() -> list:
"""
Retrieves the names of all Wi-Fi profiles on the system.
-
- This function executes the command "netsh wlan show profile" to retrieve the list of Wi-Fi profiles.
- It then iterates over each line of the command output and checks if the line contains the string "All User Profile".
- If it does, it extrActions()s the Wi-Fi profile name and appends it to the list of Wi-Fi names.
-
+
+ Executes the "netsh wlan show profile" command to list available Wi-Fi network profiles.
+ Parses the command output to extract individual profile names.
+
Returns:
- list: A list of Wi-Fi profile names.
+ list: A list of Wi-Fi network profile names discovered on the system.
+
+ Raises:
+ Exception: If an error occurs during the retrieval of Wi-Fi names.
+
+ Example:
+ wifi_profiles = get_wifi_names() # Returns ['HomeNetwork', 'CoffeeShop', ...]
"""
try:
log.info("Retrieving Wi-Fi names...")
@@ -98,4 +113,5 @@ def get_wifi_passwords():
log.error(e)
-get_wifi_passwords()
+if __name__ == "__main__":
+ get_wifi_passwords()
diff --git a/CODE/wmic.py b/CODE/wmic.py
index 53bb99d5..28b59bf6 100644
--- a/CODE/wmic.py
+++ b/CODE/wmic.py
@@ -38,4 +38,5 @@ def wmic():
file.write("-" * 190)
-wmic()
+if __name__ == "__main__":
+ wmic()
diff --git a/CREDITS.md b/CREDITS.md
index 9f6aa9bf..35dc7568 100644
--- a/CREDITS.md
+++ b/CREDITS.md
@@ -3,6 +3,10 @@
This project is built on the shoulders of giants and inspired by the work of many talented individuals and
organizations. We acknowledge their contributions and are grateful for the knowledge and tools they have shared.
+
+
+
+