|
8 | 8 | from .config import databricks_vars, gcs_vars |
9 | 9 | from .utilities import databricksify_inst_name, SchemaType |
10 | 10 | from typing import List, Any |
11 | | -import time |
12 | 11 |
|
13 | 12 | # List of data medallion levels |
14 | 13 | MEDALLION_LEVELS = ["silver", "gold", "bronze"] |
@@ -222,36 +221,20 @@ def fetch_table_data( |
222 | 221 | warehouse_id=warehouse_id, |
223 | 222 | statement=sql, |
224 | 223 | format=Format.JSON_ARRAY, |
225 | | - wait_timeout="10s", |
226 | | - on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CONTINUE, |
| 224 | + wait_timeout="30s", |
| 225 | + on_wait_timeout=ExecuteStatementRequestOnWaitTimeout.CANCEL, |
227 | 226 | ) |
228 | 227 |
|
229 | | - status = getattr(resp, "status", None) |
230 | | - if status and status.state == "SUCCEEDED" and getattr(resp, "result", None): |
231 | | - # resp.results is a list of row‐arrays, resp.schema is a list of column metadata |
232 | | - column_names = [col.name for col in resp.manifest.schema] |
233 | | - rows = resp.result.data_array |
234 | | - else: |
235 | | - # A. If the SQL didn’t finish in 10 seconds, resp.statement_id will be set. |
236 | | - stmt_id = getattr(resp, "statement_id", None) |
237 | | - if not stmt_id: |
238 | | - raise ValueError( |
239 | | - f"fetch_table_data(): unexpected response state: {resp}" |
240 | | - ) |
| 228 | + if not resp or not getattr(resp, "status", None): |
| 229 | + raise ValueError("fetch_table_data(): invalid response or missing status") |
241 | 230 |
|
242 | | - # B. Poll until the statement succeeds (or fails/cancels) |
243 | | - status = resp.status.state if getattr(resp, "status", None) else None |
244 | | - while status not in ("SUCCEEDED", "FAILED", "CANCELED"): |
245 | | - time.sleep(1) |
246 | | - resp2 = w.statement_execution.get_statement(statement_id=stmt_id) |
247 | | - status = resp2.status.state if getattr(resp2, "status", None) else None |
248 | | - resp = resp2 |
249 | | - if status != "SUCCEEDED": |
250 | | - raise ValueError(f"fetch_table_data(): query ended with state {status}") |
| 231 | + if resp.status.state != "SUCCEEDED": |
| 232 | + raise ValueError( |
| 233 | + f"fetch_table_data(): query failed with state {resp.status.state}" |
| 234 | + ) |
251 | 235 |
|
252 | | - # C. At this point, resp holds the final manifest and first chunk |
253 | | - column_names = [col.name for col in resp.manifest.schema] |
254 | | - rows = resp.result.data_array |
| 236 | + # Extract rows |
| 237 | + column_names = [col.name for col in resp.manifest.schema] |
| 238 | + rows = resp.result.data_array |
255 | 239 |
|
256 | | - # Transform each row (a list of values) into a dict |
257 | 240 | return [dict(zip(column_names, row)) for row in rows] |
0 commit comments