#!/usr/bin/env python3
"""
Fetch the latest IAB Content Taxonomy datasets from the IAB GitHub repo and
normalize them into the formats expected by this tool:

- iab_mapper/data/iab_2x.json → [{"code": str, "label": str}]
- iab_mapper/data/iab_3x.json → [{"id": str, "label": str, "path": [str, ...], "scd": bool}]

Notes
- Uses GitHub's contents API to locate the latest 2.x and 3.x files.
- Supports JSON or TSV/CSV inputs; attempts to infer columns.
- Set GITHUB_TOKEN to raise rate limits; otherwise requests are unauthenticated.
"""
from __future__ import annotations

import os
import sys
import json
import re
import io
from dataclasses import dataclass
from typing import Dict, List, Any, Optional, Tuple

import requests
import pandas as pd


IAB_REPO = "InteractiveAdvertisingBureau/Taxonomies"
CONTENT_DIR = "Content Taxonomies"
GITHUB_API = "https://api.github.com"

ROOT = os.path.abspath(os.path.join(os.path.dirname(__file__), ".."))
DATA_DIR = os.path.abspath(os.path.join(ROOT, "..", "iab_mapper", "data"))
RAW_DIR = os.path.join(DATA_DIR, "raw")


@dataclass
class FileMeta:
    """A file from the repo's contents listing: name plus raw download URL."""

    name: str
    download_url: str


def gh_headers() -> Dict[str, str]:
    """Build request headers, sending a bearer token when GITHUB_TOKEN is set."""
    token = os.getenv("GITHUB_TOKEN")
    if token:
        return {"Authorization": f"Bearer {token}", "Accept": "application/vnd.github+json"}
    return {"Accept": "application/vnd.github+json"}


def list_content_files() -> List[FileMeta]:
    """List the files at the top level of the Content Taxonomies directory."""
    url = f"{GITHUB_API}/repos/{IAB_REPO}/contents/{requests.utils.quote(CONTENT_DIR, safe='')}"
    resp = requests.get(url, headers=gh_headers(), timeout=20)
    resp.raise_for_status()
    files = []
    for item in resp.json():
        if item.get("type") == "file":
            files.append(FileMeta(name=item["name"], download_url=item["download_url"]))
    return files


_VER_RE = re.compile(r"Content\s*Taxonomy\s*(?P<major>\d+)\.(?P<minor>\d+)", re.I)


def parse_version_from_name(name: str) -> Optional[Tuple[int, int]]:
    m = _VER_RE.search(name)
    if not m:
        return None
    return int(m.group("major")), int(m.group("minor"))
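
# For example (hypothetical filenames): parse_version_from_name("Content Taxonomy 3.1.tsv")
# returns (3, 1), while "Ad Product Taxonomy 1.1" yields None (no "Content Taxonomy" match).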


def pick_latest(files: List[FileMeta], major: int) -> Optional[FileMeta]:
    """Return the highest-versioned file with the given major version, if any."""
    candidates: List[Tuple[Tuple[int, int], FileMeta]] = []
    for f in files:
        ver = parse_version_from_name(f.name)
        if not ver:
            continue
        maj, minr = ver
        if maj == major:
            candidates.append(((maj, minr), f))
    if not candidates:
        return None
    candidates.sort(key=lambda x: x[0], reverse=True)
    return candidates[0][1]
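
# E.g. given files named "Content Taxonomy 2.2", "Content Taxonomy 3.0", and
# "Content Taxonomy 3.1" (all hypothetical), pick_latest(files, major=3) selects 3.1.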


def download_text(url: str) -> str:
    r = requests.get(url, headers=gh_headers(), timeout=30)
    r.raise_for_status()
    return r.text


def save_json(path: str, obj: Any) -> None:
    os.makedirs(os.path.dirname(path), exist_ok=True)
    with open(path, "w", encoding="utf-8") as f:
        json.dump(obj, f, ensure_ascii=False)


def to_bool(val: Any) -> bool:
    if isinstance(val, bool):
        return val
    if val is None:
        return False
    s = str(val).strip().lower()
    return s in {"true", "1", "yes", "y"}
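
# E.g. to_bool("Yes") and to_bool("TRUE") are True; to_bool(None) is False; a NaN
# cell stringifies to "nan", which is not in the accepted set, so it is False too.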


def _cell(val: Any) -> str:
    """Stringify a cell, treating None/NaN as empty (str(NaN) would yield "nan")."""
    if val is None or (isinstance(val, float) and pd.isna(val)):
        return ""
    return str(val).strip()


def normalize_2x(df: pd.DataFrame) -> List[Dict[str, Any]]:
    cols = {c.lower(): c for c in df.columns}
    id_col = next((cols[k] for k in cols if k in {"code", "id", "node id", "taxonomy id"}), None)
    label_col = next((cols[k] for k in cols if k in {"label", "name", "node", "node name"}), None)
    if not id_col or not label_col:
        raise ValueError("Could not infer 2.x id/label columns")
    out: List[Dict[str, Any]] = []
    for _, row in df.iterrows():
        code = _cell(row.get(id_col))
        label = _cell(row.get(label_col))
        if not code or not label:
            continue
        out.append({"code": code, "label": label})
    return out
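
# E.g. a 2.x sheet with columns ["ID", "Name"] (hypothetical headers) resolves via the
# lowercase lookup to id_col="ID" and label_col="Name"; blank or NaN rows are skipped.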


def normalize_3x(df: pd.DataFrame) -> List[Dict[str, Any]]:
    cols = {c.lower(): c for c in df.columns}
    id_col = next((cols[k] for k in cols if k in {"id", "node id", "taxonomy id"}), None)
    label_col = next((cols[k] for k in cols if k in {"label", "name", "node", "node name"}), None)
    path_col = next((cols[k] for k in cols if k in {"path", "full path", "taxonomy path"}), None)
    scd_col = next((cols[k] for k in cols if k in {"scd", "sensitive", "is scd"}), None)

    tier_cols = [cols[c] for c in df.columns.str.lower() if c.startswith("tier")]

    def row_path(r) -> List[str]:
        if path_col:
            raw = r.get(path_col)
            # parse_table() may have stored the path as a ready-made list.
            if isinstance(raw, list):
                return [str(p).strip() for p in raw if str(p).strip()]
            if isinstance(raw, str) and raw.strip():
                parts = [p.strip() for p in re.split(r">|/|\\|,", raw) if p.strip()]
                if parts:
                    return parts
        # Fall back to Tier 1..N columns
        parts = []
        for t in tier_cols:
            val = _cell(r.get(t))
            if val:
                parts.append(val)
        return parts

    if not id_col or not label_col:
        raise ValueError("Could not infer 3.x id/label columns")

    out: List[Dict[str, Any]] = []
    for _, row in df.iterrows():
        id_ = _cell(row.get(id_col))
        label = _cell(row.get(label_col))
        if not id_ or not label:
            continue
        path = row_path(row) or [label]
        scd = to_bool(row.get(scd_col)) if scd_col else False
        out.append({"id": id_, "label": label, "path": path, "scd": scd})
    return out
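
# E.g. a path cell "Attractions > Amusement & Theme Parks" (hypothetical) splits on the
# ">" delimiter into ["Attractions", "Amusement & Theme Parks"]; sheets that use
# "Tier 1".."Tier 4" columns instead contribute one path element per non-empty tier.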


def parse_table(text: str, name: str) -> pd.DataFrame:
    # Decide the delimiter by filename extension
    if name.lower().endswith(".tsv"):
        return pd.read_csv(io.StringIO(text), sep="\t")
    if name.lower().endswith(".csv"):
        return pd.read_csv(io.StringIO(text))
    # Otherwise try to parse JSON as something table-like
    try:
        data = json.loads(text)
    except Exception as exc:
        raise ValueError(f"Unsupported file format for {name}") from exc
    # A flat list of dicts maps directly onto a DataFrame
    if isinstance(data, list) and data and isinstance(data[0], dict):
        return pd.DataFrame(data)
    # Otherwise, flatten a hierarchical JSON tree under common keys
    rows: List[Dict[str, Any]] = []

    def walk(node: Dict[str, Any], ancestors: List[str]) -> None:
        nid = node.get("id") or node.get("code")
        label = node.get("label") or node.get("name")
        scd = node.get("scd") or node.get("is_scd") or node.get("sensitive")
        children = node.get("children") or node.get("nodes") or []
        if nid and label:
            rows.append({
                "id": nid,
                "label": label,
                "path": ancestors + [label],
                "scd": scd,
            })
        for c in children:
            if isinstance(c, dict):
                walk(c, ancestors + [label] if label else ancestors)

    if isinstance(data, dict):
        walk(data, [])
    elif isinstance(data, list):
        for n in data:
            if isinstance(n, dict):
                walk(n, [])
    return pd.DataFrame(rows)
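
# Sketch of a nested shape walk() can flatten (keys per the lookups above; the actual
# IAB file layout may differ): {"id": 1, "name": "Root", "children": [{"id": 2,
# "name": "Child"}]} produces rows with paths ["Root"] and ["Root", "Child"].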


def main() -> int:
    print("[update_catalogs] Listing IAB content taxonomy files…")
    files = list_content_files()
    f3 = pick_latest(files, major=3)
    f2 = pick_latest(files, major=2)
    if not f3:
        print("ERROR: Could not locate a Content Taxonomy 3.x file in the IAB repo.")
        return 1
    if not f2:
        print("WARNING: Could not locate a Content Taxonomy 2.x file; continuing with 3.x only.")

    os.makedirs(RAW_DIR, exist_ok=True)

    # 3.x
    print(f"[update_catalogs] Downloading 3.x → {f3.name}")
    txt3 = download_text(f3.download_url)
    with open(os.path.join(RAW_DIR, f3.name), "w", encoding="utf-8") as fh:
        fh.write(txt3)
    df3 = parse_table(txt3, f3.name)
    try:
        norm3 = normalize_3x(df3)
        save_json(os.path.join(DATA_DIR, "iab_3x.json"), norm3)
        print(f"[update_catalogs] Wrote {len(norm3)} rows → iab_mapper/data/iab_3x.json")
    except Exception as e:
        print(f"ERROR: Failed to normalize 3.x: {e}")

    # 2.x
    if f2:
        print(f"[update_catalogs] Downloading 2.x → {f2.name}")
        txt2 = download_text(f2.download_url)
        with open(os.path.join(RAW_DIR, f2.name), "w", encoding="utf-8") as fh:
            fh.write(txt2)
        df2 = parse_table(txt2, f2.name)
        try:
            norm2 = normalize_2x(df2)
            save_json(os.path.join(DATA_DIR, "iab_2x.json"), norm2)
            print(f"[update_catalogs] Wrote {len(norm2)} rows → iab_mapper/data/iab_2x.json")
        except Exception as e:
            print(f"ERROR: Failed to normalize 2.x: {e}")

    print("[update_catalogs] Done.")
    return 0


if __name__ == "__main__":
    sys.exit(main())
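
# Typical invocation (path assumes this script lives in a subdirectory such as scripts/):
#   GITHUB_TOKEN=... python scripts/update_catalogs.py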