|
| 1 | +# Licensed under the Apache License, Version 2.0 (the "License"); |
| 2 | +# you may not use this file except in compliance with the License. |
| 3 | +# See the License at http://www.apache.org/licenses/LICENSE-2.0 |
| 4 | +# Distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND. |
| 5 | + |
| 6 | +""" |
| 7 | +Parquet Read Node |
| 8 | +
|
| 9 | +SQL Query Execution Plan Node that reads Parquet files using the column-chunk |
| 10 | +range-read design (docs/parquet-column-reads-design.md). |
| 11 | +
|
| 12 | +Instead of downloading whole blobs into a shared-memory ring, this node: |
| 13 | +
|
| 14 | + 1. Fetches the Parquet footer for each file (two small range reads each). |
| 15 | + 2. Fans out (file × row-group) work units to a thread pool. |
| 16 | + 3. For each unit, batches all projected column ranges into one read_ranges() |
| 17 | + call, decodes with rugo, and yields the assembled row group. |
| 18 | +
|
| 19 | +The filesystem layer is taken directly from the connector (every catalog-backed |
| 20 | +connector already exposes ``self.filesystem``), so this node works identically |
| 21 | +for local disk, GCS, and S3. |
| 22 | +
|
| 23 | +Row groups are yielded in completion order — the thread pool handles overlap |
| 24 | +between I/O and decode across all files and row groups simultaneously. |
| 25 | +""" |
| 26 | + |
| 27 | +from __future__ import annotations |
| 28 | + |
| 29 | +import time |
| 30 | +from typing import Generator |
| 31 | + |
| 32 | +from opteryx import EOS |
| 33 | +from opteryx.draken.morsels.morsel import Morsel |
| 34 | +from opteryx.models import QueryProperties |
| 35 | +from opteryx.parquet_io import InMemoryParquetCache |
| 36 | +from opteryx.parquet_io import fetch_footer |
| 37 | +from opteryx.parquet_io import iter_row_groups |
| 38 | +from opteryx.utils.file_decoders import get_decoder |
| 39 | + |
| 40 | +from .read_node import ReaderNode |
| 41 | + |
| 42 | + |
class ParquetReadNode(ReaderNode):
    """Read node backed by column-chunk range reads via ``parquet_io``.

    Activated for filesystem-backed connectors (GCS, S3, local) when the
    manifest contains only ``.parquet`` files. Falls back to the existing
    ``IopsReadNode`` / ``ReaderNode`` paths for mixed or non-Parquet manifests.
    """

    def __init__(self, properties: QueryProperties, **parameters) -> None:
        ReaderNode.__init__(self, properties=properties, **parameters)
        # Optional pushed-down predicates; may be used for row-group pruning.
        self.predicates = parameters.get("predicates")

    @property
    def name(self) -> str:  # pragma: no cover
        return "Parquet Read"

    def to_mermaid(self, nid):  # pragma: no cover
        """Render this node as a Mermaid diagram fragment for plan visualization."""
        mermaid = f'NODE_{nid}[("**{self.name.upper()}**<br />'
        mermaid += f"{self.connector.dataset}<br />"
        mermaid += f"({self.execution_time / 1_000_000:,.2f}ms)"
        return mermaid + '")]'

    @staticmethod
    def _empty_table(orso_schema):
        """Build an empty Arrow table for *orso_schema*, columns renamed to identities.

        Used for both the empty-manifest and empty-result paths so downstream
        operators always receive a correctly-labelled (if empty) table.
        """
        from orso import DataFrame

        as_arrow = DataFrame(rows=[], schema=orso_schema).arrow()
        renames = [orso_schema.column(col).identity for col in as_arrow.column_names]
        return as_arrow.rename_columns(renames)

    def execute(self, morsel, **kwargs) -> Generator:
        """Yield row groups (as Draken Morsels) for every file in the manifest.

        Parameters:
            morsel: upstream control token; ``EOS`` signals end-of-stream.

        Yields:
            Morsels in row-group completion order, or an empty Arrow table when
            the manifest/result is empty, or ``None`` on ``EOS``.
        """
        if morsel == EOS:
            yield None
            return

        orso_schema = self.parameters["schema"]

        # ── Empty manifest ────────────────────────────────────────────────────
        if not self.manifest or self.manifest.get_file_count() == 0:
            yield self._empty_table(orso_schema)
            return

        # ── Project schema to requested columns only ──────────────────────────
        requested_identities = {c.schema_column.identity for c in self.columns}
        orso_schema.columns = [
            col for col in orso_schema.columns if col.identity in requested_identities
        ]
        self.readings["columns_read"] += len(orso_schema.columns)

        records_to_read = self.limit if self.limit is not None else float("inf")

        filesystem = self.connector.filesystem
        # Column names as they appear in the Parquet file (Parquet uses the
        # original names, not identity aliases).
        column_names = [col.name for col in orso_schema.columns]
        # Map data-file column name → query-engine identity for Morsel construction.
        name_to_identity = {col.name: col.identity for col in orso_schema.columns}
        blob_paths = self.manifest.get_file_paths()

        # One cache per execute() call: footers shared across all row groups of
        # the same file; column chunks cached for reuse across row groups with
        # identical content (rare but free).
        cache = InMemoryParquetCache()
        result_morsel = None

        decode_start = time.monotonic_ns()
        try:
            for row_group in iter_row_groups(filesystem, blob_paths, column_names, cache):
                row_group.pop("__path__")
                row_group.pop("__row_group__")

                # Assemble the projected columns into a Draken Morsel directly.
                # Each value is a DrakenVector; we map data-file names to identity
                # names so the morsel arrives downstream already correctly labelled.
                identity_names = [name_to_identity[col] for col in row_group]
                vectors = list(row_group.values())
                result_morsel = Morsel.from_vectors(identity_names, vectors)

                num_rows = result_morsel.num_rows
                self.readings["rows_seen"] += num_rows
                self.readings["blobs_seen"] += 1

                # ── LIMIT enforcement ─────────────────────────────────────────
                if records_to_read < num_rows:
                    result_morsel = result_morsel.slice(0, int(records_to_read))
                    records_to_read = 0
                else:
                    records_to_read -= num_rows

                self.readings["blobs_read"] += 1
                self.telemetry.blobs_read += 1
                self.readings["rows_read"] += result_morsel.num_rows
                self.telemetry.rows_read += result_morsel.num_rows
                self.readings["bytes_processed"] += result_morsel.nbytes
                self.telemetry.bytes_processed += result_morsel.nbytes

                yield result_morsel

                if records_to_read <= 0:
                    break

        finally:
            # Attribute all wall-clock time in the read loop to decode telemetry,
            # even if the consumer abandons the generator early.
            decode_ns = time.monotonic_ns() - decode_start
            self.readings["time_decoding_blobs"] = (
                self.readings.get("time_decoding_blobs", 0) + decode_ns
            )
            self.telemetry.time_decoding_blobs += decode_ns

        # ── Empty result guard ────────────────────────────────────────────────
        # BUG FIX: previously referenced undefined `pyarrow` / `arrow_schema`,
        # raising NameError whenever a non-empty manifest produced no rows.
        if result_morsel is None:
            self.readings["empty_datasets"] += 1
            yield self._empty_table(orso_schema)
0 commit comments