Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
67 changes: 42 additions & 25 deletions src/webapp/databricks.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from .config import databricks_vars, gcs_vars
from .utilities import databricksify_inst_name, SchemaType
from typing import List, Any
from databricks.sdk.errors import DatabricksError

# List of data medallion levels
MEDALLION_LEVELS = ["silver", "gold", "bronze"]
Expand Down Expand Up @@ -205,36 +206,52 @@ def fetch_table_data(
Runs a simple SELECT * FROM <catalog>.<schema>.<table> LIMIT <limit>
against the specified SQL warehouse, and returns a list of row‐dicts.
"""
w = WorkspaceClient(
host=databricks_vars["DATABRICKS_HOST_URL"],
google_service_account=gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"],
)
if not w:
raise ValueError(
"fetch_table_data(): could not initialize WorkspaceClient."
)

fq_table = f"`{catalog_name}`.`{schema_name}`.`{table_name}`"
sql = f"SELECT * FROM {fq_table} LIMIT {limit}"
try:
client = WorkspaceClient(
host=databricks_vars["DATABRICKS_HOST_URL"],
google_service_account=gcs_vars["GCP_SERVICE_ACCOUNT_EMAIL"],
)
except Exception as e:
raise ValueError(f"Failed to initialize WorkspaceClient: {e}")

resp = w.statement_execution.execute_statement(
warehouse_id=warehouse_id,
statement=sql,
format=Format.JSON_ARRAY,
wait_timeout="30s",
on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CANCEL,
)
# 2. Build SQL text
fully_qualified = f"`{catalog_name}`.`{schema_name}`.`{table_name}`"
sql_text = f"SELECT * FROM {fully_qualified} LIMIT {limit}"

if not resp or not getattr(resp, "status", None):
raise ValueError("fetch_table_data(): invalid response or missing status")
# 3. Execute with INLINE+JSON_ARRAY, wait up to 30s, then CANCEL if not done
try:
resp = client.statement_execution.execute_statement(
warehouse_id=warehouse_id,
statement=sql_text,
disposition="INLINE", # INLINE disposition
format=Format.JSON_ARRAY, # JSON_ARRAY format
wait_timeout="30s", # up to 30 seconds
on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CANCEL, # cancel if not done
)
except DatabricksError as e:
raise ValueError(f"Databricks API call failed: {e}")

if resp.status.state != "SUCCEEDED":
raise ValueError(
f"fetch_table_data(): query failed with state {resp.status.state}"
# 4. Check final state
state = resp.status.state
if state != "SUCCEEDED":
# If there’s an error object, include its message
err = resp.status.error
msg = (
err.message
if (err is not None and err.message)
else "No additional error info."
)
raise ValueError(f"Query did not succeed (state={state}): {msg}")

# 5. Ensure manifest and result are present
if resp.manifest is None or resp.manifest.schema is None:
raise ValueError("Query succeeded but schema manifest is missing.")
if resp.result is None or resp.result.data_array is None:
raise ValueError("Query succeeded but result data is missing.")

# Extract rows
# 6. Extract column names and rows
column_names = [col.name for col in resp.manifest.schema]
rows = resp.result.data_array
data_array = resp.result.data_array

return [dict(zip(column_names, row)) for row in rows]
return [dict(zip(column_names, row)) for row in data_array]
Loading