forked from PierreGode/Ragnar
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathshared.py
More file actions
executable file
·1813 lines (1619 loc) · 84.8 KB
/
shared.py
File metadata and controls
executable file
·1813 lines (1619 loc) · 84.8 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#shared.py
# Description:
# This file, shared.py, is a core component responsible for managing shared resources and data for different modules in the Ragnar project.
# It handles the initialization and configuration of paths, logging, fonts, and images. Additionally, it sets up the environment,
# creates necessary directories and files, and manages the loading and saving of configuration settings.
#
# Key functionalities include:
# - Initializing various paths used by the application, including directories for configuration, data, actions, web resources, and logs.
# - Setting up the environment, including the e-paper display, network knowledge base, and actions JSON configuration.
# - Loading and managing fonts and images required for the application's display.
# - Handling the creation and management of a live status file to store the current status of network scans.
# - Managing configuration settings, including loading default settings, updating, and saving configurations to a JSON file.
# - Providing utility functions for reading and writing data to CSV files and DB, updating statistics, and wrapping text for display purposes.
import os
import re
import json
import importlib
import random
import time
import csv
import logging
import subprocess
import threading
import traceback
from datetime import datetime
try:
from PIL import Image, ImageFont
except ImportError:
Image = None
ImageFont = None
from logger import Logger
try:
from epd_helper import EPDHelper
except ImportError:
EPDHelper = None
try:
from db_manager import get_db
except ImportError:
get_db = None
try:
from network_storage import NetworkStorageManager
except ImportError:
NetworkStorageManager = None
try:
from multi_interface import MultiInterfaceState, NetworkContextRegistry
except ImportError:
MultiInterfaceState = None
NetworkContextRegistry = None
# Driver name used whenever no display type is configured
# (fallback in SharedData.__init__ and apply_display_profile).
DEFAULT_EPD_TYPE = "epd2in13_V4"
DESIGN_REF_WIDTH = 122 # All layout coordinates are designed for this width
DESIGN_REF_HEIGHT = 250 # All layout coordinates are designed for this height
# Map web UI size keys to default driver names
# Every value below is also a key of DISPLAY_PROFILES; resolve_epd_type()
# falls back to these when the current driver is not in the requested family.
SIZE_KEY_TO_DEFAULT_DRIVER = {
"2in13": "epd2in13_V4",
"2in7": "epd2in7_V2",
"2in9": "epd2in9_V2",
"3in7": "epd3in7",
"1in28_tft": "gc9a01",
"0in96_oled": "ssd1306",
"1602_lcd": "lcd1602",
}
def resolve_epd_type(size_key, current_epd_type=None):
    """Translate a web UI size key into a display driver name.

    Args:
        size_key: Either "auto", an actual driver name (a key of
            DISPLAY_PROFILES) or a size key such as "2in13".
        current_epd_type: Driver currently configured, if any.

    Returns:
        ``size_key`` unchanged when it is "auto", already a driver name, or an
        unknown key; ``current_epd_type`` when it is a known profile whose name
        starts with ``epd<size_key>``; otherwise the default driver registered
        for the size key.
    """
    # "auto" and real driver names need no translation at all.
    passthrough = size_key == "auto" or size_key in DISPLAY_PROFILES
    fallback_driver = None if passthrough else SIZE_KEY_TO_DEFAULT_DRIVER.get(size_key)
    if not fallback_driver:
        # Either a pass-through value or an unknown key: hand it back as-is.
        return size_key

    # Keep the current driver when it is a valid member of the same size family
    # (e.g. epd2in13_V3 for "2in13").
    same_family = bool(current_epd_type) and current_epd_type.startswith(f"epd{size_key}")
    if same_family and current_epd_type in DISPLAY_PROFILES:
        return current_epd_type
    return fallback_driver
# Per-driver display profiles, keyed by driver name.
#   ref_width / ref_height: reference layout dimensions copied into the config
#                           by SharedData.apply_display_profile().
#   default_flip:           default for the "screen_reversed" setting when the
#                           config does not define one.
DISPLAY_PROFILES = {
"epd2in13": {"ref_width": DESIGN_REF_WIDTH, "ref_height": DESIGN_REF_HEIGHT, "default_flip": False},
"epd2in7": {"ref_width": DESIGN_REF_WIDTH, "ref_height": DESIGN_REF_HEIGHT, "default_flip": False},
"epd2in7_V2": {"ref_width": DESIGN_REF_WIDTH, "ref_height": DESIGN_REF_HEIGHT, "default_flip": False},
"epd2in9_V2": {"ref_width": DESIGN_REF_WIDTH, "ref_height": DESIGN_REF_HEIGHT, "default_flip": False},
"epd3in7": {"ref_width": DESIGN_REF_WIDTH, "ref_height": DESIGN_REF_HEIGHT, "default_flip": False},
"epd2in13_V2": {"ref_width": DESIGN_REF_WIDTH, "ref_height": DESIGN_REF_HEIGHT, "default_flip": False},
"epd2in13_V3": {"ref_width": DESIGN_REF_WIDTH, "ref_height": DESIGN_REF_HEIGHT, "default_flip": True},
"epd2in13_V4": {"ref_width": DESIGN_REF_WIDTH, "ref_height": DESIGN_REF_HEIGHT, "default_flip": False},
# GC9A01 1.28" 240x240 round colour TFT LCD
# NOTE(review): ref_height uses DESIGN_REF_WIDTH, giving a square reference
# area - presumably intentional for the round panel; confirm.
"gc9a01": {"ref_width": DESIGN_REF_WIDTH, "ref_height": DESIGN_REF_WIDTH, "default_flip": False},
# SSD1306 0.96" 128x64 monochrome OLED
"ssd1306": {"ref_width": 128, "ref_height": 64, "default_flip": False},
# LCD1602 16x2 character LCD (I2C via PCF8574 backpack)
"lcd1602": {"ref_width": 16, "ref_height": 2, "default_flip": False},
}
# Module-wide logger, created at DEBUG level.
logger = Logger(name="shared.py", level=logging.DEBUG) # Create a logger object
class SharedData:
"""Shared data between the different modules."""
def __init__(self):
    """Build the shared application state used by the other modules.

    The steps are order-dependent: paths first, then the network storage
    context, configuration, display profile, database, environment and
    runtime variables. Outside Pager mode the heavy I/O (fonts, images,
    network intelligence, AI service) runs in a daemon thread; in Pager mode
    only fonts and images are loaded, synchronously.
    """
    import copy  # function-scope import: only the config deep copy needs it

    # Detect Pager mode (set by pager_payload.sh before Python launch)
    self._pager_mode = os.environ.get('RAGNAR_PAGER_MODE') == '1'
    self.initialize_paths()  # Initialize the paths used by the application

    # --- Network storage & context (not available on Pager) ---
    if not self._pager_mode and NetworkStorageManager is not None:
        self.storage_manager = NetworkStorageManager(self.datadir)
    else:
        self.storage_manager = None
    self.active_network_ssid = None
    self.active_network_slug = None
    self.current_network_dir = None
    self.current_network_db_path = None
    self.network_intelligence_dir = None
    self.network_threat_dir = None
    if self.storage_manager is not None:
        # The database does not exist yet, so only record the context here.
        self._apply_network_context(
            self.storage_manager.get_active_context(),
            configure_db=False
        )
    if not self._pager_mode and NetworkContextRegistry is not None:
        self.context_registry = NetworkContextRegistry(self)
    else:
        self.context_registry = None

    self.status_list = []
    self.last_comment_time = time.time()  # Last time a comment was displayed
    self._stats_lock = threading.Lock()  # Thread-safe lock for update_stats()
    self.default_config = self.get_default_config()  # Default configuration of the application
    # Deep copy: the defaults contain nested lists/dicts (port list,
    # blacklists, ...). A shallow copy would share them, so in-place edits of
    # self.config (e.g. update_mac_blacklist appending the local MAC) would
    # silently alter self.default_config as well.
    self.config = copy.deepcopy(self.default_config)  # Configuration of the application
    # Load existing configuration first
    self.load_config()

    if not self._pager_mode and MultiInterfaceState is not None:
        self.multi_interface_state = MultiInterfaceState(self)
    else:
        self.multi_interface_state = None

    # Ensure the selected EPD profile is consistent and expose flip settings early
    self.config.setdefault('epd_type', DEFAULT_EPD_TYPE)
    self.apply_display_profile(self.config['epd_type'], set_orientation_if_missing=True, persist=not self._pager_mode)
    self.screen_reversed = bool(self.config.get('screen_reversed', False))
    self.web_screen_reversed = self.screen_reversed

    # Check if auth is configured and DB might be encrypted
    if not self._pager_mode:
        self.auth_configured = self._check_auth_configured()
    else:
        self.auth_configured = False

    # Initialize SQLite database manager
    # If auth is configured and DB is encrypted, db may be None until login
    if get_db is not None:
        try:
            self.db = get_db(currentdir=self.currentdir)
            self._configure_database()
        except Exception as e:
            logger.warning(f"Database initialization failed: {e}")
            self.db = None
    else:
        self.db = None

    # Update MAC blacklist without immediate save
    self.update_mac_blacklist()
    self.setup_environment(clear_console=False)  # Setup the environment without clearing console
    self.initialize_variables()  # Initialize the variables used by the application
    self.load_gamification_data()  # Load persistent gamification progress

    # Initialize network intelligence and AI service in background
    # to avoid blocking startup (these are not needed immediately)
    self.network_intelligence = None
    self.threat_intelligence = None  # type: ignore
    self.ai_service = None
    self.scanned_networks_count = 0
    self._deferred_init_done = threading.Event()
    self.create_livestatusfile()

    # Defer heavy I/O (fonts, images, AI, network intelligence) to a
    # background thread so the main thread can continue to start the
    # display and web server sooner.
    if not self._pager_mode:
        threading.Thread(target=self._deferred_init, daemon=True).start()
    else:
        # Pager mode: load fonts/images synchronously (lightweight)
        self.load_fonts()
        self.load_images()
        self._deferred_init_done.set()

    # Start background cleanup task for old hosts (needs DB)
    if not self._pager_mode and self.db is not None:
        self._start_cleanup_task()
def _deferred_init(self):
    """Perform the slow startup work off the main thread.

    Runs, in order: font loading, image loading, network intelligence setup,
    AI service setup and the scanned-network count. Any exception is logged
    rather than propagated, and the ``_deferred_init_done`` event is always
    set afterwards so waiters are released even on failure.
    """
    startup_steps = (
        'load_fonts',
        'load_images',
        'initialize_network_intelligence',
        'initialize_ai_service',
    )
    try:
        for step_name in startup_steps:
            # Resolve each step right before running it.
            getattr(self, step_name)()
        self.scanned_networks_count = self._calculate_scanned_networks_count()
        logger.info("Deferred initialization completed")
    except Exception as init_error:
        logger.error(f"Deferred initialization error: {init_error}")
    finally:
        self._deferred_init_done.set()
def wait_for_deferred_init(self, timeout: float = 30.0) -> bool:
    """Block until deferred initialization has finished or the timeout expires.

    Args:
        timeout: Maximum number of seconds to wait.

    Returns:
        True if the ``_deferred_init_done`` event was set within ``timeout``,
        False otherwise.
    """
    done_event = self._deferred_init_done
    finished = done_event.wait(timeout=timeout)
    return finished
def initialize_network_intelligence(self):
    """Create the NetworkIntelligence subsystem and attach it to this object.

    The module is imported lazily. On any failure (import or construction)
    the error is logged and ``self.network_intelligence`` is set to None.
    """
    try:
        from network_intelligence import NetworkIntelligence
        engine = NetworkIntelligence(self)
        self.network_intelligence = engine
        logger.info("Network intelligence system initialized successfully")
    except Exception as init_error:
        logger.error(f"Failed to initialize network intelligence: {init_error}")
        self.network_intelligence = None
def initialize_ai_service(self):
    """Create the AIService and attach it to this object.

    The module is imported lazily. When the service reports itself as not
    enabled, its ``initialization_error`` attribute (if any) is logged as a
    warning. Any exception is logged together with its traceback and leaves
    ``self.ai_service`` set to None.
    """
    import traceback

    try:
        from ai_service import AIService
        logger.info("Attempting to initialize AI service...")
        self.ai_service = AIService(self)
        if not self.ai_service.is_enabled():
            init_error = getattr(self.ai_service, 'initialization_error', None)
            if init_error:
                logger.warning(f"AI service initialized but not enabled: {init_error}")
            else:
                logger.info("AI service initialized but not enabled (check configuration)")
        else:
            logger.info("AI service initialized successfully with GPT-5 Nano")
    except Exception as e:
        # Import problems get their own message; everything else is reported
        # as a generic initialization failure.
        if isinstance(e, ImportError):
            logger.error(f"Failed to import AI service module: {e}")
        else:
            logger.error(f"Failed to initialize AI service: {e}")
        logger.error(traceback.format_exc())
        self.ai_service = None
def _calculate_scanned_networks_count(self) -> int:
    """Count per-network directories, ignoring the default network.

    The directory comes from ``storage_manager.networks_dir`` when available,
    otherwise ``<datadir>/networks``. Hidden entries, non-directories and the
    default slug (plus the literal ``'default'``) are not counted.

    Returns:
        The number of matching directories, or 0 if the networks directory
        cannot be listed.
    """
    manager = getattr(self, 'storage_manager', None)
    networks_dir = None
    default_slug = 'default'

    if manager is not None:
        networks_dir = getattr(manager, 'networks_dir', None)
        default_ssid = getattr(manager, 'default_ssid', None)
        slugify = getattr(manager, '_slugify', None)
        if callable(slugify) and default_ssid is not None:
            try:
                default_slug = slugify(default_ssid)
            except Exception:
                # A broken slugifier must not break the count.
                default_slug = 'default'

    networks_dir = networks_dir or os.path.join(self.datadir, 'networks')
    logger.debug(f"SCANNED_NETWORKS: Calculating using directory {networks_dir}, default_slug={default_slug}")

    try:
        entries = os.listdir(networks_dir)
    except OSError as exc:
        logger.warning(f"SCANNED_NETWORKS: Unable to read {networks_dir}: {exc}")
        return 0

    excluded_names = (default_slug, 'default')
    count = sum(
        1
        for entry in entries
        if not entry.startswith('.')
        and os.path.isdir(os.path.join(networks_dir, entry))
        and entry not in excluded_names
    )
    logger.info(f"SCANNED_NETWORKS: Calculated count {count} (excluding {default_slug})")
    return count
def initialize_paths(self):
    """Compute every directory and file path attribute used by the application.

    All paths are derived from the directory containing this file. Nothing is
    created on disk here; only attributes are assigned.
    """
    join = os.path.join

    # ---- Folder paths ----
    self.currentdir = os.path.dirname(os.path.abspath(__file__))
    root = self.currentdir

    # Directories directly under currentdir
    for attr_name, folder in (
        ('configdir', 'config'),
        ('datadir', 'data'),
        ('actions_dir', 'actions'),
        ('webdir', 'web'),
        ('resourcesdir', 'resources'),
        ('backupbasedir', 'backup'),
    ):
        setattr(self, attr_name, join(root, folder))

    # Directories under backupbasedir
    self.backupdir = join(self.backupbasedir, 'backups')
    self.upload_dir = join(self.backupbasedir, 'uploads')

    # Directories under datadir
    for attr_name, folder in (
        ('logsdir', 'logs'),
        ('output_dir', 'output'),
        ('input_dir', 'input'),
    ):
        setattr(self, attr_name, join(self.datadir, folder))

    # Directories under output_dir; the _default_* values are the fallbacks
    # used when a network context does not supply its own directory.
    output_root = self.output_dir
    self._default_crackedpwddir = join(output_root, 'crackedpwd')
    self._default_datastolendir = join(output_root, 'data_stolen')
    self.crackedpwddir = self._default_crackedpwddir
    self.datastolendir = self._default_datastolendir
    self.zombiesdir = join(output_root, 'zombies')
    self._default_vulnerabilities_dir = join(output_root, 'vulnerabilities')
    self._default_scan_results_dir = join(output_root, "scan_results")
    self.vulnerabilities_dir = self._default_vulnerabilities_dir
    self.scan_results_dir = self._default_scan_results_dir

    # Directories under resourcesdir
    for attr_name, folder in (
        ('picdir', 'images'),
        ('fontdir', 'fonts'),
        ('commentsdir', 'comments'),
    ):
        setattr(self, attr_name, join(self.resourcesdir, folder))

    # Directories under picdir
    self.statuspicdir = join(self.picdir, 'status')
    self.staticpicdir = join(self.picdir, 'static')

    # Directory under input_dir
    self.dictionarydir = join(self.input_dir, "dictionary")

    # ---- File paths ----
    # Files directly under configdir
    self.shared_config_json = join(self.configdir, 'shared_config.json')
    self.actions_file = join(self.configdir, 'actions.json')

    # Files directly under commentsdir
    self.commentsfile = join(self.commentsdir, 'comments.json')

    # Files directly under datadir
    for attr_name, filename in (
        ('netkbfile', "netkb.csv"),
        ('livestatusfile', 'livestatus.csv'),
        ('gamification_file', 'gamification.json'),
        ('pwnagotchi_status_file', 'pwnagotchi_status.json'),
    ):
        setattr(self, attr_name, join(self.datadir, filename))

    # Files directly under vulnerabilities_dir (kept in sync via _update_output_paths)
    self.vuln_summary_file = join(self.vulnerabilities_dir, 'vulnerability_summary.csv')
    self.vuln_scan_progress_file = join(self.vulnerabilities_dir, 'scan_progress.json')

    # Files directly under dictionarydir
    self.usersfile = join(self.dictionarydir, "users.txt")
    self.passwordsfile = join(self.dictionarydir, "passwords.txt")

    # Files directly under crackedpwddir
    self._refresh_credential_files()
    self.crackedpwd_dir = self.crackedpwddir

    # Files directly under logsdir
    self.webconsolelog = join(self.logsdir, 'temp_log.txt')
def _apply_network_context(self, context, configure_db=True):
"""Store active network metadata and optionally reconfigure storage."""
if not context:
return
self.active_network_ssid = context.get('ssid')
self.active_network_slug = context.get('slug')
self.current_network_dir = context.get('network_dir')
self.current_network_db_path = context.get('db_path')
self.network_intelligence_dir = context.get('intelligence_dir')
self.network_threat_dir = context.get('threat_intelligence_dir')
loot_data_dir = context.get('data_stolen_dir') or self._default_datastolendir
loot_credentials_dir = context.get('credentials_dir') or self._default_crackedpwddir
self._update_loot_paths(loot_data_dir, loot_credentials_dir)
scan_results_dir = context.get('scan_results_dir') or self._default_scan_results_dir
vulnerabilities_dir = context.get('vulnerabilities_dir') or self._default_vulnerabilities_dir
self._update_output_paths(scan_results_dir, vulnerabilities_dir)
if configure_db and hasattr(self, 'db'):
self._configure_database()
def _configure_database(self):
    """Bind the database returned by ``get_db`` to the active network's storage.

    Does nothing when the database layer is unavailable, in Pager mode, or
    while no network directory / database path is known.
    """
    if get_db is None or self._pager_mode:
        return
    if not (self.current_network_dir and self.current_network_db_path):
        return
    database = get_db(currentdir=self.currentdir)
    database.configure_storage(self.current_network_dir, self.current_network_db_path)
    self.db = database
def _check_auth_configured(self):
"""Check if authentication is configured by looking for the auth database."""
import sqlite3
auth_db_path = os.path.join(self.datadir, 'ragnar_auth.db')
if not os.path.exists(auth_db_path):
return False
try:
conn = sqlite3.connect(auth_db_path)
cursor = conn.cursor()
cursor.execute("SELECT COUNT(*) FROM auth")
count = cursor.fetchone()[0]
conn.close()
return count > 0
except Exception:
return False
def _refresh_network_components(self):
"""Ensure dependent subsystems follow the current network context."""
self._configure_database()
if self.network_intelligence and self.network_intelligence_dir:
self.network_intelligence.set_storage_root(self.network_intelligence_dir)
if self.threat_intelligence and self.network_threat_dir:
self.threat_intelligence.set_storage_root(self.network_threat_dir)
def _update_loot_paths(self, data_stolen_dir, credentials_dir):
"""Ensure per-network loot directories exist and refresh file pointers."""
if data_stolen_dir:
os.makedirs(data_stolen_dir, exist_ok=True)
self.datastolendir = data_stolen_dir
if credentials_dir:
os.makedirs(credentials_dir, exist_ok=True)
self.crackedpwddir = credentials_dir
self.crackedpwd_dir = self.crackedpwddir # legacy attribute name used by web UI
self._refresh_credential_files()
def _update_output_paths(self, scan_results_dir, vulnerabilities_dir):
"""Switch scan result and vulnerability dirs to the active network's paths."""
if scan_results_dir:
os.makedirs(scan_results_dir, exist_ok=True)
self.scan_results_dir = scan_results_dir
if vulnerabilities_dir:
os.makedirs(vulnerabilities_dir, exist_ok=True)
self.vulnerabilities_dir = vulnerabilities_dir
self.vuln_summary_file = os.path.join(vulnerabilities_dir, 'vulnerability_summary.csv')
self.vuln_scan_progress_file = os.path.join(vulnerabilities_dir, 'scan_progress.json')
def _refresh_credential_files(self):
"""Keep credential CSV paths aligned with the active credential directory."""
self.sshfile = os.path.join(self.crackedpwddir, 'ssh.csv')
self.smbfile = os.path.join(self.crackedpwddir, "smb.csv")
self.telnetfile = os.path.join(self.crackedpwddir, "telnet.csv")
self.ftpfile = os.path.join(self.crackedpwddir, "ftp.csv")
self.sqlfile = os.path.join(self.crackedpwddir, "sql.csv")
self.rdpfile = os.path.join(self.crackedpwddir, "rdp.csv")
def set_active_network(self, ssid):
    """Public entry point for Wi-Fi manager to switch all storage.

    Persists the intelligence data of the outgoing network, activates the
    storage for ``ssid``, marks the outgoing database's hosts as degraded and
    then repoints every path and subsystem at the new network.

    Args:
        ssid: SSID of the network to activate. A falsy value is ignored while
            a real network is active (see the guard below).

    Returns:
        None. The call is a no-op when no storage manager is available, when
        activation fails (logged), or when the network did not change.
    """
    # storage_manager is assigned None (rather than left unset) when network
    # storage is unavailable, e.g. in Pager mode, so test the value itself and
    # not merely the attribute's presence.
    if getattr(self, 'storage_manager', None) is None:
        return

    # Guard against transient Wi-Fi blips: if the new SSID is None
    # (Wi-Fi momentarily lost) but we already have an active real
    # network, do NOT switch. A genuine disconnection will be
    # handled by the Wi-Fi monitoring loop which performs sustained
    # validation before acting. Without this guard, a single
    # failed check_wifi_connection() call (common on the resource-
    # constrained Pi Zero W2) would call mark_all_hosts_degraded()
    # and wipe the entire target list to zero.
    if not ssid and self.active_network_ssid:
        logger.debug(
            f"Ignoring transient SSID=None switch (current network: "
            f"{self.active_network_ssid})"
        )
        return

    # Best effort: flush the outgoing network's state before switching.
    try:
        if self.network_intelligence:
            self.network_intelligence.save_intelligence_data()
        if self.threat_intelligence:
            self.threat_intelligence.persist_state()
    except Exception as exc:
        logger.warning(f"Failed to persist data before network switch: {exc}")

    try:
        context = self.storage_manager.activate_network(ssid)
    except Exception as exc:
        logger.error(f"Unable to activate network storage for '{ssid}': {exc}")
        return

    # An empty context cannot be applied (and would crash on .get below).
    if not context:
        logger.warning(f"Network storage returned no context for '{ssid}'")
        return

    # Skip reconfiguration if the slug did not change
    if (context.get('slug') == self.active_network_slug and
            context.get('ssid') == self.active_network_ssid):
        return

    # Mark every alive host in the OUTGOING database as degraded so
    # that hosts from the old network never linger as "alive" inside
    # the new network's store (race-condition safety net) and so
    # that stale hosts are immediately visible as offline if the
    # user switches back later.
    try:
        if get_db is not None and not self._pager_mode:
            outgoing_db = get_db(currentdir=self.currentdir)
            outgoing_db.mark_all_hosts_degraded()
    except Exception as exc:
        logger.warning(f"Failed to mark outgoing hosts as degraded: {exc}")

    # Record the new context first, then reconfigure the database and
    # intelligence subsystems in one place.
    self._apply_network_context(context, configure_db=False)
    self._refresh_network_components()
    logger.info(
        f"Active network context updated: ssid={self.active_network_ssid or 'default'} "
        f"slug={self.active_network_slug}"
    )
def get_default_config(self):
    """Return the default values of all configuration settings.

    The result can be used to reset the configuration to its defaults. Values
    can be overridden by modifying shared_config.json or through the web page.

    Returns:
        A new dict on every call. Keys of the form ``__title_*__`` hold a
        section heading string (presumably rendered as group titles by the
        settings UI - confirm against the web front end).
    """
    # Fall back to the design reference size if the default driver has no profile.
    default_profile = DISPLAY_PROFILES.get(
        DEFAULT_EPD_TYPE,
        {"ref_width": DESIGN_REF_WIDTH, "ref_height": DESIGN_REF_HEIGHT, "default_flip": False},
    )
    return {
        "__title_Ragnar__": "Settings",
        "manual_mode": False,
        "websrv": True,
        "web_bind_interface": "",
        "web_increment": False,
        "debug_mode": False,
        "scan_vuln_running": True,
        "scan_vuln_no_ports": False,
        "enable_attacks": False,
        "release_gate_enabled": False,
        "release_gate_message": "",
        "retry_success_actions": True,
        "retry_failed_actions": True,
        "blacklistcheck": True,
        "displaying_csv": True,
        "log_debug": False,
        "log_info": False,
        "log_warning": True,
        "log_error": True,
        "log_critical": True,
        "terminal_log_level": "all",
        "startup_delay": 2,
        "web_delay": 2,
        "screen_delay": 1,
        "comment_delaymin": 15,
        "comment_delaymax": 30,
        "livestatus_delay": 8,
        "image_display_delaymin": 2,
        "image_display_delaymax": 8,
        "scan_interval": 180,
        "scan_vuln_interval": 300,
        "failed_retry_delay": 180,
        "success_retry_delay": 300,
        "action_timeout": 300,
        "vuln_scan_timeout": 1800,
        "ref_width": default_profile["ref_width"],
        "ref_height": default_profile["ref_height"],
        "epd_type": DEFAULT_EPD_TYPE,
        "screen_reversed": default_profile.get("default_flip", False),
        "gc9a01_mascot_color": "#96C8FF",
        "ssd1306_i2c_address": "0x3C",
        "lcd1602_i2c_address": "0x27",
        "lcd1602_i2c_bus": 1,
        "display_brightness": 8,

        "__title_lists__": "List Settings",
        "portlist": [20, 21, 22, 23, 25, 53, 67, 68, 69, 80, 88, 110, 111, 119, 123, 135, 137, 138, 139, 143, 161, 162, 179, 389, 443, 445, 465, 514, 515, 520, 554, 587, 631, 636, 993, 995, 1024, 1025, 1080, 1194, 1433, 1434, 1521, 1723, 1812, 1813, 1883, 1900, 2049, 2082, 2083, 2181, 2375, 2376, 2483, 2484, 25565, 3000, 3001, 3002, 3003, 3004, 3005, 3006, 3007, 3008, 3009, 4000, 5000, 5003, 5004, 5060, 5061, 5432, 5500, 5555, 5631, 5632, 5900, 5985, 5986, 6000, 6379, 6667, 6881, 6969, 7000, 7070, 8080, 8081, 8086, 8181, 8443, 8888, 9000, 9090, 9100, 9200, 9418, 9999, 10000],
        "mac_scan_blacklist": [],
        "ip_scan_blacklist": [],
        "steal_file_names": ["ssh.csv","hack.txt","password","passwd","credential","key","secret","config","backup","settings","credentials","auth","environment","docker-compose","kubeconfig"],
        "steal_file_extensions": [".txt",".conf",".xml",".db",".sql",".key",".pem",".crt",".yaml",".yml",".config",".ini",".env",".cfg"],

        "__title_network__": "Network",
        "aggressive_mode_enabled": False,
        "ipcam_enabled": True,
        "router_scanner_enabled": True,
        "mqtt_scanner_enabled": True,
        "snmp_scanner_enabled": True,
        "nmap_scan_aggressivity": "-T4",
        "portstart": 1,
        "portend": 5500,
        "scan_subnets": [],
        "default_vulnerability_ports": [22, 80, 443],
        "network_max_failed_pings": 15,
        # Defined once here; a second identical entry used to sit in the
        # Ethernet section, where the duplicate key was silently collapsed.
        "network_device_retention_days": 14,
        "network_device_retention_hours": 8,  # Legacy data cleanup after 8 hours
        "network_ping_grace_period_minutes": 30,

        "__title_timewaits__": "Time Wait Settings",
        "timewait_smb": 0,
        "timewait_ssh": 0,
        "timewait_telnet": 0,
        "timewait_ftp": 0,
        "timewait_sql": 0,
        "timewait_rdp": 0,

        "__title_wifi__": "Wi-Fi Management",
        "wifi_known_networks": [],
        "wifi_default_interface": "wlan0",
        "wifi_ap_ssid": "Ragnar",
        "wifi_ap_password": "ragnarconnect",
        "wifi_connection_timeout": 60,
        "wifi_max_attempts": 3,
        "wifi_scan_interval": 300,
        "wifi_monitor_enabled": True,
        "wifi_auto_ap_fallback": True,
        "wifi_open_network_connect": False,
        "wifi_ap_timeout": 180,
        "wifi_multi_network_scans_enabled": True,
        "wifi_multi_scan_mode": "multi",
        "wifi_multi_scan_focus_interface": "",
        "wifi_multi_scan_max_interfaces": 2,
        "wifi_multi_scan_max_parallel": 1,
        "wifi_allowed_scan_interfaces": [],
        "wifi_scan_interface_overrides": {},
        "wifi_external_interface_hint": "",
        "wifi_ap_idle_timeout": 180,
        "wifi_reconnect_interval": 20,
        "wifi_ap_cycle_enabled": True,
        "wifi_initial_connection_timeout": 60,
        "wifi_failsafe_cycle_limit": 10,

        "__title_ethernet__": "Ethernet/LAN Settings",
        "ethernet_default_interface": "eth0",
        "ethernet_scan_enabled": True,
        "ethernet_prefer_over_wifi": True,
        "ethernet_auto_detect": True,

        "__title_network_intelligence__": "Network Intelligence",
        "network_resolution_timeout": 3600,
        "network_confirmation_scans": 3,
        "network_change_grace": 300,
        "network_intelligence_enabled": True,
        "network_auto_resolution": True,

        "__title_ai__": "AI Integration (GPT-5 Nano)",
        "ai_enabled": False,
        "openai_api_token": "",
        "ai_model": "gpt-5-nano",
        "ai_analysis_enabled": True,
        "ai_vulnerability_summaries": True,
        "ai_network_insights": True,
        "ai_max_tokens": 500,
        "ai_temperature": 0.7,

        "__title_pushover__": "Pushover Notifications",
        "pushover_enabled": False,
        "pushover_notify_new_device": True,
        "pushover_notify_new_vulnerability": True,
        "pushover_notify_new_credential": True,
        "pushover_notify_device_lost": False,
        "pushover_notify_device_back_online": False,

        "__title_pwnagotchi__": "Pwnagotchi Integration",
        "pwnagotchi_installed": False,
        "pwnagotchi_mode": "ragnar",
        "pwnagotchi_last_switch": "",
        "pwnagotchi_last_status": "Not installed",

        "__title_wpasec__": "wpa-sec Integration",
        "wpasec_enabled": False,
        "wpasec_api_key": "",
        "wpasec_poll_interval": 3600,
        "wpasec_auto_connect": True,
        "wpasec_priority": 5
    }
def apply_display_profile(self, epd_type=None, set_orientation_if_missing=False, persist=False):
    """Sync the config's reference dimensions (and optionally flip) with a display profile.

    Args:
        epd_type: Driver name to apply; defaults to the configured
            ``epd_type`` and then to DEFAULT_EPD_TYPE.
        set_orientation_if_missing: When true and the config has no
            ``screen_reversed`` key, seed it from the profile's default flip.
        persist: When true, save the config if anything changed.

    Returns:
        True if any config value was modified, False otherwise (including
        when the driver name has no known profile).
    """
    epd_key = epd_type or self.config.get('epd_type') or DEFAULT_EPD_TYPE
    profile = DISPLAY_PROFILES.get(epd_key)
    if not profile:
        logger.warning(f"Unknown EPD profile '{epd_key}' – skipping display calibration")
        return False

    changed = False
    for dimension in ('ref_width', 'ref_height'):
        expected = profile[dimension]
        if self.config.get(dimension) != expected:
            self.config[dimension] = expected
            changed = True

    if set_orientation_if_missing and 'screen_reversed' not in self.config:
        default_orientation = profile.get('default_flip', False)
        if self.config.get('screen_reversed') != default_orientation:
            self.config['screen_reversed'] = default_orientation
            changed = True

    if changed and persist:
        self.save_config()
    return changed
def _normalize_config_keys(self, config):
"""Ensure legacy or malformed configuration keys are aligned with the current schema."""
if 'web_increment ' in config:
if 'web_increment' not in config:
config['web_increment'] = config['web_increment ']
del config['web_increment ']
return config
def _remove_legacy_attributes(self):
"""Drop attributes created from legacy configuration keys that cannot be accessed normally."""
legacy_attrs = ['web_increment ']
for attr in legacy_attrs:
if hasattr(self, attr):
delattr(self, attr)
def update_mac_blacklist(self):
    """Add this device's own MAC address to the in-memory scan blacklist.

    The configuration is only changed in memory; nothing is saved here. A
    warning is logged when the local MAC address cannot be determined.
    """
    local_mac = self.get_raspberry_mac()
    if not local_mac:
        logger.warning("Could not add local MAC to blacklist: MAC address not found")
        return

    # Create the list on first use, then work on the stored list object.
    blacklist = self.config.setdefault('mac_scan_blacklist', [])
    if local_mac in blacklist:
        logger.info(f"Local MAC address {local_mac} already in blacklist")
        return

    blacklist.append(local_mac)
    logger.info(f"Added local MAC address {local_mac} to blacklist")
def get_raspberry_mac(self):
    """Return the MAC address of the primary network interface.

    ``wlan0`` is tried first and ``eth0`` second. The address is read
    directly from ``/sys/class/net/<interface>/address`` rather than by
    spawning a ``cat`` subprocess for each lookup.

    Returns:
        The lower-cased, whitespace-stripped MAC address of the first
        interface that yields a non-empty value, or ``None`` when neither
        interface provides one. This method does not raise on read errors.
    """
    for interface in ('wlan0', 'eth0'):
        address_path = f'/sys/class/net/{interface}/address'
        try:
            # errors='replace' keeps an unexpected byte from raising here.
            with open(address_path, 'r', encoding='ascii', errors='replace') as handle:
                mac_address = handle.read().strip().lower()
        except FileNotFoundError:
            # Interface does not exist on this device; try the next one.
            continue
        except OSError as e:
            # Unreadable entry: report it, but still give the next
            # interface a chance instead of giving up entirely.
            logger.error(f"Error getting Raspberry Pi MAC address: {e}")
            continue
        if mac_address:
            return mac_address

    logger.warning("Could not find MAC address for wlan0 or eth0")
    return None
def setup_environment(self, clear_console=False):
    """Prepare directories, configuration, action metadata and the display.

    Steps, in order: optionally clear the terminal, create the working
    directories, save the configuration, make ``actions.json`` available,
    then call ``delete_webconsolelog()``, ``initialize_csv()`` and
    ``initialize_epd_display()``.

    Args:
        clear_console: When true, clear the terminal first (``cls`` on
            Windows, ``clear`` elsewhere).
    """
    if clear_console:
        clear_command = 'clear' if os.name != 'nt' else 'cls'
        os.system(clear_command)

    # Directories must exist before anything is written into them.
    self.create_directories()
    self.save_config()

    if not self._pager_mode:
        # Delegates the decision whether actions.json needs rebuilding,
        # avoiding a costly re-import of every action module when it does not.
        self._generate_actions_json_if_needed()
    else:
        # Pager: generate actions.json only when it is missing, tolerate a
        # failed generation, then always load the status list from the file.
        actions_file_missing = not os.path.exists(self.actions_file)
        if actions_file_missing:
            try:
                self.generate_actions_json()
            except Exception as e:
                logger.warning(f"Could not generate actions.json: {e}")
        self._load_status_list_from_actions_json()

    self.delete_webconsolelog()
    self.initialize_csv()
    self.initialize_epd_display()
def create_directories(self):
    """Create every directory the application expects, if it is missing.

    The paths are read from the instance attributes named below. Each
    directory is handled independently: a failure to create one is logged
    and does not prevent the remaining ones from being processed.
    """
    directory_attributes = (
        'configdir',
        'datadir',
        'actions_dir',
        'webdir',
        'resourcesdir',
        'backupbasedir',
        'backupdir',
        'upload_dir',
        'logsdir',
        'output_dir',
        'input_dir',
        'crackedpwddir',
        'datastolendir',
        'zombiesdir',
        'vulnerabilities_dir',
        'scan_results_dir',
        'picdir',
        'fontdir',
        'commentsdir',
        'statuspicdir',
        'staticpicdir',
        'dictionarydir',
    )
    # Resolve all paths up front so a missing attribute surfaces before any
    # directory has been created.
    targets = [getattr(self, attribute) for attribute in directory_attributes]

    for directory in targets:
        try:
            if os.path.exists(directory):
                continue
            os.makedirs(directory, exist_ok=True)
            logger.info(f"Created directory: {directory}")
        except Exception as e:
            logger.error(f"Failed to create directory {directory}: {e}")
# def initialize_epd_display(self):
# """Initialize the e-paper display."""
# try:
# logger.info("Initializing EPD display...")
# time.sleep(1)
# self.epd_helper = EPDHelper(self.config["epd_type"])
# self.epd_helper = EPDHelper(self.epd_type)
# if self.config["epd_type"] == "epd2in13_V2":
# logger.info("EPD type: epd2in13_V2 screen reversed")
# self.screen_reversed = False
# self.web_screen_reversed = False
# elif self.config["epd_type"] == "epd2in13_V3":
# logger.info("EPD type: epd2in13_V3 screen reversed")
# self.screen_reversed = False
# self.web_screen_reversed = False
# elif self.config["epd_type"] == "epd2in13_V4":
# logger.info("EPD type: epd2in13_V4 screen reversed")
# self.screen_reversed = True
# self.web_screen_reversed = True
# self.epd_helper.init_full_update()
# self.width, self.height = self.epd_helper.epd.width, self.epd_helper.epd.height
# logger.info(f"EPD {self.config['epd_type']} initialized with size: {self.width}x{self.height}")
# except Exception as e:
# logger.error(f"Error initializing EPD display: {e}")
# raise
def initialize_epd_display(self):
    """Initialize the e-paper display, degrading to headless operation.

    Possible outcomes:

    * ``EPDHelper`` is unavailable or Pager mode is active: no driver is
      loaded and headless defaults are applied.
    * The configured type is a character display (``lcd1602``): no driver
      is loaded; dimensions come from the display profile.
    * Otherwise the configured driver is loaded (running auto-detection
      when the type is ``"auto"`` or the driver fails to load), initialized
      and validated with a blank test buffer. If any step fails,
      auto-detection is attempted once more as a fallback; if that also
      yields nothing, headless defaults are applied.

    Sets ``self.epd_helper``, ``self.width``, ``self.height``,
    ``self.screen_reversed`` and ``self.web_screen_reversed``. May update
    ``self.config['epd_type']`` and call ``save_config()`` when a display
    is auto-detected. Exceptions from the driver are logged, not raised.
    """
    if EPDHelper is None or self._pager_mode:
        logger.info("EPD not available (Pager mode or missing module) - skipping EPD init")
        self._use_headless_display_defaults()
        return

    # Character-based displays (e.g. LCD1602) have no PIL buffer interface.
    # Skip EPDHelper / buffer-validation entirely to prevent the auto-detect
    # fallback from overwriting the user's chosen display type.
    _CHAR_DISPLAYS = {"lcd1602"}
    epd_type_cfg = self.config.get("epd_type", DEFAULT_EPD_TYPE)
    if epd_type_cfg in _CHAR_DISPLAYS:
        profile = DISPLAY_PROFILES.get(epd_type_cfg, {})
        self.width = profile.get("ref_width", 16)
        self.height = profile.get("ref_height", 2)
        self.epd_helper = None
        self.screen_reversed = bool(self.config.get("screen_reversed", False))
        self.web_screen_reversed = self.screen_reversed
        self.apply_display_profile(epd_type_cfg)  # keeps ref_width/ref_height in sync
        logger.info(
            f"Character display '{epd_type_cfg}' configured: {self.width}x{self.height}"
            " — skipping EPD buffer init"
        )
        return

    try:
        logger.info("Initializing EPD display...")
        epd_type = self.config.get("epd_type", DEFAULT_EPD_TYPE)

        # Auto-detect when explicitly requested...
        needs_detect = epd_type == "auto"
        if not needs_detect:
            # ...or when the configured driver doesn't exist or can't load.
            try:
                EPDHelper(epd_type)
            except Exception:
                logger.warning(f"Configured EPD driver '{epd_type}' failed to load, switching to auto-detect")
                needs_detect = True

        if needs_detect:
            logger.info("EPD auto-detection running...")
            result = EPDHelper.auto_detect()
            if result:
                epd_type = result[0]
                logger.info(f"Auto-detected EPD: {epd_type} ({result[1]}x{result[2]})")
                self.config['epd_type'] = epd_type
                self.save_config()
            else:
                logger.warning("Auto-detection found no display, using default")
                epd_type = DEFAULT_EPD_TYPE

        self.epd_helper = EPDHelper(epd_type)
        self.apply_display_profile(epd_type)
        self.screen_reversed = bool(self.config.get("screen_reversed", False))
        self.web_screen_reversed = self.screen_reversed
        logger.info(f"EPD type: {epd_type} | size: {self.epd_helper.epd.width}x{self.epd_helper.epd.height} | flipped: {self.screen_reversed}")
        self.epd_helper.init_full_update()
        self.width, self.height = self.epd_helper.epd.width, self.epd_helper.epd.height

        # Validate the driver works by doing a test getbuffer with a blank image
        try:
            test_img = Image.new('1', (self.width, self.height), 255)
            test_buf = self.epd_helper.epd.getbuffer(test_img)
            expected_size = int(self.width / 8) * self.height
            if len(test_buf) < expected_size:
                raise ValueError(f"Buffer size mismatch: got {len(test_buf)}, expected {expected_size}")
        except Exception as ve:
            logger.warning(f"EPD driver '{epd_type}' buffer validation failed: {ve}, trying auto-detect...")
            raise  # Fall through to the auto-detect fallback below

        # Report the driver that is actually in use. Indexing
        # self.config['epd_type'] here could raise KeyError after a
        # successful init (sending a working display into the fallback
        # path) and would show a stale value such as "auto" when the
        # default driver was substituted.
        logger.info(f"EPD {epd_type} initialized with size: {self.width}x{self.height}")
    except Exception as e:
        logger.error(f"Error initializing EPD display: {e}")
        # Try auto-detection as fallback before giving up
        logger.info("Attempting auto-detection as fallback...")
        try:
            result = EPDHelper.auto_detect()
            if result:
                epd_type = result[0]
                logger.info(f"Fallback auto-detected EPD: {epd_type} ({result[1]}x{result[2]})")
                self.config['epd_type'] = epd_type
                self.apply_display_profile(epd_type)
                self.epd_helper = EPDHelper(epd_type)
                self.epd_helper.init_full_update()
                self.width, self.height = self.epd_helper.epd.width, self.epd_helper.epd.height
                self.screen_reversed = bool(self.config.get("screen_reversed", False))
                self.web_screen_reversed = self.screen_reversed
                self.save_config()
                logger.info(f"EPD {epd_type} initialized via fallback with size: {self.width}x{self.height}")
                return
        except Exception as e2:
            logger.error(f"Fallback auto-detection also failed: {e2}")
        logger.warning("Continuing without EPD display support")
        self._use_headless_display_defaults()
    # NOTE: Test image code below was used to verify EPD hardware.
    # Commented out to allow normal Ragnar display to show.
    # Uncomment if you need to test the display again.
    # from PIL import ImageDraw
    # test_image = Image.new('1', (self.width, self.height), 255)
    # draw = ImageDraw.Draw(test_image)
    # draw.text((10, 10), "EPD Test", fill=0)
    # if self.config.get("reversed", False):
    #     test_image = test_image.rotate(180)
    # self.epd_helper.epd.display(self.epd_helper.epd.getbuffer(test_image))
    # logger.info("Test image displayed on EPD.")

def _use_headless_display_defaults(self):
    """Configure display attributes for running without a display driver.

    Clears ``self.epd_helper`` and takes the dimensions from the config's
    ``ref_width``/``ref_height``, falling back to the profile of the
    configured display type, then to the default profile. Orientation is
    read from ``screen_reversed`` in the config (false when unset).
    """
    self.epd_helper = None
    fallback_profile = DISPLAY_PROFILES.get(
        DEFAULT_EPD_TYPE,
        {"ref_width": 122, "ref_height": 250, "default_flip": False},
    )
    epd_type = self.config.get('epd_type') or DEFAULT_EPD_TYPE
    # The trailing "or" also covers a profile entry that exists but is empty.
    profile = DISPLAY_PROFILES.get(epd_type, fallback_profile) or fallback_profile
    self.width = self.config.get('ref_width', profile['ref_width'])
    self.height = self.config.get('ref_height', profile['ref_height'])
    self.screen_reversed = bool(self.config.get('screen_reversed', False))
    self.web_screen_reversed = self.screen_reversed
def initialize_variables(self):
"""Initialize the variables."""
self.should_exit = False
self.display_should_exit = False
self.orchestrator_should_exit = False
self.webapp_should_exit = False
self.ragnar_instance = None
self.gateway_info = {} # Populated by NetworkScanner.get_gateway_info()
self.wifichanged = False
self.bluetooth_active = False
self.bluetooth_scan_active = False
self.bluetooth_scan_start_time = 0.0
self.wifi_connected = False
self.wifi_signal_dbm = None # Latest RSSI value for display
self.wifi_signal_quality = None # Normalized 0-100 quality percentage
self.pan_connected = False
self.usb_active = False
self.ragnarsays = "Hacking away..."
self.ragnarorch_status = "IDLE"
self.ragnarstatustext = "IDLE"
self.ragnarstatustext2 = "Awakening..."
self.scale_factor_x = self.width / self.config['ref_width']
self.scale_factor_y = self.height / self.config['ref_height']
self.text_frame_top = int(88 * self.scale_factor_y)
self.text_frame_bottom = int(159 * self.scale_factor_y)
self.y_text = self.text_frame_top + 2
self.targetnbr = 0
self.portnbr = 0
self.vulnnbr = 0
self.crednbr = 0
self.datanbr = 0
self.zombiesnbr = 0
self.coinnbr = 0
self.levelnbr = 0
self.networkkbnbr = 0
self.attacksnbr = 0
self.vulnerable_host_count = 0
self.gamification_data = {}
self.points_per_level = 200
self.points_per_mac = 15
self.points_per_credential = 25
self.points_per_data_file = 10
self.points_per_zombie = 40
self.points_per_vulnerability = 20
self.show_first_image = True
self.network_hosts_snapshot = {}