Skip to content

Commit cadd6fa

Browse files
authored
feat(pipeline): allow checking whether entry exists (#40)
We developed a cache for BigQuery queries, but we don't allow checking whether an entry exists. So, add support for that and, while there, allow conditionally fetching if missing. This should be enough to update the `./data` pipeline code to avoid re-running expensive queries when we have data.
1 parent 4d8f392 commit cadd6fa

File tree

2 files changed

+418
-32
lines changed

2 files changed

+418
-32
lines changed

library/src/iqb/pipeline.py

Lines changed: 156 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -13,18 +13,21 @@
1313
$datadir/cache/v1/$since/$until/$query_type/stats.json
1414
1515
Each query result is stored in a directory containing the data file (data.parquet)
16-
and query metadata (stats.json). The stats file records query execution details
16+
and the query metadata (stats.json). The stats file records query execution details
1717
such as start time (RFC3339 format with Z suffix), duration, and bytes processed
1818
for transparency and debugging
1919
20-
The `$since` and `$until` variables are timestamps using the ISO8601 format
20+
The `$since` and `$until` variables are timestamps using the RFC3339 format
2121
with UTC timezone, formatted to be file-system friendly (i.e., without
22-
including the ":" character). For example: `20251126T123600Z`.
22+
including the ":" character). For example: `20251126T100000Z`.
2323
2424
The `$since` timestamp is included and the `$until` one is excluded. This
2525
simplifies specifying time ranges significantly (e.g., October 2025 is
2626
represented using `since=20251001T000000Z` and `until=20251101T000000Z`).
2727
28+
Note that when using BigQuery the second component of the path will always
29+
be `T000000Z` because we do not support hourly range queries for now.
30+
2831
The fact that we use explicit timestamps allows the cache to contain any
2932
kind of time range, including partially overlapping ones. This content-
3033
addressable approach means the time range IS the path identifier, eliminating
@@ -91,6 +94,36 @@
9194
"uploads_by_country",
9295
}
9396

97+
# Cache file names, shared by the QueryResult.save_* writers and the
# get_cache_entry reader so both sides agree on the on-disk layout.
CACHE_DATA_FILENAME: Final[str] = "data.parquet"
CACHE_STATS_FILENAME: Final[str] = "stats.json"
100+
101+
102+
@dataclass(frozen=True)
class ParsedTemplateName:
    """A query-template name that has passed validation."""

    # The validated template name string.
    value: str
107+
108+
109+
@dataclass(frozen=True)
class CacheEntry:
    """
    Reference to an on-disk cache entry holding query results and metadata.

    Attributes:
        data_path: Path to data.parquet file
        stats_path: Path to stats.json file
    """

    # Location of the parquet data file.
    data_path: Path
    # Location of the JSON stats file.
    stats_path: Path
121+
122+
123+
# TODO(bassosimone): create an empty parquet file rather than
124+
# returning "no_content=True" in a subsequent diff. I realized
125+
# that the current approach is easy to get wrong.
126+
94127

95128
@dataclass(frozen=True)
96129
class ParquetFileInfo:
@@ -131,7 +164,7 @@ class QueryResult:
131164
def save_parquet(self) -> ParquetFileInfo:
132165
"""Streams and saves the query results to data.parquet in cache_dir."""
133166
self.cache_dir.mkdir(parents=True, exist_ok=True)
134-
parquet_path = self.cache_dir / "data.parquet"
167+
parquet_path = self.cache_dir / CACHE_DATA_FILENAME
135168

136169
# Access the first batch to obtain the schema
137170
batches = self.rows.to_arrow_iterable(bqstorage_client=self.bq_read_client)
@@ -156,7 +189,7 @@ def save_stats(self) -> Path:
156189
Path to the written stats.json file.
157190
"""
158191
self.cache_dir.mkdir(parents=True, exist_ok=True)
159-
stats_path = self.cache_dir / "stats.json"
192+
stats_path = self.cache_dir / CACHE_STATS_FILENAME
160193

161194
# Calculate query duration from BigQuery job
162195
query_duration_seconds = None
@@ -199,6 +232,72 @@ def __init__(self, project_id: str, data_dir: str | Path | None = None):
199232
self.bq_read_clnt = bigquery_storage_v1.BigQueryReadClient()
200233
self.data_dir = cache.data_dir_or_default(data_dir)
201234

235+
def _cache_dir_path(
236+
self,
237+
tname: ParsedTemplateName,
238+
start_time: datetime,
239+
end_time: datetime,
240+
) -> Path:
241+
fs_date_format = "%Y%m%dT000000Z"
242+
start_dir = start_time.strftime(fs_date_format)
243+
end_dir = end_time.strftime(fs_date_format)
244+
return self.data_dir / "cache" / "v1" / start_dir / end_dir / tname.value
245+
246+
def get_cache_entry(
    self,
    template: str,
    start_date: str,
    end_date: str,
    *,
    fetch_if_missing: bool = False,
) -> CacheEntry:
    """
    Get or create a cache entry for the given query.

    Args:
        template: name for the query template (e.g., "downloads_by_country")
        start_date: Date when to start the query (included) -- format YYYY-MM-DD
        end_date: Date when to end the query (excluded) -- format YYYY-MM-DD
        fetch_if_missing: if True, execute query and save if cache doesn't exist.
            Default is False (do not fetch automatically).

    Returns:
        CacheEntry with paths to data.parquet and stats.json.

    Raises:
        FileNotFoundError: if cache doesn't exist and fetch_if_missing is False.
    """
    # Validate inputs: dates first, then the template name.
    when_start, when_end = _parse_both_dates(start_date, end_date)
    parsed_name = _parse_template_name(template)

    # Compute where this entry lives in the content-addressable cache.
    entry_dir = self._cache_dir_path(parsed_name, when_start, when_end)
    entry = CacheEntry(
        data_path=entry_dir / CACHE_DATA_FILENAME,
        stats_path=entry_dir / CACHE_STATS_FILENAME,
    )

    # Fast path: both files are already on disk.
    if entry.data_path.exists() and entry.stats_path.exists():
        return entry

    # Missing entry and the caller did not opt into fetching.
    if not fetch_if_missing:
        raise FileNotFoundError(
            f"Cache entry not found for {template} "
            f"({start_date} to {end_date}). "
            "Set fetch_if_missing=True to execute query."
        )

    # Run the query and populate both cache files.
    outcome = self._execute_query_template(parsed_name, when_start, when_end)
    outcome.save_parquet()
    outcome.save_stats()
    return entry
300+
202301
def execute_query_template(
203302
self,
204303
template: str,
@@ -217,31 +316,38 @@ def execute_query_template(
217316
A QueryResult instance.
218317
"""
219318
# 1. parse the start and the end dates
220-
start_time = _parse_date(start_date)
221-
end_time = _parse_date(end_date)
222-
if start_time > end_time:
223-
raise ValueError(f"start_date must be <= end_date, got: {start_date} > {end_date}")
224-
225-
# 2. load the query template and get its hash
226-
if template not in VALID_TEMPLATE_NAMES:
227-
valid = ", ".join(sorted(VALID_TEMPLATE_NAMES))
228-
raise ValueError(f"Unknown template {template!r}; valid templates: {valid}")
229-
query, template_hash = _load_query_template(template, start_date, end_date)
230-
231-
# 3. record query start time (RFC3339 format with Z suffix)
319+
start_time, end_time = _parse_both_dates(start_date, end_date)
320+
321+
# 2. ensure the template name is correct
322+
tname = _parse_template_name(template)
323+
324+
# 3. defer to the private implementation
325+
return self._execute_query_template(
326+
tname=tname,
327+
start_time=start_time,
328+
end_time=end_time,
329+
)
330+
331+
def _execute_query_template(
332+
self,
333+
tname: ParsedTemplateName,
334+
start_time: datetime,
335+
end_time: datetime,
336+
) -> QueryResult:
337+
# 1. load the actual query
338+
query, template_hash = _load_query_template(tname, start_time, end_time)
339+
340+
# 2. record query start time (RFC3339 format with Z suffix)
232341
query_start_time = datetime.now(UTC).strftime("%Y-%m-%dT%H:%M:%S.%fZ")
233342

234-
# 4. execute the query and get job and iterable rows
343+
# 3. execute the query and get job and iterable rows
235344
job = self.client.query(query)
236345
rows = job.result()
237346

238-
# 5. compute the directory where we would save the results
239-
fs_date_format = "%Y%m%dT000000Z"
240-
start_dir = start_time.strftime(fs_date_format)
241-
end_dir = end_time.strftime(fs_date_format)
242-
cache_dir = self.data_dir / "cache" / "v1" / start_dir / end_dir / template
347+
# 4. compute the directory where we would save the results
348+
cache_dir = self._cache_dir_path(tname, start_time, end_time)
243349

244-
# 6. return the result object
350+
# 5. return the result object
245351
return QueryResult(
246352
bq_read_client=self.bq_read_clnt,
247353
job=job,
@@ -252,28 +358,50 @@ def execute_query_template(
252358
)
253359

254360

361+
def _parse_both_dates(start_date: str, end_date: str) -> tuple[datetime, datetime]:
362+
"""Parses both dates and ensures start_date <= end_date."""
363+
start_time = _parse_date(start_date)
364+
end_time = _parse_date(end_date)
365+
if start_time > end_time:
366+
raise ValueError(f"start_date must be <= end_date, got: {start_date} > {end_date}")
367+
return start_time, end_time
368+
369+
370+
def _parse_template_name(value: str) -> ParsedTemplateName:
371+
"""Ensure that the template name is a valid template name."""
372+
if value not in VALID_TEMPLATE_NAMES:
373+
valid = ", ".join(sorted(VALID_TEMPLATE_NAMES))
374+
raise ValueError(f"Unknown template {value!r}; valid templates: {valid}")
375+
return ParsedTemplateName(value=value)
376+
377+
255378
def _parse_date(value: str) -> datetime:
379+
"""Ensure that a single date is consistent with the format and return it parsed."""
256380
try:
257381
return datetime.strptime(value, "%Y-%m-%d")
258382
except ValueError as e:
259383
raise ValueError(f"Invalid date format: {value} (expected YYYY-MM-DD)") from e
260384

261385

262-
def _load_query_template(
    tname: ParsedTemplateName,
    start_date: datetime,
    end_date: datetime,
) -> tuple[str, str]:
    """Load and instantiate a query template.

    Returns:
        Tuple of (instantiated_query, template_hash) where template_hash is
        the SHA256 hash of the original template file.
    """
    template_text = files(queries).joinpath(f"{tname.value}.sql").read_text()

    # Hash the pristine template (before substitution) so callers can
    # detect when the underlying SQL changes.
    template_hash = hashlib.sha256(template_text.encode("utf-8")).hexdigest()

    # Substitute both date placeholders with YYYY-MM-DD renderings.
    day_format = "%Y-%m-%d"
    query = template_text.replace(
        "{START_DATE}", start_date.strftime(day_format)
    ).replace("{END_DATE}", end_date.strftime(day_format))

    return query, template_hash

0 commit comments

Comments
 (0)