Skip to content

Commit 466e4f0

Browse files
authored
fix(iqb/pipeline): always create parquet file (#41)
It definitely feels less error-prone to always create a file and default to creating an empty file if there's no data.
1 parent cadd6fa commit 466e4f0

File tree

3 files changed

+64
-58
lines changed

3 files changed

+64
-58
lines changed

data/run_query.py

Lines changed: 6 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ def run_bq_query(
9191

9292
# Step 1: Execute query and save results using IQBPipeline
9393
# This creates: ./iqb/data/cache/v1/{start}/{end}/{query_name}/
94-
# - data.parquet: query results
94+
# - data.parquet: query results (empty file if no results)
9595
# - stats.json: query metadata
9696
pipeline = IQBPipeline(project_id=project_id, data_dir=data_dir)
9797
result = pipeline.execute_query_template(
@@ -101,14 +101,6 @@ def run_bq_query(
101101
)
102102
info = result.save_parquet()
103103

104-
if info.no_content:
105-
print("⚠ Query returned no results", file=sys.stderr)
106-
if output_file:
107-
output_file.write_text("[]")
108-
else:
109-
print("[]")
110-
return
111-
112104
print(f"✓ Parquet saved: {info.file_path}", file=sys.stderr)
113105

114106
# Save query statistics (timing, bytes processed, template hash)
@@ -119,6 +111,11 @@ def run_bq_query(
119111
print("Converting parquet to JSON...", file=sys.stderr)
120112
table = pq.read_table(info.file_path)
121113
records = table.to_pylist()
114+
115+
# Check if query returned no results
116+
if len(records) <= 0:
117+
print("⚠ Query returned no results", file=sys.stderr)
118+
122119
json_output = json.dumps(records, indent=2)
123120

124121
# Step 3: Write JSON to output file or stdout

library/src/iqb/pipeline.py

Lines changed: 17 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,7 @@
8383
from pathlib import Path
8484
from typing import Final
8585

86+
import pyarrow as pa
8687
import pyarrow.parquet as pq
8788
from google.cloud import bigquery, bigquery_storage_v1
8889
from google.cloud.bigquery import job, table
@@ -120,23 +121,17 @@ class CacheEntry:
120121
stats_path: Path
121122

122123

123-
# TODO(bassosimone): create an empty parquet file rather than
124-
# returning "no_content=True" in a subsequent diff. I realized
125-
# that the current approach is easy to get wrong.
126-
127-
128124
@dataclass(frozen=True)
129125
class ParquetFileInfo:
130126
"""
131127
Result of serializing a query result into the cache using parquet.
132128
129+
An empty parquet file is written if the query returns no rows.
130+
133131
Attributes:
134-
no_content: true if the query returned no content, in which case
135-
no parquet file is actually being written.
136132
file_path: full path to the written file.
137133
"""
138134

139-
no_content: bool
140135
file_path: Path
141136

142137

@@ -162,25 +157,30 @@ class QueryResult:
162157
template_hash: str
163158

164159
def save_parquet(self) -> ParquetFileInfo:
165-
"""Streams and saves the query results to data.parquet in cache_dir."""
160+
"""Streams and saves the query results to data.parquet in cache_dir.
161+
162+
If the query returns no rows, an empty parquet file is written.
163+
"""
166164
self.cache_dir.mkdir(parents=True, exist_ok=True)
167165
parquet_path = self.cache_dir / CACHE_DATA_FILENAME
168166

167+
# Note: using .as_posix to avoid paths with backslashes
168+
# that can cause issues with PyArrow on Windows
169+
posix_path = parquet_path.as_posix()
170+
169171
# Access the first batch to obtain the schema
170172
batches = self.rows.to_arrow_iterable(bqstorage_client=self.bq_read_client)
171173
first_batch = next(batches, None)
172-
if first_batch is None:
173-
return ParquetFileInfo(no_content=True, file_path=parquet_path)
174+
schema = first_batch.schema if first_batch is not None else pa.schema([])
174175

175-
# Note: using .as_posix to avoid paths with backslashes
176-
# that can cause issues with PyArrow on Windows
177-
posix_path = parquet_path.as_posix()
178-
with pq.ParquetWriter(posix_path, first_batch.schema) as writer:
179-
writer.write_batch(first_batch)
176+
# Write the possibly-empty parquet file
177+
with pq.ParquetWriter(posix_path, schema) as writer:
178+
if first_batch is not None:
179+
writer.write_batch(first_batch)
180180
for batch in batches:
181181
writer.write_batch(batch)
182182

183-
return ParquetFileInfo(no_content=False, file_path=parquet_path)
183+
return ParquetFileInfo(file_path=parquet_path)
184184

185185
def save_stats(self) -> Path:
186186
"""Writes query statistics to stats.json in cache_dir.

library/tests/iqb/pipeline_test.py

Lines changed: 41 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
from pathlib import Path
55
from unittest.mock import MagicMock, Mock, patch
66

7+
import pyarrow as pa
78
import pytest
89

910
from iqb.pipeline import (
@@ -299,38 +300,53 @@ def test_save_parquet_with_data(self, tmp_path):
299300

300301
info = result.save_parquet()
301302

302-
# Verify
303-
assert info.no_content is False
303+
# Verify file path and directory creation
304304
expected_path = cache_dir / "data.parquet"
305305
assert info.file_path == expected_path
306306
assert cache_dir.exists()
307307
mock_writer_instance.write_batch.assert_called_once_with(mock_batch)
308308

309309
def test_save_parquet_empty_results(self, tmp_path):
310-
"""Test handling of empty query results."""
310+
"""Test handling of empty query results - writes empty parquet file."""
311311
cache_dir = tmp_path / "cache"
312312

313313
# Mock empty iterator
314314
mock_rows = Mock()
315315
mock_rows.to_arrow_iterable.return_value = iter([])
316316

317-
result = QueryResult(
318-
bq_read_client=Mock(),
319-
job=Mock(),
320-
rows=mock_rows,
321-
cache_dir=cache_dir,
322-
query_start_time="2024-11-27T10:00:00.000000Z",
323-
template_hash="abc123",
324-
)
317+
# Mock ParquetWriter to verify it's called with empty schema
318+
with patch("iqb.pipeline.pq.ParquetWriter") as mock_writer:
319+
mock_writer_instance = MagicMock()
320+
mock_writer.return_value.__enter__.return_value = mock_writer_instance
325321

326-
info = result.save_parquet()
322+
result = QueryResult(
323+
bq_read_client=Mock(),
324+
job=Mock(),
325+
rows=mock_rows,
326+
cache_dir=cache_dir,
327+
query_start_time="2024-11-27T10:00:00.000000Z",
328+
template_hash="abc123",
329+
)
327330

328-
# Verify no file created, but directory exists
329-
assert info.no_content is True
330-
expected_path = cache_dir / "data.parquet"
331-
assert info.file_path == expected_path
332-
assert cache_dir.exists()
333-
assert not expected_path.exists()
331+
info = result.save_parquet()
332+
333+
# Verify empty parquet file would be created
334+
expected_path = cache_dir / "data.parquet"
335+
assert info.file_path == expected_path
336+
assert cache_dir.exists()
337+
338+
# Verify ParquetWriter was called with empty schema
339+
mock_writer.assert_called_once()
340+
call_args = mock_writer.call_args
341+
assert call_args[0][0] == expected_path.as_posix()
342+
343+
# Verify schema is empty (no fields)
344+
schema_arg = call_args[0][1]
345+
assert isinstance(schema_arg, pa.Schema)
346+
assert len(schema_arg) == 0
347+
348+
# Verify no batches were written (first_batch is None, for loop has nothing)
349+
mock_writer_instance.write_batch.assert_not_called()
334350

335351
def test_save_parquet_multiple_batches(self, tmp_path):
336352
"""Test saving multiple Arrow batches."""
@@ -362,7 +378,8 @@ def test_save_parquet_multiple_batches(self, tmp_path):
362378

363379
# Verify all batches written
364380
assert mock_writer_instance.write_batch.call_count == 3
365-
assert info.no_content is False
381+
expected_path = cache_dir / "data.parquet"
382+
assert info.file_path == expected_path
366383

367384
def test_save_parquet_creates_nested_directories(self, tmp_path):
368385
"""Test that save_parquet creates nested cache directory."""
@@ -392,7 +409,8 @@ def test_save_parquet_creates_nested_directories(self, tmp_path):
392409

393410
# Verify cache directory created
394411
assert cache_dir.exists()
395-
assert info.no_content is False
412+
expected_path = cache_dir / "data.parquet"
413+
assert info.file_path == expected_path
396414

397415

398416
class TestQueryResultSaveStats:
@@ -672,18 +690,9 @@ def test_get_cache_entry_validation_before_fs_check(self, mock_storage, mock_cli
672690
class TestParquetFileInfo:
673691
"""Test ParquetFileInfo dataclass."""
674692

675-
def test_parquet_file_info_with_content(self, tmp_path):
676-
"""Test ParquetFileInfo creation with content."""
677-
test_file = tmp_path / "test.parquet"
678-
info = ParquetFileInfo(no_content=False, file_path=test_file)
679-
680-
assert info.no_content is False
681-
assert info.file_path == test_file
682-
683-
def test_parquet_file_info_no_content(self, tmp_path):
684-
"""Test ParquetFileInfo creation without content."""
693+
def test_parquet_file_info_creation(self, tmp_path):
694+
"""Test ParquetFileInfo creation."""
685695
test_file = tmp_path / "test.parquet"
686-
info = ParquetFileInfo(no_content=True, file_path=test_file)
696+
info = ParquetFileInfo(file_path=test_file)
687697

688-
assert info.no_content is True
689698
assert info.file_path == test_file

0 commit comments

Comments
 (0)