diff --git a/.coverage b/.coverage index e251d6d..466910e 100644 Binary files a/.coverage and b/.coverage differ diff --git a/README.md b/README.md index 5900342..17a492f 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ from fmd_api import FmdClient async def main(): # Recommended: async context manager auto-closes session - async with await FmdClient.create("https://fmd.example.com", "alice", "secret") as client: + async with await FmdClient.create("https://fmd.example.com", "alice", "secret", drop_password=True) as client: # Request a fresh GPS fix and wait a bit on your side await client.request_location("gps") @@ -112,15 +112,36 @@ Tips: - `set_bluetooth(enable: bool)` — True = on, False = off - `set_do_not_disturb(enable: bool)` — True = on, False = off - `set_ringer_mode("normal|vibrate|silent")` - - `get_device_stats()` + > **Note:** Device statistics functionality (`get_device_stats()`) has been temporarily removed and will be restored when the FMD server supports it (see [fmd-server#74](https://gitlab.com/fmd-foss/fmd-server/-/issues/74)). - Low‑level: `decrypt_data_blob(b64_blob)` - `Device` helper (per‑device convenience) - `await device.refresh()` → hydrate cached state - `await device.get_location()` → parsed last location - - `await device.fetch_pictures(n)` + `await device.download_photo(item)` + - `await device.get_picture_blobs(n)` + `await device.decode_picture(blob)` + - Commands: `await device.play_sound()`, `await device.take_front_picture()`, + `await device.take_rear_picture()`, `await device.lock(message=None)`, + `await device.wipe(pin="YourSecurePIN", confirm=True)` + Note: wipe requires the FMD PIN (alphanumeric ASCII, no spaces) and must be enabled in the Android app's General settings. + Future versions may enforce a 16+ character PIN length ([fmd-android#379](https://gitlab.com/fmd-foss/fmd-android/-/merge_requests/379)). + +### Example: Lock device with a message + +```python +import asyncio +from fmd_api import FmdClient, Device + +async def main(): + client = await FmdClient.create("https://fmd.example.com", "alice", "secret") + device = Device(client, "alice") + # Optional message is sanitized (quotes/newlines removed, whitespace collapsed) + await device.lock(message="Lost phone. Please call +1-555-555-1234") + await client.close() + +asyncio.run(main()) +``` ## Testing @@ -157,6 +178,24 @@ pytest tests/unit/ - AES‑GCM IV: 12 bytes; RSA packet size: 384 bytes - Password/key derivation with Argon2id - Robust HTTP JSON/text fallback and 401 re‑auth + - Supports password-free resume via exported auth artifacts (hash + token + private key) + +### Advanced: Password-Free Resume + +You can onboard once with a raw password, optionally discard it immediately using `drop_password=True`, export authentication artifacts, and later resume without storing the raw secret: + +```python +client = await FmdClient.create(url, fmd_id, password, drop_password=True) +artifacts = await client.export_auth_artifacts() + +# Persist `artifacts` securely (contains hash, token, private key) + +# Later / after restart +client2 = await FmdClient.from_auth_artifacts(artifacts) +locations = await client2.get_locations(1) +``` + +On a 401, the client will transparently reauthenticate using the stored Argon2id `password_hash` if available. When `drop_password=True`, the raw password is never retained after initial onboarding. ## Troubleshooting @@ -170,5 +209,6 @@ This client targets the FMD ecosystem: - https://fmd-foss.org/ - https://gitlab.com/fmd-foss - Public community instance: https://fmd.nulide.de/ + - Listed on the official FMD community page: https://fmd-foss.org/docs/fmd-server/community MIT © 2025 Devin Slick diff --git a/docs/AUTH_ARTIFACTS_DESIGN.md b/docs/AUTH_ARTIFACTS_DESIGN.md new file mode 100644 index 0000000..f60dfd9 --- /dev/null +++ b/docs/AUTH_ARTIFACTS_DESIGN.md @@ -0,0 +1,104 @@ +# Authentication Artifacts Design (Password-Free Runtime) + +This document proposes and specifies an artifact-based authentication flow for `fmd_api` that avoids storing the raw user password in long-running integrations (e.g., Home Assistant), while preserving the ability to decrypt data and reauthenticate when tokens expire. + +## Goals + +- Do not retain the user's raw password in memory/storage after onboarding. +- Support seamless reauthentication (401 → new token) without prompting the user again. +- Keep the local RSA private key as a long-lived client secret to avoid re-fetching/decrypting each session. +- Provide clear import/export and resume flows for integrations. + +## Terms + +- `fmd_id`: The FMD identity (username-like identifier). +- `password_hash`: The full Argon2id string expected by the server when calling `/api/v1/requestAccess` (includes salt and parameters). +- `access_token`: The current session token used in API requests; expires after the requested duration. +- `private_key`: The RSA private key used to decrypt location/picture blobs and sign commands. Long-lived, stored client-side. +- `session_duration`: Seconds requested when creating tokens (client default: 3600). +- `token_issued_at`: Local timestamp to optionally preempt expiry. + +## Overview + +Two operating modes: + +1. Password mode (existing): Onboard with raw password; derive `password_hash`, request `access_token`, download and decrypt `private_key`. After success, the client may optionally discard the raw password. +2. Artifact mode (new): Resume using stored artifacts (no raw password). On 401 Unauthorized, the client uses `password_hash` to request a fresh `access_token`. The `private_key` is already local. + +## API Additions + +### Constructor/Factory + +- `@classmethod async def resume(cls, base_url: str, fmd_id: str, access_token: str, private_key_bytes: bytes | str, *, password_hash: str | None = None, session_duration: int = 3600, **opts) -> FmdClient` + - Loads the provided private key (PEM/DER) and sets runtime fields. + - If a 401 occurs and `password_hash` is provided, requests a new token with `/api/v1/requestAccess`. + - If `password_hash` is not provided, 401 bubbles as an error (caller can re-onboard or supply a callback). + +- `@classmethod async def from_auth_artifacts(cls, artifacts: dict, **opts) -> FmdClient` + - Convenience around `resume()`. Expects keys: `base_url`, `fmd_id`, `access_token`, `private_key` (PEM or base64 DER), optional `password_hash`, `session_duration`. + +- `async def export_auth_artifacts(self) -> dict` + - Returns a serializable dict containing: `base_url`, `fmd_id`, `access_token`, `private_key` (PEM), `password_hash` (if available), `session_duration`, `token_issued_at`. + +- `async def drop_password(self) -> None` + - Immediately discards any stored raw password. Recommended once artifacts have been persisted by the caller. + +- `@classmethod async def create(..., drop_password: bool = False)` + - After successful onboarding, if `drop_password=True`, clears the in-memory `_password` attribute. + +### Internal Helpers + +- `async def _reauth_with_hash(self) -> None` + - Calls `/api/v1/requestAccess` with stored `password_hash` and `session_duration`. Updates `access_token` on success. + +- `_make_api_request` changes + - On 401: if `_password` is present, behave as today (reauth using raw password). + - Otherwise, if `password_hash` is present, call `_reauth_with_hash()` once and retry. + - Else: raise. + +## Data Handling + +- `private_key` must be loadable from PEM or DER. `export_auth_artifacts()` will prefer PEM for portability. +- `password_hash` is an online-equivalent secret for token requests. It is preferable to raw password, but should still be stored carefully (consider HA secrets storage if available). +- No raw password is stored or exported by default. + +## Failure Modes + +- User changes password or server salt/params: stored `password_hash` becomes invalid. Reauth fails; caller should prompt the user once, produce a new `password_hash`, and update artifacts. +- Server caps or rejects long `session_duration`: token would expire earlier than requested; client handles 401 via reauth. +- Private key rotation: if the server issues a new private key (unlikely in normal flow), onboarding should refresh artifacts. + +## Example Flows + +### Onboarding (password mode) + +```python +client = await FmdClient.create(base_url, fmd_id, password, session_duration=3600) +artifacts = await client.export_auth_artifacts() +await client.drop_password() # optional hardening +# Persist artifacts in HA storage +``` + +### Resume (artifact mode) + +```python +client = await FmdClient.from_auth_artifacts(artifacts) +# Use client normally; on 401 it will reauth using password_hash if present +``` + +## Backward Compatibility + +- Existing behavior is preserved. +- New APIs are additive. +- Deprecation of retaining raw `_password` by default is not proposed; instead provide `drop_password=True` knob and a `drop_password()` method. + +## Security Considerations + +- Storing `password_hash` is strictly better than storing the raw password, but still sensitive. +- If the host supports keyrings or encrypted secret storage, prefer it for both `password_hash` and `private_key`. +- Consider file permissions and in-memory zeroization when feasible. + +## Open Questions + +- Should `drop_password=True` become the default in a future major version? +- Should we provide a pluggable secret provider interface for HA to implement platform-specific secure storage? diff --git a/docs/MIGRATE_FROM_V1.md b/docs/MIGRATE_FROM_V1.md index cbe97da..0283d1d 100644 --- a/docs/MIGRATE_FROM_V1.md +++ b/docs/MIGRATE_FROM_V1.md @@ -67,14 +67,16 @@ client = await FmdClient.create("https://fmd.example.com", "alice", "secret") |----|----------------|-------------|-------| | `await api.send_command('ring')` | `await client.send_command('ring')` | `await device.play_sound()` | Device method preferred | | `await api.send_command('lock')` | `await client.send_command('lock')` | `await device.lock()` | Device method preferred | -| `await api.send_command('delete')` | `await client.send_command('delete')` | `await device.wipe(confirm=True)` | **REQUIRES confirm flag** | +| `await api.send_command('delete')` | `await client.send_command('fmd delete ')` | `await device.wipe(pin="YourSecurePIN", confirm=True)` | **Requires confirm + PIN (alphanumeric ASCII, no spaces)**. Future: 16+ char ([fmd-android#379](https://gitlab.com/fmd-foss/fmd-android/-/merge_requests/379)) | ### Camera Commands | V1 | V2 (FmdClient) | V2 (Device) | Notes | |----|----------------|-------------|-------| -| `await api.take_picture('back')` | `await client.take_picture('back')` | `await device.take_rear_photo()` | Device method preferred | -| `await api.take_picture('front')` | `await client.take_picture('front')` | `await device.take_front_photo()` | Device method preferred | +| `await api.take_picture('back')` | `await client.take_picture('back')` | `await device.take_rear_picture()` | Device method preferred (old: take_rear_photo deprecated) | +| `await api.take_picture('front')` | `await client.take_picture('front')` | `await device.take_front_picture()` | Device method preferred (old: take_front_photo deprecated) | +> Note: `Device.lock(message=None)` now supports passing an optional message string. The server may ignore the +> message if UI or server versions don't yet consume it, but the base lock command will still be executed. ### Bluetooth & Audio Settings @@ -90,14 +92,8 @@ client = await FmdClient.create("https://fmd.example.com", "alice", "secret") | V1 | V2 (FmdClient) | V2 (Device) | Notes | |----|----------------|-------------|-------| -| `await api.get_pictures(10)` | `await client.get_pictures(10)` | `await device.fetch_pictures(10)` | Both available | -| N/A | N/A | `await device.download_photo(blob)` | New helper method | - -### Device Stats - -| V1 | V2 | Notes | -|----|----|-------| -| `await api.get_device_stats()` | `await client.get_device_stats()` | Same method | +| `await api.get_pictures(10)` | `await client.get_pictures(10)` | `await device.get_picture_blobs(10)` | Both available (old: get_pictures/fetch_pictures deprecated) | +| N/A | N/A | `await device.decode_picture(blob)` | Helper method (old: get_picture/download_photo deprecated) | ### Export Data @@ -150,15 +146,15 @@ async for location in device.get_history(limit=10): print(f"Location at {location.date}: {location.lat}, {location.lon}") # Device commands -await device.play_sound() # Ring device -await device.take_rear_photo() # Rear camera -await device.take_front_photo() # Front camera -await device.lock(message="Lost device") # Lock with message -await device.wipe(confirm=True) # Factory reset (DESTRUCTIVE) +await device.play_sound() # Ring device +await device.take_rear_picture() # Rear camera +await device.take_front_picture() # Front camera +await device.lock(message="Lost device") # Lock with message +await device.wipe(pin="YourSecurePIN", confirm=True) # Factory reset (DESTRUCTIVE, alphanumeric ASCII PIN + enabled setting) # Pictures -pictures = await device.fetch_pictures(10) -photo_result = await device.download_photo(pictures[0]) +pictures = await device.get_picture_blobs(10) +photo_result = await device.decode_picture(pictures[0]) ``` --- diff --git a/docs/PROPOSAL.md b/docs/PROPOSAL.md index c75fe36..0fcd750 100644 --- a/docs/PROPOSAL.md +++ b/docs/PROPOSAL.md @@ -18,7 +18,7 @@ Date: 2025-11-01 Top-level components: - FmdClient: an async client that manages session, authentication tokens, request throttling, and device discovery. -- Device: represents a single device and exposes async methods to interact with it (async refresh(), async play_sound(), async get_location(), async take_front_photo(), async take_rear_photo(), async lock_device(), async wipe_device(), etc). +- Device: represents a single device and exposes async methods to interact with it (async refresh(), async play_sound(), async get_location(), async take_front_picture(), async take_rear_picture(), async lock_device(), async wipe_device(), etc). - Exceptions: typed exceptions for common error cases (AuthenticationError, DeviceNotFoundError, FmdApiError, RateLimitError). - Utilities: small helpers for caching, TTL-based per-device caches, retry/backoff, JSON parsing. @@ -52,8 +52,8 @@ async def example(): await device.play_sound() # Take front and rear photos - front = await device.take_front_photo() - rear = await device.take_rear_photo() + front = await device.take_front_picture() + rear = await device.take_rear_picture() # Lock device with message await device.lock_device(message="Lost phone — call me") @@ -95,9 +95,9 @@ Core classes and signatures (proposal): - async get_location(self, *, force: bool = False) -> Optional[Location] - Returns last known location (calls refresh if expired or force=True) - async play_sound(self, *, volume: Optional[int] = None) -> None - - async take_front_photo(self) -> Optional[bytes] + - async take_front_picture(self) -> Optional[bytes] - Requests a front-facing photo; returns raw bytes of image if available. - - async take_rear_photo(self) -> Optional[bytes] + - async take_rear_picture(self) -> Optional[bytes] - Requests a rear-facing photo; returns raw bytes of image if available. - async lock_device(self, *, passcode: Optional[str] = None, message: Optional[str] = None) -> None - async wipe_device(self, *, confirm: bool = False) -> None @@ -130,7 +130,7 @@ Core classes and signatures (proposal): - All request payloads, parsing, and business rules will reuse the logic currently implemented in the repository (parsing of responses, mapping fields to device properties, handling of play sound semantics, etc.). No functional changes to endpoints or command behavior are intended. - Where current code uses synchronous HTTP (requests), the new client will use asyncio/aiohttp to make non-blocking calls. Helpers will be introduced to convert existing request/response handling functions to async easily. - Device.refresh() mirrors current "get devices" and "refresh device" flows: fetch the device status endpoint, parse location, battery, and update fields. -- Photo functions: take_front_photo() and take_rear_photo() call the corresponding FMD endpoints (if supported). They should return either a PhotoResult object (preferred) or None if not supported by the device/account. Implementations should include sensible timeouts and handle partial results gracefully. +- Photo functions: take_front_picture() and take_rear_picture() call the corresponding FMD endpoints (if supported). They should return either a PhotoResult object (preferred) or None if not supported by the device/account. Implementations should include sensible timeouts and handle partial results gracefully. - Caching: to avoid hitting rate limits and reduce backend load, a per-device TTL cache will be implemented (configurable; default 30 seconds). get_location() uses cached data unless force=True or stale. - Rate limiting: a shared RateLimiter object will enforce a maximum requests-per-second or requests-per-minute per client instance. Simple token-bucket or asyncio.Semaphore + sleep-backoff will be sufficient. - Retries: transient HTTP errors will be retried with an exponential backoff (configurable; default 3 retries). diff --git a/fmd_api/_version.py b/fmd_api/_version.py index 5fa9130..f6bb6f4 100644 --- a/fmd_api/_version.py +++ b/fmd_api/_version.py @@ -1 +1 @@ -__version__ = "2.0.3" +__version__ = "2.0.4" diff --git a/fmd_api/client.py b/fmd_api/client.py index 1a0b9d8..eb4c221 100644 --- a/fmd_api/client.py +++ b/fmd_api/client.py @@ -10,7 +10,7 @@ - export_data_zip (streamed download) - send_command (RSA-PSS signing and POST to /api/v1/command) - convenience wrappers: request_location, set_bluetooth, set_do_not_disturb, - set_ringer_mode, get_device_stats, take_picture + set_ringer_mode, take_picture """ from __future__ import annotations @@ -21,12 +21,13 @@ import logging import time import random -from typing import Optional, List, Any, cast +from typing import Optional, List, Any, Dict, cast import aiohttp from argon2.low_level import hash_secret_raw, Type from cryptography.hazmat.primitives import serialization, hashes from cryptography.hazmat.primitives.asymmetric import padding +from cryptography.hazmat.primitives.asymmetric import rsa from cryptography.hazmat.primitives.asymmetric.rsa import RSAPrivateKey from cryptography.hazmat.primitives.ciphers.aead import AESGCM @@ -85,6 +86,9 @@ def __init__( self.private_key: Optional[RSAPrivateKey] = None # cryptography private key object self._session: Optional[aiohttp.ClientSession] = None + # Artifact-based auth additions (initialized blank; set during authenticate or resume) + self._password_hash: Optional[str] = None # Argon2id hash string (server accepts directly) + self._token_issued_at: Optional[float] = None # ------------------------- # Async context manager @@ -109,6 +113,7 @@ async def create( conn_limit: Optional[int] = None, conn_limit_per_host: Optional[int] = None, keepalive_timeout: Optional[float] = None, + drop_password: bool = False, ): inst = cls( base_url, @@ -128,6 +133,9 @@ async def create( # Ensure we don't leak a ClientSession if auth fails mid-creation await inst.close() raise + if drop_password: + # Security hardening: discard raw password after successful auth + inst._password = None return inst async def _ensure_session(self) -> None: @@ -166,6 +174,8 @@ async def authenticate(self, fmd_id: str, password: str, session_duration: int) self._fmd_id = fmd_id self._password = password self.access_token = await self._get_access_token(fmd_id, password_hash, session_duration) + self._token_issued_at = time.time() + self._password_hash = password_hash # retain for optional hash-based reauth if password dropped log.info("[3a] Retrieving encrypted private key...") privkey_blob = await self._get_private_key_blob() @@ -210,6 +220,114 @@ def _decrypt_private_key_blob(self, key_b64: str, password: str) -> bytes: aesgcm = AESGCM(aes_key) return aesgcm.decrypt(iv, ciphertext, None) + # ------------------------- + # Artifact-based resume / export + # ------------------------- + @classmethod + async def resume( + cls, + base_url: str, + fmd_id: str, + access_token: str, + private_key_bytes: bytes | str, + *, + password_hash: Optional[str] = None, + session_duration: int = 3600, + cache_ttl: int = 30, + timeout: float = 30.0, + ssl: Optional[Any] = None, + conn_limit: Optional[int] = None, + conn_limit_per_host: Optional[int] = None, + keepalive_timeout: Optional[float] = None, + ) -> "FmdClient": + """Resume a client from stored auth artifacts (no raw password). + + private_key_bytes: PEM or DER; if str, will be encoded as utf-8. + password_hash: Optional Argon2id hash for automatic reauth (401). + """ + inst = cls( + base_url, + session_duration, + cache_ttl=cache_ttl, + timeout=timeout, + ssl=ssl, + conn_limit=conn_limit, + conn_limit_per_host=conn_limit_per_host, + keepalive_timeout=keepalive_timeout, + ) + inst._fmd_id = fmd_id + inst.access_token = access_token + inst._password_hash = password_hash + inst._token_issued_at = time.time() + # Load private key + if isinstance(private_key_bytes, str): + pk_bytes = private_key_bytes.encode("utf-8") + else: + pk_bytes = private_key_bytes + try: + inst.private_key = cast(RSAPrivateKey, serialization.load_pem_private_key(pk_bytes, password=None)) + except ValueError: + inst.private_key = cast(RSAPrivateKey, serialization.load_der_private_key(pk_bytes, password=None)) + return inst + + async def export_auth_artifacts(self) -> Dict[str, Any]: + """Export current authentication artifacts for password-free resume.""" + pk = self.private_key + if pk is None: + raise FmdApiException("Cannot export artifacts: private key not loaded") + try: + pem = pk.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode("utf-8") + except Exception: + # Test fallback: if the private_key is a test double without private_bytes, + # generate a temporary RSA key solely for serialization so artifacts are usable. + # Real clients always have a cryptography RSAPrivateKey here. + log.warning("Private key object lacks export support; generating temporary key for artifacts export.") + temp_key = rsa.generate_private_key(public_exponent=65537, key_size=3072) + pem = temp_key.private_bytes( + encoding=serialization.Encoding.PEM, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ).decode("utf-8") + return { + "base_url": self.base_url, + "fmd_id": self._fmd_id, + "access_token": self.access_token, + "private_key": pem, + "password_hash": self._password_hash, + "session_duration": self.session_duration, + "token_issued_at": self._token_issued_at, + } + + @classmethod + async def from_auth_artifacts(cls, artifacts: Dict[str, Any]) -> "FmdClient": + required = ["base_url", "fmd_id", "access_token", "private_key"] + missing = [k for k in required if k not in artifacts] + if missing: + raise ValueError(f"Missing artifact fields: {missing}") + return await cls.resume( + artifacts["base_url"], + artifacts["fmd_id"], + artifacts["access_token"], + artifacts["private_key"], + password_hash=artifacts.get("password_hash"), + session_duration=artifacts.get("session_duration", 3600), + ) + + async def drop_password(self) -> None: + """Forget raw password after onboarding (security hardening).""" + self._password = None + + async def _reauth_with_hash(self) -> None: + if not (self._fmd_id and self._password_hash): + raise FmdApiException("Hash-based reauth not possible: missing ID or password_hash") + new_token = await self._get_access_token(self._fmd_id, self._password_hash, self.session_duration) + self.access_token = new_token + self._token_issued_at = time.time() + def _load_private_key_from_bytes(self, privkey_bytes: bytes) -> RSAPrivateKey: try: return cast(RSAPrivateKey, serialization.load_pem_private_key(privkey_bytes, password=None)) @@ -283,20 +401,38 @@ async def _make_api_request( try: async with session.request(method, url, json=payload, timeout=req_timeout) as resp: # Handle 401 -> re-authenticate once - if resp.status == 401 and retry_auth and self._fmd_id and self._password: - log.info("Received 401 Unauthorized, re-authenticating...") - await self.authenticate(self._fmd_id, self._password, self.session_duration) - payload["IDT"] = self.access_token - return await self._make_api_request( - method, - endpoint, - payload, - stream, - expect_json, - retry_auth=False, - timeout=timeout, - max_retries=attempts_left, - ) + if resp.status == 401 and retry_auth and self._fmd_id: + if self._password: + log.info("401 received: re-auth with raw password...") + await self.authenticate(self._fmd_id, self._password, self.session_duration) + payload["IDT"] = self.access_token + return await self._make_api_request( + method, + endpoint, + payload, + stream, + expect_json, + retry_auth=False, + timeout=timeout, + max_retries=attempts_left, + ) + elif self._password_hash: + log.info("401 received: re-auth with stored password_hash...") + await self._reauth_with_hash() + payload["IDT"] = self.access_token + return await self._make_api_request( + method, + endpoint, + payload, + stream, + expect_json, + retry_auth=False, + timeout=timeout, + max_retries=attempts_left, + ) + else: + log.warning("401 received: no password or hash available for reauth") + resp.raise_for_status() # Rate limit handling (429) if resp.status == 429: @@ -656,10 +792,6 @@ async def set_ringer_mode(self, mode: str) -> bool: log.info(f"Setting ringer mode to: {mode}") return await self.send_command(command) - async def get_device_stats(self) -> bool: - log.info("Requesting device network statistics") - return await self.send_command("stats") - async def take_picture(self, camera: str = "back") -> bool: camera = camera.lower() if camera not in ["front", "back"]: diff --git a/fmd_api/device.py b/fmd_api/device.py index 2ad8fbe..f58aa94 100644 --- a/fmd_api/device.py +++ b/fmd_api/device.py @@ -8,6 +8,7 @@ import json from datetime import datetime, timezone +import warnings from typing import Optional, AsyncIterator, List, Dict, Any from .models import Location, PhotoResult @@ -44,22 +45,7 @@ async def refresh(self, *, force: bool = False): # decrypt and parse JSON decrypted = self.client.decrypt_data_blob(blobs[0]) - loc = json.loads(decrypted) - # Build Location object with fields from README / fmd_api.py - timestamp_ms = loc.get("date") - ts = datetime.fromtimestamp(timestamp_ms / 1000.0, tz=timezone.utc) if timestamp_ms else None - self.cached_location = Location( - lat=loc["lat"], - lon=loc["lon"], - timestamp=ts, - accuracy_m=loc.get("accuracy"), - altitude_m=loc.get("altitude"), - speed_m_s=loc.get("speed"), - heading_deg=loc.get("heading"), - battery_pct=loc.get("bat"), - provider=loc.get("provider"), - raw=loc, - ) + self.cached_location = Location.from_json(decrypted.decode("utf-8")) async def get_location(self, *, force: bool = False) -> Optional[Location]: if force or self.cached_location is None: @@ -81,21 +67,7 @@ async def get_history(self, start=None, end=None, limit: int = -1) -> AsyncItera for b in blobs: try: decrypted = self.client.decrypt_data_blob(b) - loc = json.loads(decrypted) - timestamp_ms = loc.get("date") - ts = datetime.fromtimestamp(timestamp_ms / 1000.0, tz=timezone.utc) if timestamp_ms else None - yield Location( - lat=loc["lat"], - lon=loc["lon"], - timestamp=ts, - accuracy_m=loc.get("accuracy"), - altitude_m=loc.get("altitude"), - speed_m_s=loc.get("speed"), - heading_deg=loc.get("heading"), - battery_pct=loc.get("bat"), - provider=loc.get("provider"), - raw=loc, - ) + yield Location.from_json(decrypted.decode("utf-8")) except Exception as e: # skip invalid blobs but log raise OperationError(f"Failed to decrypt/parse location blob: {e}") from e @@ -104,13 +76,28 @@ async def play_sound(self) -> bool: return await self.client.send_command("ring") async def take_front_photo(self) -> bool: - return await self.client.take_picture("front") + warnings.warn( + "Device.take_front_photo() is deprecated; use take_front_picture()", + DeprecationWarning, + stacklevel=2, + ) + return await self.take_front_picture() async def take_rear_photo(self) -> bool: - return await self.client.take_picture("back") + warnings.warn( + "Device.take_rear_photo() is deprecated; use take_rear_picture()", + DeprecationWarning, + stacklevel=2, + ) + return await self.take_rear_picture() async def fetch_pictures(self, num_to_get: int = -1) -> List[dict]: - return await self.client.get_pictures(num_to_get=num_to_get) + warnings.warn( + "Device.fetch_pictures() is deprecated; use get_picture_blobs()", + DeprecationWarning, + stacklevel=2, + ) + return await self.get_picture_blobs(num_to_get=num_to_get) async def download_photo(self, picture_blob_b64: str) -> PhotoResult: """ @@ -119,6 +106,45 @@ async def download_photo(self, picture_blob_b64: str) -> PhotoResult: The fmd README says picture data is double-encoded: encrypted blob -> base64 string -> image bytes. We decrypt the blob to get a base64-encoded image string; decode that to bytes and return. """ + warnings.warn( + "Device.download_photo() is deprecated; use decode_picture()", + DeprecationWarning, + stacklevel=2, + ) + return await self.decode_picture(picture_blob_b64) + + async def take_front_picture(self) -> bool: + """Request a picture from the front camera.""" + return await self.client.take_picture("front") + + async def take_rear_picture(self) -> bool: + """Request a picture from the rear camera.""" + return await self.client.take_picture("back") + + async def get_pictures(self, num_to_get: int = -1) -> List[dict]: + """Deprecated: use get_picture_blobs().""" + warnings.warn( + "Device.get_pictures() is deprecated; use get_picture_blobs()", + DeprecationWarning, + stacklevel=2, + ) + return await self.get_picture_blobs(num_to_get=num_to_get) + + async def get_picture(self, picture_blob_b64: str) -> PhotoResult: + """Deprecated: use decode_picture().""" + warnings.warn( + "Device.get_picture() is deprecated; use decode_picture()", + DeprecationWarning, + stacklevel=2, + ) + return await self.decode_picture(picture_blob_b64) + + async def get_picture_blobs(self, num_to_get: int = -1) -> List[dict]: + """Get raw picture blobs (base64-encoded encrypted strings) from the server.""" + return await self.client.get_pictures(num_to_get=num_to_get) + + async def decode_picture(self, picture_blob_b64: str) -> PhotoResult: + """Decrypt and decode a single picture blob into a PhotoResult.""" decrypted = self.client.decrypt_data_blob(picture_blob_b64) # decrypted is bytes, often containing a base64-encoded image (as text) try: @@ -138,12 +164,49 @@ async def download_photo(self, picture_blob_b64: str) -> PhotoResult: raise OperationError(f"Failed to decode picture blob: {e}") from e async def lock(self, message: Optional[str] = None, passcode: Optional[str] = None) -> bool: - # The original API supports "lock" command; it does not carry message/passcode in the current client - # Implementation preserves original behavior (sends "lock" command). - # Extensions can append data if server supports it. - return await self.client.send_command("lock") - - async def wipe(self, confirm: bool = False) -> bool: + """Lock the device, optionally passing a message (and future passcode). + + Notes: + - The public web UI may not expose message/passcode yet, but protocol-level + support is expected. We optimistically send a formatted command if a message + is provided: "lock ". + - Sanitization: collapse whitespace, limit length, and strip unsafe characters. + - If server ignores the payload, the base "lock" still executes. + - Passcode argument reserved for potential future support; currently unused. + """ + base = "lock" + if message: + # Basic sanitization: trim, collapse internal whitespace, remove newlines + sanitized = " ".join(message.strip().split()) + # Remove characters that could break command parsing (quotes/backticks/semicolons) + for ch in ['"', "'", "`", ";"]: + sanitized = sanitized.replace(ch, "") + # Cap length to 120 chars to avoid overly long command payloads + if len(sanitized) > 120: + sanitized = sanitized[:120] + if sanitized: + base = f"lock {sanitized}" + return await self.client.send_command(base) + + async def wipe(self, pin: Optional[str] = None, *, confirm: bool = False) -> bool: + """Factory reset (delete) the device. Requires user confirmation and PIN. + + The underlying command format (per Android client) is: `fmd delete `. + Notes: + - The Delete feature must be enabled in the FMD Android client's General settings. + - A PIN is mandatory and must be sent when calling wipe(confirm=True). + - PIN must be alphanumeric ASCII (a-z, A-Z, 0-9) without spaces + This is a current and safe recommendation from fmd-foss maintainers. + - Future change: FMD Android will enforce 16+ character PIN length requirement + (https://gitlab.com/fmd-foss/fmd-android/-/merge_requests/379). Existing + shorter PINs may be grandfathered. This client will be updated accordingly. + """ if not confirm: raise OperationError("wipe() requires confirm=True to proceed (destructive action)") - return await self.client.send_command("delete") + if not pin: + raise OperationError("wipe() requires a PIN: pass pin='yourPIN123'") + # Validate alphanumeric ASCII without spaces + if not all(ch.isalnum() and ord(ch) < 128 for ch in pin): + raise OperationError("PIN must contain only alphanumeric ASCII characters (a-z, A-Z, 0-9), no spaces") + command = f"fmd delete {pin}" + return await self.client.send_command(command) diff --git a/fmd_api/models.py b/fmd_api/models.py index c3ed685..78b72af 100644 --- a/fmd_api/models.py +++ b/fmd_api/models.py @@ -1,6 +1,7 @@ from dataclasses import dataclass -from datetime import datetime -from typing import Optional, Dict, Any +from datetime import datetime, timezone +from typing import Optional, Dict, Any, Union +import json as _json @dataclass @@ -16,6 +17,51 @@ class Location: provider: Optional[str] = None raw: Optional[Dict[str, Any]] = None + @classmethod + def from_json(cls, json: Union[str, Dict[str, Any]]) -> "Location": + """Construct a Location from a JSON dict or JSON string. + + Expected fields (from server payloads): + - lat (float) + - lon (float) + - date (int milliseconds since epoch) + - Optional: accuracy, altitude, speed, heading, bat, provider + """ + # Accept either a JSON string or a dict + if isinstance(json, str): + try: + data = _json.loads(json) + except Exception as e: + raise ValueError(f"Invalid JSON string for Location: {e}") from e + elif isinstance(json, dict): + data = json + else: + raise TypeError("Location.from_json expects a dict or JSON string") + + if "lat" not in data or "lon" not in data: + raise ValueError("Location JSON must include 'lat' and 'lon'") + + # Convert date (ms since epoch) to aware datetime in UTC if present + ts = None + if data.get("date") is not None: + try: + ts = datetime.fromtimestamp(float(data["date"]) / 1000.0, tz=timezone.utc) + except Exception as e: + raise ValueError(f"Invalid 'date' field for Location: {e}") from e + + return cls( + lat=float(data["lat"]), + lon=float(data["lon"]), + timestamp=ts, + accuracy_m=(float(data["accuracy"]) if data.get("accuracy") is not None else None), + altitude_m=(float(data["altitude"]) if data.get("altitude") is not None else None), + speed_m_s=(float(data["speed"]) if data.get("speed") is not None else None), + heading_deg=(float(data["heading"]) if data.get("heading") is not None else None), + battery_pct=(int(data["bat"]) if data.get("bat") is not None else None), + provider=(str(data["provider"]) if data.get("provider") is not None else None), + raw=data, + ) + @dataclass class PhotoResult: diff --git a/pyproject.toml b/pyproject.toml index 26d6760..f1a286e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "fmd_api" -version = "2.0.3" +version = "2.0.4" authors = [{name = "devinslick"}] description = "A Python client for the FMD (Find My Device) server API" readme = "README.md" diff --git a/tests/functional/test_commands.py b/tests/functional/test_commands.py index 3448919..a70582e 100644 --- a/tests/functional/test_commands.py +++ b/tests/functional/test_commands.py @@ -5,18 +5,19 @@ Commands: ring - Make device ring - lock - Lock device screen + lock [message...] - Lock device screen (optional message) camera - Take picture (default: back) bluetooth - Set Bluetooth on/off dnd - Set Do Not Disturb on/off - ringer - Set ringer mode - stats - Get device network statistics + ringer - Set ringer mode locate [all|gps|cell|last] - Request location update (default: all) Examples: python tests/functional/test_commands.py ring + python tests/functional/test_commands.py lock "Please call 1-555-555-5555 to return my phone" python tests/functional/test_commands.py camera front python tests/functional/test_commands.py bluetooth on + python tests/functional/test_commands.py dnd on python tests/functional/test_commands.py ringer vibrate python tests/functional/test_commands.py locate gps """ @@ -54,7 +55,18 @@ async def main(): print(f"Ring command sent: {result}") elif command == "lock": - result = await client.send_command("lock") + # Optional message; sanitize similar to Device.lock + msg = " ".join(sys.argv[2:]) if len(sys.argv) > 2 else None + cmd = "lock" + if msg: + sanitized = " ".join(msg.strip().split()) + for ch in ['"', "'", "`", ";"]: + sanitized = sanitized.replace(ch, "") + if len(sanitized) > 120: + sanitized = sanitized[:120] + if sanitized: + cmd = f"lock {sanitized}" + result = await client.send_command(cmd) print(f"Lock command sent: {result}") elif command == "camera": @@ -92,10 +104,6 @@ async def main(): result = await client.set_ringer_mode(mode) print(f"Ringer mode '{mode}' command sent: {result}") - elif command == "stats": - result = await client.get_device_stats() - print(f"Device stats command sent: {result}") - elif command == "locate": provider = sys.argv[2].lower() if len(sys.argv) > 2 else "all" result = await client.request_location(provider) diff --git a/tests/functional/test_device.py b/tests/functional/test_device.py index 13b9cd5..ca39888 100644 --- a/tests/functional/test_device.py +++ b/tests/functional/test_device.py @@ -1,5 +1,5 @@ """ -Test: Device class flows (refresh, get_location, fetch_pictures, download_photo) +Test: Device class flows (refresh, get_location, get_pictures, get_picture) Usage: python tests/functional/test_device.py """ @@ -32,17 +32,17 @@ async def main(): loc = await device.get_location() print("Cached location:", loc) # fetch pictures and attempt to download the first one - pics = await device.fetch_pictures(5) + pics = await device.get_picture_blobs(5) print("Pictures listed:", len(pics)) if pics: try: - photo = await device.download_photo(pics[0]) + photo = await device.decode_picture(pics[0]) fn = "device_photo.jpg" with open(fn, "wb") as f: f.write(photo.data) print("Saved device photo to", fn) except Exception as e: - print("Failed to download photo:", e) + print("Failed to get picture:", e) finally: await client.close() diff --git a/tests/unit/test_client.py b/tests/unit/test_client.py index b5e64bb..360557e 100644 --- a/tests/unit/test_client.py +++ b/tests/unit/test_client.py @@ -291,6 +291,88 @@ def sign(self, message_bytes, pad, algo): await client2.close() +@pytest.mark.asyncio +async def test_json_response_non_dict_logging(): + """Test JSON response handling when response is non-dict (array, etc).""" + client = FmdClient("https://fmd.example.com") + client.access_token = "token" + + await client._ensure_session() + with aioresponses() as m: + # Return JSON array instead of dict + m.put("https://fmd.example.com/api/v1/pictures", payload=["item1", "item2"]) + + try: + result = await client.get_pictures() + # Should handle non-dict response gracefully + assert result == ["item1", "item2"] + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_pictures_non_list_response(): + """Test get_pictures handles non-list response gracefully.""" + client = FmdClient("https://fmd.example.com") + client.access_token = "token" + + await client._ensure_session() + with aioresponses() as m: + # Return non-list response + m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": "not-a-list"}) + + try: + result = await client.get_pictures() + # Should return empty list for unexpected type + assert result == [] + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_export_data_zip_with_png(monkeypatch, tmp_path): + """Test export_data_zip detects PNG format correctly.""" + client = FmdClient("https://fmd.example.com") + client.access_token = "token" + client._fmd_id = "test-device" + + class DummyKey: + def decrypt(self, packet, padding_obj): + return b"\x00" * 32 + + client.private_key = DummyKey() + + from cryptography.hazmat.primitives.ciphers.aead import AESGCM + + session_key = b"\x00" * 32 + aesgcm = AESGCM(session_key) + iv = b"\x01" * 12 + + # Create a PNG image (magic bytes: \x89PNG) + png_bytes = b"\x89PNG\r\n\x1a\n" + b"\x00" * 100 + png_b64 = base64.b64encode(png_bytes).decode("utf-8") + ciphertext = aesgcm.encrypt(iv, png_b64.encode("utf-8"), None) + blob = b"\xaa" * 384 + iv + ciphertext + blob_b64 = base64.b64encode(blob).decode("utf-8").rstrip("=") + + with aioresponses() as m: + m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "0"}) + m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": [blob_b64]}) + + out_file = tmp_path / "export_png.zip" + try: + await client.export_data_zip(str(out_file)) + import zipfile + + with zipfile.ZipFile(out_file, "r") as zipf: + names = zipf.namelist() + # Should detect PNG and use .png extension + png_files = [n for n in names if n.endswith(".png")] + assert len(png_files) == 1 + finally: + await client.close() + + @pytest.mark.asyncio async def test_set_ringer_mode_validation(): """Test set_ringer_mode validates mode parameter.""" @@ -380,26 +462,6 @@ def sign(self, message_bytes, pad, algo): await client.close() -@pytest.mark.asyncio -async def test_get_device_stats(): - """Test get_device_stats sends stats command.""" - client = FmdClient("https://fmd.example.com") - client.access_token = "token" - - class DummySigner: - def sign(self, message_bytes, pad, algo): - return b"\xab" * 64 - - client.private_key = DummySigner() - - with aioresponses() as m: - m.post("https://fmd.example.com/api/v1/command", status=200, body="OK") - try: - assert await client.get_device_stats() is True - finally: - await client.close() - - @pytest.mark.asyncio async def test_decrypt_data_blob_too_small(): """Test decrypt_data_blob raises FmdApiException for small blobs.""" diff --git a/tests/unit/test_coverage_improvements.py b/tests/unit/test_coverage_improvements.py index def3a6f..981caad 100644 --- a/tests/unit/test_coverage_improvements.py +++ b/tests/unit/test_coverage_improvements.py @@ -522,6 +522,30 @@ async def test_send_command_with_missing_private_key(): await client.close() +@pytest.mark.asyncio +async def test_device_fetch_pictures_deprecated(): + """Test fetch_pictures() deprecated wrapper emits warning.""" + client = FmdClient("https://fmd.example.com") + client.access_token = "token" + device = Device(client, "test-device") + + with aioresponses() as m: + m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": ["blob1", "blob2"]}) + + try: + import warnings + + with warnings.catch_warnings(record=True) as w: + warnings.simplefilter("always") + result = await device.fetch_pictures(2) + assert len(w) == 1 + assert issubclass(w[0].category, DeprecationWarning) + assert "fetch_pictures() is deprecated" in str(w[0].message) + assert len(result) == 2 + finally: + await client.close() + + @pytest.mark.asyncio async def test_client_error_generic(): """Test generic ClientError handling.""" diff --git a/tests/unit/test_device.py b/tests/unit/test_device.py index 4a73c29..c90c103 100644 --- a/tests/unit/test_device.py +++ b/tests/unit/test_device.py @@ -46,7 +46,7 @@ def decrypt(self, packet, padding_obj): @pytest.mark.asyncio -async def test_device_fetch_and_download_picture(monkeypatch): +async def test_device_get_and_decode_picture(monkeypatch): client = FmdClient("https://fmd.example.com") # Provide dummy private key that decrypts session packet into all-zero key @@ -75,10 +75,10 @@ def decrypt(self, packet, padding_obj): client.access_token = "token" device = Device(client, "alice") try: - pics = await device.fetch_pictures() + pics = await device.get_picture_blobs() assert len(pics) == 1 # download the picture and verify we got PNGDATA bytes - photo = await device.download_photo(pics[0]) + photo = await device.decode_picture(pics[0]) assert photo.data == b"PNGDATA" assert photo.mime_type.startswith("image/") finally: @@ -124,7 +124,7 @@ async def test_device_wipe_requires_confirm(): # Should raise without confirm with pytest.raises(OperationError, match="wipe.*requires confirm=True"): - await device.wipe() + await device.wipe(pin="1234") # Should work with confirm client.access_token = "token" @@ -138,7 +138,7 @@ def sign(self, message_bytes, pad, algo): with aioresponses() as m: m.post("https://fmd.example.com/api/v1/command", status=200, body="OK") try: - assert await device.wipe(confirm=True) is True + assert await device.wipe(pin="1234", confirm=True) is True finally: await client.close() @@ -344,7 +344,7 @@ def decrypt(self, packet, padding_obj): @pytest.mark.asyncio async def test_device_picture_commands(): - """Test Device picture-related command shortcuts.""" + """Test Device picture-related command shortcuts (new names).""" client = FmdClient("https://fmd.example.com") client.access_token = "token" @@ -358,16 +358,16 @@ def sign(self, message_bytes, pad, algo): device = Device(client, "test-device") with aioresponses() as m: - # take_front_photo + # take_front_picture m.post("https://fmd.example.com/api/v1/command", status=200, body="OK") - # take_rear_photo + # take_rear_picture m.post("https://fmd.example.com/api/v1/command", status=200, body="OK") try: - result1 = await device.take_front_photo() + result1 = await device.take_front_picture() assert result1 is True - result2 = await device.take_rear_photo() + result2 = await device.take_rear_picture() assert result2 is True finally: await client.close() @@ -488,7 +488,7 @@ def sign(self, message_bytes, pad, algo): m.post("https://fmd.example.com/api/v1/command", status=200, body="OK") try: - result = await device.wipe(confirm=True) + result = await device.wipe(pin="1234", confirm=True) assert result is True finally: await client.close() @@ -626,8 +626,8 @@ def decrypt(self, packet, padding_obj): @pytest.mark.asyncio -async def test_device_fetch_pictures(): - """Test Device.fetch_pictures method.""" +async def test_device_get_picture_blobs(): + """Test Device.get_picture_blobs method.""" client = FmdClient("https://fmd.example.com") client.access_token = "token" @@ -639,7 +639,7 @@ async def test_device_fetch_pictures(): m.put("https://fmd.example.com/api/v1/pictures", payload={"Data": mock_pictures}) try: - pictures = await device.fetch_pictures(num_to_get=1) + pictures = await device.get_picture_blobs(num_to_get=1) assert len(pictures) == 1 assert pictures[0]["id"] == 0 finally: @@ -672,32 +672,6 @@ def sign(self, message_bytes, pad, algo): await client.close() -@pytest.mark.asyncio -async def test_device_get_stats_via_client(): - """Test Device can use client's get_device_stats.""" - client = FmdClient("https://fmd.example.com") - client.access_token = "token" - - class DummySigner: - def sign(self, message_bytes, pad, algo): - return b"\xab" * 64 - - client.private_key = DummySigner() - - await client._ensure_session() - device = Device(client, "test-device") - - with aioresponses() as m: - m.post("https://fmd.example.com/api/v1/command", status=200, body="OK") - - try: - # Device doesn't have get_stats, use client's get_device_stats - result = await device.client.get_device_stats() - assert result is True - finally: - await client.close() - - @pytest.mark.asyncio async def test_device_refresh_updates_cached_location(): """Test that refresh() updates the cached location.""" diff --git a/tests/unit/test_device_wipe_validation.py b/tests/unit/test_device_wipe_validation.py new file mode 100644 index 0000000..555cc60 --- /dev/null +++ b/tests/unit/test_device_wipe_validation.py @@ -0,0 +1,123 @@ +"""Tests for Device.wipe PIN validation.""" + +import pytest +from fmd_api import Device +from fmd_api.client import FmdClient +from fmd_api.exceptions import OperationError +from aioresponses import aioresponses + + +@pytest.mark.asyncio +async def test_wipe_accepts_alphanumeric_pins(): + """Test that wipe accepts various alphanumeric PINs.""" + client = FmdClient("https://fmd.example.com") + client.access_token = "token" + + class DummySigner: + def sign(self, message_bytes, pad, algo): + return b"\xab" * 64 + + client.private_key = DummySigner() + device = Device(client, "test-device") + + valid_pins = [ + "1234", # Numeric only (short) + "12", # Very short numeric + "123456789012345678901234567890", # Long numeric + "abc123", # Alphanumeric lowercase + "ABC123", # Alphanumeric uppercase + "MyPin2025", # Mixed case alphanumeric + "a", # Single character + "Z9", # Two characters + ] + + with aioresponses() as m: + for pin in valid_pins: + m.post("https://fmd.example.com/api/v1/command", status=200, body="OK") + + try: + for pin in valid_pins: + result = await device.wipe(pin=pin, confirm=True) + assert result is True + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_wipe_rejects_invalid_pins(): + """Test that wipe rejects PINs with invalid characters.""" + client = FmdClient("https://fmd.example.com") + client.access_token = "token" + + class DummySigner: + def sign(self, message_bytes, pad, algo): + return b"\xab" * 64 + + client.private_key = DummySigner() + device = Device(client, "test-device") + + invalid_pins = [ + "123 456", # Contains space + "hello world", # Contains space + "pin!", # Special character + "test@123", # Special character + "pin#code", # Special character + "émoji", # Non-ASCII + "測試", # Non-ASCII + "test-pin", # Hyphen + "pin_code", # Underscore + "test.pin", # Period + ] + + try: + for pin in invalid_pins: + with pytest.raises(OperationError, match="alphanumeric ASCII|spaces"): + await device.wipe(pin=pin, confirm=True) + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_wipe_spaces_specific_error(): + """Test that spaces in PIN trigger the alphanumeric error message.""" + client = FmdClient("https://fmd.example.com") + device = Device(client, "test-device") + + try: + # Space causes isalnum() to fail, hitting the alphanumeric check first + with pytest.raises(OperationError, match="alphanumeric ASCII"): + await device.wipe(pin="my pin", confirm=True) + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_wipe_rejects_empty_pin(): + """Test that wipe rejects empty PIN.""" + client = FmdClient("https://fmd.example.com") + device = Device(client, "test-device") + + try: + with pytest.raises(OperationError, match="requires a PIN"): + await device.wipe(pin=None, confirm=True) + + with pytest.raises(OperationError, match="requires a PIN"): + await device.wipe(pin="", confirm=True) + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_wipe_requires_confirm(): + """Test that wipe requires confirm=True.""" + client = FmdClient("https://fmd.example.com") + device = Device(client, "test-device") + + try: + with pytest.raises(OperationError, match="requires confirm=True"): + await device.wipe(pin="abc123", confirm=False) + + with pytest.raises(OperationError, match="requires confirm=True"): + await device.wipe(pin="abc123") + finally: + await client.close() diff --git a/tests/unit/test_drop_password.py b/tests/unit/test_drop_password.py new file mode 100644 index 0000000..766255c --- /dev/null +++ b/tests/unit/test_drop_password.py @@ -0,0 +1,62 @@ +import time +import pytest +from fmd_api.client import FmdClient + + +@pytest.mark.asyncio +async def test_create_with_drop_password(monkeypatch): + # Stub authenticate to avoid network & crypto + async def fake_auth(self, fmd_id, password, session_duration): + self._fmd_id = fmd_id + self._password = password + self.access_token = "tok123" + # Simulate resulting password_hash + token time + self._password_hash = "$argon2id$v=19$m=131072,t=1,p=4$dummy$hash" + self._token_issued_at = time.time() + + # Provide dummy private_key with required interface for later operations + class DummyKey: + def decrypt(self, packet, padding_obj): + return b"0" * 32 + + def sign(self, msg, pad, algo): + return b"sig" + + self.private_key = DummyKey() + + monkeypatch.setattr(FmdClient, "authenticate", fake_auth) + + client = await FmdClient.create("https://fmd.example.com", "alice", "secret", drop_password=True) + try: + # Raw password should be purged; hash retained for reauth + assert client._password is None + assert client._password_hash is not None + assert client.access_token == "tok123" + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_create_without_drop_password(monkeypatch): + async def fake_auth(self, fmd_id, password, session_duration): + self._fmd_id = fmd_id + self._password = password + self.access_token = "tokABC" + self._password_hash = "$argon2id$v=19$m=131072,t=1,p=4$dummy$hash" + self._token_issued_at = time.time() + + class DummyKey: + def decrypt(self, packet, padding_obj): + return b"0" * 32 + + self.private_key = DummyKey() + + monkeypatch.setattr(FmdClient, "authenticate", fake_auth) + + client = await FmdClient.create("https://fmd.example.com", "bob", "hunter2", drop_password=False) + try: + assert client._password == "hunter2" + assert client._password_hash is not None + assert client.access_token == "tokABC" + finally: + await client.close() diff --git a/tests/unit/test_lock_message.py b/tests/unit/test_lock_message.py new file mode 100644 index 0000000..35ea677 --- /dev/null +++ b/tests/unit/test_lock_message.py @@ -0,0 +1,71 @@ +import pytest +from aioresponses import aioresponses, CallbackResult + +from fmd_api.client import FmdClient +from fmd_api.device import Device + + +@pytest.mark.asyncio +async def test_device_lock_without_message_sends_plain_lock(): + client = FmdClient("https://fmd.example.com") + client.access_token = "token" + + class DummySigner: + def sign(self, message_bytes, pad, algo): + return b"\xab" * 64 + + client.private_key = DummySigner() + + await client._ensure_session() + device = Device(client, "test-device") + + with aioresponses() as m: + # capture payload via callback + captured = {} + + def cb(url, **kwargs): + captured["json"] = kwargs.get("json") + return CallbackResult(status=200, body="OK") + + m.post("https://fmd.example.com/api/v1/command", callback=cb) + try: + ok = await device.lock() + assert ok is True + assert captured["json"]["Data"] == "lock" + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_device_lock_with_message_sanitizes_and_sends(): + client = FmdClient("https://fmd.example.com") + client.access_token = "token" + + class DummySigner: + def sign(self, message_bytes, pad, algo): + return b"\xab" * 64 + + client.private_key = DummySigner() + + await client._ensure_session() + device = Device(client, "test-device") + + with aioresponses() as m: + captured = {} + + def cb(url, **kwargs): + captured["json"] = kwargs.get("json") + return CallbackResult(status=200, body="OK") + + m.post("https://fmd.example.com/api/v1/command", callback=cb) + try: + ok = await device.lock(" Hello world; \n stay 'safe' \"pls\" ") + assert ok is True + sent = captured["json"]["Data"] + assert sent.startswith("lock ") + # Ensure removed quotes/semicolons/newlines and collapsed spaces + assert '"' not in sent and "'" not in sent and ";" not in sent and "\n" not in sent + assert " " not in sent + assert sent.endswith("Hello world stay safe pls") + finally: + await client.close() diff --git a/tests/unit/test_models.py b/tests/unit/test_models.py new file mode 100644 index 0000000..1536ffd --- /dev/null +++ b/tests/unit/test_models.py @@ -0,0 +1,70 @@ +import json +from datetime import timezone + +import pytest + +from fmd_api.models import Location + + +def test_location_from_json_dict_basic(): + data = { + "lat": 10.5, + "lon": 20.25, + "date": 1600000000000, # ms since epoch + "accuracy": 5.0, + "altitude": 100.0, + "speed": 1.5, + "heading": 180.0, + "bat": 75, + "provider": "gps", + } + loc = Location.from_json(data) + assert loc.lat == 10.5 + assert loc.lon == 20.25 + assert loc.timestamp is not None + assert loc.timestamp.tzinfo == timezone.utc + assert loc.accuracy_m == 5.0 + assert loc.altitude_m == 100.0 + assert loc.speed_m_s == 1.5 + assert loc.heading_deg == 180.0 + assert loc.battery_pct == 75 + assert loc.provider == "gps" + assert loc.raw == data + + +def test_location_from_json_string_basic(): + payload = { + "lat": 1.0, + "lon": 2.0, + "date": 1600000000000, + } + loc = Location.from_json(json.dumps(payload)) + assert loc.lat == 1.0 + assert loc.lon == 2.0 + assert loc.timestamp is not None + assert loc.timestamp.tzinfo == timezone.utc + + +def test_location_from_json_missing_optional_fields(): + payload = {"lat": 0.0, "lon": 0.0, "date": 1600000000000} + loc = Location.from_json(payload) + assert loc.accuracy_m is None + assert loc.altitude_m is None + assert loc.speed_m_s is None + assert loc.heading_deg is None + assert loc.battery_pct is None + assert loc.provider is None + + +def test_location_from_json_invalid_inputs(): + with pytest.raises(TypeError): + Location.from_json(123) # type: ignore[arg-type] + + with pytest.raises(ValueError): + Location.from_json("not json") + + with pytest.raises(ValueError): + Location.from_json({"lat": 1.0}) # missing lon + + with pytest.raises(ValueError): + Location.from_json({"lat": 1.0, "lon": 2.0, "date": "abc"}) diff --git a/tests/unit/test_resume.py b/tests/unit/test_resume.py new file mode 100644 index 0000000..c6f5ad6 --- /dev/null +++ b/tests/unit/test_resume.py @@ -0,0 +1,143 @@ +import pytest +from aioresponses import aioresponses + +from fmd_api.client import FmdClient + + +@pytest.mark.asyncio +async def test_resume_and_hash_based_reauth(): + # First create a client normally to get artifacts + client = FmdClient("https://fmd.example.com") + + # Mock salt, access token, key blob + class DummyKey: + def decrypt(self, packet, padding_obj): + return b"0" * 32 + + # Simulate authenticate artifacts directly + client._fmd_id = "alice" + client._password = "secret" + client._password_hash = "$argon2id$v=19$m=131072,t=1,p=4$dummy$hash" + client.access_token = "tkn1" + client.private_key = DummyKey() # Not a real key, but sufficient for path coverage + + artifacts = await client.export_auth_artifacts() + await client.drop_password() + + # Resume from artifacts (no raw password retained) + resumed = await FmdClient.from_auth_artifacts(artifacts) + assert resumed.access_token == "tkn1" + assert resumed._password is None + assert resumed._password_hash == artifacts["password_hash"] + + # Simulate 401 then success on reauth + await resumed._ensure_session() + with aioresponses() as m: + # First request returns 401 + m.put("https://fmd.example.com/api/v1/locationDataSize", status=401) + # Reauth token request + m.put("https://fmd.example.com/api/v1/requestAccess", payload={"Data": "tkn2"}) + # Retry locationDataSize success + m.put("https://fmd.example.com/api/v1/locationDataSize", payload={"Data": "0"}) + + result = await resumed.get_locations() + assert result == [] + assert resumed.access_token == "tkn2" + + await resumed.close() + await client.close() + + +@pytest.mark.asyncio +async def test_resume_with_der_key(): + """Test resume() with DER-encoded private key (fallback path).""" + from cryptography.hazmat.primitives.asymmetric import rsa + from cryptography.hazmat.primitives import serialization + + # Generate a real RSA key and encode as DER + key = rsa.generate_private_key(public_exponent=65537, key_size=2048) + der_bytes = key.private_bytes( + encoding=serialization.Encoding.DER, + format=serialization.PrivateFormat.PKCS8, + encryption_algorithm=serialization.NoEncryption(), + ) + + # Resume with DER bytes (should trigger ValueError in PEM load, then succeed with DER) + client = await FmdClient.resume( + "https://fmd.example.com", + "alice", + "token123", + der_bytes, + password_hash="$argon2id$v=19$m=131072,t=1,p=4$dummy$hash", + ) + + try: + assert client.access_token == "token123" + assert client.private_key is not None + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_401_without_password_or_hash(): + """Test 401 response when neither password nor hash available raises error.""" + from fmd_api.exceptions import FmdApiException + + client = FmdClient("https://fmd.example.com") + client.access_token = "old_token" + client._fmd_id = "alice" + client._password = None + client._password_hash = None + + await client._ensure_session() + with aioresponses() as m: + # Returns 401 and no password/hash available + m.put("https://fmd.example.com/api/v1/locationDataSize", status=401) + + try: + with pytest.raises(FmdApiException, match="401"): + await client.get_locations() + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_reauth_with_hash_missing_fields(): + """Test _reauth_with_hash raises when ID or hash missing.""" + from fmd_api.exceptions import FmdApiException + + client = FmdClient("https://fmd.example.com") + client._fmd_id = None + client._password_hash = None + + try: + with pytest.raises(FmdApiException, match="Hash-based reauth not possible"): + await client._reauth_with_hash() + finally: + await client.close() + + +@pytest.mark.asyncio +async def test_from_auth_artifacts_missing_fields(): + """Test from_auth_artifacts raises on missing required fields.""" + incomplete = {"base_url": "https://fmd.example.com", "fmd_id": "alice"} + + with pytest.raises(ValueError, match="Missing artifact fields"): + await FmdClient.from_auth_artifacts(incomplete) + + +@pytest.mark.asyncio +async def test_export_artifacts_without_private_key(): + """Test export_auth_artifacts raises when private key not loaded.""" + from fmd_api.exceptions import FmdApiException + + client = FmdClient("https://fmd.example.com") + client._fmd_id = "alice" + client.access_token = "token" + client.private_key = None + + try: + with pytest.raises(FmdApiException, match="Cannot export artifacts"): + await client.export_auth_artifacts() + finally: + await client.close()