Skip to content

Commit ebc8e86

Browse files
committed
retry until incoming tables are available
1 parent a5b0122 commit ebc8e86

File tree

2 files changed

+102
-0
lines changed

2 files changed

+102
-0
lines changed

src/lando/utils/management/commands/etl.py

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,7 +12,9 @@
1212

1313
from django.core.management.base import BaseCommand, CommandError, CommandParser
1414
from django.db.models import Model, Q, QuerySet
15+
from google.api_core.exceptions import NotFound
1516
from google.cloud import bigquery
17+
from google.cloud.bigquery import Table
1618
from more_itertools import chunked
1719

1820
from lando.main.models import BaseModel
@@ -271,6 +273,48 @@ def setup(self, transformers: list[ModelTransformer]) -> None:
271273
)
272274
logger.info("Created incoming table for %s.", transformer.name)
273275

276+
self.wait_for_incoming_tables()
277+
278+
def wait_for_incoming_tables(
279+
self, max_retries: int = 5, retry_base_delay_s: float = 1.0
280+
) -> None:
281+
"""Wait for all incoming tables to be visible to the BigQuery API.
282+
283+
BigQuery's streaming insert API is eventually consistent with table
284+
creation, so a newly created table may not be immediately available.
285+
This method polls `get_table` for each incoming table until all are
286+
confirmed visible, preventing `NotFound` errors on the first insert.
287+
"""
288+
pending: list[Table] = list(self.incoming_tables.values())
289+
290+
for attempt in range(max_retries):
291+
still_pending: list[Table] = []
292+
for table in pending:
293+
try:
294+
self.bq_client.get_table(sql_table_id(table))
295+
except NotFound:
296+
still_pending.append(table)
297+
298+
if not still_pending:
299+
logger.debug("All incoming tables are ready.")
300+
return
301+
302+
delay = retry_base_delay_s * (2**attempt)
303+
logger.warning(
304+
"Waiting %.1fs for %d incoming table(s) to become visible.",
305+
delay,
306+
len(still_pending),
307+
)
308+
time.sleep(delay)
309+
pending = still_pending
310+
311+
table_ids = [sql_table_id(table) for table in pending]
312+
self.cleanup_incoming_tables()
313+
raise CommandError(
314+
f"Incoming tables not visible after {max_retries} retries: "
315+
f"{', '.join(table_ids)}"
316+
)
317+
274318
def load(
275319
self, transformer: ModelTransformer, queryset: QuerySet, chunk_size: int = 500
276320
) -> int:

src/lando/utils/tests/test_etl.py

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from django.contrib.auth.models import User
99
from django.core.management import call_command
1010
from django.core.management.base import CommandError
11+
from google.api_core.exceptions import NotFound
1112

1213
from lando.main.models.uplift import (
1314
RevisionUpliftJob,
@@ -17,6 +18,7 @@
1718
UpliftSubmission,
1819
)
1920
from lando.utils.management.commands.etl import (
21+
BigQueryLoader,
2022
Command,
2123
JsonLinesLoader,
2224
RepoTransformer,
@@ -545,3 +547,59 @@ def test_extract_filters_out_records_before_cutoff():
545547
assert (
546548
old_assessment.id not in result_ids
547549
), "Records before the cutoff should be excluded."
550+
551+
552+
def test_wait_for_incoming_tables_succeeds_immediately():
553+
"""Should return immediately when all tables are visible."""
554+
mock_client = MagicMock()
555+
loader = BigQueryLoader(mock_client)
556+
557+
incoming_table = MagicMock()
558+
incoming_table.project = "proj"
559+
incoming_table.dataset_id = "ds"
560+
incoming_table.table_id = "tbl_incoming"
561+
loader.incoming_tables["key"] = incoming_table
562+
563+
loader.wait_for_incoming_tables(retry_base_delay_s=0)
564+
565+
assert (
566+
mock_client.get_table.call_count == 1
567+
), "Should call `get_table` once per incoming table."
568+
569+
570+
def test_wait_for_incoming_tables_retries_on_not_found():
571+
"""Should retry when `get_table` raises `NotFound`, then succeed."""
572+
mock_client = MagicMock()
573+
mock_client.get_table.side_effect = [
574+
NotFound("Table not found."),
575+
MagicMock(),
576+
]
577+
loader = BigQueryLoader(mock_client)
578+
579+
incoming_table = MagicMock()
580+
incoming_table.project = "proj"
581+
incoming_table.dataset_id = "ds"
582+
incoming_table.table_id = "tbl_incoming"
583+
loader.incoming_tables["key"] = incoming_table
584+
585+
loader.wait_for_incoming_tables(retry_base_delay_s=0)
586+
587+
assert (
588+
mock_client.get_table.call_count == 2
589+
), "Should have retried after `NotFound`."
590+
591+
592+
def test_wait_for_incoming_tables_raises_after_max_retries():
593+
"""Should raise `CommandError` when tables never become visible."""
594+
mock_client = MagicMock()
595+
mock_client.get_table.side_effect = NotFound("Table not found.")
596+
loader = BigQueryLoader(mock_client)
597+
598+
incoming_table = MagicMock()
599+
incoming_table.project = "proj"
600+
incoming_table.dataset_id = "ds"
601+
incoming_table.table_id = "tbl_incoming"
602+
loader.incoming_tables["key"] = incoming_table
603+
604+
with pytest.raises(CommandError, match="Incoming tables not visible"):
605+
loader.wait_for_incoming_tables(max_retries=2, retry_base_delay_s=0)

0 commit comments

Comments
 (0)