Skip to content

Commit fbb3b7e

Browse files
authored
Prepare for environments with serverless notebooks (#938)
1 parent f6a58ee commit fbb3b7e

File tree

7 files changed

+83
-4
lines changed

7 files changed

+83
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,13 +3,13 @@
33
### Features
44

55
- Add `auto_liquid_cluster` config to enable Auto Liquid Clustering for Delta-based dbt models ([935](https://github.com/databricks/dbt-databricks/pull/935))
6+
- Prepare environments for Python models with serverless clusters ([938](https://github.com/databricks/dbt-databricks/pull/938))
67

78
### Fixes
89

910
- table_format: iceberg is unblocked for snapshots ([930](https://github.com/databricks/dbt-databricks/pull/930))
1011
- Fix for regression in glue table listing behavior ([934](https://github.com/databricks/dbt-databricks/pull/934))
1112

12-
1313
### Under the Hood
1414

1515
- Collapsing to a single connection manager (since the old one no longer works) ([910](https://github.com/databricks/dbt-databricks/pull/910))

dbt/adapters/databricks/api_client.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,10 @@ def __init__(self, session: Session, host: str, polling_interval: int, timeout:
326326
def submit(
327327
self, run_name: str, job_spec: dict[str, Any], **additional_job_settings: dict[str, Any]
328328
) -> str:
329+
logger.debug(
330+
f"Submitting job with run_name={run_name} and job_spec={job_spec}"
331+
f" and additional_job_settings={additional_job_settings}"
332+
)
329333
submit_response = self.session.post(
330334
"/submit", json={"run_name": run_name, "tasks": [job_spec], **additional_job_settings}
331335
)

dbt/adapters/databricks/python_models/python_config.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,8 @@ class PythonModelConfig(BaseModel):
3636
cluster_id: Optional[str] = None
3737
http_path: Optional[str] = None
3838
create_notebook: bool = False
39+
environment_key: Optional[str] = None
40+
environment_dependencies: list[str] = Field(default_factory=list)
3941

4042

4143
class ParsedPythonModel(BaseModel):

dbt/adapters/databricks/python_models/python_submissions.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -209,6 +209,8 @@ def __init__(
209209
self.job_grants = parsed_model.config.python_job_config.grants
210210
self.acls = parsed_model.config.access_control_list
211211
self.additional_job_settings = parsed_model.config.python_job_config.dict()
212+
self.environment_key = parsed_model.config.environment_key
213+
self.environment_deps = parsed_model.config.environment_dependencies
212214

213215
def compile(self, path: str) -> PythonJobDetails:
214216
job_spec: dict[str, Any] = {
@@ -217,9 +219,20 @@ def compile(self, path: str) -> PythonJobDetails:
217219
"notebook_path": path,
218220
},
219221
}
220-
job_spec.update(self.cluster_spec) # updates 'new_cluster' config
221222

222223
additional_job_config = self.additional_job_settings
224+
225+
if self.environment_key:
226+
job_spec["environment_key"] = self.environment_key
227+
if self.environment_deps and not self.additional_job_settings.get("environments"):
228+
additional_job_config["environments"] = [
229+
{
230+
"environment_key": self.environment_key,
231+
"spec": {"client": "2", "dependencies": self.environment_deps},
232+
}
233+
]
234+
job_spec.update(self.cluster_spec) # updates 'new_cluster' config
235+
223236
access_control_list = self.permission_builder.build_job_permissions(
224237
self.job_grants, self.acls
225238
)

tests/functional/adapter/python_model/fixtures.py

Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,32 @@ def model(dbt, spark):
4242
identifier: source
4343
"""
4444

45+
serverless_schema_with_environment = """version: 2
46+
47+
models:
48+
- name: my_versioned_sql_model
49+
versions:
50+
- v: 1
51+
- name: my_python_model
52+
config:
53+
submission_method: serverless_cluster
54+
create_notebook: true
55+
environment_key: "test_key"
56+
environment_dependencies: ["requests"]
57+
58+
sources:
59+
- name: test_source
60+
loader: custom
61+
schema: "{{ var(env_var('DBT_TEST_SCHEMA_NAME_VARIABLE')) }}"
62+
quoting:
63+
identifier: True
64+
tags:
65+
- my_test_source_tag
66+
tables:
67+
- name: test_table
68+
identifier: source
69+
"""
70+
4571
workflow_schema = """version: 2
4672
4773
models:

tests/functional/adapter/python_model/test_python_model.py

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -113,6 +113,21 @@ def models(self):
113113
}
114114

115115

116+
@pytest.mark.python
117+
# @pytest.mark.skip_profile("databricks_cluster", "databricks_uc_cluster")
118+
@pytest.mark.skip("Not available in Databricks yet")
119+
class TestServerlessClusterWithEnvironment(BasePythonModelTests):
120+
@pytest.fixture(scope="class")
121+
def models(self):
122+
return {
123+
"schema.yml": override_fixtures.serverless_schema_with_environment,
124+
"my_sql_model.sql": fixtures.basic_sql,
125+
"my_versioned_sql_model_v1.sql": fixtures.basic_sql,
126+
"my_python_model.py": fixtures.basic_python,
127+
"second_sql_model.sql": fixtures.second_sql,
128+
}
129+
130+
116131
@pytest.mark.python
117132
@pytest.mark.external
118133
@pytest.mark.skip_profile("databricks_cluster", "databricks_uc_sql_endpoint")

tests/unit/python/test_python_job_support.py

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -146,8 +146,16 @@ def run_name(self, parsed_model):
146146
parsed_model.config.additional_libs = []
147147
return run_name
148148

149+
@pytest.fixture
150+
def environment_key(self, parsed_model):
151+
environment_key = "test_key"
152+
parsed_model.config.environment_key = environment_key
153+
parsed_model.config.environment_dependencies = ["requests"]
154+
return environment_key
155+
149156
def test_compile__empty_configs(self, client, permission_builder, parsed_model, run_name):
150157
parsed_model.config.python_job_config.dict.return_value = {}
158+
parsed_model.config.environment_key = None
151159
compiler = PythonJobConfigCompiler(client, permission_builder, parsed_model, {})
152160
permission_builder.build_job_permissions.return_value = []
153161
details = compiler.compile("path")
@@ -162,7 +170,9 @@ def test_compile__empty_configs(self, client, permission_builder, parsed_model,
162170
}
163171
assert details.additional_job_config == {}
164172

165-
def test_compile__nonempty_configs(self, client, permission_builder, parsed_model, run_name):
173+
def test_compile__nonempty_configs(
174+
self, client, permission_builder, parsed_model, run_name, environment_key
175+
):
166176
parsed_model.config.packages = ["foo"]
167177
parsed_model.config.index_url = None
168178
parsed_model.config.python_job_config.dict.return_value = {"foo": "bar"}
@@ -176,6 +186,7 @@ def test_compile__nonempty_configs(self, client, permission_builder, parsed_mode
176186
details = compiler.compile("path")
177187
assert details.run_name == run_name
178188
assert details.job_spec == {
189+
"environment_key": environment_key,
179190
"task_key": "inner_notebook",
180191
"notebook_task": {
181192
"notebook_path": "path",
@@ -185,4 +196,12 @@ def test_compile__nonempty_configs(self, client, permission_builder, parsed_mode
185196
"access_control_list": [{"user_name": "user", "permission_level": "IS_OWNER"}],
186197
"queue": {"enabled": True},
187198
}
188-
assert details.additional_job_config == {"foo": "bar"}
199+
assert details.additional_job_config == {
200+
"foo": "bar",
201+
"environments": [
202+
{
203+
"environment_key": environment_key,
204+
"spec": {"client": "2", "dependencies": ["requests"]},
205+
}
206+
],
207+
}

0 commit comments

Comments
 (0)