diff --git a/.gitignore b/.gitignore
index 65c57571..f0f94ffc 100644
--- a/.gitignore
+++ b/.gitignore
@@ -322,3 +322,9 @@ $RECYCLE.BIN/
/CODE/VulnScan/tools/NN features/
/CODE/logicytics/User_History.json.gz
/CODE/logicytics/User_History.json
+/CODE/SysInternal_Suite/psfile.exe
+/CODE/SysInternal_Suite/PsGetsid.exe
+/CODE/SysInternal_Suite/PsInfo.exe
+/CODE/SysInternal_Suite/pslist.exe
+/CODE/SysInternal_Suite/PsLoggedon.exe
+/CODE/SysInternal_Suite/psloglist.exe
diff --git a/.idea/csv-editor.xml b/.idea/csv-editor.xml
new file mode 100644
index 00000000..cb2fb408
--- /dev/null
+++ b/.idea/csv-editor.xml
@@ -0,0 +1,16 @@
+
+
+
+
+
+
\ No newline at end of file
diff --git a/CODE/Logicytics.py b/CODE/Logicytics.py
index 850f09dc..bd4928ce 100644
--- a/CODE/Logicytics.py
+++ b/CODE/Logicytics.py
@@ -12,12 +12,12 @@
import psutil
from prettytable import PrettyTable
-from logicytics import Log, Execute, Check, Get, FileManagement, Flag, DEBUG, DELETE_LOGS, CONFIG
+from logicytics import Log, Execute, Check, Get, FileManagement, Flag, DEBUG, DELETE_LOGS, config
# Initialization
log = Log({"log_level": DEBUG, "delete_log": DELETE_LOGS})
ACTION, SUB_ACTION = None, None
-MAX_WORKERS = CONFIG.getint("Settings", "max_workers", fallback=min(32, (os.cpu_count() or 1) + 4))
+MAX_WORKERS = config.getint("Settings", "max_workers", fallback=min(32, (os.cpu_count() or 1) + 4))
log.debug(f"MAX_WORKERS: {MAX_WORKERS}")
@@ -67,7 +67,8 @@ def __generate_execution_list(self) -> list[str]:
- Warns users about potential long execution times for certain actions
"""
execution_list = Get.list_of_files(".", only_extensions=(".py", ".exe", ".ps1", ".bat"),
- exclude_files=["Logicytics.py"])
+ exclude_files=["Logicytics.py"],
+ exclude_dirs=["logicytics", "SysInternal_Suite"])
files_to_remove = {
"sensitive_data_miner.py",
"dir_list.py",
@@ -101,7 +102,8 @@ def __generate_execution_list(self) -> list[str]:
elif ACTION == "modded":
# Add all files in MODS to execution list
execution_list = Get.list_of_files("../MODS", only_extensions=(".py", ".exe", ".ps1", ".bat"),
- append_file_list=execution_list, exclude_files=["Logicytics.py"])
+ append_file_list=execution_list, exclude_files=["Logicytics.py"],
+ exclude_dirs=["logicytics", "SysInternal_Suite"])
elif ACTION == "depth":
log.warning(
@@ -128,6 +130,7 @@ def __generate_execution_list(self) -> list[str]:
log.critical("Nothing is in the execution list.. This is due to faulty code or corrupted Logicytics files!")
exit(1)
+ log.debug(f"Execution list length: {len(execution_list)}")
log.debug(f"The following will be executed: {execution_list}")
return execution_list
@@ -208,18 +211,32 @@ def __performance(self):
end_time = datetime.now()
end_memory = process.memory_full_info().uss / 1024 / 1024 # MB
elapsed_time = end_time - start_time
- memory_delta = end_memory - start_memory
- memory_usage.append((self.execution_list[file], str(memory_delta)))
+ memory_delta = max(0, end_memory - start_memory) # Clamps negative delta to 0
+ memory_usage.append((self.execution_list[file], f"{memory_delta}"))
execution_times.append((self.execution_list[file], elapsed_time))
log.info(f"{self.execution_list[file]} executed in {elapsed_time}")
- log.info(f"{self.execution_list[file]} used {memory_delta:.2f}MB of memory")
- log.debug(f"Started with {start_memory}MB of memory and ended with {end_memory}MB of memory")
+ try:
+ if (end_memory - start_memory) < 0:
+ log.info(
+ f"{self.execution_list[file]} used {memory_delta:.3f}MB of memory - \033[33mPossible Affected by outside processes\033[0m")
+ else:
+ log.info(f"{self.execution_list[file]} used {memory_delta:.3f}MB of memory")
+ except Exception as e:
+ log.warning("Failed to log memory usage delta, reason: " + str(e))
+ log.debug(f"Started with {start_memory:.3f}MB of memory and ended with {end_memory:.3f}MB of memory")
table = PrettyTable()
table.field_names = ["Script", "Execution Time", "Memory Usage (MB)"]
for script, elapsed_time in execution_times:
- memory = next(m[1] for m in memory_usage if m[0] == script)
- table.add_row([script, elapsed_time, f"{memory:.2f}"])
+ try:
+ memory = f"{float(next(m[1] for m in memory_usage if m[0] == script)):.3f}"
+ except StopIteration:
+ log.warning(f"No memory data found for {script}")
+ memory = "N/A"
+ except Exception as e:
+ log.warning(f"Failed to log memory usage for {script}, reason: " + str(e))
+ memory = "N/A"
+ table.add_row([script, elapsed_time, f"{memory}"])
try:
with open(
@@ -228,9 +245,9 @@ def __performance(self):
) as f:
f.write(table.get_string())
f.write(
- "\nSome values may be negative, Reason may be due to external resources playing with memory usage, "
- "close background tasks to get more accurate readings")
- f.write("Note: This is not a low-level memory logger, data here isn't 100% accurate!")
+ "\nSome values may be negative, Reason may be due to external resources playing with memory usage,\n"
+ "Close background tasks to get more accurate readings\n\n")
+ f.write("Note: This is not a low-level memory logger, data here isn't 100% accurate!\n")
log.info("Performance check complete! Performance log found in ACCESS/LOGS/PERFORMANCE")
except Exception as e:
log.error(f"Error writing performance log: {e}")
@@ -434,10 +451,16 @@ def check_privileges():
log.warning("UAC is enabled, this may cause issues - Please disable UAC if possible")
-def zip_generated_files():
- """Zips generated files based on the action."""
+class ZIP:
+ @classmethod
+ def files(cls):
+ """Zips generated files based on the action."""
+ if ACTION == "modded":
+ cls.__and_log("..\\MODS", "MODS")
+ cls.__and_log(".", "CODE")
- def zip_and_log(directory: str, name: str):
+ @staticmethod
+ def __and_log(directory: str, name: str):
log.debug(f"Zipping directory '{directory}' with name '{name}' under action '{ACTION}'")
zip_values = FileManagement.Zip.and_hash(
directory,
@@ -451,10 +474,6 @@ def zip_and_log(directory: str, name: str):
log.info(zip_loc)
log.debug(hash_loc)
- if ACTION == "modded":
- zip_and_log("..\\MODS", "MODS")
- zip_and_log(".", "CODE")
-
def handle_sub_action():
"""
@@ -472,8 +491,7 @@ def handle_sub_action():
elif SUB_ACTION == "reboot":
subprocess.call("shutdown /r /t 3", shell=False)
# elif sub_action == "webhook":
- # Implement this in future
- # log.warning("This feature is not implemented yet! Sorry")
+ # TODO: Implement this in future v3.5
@log.function
@@ -501,7 +519,7 @@ def Logicytics():
# Execute scripts
ExecuteScript().handler()
# Zip generated files
- zip_generated_files()
+ ZIP.files()
# Finish with sub actions
handle_sub_action()
# Finish
diff --git a/CODE/_dev.py b/CODE/_dev.py
index 7a37e3e3..aa671aea 100644
--- a/CODE/_dev.py
+++ b/CODE/_dev.py
@@ -116,7 +116,7 @@ def _handle_file_operations() -> None:
Handles file operations and logging for added, removed, and normal files.
"""
EXCLUDE_FILES = ["logicytics\\User_History.json.gz", "logicytics\\User_History.json"]
- files = Get.list_of_files(".", exclude_files=EXCLUDE_FILES)
+ files = Get.list_of_files(".", exclude_files=EXCLUDE_FILES, exclude_dirs=["SysInternal_Suite"])
added_files, removed_files, normal_files = [], [], []
clean_files_list = [file.replace('"', '') for file in CURRENT_FILES]
diff --git a/CODE/config.ini b/CODE/config.ini
index ad95cbaa..a01d2fa1 100644
--- a/CODE/config.ini
+++ b/CODE/config.ini
@@ -26,8 +26,8 @@ save_preferences = true
[System Settings]
# Do not play with these settings unless you know what you are doing
# Dev Mode allows a safe way to modify these settings!!
-version = 3.4.0
-files = "bluetooth_details.py, bluetooth_logger.py, browser_miner.ps1, cmd_commands.py, config.ini, dir_list.py, dump_memory.py, event_log.py, Logicytics.py, log_miner.py, media_backup.py, netadapter.ps1, network_psutil.py, packet_sniffer.py, property_scraper.ps1, registry.py, sensitive_data_miner.py, ssh_miner.py, sys_internal.py, tasklist.py, tree.ps1, vulnscan.py, wifi_stealer.py, window_feature_miner.ps1, wmic.py, logicytics\Checks.py, logicytics\Config.py, logicytics\Execute.py, logicytics\FileManagement.py, logicytics\Flag.py, logicytics\Get.py, logicytics\Logger.py, logicytics\User_History.json.gz, SysInternal_Suite\.sys.ignore, SysInternal_Suite\SysInternal_Suite.zip, VulnScan\Model SenseMini .3n3.pth, VulnScan\README.md, VulnScan\Vectorizer .3n3.pkl"
+version = 3.4.1
+files = "bluetooth_details.py, bluetooth_logger.py, browser_miner.ps1, cmd_commands.py, config.ini, dir_list.py, dump_memory.py, event_log.py, Logicytics.py, log_miner.py, media_backup.py, netadapter.ps1, network_psutil.py, packet_sniffer.py, property_scraper.ps1, registry.py, sensitive_data_miner.py, ssh_miner.py, sys_internal.py, tasklist.py, tree.ps1, vulnscan.py, wifi_stealer.py, window_feature_miner.ps1, wmic.py, logicytics\Checks.py, logicytics\Config.py, logicytics\Execute.py, logicytics\FileManagement.py, logicytics\Flag.py, logicytics\Get.py, logicytics\Logger.py, logicytics\User_History.json.gz, VulnScan\Model SenseMini .3n3.pth, VulnScan\README.md, VulnScan\Vectorizer .3n3.pkl"
########################################################
# The following settings are for specific modules #
@@ -56,14 +56,45 @@ model_debug = false
###################################################
+[DumpMemory Settings]
+# If the file size generated exceeds this limit,
+# the file will be truncated with a message
+# Put 0 to disable the limit - Limit is in MiB - int
+file_size_limit = 0
+# Safety margin to check, it multiplies with the size limit
+# This makes sure that after the file is created, there is still
+# disk space left for other tasks,
+# Make sure its above 1 or else it will fail
+# Put 1 to disable the limit - Limit is in MiB - float
+file_size_safety = 1.5
+
+###################################################
+
+[NetWorkPsutil Settings]
+# Total time this will take will be `sample_count * interval`
+
+# Number of samples to take for feature `measure network bandwidth usage`
+# This is an integer, and should be 1 and above
+sample_count = 5
+# Time between samples in seconds for feature `measure network bandwidth usage`
+# This is a float, and should be above 0
+interval = 1.5
+
+###################################################
+
[PacketSniffer Settings]
# The interface to sniff packets on, keep it as WiFi for most cases
# Autocorrects between WiFi and Wi-Fi
interface = WiFi
# The number of packets to sniff,
-packet_count = 10000
-# The time to timeout the sniffing process
+# Must be greater than or equal to 1 - int
+packet_count = 5000
+# The time to timeout the sniffing process only,
+# Must be greater than or equal to 5 - int
timeout = 10
+# The maximum retry time for the whole process,
+# Must be greater than or equal to 10 and timeout - int
+max_retry_time = 30
###################################################
diff --git a/CODE/dump_memory.py b/CODE/dump_memory.py
index cd8cd2d3..11b137af 100644
--- a/CODE/dump_memory.py
+++ b/CODE/dump_memory.py
@@ -5,16 +5,17 @@
import psutil
-from logicytics import log
+from logicytics import log, config
# TODO v3.4.1
# psutil.virtual_memory(): used, free, percent, total
# psutil.swap_memory(): used, free, percent, total
-# If the file size exceeds this limit, the file will be truncated with a message
-# Put 0 to disable the limit
-# TODO v3.4.1: Make this take from config.ini
-LIMIT_FILE_SIZE = 20 # Always in MiB
+LIMIT_FILE_SIZE = config.getint("DumpMemory Settings", "file_size_limit") # Always in MiB
+SAFETY_MARGIN = config.getfloat("DumpMemory Settings", "file_size_safety") # Always in MiB
+if SAFETY_MARGIN < 1:
+ log.critical("Invalid Safety Margin Inputted - Cannot proceed with dump memory")
+ exit(1)
# Capture RAM Snapshot
@@ -41,7 +42,7 @@ def capture_ram_snapshot():
log.info("Capturing RAM Snapshot...")
memory = psutil.virtual_memory()
swap = psutil.swap_memory()
- with open("Ram_Snapshot.txt", "w") as file:
+ with open("memory_dumps/Ram_Snapshot.txt", "w") as file:
try:
file.write(f"Total RAM: {memory.total / (1024 ** 3):.2f} GB\n")
file.write(f"Used RAM: {memory.used / (1024 ** 3):.2f} GB\n")
@@ -91,13 +92,13 @@ def gather_system_info():
except Exception as e:
log.error(f"Error gathering system information: {e}")
sys_info = {'Error': 'Failed to gather system information'}
- with open("SystemRam_Info.txt", "w") as file:
+ with open("memory_dumps/SystemRam_Info.txt", "w") as file:
for key, value in sys_info.items():
file.write(f"{key}: {value}\n")
log.info("System Information saved to SystemRam_Info.txt")
-# Memory Dump (Windows-specific, using psutil)
+# Memory Dump
def memory_dump():
"""
Perform a memory dump of the current process, capturing detailed metadata for each readable memory region.
@@ -127,12 +128,12 @@ def memory_dump():
try:
process = psutil.Process(pid)
- with open("Ram_Dump.txt", "wb") as dump_file:
+ with open("memory_dumps/Ram_Dump.txt", "wb") as dump_file:
total_size = 0
for mem_region in process.memory_maps(grouped=False):
# Check available disk space
if os.path.exists("Ram_Dump.txt"):
- required_space = LIMIT_FILE_SIZE * 1024 * 1024 * 1.5 # 2x safety margin
+ required_space = LIMIT_FILE_SIZE * 1024 * 1024 * SAFETY_MARGIN # 2x safety margin
free_space = psutil.disk_usage(".").free
if free_space < required_space:
log.error(f"Not enough disk space. Need {required_space / 1024 / 1024:.2f}MB")
@@ -206,6 +207,7 @@ def main():
No parameters.
No return value.
"""
+ os.makedirs("memory_dumps", exist_ok=True)
log.info("Starting system memory collection tasks...")
capture_ram_snapshot()
gather_system_info()
diff --git a/CODE/logicytics/Config.py b/CODE/logicytics/Config.py
index 9811a765..5969040b 100644
--- a/CODE/logicytics/Config.py
+++ b/CODE/logicytics/Config.py
@@ -15,7 +15,7 @@ def __config_data() -> tuple[str, str, list[str], bool, str]:
- Version (str): System version from configuration
- Files (list[str]): List of files specified in configuration
- Delete old logs (bool): Flag indicating whether to delete old log files
- - CONFIG itself
+ - config itself
Raises:
SystemExit: If the 'config.ini' file cannot be found in any of the attempted locations
@@ -30,21 +30,21 @@ def _config_path() -> str:
print("The config.ini file is not found in the expected location.")
exit(1)
- config = configparser.ConfigParser()
+ config_local = configparser.ConfigParser()
path = _config_path()
- config.read(path)
+ config_local.read(path)
- log_using_debug = config.getboolean("Settings", "log_using_debug")
- delete_old_logs = config.getboolean("Settings", "delete_old_logs")
- version = config.get("System Settings", "version")
- files = config.get("System Settings", "files").split(", ")
+ log_using_debug = config_local.getboolean("Settings", "log_using_debug")
+ delete_old_logs = config_local.getboolean("Settings", "delete_old_logs")
+ version = config_local.get("System Settings", "version")
+ files = config_local.get("System Settings", "files").split(", ")
log_using_debug = "DEBUG" if log_using_debug else "INFO"
- return log_using_debug, version, files, delete_old_logs, config
+ return log_using_debug, version, files, delete_old_logs, config_local
# Check if the script is being run directly, if not, set up the library
if __name__ == '__main__':
exit("This is a library, Please import rather than directly run.")
-DEBUG, VERSION, CURRENT_FILES, DELETE_LOGS, CONFIG = __config_data()
+DEBUG, VERSION, CURRENT_FILES, DELETE_LOGS, config = __config_data()
diff --git a/CODE/logicytics/Flag.py b/CODE/logicytics/Flag.py
index 2736b03c..887ab6b9 100644
--- a/CODE/logicytics/Flag.py
+++ b/CODE/logicytics/Flag.py
@@ -8,23 +8,23 @@
from collections import Counter
from datetime import datetime
-from .Config import CONFIG
+from logicytics.Config import config
# Check if the script is being run directly, if not, set up the library
if __name__ == '__main__':
exit("This is a library, Please import rather than directly run.")
else:
# Save user preferences?
- SAVE_PREFERENCES = CONFIG.getboolean("Settings", "save_preferences")
+ SAVE_PREFERENCES = config.getboolean("Settings", "save_preferences")
# Debug mode for Sentence Transformer
- DEBUG_MODE = CONFIG.getboolean("Flag Settings", "model_debug") # Debug mode for Sentence Transformer
+ DEBUG_MODE = config.getboolean("Flag Settings", "model_debug") # Debug mode for Sentence Transformer
# File for storing user history data
HISTORY_FILE = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'User_History.json.gz') # User history file
# Minimum accuracy threshold for flag suggestions
MIN_ACCURACY_THRESHOLD = float(
- CONFIG.get("Flag Settings", "accuracy_min")) # Minimum accuracy threshold for flag suggestions
- if not 0 <= MIN_ACCURACY_THRESHOLD <= 100:
- raise ValueError("accuracy_min must be between 0 and 100")
+ config.get("Flag Settings", "accuracy_min")) # Minimum accuracy threshold for flag suggestions
+ if not 1 <= MIN_ACCURACY_THRESHOLD <= 99:
+ raise ValueError("accuracy_min must be between 1 and 99")
class _Match:
@@ -61,11 +61,11 @@ def __get_sim(user_input: str, all_descriptions: list[str]) -> list[float]:
logging.getLogger("sentence_transformers").setLevel(logging.ERROR)
try:
- MODEL = SentenceTransformer(CONFIG.get("Flag Settings", "model_to_use"))
+ MODEL = SentenceTransformer(config.get("Flag Settings", "model_to_use"))
except Exception as e:
print(f"Error: {e}")
print("Please check the model name in the config file.")
- print(f"Model name {CONFIG.get('Flag Settings', 'model_to_use')} may not be valid.")
+ print(f"Model name {config.get('Flag Settings', 'model_to_use')} may not be valid.")
exit(1)
user_embedding = MODEL.encode(user_input, convert_to_tensor=True, show_progress_bar=DEBUG_MODE)
diff --git a/CODE/logicytics/Get.py b/CODE/logicytics/Get.py
index f41e93a3..8954000e 100644
--- a/CODE/logicytics/Get.py
+++ b/CODE/logicytics/Get.py
@@ -11,6 +11,7 @@ def list_of_files(
append_file_list: list[str] = None,
exclude_files: list[str] = None,
exclude_extensions: list[str] = None,
+ exclude_dirs: list[str] = None,
) -> list[str]:
"""
Retrieves a list of files in the specified directory based on given extensions and exclusion criteria.
@@ -21,6 +22,7 @@ def list_of_files(
append_file_list (list[str], optional): Existing list to append found filenames to. Defaults to None.
exclude_files (list[str], optional): List of filenames to exclude from results. Defaults to None.
exclude_extensions (list[str], optional): List of extensions to exclude from results. Defaults to None.
+ exclude_dirs (list[str], optional): List of directory names to ignore. Defaults to None.
Returns:
list[str]: A list of filenames matching the specified criteria.
@@ -28,12 +30,17 @@ def list_of_files(
Exclusion rules:
- Ignores files starting with an underscore (_)
- Skips files specified in `exclude_files`
+ - Skips directories specified in `ignore_dirs`
"""
append_file_list = append_file_list or []
exclude_files = set(exclude_files or [])
exclude_extensions = set(exclude_extensions or [])
+ exclude_dirs = set(exclude_dirs or []) # Set for faster lookup
+
+ for root, dirs, filenames in os.walk(directory):
+ # Remove ignored directories from the dirs list to prevent os.walk from entering them
+ dirs[:] = [d for d in dirs if d not in exclude_dirs]
- for root, _, filenames in os.walk(directory):
for filename in filenames:
if filename.startswith("_") or filename in exclude_files:
continue # Skip excluded files
diff --git a/CODE/logicytics/Logger.py b/CODE/logicytics/Logger.py
index c009b208..4e5ddac4 100644
--- a/CODE/logicytics/Logger.py
+++ b/CODE/logicytics/Logger.py
@@ -3,6 +3,7 @@
import inspect
import logging
import os
+import re
import time
from datetime import datetime
from typing import Type
@@ -99,13 +100,11 @@ def __init__(self, config: dict = None):
if not os.path.exists(self.filename):
self.newline()
- self.raw(
- "| Timestamp | LOG Level |"
- + " " * 71
- + "LOG Messages"
- + " " * 71
- + "|"
- )
+ self._raw("| Timestamp | LOG Level |"
+ + " " * 71
+ + "LOG Messages"
+ + " " * 71
+ + "|")
elif os.path.exists(self.filename) and config.get("delete_log", False):
with open(self.filename, "w") as f:
f.write(
@@ -177,7 +176,7 @@ def debug(self, message):
if self.color and message != "None" and message is not None:
colorlog.debug(str(message))
- def raw(self, message):
+ def _raw(self, message):
"""
Log a raw message directly to the log file.
@@ -201,12 +200,16 @@ def raw(self, message):
Raises:
Logs internal errors for Unicode or file writing issues without stopping execution
"""
- frame = inspect.stack()[1]
- if frame.function == "":
+ frame = inspect.currentframe().f_back
+ if frame and frame.f_code.co_name == "":
self.__internal(
f"Raw message called from a non-function - This is not recommended"
)
- if message != "None" and message is not None:
+ # Precompiled regex for ANSI escape codes
+ # Remove all ANSI escape sequences in one pass
+ message = re.compile(r'\033\[\d+(;\d+)*m').sub('', message)
+
+ if message and message != "None":
try:
with open(self.filename, "a", encoding="utf-8") as f:
f.write(f"{str(message)}\n")
@@ -239,9 +242,7 @@ def info(self, message):
"""
if self.color and message != "None" and message is not None:
colorlog.info(str(message))
- self.raw(
- f"[{self.__timestamp()}] > INFO: | {self.__trunc_message(str(message))}"
- )
+ self._raw(f"[{self.__timestamp()}] > INFO: | {self.__trunc_message(str(message))}")
def warning(self, message):
"""
@@ -251,9 +252,7 @@ def warning(self, message):
"""
if self.color and message != "None" and message is not None:
colorlog.warning(str(message))
- self.raw(
- f"[{self.__timestamp()}] > WARNING: | {self.__trunc_message(str(message))}"
- )
+ self._raw(f"[{self.__timestamp()}] > WARNING: | {self.__trunc_message(str(message))}")
def error(self, message):
"""
@@ -263,9 +262,7 @@ def error(self, message):
"""
if self.color and message != "None" and message is not None:
colorlog.error(str(message))
- self.raw(
- f"[{self.__timestamp()}] > ERROR: | {self.__trunc_message(str(message))}"
- )
+ self._raw(f"[{self.__timestamp()}] > ERROR: | {self.__trunc_message(str(message))}")
def critical(self, message):
"""
@@ -275,9 +272,7 @@ def critical(self, message):
"""
if self.color and message != "None" and message is not None:
colorlog.critical(str(message))
- self.raw(
- f"[{self.__timestamp()}] > CRITICAL: | {self.__trunc_message(str(message))}"
- )
+ self._raw(f"[{self.__timestamp()}] > CRITICAL: | {self.__trunc_message(str(message))}")
def string(self, message, type: str):
"""
@@ -329,9 +324,8 @@ def exception(self, message, exception_type: Type = Exception):
- Includes both the original message and the exception type in the log
"""
if self.color and message != "None" and message is not None:
- self.raw(
- f"[{self.__timestamp()}] > EXCEPTION:| {self.__trunc_message(f'{message} -> Exception provoked: {str(exception_type)}')}"
- )
+ self._raw(
+ f"[{self.__timestamp()}] > EXCEPTION:| {self.__trunc_message(f'{message} -> Exception provoked: {str(exception_type)}')}")
raise exception_type(message)
def execution(self, message_log: list[list[str, str]]):
diff --git a/CODE/logicytics/__init__.py b/CODE/logicytics/__init__.py
index c7247725..d2481937 100644
--- a/CODE/logicytics/__init__.py
+++ b/CODE/logicytics/__init__.py
@@ -2,7 +2,7 @@
import traceback
from logicytics.Checks import Check
-from logicytics.Config import DEBUG, VERSION, CURRENT_FILES, DELETE_LOGS, CONFIG
+from logicytics.Config import DEBUG, VERSION, CURRENT_FILES, DELETE_LOGS, config
from logicytics.Execute import Execute
from logicytics.FileManagement import FileManagement
from logicytics.Flag import Flag
diff --git a/CODE/network_psutil.py b/CODE/network_psutil.py
index 1db71894..310a8d60 100644
--- a/CODE/network_psutil.py
+++ b/CODE/network_psutil.py
@@ -4,7 +4,7 @@
import psutil
-from logicytics import log, Execute
+from logicytics import log, Execute, config
class NetworkInfo:
@@ -12,8 +12,12 @@ class NetworkInfo:
A class to gather and save various network-related information.
"""
+ def __init__(self):
+ self.SAMPLE_COUNT = config.getint("NetWorkPsutil Settings", "sample_count")
+ self.INTERVAL = config.getfloat("NetWorkPsutil Settings", "interval")
+
@log.function
- def get(self):
+ async def get(self):
"""
Gathers and saves various network-related information by calling multiple internal methods.
"""
@@ -24,7 +28,7 @@ def get(self):
self.__fetch_network_interface_stats()
self.__execute_external_network_command()
self.__fetch_network_connections_with_process_info()
- self.__measure_network_bandwidth_usage()
+ await self.__measure_network_bandwidth_usage(sample_count=self.SAMPLE_COUNT, interval=self.INTERVAL)
self.__fetch_hostname_and_ip()
except Exception as e:
log.error(f"Error getting network info: {e}, Type: {type(e).__name__}")
@@ -133,7 +137,9 @@ async def __measure_network_bandwidth_usage(self, sample_count: int = 5, interva
sample_count: Number of samples to take (default: 5)
interval: Time between samples in seconds (default: 1.0)
"""
- # TODO v3.4.1: Allow config.ini to set values
+ if sample_count < 1 or interval <= 0:
+ log.critical(
+ "Invalid values passed down from configuration for `NetworkInfo.__measure_network_bandwidth_usage()`")
log.debug("Measuring network bandwidth usage...")
samples = []
for _ in range(sample_count):
@@ -181,4 +187,9 @@ def __fetch_hostname_and_ip(self):
if __name__ == "__main__":
- NetworkInfo().get()
+ try:
+ asyncio.run(NetworkInfo().get()) # Use asyncio.run to run the async get method
+ except asyncio.CancelledError:
+ log.warning("Operation cancelled by user.")
+ except Exception as err: # Catch all exceptions
+ log.error(f"An error occurred: {err}")
diff --git a/CODE/packet_sniffer.py b/CODE/packet_sniffer.py
index 5c969a04..504e04a4 100644
--- a/CODE/packet_sniffer.py
+++ b/CODE/packet_sniffer.py
@@ -1,7 +1,5 @@
from __future__ import annotations
-import os
-import warnings
from time import time
import matplotlib.pyplot as plt
@@ -10,13 +8,7 @@
from scapy.all import sniff, conf
from scapy.layers.inet import IP, TCP, UDP, ICMP
-from logicytics import log, CONFIG
-
-# Read configuration from config.ini
-CONFIG.read(os.path.join(os.path.dirname(os.path.abspath(__file__)), "config.ini"))
-config = CONFIG['PacketSniffer Settings']
-# Ignore all warnings (Wireshark issue)
-warnings.filterwarnings("ignore")
+from logicytics import log, config
class Sniff:
@@ -361,24 +353,24 @@ def correct_interface_name(interface_name: str) -> str:
}
return corrections.get(interface_name, interface_name)
- interface = config['interface']
- packet_count = int(config['packet_count'])
- timeout = int(config['timeout'])
+ interface = config.get("PacketSniffer Settings", "interface")
+ packet_count = config.getint("PacketSniffer Settings", "packet_count")
+ timeout = config.getint("PacketSniffer Settings", "timeout")
+ max_retry_time = config.getint("PacketSniffer Settings", "max_retry_time")
- if packet_count <= 0 or timeout <= 0:
+ if packet_count < 1 or timeout < 5 or max_retry_time < 10 or max_retry_time < timeout:
try:
- log.error(
- "Oops! Can't work with these values (Not your fault):\n"
- f" - Packet count: {packet_count} {'❌ (must be > 0)' if packet_count <= 0 else '✅'}\n"
- f" - Timeout: {timeout} {'❌ (must be > 0)' if timeout <= 0 else '✅'}"
+ log.critical(
+ "Oops! Can't work with these values):\n"
+ f" - Packet count: {packet_count} {'❌ (must be > 0)' if packet_count < 1 else '✅'}\n"
+ f" - Timeout: {timeout} {'❌ (must be >= 5)' if timeout < 5 else '✅'}\n"
+ f" - Max Retry Time: {max_retry_time} {'❌ (must be >= 10 and larger than timeout)' if max_retry_time < 10 or max_retry_time < timeout else '✅'}"
)
except Exception:
- log.error("Error reading configuration: Improper values for packet count or timeout")
+ log.critical("Error reading configuration: Improper values for packet count or timeout")
exit(1)
start_time = time()
- # TODO v3.4.1 -> Config.ini controlled value
- max_retry_time = 30 # seconds
for attempt in range(2): # Try original and corrected name
try:
if time() - start_time > max_retry_time:
diff --git a/CODE/vulnscan.py b/CODE/vulnscan.py
index 1d6ff6e5..f732bc7c 100644
--- a/CODE/vulnscan.py
+++ b/CODE/vulnscan.py
@@ -23,8 +23,6 @@
# Use Asynchronous File Scanning,
# Optimize Model Loading and Caching,
# Improve Feature Extraction
-
-# TODO: v3.4.1
# also add a global variable called MAX_FILE_SIZE, if its none ignore it, else only scan files under that file size (default at 50MB)
# add this to config.ini -> max_workers = min(32, os.cpu_count() * 2)
# add UNREADABLE_EXTENSIONS as well to config.ini
diff --git a/PLANS.md b/PLANS.md
index fe59e630..6424ae97 100644
--- a/PLANS.md
+++ b/PLANS.md
@@ -8,7 +8,7 @@
| Task | Version | Might or Will be done? |
|----------------------------------------------------------------------------------------------|---------|------------------------|
| Implement TODOs for v3.4.1 | v3.4.1 | ✅ |
-| Add docstrings to all functions as well as var types | v3.4.1 | ✅ |
+| Add docstrings to all functions as well as var types | v3.4.2 | ✅ |
| Implement Cleanup functionality for Logicytics if KeyboardInterrupt occurs | v3.4.2 | ✅ |
| Implement TODOs for v3.4.2 | v3.4.2 | ✅ |
| Implement logs for the logicytics lib, rather than prints | v3.4.2 | ✅ |