Skip to content

Commit ca17bc4

Browse files
committed
feat: add column bytes
1 parent c782d83 commit ca17bc4

File tree

3 files changed

+142
-21
lines changed

3 files changed

+142
-21
lines changed

opteryx_catalog/catalog/dataset.py

Lines changed: 12 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -955,6 +955,7 @@ def describe(self, snapshot_id: Optional[int] = None, bins: int = 10) -> dict:
955955
"file_hist_infos": [],
956956
"min_displays": [],
957957
"max_displays": [],
958+
"uncompressed_bytes": 0,
958959
}
959960

960961
total_rows = 0
@@ -1007,6 +1008,7 @@ def _decode_minmax(v):
10071008
xv = ent.get("max_values") or []
10081009
mv_disp = ent.get("min_values_display") or []
10091010
xv_disp = ent.get("max_values_display") or []
1011+
col_sizes = ent.get("column_uncompressed_sizes_in_bytes") or []
10101012

10111013
for cname, cidx in col_to_idx.items():
10121014
# nulls
@@ -1091,6 +1093,12 @@ def _decode_display(v):
10911093
except Exception:
10921094
pass
10931095

1096+
# uncompressed bytes for this column (sum across files)
1097+
try:
1098+
stats[cname]["uncompressed_bytes"] += int((col_sizes or [0])[cidx])
1099+
except Exception:
1100+
pass
1101+
10941102
# Build results per column
10951103
results: dict[str, dict] = {}
10961104
for cname, cidx in col_to_idx.items():
@@ -1212,6 +1220,7 @@ def _decode_display(v):
12121220
"min": global_min,
12131221
"max": global_max,
12141222
"null_count": s["null_count"],
1223+
"uncompressed_bytes": s["uncompressed_bytes"],
12151224
"approx_cardinality": approx_cardinality,
12161225
"distribution": distribution,
12171226
}
@@ -1229,24 +1238,6 @@ def _decode_display(v):
12291238
except Exception:
12301239
is_text = False
12311240

1232-
def _int_to_prefix(v, max_chars=16):
1233-
try:
1234-
if not isinstance(v, int):
1235-
return None
1236-
if v == 0:
1237-
return None
1238-
blen = (v.bit_length() + 7) // 8
1239-
blen = max(blen, 1)
1240-
b = v.to_bytes(blen, "big")
1241-
b = b.strip(b"\x00")
1242-
if not b:
1243-
return None
1244-
s = b.decode("utf-8", errors="replace")
1245-
s = s.rstrip("\x00")
1246-
return s[:16]
1247-
except Exception:
1248-
return None
1249-
12501241
if is_text:
12511242
# Use only textual display values collected from manifests.
12521243
# Decode bytes and strip truncation marker (0xFF) if present.
@@ -1325,8 +1316,9 @@ def refresh_manifest(self, agent: str, author: Optional[str] = None) -> Optional
13251316
import pyarrow as pa
13261317
import pyarrow.parquet as pq
13271318

1328-
prev_table = pq.read_table(pa.BufferReader(prev_data))
1329-
prev_rows = prev_table.to_pylist()
1319+
# the manifest is a parquet file, read into a pyarrow Table
1320+
prev_manifest = pq.read_table(pa.BufferReader(prev_data))
1321+
prev_rows = prev_manifest.to_pylist()
13301322
except Exception:
13311323
prev_rows = []
13321324

pyproject.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
44

55
[project]
66
name = "opteryx-catalog"
7-
version = "0.4.19"
7+
version = "0.4.20"
88
description = "Opteryx Cloud Catalog"
99
readme = { file = "README.md", content-type = "text/markdown" }
1010
authors = [
Lines changed: 129 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,129 @@
1+
import io
2+
3+
import pyarrow as pa
4+
import pyarrow.parquet as pq
5+
6+
from opteryx_catalog.catalog.dataset import SimpleDataset
7+
from opteryx_catalog.catalog.metadata import DatasetMetadata, Snapshot
8+
9+
10+
class _MemInput:
11+
def __init__(self, data: bytes):
12+
self._data = data
13+
14+
def open(self):
15+
# Provide a file-like BytesIO which .read() returns the bytes
16+
return io.BytesIO(self._data)
17+
18+
19+
class _MemIO:
    """Minimal IO layer resolving paths against an in-memory dict."""

    def __init__(self, mapping: dict):
        self._mapping = mapping

    def new_input(self, path: str):
        """Look up *path* and wrap its bytes in a ``_MemInput``."""
        blob = self._mapping[path]
        return _MemInput(blob)
25+
26+
27+
def _build_manifest_bytes():
    """Serialize a two-entry parquet manifest (two columns per file) to bytes."""
    int_list = pa.list_(pa.int64())
    str_list = pa.list_(pa.string())

    # Manifest column name -> (arrow type, values for the two file entries).
    columns = {
        "file_path": (pa.string(), ["f1.parquet", "f2.parquet"]),
        "file_format": (pa.string(), ["parquet", "parquet"]),
        "record_count": (pa.int64(), [10, 20]),
        "file_size_in_bytes": (pa.int64(), [100, 200]),
        "uncompressed_size_in_bytes": (pa.int64(), [1000, 2000]),
        "column_uncompressed_sizes_in_bytes": (int_list, [[100, 400], [300, 200]]),
        "null_counts": (int_list, [[0, 0], [0, 0]]),
        "min_k_hashes": (int_list, [[1, 2], [1]]),
        "histogram_counts": (int_list, [[1, 2], [3, 4]]),
        "histogram_bins": (pa.int64(), [32, 32]),
        "min_values": (int_list, [[10, 20], [5, 30]]),
        "max_values": (int_list, [[100, 400], [300, 200]]),
        "min_values_display": (str_list, [[None, None], [None, None]]),
        "max_values_display": (str_list, [[None, None], [None, None]]),
    }

    schema = pa.schema([(name, typ) for name, (typ, _) in columns.items()])
    arrays = [pa.array(values, type=typ) for typ, values in columns.values()]
    table = pa.Table.from_arrays(arrays, schema=schema)

    buf = io.BytesIO()
    pq.write_table(table, buf)
    return buf.getvalue()
88+
89+
90+
def test_describe_includes_uncompressed_bytes():
    """describe() should sum per-column uncompressed sizes across manifest files."""
    manifest_path = "mem://manifest"
    manifest_bytes = _build_manifest_bytes()

    meta = DatasetMetadata(
        dataset_identifier="tests_temp.test",
        location="mem://",
        schema=None,
        properties={},
    )

    # Two-column schema so describe() can map column names -> manifest indices.
    meta.schemas.append(
        {"schema_id": "s1", "columns": [{"name": "a"}, {"name": "b"}]}
    )
    meta.current_schema_id = "s1"

    # Current snapshot references the in-memory manifest built above.
    meta.snapshots.append(
        Snapshot(snapshot_id=1, timestamp_ms=1, manifest_list=manifest_path)
    )
    meta.current_snapshot_id = 1

    ds = SimpleDataset(identifier="tests_temp.test", _metadata=meta)
    # Inject the in-memory IO mapping in place of real storage.
    ds.io = _MemIO({manifest_path: manifest_bytes})

    desc = ds.describe()

    # 'a': 100 + 300 bytes across the two files; 'b': 400 + 200.
    for name, expected in (("a", 400), ("b", 600)):
        assert name in desc
        assert desc[name]["uncompressed_bytes"] == expected

0 commit comments

Comments
 (0)