Skip to content

Commit 7171123

Browse files
authored
feat(pipeline)!: explicit BigQuery enable kwarg (#122)
Without this kwarg, a user is one command away from burning their daily querying allowance (true story, happened yesterday). So, let us provide an explicit knob to choose whether to enable or disable BigQuery without any default. Why no default? Well, a true default would defeat the diff purpose. A false default would hide that BigQuery does not run unless you really want to, which could become confusing. An explicit knob, instead, forces the developer to think about whether they really do want to use BigQuery in a specific point in the codebase, and also allows to carry over this decision directly from CLI flags (where it could default to false). I am using a kwarg for get_cache_entry instead of using one for the whole pipeline because that enables creating a pipeline, doing a dry run, then prompting the user whether to perform a real run all without having to construct a new pipeline. BREAKING CHANGE: `IQBPipeline.get_cache_entry` now requires an explicit `enable_bigquery=True|False` kwarg.
1 parent d372786 commit 7171123

File tree

4 files changed

+58
-5
lines changed

4 files changed

+58
-5
lines changed

library/src/iqb/pipeline/pipeline.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,7 @@ def get_cache_entry(
8888
self,
8989
*,
9090
dataset_name: str,
91+
enable_bigquery: bool,
9192
start_date: str,
9293
end_date: str,
9394
) -> PipelineCacheEntry:
@@ -99,6 +100,7 @@ def get_cache_entry(
99100
100101
Args:
101102
dataset_name: Name for the dataset (e.g., "downloads_by_country")
103+
enable_bigquery: Whether to enabled querying from BigQuery.
102104
start_date: Date when to start the query (included) -- format YYYY-MM-DD
103105
end_date: Date when to end the query (excluded) -- format YYYY-MM-DD
104106
@@ -113,7 +115,8 @@ def get_cache_entry(
113115
)
114116

115117
# 2. prepare for synching from BigQuery
116-
entry.syncers.append(self._bq_syncer)
118+
if enable_bigquery:
119+
entry.syncers.append(self._bq_syncer)
117120

118121
# 3. return the entry
119122
return entry

library/src/iqb/scripting/iqb_pipeline.py

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,14 +17,20 @@
1717

1818
@dataclass(frozen=True, kw_only=True)
1919
class Pipeline:
20-
"""Wrapper for IQBPipeline providing convenience methods for scripting."""
20+
"""
21+
Wrapper for IQBPipeline providing convenience methods for scripting.
22+
23+
Attributes:
24+
pipeline: The IQBPipeline to use.
25+
"""
2126

2227
pipeline: IQBPipeline
2328

2429
def sync_mlab(
2530
self,
2631
granularity: str,
2732
*,
33+
enable_bigquery: bool,
2834
end_date: str,
2935
start_date: str,
3036
) -> None:
@@ -36,6 +42,7 @@ def sync_mlab(
3642
3743
Arguments:
3844
end_date: exclusive end date as a YYYY-MM-DD string.
45+
enable_bigquery: whether to enable querying from BigQuery.
3946
granularity: geographical granularity to use.
4047
start_date: incluive start date as a YYYY-MM-DD string.
4148
@@ -57,6 +64,7 @@ def sync_mlab(
5764
granularity=granularity,
5865
table=table,
5966
),
67+
enable_bigquery=enable_bigquery,
6068
start_date=start_date,
6169
end_date=end_date,
6270
)

library/tests/iqb/pipeline/pipeline_test.py

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -177,6 +177,7 @@ def test_get_cache_entry_when_exists(self, mock_client, tmp_path):
177177
# Get cache entry (should not execute query)
178178
entry = pipeline.get_cache_entry(
179179
dataset_name="downloads_by_country",
180+
enable_bigquery=False,
180181
start_date="2024-10-01",
181182
end_date="2024-11-01",
182183
)
@@ -233,6 +234,7 @@ def mock_save_stats():
233234
# Get cache entry (BigQuery syncer is automatically added)
234235
entry = pipeline.get_cache_entry(
235236
dataset_name="downloads_by_country",
237+
enable_bigquery=True,
236238
start_date="2024-10-01",
237239
end_date="2024-11-01",
238240
)
@@ -257,6 +259,25 @@ def mock_save_stats():
257259
assert entry.data_parquet_file_path().exists()
258260
assert entry.stats_json_file_path().exists()
259261

262+
@patch("iqb.pipeline.pipeline.PipelineBQPQClient")
263+
def test_get_cache_entry_without_bigquery_syncer(self, mock_client, tmp_path):
264+
"""Test that get_cache_entry does not add BigQuery syncer when disabled."""
265+
pipeline = IQBPipeline(project="test-project", data_dir=tmp_path)
266+
267+
entry = pipeline.get_cache_entry(
268+
dataset_name="downloads_by_country",
269+
enable_bigquery=False,
270+
start_date="2024-10-01",
271+
end_date="2024-11-01",
272+
)
273+
274+
assert entry.syncers == []
275+
276+
with entry.lock(), pytest.raises(FileNotFoundError, match="Cache entry not found"):
277+
entry.sync()
278+
279+
mock_client.return_value.execute_query.assert_not_called()
280+
260281
@patch("iqb.pipeline.pipeline.PipelineBQPQClient")
261282
def test_bq_syncer_skip_when_exists(self, mock_client, tmp_path):
262283
"""Test that _bq_syncer skips BigQuery when cache files already exist."""
@@ -280,6 +301,7 @@ def test_bq_syncer_skip_when_exists(self, mock_client, tmp_path):
280301
# Get cache entry (BigQuery syncer is automatically added)
281302
entry = pipeline.get_cache_entry(
282303
dataset_name="downloads_by_country",
304+
enable_bigquery=True,
283305
start_date="2024-10-01",
284306
end_date="2024-11-01",
285307
)
@@ -310,6 +332,7 @@ def test_bq_syncer_failure(self, mock_client, tmp_path):
310332
# Get cache entry
311333
entry = pipeline.get_cache_entry(
312334
dataset_name="downloads_by_country",
335+
enable_bigquery=True,
313336
start_date="2024-10-01",
314337
end_date="2024-11-01",
315338
)
@@ -343,6 +366,7 @@ def test_get_cache_entry_validation_checks(self, mock_client, tmp_path):
343366
with pytest.raises(ValueError, match="Invalid dataset name"):
344367
pipeline.get_cache_entry(
345368
dataset_name="invalid_dataset-name",
369+
enable_bigquery=False,
346370
start_date="2024-10-01",
347371
end_date="2024-11-01",
348372
)
@@ -351,6 +375,7 @@ def test_get_cache_entry_validation_checks(self, mock_client, tmp_path):
351375
with pytest.raises(ValueError, match="start_date must be < end_date"):
352376
pipeline.get_cache_entry(
353377
dataset_name="downloads_by_country",
378+
enable_bigquery=False,
354379
start_date="2024-11-01",
355380
end_date="2024-10-01",
356381
)

library/tests/iqb/scripting/iqb_pipeline_test.py

Lines changed: 20 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,12 @@ def test_syncs_missing_entries(self) -> None:
3838
pipeline.get_cache_entry.side_effect = [entry_download, entry_upload]
3939

4040
wrapper = iqb_pipeline.Pipeline(pipeline=pipeline)
41-
wrapper.sync_mlab("country", start_date="2024-01-01", end_date="2024-02-01")
41+
wrapper.sync_mlab(
42+
"country",
43+
enable_bigquery=True,
44+
start_date="2024-01-01",
45+
end_date="2024-02-01",
46+
)
4247

4348
assert entry_download.synced is True
4449
assert entry_upload.synced is True
@@ -47,11 +52,13 @@ def test_syncs_missing_entries(self) -> None:
4752
assert pipeline.get_cache_entry.call_args_list == [
4853
call(
4954
dataset_name="downloads_by_country",
55+
enable_bigquery=True,
5056
start_date="2024-01-01",
5157
end_date="2024-02-01",
5258
),
5359
call(
5460
dataset_name="uploads_by_country",
61+
enable_bigquery=True,
5562
start_date="2024-01-01",
5663
end_date="2024-02-01",
5764
),
@@ -64,7 +71,12 @@ def test_skips_existing_entries(self) -> None:
6471
pipeline.get_cache_entry.side_effect = [entry_download, entry_upload]
6572

6673
wrapper = iqb_pipeline.Pipeline(pipeline=pipeline)
67-
wrapper.sync_mlab("country", start_date="2024-01-01", end_date="2024-02-01")
74+
wrapper.sync_mlab(
75+
"country",
76+
enable_bigquery=False,
77+
start_date="2024-01-01",
78+
end_date="2024-02-01",
79+
)
6880

6981
assert entry_download.synced is False
7082
assert entry_upload.synced is False
@@ -74,7 +86,12 @@ def test_invalid_granularity_raises(self) -> None:
7486
wrapper = iqb_pipeline.Pipeline(pipeline=pipeline)
7587

7688
with pytest.raises(ValueError, match="invalid granularity value"):
77-
wrapper.sync_mlab("nope", start_date="2024-01-01", end_date="2024-02-01")
89+
wrapper.sync_mlab(
90+
"nope",
91+
enable_bigquery=False,
92+
start_date="2024-01-01",
93+
end_date="2024-02-01",
94+
)
7895

7996
pipeline.get_cache_entry.assert_not_called()
8097

0 commit comments

Comments
 (0)