Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
58 changes: 15 additions & 43 deletions CODE/Logicytics.py
Original file line number Diff line number Diff line change
@@ -1,32 +1,19 @@
from __future__ import annotations

import datetime
import threading
from typing import Any

from __lib_class import *

"""
This python script is the main entry point for the tool called Logicytics.
The script performs various actions based on command-line flags and configuration settings.

Here's a high-level overview of what the script does:

1. Initializes directories and checks for admin privileges.
2. Parses command-line flags and sets up logging.
3. Performs special actions based on flags, such as debugging, updating, or restoring backups.
4. Creates an execution list of files to run, which can be filtered based on flags.
5. Runs the files in the execution list, either sequentially or in parallel using threading.
6. Zips generated files and attempts to delete event logs.
7. Performs sub-actions, such as shutting down or rebooting the system, or sending a webhook.

The script appears to be designed to be highly configurable and modular,
with many options and flags that can be used to customize its behavior.
"""
# Initialization
FileManagement.mkdir()
log = Log({"log_level": DEBUG})


class Health:
@staticmethod
@log.function
def backup(directory: str, name: str):
"""
Creates a backup of a specified directory by zipping its contents and moving it to a designated backup location.
Expand All @@ -53,6 +40,7 @@ def backup(directory: str, name: str):
shutil.move(f"{name}.zip", "../ACCESS/BACKUP")

@staticmethod
@log.function
def update() -> tuple[str, str]:
"""
Updates the repository by pulling the latest changes from the remote repository.
Expand All @@ -79,6 +67,7 @@ def update() -> tuple[str, str]:
return output, "info"


@log.function
def get_flags() -> tuple[str, str]:
"""
Retrieves the command-line flags and sub-actions.
Expand Down Expand Up @@ -106,6 +95,7 @@ def get_flags() -> tuple[str, str]:
return actions, sub_actions


@log.function
def special_execute(file_path: str):
"""
Executes a Python script in a new command prompt window.
Expand All @@ -120,6 +110,7 @@ def special_execute(file_path: str):
exit(0)


@log.function
def handle_special_actions():
"""
Handles special actions based on the provided action flag.
Expand Down Expand Up @@ -186,6 +177,7 @@ def handle_special_actions():
exit(0)


@log.function
def check_privileges():
"""
Checks if the script is running with administrative privileges and handles UAC (User Account Control) settings.
Expand All @@ -207,6 +199,7 @@ def check_privileges():
log.warning("UAC is enabled, this may cause issues - Please disable UAC if possible")


@log.function
def generate_execution_list(actions: str) -> list | list[str] | list[str | Any]:
"""
Creates an execution list based on the provided action.
Expand Down Expand Up @@ -269,32 +262,15 @@ def generate_execution_list(actions: str) -> list | list[str] | list[str | Any]:
return execution_list


def attempt_hide():
"""
Attempts to delete Windows event logs from the current day.

Parameters:
None

Returns:
None
"""
today = datetime.date.today()
log_path = r"C:\Windows\System32\winevt\Logs"

for file in os.listdir(log_path):
if file.endswith(".evtx") and file.startswith(today.strftime("%Y-%m-%d")):
subprocess.run(f'del "{os.path.join(log_path, file)}"', shell=False)


@log.function
def execute_scripts():
"""Executes the scripts in the execution list based on the action."""
# Check weather to use threading or not, as well as execute code
if action == "threaded" or action == "depth":
def threaded_execution(execution_list_thread, index_thread):
log.debug(f"Thread {index_thread} started")
try:
log.execute_log_parse(Execute.script(execution_list_thread[index_thread]))
log.parse_execution(Execute.script(execution_list_thread[index_thread]))
log.info(f"{execution_list_thread[index_thread]} executed")
except UnicodeDecodeError as err:
log.error(f"Error in thread: {err}")
Expand Down Expand Up @@ -322,14 +298,15 @@ def threaded_execution(execution_list_thread, index_thread):
try:
execution_list = generate_execution_list(action)
for file in range(len(execution_list)): # Loop through List
log.execute_log_parse(Execute.script(execution_list[file]))
log.parse_execution(Execute.script(execution_list[file]))
log.info(f"{execution_list[file]} executed")
except UnicodeDecodeError as e:
log.error(f"Error in code: {e}")
except Exception as e:
log.error(f"Error in code: {e}")


@log.function
def zip_generated_files():
"""Zips generated files based on the action."""

Expand All @@ -347,6 +324,7 @@ def zip_and_log(directory, name):
zip_and_log(".", "CODE")


@log.function
def handle_sub_action():
"""
Handles sub-actions based on the provided sub_action flag.
Expand All @@ -363,11 +341,7 @@ def handle_sub_action():
# log.warning("This feature is not implemented yet! Sorry")


# Initialization
FileManagement.mkdir()

if __name__ == "__main__":
log = Log({"log_level": DEBUG})
# Get flags and configs
action, sub_action = get_flags()
# Check for special actions
Expand All @@ -379,8 +353,6 @@ def handle_sub_action():
execute_scripts()
# Zip generated files
zip_generated_files()
# Attempt event log deletion
attempt_hide()
# Finish with sub actions
log.info("Completed successfully!")
# Finish with sub actions
Expand Down
6 changes: 2 additions & 4 deletions CODE/__lib_class.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,14 @@
import ctypes
import hashlib
import json
import os
import os.path
import shutil
import subprocess
import zipfile
from datetime import datetime
from pathlib import Path
from subprocess import CompletedProcess

from __lib_log import Log
from __lib_log import *


class Flag:
Expand Down Expand Up @@ -316,7 +314,7 @@ class Zip:
__move_files(filename: str):
Moves the zip file and its hash file to designated directories.

and_hash(self, path: str, name: str, flag: str) -> tuple | str:
and_hash(cls, path: str, name: str, flag: str) -> tuple | str:
Zips files, generates a SHA256 hash, and moves the files.
"""

Expand Down
54 changes: 34 additions & 20 deletions CODE/__lib_log.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,25 @@ def __pad_message(message: str) -> str:
else message[:150] + "..."
) + "|"

def __internal(self, message):
"""
Logs an internal message.

:param message: The internal message to be logged.
"""
if self.color and message != "None" and message is not None:
colorlog.log(self.INTERNAL_LOG_LEVEL, str(message))

@staticmethod
def debug(message):
"""
Logs a debug message.

:param message: The debug message to be logged.
"""
if message != "None" and message is not None:
colorlog.debug(str(message))

def raw(self, message):
"""
Logs a raw message directly to the log file.
Expand Down Expand Up @@ -172,16 +191,6 @@ def critical(self, message):
f"[{self.__timestamp()}] > CRITICAL: | {self.__pad_message(str(message))}"
)

@staticmethod
def debug(message):
"""
Logs a debug message.

:param message: The debug message to be logged.
"""
if message != "None" and message is not None:
colorlog.debug(str(message))

def string(self, message, type: str):
"""
Logs a message with a specified type. Supported types are 'debug', 'info', 'warning', 'error', 'critical'
Expand Down Expand Up @@ -212,21 +221,26 @@ def exception(self, message, exception_type: Type = Exception):
)
raise exception_type(message)

def __internal(self, message):
"""
Logs an internal message.

:param message: The internal message to be logged.
"""
if self.color and message != "None" and message is not None:
colorlog.log(self.INTERNAL_LOG_LEVEL, str(message))

def execute_log_parse(self, message_log):
def parse_execution(self, message_log: list[list[str]]):
if message_log:
for message_list in message_log:
if len(message_list) == 2:
self.string(message_list[0], message_list[1])

def function(self, func: callable):
def wrapper(*args, **kwargs):
if not callable(func):
self.exception(f"Function {func.__name__} is not callable.",
TypeError)
start_time = datetime.now()
self.debug(f"Running the function {func.__name__}().")
result = func(*args, **kwargs)
end_time = datetime.now()
elapsed_time = end_time - start_time
self.debug(f"Function {func.__name__}() executed in {elapsed_time}.")
return result
return wrapper


if __name__ == "__main__":
Log().exception(
Expand Down
27 changes: 0 additions & 27 deletions CODE/__wrapper__.py

This file was deleted.

9 changes: 7 additions & 2 deletions CODE/_debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@


class HealthCheck:
@log_debug.function
def get_online_config(
self,
) -> bool | tuple[tuple[str, str, str], tuple[str, str, str]]:
Expand Down Expand Up @@ -92,7 +93,8 @@ def __check_files(local_files: list, remote_files: list) -> tuple[str, str, str]

class DebugCheck:
@staticmethod
def SysInternal_Binaries(path: str) -> tuple[str, str]:
@log_debug.function
def sys_internal_binaries(path: str) -> tuple[str, str]:
"""
Checks the contents of the given path and determines the status of the SysInternal Binaries.

Expand Down Expand Up @@ -135,6 +137,7 @@ def SysInternal_Binaries(path: str) -> tuple[str, str]:
return f"An Unexpected error occurred: {e}", "ERROR"

@staticmethod
@log_debug.function
def execution_policy() -> bool:
"""
Checks the current PowerShell execution policy.
Expand All @@ -150,6 +153,7 @@ def execution_policy() -> bool:
return result.stdout.strip().lower() == "unrestricted"

@staticmethod
@log_debug.function
def cpu_info() -> tuple[str, str, str]:
"""
Retrieves information about the CPU.
Expand All @@ -164,6 +168,7 @@ def cpu_info() -> tuple[str, str, str]:
)


@log_debug.function
def debug():
"""
Performs a series of system checks and logs the results.
Expand All @@ -181,7 +186,7 @@ def debug():
log_debug.string(file_tuple[0], file_tuple[2])

# Check SysInternal Binaries
message, type = DebugCheck.SysInternal_Binaries("SysInternal_Suite")
message, type = DebugCheck.sys_internal_binaries("SysInternal_Suite")
log_debug.string(message, type)

# Check Admin
Expand Down
6 changes: 4 additions & 2 deletions CODE/_dev.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@

from __lib_class import *

if __name__ == "__main__":
log_dev = Log({"log_level": DEBUG})


class Dev:
@staticmethod
Expand Down Expand Up @@ -54,6 +57,7 @@ def __prompt_user(question: str, file_to_open: str = None, special: bool = False
except Exception as e:
log_dev.error(e)

@log_dev.function
def dev_checks(self) -> str | None:
"""
Performs a series of checks to ensure that the developer has followed the required guidelines and best practices.
Expand Down Expand Up @@ -93,8 +97,6 @@ def dev_checks(self) -> str | None:
return str(e)


if __name__ == "__main__":
log_dev = Log({"log_level": DEBUG})
message = Dev().dev_checks()
if message is not None:
log_dev.error(message)
1 change: 1 addition & 0 deletions CODE/_extra.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
log = Log({"log_level": DEBUG})


@log.function
def menu():
"""
Displays a menu of available executable scripts in the '../EXTRA/EXTRA' directory,
Expand Down
1 change: 1 addition & 0 deletions CODE/cmd_commands.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
log = Log({"log_level": DEBUG})


@log.function
def command(file: str, commands: str, message: str, encoding: str = "UTF-8") -> None:
"""
Executes a command and writes the output to a file.
Expand Down
Loading
Loading