
Commit 4d59701

test(libviewer): add a generic test case to exercise sync scanning
1 parent 68970f4 commit 4d59701

1 file changed
Lines changed: 131 additions & 0 deletions
@@ -0,0 +1,131 @@
import pytest
import pyarrow as pa
from pathlib import Path
import pyarrow.parquet as pq


from libviewer import Dataset


def generate_sample_table(num_rows: int) -> pa.Table:
    """Generate a sample PyArrow Table for testing.

    Args:
        num_rows: Number of rows to generate

    Returns:
        pa.Table with id, name, value, and category columns
    """
    return pa.table(
        {
            "id": pa.array(range(num_rows), type=pa.int64()),
            "name": pa.array([f"name_{i}" for i in range(num_rows)], type=pa.string()),
            "value": pa.array([i * 10.5 for i in range(num_rows)], type=pa.float64()),
            "category": pa.array(
                [f"cat_{i % 5}" for i in range(num_rows)], type=pa.string()
            ),
        }
    )

def write_partitioned_parquet_dataset(
    table: pa.Table, data_dir: Path, metadata_dir: Path, num_partitions: int = 5
) -> list[dict]:
    """
    Split table into partitions and write to parquet files with metadata.

    Args:
        table: The PyArrow Table to partition
        data_dir: Directory to write parquet data files
        metadata_dir: Directory to write parquet metadata files
        num_partitions: Number of partitions to create

    Returns:
        List of dicts describing the written files:
            path: str
            size: int
            num_rows: int
            metadata_path: str
    """
    data_dir.mkdir(parents=True, exist_ok=True)
    metadata_dir.mkdir(parents=True, exist_ok=True)

    # Split table into partitions
    num_rows = len(table)
    partition_size = num_rows // num_partitions

    files = []
    for i in range(num_partitions):
        start_idx = i * partition_size
        end_idx = start_idx + partition_size if i < num_partitions - 1 else num_rows
        partition_table = table.slice(start_idx, end_idx - start_idx)

        # Write partition to parquet
        data_path = f"data_partition_{i}.parquet"
        partition_file = data_dir / data_path
        pq.write_table(partition_table, partition_file)

        # Read the parquet metadata
        parquet_metadata = pq.read_metadata(partition_file)

        # Write parquet metadata to a separate file
        metadata_path = f"metadata_partition_{i}.parquet"
        metadata_file = metadata_dir / metadata_path
        with open(metadata_file, "wb") as f:
            parquet_metadata.write_metadata_file(f)

        files.append(
            {
                "path": data_path,
                "size": partition_file.stat().st_size,
                "num_rows": partition_table.num_rows,
                "metadata_path": metadata_path,
            }
        )

    return files

@pytest.mark.parametrize(
    ("limit", "offset"),
    [(0, 0), (1, 0), (10, 5), (20, 15), (150, 180), (100, 900), (250, 750)],
)
@pytest.mark.parametrize("num_partitions", [1, 5, 10])
def test_sync_scan(tmp_path, limit, offset, num_partitions):
    data_dir = tmp_path / "data"
    metadata_dir = tmp_path / "metadata"

    table = generate_sample_table(num_rows=1000)
    files = write_partitioned_parquet_dataset(
        table=table,
        data_dir=data_dir,
        metadata_dir=metadata_dir,
        num_partitions=num_partitions,
    )

    # Calculate the expected number of files to be read
    partition_size = 1000 // num_partitions
    if limit == 0:
        expected_files_to_read = 0
    else:
        start_partition = offset // partition_size
        end_partition = (offset + limit - 1) // partition_size
        expected_files_to_read = min(
            end_partition - start_partition + 1, num_partitions
        )

    dataset = Dataset(
        files=files,
        name="test_dataset",
        data_store=f"file://{data_dir}",
        metadata_store=f"file://{metadata_dir}",
    )

    # Perform a synchronous scan; the returned batches should match
    # the number of scanned files
    batches = dataset.sync_scan(limit=limit, offset=offset)
    assert len(batches) == expected_files_to_read

    # Concatenate batches and compare with expected sliced table
    result = pa.Table.from_batches(batches, schema=table.schema)
    expected = table.slice(offset, limit)
    assert result.equals(expected)
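
For reference, the expected_files_to_read calculation in the test can be traced by hand. A minimal standalone sketch, plain Python and independent of libviewer, using one combination from the parametrize matrix (num_partitions=10, limit=150, offset=180):

num_rows = 1000
num_partitions = 10               # partition_size works out to 100 rows per parquet file
limit, offset = 150, 180          # the scan window covers rows 180..329

partition_size = num_rows // num_partitions
start_partition = offset // partition_size                  # 180 // 100 = 1
end_partition = (offset + limit - 1) // partition_size      # 329 // 100 = 3
expected_files_to_read = min(end_partition - start_partition + 1, num_partitions)

print(expected_files_to_read)     # 3: partitions 1, 2, and 3 have to be scanned

The two parametrize decorators expand to 7 × 3 = 21 cases in total, which can be selected by name with pytest -k test_sync_scan.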
