142 changes: 134 additions & 8 deletions AGENTS.md
@@ -9,7 +9,7 @@ This guide helps AI agents quickly understand and work productively with the dbt
- **What**: dbt adapter for Databricks Lakehouse platform
- **Based on**: dbt-spark adapter with Databricks-specific enhancements
- **Key Features**: Unity Catalog support, Delta Lake, Python models, streaming tables
- **Language**: Python 3.9+ with Jinja2 SQL macros
- **Language**: Python 3.10+ with Jinja2 SQL macros
- **Architecture**: Inherits from Spark adapter, extends with Databricks-specific functionality

### Essential Files to Understand
Expand All @@ -20,6 +20,7 @@ dbt/adapters/databricks/
├── connections.py # Connection management and SQL execution
├── credentials.py # Authentication (token, OAuth, Azure AD)
├── relation.py # Databricks-specific relation handling
├── dbr_capabilities.py # DBR version capability system
├── python_models/ # Python model execution on clusters
├── relation_configs/ # Table/view configuration management
└── catalogs/ # Unity Catalog vs Hive Metastore logic
@@ -33,24 +34,37 @@ dbt/include/databricks/macros/ # Jinja2 SQL templates

## 🛠 Development Environment

**Prerequisites**: Python 3.9+ installed on your system
**Prerequisites**: Python 3.10+ installed on your system

**Install Hatch** (recommended):

For Linux:

```bash
# Install Hatch globally - see https://hatch.pypa.io/dev/install/
pip install hatch
# Download and install standalone binary
curl -Lo hatch.tar.gz https://github.com/pypa/hatch/releases/latest/download/hatch-x86_64-unknown-linux-gnu.tar.gz
tar -xzf hatch.tar.gz
mkdir -p $HOME/bin
mv hatch $HOME/bin/hatch
chmod +x $HOME/bin/hatch
echo 'export PATH="$HOME/bin:$PATH"' >> ~/.zshrc
export PATH="$HOME/bin:$PATH"

# Create default environment (Hatch installs needed Python versions)
hatch env create
```

For other platforms: see https://hatch.pypa.io/latest/install/

**Essential commands**:

```bash
hatch run code-quality # Format, lint, type-check
hatch run unit # Run unit tests
hatch run cluster-e2e # Run functional tests

# For specific tests, use pytest directly:
hatch run pytest path/to/test_file.py::TestClass::test_method -v
```

> 📖 **See [Development Guide](docs/dbt-databricks-dev.md)** for comprehensive setup documentation
@@ -113,17 +127,38 @@ class TestCreateTable(MacroTestBase):

#### Functional Test Example

**Important**: SQL models and YAML schemas should be defined in a `fixtures.py` file in the same directory as the test, not inline in the test class. This keeps tests clean and fixtures reusable.

**fixtures.py:**

```python
my_model_sql = """
{{ config(materialized='incremental', unique_key='id') }}
select 1 as id, 'test' as name
"""

my_schema_yml = """
version: 2
models:
- name: my_model
columns:
- name: id
description: 'ID column'
"""
```

**test_my_feature.py:**

```python
import pytest

from dbt.tests import util
from tests.functional.adapter.my_feature import fixtures

class TestIncrementalModel:
@pytest.fixture(scope="class")
def models(self):
return {
"my_model.sql": """
{{ config(materialized='incremental', unique_key='id') }}
select 1 as id, 'test' as name
"""
"my_model.sql": fixtures.my_model_sql,
"schema.yml": fixtures.my_schema_yml,
}

def test_incremental_run(self, project):
@@ -147,6 +182,46 @@ DatabricksAdapter (impl.py)

### Key Components

#### DBR Capability System (`dbr_capabilities.py`)

- **Purpose**: Centralized management of DBR version-dependent features
- **Key Features**:
- Per-compute caching (different clusters can have different capabilities)
- Named capabilities instead of magic version numbers
- Automatic detection of DBR version and SQL warehouse environments
- **Supported Capabilities**:
- `TIMESTAMPDIFF` (DBR 10.4+): Advanced date/time functions
- `INSERT_BY_NAME` (DBR 12.2+): Name-based column matching in INSERT
- `ICEBERG` (DBR 14.3+): Apache Iceberg table format
- `COMMENT_ON_COLUMN` (DBR 16.1+): Modern column comment syntax
- `JSON_COLUMN_METADATA` (DBR 16.2+): Efficient metadata retrieval
- **Usage in Code**:

```python
# In Python code
if adapter.has_capability(DBRCapability.ICEBERG):
# Use Iceberg features

# In Jinja macros
{% if adapter.has_dbr_capability('comment_on_column') %}
COMMENT ON COLUMN ...
{% else %}
ALTER TABLE ... ALTER COLUMN ...
{% endif %}

{% if adapter.has_dbr_capability('insert_by_name') %}
INSERT INTO table BY NAME SELECT ...
{% else %}
INSERT INTO table SELECT ... -- positional
{% endif %}
```

- **Adding New Capabilities** (see the sketch after this list):
1. Add to `DBRCapability` enum
2. Add `CapabilitySpec` with version requirements
3. Use `has_capability()` or `require_capability()` in code
- **Important**: Each compute resource (identified by `http_path`) maintains its own capability cache
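
A minimal, self-contained sketch of this flow follows. It mirrors the enum/spec/check pattern described above but is not the real module: the `CapabilitySpec` fields (`min_major`, `min_minor`), the `CAPABILITY_SPECS` table, the `supports()` helper, and the `MY_NEW_FEATURE` capability are illustrative assumptions; check `dbr_capabilities.py` for the actual signatures.

```python
from dataclasses import dataclass
from enum import Enum


class DBRCapability(Enum):
    # Subset of existing capabilities, plus one hypothetical addition (step 1)
    ICEBERG = "iceberg"
    COMMENT_ON_COLUMN = "comment_on_column"
    MY_NEW_FEATURE = "my_new_feature"


@dataclass(frozen=True)
class CapabilitySpec:
    """Illustrative spec; the real class in dbr_capabilities.py may differ."""

    capability: DBRCapability
    min_major: int
    min_minor: int


# Step 2: register each capability with its minimum DBR version
CAPABILITY_SPECS = {
    DBRCapability.ICEBERG: CapabilitySpec(DBRCapability.ICEBERG, 14, 3),
    DBRCapability.COMMENT_ON_COLUMN: CapabilitySpec(DBRCapability.COMMENT_ON_COLUMN, 16, 1),
    DBRCapability.MY_NEW_FEATURE: CapabilitySpec(DBRCapability.MY_NEW_FEATURE, 17, 0),
}


def supports(dbr_version: tuple[int, int], capability: DBRCapability) -> bool:
    """Step 3: the kind of version gate has_capability() performs per compute."""
    spec = CAPABILITY_SPECS[capability]
    return dbr_version >= (spec.min_major, spec.min_minor)


print(supports((16, 2), DBRCapability.COMMENT_ON_COLUMN))  # True
print(supports((13, 3), DBRCapability.ICEBERG))            # False
```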

#### Connection Management (`connections.py`)

- Extends Spark connection manager for Databricks
@@ -184,6 +259,42 @@ DatabricksAdapter (impl.py)
- Override Spark macros with Databricks-specific logic
- Handle materializations (table, view, incremental, snapshot)
- Implement Databricks features (liquid clustering, column masks, tags)
- **Important**: To override a `spark__macro_name` macro, create `databricks__macro_name` (NOT `spark__macro_name`)

#### Multi-Statement SQL Execution

When a macro needs to execute multiple SQL statements (e.g., DELETE followed by INSERT), use the `execute_multiple_statements` helper:

**Pattern for Multi-Statement Strategies:**
```jinja
{% macro my_multi_statement_strategy(args) %}
{%- set statements = [] -%}

{#-- Build first statement --#}
{%- set statement1 -%}
DELETE FROM {{ target_relation }}
WHERE some_condition
{%- endset -%}
{%- do statements.append(statement1) -%}

{#-- Build second statement --#}
{%- set statement2 -%}
INSERT INTO {{ target_relation }}
SELECT * FROM {{ source_relation }}
{%- endset -%}
{%- do statements.append(statement2) -%}

{{- return(statements) -}}
{% endmacro %}
```

**How It Works:**
- Return a **list of SQL strings** from your strategy macro
- The incremental materialization automatically detects lists and calls `execute_multiple_statements()`
- Each statement executes separately via `{% call statement('main') %}`
- Used by: `delete+insert` incremental strategy (DBR < 17.1 fallback), materialized views, streaming tables

**Note:** Databricks SQL connector does NOT support semicolon-separated statements in a single execute call. Always return a list.

### Configuration System

Expand Down Expand Up @@ -256,6 +367,7 @@ Models can be configured with Databricks-specific options:

- **Development**: `docs/dbt-databricks-dev.md` - Setup and workflow
- **Testing**: `docs/testing.md` - Comprehensive testing guide
- **DBR Capabilities**: `docs/dbr-capability-system.md` - Version-dependent features
- **Contributing**: `CONTRIBUTING.MD` - Code standards and PR process
- **User Docs**: [docs.getdbt.com](https://docs.getdbt.com/reference/resource-configs/databricks-configs)

@@ -273,6 +385,11 @@ Models can be configured with Databricks-specific options:
3. **SQL Generation**: Prefer macros over Python string manipulation
4. **Testing**: Write both unit and functional tests for new features
5. **Configuration**: Use dataclasses with validation for new config options (see the sketch after this list)
6. **Imports**: Always import at the top of the file, never use local imports within functions or methods
7. **Version Checks**: Use capability system instead of direct version comparisons:
- ❌ `if adapter.compare_dbr_version(16, 1) >= 0:`
- ✅ `if adapter.has_capability(DBRCapability.COMMENT_ON_COLUMN):`
- ✅ `{% if adapter.has_dbr_capability('comment_on_column') %}`
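
For pattern 5, a hedged sketch of a validated config dataclass is shown below. The class name, fields, and error messages are illustrative assumptions only; the adapter's real configuration classes live under `relation_configs/` and follow their own base classes.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class ClusteringConfig:
    """Hypothetical config object; not the adapter's actual implementation."""

    cluster_by: Optional[list[str]] = None
    auto: bool = False

    def __post_init__(self) -> None:
        # Validate eagerly so a bad model config fails at parse time, not at run time.
        if self.auto and self.cluster_by:
            raise ValueError("Set either 'cluster_by' columns or 'auto', not both.")
        if self.cluster_by is not None and not self.cluster_by:
            raise ValueError("'cluster_by' must list at least one column when provided.")


# Usage: construct from a model's config dict and let validation guard the options.
config = ClusteringConfig(cluster_by=["event_date"])
```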

## 🚨 Common Pitfalls for Agents

@@ -284,12 +401,21 @@ Models can be configured with Databricks-specific options:
6. **Follow SQL normalization** in test assertions with `assert_sql_equal()` (see the sketch after this list)
7. **Handle Unity Catalog vs HMS differences** in feature implementations
8. **Consider backward compatibility** when modifying existing behavior
9. **Use capability system for version checks** - Never add new `compare_dbr_version()` calls
10. **Remember per-compute caching** - Different clusters may have different capabilities in the same run
11. **Multi-statement SQL**: Don't use semicolons to separate statements - return a list instead and let `execute_multiple_statements()` handle it
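
For pitfall 6, the helper below is not the adapter's `assert_sql_equal()`; it is a hedged sketch of the underlying idea, collapsing whitespace before comparing so that formatting differences in rendered SQL do not cause false test failures.

```python
import re


def normalize_sql(sql: str) -> str:
    """Collapse runs of whitespace so formatting differences do not matter."""
    return re.sub(r"\s+", " ", sql).strip()


def assert_sql_equivalent(actual: str, expected: str) -> None:
    """Illustrative stand-in for an assert_sql_equal()-style comparison."""
    assert normalize_sql(actual) == normalize_sql(expected), (
        f"SQL mismatch:\n{normalize_sql(actual)}\n!=\n{normalize_sql(expected)}"
    )


assert_sql_equivalent(
    "create table  t\n  (id int)",
    "create table t (id int)",
)
```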

## 🎯 Success Metrics

When working on this codebase, ensure:

- [ ] All tests pass (`hatch run code-quality && hatch run unit`)
- [ ] **CRITICAL: Run affected functional tests before declaring success**
- If you modified connection/capability logic: Run tests that use multiple computes or check capabilities
- If you modified incremental materializations: Run `tests/functional/adapter/incremental/`
- If you modified Python models: Run `tests/functional/adapter/python_model/`
- If you modified macros: Run tests that use those macros
- **NEVER declare "mission accomplished" without running functional tests for affected features**
- [ ] New features have both unit and functional tests
- [ ] SQL generation follows Databricks best practices
- [ ] Changes maintain backward compatibility
6 changes: 4 additions & 2 deletions dbt/adapters/databricks/impl.py
@@ -305,9 +305,11 @@ def list_schemas(self, database: Optional[str]) -> list[str]:

def check_schema_exists(self, database: Optional[str], schema: str) -> bool:
"""Check if a schema exists."""
return schema.lower() in set(
s.lower() for s in self.connections.list_schemas(database or "hive_metastore", schema)
results = self.execute_macro(
"databricks__check_schema_exists",
kwargs={"database": database or "hive_metastore", "schema": schema},
)
return len(results) > 0

def execute(
self,
21 changes: 21 additions & 0 deletions dbt/include/databricks/macros/adapters/metadata.sql
@@ -22,7 +22,28 @@ SHOW TABLE EXTENDED IN {{ schema_relation.without_identifier()|lower }} LIKE '{{
{{ return(run_query_as(show_tables_sql(relation), 'show_tables')) }}
{% endmacro %}

{% macro databricks__list_schemas(database) -%}
{{ return(run_query_as(list_schemas_sql(database), 'list_schemas')) }}
{% endmacro %}

{% macro list_schemas_sql(database) %}
{% if database %}
SHOW SCHEMAS IN {{ database }}
{% else %}
SHOW SCHEMAS
{% endif %}
{% endmacro %}

{% macro databricks__check_schema_exists(database, schema) %}
{{ return(run_query_as(check_schema_exists_sql(database, schema), 'check_schema_exists')) }}
{% endmacro %}

{% macro check_schema_exists_sql(database, schema) %}
SHOW SCHEMAS IN {{ database }} LIKE '{{ schema }}'
{% endmacro %}

{% macro show_tables_sql(relation) %}

SHOW TABLES IN {{ relation.render() }}
{% endmacro %}

9 changes: 1 addition & 8 deletions pyproject.toml
@@ -63,14 +63,8 @@ check-sdist = [
]

[tool.hatch.envs.default]
pre-install-commands = [
"pip install git+https://github.com/dbt-labs/[email protected]",
"pip install git+https://github.com/dbt-labs/[email protected]#subdirectory=dbt-adapters",
"pip install git+https://github.com/dbt-labs/[email protected]#subdirectory=dbt-tests-adapter",
"pip install git+https://github.com/dbt-labs/[email protected]#subdirectory=core",
]
dependencies = [
"dbt-spark @ git+https://github.com/dbt-labs/[email protected]#subdirectory=dbt-spark",
"dbt-tests-adapter",
"pytest",
"pytest-xdist",
"pytest-dotenv",
Expand All @@ -83,7 +77,6 @@ dependencies = [
"pydantic>=1.10.0, <2",
"pytest-cov",
]
path = ".hatch"
python = "3.9"

[tool.hatch.envs.default.scripts]
Expand Down
57 changes: 57 additions & 0 deletions tests/functional/adapter/basic/test_check_schema_exists.py
@@ -0,0 +1,57 @@
import pytest


class TestCheckSchemaExists:
"""Test the check_schema_exists adapter method."""

@pytest.fixture(scope="class", autouse=True)
def setUp(self, project):
"""Create a test schema and clean it up after tests."""
test_schema = f"{project.test_schema}_check_exists"

with project.adapter.connection_named("__test"):
relation = project.adapter.Relation.create(
database=project.database,
schema=test_schema,
)
# Drop if exists from previous run
project.adapter.drop_schema(relation)
# Create the test schema
project.adapter.create_schema(relation)

yield test_schema

# Cleanup
with project.adapter.connection_named("__test"):
project.adapter.drop_schema(relation)

def test_check_schema_exists(self, project, setUp):
"""Test that check_schema_exists correctly identifies existing and non-existing schemas."""
test_schema = setUp

with project.adapter.connection_named("__test"):
# Test 1: Verify existing schema returns True
exists = project.adapter.check_schema_exists(
database=project.database, schema=test_schema
)
assert (
exists is True
), f"Expected schema '{test_schema}' to exist but check returned False"

# Test 2: Verify non-existing schema returns False
non_existent_schema = "this_schema_definitely_does_not_exist_12345"
exists = project.adapter.check_schema_exists(
database=project.database, schema=non_existent_schema
)
assert (
exists is False
), f"Expected schema '{non_existent_schema}' to not exist but check returned True"

# Test 3: Verify existing default schema returns True (should always exist)
exists = project.adapter.check_schema_exists(
database=project.database, schema=project.test_schema
)
assert exists is True, (
f"Expected default test schema '{project.test_schema}' "
"to exist but check returned False"
)