|
1 | 1 | from __future__ import annotations |
2 | 2 |
|
3 | 3 | import configparser |
4 | | -import os.path |
| 4 | +import os |
5 | 5 | import platform |
6 | 6 | import subprocess |
7 | 7 | import sys |
|
17 | 17 |
|
18 | 18 |
|
19 | 19 | class HealthCheck: |
20 | | - @log_debug.function |
21 | | - def get_online_config(self) -> bool | tuple[str, str, str]: |
| 20 | + @staticmethod |
| 21 | + def check_files(directory: str, required_files: list[str]) -> tuple[str, str]: |
| 22 | + """ |
| 23 | + Checks if all required files are present in the directory and its subdirectories. |
| 24 | +
|
| 25 | + Args: |
| 26 | + directory (str): Path to the directory to check. |
| 27 | + required_files (list[str]): List of required file names with relative paths. |
| 28 | +
|
| 29 | + Returns: |
| 30 | + tuple[str, str]: Status message and severity level. |
| 31 | + """ |
| 32 | + try: |
| 33 | + log_debug.debug(f"Checking directory: {directory}") |
| 34 | + if not os.path.exists(directory): |
| 35 | + log_debug.error(f"Directory {directory} does not exist.") |
| 36 | + return f"Directory {directory} does not exist.", "ERROR" |
| 37 | + |
| 38 | + # Gather all files with relative paths |
| 39 | + actual_files = [] |
| 40 | + for root, _, files in os.walk(directory): |
| 41 | + for file in files: |
| 42 | + relative_path = os.path.relpath(os.path.join(root, file), start=directory) |
| 43 | + actual_files.append( |
| 44 | + relative_path.replace("\\", "/").replace('"', '')) # Normalize paths for comparison |
| 45 | + |
| 46 | + log_debug.debug(f"Actual files found: {actual_files}") |
| 47 | + |
| 48 | + # Track missing and extra files |
| 49 | + missing_files = [] |
| 50 | + extra_files = [] |
| 51 | + |
| 52 | + # Normalize required files |
| 53 | + normalized_required_files = [required_file.strip().replace("\\", "/").replace('"', '') for required_file in |
| 54 | + required_files] |
| 55 | + |
| 56 | + # Check for missing files |
| 57 | + for required_file in normalized_required_files: |
| 58 | + if required_file not in actual_files: |
| 59 | + missing_files.append(required_file) |
| 60 | + |
| 61 | + log_debug.debug(f"Missing files: {missing_files}") |
| 62 | + |
| 63 | + # Check for extra files |
| 64 | + for actual_file in actual_files: |
| 65 | + if actual_file not in normalized_required_files: |
| 66 | + extra_files.append(actual_file) |
| 67 | + |
| 68 | + log_debug.debug(f"Extra files: {extra_files}") |
| 69 | + |
| 70 | + if missing_files: |
| 71 | + return f"Missing files: {', '.join(missing_files)}", "ERROR" |
| 72 | + if extra_files: |
| 73 | + return f"Extra files found: {', '.join(extra_files)}", "WARNING" |
| 74 | + return "All required files are present.", "INFO" |
| 75 | + |
| 76 | + except Exception as e: |
| 77 | + log_debug.error(f"Unexpected error during file check: {e}") |
| 78 | + return f"Unexpected error during file check: {e}", "ERROR" |
| 79 | + |
| 80 | + @staticmethod |
| 81 | + def get_online_config() -> dict | None: |
22 | 82 | """ |
23 | | - Retrieves configuration data from a remote repository and compares it with the local configuration. |
| 83 | + Retrieves configuration data from a remote repository. |
24 | 84 |
|
25 | 85 | Returns: |
26 | | - bool: False if a connection error occurs, otherwise a tuple containing version check and file check results. |
27 | | - tuple[tuple[str, str, str], tuple[str, str, str]]: A tuple containing version check and file check results. |
| 86 | + dict: Parsed configuration data if successful. |
| 87 | + None: If there was an error fetching the configuration. |
28 | 88 | """ |
29 | 89 | try: |
30 | 90 | url = "https://raw.githubusercontent.com/DefinetlyNotAI/Logicytics/main/CODE/config.ini" |
31 | 91 | config = configparser.ConfigParser() |
32 | 92 | config.read_string(requests.get(url, timeout=15).text) |
33 | | - except requests.exceptions.ConnectionError: |
34 | | - log_debug.warning("No connection found") |
35 | | - return False |
36 | | - version_check = self.__compare_versions(VERSION, config["System Settings"]["version"]) |
37 | | - |
38 | | - return version_check |
| 93 | + return config |
| 94 | + except requests.exceptions.RequestException as e: |
| 95 | + log_debug.warning(f"Connection error: {e}") |
| 96 | + return None |
39 | 97 |
|
40 | 98 | @staticmethod |
41 | | - def __compare_versions( |
42 | | - local_version: str, remote_version: str |
43 | | - ) -> tuple[str, str, str]: |
| 99 | + def compare_versions(local_version: str, remote_version: str) -> tuple[str, str, str]: |
44 | 100 | """ |
45 | | - Compares the local version with the remote version and returns a tuple containing a comparison result message, |
46 | | - a version information message, and a severity level. |
| 101 | + Compares local and remote versions. |
47 | 102 |
|
48 | 103 | Args: |
49 | | - local_version (str): The version number of the local system. |
50 | | - remote_version (str): The version number of the remote repository. |
| 104 | + local_version (str): Local version. |
| 105 | + remote_version (str): Remote version. |
51 | 106 |
|
52 | 107 | Returns: |
53 | | - tuple[str, str, str]: A tuple containing a comparison result message, a version information message, and a severity level. |
| 108 | + tuple[str, str, str]: Comparison result, version details, and severity level. |
54 | 109 | """ |
55 | 110 | if local_version == remote_version: |
56 | 111 | return "Version is up to date.", f"Your Version: {local_version}", "INFO" |
57 | | - elif local_version > remote_version: |
| 112 | + if local_version > remote_version: |
58 | 113 | return ( |
59 | 114 | "Version is ahead of the repository.", |
60 | 115 | f"Your Version: {local_version}, Repository Version: {remote_version}", |
61 | 116 | "WARNING", |
62 | 117 | ) |
63 | | - else: |
64 | | - return ( |
65 | | - "Version is behind the repository.", |
66 | | - f"Your Version: {local_version}, Repository Version: {remote_version}", |
67 | | - "ERROR", |
68 | | - ) |
| 118 | + return ( |
| 119 | + "Version is behind the repository.", |
| 120 | + f"Your Version: {local_version}, Repository Version: {remote_version}", |
| 121 | + "ERROR", |
| 122 | + ) |
69 | 123 |
|
70 | 124 |
|
71 | 125 | class DebugCheck: |
72 | 126 | @staticmethod |
73 | | - @log_debug.function |
74 | 127 | def sys_internal_binaries(path: str) -> tuple[str, str]: |
75 | 128 | """ |
76 | | - Checks the contents of the given path and determines the status of the SysInternal Binaries. |
| 129 | + Checks the SysInternal Binaries in the given directory. |
77 | 130 |
|
78 | 131 | Args: |
79 | | - path (str): The path to the directory containing the SysInternal Binaries. |
| 132 | + path (str): Directory path. |
80 | 133 |
|
81 | 134 | Returns: |
82 | | - tuple[str, str]: A tuple containing a status message and a severity level. |
83 | | - The status message indicates the result of the check. |
84 | | - The severity level is either "INFO", "WARNING", or "ERROR". |
85 | | -
|
86 | | - Raises: |
87 | | - FileNotFoundError: If the given path does not exist. |
88 | | - Exception: If an unexpected error occurs during the check. |
| 135 | + tuple[str, str]: Status message and severity level. |
89 | 136 | """ |
90 | 137 | try: |
| 138 | + if not os.path.exists(path): |
| 139 | + raise FileNotFoundError("Directory does not exist") |
| 140 | + |
91 | 141 | contents = os.listdir(path) |
92 | 142 | log_debug.debug(str(contents)) |
| 143 | + |
| 144 | + has_zip = any(file.endswith(".zip") for file in contents) |
| 145 | + has_exe = any(file.endswith(".exe") for file in contents) |
| 146 | + |
93 | 147 | if any(file.endswith(".ignore") for file in contents): |
94 | 148 | return "A `.sys.ignore` file was found - Ignoring", "WARNING" |
95 | | - if any(file.endswith(".zip") for file in contents) and not any( |
96 | | - file.endswith(".exe") for file in contents |
97 | | - ): |
98 | | - return "Only zip files - Missing EXE's due to no `ignore` file", "ERROR" |
99 | | - elif any(file.endswith(".zip") for file in contents) and any( |
100 | | - file.endswith(".exe") for file in contents |
101 | | - ): |
| 149 | + if has_zip and not has_exe: |
| 150 | + return "Only zip files - Missing EXEs due to no `ignore` file", "ERROR" |
| 151 | + if has_zip and has_exe: |
102 | 152 | return "Both zip and exe files - All good", "INFO" |
103 | | - else: |
104 | | - return ( |
105 | | - "SysInternal Binaries Not Found: Missing Files - Corruption detected", |
106 | | - "ERROR", |
107 | | - ) |
108 | | - except FileNotFoundError: |
109 | | - return ( |
110 | | - "SysInternal Binaries Not Found: Missing Directory- Corruption detected", |
111 | | - "ERROR", |
112 | | - ) |
| 153 | + |
| 154 | + return "SysInternal Binaries Not Found: Missing Files - Corruption detected", "ERROR" |
113 | 155 | except Exception as e: |
114 | | - return f"An Unexpected error occurred: {e}", "ERROR" |
| 156 | + return f"Unexpected error: {e}", "ERROR" |
115 | 157 |
|
116 | 158 | @staticmethod |
117 | | - @log_debug.function |
118 | 159 | def execution_policy() -> bool: |
119 | 160 | """ |
120 | | - Checks the current PowerShell execution policy. |
| 161 | + Checks if the execution policy is unrestricted. |
121 | 162 |
|
122 | 163 | Returns: |
123 | | - bool: True if the execution policy is unrestricted, False otherwise. |
| 164 | + bool: True if unrestricted, False otherwise. |
124 | 165 | """ |
125 | | - result = subprocess.run( |
126 | | - ["powershell", "-Command", "Get-ExecutionPolicy"], |
127 | | - capture_output=True, |
128 | | - text=True, |
129 | | - ) |
130 | | - return result.stdout.strip().lower() == "unrestricted" |
| 166 | + try: |
| 167 | + result = subprocess.run( |
| 168 | + ["powershell", "-Command", "Get-ExecutionPolicy"], |
| 169 | + capture_output=True, |
| 170 | + text=True, |
| 171 | + ) |
| 172 | + return result.stdout.strip().lower() == "unrestricted" |
| 173 | + except Exception as e: |
| 174 | + log_debug.error(f"Failed to check execution policy: {e}") |
| 175 | + return False |
131 | 176 |
|
132 | 177 | @staticmethod |
133 | | - @log_debug.function |
134 | 178 | def cpu_info() -> tuple[str, str, str]: |
135 | 179 | """ |
136 | | - Retrieves information about the CPU. |
| 180 | + Retrieves CPU details. |
137 | 181 |
|
138 | 182 | Returns: |
139 | | - tuple[str, str, str]: A tuple containing the CPU architecture, vendor ID, and model. |
| 183 | + tuple[str, str, str]: Architecture, vendor ID, and model. |
140 | 184 | """ |
141 | 185 | return ( |
142 | | - "CPU Architecture: " + platform.machine(), |
143 | | - "CPU Vendor ID: " + platform.system(), |
144 | | - "CPU Model: " + f"{platform.release()} {platform.version()}", |
| 186 | + f"CPU Architecture: {platform.machine()}", |
| 187 | + f"CPU Vendor ID: {platform.system()}", |
| 188 | + f"CPU Model: {platform.release()} {platform.version()}", |
145 | 189 | ) |
146 | 190 |
|
147 | 191 |
|
148 | | -@log_debug.function |
149 | 192 | def debug(): |
150 | 193 | """ |
151 | | - Performs a series of system checks and logs the results. |
| 194 | + Executes system checks and logs results. |
152 | 195 | """ |
153 | 196 | # Clear Debug Log |
154 | 197 | log_path = "../ACCESS/LOGS/DEBUG/DEBUG.LOG" |
155 | 198 | if os.path.exists(log_path): |
156 | 199 | os.remove(log_path) |
157 | 200 |
|
158 | | - # Check File integrity (Online) |
159 | | - online_config = HealthCheck().get_online_config() |
160 | | - if online_config: |
161 | | - version_tuple = online_config |
162 | | - log_debug.string(version_tuple[0], version_tuple[2]) |
163 | | - log_debug.raw(f"[{datetime.now().strftime('%Y-%m-%d %H:%M:%S')}] > DATA: | {version_tuple[1] + ' ' * (153 - len(version_tuple[1])) + '|'}") |
| 201 | + # Online Configuration Check |
| 202 | + config = HealthCheck.get_online_config() |
| 203 | + if config: |
| 204 | + version_check = HealthCheck.compare_versions(VERSION, config["System Settings"]["version"]) |
| 205 | + log_debug.string(version_check[0], version_check[2]) |
| 206 | + log_debug.raw(f"[{datetime.now():%Y-%m-%d %H:%M:%S}] > DATA: {version_check[1]}") |
164 | 207 |
|
165 | | - # Check SysInternal Binaries |
166 | | - message, type = DebugCheck.sys_internal_binaries("SysInternal_Suite") |
167 | | - log_debug.string(message, type) |
| 208 | + # File Integrity Check |
| 209 | + required_files = config["System Settings"].get("files", "").split(",") |
| 210 | + message, severity = HealthCheck.check_files(".", required_files) |
| 211 | + log_debug.string(message, severity) |
168 | 212 |
|
169 | | - # Check Admin |
170 | | - log_debug.info("Admin privileges found" if Check.admin() else "Admin privileges not found") |
| 213 | + # SysInternal Binaries Check |
| 214 | + message, severity = DebugCheck.sys_internal_binaries("SysInternal_Suite") |
| 215 | + log_debug.string(message, severity) |
171 | 216 |
|
172 | | - # Check UAC |
| 217 | + # System Checks |
| 218 | + log_debug.info("Admin privileges found" if Check.admin() else "Admin privileges not found") |
173 | 219 | log_debug.info("UAC enabled" if Check.uac() else "UAC disabled") |
174 | | - |
175 | | - # Log Execution Paths |
176 | 220 | log_debug.info(f"Execution path: {psutil.__file__}") |
177 | 221 | log_debug.info(f"Global execution path: {sys.executable}") |
178 | 222 | log_debug.info(f"Local execution path: {sys.prefix}") |
| 223 | + log_debug.info( |
| 224 | + "Running in a virtual environment" if sys.prefix != sys.base_prefix else "Not running in a virtual environment") |
179 | 225 |
|
180 | | - # Check if running in a virtual environment |
181 | | - log_debug.info("Running in a virtual environment" if sys.prefix != sys.base_prefix else "Not running in a virtual environment") |
182 | | - |
183 | | - # Check Execution Policy |
184 | | - log_debug.info("Execution policy is unrestricted" if DebugCheck.execution_policy() else "Execution policy is not unrestricted") |
| 226 | + # Execution Policy Check |
| 227 | + log_debug.info( |
| 228 | + "Execution policy is unrestricted" if DebugCheck.execution_policy() else "Execution policy is not unrestricted") |
185 | 229 |
|
186 | | - # Get Python Version |
| 230 | + # Python Version Check |
| 231 | + python_version = sys.version.split()[0] |
187 | 232 | try: |
188 | | - major, minor = map(int, sys.version.split()[0].split(".")[:2]) |
189 | | - if major == 3 and minor == 11: |
190 | | - log_debug.info(f"Python Version Used: {sys.version.split()[0]} - Perfect") |
| 233 | + major, minor = map(int, python_version.split(".")[:2]) |
| 234 | + if (major, minor) == (3, 11): |
| 235 | + log_debug.info(f"Python Version: {python_version} - Perfect") |
191 | 236 | elif major == 3: |
192 | | - log_debug.warning(f"Python Version Used: {sys.version.split()[0]} - Recommended Version is: 3.11.X") |
| 237 | + log_debug.warning(f"Python Version: {python_version} - Recommended: 3.11.x") |
193 | 238 | else: |
194 | | - log_debug.error(f"Python Version Used: {sys.version.split()[0]} - Incompatible Version") |
| 239 | + log_debug.error(f"Python Version: {python_version} - Incompatible") |
195 | 240 | except Exception as e: |
196 | | - log_debug.error(f"Failed to get Python Version: {e}") |
| 241 | + log_debug.error(f"Failed to parse Python Version: {e}") |
197 | 242 |
|
198 | | - # Get Repo Path |
199 | | - log_debug.info(os.path.abspath(__file__).removesuffix("\\CODE\\_debug.py")) |
| 243 | + # CPU Info |
| 244 | + for info in DebugCheck.cpu_info(): |
| 245 | + log_debug.info(info) |
200 | 246 |
|
201 | | - # Get CPU Info |
202 | | - architecture, vID, cpuModel = DebugCheck.cpu_info() |
203 | | - log_debug.info(architecture) |
204 | | - log_debug.info(vID) |
205 | | - log_debug.info(cpuModel) |
206 | | - |
207 | | - # Get config data |
| 247 | + # Final Debug Status |
208 | 248 | log_debug.info(f"Debug: {DEBUG}") |
209 | 249 |
|
210 | 250 |
|
211 | | -debug() |
212 | | -input("Press Enter to exit...") |
213 | | -exit(0) |
| 251 | +if __name__ == "__main__": |
| 252 | + debug() |
| 253 | + input("Press Enter to exit...") |
0 commit comments