Skip to content

Commit 6398033

Browse files
authored
Fix Python regressions in 1.9.0beta (#857)
1 parent b95a04a commit 6398033

File tree

9 files changed: 88 additions and 34 deletions.

dbt/adapters/databricks/api_client.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ def start(self, cluster_id: str) -> None:
8888

8989
response = self.session.post("/start", json={"cluster_id": cluster_id})
9090
if response.status_code != 200:
91-
raise DbtRuntimeError(f"Error starting terminated cluster.\n {response.content!r}")
92-
logger.debug(f"Cluster start response={response}")
91+
if self.status(cluster_id) not in ["RUNNING", "PENDING"]:
92+
raise DbtRuntimeError(f"Error starting terminated cluster.\n {response.content!r}")
93+
else:
94+
logger.debug("Presuming race condition, waiting for cluster to start")
9395

9496
self.wait_for_cluster(cluster_id)
9597

@@ -289,7 +291,7 @@ def cancel(self, command: CommandExecution) -> None:
289291
raise DbtRuntimeError(f"Cancel command {command} failed.\n {response.content!r}")
290292

291293
def poll_for_completion(self, command: CommandExecution) -> None:
292-
self._poll_api(
294+
response = self._poll_api(
293295
url="/status",
294296
params={
295297
"clusterId": command.cluster_id,
@@ -300,7 +302,13 @@ def poll_for_completion(self, command: CommandExecution) -> None:
300302
terminal_states={"Finished", "Error", "Cancelled"},
301303
expected_end_state="Finished",
302304
unexpected_end_state_func=self._get_exception,
303-
)
305+
).json()
306+
307+
if response["results"]["resultType"] == "error":
308+
raise DbtRuntimeError(
309+
f"Python model failed with traceback as:\n"
310+
f"{utils.remove_ansi(response['results']['cause'])}"
311+
)
304312

305313
def _get_exception(self, response: Response) -> None:
306314
response_json = response.json()

dbt/adapters/databricks/behaviors/columns.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,6 @@
77
from dbt.adapters.databricks.utils import handle_missing_objects
88
from dbt.adapters.sql import SQLAdapter
99

10-
GET_COLUMNS_COMMENTS_MACRO_NAME = "get_columns_comments"
11-
1210

1311
class GetColumnsBehavior(ABC):
1412
@classmethod

dbt/adapters/databricks/impl.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@
8585
SHOW_TABLE_EXTENDED_MACRO_NAME = "show_table_extended"
8686
SHOW_TABLES_MACRO_NAME = "show_tables"
8787
SHOW_VIEWS_MACRO_NAME = "show_views"
88-
GET_COLUMNS_COMMENTS_MACRO_NAME = "get_columns_comments"
88+
8989

9090
USE_INFO_SCHEMA_FOR_COLUMNS = BehaviorFlag(
9191
name="use_info_schema_for_columns",

dbt/adapters/databricks/python_models/python_submissions.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from dbt.adapters.base import PythonJobHelper
99
from dbt.adapters.databricks.api_client import CommandExecution, DatabricksApiClient, WorkflowJobApi
1010
from dbt.adapters.databricks.credentials import DatabricksCredentials
11+
from dbt.adapters.databricks.logging import logger
1112
from dbt.adapters.databricks.python_models.python_config import ParsedPythonModel
1213
from dbt.adapters.databricks.python_models.run_tracking import PythonRunTracker
1314

@@ -70,6 +71,8 @@ def __init__(
7071

7172
@override
7273
def submit(self, compiled_code: str) -> None:
74+
logger.debug("Submitting Python model using the Command API.")
75+
7376
context_id = self.api_client.command_contexts.create(self.cluster_id)
7477
command_exec: Optional[CommandExecution] = None
7578
try:
@@ -263,6 +266,8 @@ def create(
263266

264267
@override
265268
def submit(self, compiled_code: str) -> None:
269+
logger.debug("Submitting Python model using the Job Run API.")
270+
266271
file_path = self.uploader.upload(compiled_code)
267272
job_config = self.config_compiler.compile(file_path)
268273

@@ -494,6 +499,8 @@ def create(
494499

495500
@override
496501
def submit(self, compiled_code: str) -> None:
502+
logger.debug("Submitting Python model using the Workflow API.")
503+
497504
file_path = self.uploader.upload(compiled_code)
498505

499506
workflow_config, existing_job_id = self.config_compiler.compile(file_path)
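(File path missing from the extracted page. This is a newly added macro file; the lines added here match the macros removed from dbt/include/databricks/macros/adapters/persist_docs.sql further below.)

Lines changed: 27 additions & 0 deletions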
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
2+
{% macro get_columns_comments(relation) -%}
3+
{% call statement('get_columns_comments', fetch_result=True) -%}
4+
describe table {{ relation|lower }}
5+
{% endcall %}
6+
7+
{% do return(load_result('get_columns_comments').table) %}
8+
{% endmacro %}
9+
10+
{% macro get_columns_comments_via_information_schema(relation) -%}
11+
{% call statement('repair_table', fetch_result=False) -%}
12+
REPAIR TABLE {{ relation|lower }} SYNC METADATA
13+
{% endcall %}
14+
{% call statement('get_columns_comments_via_information_schema', fetch_result=True) -%}
15+
select
16+
column_name,
17+
full_data_type,
18+
comment
19+
from `system`.`information_schema`.`columns`
20+
where
21+
table_catalog = '{{ relation.database|lower }}' and
22+
table_schema = '{{ relation.schema|lower }}' and
23+
table_name = '{{ relation.identifier|lower }}'
24+
{% endcall %}
25+
26+
{% do return(load_result('get_columns_comments_via_information_schema').table) %}
27+
{% endmacro %}

dbt/include/databricks/macros/adapters/persist_docs.sql

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -20,33 +20,6 @@
2020
{% do run_query(comment_query) %}
2121
{% endmacro %}
2222

23-
{% macro get_columns_comments(relation) -%}
24-
{% call statement('get_columns_comments', fetch_result=True) -%}
25-
describe table {{ relation|lower }}
26-
{% endcall %}
27-
28-
{% do return(load_result('get_columns_comments').table) %}
29-
{% endmacro %}
30-
31-
{% macro get_columns_comments_via_information_schema(relation) -%}
32-
{% call statement('repair_table', fetch_result=False) -%}
33-
REPAIR TABLE {{ relation|lower }} SYNC METADATA
34-
{% endcall %}
35-
{% call statement('get_columns_comments_via_information_schema', fetch_result=True) -%}
36-
select
37-
column_name,
38-
full_data_type,
39-
comment
40-
from `system`.`information_schema`.`columns`
41-
where
42-
table_catalog = '{{ relation.database|lower }}' and
43-
table_schema = '{{ relation.schema|lower }}' and
44-
table_name = '{{ relation.identifier|lower }}'
45-
{% endcall %}
46-
47-
{% do return(load_result('get_columns_comments_via_information_schema').table) %}
48-
{% endmacro %}
49-
5023
{% macro databricks__persist_docs(relation, model, for_relation, for_columns) -%}
5124
{%- if for_relation and config.persist_relation_docs() and model.description %}
5225
{% do alter_table_comment(relation, model) %}

tests/functional/adapter/python_model/fixtures.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,15 @@ def model(dbt, spark):
99
return spark.createDataFrame(data, schema=['test', 'test2'])
1010
"""
1111

12+
python_error_model = """
13+
import pandas as pd
14+
15+
def model(dbt, spark):
16+
raise Exception("This is an error")
17+
18+
return pd.DataFrame()
19+
"""
20+
1221
serverless_schema = """version: 2
1322
1423
models:

tests/functional/adapter/python_model/test_python_model.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,25 @@ class TestPythonModel(BasePythonModelTests):
1717
pass
1818

1919

20+
@pytest.mark.python
21+
@pytest.mark.skip_profile("databricks_uc_sql_endpoint")
22+
class TestPythonFailureModel:
23+
@pytest.fixture(scope="class")
24+
def models(self):
25+
return {"my_failure_model.py": override_fixtures.python_error_model}
26+
27+
def test_failure_model(self, project):
28+
util.run_dbt(["run"], expect_pass=False)
29+
30+
31+
@pytest.mark.python
32+
@pytest.mark.skip_profile("databricks_uc_sql_endpoint")
33+
class TestPythonFailureModelNotebook(TestPythonFailureModel):
34+
@pytest.fixture(scope="class")
35+
def project_config_update(self):
36+
return {"models": {"+create_notebook": "true"}}
37+
38+
2039
@pytest.mark.python
2140
@pytest.mark.skip_profile("databricks_uc_sql_endpoint")
2241
class TestPythonIncrementalModel(BasePythonIncrementalTests):

tests/unit/api_client/test_command_api.py

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def test_poll_for_completion__200(self, _, api, session, host, execution):
8686
session.get.return_value.status_code = 200
8787
session.get.return_value.json.return_value = {
8888
"status": "Finished",
89+
"results": {"resultType": "finished"},
8990
}
9091

9192
api.poll_for_completion(execution)
@@ -99,3 +100,15 @@ def test_poll_for_completion__200(self, _, api, session, host, execution):
99100
},
100101
json=None,
101102
)
103+
104+
@freezegun.freeze_time("2020-01-01")
105+
@patch("dbt.adapters.databricks.api_client.time.sleep")
106+
def test_poll_for_completion__200_with_error(self, _, api, session, host, execution):
107+
session.get.return_value.status_code = 200
108+
session.get.return_value.json.return_value = {
109+
"status": "Finished",
110+
"results": {"resultType": "error", "cause": "race condition"},
111+
}
112+
113+
with pytest.raises(DbtRuntimeError, match="Python model failed"):
114+
api.poll_for_completion(execution)

0 commit comments

Comments
 (0)