1+ import hashlib
2+ import base64
3+ import json
4+ import atexit
5+ import signal
6+ import requests
7+ import time
8+ import threading
9+ from typing import List , Tuple , Any , Dict
10+
11+ from Crypto .Cipher import AES
12+
13+ from octoapp .logging import LoggerLike
14+ from octoapp .appsstorage import AppInstance
15+ from octoapp .appsstorage import AppStoragePlatformHelper
16+
17+ from .elegooclient import ElegooClient
18+
19+ def sha256_urlsafe_base64 (data : bytes ) -> str :
20+ return base64 .urlsafe_b64encode (hashlib .sha256 (data ).digest ()).decode ()
21+
22+ def pbkdf2_key (password : str , salt : str ) -> str :
23+ key = hashlib .pbkdf2_hmac ('sha256' , password .encode (), salt .encode (), 100_000 )
24+ return base64 .b64encode (key ).decode ()
25+
26+ class ElegooAppStorage (AppStoragePlatformHelper ):
27+
28+ def __init__ (self , logger :LoggerLike , pluginVersion :str ):
29+ self .PluginVersion = pluginVersion
30+ self .First = False
31+ self .Logger = logger
32+ self .AppsDatabaseUrl = "https://octoapp-companion-connections-apps.europe-west1.firebasedatabase.app"
33+ self .PresenceDatabaseUrl = "https://octoapp-companion-connections.europe-west1.firebasedatabase.app"
34+ self ._ContinuouslyAnnouncePresence ()
35+ self ._RegisterLastWillHandler ()
36+
37+ def _RegisterLastWillHandler (self ):
38+ # Register cleanup for normal exit
39+ atexit .register (self ._SendLastWillInactive )
40+ # Register cleanup for SIGTERM and SIGINT
41+ signal .signal (signal .SIGTERM , lambda signum , frame : self ._SendLastWillInactive ())
42+ signal .signal (signal .SIGINT , lambda signum , frame : self ._SendLastWillInactive ())
43+
44+ def _SendLastWillInactive (self ):
45+ try :
46+ printerId = self ._GetPrinterId ()
47+ url = f"{ self .PresenceDatabaseUrl } /{ printerId } .json?print=silent"
48+ data = {
49+ "active" : "false" ,
50+ "lastSeen" : int (time .time ()),
51+ "version" : self .PluginVersion ,
52+ }
53+ resp = requests .put (url , json = data )
54+ if resp .status_code > 299 :
55+ self .Logger .error (f"Failed to send last will: { resp .status_code } { resp .text } " )
56+ else :
57+ self .Logger .info (f"Sent last will (inactive) for printer { printerId } " )
58+ except PrinterIdUnavailableException as e :
59+ self .Logger .error (f"Unable to send last will, printer id not available: { str (e )} " )
60+ except Exception as e :
61+ self .Logger .error (f"Exception in last will handler: { str (e )} " )
62+
63+ def _ContinuouslyAnnouncePresence (self ):
64+ t = threading .Thread (target = self ._DoContinuouslyAnnouncePresence )
65+ t .daemon = True
66+ t .start ()
67+
68+ def _DoContinuouslyAnnouncePresence (self ):
69+ while True :
70+ try :
71+ self ._AnnouncePresence ()
72+ time .sleep (1800 )
73+ except PrinterIdUnavailableException as e :
74+ self .Logger .error (f"Cannot announce presence: { str (e )} " )
75+ time .sleep (5 )
76+ except Exception as e :
77+ self .Logger .error (f"Failed to announce presence: { str (e )} " )
78+ time .sleep (60 )
79+
80+ def _AnnouncePresence (self ):
81+ printerId = self ._GetPrinterId ()
82+ url = f"{ self .PresenceDatabaseUrl } /{ printerId } .json?print=silent"
83+ data = {
84+ "active" : "true" ,
85+ "lastSeen" : int (time .time ()),
86+ "version" : self .PluginVersion ,
87+ }
88+ resp = requests .put (url , json = data )
89+
90+ if resp .status_code > 299 :
91+ self .Logger .error (f"Failed to announce presence: { resp .status_code } { resp .text } " )
92+ else :
93+ self .Logger .info (f"Announced presence for printer { printerId } " )
94+
95+ def GetAllApps (self ) -> List [AppInstance ]:
96+ printerId = self ._GetPrinterId ()
97+ url = f"{ self .AppsDatabaseUrl } /{ printerId } .json"
98+ resp = requests .get (url )
99+ if resp .status_code != 200 :
100+ self .Logger .error (f"Failed to fetch apps: { resp .status_code } { resp .text } " )
101+ return []
102+
103+ data = resp .json ()
104+ if data is None :
105+ return []
106+
107+ apps :List [AppInstance ] = []
108+ for appId , encryptedAppJson in data .items ():
109+ try :
110+ # Decrypt app instance (you need to implement AES-256-GCM decryption separately)
111+ decryptedJson = self ._DecryptAppInstance (appId , encryptedAppJson )
112+ appInstance = AppInstance .FromDict (decryptedJson )
113+ apps .append (appInstance )
114+ except Exception as e :
115+ self .Logger .error (f"Failed to decrypt or parse app { appId } : { str (e )} " )
116+ return apps
117+
118+ def RemoveApps (self , apps : List [AppInstance ]):
119+ printerId = self ._GetPrinterId ()
120+ for app in apps :
121+ instanceId = app .InstanceId
122+ appId = sha256_urlsafe_base64 (instanceId .encode ())
123+ url = f"{ self .AppsDatabaseUrl } /{ printerId } /{ appId } .json?print=silent"
124+ resp = requests .delete (url )
125+ if resp .status_code != 200 :
126+ self .Logger .error (f"Failed to remove app { instanceId } : { resp .status_code } { resp .text } " )
127+
128+
129+ def GetOrCreateEncryptionKey (self ) -> str :
130+ mainboardId , mainboardMac = self ._GetIdBaseValues ()
131+ return pbkdf2_key (mainboardId , mainboardMac )
132+
133+ def _GetIdBaseValues (self ) -> Tuple [str , str ]:
134+ attributes = ElegooClient .Get ().GetAttributes ()
135+ if attributes is None or attributes .MainboardId is None or attributes .MainboardMac is None :
136+ raise PrinterIdUnavailableException ("Failed to get attributes from Elegoo Client." )
137+ return (attributes .MainboardId , attributes .MainboardMac )
138+
139+ def _GetPrinterId (self ) -> str :
140+ mainboardId , mainboardMac = self ._GetIdBaseValues ()
141+ plainId = f"printer:{ mainboardId } /{ mainboardMac } "
142+ printerId = sha256_urlsafe_base64 (plainId .encode ())
143+ return printerId
144+
145+ def _DecryptAppInstance (self , appId : str , encryptedBase64 : str ) -> Dict [str , Any ]:
146+ encryptionKey = self .GetOrCreateEncryptionKey ()
147+ key_bytes = base64 .urlsafe_b64decode (encryptionKey )
148+
149+ # Decode and pad base64 if needed
150+ padded = encryptedBase64 + '=' * (- len (encryptedBase64 ) % 4 )
151+ encrypted = base64 .urlsafe_b64decode (padded )
152+
153+ # Split ciphertext and tag (GCM tag is 16 bytes at the end). Remove the first 12 bytes for IV.
154+ if len (encrypted ) < (16 + 12 ):
155+ raise ValueError ("Encrypted data too short for GCM tag" )
156+ ciphertext = encrypted [12 :- 16 ]
157+ iv = encrypted [:12 ]
158+ tag = encrypted [- 16 :]
159+
160+ cipher = AES .new (key_bytes , AES .MODE_GCM , nonce = iv )#type:ignore
161+ decrypted_bytes = cipher .decrypt_and_verify (ciphertext , tag )
162+
163+ decrypted_json = decrypted_bytes .decode ('utf-8' )
164+ return json .loads (decrypted_json )
165+
166+ class PrinterIdUnavailableException (Exception ):
167+ pass
0 commit comments