Skip to content

Commit b0a451d

Browse files
committed
refined fetch table function
1 parent 361735c commit b0a451d

File tree

1 file changed

+60
-22
lines changed

1 file changed

+60
-22
lines changed

src/webapp/databricks.py

Lines changed: 60 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -309,7 +309,9 @@ def delete_inst(self, inst_name: str) -> None:
309309
f"Tables or schemas could not be deleted for {medallion}{e}"
310310
)
311311

312-
def fetch_table_data(self, catalog_name: str, inst_name: str, table_name: str, warehouse_id: str) -> List[Dict[str, Any]]:
312+
def fetch_table_data(
313+
self, catalog_name: str, inst_name: str, table_name: str, warehouse_id: str
314+
) -> List[Dict[str, Any]]:
313315
w = WorkspaceClient(
314316
host=databricks_vars["DATABRICKS_HOST_URL"],
315317
google_service_account=gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"],
@@ -318,38 +320,53 @@ def fetch_table_data(self, catalog_name: str, inst_name: str, table_name: str, w
318320
table_fqn = f"`{catalog_name}`.`{schema}_silver`.`{table_name}`"
319321
sql = f"SELECT * FROM {table_fqn}"
320322

321-
#1) Execute INLINE + poll until SUCCEEDED
323+
# 1) Execute INLINE + poll until SUCCEEDED
322324
resp = w.statement_execution.execute_statement(
323-
warehouse_id=warehouse_id, statement=sql,
324-
disposition=Disposition.INLINE, format=Format.JSON_ARRAY,
325-
wait_timeout="30s", on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CONTINUE,
325+
warehouse_id=warehouse_id,
326+
statement=sql,
327+
disposition=Disposition.INLINE,
328+
format=Format.JSON_ARRAY,
329+
wait_timeout="30s",
330+
on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CONTINUE,
326331
)
327332

328-
MAX_BYTES = 20 * 1024 * 1024 # 20 MiB
333+
MAX_BYTES = 20 * 1024 * 1024 # 20 MiB
329334
POLL_INTERVAL_S = 1.0
330-
POLL_TIMEOUT_S = 300.0 # 5 minutes
335+
POLL_TIMEOUT_S = 300.0 # 5 minutes
331336

332337
start = time.time()
333-
while not resp.status or resp.status.state not in {"SUCCEEDED", "FAILED", "CANCELED"}:
338+
while not resp.status or resp.status.state not in {
339+
"SUCCEEDED",
340+
"FAILED",
341+
"CANCELED",
342+
}:
334343
if time.time() - start > POLL_TIMEOUT_S:
335344
raise TimeoutError("Timed out waiting for statement to finish (INLINE)")
336345
time.sleep(POLL_INTERVAL_S)
337346
resp = w.statement_execution.get_statement(statement_id=resp.statement_id)
338347
if resp.status.state != "SUCCEEDED":
339-
msg = resp.status.error.message if resp.status and resp.status.error else "no details"
348+
msg = (
349+
resp.status.error.message
350+
if resp.status and resp.status.error
351+
else "no details"
352+
)
340353
raise ValueError(f"Statement ended in {resp.status.state}: {msg}")
341354

342-
if not (resp.manifest and resp.manifest.schema and resp.manifest.schema.columns):
355+
if not (
356+
resp.manifest and resp.manifest.schema and resp.manifest.schema.columns
357+
):
343358
raise ValueError("Schema/columns missing.")
344359
cols = [c.name for c in resp.manifest.schema.columns]
345360

346-
#2) Build INLINE records until ~20 MiB; if projected to exceed, switch to EXTERNAL_LINKS ---
361+
# 2) Build INLINE records until ~20 MiB; if projected to exceed, switch to EXTERNAL_LINKS ---
347362
records: List[Dict[str, Any]] = []
348363
bytes_so_far, have_items = 0, False
349364

350365
def add_row(rd: Dict[str, Any]) -> bool:
351366
nonlocal bytes_so_far, have_items
352-
b = json.dumps(rd, ensure_ascii=False, separators=(",", ":")).encode("utf-8")
367+
b = json.dumps(rd, ensure_ascii=False, separators=(",", ":")).encode(
368+
"utf-8"
369+
)
353370
projected = bytes_so_far + (1 if have_items else 0) + len(b) + 2
354371
if projected > MAX_BYTES:
355372
return False
@@ -386,30 +403,51 @@ def consume_inline_chunk(chunk_obj) -> bool:
386403
if not inline_over_limit:
387404
return records # INLINE fit under 20 MiB
388405

389-
#3) Re-execute with EXTERNAL_LINKS, then download each presigned URL (no auth header) ---
406+
# 3) Re-execute with EXTERNAL_LINKS, then download each presigned URL (no auth header) ---
390407
resp = w.statement_execution.execute_statement(
391-
warehouse_id=warehouse_id, statement=sql,
392-
disposition=Disposition.EXTERNAL_LINKS, format=Format.JSON_ARRAY,
393-
wait_timeout="30s", on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CONTINUE,
408+
warehouse_id=warehouse_id,
409+
statement=sql,
410+
disposition=Disposition.EXTERNAL_LINKS,
411+
format=Format.JSON_ARRAY,
412+
wait_timeout="30s",
413+
on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CONTINUE,
394414
)
395415
start = time.time()
396-
while not resp.status or resp.status.state not in {"SUCCEEDED", "FAILED", "CANCELED"}:
416+
while not resp.status or resp.status.state not in {
417+
"SUCCEEDED",
418+
"FAILED",
419+
"CANCELED",
420+
}:
397421
if time.time() - start > POLL_TIMEOUT_S:
398-
raise TimeoutError("Timed out waiting for statement to finish (EXTERNAL_LINKS)")
422+
raise TimeoutError(
423+
"Timed out waiting for statement to finish (EXTERNAL_LINKS)"
424+
)
399425
time.sleep(POLL_INTERVAL_S)
400426
resp = w.statement_execution.get_statement(statement_id=resp.statement_id)
401427
if resp.status.state != "SUCCEEDED":
402-
msg = resp.status.error.message if resp.status and resp.status.error else "no details"
403-
raise ValueError(f"Statement (EXTERNAL_LINKS) ended in {resp.status.state}: {msg}")
428+
msg = (
429+
resp.status.error.message
430+
if resp.status and resp.status.error
431+
else "no details"
432+
)
433+
raise ValueError(
434+
f"Statement (EXTERNAL_LINKS) ended in {resp.status.state}: {msg}"
435+
)
404436

405-
if not (resp.manifest and resp.manifest.schema and resp.manifest.schema.columns):
437+
if not (
438+
resp.manifest and resp.manifest.schema and resp.manifest.schema.columns
439+
):
406440
raise ValueError("Schema/columns missing (EXTERNAL_LINKS).")
407441
cols = [c.name for c in resp.manifest.schema.columns]
408442

409443
def consume_external_result(result_obj):
410444
links = getattr(result_obj, "external_links", None) or []
411445
for l in links:
412-
url = l.external_link if hasattr(l, "external_link") else l.get("external_link")
446+
url = (
447+
l.external_link
448+
if hasattr(l, "external_link")
449+
else l.get("external_link")
450+
)
413451
r = requests.get(url, timeout=120)
414452
r.raise_for_status()
415453
for row in r.json():

0 commit comments

Comments
 (0)