-
Notifications
You must be signed in to change notification settings - Fork 27
Expand file tree
/
Copy pathPagerRagnar.py
More file actions
302 lines (254 loc) · 12.1 KB
/
PagerRagnar.py
File metadata and controls
302 lines (254 loc) · 12.1 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
#PagerRagnar.py
# Entry point for Ragnar on the WiFi Pineapple Pager.
# Combines Ragnar's orchestrator with the Pager LCD display.
# Adapted from pineapple_pager_bjorn's Bjorn.py for Ragnar.
# Add local lib directory to Python path for self-contained payload
import sys
import os
_lib_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'lib')
if os.path.exists(_lib_path) and _lib_path not in sys.path:
sys.path.insert(0, _lib_path)
# Fix OpenSSL legacy provider issue for cryptography/paramiko
os.environ['CRYPTOGRAPHY_OPENSSL_NO_LEGACY'] = '1'
import threading
import signal
import logging
import time
import subprocess
import re
import json
import random
from logger import Logger
logger = Logger(name="PagerRagnar.py", level=logging.INFO)
def setup_pager_shared_data(shared_data):
"""Patch shared_data with Pager-specific attributes needed by pager_display.py.
Ragnar's shared.py loads PIL Images; the Pager needs file paths instead."""
currentdir = shared_data.currentdir
fontdir = shared_data.fontdir
staticpicdir = shared_data.staticpicdir
statuspicdir = shared_data.statuspicdir
# Font paths (pager_display uses pagerctl TTF rendering, not PIL)
shared_data.font_arial_path = os.path.join(fontdir, 'Arial.ttf')
shared_data.font_viking_path = os.path.join(fontdir, 'Viking.TTF')
# Status image path (updated by update_ragnarstatus_pager)
shared_data.ragnarstatusimage_path = None
# Static image paths dict (icon_name -> file_path)
shared_data.static_images = {}
static_names = ['ragnar1', 'port', 'frise', 'target', 'vuln', 'connected',
'bluetooth', 'wifi', 'ethernet', 'usb', 'level', 'cred',
'attack', 'attacks', 'gold', 'networkkb', 'zombie', 'data', 'money']
for name in static_names:
path = os.path.join(staticpicdir, f'{name}.bmp')
if os.path.exists(path):
shared_data.static_images[name] = path
# Status images dict (b_class -> file_path)
shared_data.status_images = {}
try:
if os.path.exists(shared_data.actions_file):
with open(shared_data.actions_file, 'r') as f:
actions = json.load(f)
for action in actions:
b_class = action.get('b_class')
if b_class:
status_dir = os.path.join(statuspicdir, b_class)
image_path = os.path.join(status_dir, f'{b_class}.bmp')
if os.path.exists(image_path):
shared_data.status_images[b_class] = image_path
except Exception as e:
logger.error(f"Error loading status image paths: {e}")
# Image series paths for animations (b_class -> list of file paths)
shared_data.pager_image_series = {}
for status in shared_data.status_list:
shared_data.pager_image_series[status] = []
status_dir = os.path.join(statuspicdir, status)
if os.path.isdir(status_dir):
for image_name in sorted(os.listdir(status_dir)):
if image_name.endswith('.bmp') and re.search(r'\d', image_name):
image_path = os.path.join(status_dir, image_name)
shared_data.pager_image_series[status].append(image_path)
# Current animation frame path
shared_data.current_image_path = None
# Monkey-patch update_ragnarstatus to use file paths
original_update = shared_data.update_ragnarstatus
def update_ragnarstatus_pager():
"""Update current status image path for Pager display."""
try:
if shared_data.ragnarorch_status in shared_data.status_images:
shared_data.ragnarstatusimage_path = shared_data.status_images[shared_data.ragnarorch_status]
else:
shared_data.ragnarstatusimage_path = shared_data.status_images.get('IDLE')
shared_data.ragnarstatustext = shared_data.ragnarorch_status
except Exception as e:
logger.error(f"Error updating ragnar status: {e}")
shared_data.update_ragnarstatus = update_ragnarstatus_pager
# Monkey-patch update_image_randomizer to use file paths
def update_image_randomizer_pager():
"""Select a random animation frame path for current status."""
try:
status = shared_data.ragnarstatustext
series = shared_data.pager_image_series
if status in series and series[status]:
idx = random.randint(0, len(series[status]) - 1)
shared_data.current_image_path = series[status][idx]
else:
if "IDLE" in series and series["IDLE"]:
idx = random.randint(0, len(series["IDLE"]) - 1)
shared_data.current_image_path = series["IDLE"][idx]
else:
shared_data.current_image_path = None
except Exception as e:
logger.error(f"Error updating image randomizer: {e}")
shared_data.update_image_randomizer = update_image_randomizer_pager
# Add a simple char-based wrap_text if the existing one requires PIL fonts
original_wrap = shared_data.wrap_text
def wrap_text_pager(text, max_chars=40, **kwargs):
"""Wrap text by character count (no PIL font needed)."""
lines = []
words = text.split()
line = ''
for word in words:
if len(line) + len(word) + 1 <= max_chars:
line = line + (' ' if line else '') + word
else:
if line:
lines.append(line)
line = word
if line:
lines.append(line)
return lines
shared_data.wrap_text = wrap_text_pager
logger.info("Pager shared_data attributes initialized")
class PagerRagnar:
"""Main class for Ragnar on Pineapple Pager."""
def __init__(self, shared_data):
self.shared_data = shared_data
self.orchestrator_thread = None
self.orchestrator = None
self._orchestrator_lock = threading.Lock()
self.shared_data.ragnar_instance = self
self.shared_data.headless_mode = False
def run(self):
"""Main loop - waits for Wi-Fi connection and starts Orchestrator."""
if hasattr(self.shared_data, 'startup_delay') and self.shared_data.startup_delay > 0:
logger.info(f"Waiting for startup delay: {self.shared_data.startup_delay} seconds")
time.sleep(self.shared_data.startup_delay)
while not self.shared_data.should_exit:
if not self.shared_data.manual_mode:
self.check_and_start_orchestrator()
time.sleep(10)
def check_and_start_orchestrator(self):
if self.is_wifi_connected():
self.shared_data.wifi_connected = True
if self.orchestrator_thread is None or not self.orchestrator_thread.is_alive():
self.start_orchestrator()
else:
self.shared_data.wifi_connected = False
logger.info("Waiting for Wi-Fi connection to start Orchestrator...")
def start_orchestrator(self):
with self._orchestrator_lock:
if self.is_wifi_connected():
self.shared_data.wifi_connected = True
if self.orchestrator_thread is None or not self.orchestrator_thread.is_alive():
logger.info("Starting Orchestrator thread...")
self.shared_data.orchestrator_should_exit = False
self.shared_data.manual_mode = False
from orchestrator import Orchestrator
self.orchestrator = Orchestrator()
self.orchestrator_thread = threading.Thread(target=self.orchestrator.run)
self.orchestrator_thread.start()
logger.info("Orchestrator thread started, automatic mode activated.")
def stop_orchestrator(self):
self.shared_data.manual_mode = True
logger.info("Stopping Orchestrator...")
if self.orchestrator_thread is not None and self.orchestrator_thread.is_alive():
self.shared_data.orchestrator_should_exit = True
self.orchestrator_thread.join()
self.shared_data.ragnarorch_status = "IDLE"
self.shared_data.ragnarstatustext2 = ""
def is_wifi_connected(self):
"""Check Wi-Fi connectivity (Pager + Pi compatible)."""
try:
for iface in ['wlan0cli', 'br-lan', 'wlan0', 'eth0']:
result = subprocess.run(['ip', 'link', 'show', iface],
capture_output=True, text=True, timeout=5)
if 'state UP' in result.stdout:
return True
return False
except Exception as e:
logger.debug(f"WiFi check error: {e}")
return False
def handle_exit(sig, frame, display_thread, ragnar_thread, web_thread=None):
from init_shared import shared_data
shared_data.should_exit = True
shared_data.orchestrator_should_exit = True
shared_data.display_should_exit = True
shared_data.webapp_should_exit = True
from pager_display import handle_exit_pager_display
display_inst = getattr(shared_data, 'display_instance', None)
handle_exit_pager_display(sig, frame, display_inst, exit_process=False)
if display_thread and display_thread.is_alive():
display_thread.join(timeout=5)
if ragnar_thread and ragnar_thread.is_alive():
ragnar_thread.join(timeout=5)
if web_thread and web_thread.is_alive():
web_thread.join(timeout=5)
logger.info("Clean exit.")
sys.exit(0)
if __name__ == "__main__":
logger.info("Starting Pager Ragnar...")
try:
from init_shared import shared_data
# Apply interface/IP from pager_menu environment variables
ragnar_interface = os.environ.get('RAGNAR_INTERFACE')
ragnar_ip = os.environ.get('RAGNAR_IP')
if ragnar_interface:
shared_data.config['wifi_default_interface'] = ragnar_interface
logger.info(f"Using interface from menu: {ragnar_interface}")
if ragnar_ip:
logger.info(f"Using IP from menu: {ragnar_ip}")
# Setup Pager-specific attributes on shared_data
setup_pager_shared_data(shared_data)
# Start display thread
logger.info("Starting pager display thread...")
shared_data.display_should_exit = False
from pager_display import PagerDisplay
display = PagerDisplay(shared_data)
display_thread = threading.Thread(target=display.run)
display_thread.start()
shared_data.display_instance = display
# Start Ragnar thread
logger.info("Starting PagerRagnar thread...")
ragnar = PagerRagnar(shared_data)
shared_data.ragnar_instance = ragnar
ragnar_thread = threading.Thread(target=ragnar.run)
ragnar_thread.start()
# Start web server (conditional on RAGNAR_WEB_UI env var)
web_thread = None
web_ui_setting = os.environ.get('RAGNAR_WEB_UI', 'on').lower()
if web_ui_setting != 'off':
logger.info("Starting the web server...")
shared_data.webapp_should_exit = False
try:
from webapp_modern import run_server
web_thread = threading.Thread(target=run_server, daemon=True)
web_thread.start()
except ImportError:
# Fall back to simple webapp if modern not available
try:
from webapp import web_thread as wt
wt.start()
web_thread = wt
except ImportError:
logger.warning("No web server module available")
else:
logger.info("Web server disabled by menu setting")
signal.signal(signal.SIGINT, lambda sig, frame: handle_exit(sig, frame, display_thread, ragnar_thread, web_thread))
signal.signal(signal.SIGTERM, lambda sig, frame: handle_exit(sig, frame, display_thread, ragnar_thread, web_thread))
# Keep main thread alive
while not shared_data.should_exit:
time.sleep(1)
except Exception as e:
logger.error(f"An exception occurred during thread start: {e}")
import traceback
traceback.print_exc()
sys.exit(1)