|
| 1 | +import fused |
| 2 | +import fsspec |
| 3 | + |
| 4 | + |
@fused.udf()
def udf(path: str):
    """
    File UDF that accepts paths with empty or missing extensions.

    When the extension is absent or unrecognized, detects the file type from
    the actual file content (magic bytes), infers a plausible extension, and
    delegates to the matching File UDF.

    Routing table (extension -> UDF):
        .parquet/.pq              -> UDF_Preview_Parquet
        .xlsx/.xls                -> UDF_Pandas_Excel
        .tif/.tiff                -> UDF_GeoTIFF_File
        .nc/.nc4/.hdf5            -> UDF_NetCDF_File
        .png/.jpg/.jpeg/.bmp/etc  -> UDF_ImageIO_File
        .mp4/.avi/.mov/.mkv/.webm -> UDF_Video_File
        .docx/.doc                -> UDF_DOCX_File
        .csv/.tsv/.json/.txt/etc  -> UDF_Text_File
        .zip                      -> UDF_GeoPandas_ZIP
        .geojson/.shp/.gpkg/.kml  -> UDF_Preview_Parquet
    """
    import os

    # Extensions we can route directly, without sniffing the file content.
    known_extensions = frozenset(
        ".csv .tsv .parquet .pq .json .geojson .xlsx .xls .shp .gpkg .kml "
        ".tif .tiff .zip .docx .doc .nc .nc4 .hdf5 .png .jpg .jpeg .bmp "
        ".webp .gif .mp4 .avi .mov .mkv .webm .txt .log .md .xml .yaml .yml".split()
    )

    suffix = os.path.splitext(path)[1].lower().strip()

    if suffix in known_extensions:
        print(f"Extension already known: {suffix}")
    else:
        # Unknown or missing extension: read the file header and classify
        # it from magic bytes instead.
        print(f"No recognized extension for: {path}")
        print("Opening file to detect MIME type from content...")
        suffix = _detect_extension(path)
        print(f"Detected extension: {suffix}")

    # Map the (given or detected) extension onto the right File UDF and hand
    # over completely -- the target UDF does all loading and rendering.
    target_name = _route_to_udf(suffix)
    print(f"Routing to: UDF_{target_name}")

    target = fused.load(f"UDF_{target_name}")
    return target(path=path)
| 87 | + |
| 88 | + |
def _detect_extension(path: str) -> str:
    """
    Infer a file extension by sniffing the file's magic bytes.

    Reads the first 512 bytes of *path* via fsspec and matches them against
    well-known file signatures. Falls back to ".txt" when nothing matches.
    """
    with fsspec.open(path, "rb") as f:
        header = f.read(512)
    return _extension_from_header(header)


def _extension_from_header(header: bytes) -> str:
    """Classify a file-header byte string into a plausible extension (pure helper)."""
    import struct

    # Parquet: files start with the magic bytes "PAR1".
    if header[:4] == b"PAR1":
        return ".parquet"

    # ZIP container (also the envelope for .xlsx and .docx).
    if header[:2] == b"PK":
        # BUGFIX: check the Word marker BEFORE the generic OOXML marker.
        # "[Content_Types].xml" is the first entry of both .docx and .xlsx
        # packages, so testing it first misclassified every .docx as .xlsx.
        if b"word/" in header:
            return ".docx"
        if b"xl/" in header or b"[Content_Types].xml" in header:
            return ".xlsx"
        return ".zip"

    # Legacy Excel .xls (OLE2 / Compound File Binary signature).
    # Binary signatures are checked before the text heuristics below so a
    # strictly-decodable binary header can never be misread as CSV/JSON.
    if header[:8] == b"\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1":
        return ".xls"

    # GeoPackage (SQLite database header).
    if header[:16] == b"SQLite format 3\x00":
        return ".gpkg"

    # GeoTIFF / TIFF: "II" (little-endian) or "MM" (big-endian) byte order
    # mark followed by a 16-bit magic number.
    if header[:2] in (b"II", b"MM") and len(header) >= 4:
        byte_order = "<" if header[:2] == b"II" else ">"
        magic = struct.unpack(f"{byte_order}H", header[2:4])[0]
        if magic in (42, 43):  # 42 = classic TIFF, 43 = BigTIFF
            return ".tiff"

    # HDF5 / NetCDF4. Full signature is \x89HDF\r\n\x1a\n; the 4-byte prefix
    # check subsumes the 8-byte one, so only one test is needed.
    if header[:4] == b"\x89HDF":
        return ".nc"
    # Classic NetCDF ("CDF" magic).
    if header[:3] == b"CDF":
        return ".nc"

    # Images.
    if header[:8] == b"\x89PNG\r\n\x1a\n":
        return ".png"
    if header[:2] == b"\xff\xd8":
        return ".jpg"
    if header[:4] == b"GIF8":
        return ".gif"
    if header[:4] == b"RIFF" and header[8:12] == b"WEBP":
        return ".webp"
    if header[:2] == b"BM":
        return ".bmp"

    # Video: MP4/MOV share the ISO BMFF "ftyp" box at byte offset 4.
    if header[4:8] == b"ftyp":
        return ".mp4"
    if header[:4] == b"RIFF" and header[8:12] == b"AVI ":
        return ".avi"
    # MKV/WebM (Matroska / EBML header).
    if header[:4] == b"\x1a\x45\xdf\xa3":
        return ".mkv"

    # GeoJSON or JSON: text starting with "{" or "[". decode(errors="ignore")
    # cannot raise, so no try/except is needed here.
    if header.lstrip()[:1] in (b"{", b"["):
        text_sample = header.decode("utf-8", errors="ignore")
        if '"Feature' in text_sample or '"geometry"' in text_sample:
            return ".geojson"
        return ".json"

    # XML-based formats (KML vs. generic XML).
    lowered = header.decode("utf-8", errors="ignore").lower()
    if "<kml" in lowered:
        return ".kml"
    if "<?xml" in lowered:
        return ".xml"

    # CSV/TSV heuristic: cleanly-decodable text containing delimiters and at
    # least one newline. A strict-decode failure means binary of unknown type.
    try:
        text_sample = header.decode("utf-8", errors="strict")
    except UnicodeDecodeError:
        print("Binary file of unknown type, falling back to .txt")
        return ".txt"
    if text_sample.count("\n") >= 1 and (
        text_sample.count(",") >= 2 or text_sample.count("\t") >= 2
    ):
        return ".csv"

    # Fallback: decodable text with no recognizable structure.
    print("Could not determine type from magic bytes, falling back to .txt")
    return ".txt"
| 196 | + |
| 197 | + |
| 198 | +def _route_to_udf(ext: str) -> str: |
| 199 | + """Map a file extension to the appropriate existing File UDF name.""" |
| 200 | + ext = ext.lower() |
| 201 | + |
| 202 | + ROUTING = { |
| 203 | + ".parquet": "Preview_Parquet", |
| 204 | + ".pq": "Preview_Parquet", |
| 205 | + ".xlsx": "Pandas_Excel", |
| 206 | + ".xls": "Pandas_Excel", |
| 207 | + ".tif": "GeoTIFF_File", |
| 208 | + ".tiff": "GeoTIFF_File", |
| 209 | + ".nc": "NetCDF_File", |
| 210 | + ".nc4": "NetCDF_File", |
| 211 | + ".hdf5": "NetCDF_File", |
| 212 | + ".png": "ImageIO_File", |
| 213 | + ".jpg": "ImageIO_File", |
| 214 | + ".jpeg": "ImageIO_File", |
| 215 | + ".bmp": "ImageIO_File", |
| 216 | + ".gif": "ImageIO_File", |
| 217 | + ".webp": "ImageIO_File", |
| 218 | + ".mp4": "Video_File", |
| 219 | + ".avi": "Video_File", |
| 220 | + ".mov": "Video_File", |
| 221 | + ".mkv": "Video_File", |
| 222 | + ".webm": "Video_File", |
| 223 | + ".docx": "DOCX_File", |
| 224 | + ".doc": "DOCX_File", |
| 225 | + ".csv": "Text_File", |
| 226 | + ".tsv": "Text_File", |
| 227 | + ".json": "Text_File", |
| 228 | + ".txt": "Text_File", |
| 229 | + ".log": "Text_File", |
| 230 | + ".md": "Text_File", |
| 231 | + ".xml": "Text_File", |
| 232 | + ".yaml": "Text_File", |
| 233 | + ".yml": "Text_File", |
| 234 | + ".geojson": "Preview_Parquet", |
| 235 | + ".shp": "Preview_Parquet", |
| 236 | + ".gpkg": "Preview_Parquet", |
| 237 | + ".kml": "Preview_Parquet", |
| 238 | + ".zip": "GeoPandas_ZIP", |
| 239 | + } |
| 240 | + |
| 241 | + return ROUTING.get(ext, "Text_File") |