Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
66 changes: 66 additions & 0 deletions docs/content/pypaimon/python-api.md
Original file line number Diff line number Diff line change
Expand Up @@ -594,6 +594,72 @@ Key points about shard read:
- **Parallel Processing**: Each shard can be processed independently for better performance
- **Consistency**: Combining all shards should produce the complete table data

### Explain Scan Plan

`ReadBuilder.explain()` returns a structured view of the scan plan without reading any data files. It is useful for understanding which splits a query will produce, how aggressively the pushdown pruned the input, and whether the resulting splits can be read on the zero-copy fast path.

```python
table = catalog.get_table('default.events')
read_builder = table.new_read_builder()
predicate_builder = read_builder.new_predicate_builder()
read_builder = read_builder.with_filter(predicate_builder.equal('dt', '2026-05-16'))
print(read_builder.explain())

# == PyPaimon Scan Plan ==
# Table: default.events (PK, HASH_FIXED)
# Snapshot: 1 (schema 0)
# Predicate: dt = '2026-05-16'
# Projection: <all columns>
# Limit: <none>
#
# Partition pruning: 12 -> 4 (pruned 8)
# Bucket pruning: 4 -> 4 (pruned 0)
# File skipping: 4 -> 4 (pruned 0)
#
# Splits: 4
# raw-convertible: 4 / 4
# with DV: 0 / 4
# all-above-L0: 0 / 4
# files/split: min=1 max=1 avg=1.00
# size/split: min=2.8 KiB p50=2.9 KiB p95=3.0 KiB max=3.0 KiB
#
# Files: 4
# Total size: 11.6 KiB
# Estimated rows: 20 (merged: 20)
# Level histogram: L0=4
# Deletion files: 0
```

Pass `verbose=True` to also list every split with its partition, bucket, file count, size, level histogram, and file paths:

```python
print(read_builder.explain(verbose=True))

# ...
#
# Splits[]
# [0] partition={'dt': '2026-05-16'} bucket=3 files=1 size=2.9 KiB rows=4 raw=True dv=False
# levels: L0=1
# file: /warehouse/default.db/events/dt=2026-05-16/bucket-3/data-...parquet
# [1] partition={'dt': '2026-05-16'} bucket=2 files=1 size=2.8 KiB rows=2 raw=True dv=False
# levels: L0=1
# file: /warehouse/default.db/events/dt=2026-05-16/bucket-2/data-...parquet
# ...
```

What the fields tell you:

- **Pushdown** (`Predicate` / `Projection` / `Limit`): exactly what the reader sees after `with_filter` / `with_projection` / `with_limit`.
- **Pruning funnel** (`Partition pruning` / `Bucket pruning` / `File skipping`): three `before -> after` counts that show at which stage the predicate paid off. `n/a` means the stage did not apply — for example, bucket pruning is reported for HASH_FIXED tables where every bucket key is pinned by the predicate, and for POSTPONE_BUCKET tables that skip their synthetic-bucket entries.
- **Split shape**: `raw-convertible` counts splits that can be read zero-copy (no merge, no deletion-vector apply); `with DV` counts splits whose files need a deletion vector applied; `all-above-L0` counts splits whose data lives entirely on L1+, i.e. the merge pipeline can skip the L0 buffer.
- **File aggregates**: total file size + estimated rows (with the post-merge row estimate for primary-key tables in parentheses), plus a level histogram of where the data sits.

{{< hint info >}}
**Cost**: `explain()` reads the manifest list and manifest files but does not open any data files. It suppresses the manifest-reader's early bucket filter and forces single-threaded manifest decoding so the before/after counters are accurate. On tables where the early filter usually prunes aggressively (e.g. very wide HASH_FIXED tables with a tight predicate), this can make `explain()` measurably slower than a regular `new_scan().plan()`.
{{< /hint >}}

`ExplainResult` is a plain dataclass — alongside the human-readable `__str__` shown above, every field (`partition_pruning`, `bucket_pruning`, `file_skipping`, `split_count`, `splits_raw_convertible`, `level_histogram`, `splits`, ...) is addressable in Python for programmatic use.

## Rollback

Paimon supports rolling back a table to a previous snapshot or tag. This is useful for undoing unwanted changes or
Expand Down
241 changes: 241 additions & 0 deletions paimon-python/pypaimon/read/explain.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,241 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import io
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional


@dataclass
class PruningStat:
"""Before / after counters for one pruning stage.

``before`` is the input size to the stage, ``after`` is the size that
survived. Either may be ``None`` when the stage did not run (for
example, ``bucket_pruning`` is ``None`` for tables that are not
HASH_FIXED with all bucket keys pinned).
"""

before: Optional[int]
after: Optional[int]

@property
def pruned(self) -> Optional[int]:
if self.before is None or self.after is None:
return None
return self.before - self.after

def format(self) -> str:
if self.before is None and self.after is None:
return "n/a"
before = "?" if self.before is None else str(self.before)
after = "?" if self.after is None else str(self.after)
pruned = self.pruned
suffix = "" if pruned is None else " (pruned {})".format(pruned)
return "{} -> {}{}".format(before, after, suffix)


@dataclass
class ExplainSplitInfo:
"""Per-split detail surfaced when ``explain(verbose=True)`` is used."""

partition: Dict[str, Any]
bucket: int
file_count: int
row_count: int
merged_row_count: Optional[int]
file_size: int
raw_convertible: bool
has_deletion_vectors: bool
level_histogram: Dict[int, int]
deletion_file_count: int
file_paths: List[str]


@dataclass
class ExplainResult:
"""Structured scan plan returned by ``ReadBuilder.explain()``.

The compact ``__str__`` shows enough signal to reason about cost: the
snapshot, the pushed-down predicate / projection / limit, the three
pruning before-after counters, split-level shape (raw-convertible
ratio, DV ratio, all-above-L0 ratio, files/split, size/split
distribution), and the file-level totals. ``verbose=True`` adds a
block listing each split.
"""

# Identity
table_identifier: str
is_primary_key_table: bool
bucket_mode: str
deletion_vectors_enabled: bool
data_evolution_enabled: bool

# Snapshot
snapshot_id: Optional[int]
schema_id: Optional[int]

# Pushdown
predicate: Optional[str] = None
projection: Optional[List[str]] = None
limit: Optional[int] = None

# Pruning (None when not applicable)
partition_pruning: Optional[PruningStat] = None
bucket_pruning: Optional[PruningStat] = None
file_skipping: Optional[PruningStat] = None

# File-level aggregates over final splits
file_count: int = 0
total_file_size: int = 0
estimated_row_count: int = 0
estimated_merged_row_count: Optional[int] = None
deletion_file_count: int = 0
level_histogram: Dict[int, int] = field(default_factory=dict)

# Split-level aggregates (shown in compact mode too)
split_count: int = 0
splits_raw_convertible: int = 0
splits_with_deletion_vectors: int = 0
splits_all_above_l0: int = 0
files_per_split_min: int = 0
files_per_split_max: int = 0
files_per_split_avg: float = 0.0
split_size_min: int = 0
split_size_max: int = 0
split_size_avg: float = 0.0
split_size_p50: int = 0
split_size_p95: int = 0

# Verbose-only
splits: Optional[List[ExplainSplitInfo]] = None

def __str__(self) -> str:
return render_explain(self)


# ---------------------------------------------------------------------------
# Pretty-print helpers
# ---------------------------------------------------------------------------

def render_explain(result: ExplainResult) -> str:
out = io.StringIO()
out.write("== PyPaimon Scan Plan ==\n")

flags = []
flags.append("PK" if result.is_primary_key_table else "Append")
flags.append(result.bucket_mode)
if result.deletion_vectors_enabled:
flags.append("dv=on")
if result.data_evolution_enabled:
flags.append("data-evolution=on")
_line(out, "Table", "{} ({})".format(result.table_identifier, ", ".join(flags)))

if result.snapshot_id is None:
_line(out, "Snapshot", "<none> (table is empty or has no snapshot)")
else:
schema_part = "" if result.schema_id is None else " (schema {})".format(result.schema_id)
_line(out, "Snapshot", "{}{}".format(result.snapshot_id, schema_part))

_line(out, "Predicate", result.predicate if result.predicate else "<none>")
_line(out, "Projection",
"[{}]".format(", ".join(result.projection)) if result.projection else "<all columns>")
_line(out, "Limit", str(result.limit) if result.limit is not None else "<none>")

out.write("\n")
_line(out, "Partition pruning",
result.partition_pruning.format() if result.partition_pruning else "n/a")
_line(out, "Bucket pruning",
result.bucket_pruning.format() if result.bucket_pruning else "n/a")
_line(out, "File skipping",
result.file_skipping.format() if result.file_skipping else "n/a")

out.write("\n")
_line(out, "Splits", str(result.split_count))
if result.split_count > 0:
out.write(" raw-convertible: {} / {}\n".format(
result.splits_raw_convertible, result.split_count))
out.write(" with DV: {} / {}\n".format(
result.splits_with_deletion_vectors, result.split_count))
out.write(" all-above-L0: {} / {}\n".format(
result.splits_all_above_l0, result.split_count))
out.write(" files/split: min={} max={} avg={:.2f}\n".format(
result.files_per_split_min,
result.files_per_split_max,
result.files_per_split_avg))
out.write(" size/split: min={} p50={} p95={} max={}\n".format(
_format_size(result.split_size_min),
_format_size(result.split_size_p50),
_format_size(result.split_size_p95),
_format_size(result.split_size_max)))

out.write("\n")
_line(out, "Files", str(result.file_count))
_line(out, "Total size", _format_size(result.total_file_size))

rows = "{:,}".format(result.estimated_row_count)
if result.estimated_merged_row_count is not None:
rows += " (merged: {:,})".format(result.estimated_merged_row_count)
_line(out, "Estimated rows", rows)

if result.level_histogram:
levels = sorted(result.level_histogram.items())
levels_str = " ".join("L{}={}".format(lv, cnt) for lv, cnt in levels)
_line(out, "Level histogram", levels_str)
_line(out, "Deletion files", str(result.deletion_file_count))

if result.splits:
out.write("\nSplits[]\n")
for idx, split in enumerate(result.splits):
out.write(" [{}] partition={} bucket={} files={} size={} rows={} raw={} dv={}\n".format(
idx,
split.partition,
split.bucket,
split.file_count,
_format_size(split.file_size),
split.row_count,
split.raw_convertible,
split.has_deletion_vectors,
))
if split.level_histogram:
levels = sorted(split.level_histogram.items())
out.write(" levels: {}\n".format(
" ".join("L{}={}".format(lv, cnt) for lv, cnt in levels)))
for path in split.file_paths:
out.write(" file: {}\n".format(path))

return out.getvalue().rstrip("\n")


def _line(out: io.StringIO, label: str, value: str) -> None:
out.write("{:<19} {}\n".format(label + ":", value))


_SIZE_UNITS = ("B", "KiB", "MiB", "GiB", "TiB", "PiB")


def _format_size(num_bytes: int) -> str:
if num_bytes is None:
return "?"
size = float(num_bytes)
for unit in _SIZE_UNITS:
if size < 1024.0 or unit == _SIZE_UNITS[-1]:
if unit == "B":
return "{:d} {}".format(int(size), unit)
return "{:.1f} {}".format(size, unit)
size /= 1024.0
return "{:.1f} {}".format(size, _SIZE_UNITS[-1])
Loading
Loading