|
| 1 | +#!/usr/bin/env python3 |
| 2 | +"""Export image attachments from an xcresult bundle.""" |
| 3 | + |
| 4 | +from __future__ import annotations |
| 5 | + |
| 6 | +import json |
| 7 | +import os |
| 8 | +import re |
| 9 | +import subprocess |
| 10 | +import sys |
| 11 | +from collections import deque |
| 12 | +from typing import Deque, Dict, Iterable, List, Optional, Sequence, Set, Tuple |
| 13 | + |
| 14 | + |
| 15 | +def ri_log(message: str) -> None: |
| 16 | + """Mirror the bash script's logging prefix.""" |
| 17 | + print(f"[run-ios-ui-tests] {message}", file=sys.stderr) |
| 18 | + |
| 19 | + |
| 20 | +def run_xcresult(args: Sequence[str]) -> str: |
| 21 | + cmd = ["xcrun", "xcresulttool", *args] |
| 22 | + try: |
| 23 | + result = subprocess.run( |
| 24 | + cmd, |
| 25 | + check=True, |
| 26 | + stdout=subprocess.PIPE, |
| 27 | + stderr=subprocess.PIPE, |
| 28 | + ) |
| 29 | + except subprocess.CalledProcessError as exc: |
| 30 | + stderr = exc.stderr.decode("utf-8", "ignore").strip() |
| 31 | + ri_log(f"xcresulttool command failed: {' '.join(cmd)}\n{stderr}") |
| 32 | + raise |
| 33 | + return result.stdout.decode("utf-8") |
| 34 | + |
| 35 | + |
| 36 | +def get_json(bundle_path: str, object_id: Optional[str] = None) -> Dict: |
| 37 | + args = ["get", "--path", bundle_path, "--format", "json"] |
| 38 | + if object_id: |
| 39 | + args.extend(["--id", object_id]) |
| 40 | + output = run_xcresult(args) |
| 41 | + return json.loads(output or "{}") |
| 42 | + |
| 43 | + |
| 44 | +def collect_nodes(node) -> Tuple[List[Dict], List[str]]: |
| 45 | + attachments: List[Dict] = [] |
| 46 | + refs: List[str] = [] |
| 47 | + stack: List = [node] |
| 48 | + while stack: |
| 49 | + current = stack.pop() |
| 50 | + if isinstance(current, dict): |
| 51 | + attachment_block = current.get("attachments") |
| 52 | + if isinstance(attachment_block, dict): |
| 53 | + for item in attachment_block.get("_values", []): |
| 54 | + if isinstance(item, dict): |
| 55 | + attachments.append(item) |
| 56 | + for key, value in current.items(): |
| 57 | + if key.endswith("Ref") and isinstance(value, dict): |
| 58 | + ref_id = value.get("id") |
| 59 | + if isinstance(ref_id, str) and ref_id: |
| 60 | + refs.append(ref_id) |
| 61 | + elif key.endswith("Refs") and isinstance(value, dict): |
| 62 | + for entry in value.get("_values", []): |
| 63 | + if isinstance(entry, dict): |
| 64 | + ref_id = entry.get("id") |
| 65 | + if isinstance(ref_id, str) and ref_id: |
| 66 | + refs.append(ref_id) |
| 67 | + if isinstance(value, (dict, list)): |
| 68 | + stack.append(value) |
| 69 | + elif isinstance(current, list): |
| 70 | + stack.extend(current) |
| 71 | + return attachments, refs |
| 72 | + |
| 73 | + |
| 74 | +def is_image_attachment(attachment: Dict) -> bool: |
| 75 | + uti = (attachment.get("uniformTypeIdentifier") or "").lower() |
| 76 | + filename = (attachment.get("filename") or attachment.get("name") or "").lower() |
| 77 | + if filename.endswith((".png", ".jpg", ".jpeg")): |
| 78 | + return True |
| 79 | + if "png" in uti or "jpeg" in uti: |
| 80 | + return True |
| 81 | + return False |
| 82 | + |
| 83 | + |
| 84 | +def sanitize_filename(name: str) -> str: |
| 85 | + safe = re.sub(r"[^A-Za-z0-9_.-]", "_", name) |
| 86 | + return safe or "attachment" |
| 87 | + |
| 88 | + |
| 89 | +def ensure_extension(name: str, uti: str) -> str: |
| 90 | + base, ext = os.path.splitext(name) |
| 91 | + if ext: |
| 92 | + return name |
| 93 | + uti = uti.lower() |
| 94 | + if "png" in uti: |
| 95 | + return f"{name}.png" |
| 96 | + if "jpeg" in uti: |
| 97 | + return f"{name}.jpg" |
| 98 | + return name |
| 99 | + |
| 100 | + |
| 101 | +def export_attachment( |
| 102 | + bundle_path: str, attachment: Dict, destination_dir: str, used_names: Set[str] |
| 103 | +) -> None: |
| 104 | + payload = attachment.get("payloadRef") or {} |
| 105 | + attachment_id = payload.get("id") |
| 106 | + if not attachment_id: |
| 107 | + return |
| 108 | + name = attachment.get("filename") or attachment.get("name") or attachment_id |
| 109 | + name = ensure_extension(name, attachment.get("uniformTypeIdentifier") or "") |
| 110 | + name = sanitize_filename(name) |
| 111 | + candidate = name |
| 112 | + counter = 1 |
| 113 | + while candidate in used_names: |
| 114 | + base, ext = os.path.splitext(name) |
| 115 | + candidate = f"{base}_{counter}{ext}" |
| 116 | + counter += 1 |
| 117 | + used_names.add(candidate) |
| 118 | + output_path = os.path.join(destination_dir, candidate) |
| 119 | + run_xcresult( |
| 120 | + [ |
| 121 | + "export", |
| 122 | + "--legacy", |
| 123 | + "--path", |
| 124 | + bundle_path, |
| 125 | + "--id", |
| 126 | + attachment_id, |
| 127 | + "--type", |
| 128 | + "file", |
| 129 | + "--output-path", |
| 130 | + output_path, |
| 131 | + ] |
| 132 | + ) |
| 133 | + ri_log(f"Exported attachment {candidate}") |
| 134 | + |
| 135 | + |
| 136 | +def handle_attachments( |
| 137 | + bundle_path: str, |
| 138 | + items: Iterable[Dict], |
| 139 | + destination_dir: str, |
| 140 | + used_names: Set[str], |
| 141 | + seen_attachment_ids: Set[str], |
| 142 | +) -> None: |
| 143 | + for attachment in items: |
| 144 | + payload = attachment.get("payloadRef") or {} |
| 145 | + attachment_id = payload.get("id") |
| 146 | + if not attachment_id or attachment_id in seen_attachment_ids: |
| 147 | + continue |
| 148 | + if not is_image_attachment(attachment): |
| 149 | + continue |
| 150 | + seen_attachment_ids.add(attachment_id) |
| 151 | + export_attachment(bundle_path, attachment, destination_dir, used_names) |
| 152 | + |
| 153 | + |
| 154 | +def export_bundle(bundle_path: str, destination_dir: str) -> bool: |
| 155 | + os.makedirs(destination_dir, exist_ok=True) |
| 156 | + |
| 157 | + root = get_json(bundle_path) |
| 158 | + attachments, refs = collect_nodes(root) |
| 159 | + queue: Deque[str] = deque(refs) |
| 160 | + seen_refs: Set[str] = set() |
| 161 | + seen_attachment_ids: Set[str] = set() |
| 162 | + exported_names: Set[str] = set() |
| 163 | + |
| 164 | + handle_attachments( |
| 165 | + bundle_path, attachments, destination_dir, exported_names, seen_attachment_ids |
| 166 | + ) |
| 167 | + |
| 168 | + while queue: |
| 169 | + ref_id = queue.popleft() |
| 170 | + if ref_id in seen_refs: |
| 171 | + continue |
| 172 | + seen_refs.add(ref_id) |
| 173 | + data = get_json(bundle_path, ref_id) |
| 174 | + items, nested_refs = collect_nodes(data) |
| 175 | + handle_attachments( |
| 176 | + bundle_path, items, destination_dir, exported_names, seen_attachment_ids |
| 177 | + ) |
| 178 | + for nested in nested_refs: |
| 179 | + if nested not in seen_refs: |
| 180 | + queue.append(nested) |
| 181 | + |
| 182 | + if not exported_names: |
| 183 | + ri_log("No screenshot attachments were exported from xcresult bundle") |
| 184 | + return False |
| 185 | + return True |
| 186 | + |
| 187 | + |
| 188 | +def main(argv: Sequence[str]) -> int: |
| 189 | + if len(argv) != 3: |
| 190 | + ri_log("Expected bundle path and destination directory arguments") |
| 191 | + return 1 |
| 192 | + _, bundle_path, destination_dir = argv |
| 193 | + success = export_bundle(bundle_path, destination_dir) |
| 194 | + return 0 if success else 2 |
| 195 | + |
| 196 | + |
| 197 | +if __name__ == "__main__": |
| 198 | + raise SystemExit(main(sys.argv)) |
0 commit comments