Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 46 additions & 14 deletions dbt/adapters/databricks/impl.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@
from concurrent.futures import Future
from contextlib import contextmanager
from dataclasses import dataclass

from importlib import metadata
from multiprocessing.context import SpawnContext
from typing import TYPE_CHECKING, Any, ClassVar, Generic, Optional, Union, cast
from typing import TYPE_CHECKING, Any, ClassVar, Generic, NamedTuple, Optional, Union, cast
from uuid import uuid4

from dbt_common.behavior_flags import BehaviorFlag
Expand All @@ -20,6 +21,7 @@

from dbt.adapters.base import AdapterConfig, PythonJobHelper
from dbt.adapters.base.impl import catch_as_completed, log_code_execution

from dbt.adapters.base.meta import available
from dbt.adapters.base.relation import BaseRelation
from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support
Expand Down Expand Up @@ -101,6 +103,14 @@
) # type: ignore[typeddict-item]


class DatabricksRelationInfo(NamedTuple):
    """One row of relation metadata as returned by the listing helpers.

    Field order matches the positional tuples built by ``_get_uc_relations``
    and ``_get_hive_relations``, so instances unpack positionally
    (``name, kind, file_format, owner, table_type = row``).
    """

    # Relation identifier within the schema.
    table_name: str
    # dbt-level relation kind ("table", "view", ...). Hive-metastore listings
    # can produce None when the type cannot be resolved (see the hive path,
    # which previously needed a type: ignore to pass None here).
    table_type: Optional[str]
    # Storage provider/format (e.g. "delta"); None for hive-metastore rows.
    file_format: Optional[str]
    # Owning principal; None for hive-metastore rows.
    table_owner: Optional[str]
    # Unity Catalog physical table subtype ("external", "managed", ...);
    # None for non-table relations and hive-metastore rows.
    databricks_table_type: Optional[str]


@dataclass
class DatabricksConfig(AdapterConfig):
file_format: str = "delta"
Expand Down Expand Up @@ -294,23 +304,31 @@ def execute(
def list_relations_without_caching( # type: ignore[override]
self, schema_relation: DatabricksRelation
) -> list[DatabricksRelation]:
empty: list[tuple[Optional[str], Optional[str], Optional[str], Optional[str]]] = []
empty: list[DatabricksRelationInfo] = []

results = handle_missing_objects(
lambda: self.get_relations_without_caching(schema_relation), empty
)

relations = []
for row in results:
name, kind, file_format, owner = row
name, kind, file_format, owner, table_type = row
metadata = None
if file_format:
metadata = {KEY_TABLE_OWNER: owner, KEY_TABLE_PROVIDER: file_format}

if table_type:
databricks_table_type = DatabricksRelation.get_databricks_table_type(table_type)
else:
databricks_table_type = None

relations.append(
self.Relation.create(
database=schema_relation.database,
schema=schema_relation.schema,
identifier=name,
type=self.Relation.get_relation_type(kind),
databricks_table_type=databricks_table_type,
metadata=metadata,
is_delta=file_format == "delta",
)
Expand All @@ -320,24 +338,28 @@ def list_relations_without_caching( # type: ignore[override]

def get_relations_without_caching(
self, relation: DatabricksRelation
) -> list[tuple[Optional[str], Optional[str], Optional[str], Optional[str]]]:
) -> list[DatabricksRelationInfo]:

if relation.is_hive_metastore():
return self._get_hive_relations(relation)
return self._get_uc_relations(relation)

def _get_uc_relations(
self, relation: DatabricksRelation
) -> list[tuple[Optional[str], Optional[str], Optional[str], Optional[str]]]:

def _get_uc_relations(self, relation: DatabricksRelation) -> list[DatabricksRelationInfo]:
kwargs = {"relation": relation}
results = self.execute_macro("get_uc_tables", kwargs=kwargs)
return [
(row["table_name"], row["table_type"], row["file_format"], row["table_owner"])
DatabricksRelationInfo(
row["table_name"],
row["table_type"],
row["file_format"],
row["table_owner"],
row["databricks_table_type"],
)
for row in results
]

def _get_hive_relations(
self, relation: DatabricksRelation
) -> list[tuple[Optional[str], Optional[str], Optional[str], Optional[str]]]:
def _get_hive_relations(self, relation: DatabricksRelation) -> list[DatabricksRelationInfo]:
kwargs = {"relation": relation}

new_rows: list[tuple[str, Optional[str]]]
Expand All @@ -351,8 +373,8 @@ def _get_hive_relations(
for row in tables:
# list_tables returns TABLE_TYPE as view for both materialized views and for
# streaming tables. Set type to "" in this case and it will be resolved below.
type = row["TABLE_TYPE"].lower() if row["TABLE_TYPE"] else None
row = (row["TABLE_NAME"], type)
rel_type = row["TABLE_TYPE"].lower() if row["TABLE_TYPE"] else None
row = (row["TABLE_NAME"], rel_type)
new_rows.append(row)

else:
Expand All @@ -369,7 +391,16 @@ def _get_hive_relations(
for row in new_rows
]

return [(row[0], row[1], None, None) for row in new_rows]
return [
DatabricksRelationInfo(
row[0],
row[1], # type: ignore[arg-type]
None,
None,
None,
)
for row in new_rows
]

@available.parse(lambda *a, **k: [])
def get_column_schema_from_query(self, sql: str) -> list[DatabricksColumn]:
Expand Down Expand Up @@ -463,6 +494,7 @@ def _get_updated_relation(
schema=relation.schema,
identifier=relation.identifier,
type=relation.type, # type: ignore
databricks_table_type=relation.databricks_table_type,
metadata=metadata,
is_delta=metadata.get(KEY_TABLE_PROVIDER) == "delta",
),
Expand Down
18 changes: 18 additions & 0 deletions dbt/adapters/databricks/relation.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,20 @@ class DatabricksRelationType(StrEnum):
MaterializedView = "materialized_view"
Foreign = "foreign"
StreamingTable = "streaming_table"
MetricView = "metric_view"
External = "external"
ManagedShallowClone = "managed_shallow_clone"
ExternalShallowClone = "external_shallow_clone"
Unknown = "unknown"


class DatabricksTableType(StrEnum):
    """Physical table subtype as reported by Databricks metadata (lowercased
    TABLE_TYPE values), orthogonal to the dbt relation type: a relation whose
    dbt type is "table" may additionally be external, managed, or a shallow
    clone of either.
    """

    External = "external"
    Managed = "managed"
    ManagedShallowClone = "managed_shallow_clone"
    ExternalShallowClone = "external_shallow_clone"


@dataclass(frozen=True, eq=False, repr=False)
class DatabricksInformationSchema(InformationSchema):
quote_policy: Policy = field(default_factory=lambda: DatabricksQuotePolicy())
Expand All @@ -63,6 +71,8 @@ class DatabricksRelation(BaseRelation):
is_delta: Optional[bool] = None
metadata: Optional[dict[str, Any]] = None

databricks_table_type: Optional[DatabricksTableType] = None

@classmethod
def __pre_deserialize__(cls, data: dict[Any, Any]) -> dict[Any, Any]:
data = super().__pre_deserialize__(data)
Expand All @@ -85,6 +95,10 @@ def is_materialized_view(self) -> bool:
@property
def is_streaming_table(self) -> bool:
    # True when this relation resolved to the streaming-table relation type.
    return self.type == DatabricksRelationType.StreamingTable

@property
def is_external_table(self) -> bool:
    # True only for tables whose Databricks table subtype is EXTERNAL.
    # False when databricks_table_type is None (e.g. hive-metastore
    # relations, where the subtype is never populated).
    return self.databricks_table_type == DatabricksTableType.External

@property
def is_hudi(self) -> bool:
Expand Down Expand Up @@ -130,6 +144,10 @@ def matches(
@classproperty
def get_relation_type(cls) -> Type[DatabricksRelationType]:  # noqa
    # Expose the relation-type enum through the class so adapter/jinja code
    # can resolve types without importing the enum directly.
    return DatabricksRelationType

@classproperty
def get_databricks_table_type(cls) -> Type[DatabricksTableType]:  # noqa
    # Mirror of get_relation_type for the Databricks table subtype enum,
    # used when materializing relations from metadata query results.
    return DatabricksTableType

def information_schema(self, view_name: Optional[str] = None) -> InformationSchema:
# some of our data comes from jinja, where things can be `Undefined`.
Expand Down
14 changes: 12 additions & 2 deletions dbt/include/databricks/macros/adapters/metadata.sql
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,17 @@
table_name,
if(table_type in ('EXTERNAL', 'MANAGED', 'MANAGED_SHALLOW_CLONE', 'EXTERNAL_SHALLOW_CLONE'), 'table', lower(table_type)) as table_type,
lower(data_source_format) as file_format,
table_owner
table_owner,
if(
table_type IN (
'EXTERNAL',
'MANAGED',
'MANAGED_SHALLOW_CLONE',
'EXTERNAL_SHALLOW_CLONE'
),
lower(table_type),
NULL
) AS databricks_table_type
from `system`.`information_schema`.`tables`
where table_catalog = '{{ relation.database|lower }}'
and table_schema = '{{ relation.schema|lower }}'
Expand All @@ -100,4 +110,4 @@
{% endcall %}

{% do return(load_result('get_uc_tables').table) %}
{% endmacro %}
{% endmacro %}
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,6 @@
{{ return(True) }}
{% endmacro %}

{% macro databricks__create_or_replace_clone(this_relation, defer_relation) %}
create or replace
table {{ this_relation }}
shallow clone {{ defer_relation }}
{% endmacro %}

{%- materialization clone, adapter='databricks' -%}

Expand Down Expand Up @@ -43,9 +38,15 @@
{% endif %}

-- as a general rule, data platforms that can clone tables can also do atomic 'create or replace'
{% call statement('main') %}
{{ create_or_replace_clone(target_relation, defer_relation) }}
{% endcall %}
{% if other_existing_relation.is_external_table %}
{% call statement('main') %}
{{ create_or_replace_clone_external(target_relation, defer_relation) }}
{% endcall %}
{% else %}
{% call statement('main') %}
{{ create_or_replace_clone(target_relation, defer_relation) }}
{% endcall %}
{% endif %}

{% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %}
{% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{#-- Atomically shallow-clone `defer_relation` into `this_relation`,
     replacing any existing table. Default (non-external) variant of the
     dbt `create_or_replace_clone` dispatch. --#}
{% macro databricks__create_or_replace_clone(this_relation, defer_relation) %}
create or replace
table {{ this_relation.render() }}
shallow clone {{ defer_relation.render() }}
{% endmacro %}

{#-- Variant of create_or_replace_clone used when the source relation is an
     external table: appends the model's location clause, presumably because
     the clone of an external table needs an explicit storage location —
     NOTE(review): confirm against Databricks SHALLOW CLONE docs. --#}
{% macro create_or_replace_clone_external(this_relation, defer_relation) %}
{%- set catalog_relation = adapter.build_catalog_relation(config.model) -%}

create or replace
table {{ this_relation.render() }}
shallow clone {{ defer_relation.render() }}
{{ location_clause(catalog_relation) }}

{% endmacro %}
Loading