Commit 1d50c70

Set catalog on SchemaDeployer to overwrite the default hive_metastore (#296)
### Linked issues

Resolves #294
Needs #280 (tech debt to tackle later)
Progresses #278
Requires #287 for the CI to pass
1 parent 605498c commit 1d50c70
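
For context, here is a minimal usage sketch of the changed API; the `my_project.schemas` module and the warehouse ID are hypothetical placeholders, not part of this commit:

from databricks.sdk import WorkspaceClient

from databricks.labs.lsql.backends import StatementExecutionBackend
from databricks.labs.lsql.deployment import SchemaDeployer

from my_project import schemas  # hypothetical module bundling dataclasses and *.sql view files

ws = WorkspaceClient()
sql_backend = StatementExecutionBackend(ws, "<warehouse-id>")

# Previously the deployer always targeted hive_metastore; the new keyword-only
# `catalog` argument redirects all DDL to another catalog.
deployer = SchemaDeployer(sql_backend, "inventory", schemas, catalog="main")
deployer.deploy_schema()  # executes: CREATE SCHEMA IF NOT EXISTS main.inventory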

7 files changed (+117, −60 lines)

pyproject.toml

Lines changed: 1 addition & 0 deletions
@@ -46,6 +46,7 @@ dependencies = [
     "coverage[toml]>=6.5",
     "pytest",
     "pylint",
+    "databricks-labs-pytester>=0.2.1",
     "pytest-xdist",
     "pytest-cov>=4.0.0,<5.0.0",
     "pytest-mock>=3.0.0,<4.0.0",

src/databricks/labs/lsql/backends.py

Lines changed: 4 additions & 0 deletions
@@ -13,10 +13,12 @@
     BadRequest,
     DatabricksError,
     DataLoss,
+    InternalError,
     NotFound,
     PermissionDenied,
     Unknown,
 )
+from databricks.sdk.retries import retried
 from databricks.sdk.service.compute import Language
 
 from databricks.labs.lsql.core import Row, StatementExecutionExt
@@ -202,6 +204,8 @@ def __init__(self, ws: WorkspaceClient, warehouse_id, *, max_records_per_batch:
         self._debug_truncate_bytes = debug_truncate_bytes if isinstance(debug_truncate_bytes, int) else 96
         super().__init__(max_records_per_batch)
 
+    # InternalError is retried on for resilience on sporadic Databricks issues.
+    @retried(on=[InternalError], timeout=datetime.timedelta(minutes=1))
     def execute(self, sql: str, *, catalog: str | None = None, schema: str | None = None) -> None:
         logger.debug(f"[api][execute] {self._only_n_bytes(sql, self._debug_truncate_bytes)}")
         self._sql.execute(sql, catalog=catalog, schema=schema)
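
The `@retried` decorator (moved here from the deployment module) re-invokes the wrapped callable until it stops raising one of the listed errors or the timeout elapses. A minimal sketch of that behaviour, using a hypothetical flaky function that is not part of this commit:

import datetime

from databricks.sdk.errors import InternalError
from databricks.sdk.retries import retried

attempts = 0

@retried(on=[InternalError], timeout=datetime.timedelta(minutes=1))
def flaky_execute() -> str:
    # Hypothetical stand-in for StatementExecutionBackend.execute: fails twice
    # with InternalError, then succeeds; @retried absorbs the first two raises.
    global attempts
    attempts += 1
    if attempts < 3:
        raise InternalError("sporadic Databricks issue")
    return "ok"

print(flaky_execute())  # prints "ok" after two transparent retries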

src/databricks/labs/lsql/core.py

Lines changed: 1 addition & 1 deletion
@@ -140,7 +140,7 @@ class StatementExecutionExt:
     megabytes or gigabytes of data serialized in Apache Arrow format, and low result fetching latency, should use
     the stateful Databricks SQL Connector for Python."""
 
-    def __init__(  # pylint: disable=too-many-arguments
+    def __init__(  # pylint: disable=too-many-arguments,too-many-positional-arguments
         self,
         ws: WorkspaceClient,
         disposition: Disposition | None = None,
src/databricks/labs/lsql/deployment.py

Lines changed: 29 additions & 22 deletions

@@ -1,46 +1,53 @@
-import datetime as dt
 import logging
 import pkgutil
 from typing import Any
 
-from databricks.sdk.errors import InternalError
-from databricks.sdk.retries import retried
-
 from databricks.labs.lsql.backends import Dataclass, SqlBackend
 
 logger = logging.getLogger(__name__)
 
 
 class SchemaDeployer:
-    def __init__(self, sql_backend: SqlBackend, inventory_schema: str, mod: Any):
+    """Deploy schema, tables, and views for a given inventory schema."""
+
+    def __init__(
+        self,
+        sql_backend: SqlBackend,
+        schema: str,
+        mod: Any,
+        *,
+        catalog: str = "hive_metastore",
+    ) -> None:
         self._sql_backend = sql_backend
-        self._inventory_schema = inventory_schema
+        self._schema = schema
         self._module = mod
+        self._catalog = catalog
 
-    # InternalError are retried for resilience on sporadic Databricks issues
-    @retried(on=[InternalError], timeout=dt.timedelta(minutes=1))
-    def deploy_schema(self):
-        logger.info(f"Ensuring {self._inventory_schema} database exists")
-        self._sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS hive_metastore.{self._inventory_schema}")
-
-    def delete_schema(self):
-        logger.info(f"deleting {self._inventory_schema} database")
+    def deploy_schema(self) -> None:
+        schema_full_name = f"{self._catalog}.{self._schema}"
+        logger.info(f"Ensuring {schema_full_name} database exists")
+        self._sql_backend.execute(f"CREATE SCHEMA IF NOT EXISTS {schema_full_name}")
 
-        self._sql_backend.execute(f"DROP SCHEMA IF EXISTS hive_metastore.{self._inventory_schema} CASCADE")
+    def delete_schema(self) -> None:
+        schema_full_name = f"{self._catalog}.{self._schema}"
+        logger.info(f"Deleting {schema_full_name} database")
+        self._sql_backend.execute(f"DROP SCHEMA IF EXISTS {schema_full_name} CASCADE")
 
-    def deploy_table(self, name: str, klass: Dataclass):
-        logger.info(f"Ensuring {self._inventory_schema}.{name} table exists")
-        self._sql_backend.create_table(f"hive_metastore.{self._inventory_schema}.{name}", klass)
+    def deploy_table(self, name: str, klass: Dataclass) -> None:
+        table_full_name = f"{self._catalog}.{self._schema}.{name}"
+        logger.info(f"Ensuring {table_full_name} table exists")
+        self._sql_backend.create_table(table_full_name, klass)
 
-    def deploy_view(self, name: str, relative_filename: str):
+    def deploy_view(self, name: str, relative_filename: str) -> None:
         query = self._load(relative_filename)
-        logger.info(f"Ensuring {self._inventory_schema}.{name} view matches {relative_filename} contents")
-        ddl = f"CREATE OR REPLACE VIEW hive_metastore.{self._inventory_schema}.{name} AS {query}"
+        view_full_name = f"{self._catalog}.{self._schema}.{name}"
+        logger.info(f"Ensuring {view_full_name} view matches {relative_filename} contents")
+        ddl = f"CREATE OR REPLACE VIEW {view_full_name} AS {query}"
         self._sql_backend.execute(ddl)
 
     def _load(self, relative_filename: str) -> str:
         data = pkgutil.get_data(self._module.__name__, relative_filename)
         assert data is not None
         sql = data.decode("utf-8")
-        sql = sql.replace("$inventory", f"hive_metastore.{self._inventory_schema}")
+        sql = sql.replace("$inventory", f"{self._catalog}.{self._schema}")
         return sql
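
Note that `_load` rewrites the `$inventory` placeholder used inside bundled SQL files, so views can be written catalog-agnostically. A small illustration of that substitution; the raw SQL string and catalog name here are hypothetical:

raw_sql = "SELECT\n  id,\n  name\nFROM $inventory.something"
print(raw_sql.replace("$inventory", "my_catalog.inventory"))
# SELECT
#   id,
#   name
# FROM my_catalog.inventory.something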

tests/integration/test_backends.py

Lines changed: 15 additions & 0 deletions
@@ -3,8 +3,11 @@
 from databricks.labs.blueprint.installation import Installation
 from databricks.labs.blueprint.wheels import ProductInfo, WheelsV2
 
+from databricks.labs.lsql import Row
 from databricks.labs.lsql.backends import SqlBackend, StatementExecutionBackend
 
+from . import views
+
 INCORRECT_SCHEMA = """
 from databricks.labs.lsql.backends import RuntimeBackend
 from databricks.sdk.errors import NotFound
@@ -148,6 +151,18 @@ def test_statement_execution_backend_overrides(ws, env_or_skip):
     assert len(rows) == 10
 
 
+def test_statement_execution_backend_overwrites_table(ws, env_or_skip, make_random) -> None:
+    sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))
+    catalog = env_or_skip("TEST_CATALOG")
+    schema = env_or_skip("TEST_SCHEMA")
+
+    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("abc", True)], views.Foo, "append")
+    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("xyz", True)], views.Foo, "overwrite")
+
+    rows = list(sql_backend.fetch(f"SELECT * FROM {catalog}.{schema}.foo"))
+    assert rows == [Row(first="xyz", second=True)]
+
+
 def test_runtime_backend_use_statements(ws):
     product_info = ProductInfo.for_testing(SqlBackend)
     installation = Installation.assume_user_home(ws, product_info.product_name())
tests/integration/test_deployment.py

Lines changed: 10 additions & 23 deletions

@@ -1,37 +1,24 @@
 import pytest
 
 from databricks.labs.lsql import Row
-from databricks.labs.lsql.backends import StatementExecutionBackend
 from databricks.labs.lsql.deployment import SchemaDeployer
 
 from . import views
 
 
-@pytest.mark.xfail
-def test_deploys_database(ws, env_or_skip, make_random):
-    # TODO: create per-project/per-scope catalog
-    schema = "default"
-    sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))
+@pytest.mark.xfail(reason="Identity used in CI misses privileges to create UC resources")
+def test_deploys_schema(ws, sql_backend, make_random, make_catalog) -> None:
+    """Test deploying a full, minimal inventory schema with a single schema, table and view."""
+    catalog = make_catalog(name=f"lsql_test_{make_random()}")
+    schema_name = "lsql_test"
+    table_full_name = f"{catalog.name}.{schema_name}.foo"
 
-    deployer = SchemaDeployer(sql_backend, schema, views)
+    deployer = SchemaDeployer(sql_backend, schema_name, views, catalog=catalog.name)
     deployer.deploy_schema()
     deployer.deploy_table("foo", views.Foo)
     deployer.deploy_view("some", "some.sql")
 
-    sql_backend.save_table(f"{schema}.foo", [views.Foo("abc", True)], views.Foo)
-    rows = list(sql_backend.fetch(f"SELECT * FROM {schema}.some"))
+    sql_backend.save_table(table_full_name, [views.Foo("abc", True)], views.Foo)
 
-    assert rows == [Row(name="abc", id=1)]
-
-
-def test_overwrite(ws, env_or_skip, make_random):
-    schema = "default"
-    sql_backend = StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))
-    catalog = env_or_skip("TEST_CATALOG")
-    schema = env_or_skip("TEST_SCHEMA")
-
-    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("abc", True)], views.Foo, "append")
-    sql_backend.save_table(f"{catalog}.{schema}.foo", [views.Foo("xyz", True)], views.Foo, "overwrite")
-    rows = list(sql_backend.fetch(f"SELECT * FROM {catalog}.{schema}.foo"))
-
-    assert rows == [Row(first="xyz", second=True)]
+    rows = list(sql_backend.fetch(f"SELECT * FROM {table_full_name}"))
+    assert rows == [Row(first="abc", second=1)]
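
The `sql_backend` and `make_catalog` fixtures used above appear to come from the `databricks-labs-pytester` plugin added to the dev dependencies in this commit. A rough hand-rolled equivalent of the backend fixture, assuming the same `TEST_DEFAULT_WAREHOUSE_ID` environment variable as the other integration tests, might look like this sketch:

import pytest

from databricks.labs.lsql.backends import StatementExecutionBackend

@pytest.fixture
def sql_backend(ws, env_or_skip):
    # Sketch only: binds the backend to the warehouse configured for tests.
    return StatementExecutionBackend(ws, env_or_skip("TEST_DEFAULT_WAREHOUSE_ID"))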

tests/unit/test_deployment.py

Lines changed: 57 additions & 14 deletions
@@ -1,24 +1,46 @@
+import logging
 from dataclasses import dataclass
 
+import pytest
+
 from databricks.labs.lsql.backends import MockBackend
 from databricks.labs.lsql.deployment import SchemaDeployer
 
 from . import views
 
 
-def test_deploys_view():
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deploys_schema(caplog, inventory_catalog: str) -> None:
     mock_backend = MockBackend()
     deployment = SchemaDeployer(
         sql_backend=mock_backend,
-        inventory_schema="inventory",
+        schema="inventory",
         mod=views,
+        catalog=inventory_catalog,
     )
 
-    deployment.deploy_view("some", "some.sql")
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.deploy_schema()
 
-    assert mock_backend.queries == [
-        "CREATE OR REPLACE VIEW hive_metastore.inventory.some AS SELECT\n  id,\n  name\nFROM hive_metastore.inventory.something"
-    ]
+    assert mock_backend.queries == [f"CREATE SCHEMA IF NOT EXISTS {inventory_catalog}.inventory"]
+    assert f"Ensuring {inventory_catalog}.inventory database exists" in caplog.messages
+
+
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deletes_schema(caplog, inventory_catalog: str) -> None:
+    mock_backend = MockBackend()
+    deployment = SchemaDeployer(
+        sql_backend=mock_backend,
+        schema="inventory",
+        mod=views,
+        catalog=inventory_catalog,
+    )
+
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.delete_schema()
+
+    assert mock_backend.queries == [f"DROP SCHEMA IF EXISTS {inventory_catalog}.inventory CASCADE"]
+    assert f"Deleting {inventory_catalog}.inventory database" in caplog.messages
 
 
 @dataclass
@@ -27,19 +49,40 @@ class Foo:
     second: bool
 
 
-def test_deploys_dataclass():
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deploys_dataclass(caplog, inventory_catalog: str) -> None:
     mock_backend = MockBackend()
     deployment = SchemaDeployer(
         sql_backend=mock_backend,
-        inventory_schema="inventory",
+        schema="inventory",
         mod=views,
+        catalog=inventory_catalog,
     )
-    deployment.deploy_schema()
-    deployment.deploy_table("foo", Foo)
-    deployment.delete_schema()
+
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.deploy_table("foo", Foo)
+
+    assert mock_backend.queries == [
+        f"CREATE TABLE IF NOT EXISTS {inventory_catalog}.inventory.foo (first STRING NOT NULL, second BOOLEAN NOT NULL) USING DELTA",
+    ]
+    assert f"Ensuring {inventory_catalog}.inventory.foo table exists" in caplog.messages
+
+
+@pytest.mark.parametrize("inventory_catalog", ["hive_metastore", "inventory"])
+def test_deploys_view(caplog, inventory_catalog: str) -> None:
+    mock_backend = MockBackend()
+    deployment = SchemaDeployer(
+        sql_backend=mock_backend,
+        schema="inventory",
+        mod=views,
+        catalog=inventory_catalog,
+    )
+
+    with caplog.at_level(logging.INFO, logger="databricks.labs.lsql.deployment"):
+        deployment.deploy_view("some", "some.sql")
 
     assert mock_backend.queries == [
-        "CREATE SCHEMA IF NOT EXISTS hive_metastore.inventory",
-        "CREATE TABLE IF NOT EXISTS hive_metastore.inventory.foo (first STRING NOT NULL, second BOOLEAN NOT NULL) USING DELTA",
-        "DROP SCHEMA IF EXISTS hive_metastore.inventory CASCADE",
+        f"CREATE OR REPLACE VIEW {inventory_catalog}.inventory.some AS SELECT\n  id,\n  name\n"
+        f"FROM {inventory_catalog}.inventory.something"
     ]
+    assert f"Ensuring {inventory_catalog}.inventory.some view matches some.sql contents" in caplog.messages
