88import time
99from abc import ABC , abstractmethod
1010from datetime import datetime , timezone
11+ from io import BytesIO
12+ from pathlib import Path
13+ from typing import BinaryIO
1114
15+ from anisette import Anisette , AnisetteHeaders
1216from typing_extensions import override
1317
14- from findmy .util .closable import Closable
18+ from findmy .util .abc import Closable , Serializable
1519from findmy .util .http import HttpSession
1620
1721logger = logging .getLogger (__name__ )
1822
1923
20- class BaseAnisetteProvider (Closable , ABC ):
24+ class BaseAnisetteProvider (Closable , Serializable , ABC ):
2125 """
2226 Abstract base class for Anisette providers.
2327
@@ -27,22 +31,13 @@ class BaseAnisetteProvider(Closable, ABC):
2731 @property
2832 @abstractmethod
2933 def otp (self ) -> str :
30- """
31- A seemingly random base64 string containing 28 bytes.
32-
33- TODO: Figure out how to generate this.
34- """
34+ """A seemingly random base64 string containing 28 bytes."""
3535 raise NotImplementedError
3636
3737 @property
3838 @abstractmethod
3939 def machine (self ) -> str :
40- """
41- A base64 encoded string of 60 'random' bytes.
42-
43- We're not sure how this is generated, we have to rely on the server.
44- TODO: Figure out how to generate this.
45- """
40+ """A base64 encoded string of 60 'random' bytes."""
4641 raise NotImplementedError
4742
4843 @property
@@ -177,6 +172,24 @@ def __init__(self, server_url: str) -> None:
177172 self ._anisette_data : dict [str , str ] | None = None
178173 self ._anisette_data_expires_at : float = 0
179174
175+ @override
176+ def serialize (self ) -> dict :
177+ """See `BaseAnisetteProvider.serialize`."""
178+ return {
179+ "type" : "aniRemote" ,
180+ "url" : self ._server_url ,
181+ }
182+
183+ @classmethod
184+ @override
185+ def deserialize (cls , data : dict ) -> RemoteAnisetteProvider :
186+ """See `BaseAnisetteProvider.deserialize`."""
187+ assert data ["type" ] == "aniRemote"
188+
189+ server_url = data ["url" ]
190+
191+ return cls (server_url )
192+
180193 @property
181194 @override
182195 def otp (self ) -> str :
@@ -219,22 +232,101 @@ async def close(self) -> None:
219232 await self ._http .close ()
220233
221234
222- # TODO(malmeloo): implement using pyprovision
223- # https://github.com/malmeloo/FindMy.py/issues/2
224235class LocalAnisetteProvider (BaseAnisetteProvider ):
225- """Anisette provider. Generates headers without a remote server using pyprovision."""
236+ """Anisette provider. Generates headers without a remote server using the `anisette` library."""
237+
238+ def __init__ (
239+ self ,
240+ * ,
241+ state_blob : BinaryIO | None = None ,
242+ libs_path : str | Path | None = None ,
243+ ) -> None :
244+ """Initialize the provider."""
245+ super ().__init__ ()
246+
247+ if isinstance (libs_path , str ):
248+ libs_path = Path (libs_path )
249+
250+ if libs_path is None or not libs_path .is_file ():
251+ logger .info (
252+ "The Anisette engine will download libraries required for operation, "
253+ "this may take a few seconds..." ,
254+ )
255+ logger .info (
256+ "To speed up future local Anisette initializations, "
257+ "provide a filesystem path to load the libraries from." ,
258+ )
259+
260+ files : list [BinaryIO | Path ] = []
261+ if state_blob is not None :
262+ files .append (state_blob )
263+ if libs_path is not None and libs_path .exists ():
264+ files .append (libs_path )
265+
266+ self ._ani = Anisette .load (* files )
267+ self ._ani_data : AnisetteHeaders | None = None
268+ self ._libs_path : Path | None = libs_path
269+
270+ if libs_path is not None :
271+ self ._ani .save_libs (libs_path )
272+ if state_blob is not None and not self ._ani .is_provisioned :
273+ logger .warning (
274+ "The Anisette state that was loaded has not yet been provisioned. "
275+ "Was the previous session saved properly?" ,
276+ )
277+
278+ @override
279+ def serialize (self ) -> dict :
280+ """See `BaseAnisetteProvider.serialize`."""
281+ with BytesIO () as buf :
282+ self ._ani .save_provisioning (buf )
283+ prov_data = base64 .b64encode (buf .getvalue ()).decode ("utf-8" )
284+
285+ return {
286+ "type" : "aniLocal" ,
287+ "prov_data" : prov_data ,
288+ }
289+
290+ @classmethod
291+ @override
292+ def deserialize (cls , data : dict , libs_path : str | Path | None = None ) -> LocalAnisetteProvider :
293+ """See `BaseAnisetteProvider.deserialize`."""
294+ assert data ["type" ] == "aniLocal"
295+
296+ state_blob = BytesIO (base64 .b64decode (data ["prov_data" ]))
297+
298+ return cls (state_blob = state_blob , libs_path = libs_path )
299+
300+ @override
301+ async def get_headers (
302+ self ,
303+ user_id : str ,
304+ device_id : str ,
305+ serial : str = "0" ,
306+ with_client_info : bool = False ,
307+ ) -> dict [str , str ]:
308+ """See `BaseAnisetteProvider.get_headers`_."""
309+ self ._ani_data = self ._ani .get_data ()
310+
311+ return await super ().get_headers (user_id , device_id , serial , with_client_info )
226312
227313 @property
228314 @override
229315 def otp (self ) -> str :
230316 """See `BaseAnisetteProvider.otp`_."""
231- raise NotImplementedError
317+ machine = (self ._ani_data or {}).get ("X-Apple-I-MD" )
318+ if machine is None :
319+ logger .warning ("X-Apple-I-MD header not found! Returning fallback..." )
320+ return machine or ""
232321
233322 @property
234323 @override
235324 def machine (self ) -> str :
236325 """See `BaseAnisetteProvider.machine`_."""
237- raise NotImplementedError
326+ machine = (self ._ani_data or {}).get ("X-Apple-I-MD-M" )
327+ if machine is None :
328+ logger .warning ("X-Apple-I-MD-M header not found! Returning fallback..." )
329+ return machine or ""
238330
239331 @override
240332 async def close (self ) -> None :
0 commit comments