Skip to content

Commit 54c07dc

Browse files
pritishpainfx
and authored
Converted RuntimeBackend query executions to SDK exceptions (#769)
Convert the query execution exceptions to SDK exceptions like NotFound, PermissionDenied and SyntaxError (currently used BadRequest to map syntax errors). Integration and unit tests added. Fixes #726 --------- Co-authored-by: Serge Smertin <[email protected]>
1 parent 40ae364 commit 54c07dc

File tree

3 files changed

+297
-4
lines changed

3 files changed

+297
-4
lines changed

src/databricks/labs/ucx/framework/crawlers.py

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from typing import Any, ClassVar, Generic, Protocol, TypeVar
99

1010
from databricks.sdk import WorkspaceClient
11-
from databricks.sdk.errors import NotFound
11+
from databricks.sdk.errors import BadRequest, NotFound, PermissionDenied, Unknown
1212

1313
from databricks.labs.ucx.mixins.sql import Row, StatementExecutionExt
1414

@@ -151,11 +151,21 @@ def __init__(self):
151151

152152
def execute(self, sql):
153153
logger.debug(f"[spark][execute] {sql}")
154-
self._spark.sql(sql)
154+
try:
155+
immediate_response = self._spark.sql(sql)
156+
except Exception as e:
157+
error_message = str(e)
158+
self._raise_spark_sql_exceptions(error_message)
159+
return immediate_response
155160

156161
def fetch(self, sql) -> Iterator[Row]:
157162
logger.debug(f"[spark][fetch] {sql}")
158-
return self._spark.sql(sql).collect()
163+
try:
164+
fetch_query_response = self._spark.sql(sql).collect()
165+
except Exception as e:
166+
error_message = str(e)
167+
self._raise_spark_sql_exceptions(error_message)
168+
return fetch_query_response
159169

160170
def save_table(self, full_name: str, rows: Sequence[DataclassInstance], klass: Dataclass, mode: str = "append"):
161171
rows = self._filter_none_rows(rows, klass)
@@ -167,6 +177,19 @@ def save_table(self, full_name: str, rows: Sequence[DataclassInstance], klass: D
167177
df = self._spark.createDataFrame(rows, self._schema_for(klass))
168178
df.write.saveAsTable(full_name, mode=mode)
169179

180+
@staticmethod
181+
def _raise_spark_sql_exceptions(error_message: str):
182+
if "SCHEMA_NOT_FOUND" in error_message:
183+
raise NotFound(error_message) from None
184+
elif "TABLE_OR_VIEW_NOT_FOUND" in error_message:
185+
raise NotFound(error_message) from None
186+
elif "PARSE_SYNTAX_ERROR" in error_message:
187+
raise BadRequest(error_message) from None
188+
elif "Operation not allowed" in error_message:
189+
raise PermissionDenied(error_message) from None
190+
else:
191+
raise Unknown(error_message) from None
192+
170193

171194
class CrawlerBase(Generic[Result]):
172195
def __init__(self, backend: SqlBackend, catalog: str, schema: str, table: str, klass: type[Result]):

tests/integration/framework/test_crawlers.py

Lines changed: 165 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
from databricks.labs.blueprint.commands import CommandExecutor
2+
13
from databricks.labs.ucx.framework.crawlers import SchemaDeployer
24
from databricks.labs.ucx.hive_metastore.grants import Grant
35

@@ -9,3 +11,166 @@ def test_deploys_database(sql_backend, inventory_schema):
911
deployer.deploy_schema()
1012
deployer.deploy_table("grants", Grant)
1113
deployer.deploy_view("grant_detail", "queries/views/grant_detail.sql")
14+
15+
16+
def test_runtime_backend_incorrect_schema_and_table_handled(ws, wsfs_wheel, make_random):
17+
commands = CommandExecutor(ws.clusters, ws.command_execution, lambda: ws.config.cluster_id)
18+
19+
commands.install_notebook_library(f"/Workspace{wsfs_wheel}")
20+
query_response_incorrect_schema_execute = commands.run(
21+
f"""
22+
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
23+
from databricks.sdk.errors import NotFound
24+
backend = RuntimeBackend()
25+
try:
26+
backend.execute("USE {make_random()}")
27+
return "FAILED"
28+
except NotFound as e:
29+
return "PASSED"
30+
"""
31+
)
32+
assert query_response_incorrect_schema_execute == "PASSED"
33+
34+
query_response_incorrect_table_execute = commands.run(
35+
f"""
36+
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
37+
from databricks.sdk.errors import NotFound
38+
backend = RuntimeBackend()
39+
try:
40+
backend.execute("SELECT * FROM default.{make_random()}")
41+
return "FAILED"
42+
except NotFound as e:
43+
return "PASSED"
44+
"""
45+
)
46+
assert query_response_incorrect_table_execute == "PASSED"
47+
48+
query_response_incorrect_schema_fetch = commands.run(
49+
f"""
50+
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
51+
from databricks.sdk.errors import NotFound
52+
backend = RuntimeBackend()
53+
try:
54+
query_response = backend.fetch("DESCRIBE {make_random()}")
55+
return "FAILED"
56+
except NotFound as e:
57+
return "PASSED"
58+
"""
59+
)
60+
assert query_response_incorrect_schema_fetch == "PASSED"
61+
62+
query_response_incorrect_table_fetch = commands.run(
63+
f"""
64+
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
65+
from databricks.sdk.errors import NotFound
66+
backend = RuntimeBackend()
67+
try:
68+
query_response = backend.fetch("SELECT * FROM default.{make_random()}")
69+
return "FAILED"
70+
except NotFound as e:
71+
return "PASSED"
72+
"""
73+
)
74+
assert query_response_incorrect_table_fetch == "PASSED"
75+
76+
77+
def test_runtime_backend_incorrect_syntax_handled(ws, wsfs_wheel):
78+
commands = CommandExecutor(ws.clusters, ws.command_execution, lambda: ws.config.cluster_id)
79+
80+
commands.install_notebook_library(f"/Workspace{wsfs_wheel}")
81+
query_response_incorrect_syntax_execute = commands.run(
82+
"""
83+
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
84+
from databricks.sdk.errors import BadRequest
85+
backend = RuntimeBackend()
86+
try:
87+
backend.execute("SHWO DTABASES")
88+
return "FAILED"
89+
except BadRequest:
90+
return "PASSED"
91+
"""
92+
)
93+
assert query_response_incorrect_syntax_execute == "PASSED"
94+
95+
query_response_incorrect_syntax_fetch = commands.run(
96+
"""
97+
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
98+
from databricks.sdk.errors import BadRequest
99+
backend = RuntimeBackend()
100+
try:
101+
query_response = backend.fetch("SHWO DTABASES")
102+
return "FAILED"
103+
except BadRequest:
104+
return "PASSED"
105+
"""
106+
)
107+
assert query_response_incorrect_syntax_fetch == "PASSED"
108+
109+
110+
def test_runtime_backend_permission_denied_handled(ws, wsfs_wheel):
111+
commands = CommandExecutor(ws.clusters, ws.command_execution, lambda: ws.config.cluster_id)
112+
113+
commands.install_notebook_library(f"/Workspace{wsfs_wheel}")
114+
query_response_permission_denied_execute = commands.run(
115+
"""
116+
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
117+
from databricks.sdk.errors import PermissionDenied
118+
backend = RuntimeBackend()
119+
try:
120+
current_user = backend.fetch("SELECT current_user()")
121+
backend.execute(f"GRANT CREATE EXTERNAL LOCATION ON METASTORE TO {current_user}")
122+
return "FAILED"
123+
except PermissionDenied:
124+
return "PASSED"
125+
"""
126+
)
127+
assert query_response_permission_denied_execute == "PASSED"
128+
129+
query_response_permission_denied_fetch = commands.run(
130+
"""
131+
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
132+
from databricks.sdk.errors import PermissionDenied
133+
backend = RuntimeBackend()
134+
try:
135+
current_user = backend.fetch(f"SELECT current_user()")
136+
grants = backend.fetch(f"GRANT CREATE EXTERNAL LOCATION ON METASTORE TO {current_user}")
137+
return "FAILED"
138+
except PermissionDenied:
139+
return "PASSED"
140+
"""
141+
)
142+
assert query_response_permission_denied_fetch == "PASSED"
143+
144+
145+
def test_runtime_backend_unknown_error_handled(ws, wsfs_wheel):
146+
commands = CommandExecutor(ws.clusters, ws.command_execution, lambda: ws.config.cluster_id)
147+
148+
commands.install_notebook_library(f"/Workspace{wsfs_wheel}")
149+
150+
query_response_unknown_execute = commands.run(
151+
"""
152+
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
153+
from databricks.sdk.errors import Unknown
154+
backend = RuntimeBackend()
155+
try:
156+
backend.execute("SHOW GRANTS ON METASTORE")
157+
return "FAILED"
158+
except Unknown:
159+
return "PASSED"
160+
"""
161+
)
162+
assert query_response_unknown_execute == "PASSED"
163+
164+
query_response_unknown_fetch = commands.run(
165+
"""
166+
from databricks.labs.ucx.framework.crawlers import RuntimeBackend
167+
from databricks.sdk.errors import Unknown
168+
backend = RuntimeBackend()
169+
try:
170+
grants = backend.fetch("SHOW GRANTS ON METASTORE")
171+
return "FAILED"
172+
except Unknown:
173+
return "PASSED"
174+
"""
175+
)
176+
assert query_response_unknown_fetch == "PASSED"

tests/unit/framework/test_crawlers.py

Lines changed: 106 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from unittest import mock
55

66
import pytest
7-
from databricks.sdk.errors import NotFound
7+
from databricks.sdk.errors import BadRequest, NotFound, PermissionDenied, Unknown
88
from databricks.sdk.service import sql
99

1010
from databricks.labs.ucx.framework.crawlers import (
@@ -244,3 +244,108 @@ def test_save_table_with_not_null_constraint_violated(mocker):
244244
assert (
245245
str(exc_info.value) == "Not null constraint violated for column key, row = {'key': None, 'value': 'value'}"
246246
)
247+
248+
249+
def test_raise_spark_sql_exceptions(mocker):
250+
with mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "14.0"}):
251+
pyspark_sql_session = mocker.Mock()
252+
sys.modules["pyspark.sql.session"] = pyspark_sql_session
253+
254+
rb = RuntimeBackend
255+
error_message_invalid_schema = "SCHEMA_NOT_FOUND foo schema does not exist"
256+
with pytest.raises(NotFound):
257+
rb._raise_spark_sql_exceptions(error_message_invalid_schema)
258+
259+
error_message_invalid_table = "TABLE_OR_VIEW_NOT_FOUND foo table does not exist"
260+
with pytest.raises(NotFound):
261+
rb._raise_spark_sql_exceptions(error_message_invalid_table)
262+
263+
error_message_invalid_syntax = "PARSE_SYNTAX_ERROR foo"
264+
with pytest.raises(BadRequest):
265+
rb._raise_spark_sql_exceptions(error_message_invalid_syntax)
266+
267+
error_message_permission_denied = "foo Operation not allowed"
268+
with pytest.raises(PermissionDenied):
269+
rb._raise_spark_sql_exceptions(error_message_permission_denied)
270+
271+
error_message_invalid_schema = "foo error failure"
272+
with pytest.raises(Unknown):
273+
rb._raise_spark_sql_exceptions(error_message_invalid_schema)
274+
275+
276+
def test_execute(mocker):
277+
with mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "14.0"}):
278+
pyspark_sql_session = mocker.Mock()
279+
sys.modules["pyspark.sql.session"] = pyspark_sql_session
280+
rb = RuntimeBackend()
281+
282+
sql_query = "SELECT * from bar"
283+
284+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
285+
"SCHEMA_NOT_FOUND"
286+
)
287+
with pytest.raises(NotFound):
288+
rb.execute(sql_query)
289+
290+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
291+
"TABLE_OR_VIEW_NOT_FOUND"
292+
)
293+
with pytest.raises(NotFound):
294+
rb.execute(sql_query)
295+
296+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
297+
"PARSE_SYNTAX_ERROR"
298+
)
299+
with pytest.raises(BadRequest):
300+
rb.execute(sql_query)
301+
302+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
303+
"Operation not allowed"
304+
)
305+
with pytest.raises(PermissionDenied):
306+
rb.execute(sql_query)
307+
308+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
309+
"foo error occurred"
310+
)
311+
with pytest.raises(Unknown):
312+
rb.execute(sql_query)
313+
314+
315+
def test_fetch(mocker):
316+
with mock.patch.dict(os.environ, {"DATABRICKS_RUNTIME_VERSION": "14.0"}):
317+
pyspark_sql_session = mocker.Mock()
318+
sys.modules["pyspark.sql.session"] = pyspark_sql_session
319+
rb = RuntimeBackend()
320+
321+
sql_query = "SELECT * from bar"
322+
323+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
324+
"SCHEMA_NOT_FOUND"
325+
)
326+
with pytest.raises(NotFound):
327+
rb.fetch(sql_query)
328+
329+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
330+
"TABLE_OR_VIEW_NOT_FOUND"
331+
)
332+
with pytest.raises(NotFound):
333+
rb.fetch(sql_query)
334+
335+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
336+
"PARSE_SYNTAX_ERROR"
337+
)
338+
with pytest.raises(BadRequest):
339+
rb.fetch(sql_query)
340+
341+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
342+
"Operation not allowed"
343+
)
344+
with pytest.raises(PermissionDenied):
345+
rb.fetch(sql_query)
346+
347+
pyspark_sql_session.SparkSession.builder.getOrCreate.return_value.sql.side_effect = Exception(
348+
"foo error occurred"
349+
)
350+
with pytest.raises(Unknown):
351+
rb.fetch(sql_query)

0 commit comments

Comments (0)