1+ """Snapshot check resource for diagnosing emulator boot behavior."""
2+
3+ import logging
4+ import os
5+ from datetime import datetime
6+ from typing import Dict , List , Optional
7+
8+ from flask_restful import Resource
9+
10+ from server .utils .request_utils import get_sindarin_email
11+ from views .core .avd_profile_manager import AVDProfileManager
12+
13+ logger = logging .getLogger (__name__ )
14+
15+
16+ class SnapshotCheckResource (Resource ):
17+ """Resource for checking snapshot status and predicting boot behavior."""
18+
19+ def __init__ (self , server_instance = None ):
20+ """Initialize the snapshot check resource.
21+
22+ Args:
23+ server_instance: The AutomationServer instance
24+ """
25+ self .server = server_instance
26+ super ().__init__ ()
27+
28+ def get (self ):
29+ """Check snapshot status and predict next boot behavior for the current user.
30+
31+ Returns detailed information about:
32+ - Snapshot existence and metadata
33+ - Predicted boot type (cold/warm)
34+ - Reasons for cold boot if applicable
35+ - AVD configuration related to snapshots
36+ - User profile flags affecting boot behavior
37+ """
38+ try :
39+ # Get the user's email
40+ sindarin_email = get_sindarin_email ()
41+
42+ if not sindarin_email :
43+ return {"error" : "No email provided" , "message" : "Email parameter is required" }, 400
44+
45+ # Get the profile manager instance
46+ profile_manager = AVDProfileManager .get_instance ()
47+
48+ # Check if profile exists
49+ if sindarin_email not in profile_manager .profiles_index :
50+ return {
51+ "email" : sindarin_email ,
52+ "has_profile" : False ,
53+ "message" : "No profile exists for this user" ,
54+ }, 200
55+
56+ # Get emulator launcher to check snapshot existence
57+ emulator_launcher = None
58+ if self .server and hasattr (self .server , "automators" ):
59+ # Try to get launcher from any automator or create one
60+ for automator in self .server .automators .values ():
61+ if automator and hasattr (automator , "emulator_manager" ):
62+ emulator_launcher = automator .emulator_manager .emulator_launcher
63+ break
64+
65+ # Extract AVD name
66+ avd_name = self ._extract_avd_name_from_email (sindarin_email )
67+ if not avd_name :
68+ return {
69+ "email" : sindarin_email ,
70+ "error" : "Could not determine AVD name" ,
71+ }, 500
72+
73+ # Check snapshot existence
74+ snapshot_info = self ._check_snapshot_existence (avd_name , emulator_launcher )
75+
76+ # Get user profile data
77+ profile_data = self ._get_profile_snapshot_data (sindarin_email , profile_manager )
78+
79+ # Check AVD configuration
80+ avd_config = self ._check_avd_config (avd_name )
81+
82+ # Determine boot type and reasons
83+ boot_prediction = self ._predict_boot_type (
84+ snapshot_info , profile_data , avd_config , emulator_launcher , sindarin_email
85+ )
86+
87+ # Check if emulator is currently running
88+ is_running = self ._check_if_running (sindarin_email )
89+
90+ return {
91+ "email" : sindarin_email ,
92+ "has_profile" : True ,
93+ "avd_name" : avd_name ,
94+ "emulator_running" : is_running ,
95+ "snapshot_info" : snapshot_info ,
96+ "profile_data" : profile_data ,
97+ "avd_config" : avd_config ,
98+ "boot_prediction" : boot_prediction ,
99+ "timestamp" : datetime .now ().isoformat (),
100+ }, 200
101+
102+ except Exception as e :
103+ logger .error (f"Error checking snapshot status: { e } " )
104+ return {"error" : "Failed to check snapshot status" , "message" : str (e )}, 500
105+
106+ def _extract_avd_name_from_email (self , email : str ) -> Optional [str ]:
107+ """Extract AVD name from email address."""
108+ try :
109+ # Convert email to AVD name format
110+ avd_identifier = email .replace ("@" , "_" ).replace ("." , "_" )
111+ return f"KindleAVD_{ avd_identifier } "
112+ except Exception as e :
113+ logger .error (f"Error extracting AVD name from email { email } : { e } " )
114+ return None
115+
116+ def _check_snapshot_existence (
117+ self , avd_name : str , emulator_launcher = None
118+ ) -> Dict [str , any ]:
119+ """Check if snapshots exist for the AVD."""
120+ snapshot_info = {
121+ "default_boot_exists" : False ,
122+ "snapshot_path" : None ,
123+ "snapshot_size_mb" : None ,
124+ "all_snapshots" : [],
125+ }
126+
127+ try :
128+ # Get AVD directory from environment
129+ android_home = os .environ .get ("ANDROID_HOME" , "/opt/android-sdk" )
130+ avd_dir = os .path .join (android_home , "avd" )
131+ avd_path = os .path .join (avd_dir , f"{ avd_name } .avd" )
132+ snapshots_dir = os .path .join (avd_path , "snapshots" )
133+
134+ # Check default_boot snapshot
135+ default_boot_path = os .path .join (snapshots_dir , "default_boot" )
136+ if os .path .exists (default_boot_path ):
137+ snapshot_info ["default_boot_exists" ] = True
138+ snapshot_info ["snapshot_path" ] = default_boot_path
139+
140+ # Calculate snapshot size
141+ total_size = 0
142+ for dirpath , dirnames , filenames in os .walk (default_boot_path ):
143+ for filename in filenames :
144+ filepath = os .path .join (dirpath , filename )
145+ if os .path .exists (filepath ):
146+ total_size += os .path .getsize (filepath )
147+ snapshot_info ["snapshot_size_mb" ] = round (total_size / (1024 * 1024 ), 2 )
148+
149+ # List all snapshots
150+ if os .path .exists (snapshots_dir ):
151+ for entry in os .listdir (snapshots_dir ):
152+ snapshot_path = os .path .join (snapshots_dir , entry )
153+ if os .path .isdir (snapshot_path ):
154+ snapshot_info ["all_snapshots" ].append (entry )
155+
156+ except Exception as e :
157+ logger .error (f"Error checking snapshot existence: { e } " )
158+ snapshot_info ["error" ] = str (e )
159+
160+ return snapshot_info
161+
162+ def _get_profile_snapshot_data (
163+ self , email : str , profile_manager : AVDProfileManager
164+ ) -> Dict [str , any ]:
165+ """Get snapshot-related data from user profile."""
166+ return {
167+ "last_snapshot_timestamp" : profile_manager .get_user_field (
168+ email , "last_snapshot_timestamp"
169+ ),
170+ "created_from_seed_clone" : profile_manager .get_user_field (
171+ email , "created_from_seed_clone" , False
172+ ),
173+ "needs_device_randomization" : profile_manager .get_user_field (
174+ email , "needs_device_randomization" , False
175+ ),
176+ "post_boot_randomized" : profile_manager .get_user_field (
177+ email , "post_boot_randomized" , False
178+ ),
179+ "auth_date" : profile_manager .get_user_field (email , "auth_date" ),
180+ "auth_failed_date" : profile_manager .get_user_field (email , "auth_failed_date" ),
181+ }
182+
183+ def _check_avd_config (self , avd_name : str ) -> Dict [str , any ]:
184+ """Check AVD configuration related to snapshots."""
185+ config_data = {
186+ "config_exists" : False ,
187+ "snapshot_present" : None ,
188+ "quickboot_choice" : None ,
189+ "hw_ramSize" : None ,
190+ "hw_gfxstream" : None ,
191+ }
192+
193+ try :
194+ android_home = os .environ .get ("ANDROID_HOME" , "/opt/android-sdk" )
195+ avd_dir = os .path .join (android_home , "avd" )
196+ config_path = os .path .join (avd_dir , f"{ avd_name } .avd" , "config.ini" )
197+
198+ if os .path .exists (config_path ):
199+ config_data ["config_exists" ] = True
200+
201+ with open (config_path , "r" ) as f :
202+ for line in f :
203+ line = line .strip ()
204+ if line .startswith ("snapshot.present=" ):
205+ config_data ["snapshot_present" ] = line .split ("=" )[1 ] == "yes"
206+ elif line .startswith ("quickbootChoice=" ):
207+ config_data ["quickboot_choice" ] = int (line .split ("=" )[1 ])
208+ elif line .startswith ("hw.ramSize=" ):
209+ config_data ["hw_ramSize" ] = int (line .split ("=" )[1 ])
210+ elif line .startswith ("hw.gfxstream=" ):
211+ config_data ["hw_gfxstream" ] = int (line .split ("=" )[1 ])
212+
213+ except Exception as e :
214+ logger .error (f"Error checking AVD config: { e } " )
215+ config_data ["error" ] = str (e )
216+
217+ return config_data
218+
219+ def _predict_boot_type (
220+ self ,
221+ snapshot_info : Dict ,
222+ profile_data : Dict ,
223+ avd_config : Dict ,
224+ emulator_launcher ,
225+ email : str ,
226+ ) -> Dict [str , any ]:
227+ """Predict the next boot type and provide reasons."""
228+ cold_boot_reasons = []
229+ next_boot_type = "warm"
230+
231+ # Check if snapshot exists
232+ if not snapshot_info .get ("default_boot_exists" ):
233+ cold_boot_reasons .append ("No default_boot snapshot exists" )
234+ next_boot_type = "cold"
235+
236+ # Check if device randomization is needed
237+ if profile_data .get ("created_from_seed_clone" ) or profile_data .get (
238+ "needs_device_randomization"
239+ ):
240+ if not profile_data .get ("post_boot_randomized" ):
241+ cold_boot_reasons .append (
242+ "Device randomization needed (created from seed or needs randomization)"
243+ )
244+ next_boot_type = "cold"
245+
246+ # Check AVD configuration
247+ if avd_config .get ("snapshot_present" ) is False :
248+ cold_boot_reasons .append ("AVD config has snapshot.present=no" )
249+ next_boot_type = "cold"
250+
251+ if avd_config .get ("quickboot_choice" ) == 1 : # 1 = cold boot
252+ cold_boot_reasons .append ("AVD config has quickbootChoice=1 (cold boot)" )
253+ next_boot_type = "cold"
254+
255+ # Check if gfxstream is enabled (can affect snapshots)
256+ if avd_config .get ("hw_gfxstream" ) == 1 :
257+ cold_boot_reasons .append (
258+ "hw.gfxstream=1 may cause snapshot compatibility issues"
259+ )
260+ # This doesn't force cold boot but can cause issues
261+
262+ # If user lost authentication, they might need a cold boot
263+ if profile_data .get ("auth_failed_date" ) and not profile_data .get ("auth_date" ):
264+ cold_boot_reasons .append ("User authentication failed, may need fresh start" )
265+ # This is informational, doesn't force cold boot
266+
267+ return {
268+ "next_boot_type" : next_boot_type ,
269+ "cold_boot_reasons" : cold_boot_reasons ,
270+ "snapshot_will_be_used" : next_boot_type == "warm"
271+ and snapshot_info .get ("default_boot_exists" ),
272+ }
273+
274+ def _check_if_running (self , email : str ) -> bool :
275+ """Check if emulator is currently running for this user."""
276+ try :
277+ if self .server and hasattr (self .server , "automators" ):
278+ automator = self .server .automators .get (email )
279+ if automator and hasattr (automator , "emulator_manager" ):
280+ emulator_id , _ = automator .emulator_manager .emulator_launcher .get_running_emulator (
281+ email
282+ )
283+ return emulator_id is not None
284+ except Exception as e :
285+ logger .error (f"Error checking if emulator is running: { e } " )
286+
287+ return False
0 commit comments