Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 11 additions & 28 deletions src/webapp/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@
from .config import databricks_vars, gcs_vars
from .utilities import databricksify_inst_name, SchemaType
from typing import List, Any
import time

# List of data medallion levels
MEDALLION_LEVELS = ["silver", "gold", "bronze"]
Expand Down Expand Up @@ -222,36 +221,20 @@ def fetch_table_data(
warehouse_id=warehouse_id,
statement=sql,
format=Format.JSON_ARRAY,
wait_timeout="10s",
on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CONTINUE,
wait_timeout="30s",
on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CANCEL,
)

status = getattr(resp, "status", None)
if status and status.state == "SUCCEEDED" and getattr(resp, "result", None):
# resp.result.data_array is a list of row arrays; resp.manifest.schema supplies the column metadata
column_names = [col.name for col in resp.manifest.schema]
rows = resp.result.data_array
else:
# A. If the SQL didn’t finish in 10 seconds, resp.statement_id will be set.
stmt_id = getattr(resp, "statement_id", None)
if not stmt_id:
raise ValueError(
f"fetch_table_data(): unexpected response state: {resp}"
)
if not resp or not getattr(resp, "status", None):
raise ValueError("fetch_table_data(): invalid response or missing status")

# B. Poll until the statement succeeds (or fails/cancels)
status = resp.status.state if getattr(resp, "status", None) else None
while status not in ("SUCCEEDED", "FAILED", "CANCELED"):
time.sleep(1)
resp2 = w.statement_execution.get_statement(statement_id=stmt_id)
status = resp2.status.state if getattr(resp2, "status", None) else None
resp = resp2
if status != "SUCCEEDED":
raise ValueError(f"fetch_table_data(): query ended with state {status}")
if resp.status.state != "SUCCEEDED":
raise ValueError(
f"fetch_table_data(): query failed with state {resp.status.state}"
)

# C. At this point, resp holds the final manifest and first chunk
column_names = [col.name for col in resp.manifest.schema]
rows = resp.result.data_array
# Extract column names and rows
column_names = [col.name for col in resp.manifest.schema]
rows = resp.result.data_array

# Transform each row (a list of values) into a dict
return [dict(zip(column_names, row)) for row in rows]
2 changes: 0 additions & 2 deletions src/webapp/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@
import secrets
from fastapi import FastAPI, Depends, HTTPException, status, Security
from fastapi.responses import FileResponse
from fastapi.security import OAuth2PasswordRequestForm
from pydantic import BaseModel
from sqlalchemy.future import select
from sqlalchemy import update
Expand Down Expand Up @@ -38,7 +37,6 @@
create_access_token,
get_api_key,
get_api_key_hash,
check_creds,
)

# Set the logging
Expand Down
1 change: 0 additions & 1 deletion src/webapp/main_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
get_session,
ApiKeyTable,
)
from unittest.mock import patch
from .authn import get_password_hash, get_api_key_hash
from .test_helper import (
DATAKINDER,
Expand Down
Loading