diff --git a/.env.example b/.env.example index 95e41f1..f58ab2b 100644 --- a/.env.example +++ b/.env.example @@ -62,3 +62,12 @@ S3_SECRET_ACCESS_KEY= S3_BUCKET_NAME= # Region (use "auto" for Cloudflare R2) S3_REGION=auto + +# Internal Track Asset API +# Example when running via Coolify on the same server/project: +# ASSET_API_URL=http://inkycloud-f1-assets-api:8000 +ASSET_API_URL= +ASSET_API_TOKEN= +ASSET_CACHE_DIR=/app/data/asset-cache +ASSET_CACHE_TTL_HOURS=24 +ASSET_PREFETCH_ENABLED=false diff --git a/.github/workflows/sync-main-to-dev.yml b/.github/workflows/sync-main-to-dev.yml index 2e52141..1cbc24b 100644 --- a/.github/workflows/sync-main-to-dev.yml +++ b/.github/workflows/sync-main-to-dev.yml @@ -1,8 +1,8 @@ name: Sync Main to Dev on: - push: - branches: [main] + release: + types: [published] workflow_dispatch: jobs: @@ -26,6 +26,8 @@ jobs: git config user.email "41898282+github-actions[bot]@users.noreply.github.com" git fetch origin main dev + main_sha="$(git rev-parse origin/main)" + echo "main_sha=$main_sha" >> "$GITHUB_OUTPUT" git checkout -B sync/main-to-dev origin/dev if ! git merge --no-ff --no-edit origin/main; then @@ -58,9 +60,10 @@ jobs: const title = "chore: sync main into dev"; const body = [ - "Automated sync PR to keep `dev` aligned with `main` after release merges.", + "Automated sync PR to keep `dev` aligned with `main` after published releases.", "", - `- Trigger commit: ${context.sha}`, + `- Release event commit: ${context.sha}`, + "- Synced main HEAD: ${{ steps.prepare.outputs.main_sha }}", "- Merge method: merge commit", "", "This PR is managed by CI.", diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ee2a83..ad7697b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,20 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.1.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). 
+## [1.2.7] - 2026-03-05 + +
+### Backend
+
+#### Changed
+
+- **Albert Park track assets** - Replaced `app/assets/tracks/albert_park.png` with the latest map version and regenerated `app/assets/tracks_processed/albert_park.bmp`
+- **Main→dev sync timing** - Updated sync automation to run after published releases so `dev` is refreshed from `main` after each release
+
+
+ +--- + ## [1.2.6] - 2026-03-01 ### Backend diff --git a/app/assets/tracks/albert_park.png b/app/assets/tracks/albert_park.png index 806a0c9..ded7c36 100644 Binary files a/app/assets/tracks/albert_park.png and b/app/assets/tracks/albert_park.png differ diff --git a/app/assets/tracks_processed/albert_park.bmp b/app/assets/tracks_processed/albert_park.bmp index 45ae0e9..70cff4f 100644 Binary files a/app/assets/tracks_processed/albert_park.bmp and b/app/assets/tracks_processed/albert_park.bmp differ diff --git a/app/assets/tracks_spectra6/albert_park.bmp b/app/assets/tracks_spectra6/albert_park.bmp index 5f4f292..3b78d67 100644 Binary files a/app/assets/tracks_spectra6/albert_park.bmp and b/app/assets/tracks_spectra6/albert_park.bmp differ diff --git a/app/config.py b/app/config.py index c92745d..b0285d9 100644 --- a/app/config.py +++ b/app/config.py @@ -111,6 +111,25 @@ class Config(BaseSettings): S3_BUCKET_NAME: Optional[str] = Field(default=None, description="S3 bucket name for backups") S3_REGION: str = Field("auto", description="S3 region (use 'auto' for Cloudflare R2)") + # Remote asset API settings + ASSET_API_URL: Optional[str] = Field(default=None, description="Internal asset API base URL") + ASSET_API_TOKEN: Optional[str] = Field( + default=None, description="Bearer token for the internal asset API" + ) + ASSET_CACHE_DIR: str = Field( + "/app/data/asset-cache", + description="Directory for cached remote asset binaries and manifests", + ) + ASSET_CACHE_TTL_HOURS: int = Field( + 24, + gt=0, + description="Remote asset cache lifetime in hours", + ) + ASSET_PREFETCH_ENABLED: bool = Field( + False, + description="Enable optional asset prefetching hooks", + ) + @field_validator("APP_PORT", mode="before") @classmethod def validate_port(cls, value: object, info: ValidationInfo) -> int: @@ -248,6 +267,49 @@ def validate_s3_endpoint(cls, value: object, info: ValidationInfo) -> Optional[s ) return None + @field_validator("ASSET_API_URL", mode="before") + @classmethod 
+ def validate_asset_api_url(cls, value: object, info: ValidationInfo) -> Optional[str]: + if info.field_name is None: + return None + if value is None or value == "": + return None + adapter = TypeAdapter(HttpUrl) + try: + validated = adapter.validate_python(value) + return str(validated) + except ValidationError: + logger.warning( + "Invalid value for %s=%r; must be a valid URL. Remote assets will be disabled.", + info.field_name, + value, + ) + return None + + @field_validator("ASSET_API_TOKEN", mode="before") + @classmethod + def normalize_asset_api_token(cls, value: object) -> Optional[str]: + if value is None: + return None + if isinstance(value, str): + stripped = value.strip() + return stripped or None + return None + + @field_validator("ASSET_CACHE_TTL_HOURS", mode="before") + @classmethod + def validate_asset_cache_ttl(cls, value: object, info: ValidationInfo) -> int: + if info.field_name is None: + return 24 + default: int = cls.model_fields[info.field_name].default + try: + ttl = int(value) # type: ignore[call-overload] + if ttl > 0: + return ttl + except (TypeError, ValueError): + pass + return _warn_invalid(info.field_name, value, default, "must be a positive integer") + @lru_cache() def get_config() -> Config: diff --git a/app/services/asset_client.py b/app/services/asset_client.py new file mode 100644 index 0000000..3115979 --- /dev/null +++ b/app/services/asset_client.py @@ -0,0 +1,182 @@ +"""Remote track asset client with local disk cache and local fallback support.""" + +from __future__ import annotations + +import io +import json +import logging +from functools import lru_cache +from pathlib import Path +from time import time +from typing import Any, Iterable + +import httpx +from PIL import Image + +from app.config import get_config + +logger = logging.getLogger(__name__) + + +class AssetClient: + """Fetches remote track assets from the internal asset API.""" + + def __init__(self) -> None: + config = get_config() + self.base_url = 
(config.ASSET_API_URL or "").rstrip("/") + self.token = config.ASSET_API_TOKEN + self.timeout = config.REQUEST_TIMEOUT + self.cache_dir = Path(config.ASSET_CACHE_DIR) + self.cache_ttl_seconds = config.ASSET_CACHE_TTL_HOURS * 3600 + + def is_enabled(self) -> bool: + """Return True when remote asset fetching is configured.""" + return bool(self.base_url) + + def _headers(self) -> dict[str, str]: + headers = {"Accept": "application/json"} + if self.token: + headers["Authorization"] = f"Bearer {self.token}" + return headers + + def _binary_headers(self) -> dict[str, str]: + headers = {"Accept": "image/*"} + if self.token: + headers["Authorization"] = f"Bearer {self.token}" + return headers + + def _normalize_ids(self, circuit_ids: str | Iterable[str]) -> list[str]: + if isinstance(circuit_ids, str): + candidates = [circuit_ids] + else: + candidates = list(circuit_ids) + + normalized: list[str] = [] + seen: set[str] = set() + for candidate in candidates: + if not candidate: + continue + value = candidate.strip() + if not value or value in seen: + continue + normalized.append(value) + seen.add(value) + return normalized + + def _manifest_cache_path(self, circuit_id: str) -> Path: + return self.cache_dir / "manifests" / f"{circuit_id}.json" + + def _binary_cache_path(self, circuit_id: str, variant: str) -> Path: + return self.cache_dir / "tracks" / variant / f"{circuit_id}.bmp" + + def _cache_is_fresh(self, path: Path) -> bool: + if not path.exists(): + return False + age_seconds = time() - path.stat().st_mtime + return age_seconds <= self.cache_ttl_seconds + + @staticmethod + def _read_json(path: Path) -> dict[str, Any] | None: + try: + return json.loads(path.read_text()) + except Exception: + return None + + def get_manifest(self, circuit_id: str) -> dict[str, Any] | None: + """Fetch and cache manifest metadata for a circuit.""" + if not self.is_enabled(): + return None + + cache_path = self._manifest_cache_path(circuit_id) + if self._cache_is_fresh(cache_path): + 
cached = self._read_json(cache_path) + if cached is not None: + return cached + + url = f"{self.base_url}/v1/tracks/{circuit_id}/manifest" + try: + with httpx.Client(timeout=self.timeout, follow_redirects=True) as client: + response = client.get(url, headers=self._headers()) + response.raise_for_status() + cache_path.parent.mkdir(parents=True, exist_ok=True) + cache_path.write_text(response.text) + return response.json() + except Exception as exc: + logger.debug("Failed to fetch track manifest for %s: %s", circuit_id, exc) + return self._read_json(cache_path) if cache_path.exists() else None + + def _fetch_binary_bytes(self, circuit_id: str, variant: str) -> bytes | None: + cache_path = self._binary_cache_path(circuit_id, variant) + if self._cache_is_fresh(cache_path): + try: + return cache_path.read_bytes() + except OSError: + pass + + url = f"{self.base_url}/v1/tracks/{circuit_id}/binary" + try: + with httpx.Client(timeout=self.timeout, follow_redirects=True) as client: + response = client.get( + url, + params={"variant": variant}, + headers=self._binary_headers(), + ) + response.raise_for_status() + payload = response.content + cache_path.parent.mkdir(parents=True, exist_ok=True) + cache_path.write_bytes(payload) + return payload + except Exception as exc: + logger.debug( + "Failed to fetch remote track binary for %s (%s): %s", + circuit_id, + variant, + exc, + ) + if cache_path.exists(): + try: + return cache_path.read_bytes() + except OSError: + return None + return None + + def get_track_image(self, circuit_ids: str | Iterable[str], variant: str) -> Image.Image | None: + """Try candidate circuit ids until a remote track asset is found.""" + if not self.is_enabled(): + return None + + for circuit_id in self._normalize_ids(circuit_ids): + manifest = self.get_manifest(circuit_id) + if manifest is not None: + variants = manifest.get("variants") or {} + if variant not in variants: + continue + + payload = self._fetch_binary_bytes(circuit_id, variant) + if payload 
is None: + continue + + try: + image = Image.open(io.BytesIO(payload)) + image.load() + return image + except Exception as exc: + logger.warning( + "Failed to decode remote track asset for %s (%s): %s", + circuit_id, + variant, + exc, + ) + + return None + + +@lru_cache() +def get_asset_client() -> AssetClient: + """Build the asset client once per process.""" + return AssetClient() + + +def _reset_asset_client_cache_for_tests() -> None: + """Allow tests to rebuild the cached asset client after env changes.""" + get_asset_client.cache_clear() diff --git a/app/services/renderer.py b/app/services/renderer.py index 9d3b1d3..217f4cf 100644 --- a/app/services/renderer.py +++ b/app/services/renderer.py @@ -11,6 +11,7 @@ from app.config import config from app.models import ConstructorStanding, DriverStanding, HistoricalData, TeamsData +from app.services.asset_client import get_asset_client from app.services.weather_service import RAINDROP_ICON, WeatherData logger = logging.getLogger(__name__) @@ -978,8 +979,8 @@ def _draw_track_section( track_image = track_image.convert("1") final_w, final_h = track_image.size - paste_x = side_margin + (available_width - final_w) // 2 - paste_y = track_top + (available_height - final_h) // 2 + paste_x = int(side_margin + (available_width - final_w) // 2) + paste_y = int(track_top + (available_height - final_h) // 2) image.paste(track_image, (paste_x, paste_y)) else: @@ -1019,6 +1020,17 @@ def _load_track_image(race_data: dict) -> Image.Image | None: if not circuit_id: return None + normalized_ids = [circuit_id] + mapped_id = CIRCUIT_ID_MAP.get(circuit_id) + if mapped_id: + normalized_ids.append(mapped_id) + if circuit_id.lower() != circuit_id: + normalized_ids.append(circuit_id.lower()) + + remote_track = get_asset_client().get_track_image(normalized_ids, variant="1bit") + if remote_track is not None: + return remote_track + # Try pre-processed BMP first (much faster) processed_patterns = [ f"*{circuit_id}*.bmp", diff --git 
a/app/services/spectra6_renderer.py b/app/services/spectra6_renderer.py index b6ffa76..735f43a 100644 --- a/app/services/spectra6_renderer.py +++ b/app/services/spectra6_renderer.py @@ -11,6 +11,7 @@ from app.config import config from app.models import HistoricalData +from app.services.asset_client import get_asset_client from app.services.weather_service import RAINDROP_ICON, WeatherData logger = logging.getLogger(__name__) @@ -287,10 +288,11 @@ def _draw_track_section( # Center horizontally and vertically in available space final_w, final_h = track_image.size - paste_x = side_margin + (available_width - final_w) // 2 - paste_y = track_top + (available_height - final_h) // 2 + paste_x = int(side_margin + (available_width - final_w) // 2) + paste_y = int(track_top + (available_height - final_h) // 2) - image.paste(track_image.convert("RGB"), (paste_x, paste_y)) + paste_box = (int(paste_x), int(paste_y)) + image.paste(track_image.convert("RGB"), paste_box) else: self._draw_track_placeholder( draw, @@ -322,7 +324,17 @@ def _load_track_image(race_data: dict) -> Image.Image | None: if not circuit_id: return None - normalized_id = CIRCUIT_ID_MAP.get(circuit_id, circuit_id) + circuit_id_str = str(circuit_id) + normalized_id = str(CIRCUIT_ID_MAP.get(circuit_id_str, circuit_id_str)) + remote_candidate_ids: list[str] = [normalized_id, circuit_id_str] + lowered_id = circuit_id_str.lower() + if lowered_id not in remote_candidate_ids: + remote_candidate_ids.append(lowered_id) + + remote_track = get_asset_client().get_track_image(remote_candidate_ids, variant="spectra6") + if remote_track is not None: + return remote_track + track_path = TRACKS_SPECTRA6_DIR / f"{normalized_id}.bmp" if track_path.exists(): diff --git a/docs/track-conversion-log.md b/docs/track-conversion-log.md new file mode 100644 index 0000000..17d3002 --- /dev/null +++ b/docs/track-conversion-log.md @@ -0,0 +1,129 @@ +# Track Conversion Log + +This document tracks iterative work on F1 track source conversion, 
especially +for `albert_park` and the 1-bit readability tuning workflow. + +## Current Status (2026-03-05) + +- Source image is taken from F1 CDN and stored in `app/assets/tracks/albert_park.png`. +- 1-bit output is in `app/assets/tracks_processed/albert_park.bmp`. +- Spectra6 output is in `app/assets/tracks_spectra6/albert_park.bmp`. +- Current tuning target for 1-bit: + - white colored track accents + - black label backgrounds under text + - white text inside those black label backgrounds + +## Saved Scripts + +- `scripts/track_conversion_utils.py` + - shared conversion helpers + - color-aware 1-bit conversion + - Spectra6 conversion + - rendered output metrics + scoring helpers +- `scripts/convert_track_assets.py` + - optional source download + - 1-bit and Spectra6 conversion in one command +- `scripts/search_track_1bit_params.py` + - random search over conversion params + - scores final rendered output from `/calendar.bmp` + - stores best candidate outputs to `/tmp` +- `scripts/search_track_1bit_layered_parallel.py` + - layered segmentation strategy instead of plain threshold-first tuning + - uses `ProcessPoolExecutor` across all CPU cores + - ranks candidates by local preview metrics, then verifies finalists against the live render endpoint + - supports local fine-tuning around a saved best-params JSON seed +- `scripts/score_track_render.py` + - quick semantic quality score of current rendered endpoint output + +## Workflow + +1. Update source and regenerate assets: + +```bash +python scripts/convert_track_assets.py --circuit-id albert_park +``` + +If you want to fetch the current default F1 source URL first: + +```bash +python scripts/convert_track_assets.py --circuit-id albert_park --download-default-url +``` + +2. Score current output from running server: + +```bash +python scripts/score_track_render.py --tz Europe/Prague +``` + +3. 
Search better 1-bit params against rendered output: + +```bash +python scripts/search_track_1bit_params.py --trials 300 --seed 20260305 +``` + +For the newer layered multi-core search: + +```bash +python scripts/search_track_1bit_layered_parallel.py --trials 10000 --workers 16 --finalists 192 +``` + +For local fine-tuning around a previous winner: + +```bash +python scripts/search_track_1bit_layered_parallel.py \ + --trials 20000 \ + --workers 16 \ + --finalists 256 \ + --base-params-file /tmp/albert_park_layered_best_10000.json \ + --local-scale 0.4 +``` + +4. Validate top candidate in browser/Playwright only after script scoring. + +## Notes on Quality Comparison (without Playwright) + +When Playwright is unstable, use scripted quality checks against rendered output: + +- black pixel ratio in map region +- largest connected component size/fill ratio +- count of compact box-like components (label backgrounds) +- white ratio inside those boxes (text readability proxy) +- small-component noise count + +This is more robust than comparing raw BMP bytes or image hashes directly. + +Additional comparisons now used during tuning: + +- local preview score on a centered 500x268 track canvas +- finalist verification score from the live `/calendar.bmp` endpoint +- baseline comparison against the current default conversion pipeline +- box readability proxy (`box_white_ratio`) plus component noise count + +Current semantic-scoring work in progress: + +- `scripts/track_conversion_utils.py` now includes a first semantic reference builder and + rendered semantic scorer. +- The scorer tracks per-region fill on the rendered crop: + - `track_black_fill_1x` + - `box_black_fill_1x` + - `text_white_fill_1x` + - `bg_white_fill_1x` + - `accent_white_fill_1x` +- It also computes multi-scale semantic transfer, boundary IoU, hierarchy, and noise. 
+- This first implementation is useful for diagnostics, but it is not yet calibrated well
+  enough to fully trust automated search runs; semantic reference extraction still needs
+  tightening so good-looking candidates rank above obviously broken ones.
+
+## Session Notes
+
+- Iterative candidate sets were tested and compared primarily through endpoint
+  render scoring.
+- Candidate snapshots and rendered previews were written to `/tmp` during tuning.
+- Going forward, all tuning scripts are committed first, then run.
+- Layered parallel search (`10000` trials, `16` workers) produced a best verified
+  result in the broad random-search stage, with a
+  render score of `104.97` for `albert_park`, slightly above the previous default
+  baseline (`104.55`).
+- Follow-up local fine-tuning (`20000` trials around the saved winner) did not beat
+  the broad winner; the best local result reached `104.93`, so the broad-search
+  winner remains the applied result.
diff --git a/scripts/convert_track_assets.py b/scripts/convert_track_assets.py
new file mode 100644
index 0000000..55b05c5
--- /dev/null
+++ b/scripts/convert_track_assets.py
@@ -0,0 +1,125 @@
+#!/usr/bin/env python3
+"""Download and convert track assets for 1-bit and Spectra6 outputs."""
+
+from __future__ import annotations
+
+import argparse
+from pathlib import Path
+
+from track_conversion_utils import (
+    build_1bit_track,
+    convert_spectra6_track,
+    download_file,
+)
+
+PROJECT_ROOT = Path(__file__).resolve().parent.parent
+
+DEFAULT_F1_TRACK_URLS = {
+    "albert_park": "https://media.formula1.com/image/upload/f_auto/q_auto/"
+    "v1751632426/common/f1/2026/track/2026trackmelbournedetailed.png",
+}
+
+
+def parse_args() -> argparse.Namespace:
+    parser = argparse.ArgumentParser(
+        description="Convert track source image into 1-bit and Spectra6 BMP assets."
+ ) + parser.add_argument( + "--circuit-id", + default="albert_park", + help="Circuit id used to build default paths (default: albert_park)", + ) + parser.add_argument( + "--source-path", + type=Path, + help="Path to source PNG/JPG (default: app/assets/tracks/.png)", + ) + parser.add_argument( + "--source-url", + help="Optional URL to download source image before conversion", + ) + parser.add_argument( + "--download-default-url", + action="store_true", + help="Download known default F1 URL for the selected circuit-id", + ) + parser.add_argument( + "--track-out", + type=Path, + help="Output 1-bit BMP path (default: app/assets/tracks_processed/.bmp)", + ) + parser.add_argument( + "--spectra-out", + type=Path, + help="Output Spectra6 BMP path (default: app/assets/tracks_spectra6/.bmp)", + ) + parser.add_argument( + "--skip-1bit", + action="store_true", + help="Skip 1-bit conversion", + ) + parser.add_argument( + "--skip-spectra6", + action="store_true", + help="Skip Spectra6 conversion", + ) + return parser.parse_args() + + +def resolve_paths(args: argparse.Namespace) -> tuple[Path, Path, Path]: + source_path = args.source_path or ( + PROJECT_ROOT / "app" / "assets" / "tracks" / f"{args.circuit_id}.png" + ) + track_out = args.track_out or ( + PROJECT_ROOT / "app" / "assets" / "tracks_processed" / f"{args.circuit_id}.bmp" + ) + spectra_out = args.spectra_out or ( + PROJECT_ROOT / "app" / "assets" / "tracks_spectra6" / f"{args.circuit_id}.bmp" + ) + return source_path, track_out, spectra_out + + +def main() -> int: + args = parse_args() + source_path, track_out, spectra_out = resolve_paths(args) + + source_url = args.source_url + if ( + source_url is None + and args.download_default_url + and args.circuit_id in DEFAULT_F1_TRACK_URLS + ): + source_url = DEFAULT_F1_TRACK_URLS[args.circuit_id] + + if source_url: + bytes_downloaded = download_file(source_url, source_path) + print(f"Downloaded source: {source_url}") + print(f"Saved to: {source_path} ({bytes_downloaded} 
bytes)") + + if not source_path.exists(): + print(f"Source file not found: {source_path}") + return 1 + + print(f"Using source: {source_path}") + + if not args.skip_1bit: + onebit_stats = build_1bit_track(source_path, track_out) + print("1-bit conversion complete") + print(f" Output: {track_out}") + print(f" Size: {onebit_stats['final_dimensions']}") + print(f" Black ratio: {onebit_stats['black_ratio']:.4f}") + print(f" Bytes: {onebit_stats['output_size']}") + + if not args.skip_spectra6: + spectra_stats = convert_spectra6_track(source_path, spectra_out) + print("Spectra6 conversion complete") + print(f" Output: {spectra_out}") + print(f" Size: {spectra_stats['final_dimensions']}") + print(f" Colors used: {spectra_stats['colors_used']}") + print(f" Bytes: {spectra_stats['output_size']}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/score_track_render.py b/scripts/score_track_render.py new file mode 100644 index 0000000..9fdfba3 --- /dev/null +++ b/scripts/score_track_render.py @@ -0,0 +1,60 @@ +#!/usr/bin/env python3 +"""Score rendered track quality from `/calendar.bmp` endpoint output.""" + +from __future__ import annotations + +import argparse +from pathlib import Path + +from track_conversion_utils import ( + build_track_semantic_reference, + evaluate_rendered_semantic_quality, + fetch_calendar_render, + semantic_metrics_to_dict, +) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Score rendered 1-bit track quality.") + parser.add_argument( + "--endpoint-url", + default="http://127.0.0.1:8000/calendar.bmp?lang=en&year=2026&round=1&display=1bit&weather=false", + help="Calendar endpoint URL", + ) + parser.add_argument( + "--tz", + default="Europe/Prague", + help="Timezone to request (forces cache-busting for endpoint render)", + ) + parser.add_argument( + "--save-preview", + type=Path, + default=Path("/tmp/albert_park_render_score_preview.png"), + help="Optional path to 
save rendered preview PNG", + ) + parser.add_argument( + "--source-path", + type=Path, + default=Path("app/assets/tracks/albert_park.png"), + help="Source colorful track image used to build semantic reference", + ) + return parser.parse_args() + + +def main() -> int: + args = parse_args() + reference = build_track_semantic_reference(args.source_path) + rendered = fetch_calendar_render(args.endpoint_url, args.tz) + metrics = evaluate_rendered_semantic_quality(rendered, reference) + + args.save_preview.parent.mkdir(parents=True, exist_ok=True) + rendered.save(args.save_preview) + + print(f"Score: {metrics.total_score:.2f}") + print(f"Metrics: {semantic_metrics_to_dict(metrics)}") + print(f"Preview: {args.save_preview}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/search_track_1bit_layered_parallel.py b/scripts/search_track_1bit_layered_parallel.py new file mode 100644 index 0000000..7ae0eb1 --- /dev/null +++ b/scripts/search_track_1bit_layered_parallel.py @@ -0,0 +1,768 @@ +#!/usr/bin/env python3 +"""Parallel layered-search tuner for 1-bit track conversion. + +Stage 1 runs a broad multi-core search using local preview metrics. +Stage 2 verifies the best finalists against the live `/calendar.bmp` endpoint. 
+""" + +from __future__ import annotations + +import argparse +import heapq +import io +import json +import os +import random +import shutil +import tempfile +from concurrent.futures import ProcessPoolExecutor +from dataclasses import asdict, dataclass +from pathlib import Path +from typing import Any + +import pytz +from PIL import Image, ImageFilter + +from track_conversion_utils import ( + MAX_TRACK_HEIGHT, + MAX_TRACK_WIDTH, + SemanticScoringMetrics, + Track1BitParams, + TrackSemanticReference, + build_1bit_track, + build_track_semantic_reference, + evaluate_rendered_semantic_quality, + fetch_calendar_render, + semantic_metrics_to_dict, +) + +PROJECT_ROOT = Path(__file__).resolve().parent.parent + +PREVIEW_WIDTH = 500 +PREVIEW_HEIGHT = 268 +NEIGHBORS = ((1, 0), (-1, 0), (0, 1), (0, -1)) + +_SOURCE_DATA: dict[str, Any] | None = None +_SEMANTIC_REFERENCE: TrackSemanticReference | None = None + + +@dataclass(frozen=True) +class LayeredParams: + road_seed_gray: int + road_seed_sat: int + road_grow_gray: int + road_grow_sat: int + road_dilate_px: int + colored_sat: int + colored_val: int + road_proximity_px: int + label_min_area: int + label_min_width: int + label_min_height: int + label_min_fill: float + label_max_aspect: float + accent_max_area: int + accent_min_aspect: float + annotation_min_area: int + annotation_max_area: int + annotation_max_aspect: float + annotation_proximity_px: int + label_text_value: int + label_text_low_sat: int + label_text_low_sat_value: int + min_component_pixels: int + opaque_alpha: int + + +@dataclass(frozen=True) +class Component: + points: list[tuple[int, int]] + min_x: int + min_y: int + max_x: int + max_y: int + area: int + width: int + height: int + fill_ratio: float + + +def _pixel_to_int(pixel: int | float | tuple[int, ...]) -> int: + if isinstance(pixel, tuple): + return int(pixel[0]) if pixel else 0 + return int(pixel) + + +def _pixel_to_hsv(pixel: int | float | tuple[int, ...]) -> tuple[int, int, int]: + if 
isinstance(pixel, tuple): + if len(pixel) >= 3: + return int(pixel[0]), int(pixel[1]), int(pixel[2]) + if len(pixel) == 2: + return int(pixel[0]), int(pixel[1]), int(pixel[1]) + if len(pixel) == 1: + value = int(pixel[0]) + return value, 0, value + return 0, 0, 0 + value = int(pixel) + return value, 0, value + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Run multi-core layered search for 1-bit track conversion." + ) + parser.add_argument( + "--source-path", + type=Path, + default=PROJECT_ROOT / "app" / "assets" / "tracks" / "albert_park.png", + help="Source track image path", + ) + parser.add_argument( + "--output-path", + type=Path, + default=PROJECT_ROOT / "app" / "assets" / "tracks_processed" / "albert_park.bmp", + help="Destination 1-bit BMP path", + ) + parser.add_argument( + "--endpoint-url", + default="http://127.0.0.1:8000/calendar.bmp?lang=en&year=2026&round=1&display=1bit&weather=false", + help="Endpoint URL used for finalist verification", + ) + parser.add_argument("--trials", type=int, default=4000, help="Number of random candidates") + parser.add_argument( + "--workers", + type=int, + default=os.cpu_count() or 1, + help="Parallel worker count (default: CPU count)", + ) + parser.add_argument( + "--finalists", + type=int, + default=128, + help="How many top local candidates to verify via endpoint", + ) + parser.add_argument("--seed", type=int, default=20260306, help="Random seed") + parser.add_argument( + "--progress-every", + type=int, + default=200, + help="Progress interval for local search", + ) + parser.add_argument( + "--best-bmp-out", + type=Path, + default=Path("/tmp/albert_park_best_layered_parallel.bmp"), + help="Snapshot path for best BMP", + ) + parser.add_argument( + "--best-render-out", + type=Path, + default=Path("/tmp/albert_park_best_layered_parallel.png"), + help="Snapshot path for best rendered preview", + ) + parser.add_argument( + "--best-params-out", + type=Path, + 
default=Path("/tmp/albert_park_best_layered_parallel.json"), + help="Path for saving best parameter set as JSON", + ) + parser.add_argument( + "--base-params-file", + type=Path, + help="Optional JSON file with base params for local fine-tuning", + ) + parser.add_argument( + "--local-scale", + type=float, + default=1.0, + help="Fine-tuning jitter scale when --base-params-file is used", + ) + return parser.parse_args() + + +def _connected_components(mask: list[list[bool]]) -> list[Component]: + height = len(mask) + width = len(mask[0]) if height else 0 + visited = [[False] * width for _ in range(height)] + components: list[Component] = [] + + for y in range(height): + for x in range(width): + if not mask[y][x] or visited[y][x]: + continue + + queue = [(x, y)] + visited[y][x] = True + points: list[tuple[int, int]] = [] + min_x = min_y = 10**9 + max_x = max_y = -1 + + while queue: + cx, cy = queue.pop() + points.append((cx, cy)) + min_x = min(min_x, cx) + min_y = min(min_y, cy) + max_x = max(max_x, cx) + max_y = max(max_y, cy) + + for dx, dy in NEIGHBORS: + nx, ny = cx + dx, cy + dy + if nx < 0 or ny < 0 or nx >= width or ny >= height: + continue + if visited[ny][nx] or not mask[ny][nx]: + continue + visited[ny][nx] = True + queue.append((nx, ny)) + + comp_width = max_x - min_x + 1 + comp_height = max_y - min_y + 1 + area = len(points) + components.append( + Component( + points=points, + min_x=min_x, + min_y=min_y, + max_x=max_x, + max_y=max_y, + area=area, + width=comp_width, + height=comp_height, + fill_ratio=area / (comp_width * comp_height), + ) + ) + + return components + + +def _dilate_mask(mask: list[list[bool]], iterations: int) -> list[list[bool]]: + if iterations <= 0: + return mask + + height = len(mask) + width = len(mask[0]) if height else 0 + image = Image.new("L", (width, height), 0) + pixels = image.load() + if pixels is None: + raise RuntimeError("Could not access dilation pixels") + + for y in range(height): + for x in range(width): + if mask[y][x]: + 
pixels[x, y] = 255 + + for _ in range(iterations): + image = image.filter(ImageFilter.MaxFilter(size=3)) + + pixels = image.load() + if pixels is None: + raise RuntimeError("Could not access dilated pixels") + + return [[_pixel_to_int(pixels[x, y]) > 0 for x in range(width)] for y in range(height)] + + +def _prepare_source_data(source_path: Path) -> dict[str, Any]: + image = Image.open(source_path).convert("RGBA") + alpha_image = image.getchannel("A") + bbox = alpha_image.getbbox() + if bbox: + image = image.crop(bbox) + alpha_image = alpha_image.crop(bbox) + + width, height = image.size + ratio = min(MAX_TRACK_WIDTH / width, MAX_TRACK_HEIGHT / height) + if ratio < 1: + new_size = (max(1, int(width * ratio)), max(1, int(height * ratio))) + image = image.resize(new_size, Image.Resampling.LANCZOS) + alpha_image = alpha_image.resize(new_size, Image.Resampling.LANCZOS) + + composited = Image.new("RGB", image.size, (255, 255, 255)) + composited.paste(image, mask=alpha_image) + + alpha_pixels = alpha_image.load() + gray_pixels = composited.convert("L").load() + hsv_pixels = composited.convert("HSV").load() + if alpha_pixels is None or gray_pixels is None or hsv_pixels is None: + raise RuntimeError("Could not access source pixels") + + final_width, final_height = composited.size + alpha = [ + [_pixel_to_int(alpha_pixels[x, y]) for x in range(final_width)] for y in range(final_height) + ] + gray = [ + [_pixel_to_int(gray_pixels[x, y]) for x in range(final_width)] for y in range(final_height) + ] + hsv = [ + [_pixel_to_hsv(hsv_pixels[x, y]) for x in range(final_width)] for y in range(final_height) + ] + + return { + "width": final_width, + "height": final_height, + "alpha": alpha, + "gray": gray, + "hsv": hsv, + } + + +def _init_worker(source_path_str: str) -> None: + global _SEMANTIC_REFERENCE, _SOURCE_DATA + _SOURCE_DATA = _prepare_source_data(Path(source_path_str)) + _SEMANTIC_REFERENCE = build_track_semantic_reference( + Path(source_path_str), + 
        preview_size=(PREVIEW_WIDTH, PREVIEW_HEIGHT),
    )


def _sample_params(rng: random.Random) -> LayeredParams:
    """Draw one broad-search parameter set from hand-tuned prior ranges."""
    return LayeredParams(
        road_seed_gray=rng.randint(88, 126),
        road_seed_sat=rng.randint(120, 255),
        road_grow_gray=rng.randint(115, 180),
        road_grow_sat=rng.randint(120, 255),
        # Weighted toward 0: dilation is usually unnecessary.
        road_dilate_px=rng.choice([0, 0, 1]),
        colored_sat=rng.randint(56, 96),
        colored_val=rng.randint(70, 110),
        road_proximity_px=rng.randint(1, 3),
        label_min_area=rng.randint(120, 320),
        label_min_width=rng.randint(12, 24),
        label_min_height=rng.randint(6, 12),
        label_min_fill=rng.uniform(0.50, 0.90),
        label_max_aspect=rng.uniform(4.0, 10.0),
        accent_max_area=rng.randint(80, 2200),
        accent_min_aspect=rng.uniform(2.0, 14.0),
        annotation_min_area=rng.randint(10, 40),
        annotation_max_area=rng.randint(60, 260),
        annotation_max_aspect=rng.uniform(1.6, 4.5),
        annotation_proximity_px=rng.randint(1, 5),
        label_text_value=rng.randint(100, 150),
        label_text_low_sat=rng.randint(55, 110),
        label_text_low_sat_value=rng.randint(130, 185),
        min_component_pixels=rng.randint(4, 12),
        opaque_alpha=rng.randint(28, 50),
    )


def _clamp_int(value: int, low: int, high: int) -> int:
    """Clamp an int into [low, high]."""
    return max(low, min(high, value))


def _clamp_float(value: float, low: float, high: float) -> float:
    """Clamp a float into [low, high]."""
    return max(low, min(high, value))


def _jitter_int(rng: random.Random, base: int, spread: int, low: int, high: int) -> int:
    """Return base perturbed by a uniform integer in [-spread, spread], clamped."""
    return _clamp_int(base + rng.randint(-spread, spread), low, high)


def _jitter_float(rng: random.Random, base: float, spread: float, low: float, high: float) -> float:
    """Return base perturbed by a uniform float in [-spread, spread], clamped."""
    return _clamp_float(base + rng.uniform(-spread, spread), low, high)


def _load_params_json(path: Path) -> LayeredParams:
    """Load a LayeredParams bundle from a JSON object of field values."""
    payload = json.loads(path.read_text())
    return LayeredParams(**payload)


def _sample_local_params(
    rng: random.Random, base: LayeredParams, local_scale: float
) -> LayeredParams:
    """Sample a parameter set near `base` for local fine-tuning.

    `local_scale` scales every jitter spread; values below 0.1 are clamped up
    so the search never fully collapses onto the base point.
    """
    scale = max(0.1, local_scale)
    return LayeredParams(
road_seed_gray=_jitter_int(rng, base.road_seed_gray, int(round(6 * scale)), 70, 180), + road_seed_sat=_jitter_int(rng, base.road_seed_sat, int(round(28 * scale)), 0, 255), + road_grow_gray=_jitter_int(rng, base.road_grow_gray, int(round(12 * scale)), 80, 255), + road_grow_sat=_jitter_int(rng, base.road_grow_sat, int(round(28 * scale)), 0, 255), + road_dilate_px=_clamp_int( + base.road_dilate_px + rng.choice([-1, 0, 0, 1]), + 0, + 2, + ), + colored_sat=_jitter_int(rng, base.colored_sat, int(round(10 * scale)), 30, 140), + colored_val=_jitter_int(rng, base.colored_val, int(round(10 * scale)), 40, 180), + road_proximity_px=_jitter_int(rng, base.road_proximity_px, 1, 1, 5), + label_min_area=_jitter_int(rng, base.label_min_area, int(round(50 * scale)), 40, 500), + label_min_width=_jitter_int(rng, base.label_min_width, int(round(4 * scale)), 8, 40), + label_min_height=_jitter_int(rng, base.label_min_height, int(round(2 * scale)), 4, 20), + label_min_fill=_jitter_float(rng, base.label_min_fill, 0.08 * scale, 0.2, 0.95), + label_max_aspect=_jitter_float(rng, base.label_max_aspect, 1.0 * scale, 2.0, 14.0), + accent_max_area=_jitter_int(rng, base.accent_max_area, int(round(260 * scale)), 40, 3000), + accent_min_aspect=_jitter_float(rng, base.accent_min_aspect, 0.8 * scale, 1.0, 16.0), + annotation_min_area=_jitter_int( + rng, base.annotation_min_area, int(round(6 * scale)), 1, 100 + ), + annotation_max_area=_jitter_int( + rng, base.annotation_max_area, int(round(40 * scale)), 20, 600 + ), + annotation_max_aspect=_jitter_float(rng, base.annotation_max_aspect, 0.7 * scale, 1.0, 8.0), + annotation_proximity_px=_jitter_int(rng, base.annotation_proximity_px, 1, 1, 6), + label_text_value=_jitter_int(rng, base.label_text_value, int(round(10 * scale)), 60, 200), + label_text_low_sat=_jitter_int( + rng, base.label_text_low_sat, int(round(12 * scale)), 0, 255 + ), + label_text_low_sat_value=_jitter_int( + rng, + base.label_text_low_sat_value, + int(round(12 * scale)), + 80, + 220, + 
), + min_component_pixels=_jitter_int(rng, base.min_component_pixels, 2, 1, 20), + opaque_alpha=_jitter_int(rng, base.opaque_alpha, int(round(5 * scale)), 1, 100), + ) + + +def _build_layered_candidate(params: LayeredParams) -> tuple[bytes, SemanticScoringMetrics]: + global _SEMANTIC_REFERENCE, _SOURCE_DATA + if _SOURCE_DATA is None or _SEMANTIC_REFERENCE is None: + raise RuntimeError("Worker source data not initialized") + + width = int(_SOURCE_DATA["width"]) + height = int(_SOURCE_DATA["height"]) + alpha = _SOURCE_DATA["alpha"] + gray = _SOURCE_DATA["gray"] + hsv = _SOURCE_DATA["hsv"] + + opaque = [[alpha[y][x] >= params.opaque_alpha for x in range(width)] for y in range(height)] + + road_seed = [ + [ + opaque[y][x] + and gray[y][x] < params.road_seed_gray + and hsv[y][x][1] < params.road_seed_sat + for x in range(width) + ] + for y in range(height) + ] + + road_mask = [[False] * width for _ in range(height)] + seed_components = _connected_components(road_seed) + if seed_components: + largest_seed = max(seed_components, key=lambda comp: comp.area) + queue = list(largest_seed.points) + for x, y in largest_seed.points: + road_mask[y][x] = True + + while queue: + cx, cy = queue.pop() + for dx, dy in NEIGHBORS: + nx, ny = cx + dx, cy + dy + if nx < 0 or ny < 0 or nx >= width or ny >= height: + continue + if road_mask[ny][nx] or not opaque[ny][nx]: + continue + if gray[ny][nx] < params.road_grow_gray and hsv[ny][nx][1] < params.road_grow_sat: + road_mask[ny][nx] = True + queue.append((nx, ny)) + + road_mask = _dilate_mask(road_mask, params.road_dilate_px) + road_proximity = _dilate_mask(road_mask, params.road_proximity_px) + annotation_proximity = _dilate_mask(road_mask, params.annotation_proximity_px) + + colored_mask = [ + [ + opaque[y][x] + and hsv[y][x][1] >= params.colored_sat + and hsv[y][x][2] >= params.colored_val + for x in range(width) + ] + for y in range(height) + ] + + label_mask = [[False] * width for _ in range(height)] + accent_mask = [[False] * width 
for _ in range(height)] + + for component in _connected_components(colored_mask): + aspect_ratio = max(component.width / component.height, component.height / component.width) + touches_road = any(road_proximity[y][x] for x, y in component.points) + + if ( + not touches_road + and component.area >= params.label_min_area + and component.width >= params.label_min_width + and component.height >= params.label_min_height + and component.fill_ratio >= params.label_min_fill + and aspect_ratio <= params.label_max_aspect + ): + for x, y in component.points: + label_mask[y][x] = True + continue + + if ( + touches_road + and component.area <= params.accent_max_area + and aspect_ratio >= params.accent_min_aspect + ): + for x, y in component.points: + accent_mask[y][x] = True + + dark_small = [ + [ + opaque[y][x] + and gray[y][x] < params.road_grow_gray + and hsv[y][x][1] < params.road_grow_sat + and not road_mask[y][x] + for x in range(width) + ] + for y in range(height) + ] + + annotation_mask = [[False] * width for _ in range(height)] + for component in _connected_components(dark_small): + aspect_ratio = max(component.width / component.height, component.height / component.width) + near_road = any(annotation_proximity[y][x] for x, y in component.points) + if ( + near_road + and component.area >= params.annotation_min_area + and component.area <= params.annotation_max_area + and aspect_ratio <= params.annotation_max_aspect + and component.fill_ratio <= 0.92 + ): + for x, y in component.points: + annotation_mask[y][x] = True + + black_mask = [ + [ + (road_mask[y][x] and not accent_mask[y][x]) or label_mask[y][x] or annotation_mask[y][x] + for x in range(width) + ] + for y in range(height) + ] + + for y in range(height): + for x in range(width): + if not label_mask[y][x]: + continue + sat = hsv[y][x][1] + val = hsv[y][x][2] + if val < params.label_text_value or ( + sat < params.label_text_low_sat and val < params.label_text_low_sat_value + ): + black_mask[y][x] = False + + for 
component in _connected_components(black_mask): + if component.area < params.min_component_pixels: + for x, y in component.points: + black_mask[y][x] = False + + image = Image.new("1", (width, height), 1) + pixels = image.load() + if pixels is None: + raise RuntimeError("Could not access candidate pixels") + for y in range(height): + for x in range(width): + if black_mask[y][x]: + pixels[x, y] = 0 + + preview = Image.new("1", (PREVIEW_WIDTH, PREVIEW_HEIGHT), 1) + preview_x = (PREVIEW_WIDTH - width) // 2 + preview_y = (PREVIEW_HEIGHT - height) // 2 + preview.paste(image, (preview_x, preview_y)) + preview_metrics = evaluate_rendered_semantic_quality( + preview, + _SEMANTIC_REFERENCE, + roi=(0, PREVIEW_WIDTH, 0, PREVIEW_HEIGHT), + ) + + buffer = io.BytesIO() + image.save(buffer, format="BMP") + bmp_bytes = buffer.getvalue() + return bmp_bytes, preview_metrics + + +def _evaluate_candidate_preview_from_bytes(bmp_bytes: bytes) -> SemanticScoringMetrics: + global _SEMANTIC_REFERENCE + if _SEMANTIC_REFERENCE is None: + raise RuntimeError("Worker semantic reference not initialized") + image = Image.open(io.BytesIO(bmp_bytes)).convert("1") + width, height = image.size + preview = Image.new("1", (PREVIEW_WIDTH, PREVIEW_HEIGHT), 1) + preview_x = (PREVIEW_WIDTH - width) // 2 + preview_y = (PREVIEW_HEIGHT - height) // 2 + preview.paste(image, (preview_x, preview_y)) + return evaluate_rendered_semantic_quality( + preview, + _SEMANTIC_REFERENCE, + roi=(0, PREVIEW_WIDTH, 0, PREVIEW_HEIGHT), + ) + + +def _build_baseline_candidate(source_path: Path) -> dict[str, Any]: + with tempfile.NamedTemporaryFile(suffix=".bmp", delete=False) as handle: + temp_path = Path(handle.name) + + try: + build_1bit_track(source_path, temp_path, params=Track1BitParams()) + bmp_bytes = temp_path.read_bytes() + finally: + temp_path.unlink(missing_ok=True) + + preview_metrics = _evaluate_candidate_preview_from_bytes(bmp_bytes) + return { + "index": -1, + "params": "baseline-default", + "preview_metrics": 
preview_metrics, + "preview_score": preview_metrics.total_score, + "bmp_bytes": bmp_bytes, + } + + +def _worker_trial(payload: tuple[int, LayeredParams]) -> dict[str, Any]: + index, params = payload + bmp_bytes, preview_metrics = _build_layered_candidate(params) + preview_score = preview_metrics.total_score + return { + "index": index, + "params": params, + "preview_metrics": preview_metrics, + "preview_score": preview_score, + "bmp_bytes": bmp_bytes, + } + + +def _write_bmp(output_path: Path, bmp_bytes: bytes) -> None: + output_path.parent.mkdir(parents=True, exist_ok=True) + output_path.write_bytes(bmp_bytes) + + +def _verify_finalists( + finalists: list[dict[str, Any]], + output_path: Path, + endpoint_url: str, + best_bmp_out: Path, + best_render_out: Path, + seed: int, + semantic_reference: TrackSemanticReference, +) -> dict[str, Any]: + timezones = list(pytz.all_timezones) + random.Random(seed ^ 0x5F3759DF).shuffle(timezones) + best_result: dict[str, Any] | None = None + + for finalist_index, finalist in enumerate(finalists): + _write_bmp(output_path, finalist["bmp_bytes"]) + timezone_name = timezones[finalist_index % len(timezones)] + rendered = fetch_calendar_render(endpoint_url, timezone_name) + render_metrics = evaluate_rendered_semantic_quality(rendered, semantic_reference) + render_score = render_metrics.total_score + + if best_result is None or render_score > best_result["render_score"]: + best_result = { + **finalist, + "render_metrics": render_metrics, + "render_score": render_score, + } + best_bmp_out.parent.mkdir(parents=True, exist_ok=True) + best_render_out.parent.mkdir(parents=True, exist_ok=True) + _write_bmp(best_bmp_out, finalist["bmp_bytes"]) + rendered.save(best_render_out) + + if best_result is None: + raise RuntimeError("No finalist could be verified") + + _write_bmp(output_path, best_result["bmp_bytes"]) + return best_result + + +def main() -> int: + global _SEMANTIC_REFERENCE + args = parse_args() + if not args.source_path.exists(): + 
print(f"Source not found: {args.source_path}") + return 1 + + rng = random.Random(args.seed) + base_params = _load_params_json(args.base_params_file) if args.base_params_file else None + if base_params is None: + payloads = [(index, _sample_params(rng)) for index in range(args.trials)] + else: + payloads = [ + (index, _sample_local_params(rng, base_params, args.local_scale)) + for index in range(args.trials) + ] + + finalists_heap: list[tuple[float, int, dict[str, Any]]] = [] + best_preview_score = float("-inf") + semantic_reference = build_track_semantic_reference( + args.source_path, + preview_size=(PREVIEW_WIDTH, PREVIEW_HEIGHT), + ) + _SEMANTIC_REFERENCE = semantic_reference + best_preview_metrics: SemanticScoringMetrics | None = None + + baseline_result = _build_baseline_candidate(args.source_path) + heapq.heappush(finalists_heap, (baseline_result["preview_score"], -1, baseline_result)) + best_preview_score = float(baseline_result["preview_score"]) + best_preview_metrics = baseline_result["preview_metrics"] + + with ProcessPoolExecutor( + max_workers=args.workers, + initializer=_init_worker, + initargs=(str(args.source_path),), + ) as executor: + for processed, result in enumerate( + executor.map(_worker_trial, payloads, chunksize=8), start=1 + ): + preview_score = float(result["preview_score"]) + if preview_score > best_preview_score: + best_preview_score = preview_score + best_preview_metrics = result["preview_metrics"] + + heap_item = (preview_score, int(result["index"]), result) + if len(finalists_heap) < args.finalists: + heapq.heappush(finalists_heap, heap_item) + elif preview_score > finalists_heap[0][0]: + heapq.heapreplace(finalists_heap, heap_item) + + if processed % max(1, args.progress_every) == 0: + metrics = semantic_metrics_to_dict(result["preview_metrics"]) + best_metrics_dict = ( + semantic_metrics_to_dict(best_preview_metrics) + if best_preview_metrics is not None + else {} + ) + print( + f"{processed}/{args.trials} " + 
f"best_preview={best_preview_score:.2f} current={preview_score:.2f} " + f"current_metrics={metrics} best_metrics={best_metrics_dict}" + ) + + finalists = [item[2] for item in sorted(finalists_heap, reverse=True)] + best_result = _verify_finalists( + finalists, + output_path=args.output_path, + endpoint_url=args.endpoint_url, + best_bmp_out=args.best_bmp_out, + best_render_out=args.best_render_out, + seed=args.seed, + semantic_reference=semantic_reference, + ) + + params_display = ( + asdict(best_result["params"]) + if isinstance(best_result["params"], LayeredParams) + else best_result["params"] + ) + + print("\nLayered parallel search complete") + print(f" Workers: {args.workers}") + print(f" Trials: {args.trials}") + print(f" Search mode: {'local' if base_params is not None else 'broad'}") + print(f" Finalists verified: {len(finalists)}") + print(f" Best preview score: {best_result['preview_score']:.2f}") + print(f" Best render score: {best_result['render_score']:.2f}") + print(f" Preview metrics: {semantic_metrics_to_dict(best_result['preview_metrics'])}") + print(f" Render metrics: {semantic_metrics_to_dict(best_result['render_metrics'])}") + print(f" Params: {params_display}") + print(f" Applied output: {args.output_path}") + print(f" Best BMP snapshot: {args.best_bmp_out}") + print(f" Best render preview: {args.best_render_out}") + + if isinstance(best_result["params"], LayeredParams): + args.best_params_out.parent.mkdir(parents=True, exist_ok=True) + args.best_params_out.write_text(json.dumps(asdict(best_result["params"]), indent=2) + "\n") + print(f" Best params JSON: {args.best_params_out}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/search_track_1bit_params.py b/scripts/search_track_1bit_params.py new file mode 100644 index 0000000..2dcf392 --- /dev/null +++ b/scripts/search_track_1bit_params.py @@ -0,0 +1,166 @@ +#!/usr/bin/env python3 +"""Random-search tuner for 1-bit track conversion parameters. 
+ +The script scores candidates using rendered `/calendar.bmp` output so the +optimization follows what users actually see, not just the raw BMP asset. +""" + +from __future__ import annotations + +import argparse +import random +import shutil +from dataclasses import asdict +from pathlib import Path + +import pytz + +from track_conversion_utils import ( + Track1BitParams, + build_1bit_track, + evaluate_rendered_track_region, + fetch_calendar_render, + metrics_to_dict, + score_track_metrics, +) + +PROJECT_ROOT = Path(__file__).resolve().parent.parent + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Search 1-bit conversion parameters against rendered output quality." + ) + parser.add_argument( + "--source-path", + type=Path, + default=PROJECT_ROOT / "app" / "assets" / "tracks" / "albert_park.png", + help="Source track image path", + ) + parser.add_argument( + "--output-path", + type=Path, + default=PROJECT_ROOT / "app" / "assets" / "tracks_processed" / "albert_park.bmp", + help="Output 1-bit BMP path used by renderer", + ) + parser.add_argument( + "--endpoint-url", + default="http://127.0.0.1:8000/calendar.bmp?lang=en&year=2026&round=1&display=1bit&weather=false", + help="Calendar endpoint for rendered quality checks", + ) + parser.add_argument( + "--trials", + type=int, + default=300, + help="Number of random trials", + ) + parser.add_argument( + "--seed", + type=int, + default=20260305, + help="Random seed", + ) + parser.add_argument( + "--progress-every", + type=int, + default=25, + help="Progress print interval", + ) + parser.add_argument( + "--best-bmp-out", + type=Path, + default=Path("/tmp/albert_park_best_track.bmp"), + help="Path for saving best candidate BMP snapshot", + ) + parser.add_argument( + "--best-render-out", + type=Path, + default=Path("/tmp/albert_park_best_render.png"), + help="Path for saving best rendered preview PNG", + ) + return parser.parse_args() + + +def sample_params(rng: random.Random) -> 
Track1BitParams: + """Sample one parameter set around currently successful ranges.""" + return Track1BitParams( + road_gray_threshold=rng.randint(108, 126), + road_saturation_threshold=rng.randint(140, 255), + colored_saturation_threshold=rng.randint(56, 80), + colored_value_threshold=rng.randint(78, 100), + label_min_area=rng.randint(180, 280), + label_min_width=rng.randint(14, 24), + label_min_height=rng.randint(7, 11), + label_min_fill_ratio=rng.uniform(0.62, 0.82), + label_max_aspect_ratio=rng.uniform(4.0, 6.8), + label_text_value_threshold=rng.randint(105, 135), + label_text_low_sat_threshold=rng.randint(65, 95), + label_text_value_low_sat_threshold=rng.randint(138, 168), + min_component_pixels=rng.randint(6, 10), + opaque_alpha_threshold=rng.randint(30, 45), + road_dilate_px=rng.choice([0, 0, 1]), + ) + + +def main() -> int: + args = parse_args() + if not args.source_path.exists(): + print(f"Source not found: {args.source_path}") + return 1 + + rng = random.Random(args.seed) + timezones = list(pytz.all_timezones) + rng.shuffle(timezones) + + best_score = float("-inf") + best_params: Track1BitParams | None = None + best_metrics = None + + for index in range(args.trials): + params = sample_params(rng) + build_1bit_track(args.source_path, args.output_path, params=params) + + timezone_name = timezones[index % len(timezones)] + try: + rendered = fetch_calendar_render(args.endpoint_url, timezone_name) + except Exception as exc: + print(f"{index + 1:03d}/{args.trials} FAIL fetch ({timezone_name}): {exc}") + continue + + metrics = evaluate_rendered_track_region(rendered) + score = score_track_metrics(metrics) + + if score > best_score: + best_score = score + best_params = params + best_metrics = metrics + args.best_render_out.parent.mkdir(parents=True, exist_ok=True) + rendered.save(args.best_render_out) + args.best_bmp_out.parent.mkdir(parents=True, exist_ok=True) + shutil.copy2(args.output_path, args.best_bmp_out) + + if (index + 1) % max(1, args.progress_every) == 
0: + print( + f"{index + 1:03d}/{args.trials} " + f"best={best_score:.2f} current={score:.2f} " + f"metrics={metrics_to_dict(metrics)}" + ) + + if best_params is None or best_metrics is None: + print("No successful trial produced a scored candidate.") + return 2 + + build_1bit_track(args.source_path, args.output_path, params=best_params) + + print("\nBest candidate selected") + print(f" Score: {best_score:.2f}") + print(f" Metrics: {metrics_to_dict(best_metrics)}") + print(f" Params: {asdict(best_params)}") + print(f" Applied output: {args.output_path}") + print(f" Best BMP snapshot: {args.best_bmp_out}") + print(f" Best render preview: {args.best_render_out}") + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/track_conversion_utils.py b/scripts/track_conversion_utils.py new file mode 100644 index 0000000..d10093e --- /dev/null +++ b/scripts/track_conversion_utils.py @@ -0,0 +1,1081 @@ +#!/usr/bin/env python3 +"""Utilities for track conversion and rendered quality scoring. + +This module is intentionally script-friendly (no app imports) so it can be +used during rapid track asset iteration. 
+""" + +from __future__ import annotations + +import io +import math +import urllib.parse +import urllib.request +from collections import deque +from dataclasses import dataclass +from pathlib import Path + +from PIL import Image, ImageFilter, ImageOps + +MAX_TRACK_WIDTH = 490 +MAX_TRACK_HEIGHT = 280 +SPECTRA6_TARGET_SIZE = (494, 271) + +NEIGHBORS = ((1, 0), (-1, 0), (0, 1), (0, -1)) + +SPECTRA6_PALETTE = ( + (0, 0, 0), + (255, 255, 255), + (160, 32, 32), + (240, 224, 80), + (80, 128, 184), + (96, 128, 80), +) + + +@dataclass(frozen=True) +class Track1BitParams: + """Parameter bundle for color-aware 1-bit track conversion.""" + + road_gray_threshold: int = 118 + road_saturation_threshold: int = 255 + colored_saturation_threshold: int = 62 + colored_value_threshold: int = 85 + label_min_area: int = 220 + label_min_width: int = 20 + label_min_height: int = 9 + label_min_fill_ratio: float = 0.72 + label_max_aspect_ratio: float = 5.0 + label_text_value_threshold: int = 120 + label_text_low_sat_threshold: int = 80 + label_text_value_low_sat_threshold: int = 150 + min_component_pixels: int = 8 + opaque_alpha_threshold: int = 35 + road_dilate_px: int = 0 + + +@dataclass(frozen=True) +class Component: + """Connected component stats for a binary mask.""" + + points: list[tuple[int, int]] + min_x: int + min_y: int + max_x: int + max_y: int + area: int + width: int + height: int + fill_ratio: float + + +@dataclass(frozen=True) +class TrackRenderMetrics: + """Quality metrics extracted from the rendered calendar region.""" + + black_ratio: float + largest_area: int + largest_fill_ratio: float + box_count: int + box_white_ratio: float + noise_count: int + + +@dataclass(frozen=True) +class SemanticScoringMetrics: + """Semantic scoring metrics evaluated on the rendered track region.""" + + track_black_fill_1x: float + box_black_fill_1x: float + text_white_fill_1x: float + bg_white_fill_1x: float + accent_white_fill_1x: float + semantic_transfer_1x: float + semantic_transfer_05x: 
float + semantic_transfer_ms: float + boundary_iou_track_1x: float + boundary_iou_box_1x: float + boundary_iou_track_05x: float + boundary_iou_box_05x: float + boundary_iou_ms: float + hierarchy_score: float + noise_score: float + total_score: float + + +@dataclass(frozen=True) +class TrackSemanticReference: + """Semantic reference masks for rendered-track scoring.""" + + preview_size: tuple[int, int] + masks_1x: dict[str, list[list[bool]]] + masks_05x: dict[str, list[list[bool]]] + boundaries_1x: dict[str, list[list[bool]]] + boundaries_05x: dict[str, list[list[bool]]] + + +def download_file(url: str, output_path: Path, timeout: int = 30) -> int: + """Download a file and return downloaded byte count.""" + output_path.parent.mkdir(parents=True, exist_ok=True) + request = urllib.request.Request(url, headers={"User-Agent": "track-conversion/1.0"}) + with urllib.request.urlopen(request, timeout=timeout) as response: + payload = response.read() + output_path.write_bytes(payload) + return len(payload) + + +def _pixel_to_int(pixel: int | float | tuple[int, ...]) -> int: + """Normalize Pillow pixel value to integer.""" + if isinstance(pixel, tuple): + return int(pixel[0]) if pixel else 0 + return int(pixel) + + +def _pixel_to_hsv(pixel: int | float | tuple[int, ...]) -> tuple[int, int, int]: + """Normalize Pillow HSV pixel value to (h, s, v) ints.""" + if isinstance(pixel, tuple): + if len(pixel) >= 3: + return int(pixel[0]), int(pixel[1]), int(pixel[2]) + if len(pixel) == 2: + return int(pixel[0]), int(pixel[1]), int(pixel[1]) + if len(pixel) == 1: + value = int(pixel[0]) + return value, 0, value + return 0, 0, 0 + value = int(pixel) + return value, 0, value + + +def _mask_shape(mask: list[list[bool]]) -> tuple[int, int]: + """Return (width, height) for a bool mask.""" + height = len(mask) + width = len(mask[0]) if height else 0 + return width, height + + +def _empty_mask(width: int, height: int) -> list[list[bool]]: + """Create an empty bool mask.""" + return [[False] * 
width for _ in range(height)] + + +def _copy_mask(mask: list[list[bool]]) -> list[list[bool]]: + """Deep-copy a bool mask.""" + return [row[:] for row in mask] + + +def _or_masks(*masks: list[list[bool]]) -> list[list[bool]]: + """Return logical OR over masks of equal size.""" + if not masks: + return [] + width, height = _mask_shape(masks[0]) + merged = _empty_mask(width, height) + for y in range(height): + for x in range(width): + merged[y][x] = any(mask[y][x] for mask in masks) + return merged + + +def _subtract_mask(mask: list[list[bool]], minus: list[list[bool]]) -> list[list[bool]]: + """Return mask - minus.""" + width, height = _mask_shape(mask) + result = _empty_mask(width, height) + for y in range(height): + for x in range(width): + result[y][x] = mask[y][x] and not minus[y][x] + return result + + +def _invert_mask(mask: list[list[bool]]) -> list[list[bool]]: + """Invert a bool mask.""" + width, height = _mask_shape(mask) + return [[not mask[y][x] for x in range(width)] for y in range(height)] + + +def _mask_to_image(mask: list[list[bool]]) -> Image.Image: + """Convert bool mask to 1-bit Pillow image.""" + width, height = _mask_shape(mask) + image = Image.new("1", (width, height), 0) + pixels = image.load() + if pixels is None: + raise RuntimeError("Failed to create mask image") + for y in range(height): + for x in range(width): + if mask[y][x]: + pixels[x, y] = 1 + return image + + +def _image_to_mask(image: Image.Image) -> list[list[bool]]: + """Convert Pillow image to bool mask using nonzero as True.""" + binary = image.convert("1") + pixels = binary.load() + if pixels is None: + raise RuntimeError("Failed to read image mask pixels") + width, height = binary.size + return [[_pixel_to_int(pixels[x, y]) > 0 for x in range(width)] for y in range(height)] + + +def _resize_mask(mask: list[list[bool]], size: tuple[int, int]) -> list[list[bool]]: + """Resize bool mask with box filter and threshold at 0.5.""" + image = _mask_to_image(mask).convert("L") + 
    resized = image.resize(size, Image.Resampling.BOX)
    pixels = resized.load()
    if pixels is None:
        raise RuntimeError("Failed to read resized mask pixels")
    width, height = resized.size
    # >= 128 on the 0/255 grayscale corresponds to >= 0.5 coverage per cell.
    return [[_pixel_to_int(pixels[x, y]) >= 128 for x in range(width)] for y in range(height)]


def _paste_centered_mask(mask: list[list[bool]], canvas_size: tuple[int, int]) -> list[list[bool]]:
    """Paste smaller mask centered into larger canvas."""
    source_width, source_height = _mask_shape(mask)
    canvas_width, canvas_height = canvas_size
    canvas = _empty_mask(canvas_width, canvas_height)
    offset_x = (canvas_width - source_width) // 2
    offset_y = (canvas_height - source_height) // 2
    for y in range(source_height):
        for x in range(source_width):
            if mask[y][x]:
                canvas[offset_y + y][offset_x + x] = True
    return canvas


def _mask_area(mask: list[list[bool]]) -> int:
    """Return number of True pixels in mask."""
    return sum(sum(1 for value in row if value) for row in mask)


def _mean_mask_value(mask: list[list[bool]], values: list[list[bool]]) -> float:
    """Return mean of bool values restricted to mask.

    Returns NaN when the mask selects no pixels.
    """
    total = 0
    count = 0
    width, height = _mask_shape(mask)
    for y in range(height):
        for x in range(width):
            if mask[y][x]:
                count += 1
                if values[y][x]:
                    total += 1
    return total / count if count else float("nan")


def _dilate_mask_radius(mask: list[list[bool]], radius: int) -> list[list[bool]]:
    """Dilate mask by radius using repeated 3x3 max filters."""
    return _dilate_mask(mask, radius)


def _erode_mask(mask: list[list[bool]], radius: int) -> list[list[bool]]:
    """Erode a bool mask using repeated 3x3 min filters.

    radius <= 0 returns an unmodified copy of the input.
    """
    if radius <= 0:
        return _copy_mask(mask)

    width, height = _mask_shape(mask)
    image = Image.new("L", (width, height), 0)
    pixels = image.load()
    if pixels is None:
        raise RuntimeError("Failed to create erosion image")
    for y in range(height):
        for x in range(width):
            if mask[y][x]:
                pixels[x, y] = 255

    for _ in range(radius):
        image = image.filter(ImageFilter.MinFilter(size=3))

    pixels = image.load()
    if pixels is None:
        raise RuntimeError("Failed to read eroded mask pixels")
    return [[_pixel_to_int(pixels[x, y]) > 0 for x in range(width)] for y in range(height)]


def _boundary_mask(mask: list[list[bool]], radius: int = 1) -> list[list[bool]]:
    """Return thin boundary mask from region mask.

    Boundary = mask minus its erosion by `radius`.
    """
    eroded = _erode_mask(mask, radius)
    width, height = _mask_shape(mask)
    boundary = _empty_mask(width, height)
    for y in range(height):
        for x in range(width):
            boundary[y][x] = mask[y][x] and not eroded[y][x]
    return boundary


def _intersection_over_union(mask_a: list[list[bool]], mask_b: list[list[bool]]) -> float:
    """Compute IoU for two bool masks.

    Two empty masks count as a perfect match (IoU 1.0).
    """
    width, height = _mask_shape(mask_a)
    intersection = 0
    union = 0
    for y in range(height):
        for x in range(width):
            a = mask_a[y][x]
            b = mask_b[y][x]
            if a and b:
                intersection += 1
            if a or b:
                union += 1
    return intersection / union if union else 1.0


def _boundary_iou(
    predicted_mask: list[list[bool]], reference_mask: list[list[bool]], radius: int
) -> float:
    """Compute boundary IoU between predicted and reference masks.

    A boundary pixel counts as matched when it falls within `radius` pixels
    (dilated band) of the other mask's boundary; the score is matched
    boundary pixels over all boundary pixels.
    """
    predicted_boundary = _boundary_mask(predicted_mask, radius=1)
    reference_boundary = _boundary_mask(reference_mask, radius=1)
    predicted_band = _dilate_mask_radius(predicted_boundary, radius)
    reference_band = _dilate_mask_radius(reference_boundary, radius)
    pred_match = _and_masks(predicted_boundary, reference_band)
    ref_match = _and_masks(reference_boundary, predicted_band)
    intersection = _or_masks(pred_match, ref_match)
    union = _or_masks(predicted_boundary, reference_boundary)
    # max(1, ...) guards against division by zero when both boundaries are empty.
    return _mask_area(intersection) / max(1, _mask_area(union))


def _and_masks(mask_a: list[list[bool]], mask_b: list[list[bool]]) -> list[list[bool]]:
    """Return logical AND of two masks."""
    width, height = _mask_shape(mask_a)
    return [[mask_a[y][x] and mask_b[y][x]
def _connected_components(mask: list[list[bool]]) -> list[Component]:
    """Find every 4-connected component of True cells in a bool mask."""
    rows = len(mask)
    cols = len(mask[0]) if rows else 0
    seen = [[False] * cols for _ in range(rows)]
    found: list[Component] = []

    for seed_y in range(rows):
        for seed_x in range(cols):
            if seen[seed_y][seed_x] or not mask[seed_y][seed_x]:
                continue

            # Breadth-first flood fill from this unvisited foreground pixel.
            frontier = deque([(seed_x, seed_y)])
            seen[seed_y][seed_x] = True
            member_points: list[tuple[int, int]] = []
            lo_x = lo_y = 10**9
            hi_x = hi_y = -1

            while frontier:
                px, py = frontier.popleft()
                member_points.append((px, py))
                lo_x = min(lo_x, px)
                lo_y = min(lo_y, py)
                hi_x = max(hi_x, px)
                hi_y = max(hi_y, py)

                for step_x, step_y in NEIGHBORS:
                    nx, ny = px + step_x, py + step_y
                    if nx < 0 or ny < 0 or nx >= cols or ny >= rows:
                        continue
                    if seen[ny][nx] or not mask[ny][nx]:
                        continue
                    seen[ny][nx] = True
                    frontier.append((nx, ny))

            span_w = hi_x - lo_x + 1
            span_h = hi_y - lo_y + 1
            pixel_count = len(member_points)
            found.append(
                Component(
                    points=member_points,
                    min_x=lo_x,
                    min_y=lo_y,
                    max_x=hi_x,
                    max_y=hi_y,
                    area=pixel_count,
                    width=span_w,
                    height=span_h,
                    # How densely the component fills its bounding box.
                    fill_ratio=pixel_count / (span_w * span_h),
                )
            )

    return found


def _dilate_mask(mask: list[list[bool]], iterations: int) -> list[list[bool]]:
    """Dilate a bool mask by repeated 3x3 max filtering (one pass per iteration)."""
    if iterations <= 0:
        return mask

    rows = len(mask)
    cols = len(mask[0]) if rows else 0

    # Render the mask into an 8-bit image so PIL's MaxFilter can do the work.
    canvas = Image.new("L", (cols, rows), 0)
    px = canvas.load()
    if px is None:
        raise RuntimeError("Failed to create mask pixel access")
    for row in range(rows):
        for col in range(cols):
            if mask[row][col]:
                px[col, row] = 255

    for _ in range(iterations):
        canvas = canvas.filter(ImageFilter.MaxFilter(size=3))

    px = canvas.load()
    if px is None:
        raise RuntimeError("Failed to read dilated mask pixel access")

    return [[_pixel_to_int(px[col, row]) > 0 for col in range(cols)] for row in range(rows)]


def _prepare_track_rgba(
    input_path: Path,
    max_width: int = MAX_TRACK_WIDTH,
    max_height: int = MAX_TRACK_HEIGHT,
) -> Image.Image:
    """Load source track image, crop transparent margins, and fit max size."""
    track = Image.open(input_path).convert("RGBA")

    # Trim fully transparent borders using the alpha channel's bounding box.
    crop_box = track.getchannel("A").getbbox()
    if crop_box:
        track = track.crop(crop_box)

    current_w, current_h = track.size
    scale = min(max_width / current_w, max_height / current_h)
    if scale < 1:
        # Only downscale; never enlarge a small source image.
        shrunk = (max(1, int(current_w * scale)), max(1, int(current_h * scale)))
        track = track.resize(shrunk, Image.Resampling.LANCZOS)

    return track
def build_1bit_track(
    input_path: Path,
    output_path: Path,
    params: Track1BitParams | None = None,
    max_width: int = MAX_TRACK_WIDTH,
    max_height: int = MAX_TRACK_HEIGHT,
) -> dict[str, object]:
    """Generate 1-bit BMP from source track image using color-aware rules.

    Pipeline: flatten onto white, seed a "road" mask from dark low-saturation
    pixels (largest connected component only), add saturated "label" boxes,
    carve label text back to white, despeckle, then save as 1-bit BMP.

    Returns a summary dict: output file size, final dimensions, and
    black/white pixel statistics.
    """
    settings = params or Track1BitParams()
    rgba = _prepare_track_rgba(input_path, max_width=max_width, max_height=max_height)

    width, height = rgba.size
    alpha_channel = rgba.getchannel("A")
    # Composite onto a white background so transparency becomes white.
    composited = Image.new("RGB", rgba.size, (255, 255, 255))
    composited.paste(rgba, mask=alpha_channel)

    alpha = alpha_channel.load()
    gray = composited.convert("L").load()
    hsv = composited.convert("HSV").load()
    if alpha is None or gray is None or hsv is None:
        raise RuntimeError("Failed to load pixel data for 1-bit conversion")

    # Pixels opaque enough to participate in any classification at all.
    opaque = [
        [_pixel_to_int(alpha[x, y]) >= settings.opaque_alpha_threshold for x in range(width)]
        for y in range(height)
    ]

    # Road seed: dark AND low-saturation (i.e. gray/black asphalt, not colored decor).
    road_seed = [
        [
            opaque[y][x]
            and _pixel_to_int(gray[x, y]) < settings.road_gray_threshold
            and _pixel_to_hsv(hsv[x, y])[1] < settings.road_saturation_threshold
            for x in range(width)
        ]
        for y in range(height)
    ]

    # Keep only the largest seed component — assumed to be the track outline.
    road_mask = [[False] * width for _ in range(height)]
    road_components = _connected_components(road_seed)
    if road_components:
        largest = max(road_components, key=lambda comp: comp.area)
        for x, y in largest.points:
            road_mask[y][x] = True

    # Thicken the track so it survives 1-bit rendering.
    road_mask = _dilate_mask(road_mask, settings.road_dilate_px)

    # Saturated, reasonably bright pixels are candidates for colored label boxes.
    colored_mask = [
        [
            opaque[y][x]
            and _pixel_to_hsv(hsv[x, y])[1] >= settings.colored_saturation_threshold
            and _pixel_to_hsv(hsv[x, y])[2] >= settings.colored_value_threshold
            for x in range(width)
        ]
        for y in range(height)
    ]

    # Accept only box-like colored components (size, fill, aspect-ratio gates).
    label_mask = [[False] * width for _ in range(height)]
    for component in _connected_components(colored_mask):
        aspect_ratio = max(
            component.width / component.height,
            component.height / component.width,
        )
        if (
            component.area >= settings.label_min_area
            and component.width >= settings.label_min_width
            and component.height >= settings.label_min_height
            and component.fill_ratio >= settings.label_min_fill_ratio
            and aspect_ratio <= settings.label_max_aspect_ratio
        ):
            for x, y in component.points:
                label_mask[y][x] = True

    # Start from road + labels as black.
    black_mask = [
        [road_mask[y][x] or label_mask[y][x] for x in range(width)] for y in range(height)
    ]

    # Inside label boxes, carve the (darker / desaturated) text back to white
    # so labels render as black boxes with readable white text.
    for y in range(height):
        for x in range(width):
            if not label_mask[y][x]:
                continue
            sat = _pixel_to_hsv(hsv[x, y])[1]
            val = _pixel_to_hsv(hsv[x, y])[2]
            if val < settings.label_text_value_threshold or (
                sat < settings.label_text_low_sat_threshold
                and val < settings.label_text_value_low_sat_threshold
            ):
                black_mask[y][x] = False

    # Despeckle: drop tiny isolated black components.
    for component in _connected_components(black_mask):
        if component.area < settings.min_component_pixels:
            for x, y in component.points:
                black_mask[y][x] = False

    # Rasterize the final mask: white background, black where mask is set.
    output = Image.new("L", (width, height), 255)
    pixels = output.load()
    if pixels is None:
        raise RuntimeError("Failed to create output pixel access")
    for y in range(height):
        for x in range(width):
            if black_mask[y][x]:
                pixels[x, y] = 0

    final = output.convert("1")
    output_path.parent.mkdir(parents=True, exist_ok=True)
    final.save(output_path, format="BMP")

    # Histogram of the L-converted 1-bit image only has mass at 0 and 255.
    histogram = final.convert("L").histogram()
    black_pixels = histogram[0]
    white_pixels = histogram[255]
    total_pixels = max(1, black_pixels + white_pixels)

    return {
        "output_size": output_path.stat().st_size,
        "final_dimensions": final.size,
        "black_ratio": black_pixels / total_pixels,
        "black_pixels": black_pixels,
        "white_pixels": white_pixels,
    }


def convert_spectra6_track(
    input_path: Path,
    output_path: Path,
    target_size: tuple[int, int] = SPECTRA6_TARGET_SIZE,
) -> dict[str, object]:
    """Generate Spectra6 indexed BMP from source track image.

    Flattens transparency onto white, letterboxes the image centered into
    ``target_size``, quantizes to the fixed 6-color Spectra6 palette without
    dithering, and saves as an indexed BMP. Returns a summary dict.
    """
    original = Image.open(input_path)
    # Flatten any form of transparency (RGBA/LA, or palette with a
    # transparency entry) onto a white background.
    if original.mode in ("RGBA", "LA") or (
        original.mode == "P" and "transparency" in original.info
    ):
        rgba = original.convert("RGBA")
        base = Image.new("RGB", rgba.size, (255, 255, 255))
        base.paste(rgba, mask=rgba.getchannel("A"))
        rgb = base
    else:
        rgb = original.convert("RGB")

    # Fit within the target size preserving aspect ratio, then center on white.
    contained = ImageOps.contain(rgb, target_size, Image.Resampling.LANCZOS)
    canvas = Image.new("RGB", target_size, (255, 255, 255))
    offset = ((target_size[0] - contained.width) // 2, (target_size[1] - contained.height) // 2)
    canvas.paste(contained, offset)

    # Build a full 256-entry palette: the 6 Spectra6 colors padded with black.
    palette_flat: list[int] = []
    for color in SPECTRA6_PALETTE:
        palette_flat.extend(color)
    while len(palette_flat) < 768:
        palette_flat.extend([0, 0, 0])

    palette_image = Image.new("P", (1, 1))
    palette_image.putpalette(palette_flat)

    # No dithering: e-ink panels want flat color regions, not speckle.
    indexed = canvas.quantize(colors=6, palette=palette_image, dither=Image.Dither.NONE)
    output_path.parent.mkdir(parents=True, exist_ok=True)
    indexed.save(output_path, format="BMP")

    # getcolors returns None when more than maxcolors distinct values exist.
    used_colors = indexed.getcolors(maxcolors=256)
    return {
        "output_size": output_path.stat().st_size,
        "final_dimensions": indexed.size,
        "contained_dimensions": contained.size,
        "offset": offset,
        "colors_used": len(used_colors) if used_colors is not None else 256,
    }
def build_track_semantic_reference(
    input_path: Path,
    preview_size: tuple[int, int] = (500, 268),
    max_width: int = MAX_TRACK_WIDTH,
    max_height: int = MAX_TRACK_HEIGHT,
    opaque_alpha_threshold: int = 35,
) -> TrackSemanticReference:
    """Build semantic reference masks from the colorful source track diagram.

    Classifies pixels into semantic layers (track body, label boxes, label
    text, track-adjacent accents, small decor) and returns them as masks at
    full preview resolution and a half-resolution downsample, plus boundary
    masks for the track and label-box layers. Thresholds below are tuned
    empirically for the source diagram style.
    """
    rgba = _prepare_track_rgba(input_path, max_width=max_width, max_height=max_height)
    alpha_channel = rgba.getchannel("A")
    # Flatten onto white so color classification sees the rendered appearance.
    composited = Image.new("RGB", rgba.size, (255, 255, 255))
    composited.paste(rgba, mask=alpha_channel)

    width, height = rgba.size
    alpha = alpha_channel.load()
    gray = composited.convert("L").load()
    hsv = composited.convert("HSV").load()
    if alpha is None or gray is None or hsv is None:
        raise RuntimeError("Failed to load pixel data for semantic reference")

    opaque = [
        [_pixel_to_int(alpha[x, y]) >= opaque_alpha_threshold for x in range(width)]
        for y in range(height)
    ]

    # Dark AND desaturated pixels: candidates for the track body itself.
    dark_neutral = [
        [
            opaque[y][x] and _pixel_to_int(gray[x, y]) < 138 and _pixel_to_hsv(hsv[x, y])[1] < 128
            for x in range(width)
        ]
        for y in range(height)
    ]

    # The track body is assumed to be the single largest dark-neutral component.
    track_body = _empty_mask(width, height)
    dark_components = _connected_components(dark_neutral)
    if dark_components:
        dominant_track = max(dark_components, key=lambda component: component.area)
        for x, y in dominant_track.points:
            track_body[y][x] = True

    # Anything within 3px of the track counts as "near the track" below.
    track_proximity = _dilate_mask_radius(track_body, 3)

    saturated_colored = [
        [
            opaque[y][x] and _pixel_to_hsv(hsv[x, y])[1] >= 56 and _pixel_to_hsv(hsv[x, y])[2] >= 70
            for x in range(width)
        ]
        for y in range(height)
    ]

    label_boxes = _empty_mask(width, height)
    accent_mask = _empty_mask(width, height)
    decor_ignore = _empty_mask(width, height)

    # Triage each colored component into exactly one layer:
    # label box, track-adjacent accent, or small ignorable decor.
    for component in _connected_components(saturated_colored):
        aspect_ratio = max(
            component.width / component.height,
            component.height / component.width,
        )
        near_track = any(track_proximity[y][x] for x, y in component.points)

        # Box-like: large, wide enough, densely filled, not too elongated.
        # The extra clause rejects thin strips hugging the track (likely
        # accents such as kerbs/DRS zones, not label boxes).
        if (
            component.area >= 140
            and component.width >= 12
            and component.height >= 6
            and component.fill_ratio >= 0.48
            and aspect_ratio <= 9.5
            and not (near_track and aspect_ratio >= 6.5 and component.area < 1200)
        ):
            for x, y in component.points:
                label_boxes[y][x] = True
            continue

        # Elongated colored strips near the track are accents (render white).
        if near_track and component.area <= 2200 and aspect_ratio >= 2.0:
            for x, y in component.points:
                accent_mask[y][x] = True
            continue

        # Small leftover colored blobs are decorative; excluded from scoring.
        if component.area <= 220:
            for x, y in component.points:
                decor_ignore[y][x] = True

    # Accent pixels win over track-body pixels where they overlap.
    track_black = _subtract_mask(track_body, accent_mask)

    # Within label boxes, darker / low-saturation pixels are the text glyphs,
    # which should render white inside the black box.
    text_white = _empty_mask(width, height)
    for y in range(height):
        for x in range(width):
            if not label_boxes[y][x]:
                continue
            sat = _pixel_to_hsv(hsv[x, y])[1]
            val = _pixel_to_hsv(hsv[x, y])[2]
            if val < 125 or (sat < 92 and val < 165):
                text_white[y][x] = True

    # Background is everything not claimed by track, labels, or decor.
    background_white = _invert_mask(_or_masks(track_black, label_boxes, decor_ignore))
    accent_white = _copy_mask(accent_mask)

    masks_track_space = {
        "track_black": track_black,
        "box_black": label_boxes,
        "text_white": text_white,
        "accent_white": accent_white,
        "bg_white": background_white,
        "decor_ignore": decor_ignore,
    }

    # Re-project from cropped track space into the centered preview canvas,
    # then derive a half-resolution copy for multiscale scoring.
    masks_1x = {
        key: _paste_centered_mask(mask, preview_size) for key, mask in masks_track_space.items()
    }
    masks_05x = {
        key: _resize_mask(mask, (preview_size[0] // 2, preview_size[1] // 2))
        for key, mask in masks_1x.items()
    }

    boundaries_1x = {
        "track_black": _boundary_mask(masks_1x["track_black"], radius=1),
        "box_black": _boundary_mask(masks_1x["box_black"], radius=1),
    }
    boundaries_05x = {
        "track_black": _boundary_mask(masks_05x["track_black"], radius=1),
        "box_black": _boundary_mask(masks_05x["box_black"], radius=1),
    }

    return TrackSemanticReference(
        preview_size=preview_size,
        masks_1x=masks_1x,
        masks_05x=masks_05x,
        boundaries_1x=boundaries_1x,
        boundaries_05x=boundaries_05x,
    )
def _crop_or_use_full(image: Image.Image, roi: tuple[int, int, int, int] | None) -> Image.Image:
    """Crop to ROI if image is larger than ROI, otherwise use full image."""
    if roi is None:
        return image
    x0, x1, y0, y1 = roi
    # Caller may already supply a pre-cropped, exactly ROI-sized image.
    if image.size == (x1 - x0, y1 - y0):
        return image
    return image.crop((x0, y0, x1, y1))


def _mask_from_black_pixels(image: Image.Image) -> list[list[bool]]:
    """Return True where the image is black."""
    bw = image.convert("1")
    px = bw.load()
    if px is None:
        raise RuntimeError("Failed to load candidate pixels")
    cols, rows = bw.size
    return [[_pixel_to_int(px[cx, cy]) == 0 for cx in range(cols)] for cy in range(rows)]


def _semantic_transfer_score(
    black_mask: list[list[bool]],
    white_mask: list[list[bool]],
    semantic_masks: dict[str, list[list[bool]]],
) -> float:
    """Compute semantic layer transfer score for one scale."""
    terms: list[tuple[float, float]] = []

    def accumulate(weight: float, value: float) -> None:
        # NaN signals an empty reference layer; such terms are skipped so the
        # remaining weights are renormalized below.
        if not math.isnan(value):
            terms.append((weight, value))

    accumulate(0.30, _mean_mask_value(semantic_masks["track_black"], black_mask))
    accumulate(0.18, _mean_mask_value(semantic_masks["box_black"], black_mask))
    accumulate(0.18, _mean_mask_value(semantic_masks["text_white"], white_mask))
    accumulate(0.14, _mean_mask_value(semantic_masks["bg_white"], white_mask))
    accumulate(0.10, _mean_mask_value(semantic_masks["accent_white"], white_mask))

    # Penalize black ink spilling into regions that should stay white.
    expected_white = _or_masks(
        semantic_masks["text_white"],
        semantic_masks["bg_white"],
        semantic_masks["accent_white"],
    )
    spill = _mean_mask_value(expected_white, black_mask)
    accumulate(0.10, 1.0 - spill if not math.isnan(spill) else float("nan"))

    weight_sum = sum(weight for weight, _ in terms)
    if not weight_sum:
        return 0.0
    return sum(weight * value for weight, value in terms) / weight_sum


def _hierarchy_score(
    black_mask: list[list[bool]],
    semantic_masks: dict[str, list[list[bool]]],
) -> float:
    """Compare black-mass distribution against expected visual hierarchy."""
    in_track = _and_masks(black_mask, semantic_masks["track_black"])
    in_box = _and_masks(black_mask, semantic_masks["box_black"])
    in_decor = _and_masks(black_mask, semantic_masks["decor_ignore"])
    expected_black = _or_masks(semantic_masks["track_black"], semantic_masks["box_black"])
    stray = _subtract_mask(black_mask, expected_black)

    observed = [
        float(_mask_area(in_track)),
        float(_mask_area(in_box)),
        float(_mask_area(in_decor)),
        float(_mask_area(stray)),
    ]
    # Ideally all black mass lands in track/box; none in decor or elsewhere.
    target = [
        float(_mask_area(semantic_masks["track_black"])),
        float(_mask_area(semantic_masks["box_black"])),
        0.0,
        0.0,
    ]

    observed_total = sum(observed)
    target_total = sum(target)
    if observed_total <= 0 or target_total <= 0:
        return 0.0

    # Normalize to distributions; score is 1 minus half the L1 distance,
    # clamped to [0, 1].
    observed = [value / observed_total for value in observed]
    target = [value / target_total for value in target]
    l1_distance = sum(abs(got - want) for got, want in zip(observed, target, strict=False))
    return max(0.0, 1.0 - 0.5 * l1_distance)


def _noise_score(
    black_mask: list[list[bool]],
    semantic_masks: dict[str, list[list[bool]]],
    max_component_pixels: int = 8,
) -> float:
    """Penalize tiny black speckles outside allowed-black zones."""
    permitted = _dilate_mask_radius(
        _or_masks(semantic_masks["track_black"], semantic_masks["box_black"]),
        1,
    )
    stray_black = _subtract_mask(black_mask, permitted)
    speckles = sum(
        1
        for component in _connected_components(stray_black)
        if component.area <= max_component_pixels
    )
    # Exponential falloff: 0 speckles -> 1.0, about 30 speckles -> 1/e.
    return math.exp(-(speckles / 30.0))
def evaluate_rendered_semantic_quality(
    image: Image.Image,
    reference: TrackSemanticReference,
    roi: tuple[int, int, int, int] | None = (0, 500, 92, 360),
) -> SemanticScoringMetrics:
    """Score rendered output using semantic masks and contour fidelity.

    Crops the render to the map ROI, compares its black/white pixels against
    the reference's semantic layers at full and half resolution, and combines
    semantic transfer, boundary IoU, hierarchy, and noise into a 0-100 score
    with multiplicative hard-gate penalties for badly missed layers.

    Raises ValueError when the cropped region does not match the reference
    preview size.
    """
    cropped = _crop_or_use_full(image.convert("1"), roi)
    black_1x = _mask_from_black_pixels(cropped)
    width, height = cropped.size
    expected_size = reference.preview_size
    if (width, height) != expected_size:
        raise ValueError(f"Unexpected crop size {(width, height)}; expected {expected_size}")

    white_1x = _invert_mask(black_1x)

    # Per-layer fill rates: fraction of each reference layer that received
    # the expected ink color in the render.
    track_black_fill_1x = _mean_mask_value(reference.masks_1x["track_black"], black_1x)
    box_black_fill_1x = _mean_mask_value(reference.masks_1x["box_black"], black_1x)
    text_white_fill_1x = _mean_mask_value(reference.masks_1x["text_white"], white_1x)
    bg_white_fill_1x = _mean_mask_value(reference.masks_1x["bg_white"], white_1x)
    accent_white_fill_1x = _mean_mask_value(reference.masks_1x["accent_white"], white_1x)

    # Half-resolution copy for multiscale comparison.
    downsampled = cropped.convert("L").resize(
        (expected_size[0] // 2, expected_size[1] // 2),
        Image.Resampling.BOX,
    )
    black_05x = _mask_from_black_pixels(downsampled.convert("1"))
    white_05x = _invert_mask(black_05x)

    semantic_transfer_1x = _semantic_transfer_score(black_1x, white_1x, reference.masks_1x)
    semantic_transfer_05x = _semantic_transfer_score(black_05x, white_05x, reference.masks_05x)
    # Full resolution dominates; half resolution adds tolerance to 1px shifts.
    semantic_transfer_ms = 0.7 * semantic_transfer_1x + 0.3 * semantic_transfer_05x

    # Restrict predicted layers to a dilated band around each reference layer
    # so boundary IoU is not polluted by unrelated black pixels.
    predicted_track_1x = _and_masks(
        black_1x, _dilate_mask_radius(reference.masks_1x["track_black"], 4)
    )
    predicted_box_1x = _and_masks(black_1x, _dilate_mask_radius(reference.masks_1x["box_black"], 2))
    predicted_track_05x = _and_masks(
        black_05x, _dilate_mask_radius(reference.masks_05x["track_black"], 2)
    )
    predicted_box_05x = _and_masks(
        black_05x, _dilate_mask_radius(reference.masks_05x["box_black"], 1)
    )

    boundary_iou_track_1x = _boundary_iou(
        predicted_track_1x, reference.masks_1x["track_black"], radius=3
    )
    boundary_iou_box_1x = _boundary_iou(predicted_box_1x, reference.masks_1x["box_black"], radius=2)
    boundary_iou_track_05x = _boundary_iou(
        predicted_track_05x, reference.masks_05x["track_black"], radius=2
    )
    boundary_iou_box_05x = _boundary_iou(
        predicted_box_05x, reference.masks_05x["box_black"], radius=1
    )
    # Track boundary matters more than box boundary at both scales.
    boundary_iou_ms = 0.7 * (0.75 * boundary_iou_track_1x + 0.25 * boundary_iou_box_1x) + 0.3 * (
        0.75 * boundary_iou_track_05x + 0.25 * boundary_iou_box_05x
    )

    hierarchy_score = _hierarchy_score(black_1x, reference.masks_1x)
    noise_score = _noise_score(black_1x, reference.masks_1x)

    # Weighted blend of all sub-scores (weights sum to 1.0).
    base_score = (
        0.68 * semantic_transfer_ms
        + 0.18 * boundary_iou_ms
        + 0.08 * hierarchy_score
        + 0.06 * noise_score
    )

    # Hard gates: each badly-missed layer scales the score down proportionally
    # to how far below its minimum fill rate the render landed.
    hard_factor = 1.0
    if not math.isnan(track_black_fill_1x) and track_black_fill_1x < 0.78:
        hard_factor *= max(0.0, track_black_fill_1x / 0.78)
    if not math.isnan(box_black_fill_1x) and box_black_fill_1x < 0.72:
        hard_factor *= max(0.0, box_black_fill_1x / 0.72)
    if not math.isnan(bg_white_fill_1x) and bg_white_fill_1x < 0.88:
        hard_factor *= max(0.0, bg_white_fill_1x / 0.88)
    if not math.isnan(text_white_fill_1x) and text_white_fill_1x < 0.45:
        hard_factor *= max(0.0, text_white_fill_1x / 0.45)

    total_score = 100.0 * base_score * hard_factor

    return SemanticScoringMetrics(
        track_black_fill_1x=track_black_fill_1x,
        box_black_fill_1x=box_black_fill_1x,
        text_white_fill_1x=text_white_fill_1x,
        bg_white_fill_1x=bg_white_fill_1x,
        accent_white_fill_1x=accent_white_fill_1x,
        semantic_transfer_1x=semantic_transfer_1x,
        semantic_transfer_05x=semantic_transfer_05x,
        semantic_transfer_ms=semantic_transfer_ms,
        boundary_iou_track_1x=boundary_iou_track_1x,
        boundary_iou_box_1x=boundary_iou_box_1x,
        boundary_iou_track_05x=boundary_iou_track_05x,
        boundary_iou_box_05x=boundary_iou_box_05x,
        boundary_iou_ms=boundary_iou_ms,
        hierarchy_score=hierarchy_score,
        noise_score=noise_score,
        total_score=total_score,
    )
def fetch_calendar_render(endpoint_url: str, timezone_name: str, timeout: int = 20) -> Image.Image:
    """Fetch rendered calendar BMP from endpoint and return as 1-bit PIL image."""
    parts = urllib.parse.urlsplit(endpoint_url)
    # Drop any existing tz parameter and append the requested timezone.
    query_pairs = [
        (name, value)
        for name, value in urllib.parse.parse_qsl(parts.query, keep_blank_values=True)
        if name != "tz"
    ]
    query_pairs.append(("tz", timezone_name))
    rebuilt_url = urllib.parse.urlunsplit(
        (
            parts.scheme,
            parts.netloc,
            parts.path,
            urllib.parse.urlencode(query_pairs),
            parts.fragment,
        )
    )

    request = urllib.request.Request(rebuilt_url, headers={"User-Agent": "track-conversion/1.0"})
    with urllib.request.urlopen(request, timeout=timeout) as response:
        body = response.read()
    return Image.open(io.BytesIO(body)).convert("1")


def evaluate_rendered_track_region(
    image: Image.Image,
    roi: tuple[int, int, int, int] = (0, 500, 92, 360),
) -> TrackRenderMetrics:
    """Evaluate rendered map region and return summary metrics."""
    binary = image.convert("1")
    pixels = binary.load()
    if pixels is None:
        raise RuntimeError("Failed to load pixels from rendered image")
    image_width, image_height = binary.size

    # Clamp the ROI to the image, keeping at least one pixel per axis.
    left = max(0, min(image_width, roi[0]))
    right = max(left + 1, min(image_width, roi[1]))
    top = max(0, min(image_height, roi[2]))
    bottom = max(top + 1, min(image_height, roi[3]))

    region_w = right - left
    region_h = bottom - top
    mask = [
        [pixels[left + col, top + row] == 0 for col in range(region_w)]
        for row in range(region_h)
    ]

    components = _connected_components(mask)
    if not components:
        return TrackRenderMetrics(0.0, 0, 0.0, 0, 0.0, 0)

    black_total = sum(component.area for component in components)
    black_ratio = black_total / max(1, region_w * region_h)
    dominant = max(components, key=lambda component: component.area)

    # Count label-box-like components and measure how much white (text)
    # shows inside their bounding boxes.
    box_count = 0
    box_white_ratios: list[float] = []
    for component in components:
        elongation = max(
            component.width / component.height,
            component.height / component.width,
        )
        is_box_like = (
            70 <= component.area <= 1000
            and component.width >= 10
            and component.height >= 5
            and component.fill_ratio >= 0.45
            and elongation <= 8
            and component.min_y > 120
        )
        if not is_box_like:
            continue
        box_count += 1
        bbox_pixels = component.width * component.height
        white_inside = sum(
            1
            for row in range(component.min_y, component.max_y + 1)
            for col in range(component.min_x, component.max_x + 1)
            if not mask[row][col]
        )
        box_white_ratios.append(white_inside / max(1, bbox_pixels))

    speckle_count = sum(1 for component in components if component.area < 6)
    mean_box_white = (
        sum(box_white_ratios) / len(box_white_ratios) if box_white_ratios else 0.0
    )

    return TrackRenderMetrics(
        black_ratio=black_ratio,
        largest_area=dominant.area,
        largest_fill_ratio=dominant.fill_ratio,
        box_count=box_count,
        box_white_ratio=mean_box_white,
        noise_count=speckle_count,
    )


def score_track_metrics(metrics: TrackRenderMetrics) -> float:
    """Produce a scalar score for automated candidate ranking."""

    def bell(value: float, center: float, spread: float) -> float:
        # Unnormalized Gaussian: 1.0 at the center, decaying with distance.
        return math.exp(-(((value - center) / spread) ** 2))

    total = 40 * bell(metrics.black_ratio, 0.11, 0.03)
    total += 25 * bell(float(metrics.largest_area), 13200.0, 2500.0)
    total += 10 * bell(metrics.largest_fill_ratio, 0.12, 0.035)
    total += 20 * (min(metrics.box_count, 3) / 3)
    total += 5 * bell(metrics.box_white_ratio, 0.16, 0.08)
    total += 5 * bell(float(metrics.noise_count), 0.0, 12.0)

    # Flat penalty per missing label box below the expected three.
    missing_boxes = 3 - metrics.box_count
    if missing_boxes > 0:
        total -= missing_boxes * 10

    return total


def metrics_to_dict(metrics: TrackRenderMetrics) -> dict[str, float | int]:
    """Convert TrackRenderMetrics to serializable dictionary."""
    # Ratios are rounded to 4 places; counts pass through unchanged.
    return {
        "black_ratio": round(metrics.black_ratio, 4),
        "largest_area": metrics.largest_area,
        "largest_fill_ratio": round(metrics.largest_fill_ratio, 4),
        "box_count": metrics.box_count,
        "box_white_ratio": round(metrics.box_white_ratio, 4),
        "noise_count": metrics.noise_count,
    }
def semantic_metrics_to_dict(metrics: SemanticScoringMetrics) -> dict[str, float]:
    """Convert SemanticScoringMetrics to a serializable dictionary."""
    # All sub-scores round to 4 places; the headline total rounds to 2.
    result = {
        name: round(getattr(metrics, name), 4)
        for name in (
            "track_black_fill_1x",
            "box_black_fill_1x",
            "text_white_fill_1x",
            "bg_white_fill_1x",
            "accent_white_fill_1x",
            "semantic_transfer_1x",
            "semantic_transfer_05x",
            "semantic_transfer_ms",
            "boundary_iou_track_1x",
            "boundary_iou_box_1x",
            "boundary_iou_track_05x",
            "boundary_iou_box_05x",
            "boundary_iou_ms",
            "hierarchy_score",
            "noise_score",
        )
    }
    result["total_score"] = round(metrics.total_score, 2)
    return result
def test_renderer_uses_remote_asset_client_first(mock_race_data, monkeypatch):
    """Renderer prefers remote asset client before local glob fallback."""

    stub_bitmap = Image.new("1", (120, 80), 1)
    pen = ImageDraw.Draw(stub_bitmap)
    pen.rectangle([20, 20, 100, 60], fill=0)

    class StubAssetClient:
        def get_track_image(self, circuit_ids, variant):
            assert variant == "1bit"
            assert "test_circuit" in list(circuit_ids)
            return stub_bitmap

    monkeypatch.setattr(renderer_module, "get_asset_client", lambda: StubAssetClient())

    result = Renderer._load_track_image(mock_race_data)

    assert result is not None
    assert result.size == (120, 80)


def test_spectra6_renderer_uses_remote_asset_client_first(mock_race_data, monkeypatch):
    """Spectra6 renderer prefers remote asset client before local filesystem fallback."""

    stub_bitmap = Image.new("P", (140, 90), color=1)

    class StubAssetClient:
        def get_track_image(self, circuit_ids, variant):
            assert variant == "spectra6"
            assert "test_circuit" in list(circuit_ids)
            return stub_bitmap

    monkeypatch.setattr(spectra6_renderer_module, "get_asset_client", lambda: StubAssetClient())

    result = Spectra6Renderer._load_track_image(mock_race_data)

    assert result is not None
    assert result.size == (140, 90)