Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions elementary/clients/dbt/command_line_dbt_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import yaml

from elementary.clients.dbt.base_dbt_runner import BaseDbtRunner
from elementary.clients.dbt.databricks_patch import apply_databricks_patch
from elementary.clients.dbt.dbt_log import parse_dbt_output
from elementary.exceptions.exceptions import DbtCommandError, DbtLsCommandError
from elementary.monitor.dbt_project_utils import is_dbt_package_up_to_date
Expand All @@ -28,6 +29,8 @@ class DbtCommandResult:


class CommandLineDbtRunner(BaseDbtRunner):
_dbx_patch_applied = False

def __init__(
self,
project_dir: str,
Expand All @@ -51,6 +54,11 @@ def __init__(
)
self.raise_on_failure = raise_on_failure
self.env_vars = env_vars

# Apply the dbt-databricks 1.10.2 compatibility patch; once it succeeds it is not re-applied (a failed attempt is retried on the next instantiation)
if not CommandLineDbtRunner._dbx_patch_applied:
CommandLineDbtRunner._dbx_patch_applied = apply_databricks_patch()

if force_dbt_deps:
self.deps()
elif run_deps_if_needed:
Expand Down
124 changes: 124 additions & 0 deletions elementary/clients/dbt/databricks_patch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
"""Databricks compatibility patch for dbt-databricks 1.10.2."""
import logging
from typing import Any, Union

logger = logging.getLogger(__name__)


def is_unsupported_object(model: Any) -> bool:
    """Return True when ``model`` is an instance of a Macro-like class.

    The check looks at the names of the object's class and of every base
    class in its MRO, and reports True if any of them contains "Macro".
    Inspecting class names (rather than ``str(model.__class__)``) keeps
    module paths out of the comparison, so an ordinary object that merely
    lives in a module whose name contains "Macro" is not misclassified,
    while subclasses of a Macro-named class are still recognised.

    Args:
        model: Any object handed to one of the patched parse_model helpers.

    Returns:
        True if the object should be treated as unsupported, False otherwise.
    """
    # ``__mro__`` is read defensively: ``__class__`` may be overridden by
    # proxy objects and is not guaranteed to be a regular type.
    class_chain = getattr(model.__class__, "__mro__", ())
    return any("Macro" in getattr(cls, "__name__", "") for cls in class_chain)
Comment on lines +8 to +10
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🛠️ Refactor suggestion

Use more robust type checking instead of string-based detection.

The current approach of checking for "Macro" in the string representation of the class is fragile and could break with different Python versions or object representations.

def is_unsupported_object(model: Any) -> bool:
-    """Check if the object is a Macro or other unsupported type"""
-    return hasattr(model, "__class__") and "Macro" in str(model.__class__)
+    """Check if the object is a Macro or other unsupported type"""
+    try:
+        # Check for specific dbt Macro types
+        return (hasattr(model, "__class__") and 
+                (model.__class__.__name__ == "Macro" or 
+                 "Macro" in [cls.__name__ for cls in model.__class__.__mro__]))
+    except (AttributeError, TypeError):
+        return False
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def is_unsupported_object(model: Any) -> bool:
"""Check if the object is a Macro or other unsupported type"""
return hasattr(model, "__class__") and "Macro" in str(model.__class__)
def is_unsupported_object(model: Any) -> bool:
"""Check if the object is a Macro or other unsupported type"""
try:
# Check for specific dbt Macro types
return (hasattr(model, "__class__") and
(model.__class__.__name__ == "Macro" or
"Macro" in [cls.__name__ for cls in model.__class__.__mro__]))
except (AttributeError, TypeError):
return False
🤖 Prompt for AI Agents
In elementary/clients/dbt/databricks_patch.py around lines 8 to 10, replace the
fragile string-based check for "Macro" in the class name with a more robust type
checking method. Use isinstance or check the class directly against the known
Macro type or its base class to determine if the object is a Macro, ensuring
compatibility across Python versions and avoiding reliance on string
representations.



def safe_catalog_name(model: Any) -> str:
    """Resolve the catalog name for ``model``, defaulting to ``"unity"``.

    The ``catalog`` entry of ``model.config`` is returned when the config
    exists, is truthy, exposes ``get`` and holds a truthy value. In every
    other case, including unsupported objects and any exception raised
    while inspecting the model, ``"unity"`` is returned.
    """
    try:
        if is_unsupported_object(model):
            logger.debug(
                "Received unsupported object type for catalog_name, using unity as default"
            )
            return "unity"

        # Only RelationConfig-style objects with a dict-like config are read.
        config_is_usable = (
            hasattr(model, "config") and model.config and hasattr(model.config, "get")
        )
        if not config_is_usable:
            return "unity"

        # A missing or empty catalog falls back to the unity catalog.
        return model.config.get("catalog") or "unity"
    except Exception as e:
        logger.debug(
            f"Failed to parse catalog name from model: {e}. Using unity as default."
        )
        return "unity"


def safe_file_format(model: Any) -> Union[str, None]:
    """Return the ``file_format`` config value of ``model``, or None.

    Unsupported objects yield None; any exception is logged at debug level
    and also yields None. The lookup itself is delegated to ``safe_get``.
    """
    try:
        return None if is_unsupported_object(model) else safe_get(model, "file_format")
    except Exception as e:
        logger.debug(f"Failed to get file_format from model: {e}")
        return None


def safe_location_path(model: Any) -> Union[str, None]:
    """Build the location path for ``model``, or None if it cannot be derived.

    With a truthy ``include_full_name_in_path`` config entry the result is
    ``database/schema/identifier``; otherwise it is the model's
    ``identifier`` when that attribute exists. Unsupported objects, models
    without a truthy ``config`` and any exception all yield None.
    """
    try:
        if is_unsupported_object(model):
            return None
        if not (hasattr(model, "config") and model.config):
            return None

        if model.config.get("include_full_name_in_path"):
            return f"{model.database}/{model.schema}/{model.identifier}"

        if hasattr(model, "identifier"):
            return model.identifier
        return None
    except Exception as e:
        logger.debug(f"Failed to get location_path from model: {e}")
        return None


def safe_location_root(model: Any) -> Union[str, None]:
    """Return the ``location_root`` config value of ``model``, or None.

    Unsupported objects yield None; any exception is logged at debug level
    and also yields None. The lookup itself is delegated to ``safe_get``.
    """
    try:
        return None if is_unsupported_object(model) else safe_get(model, "location_root")
    except Exception as e:
        logger.debug(f"Failed to get location_root from model: {e}")
        return None


def safe_table_format(model: Any) -> Union[str, None]:
    """Return the ``table_format`` config value of ``model``, or None.

    Unsupported objects yield None; any exception is logged at debug level
    and also yields None. The lookup itself is delegated to ``safe_get``.
    """
    try:
        return None if is_unsupported_object(model) else safe_get(model, "table_format")
    except Exception as e:
        logger.debug(f"Failed to get table_format from model: {e}")
        return None


def safe_get(
    model: Any, setting: str, case_sensitive: Union[bool, None] = False
) -> Union[str, None]:
    """Read ``setting`` from ``model.config`` without ever raising.

    Args:
        model: Object expected to expose a ``config`` attribute that has a
            ``get`` method.
        setting: Config key to look up.
        case_sensitive: When falsy (the default), string values are
            lower-cased before being returned; when truthy the value is
            returned exactly as stored.

    Returns:
        The configured value, or None when the model is unsupported, has no
        usable config, the stored value is falsy, or the lookup raises.
    """
    try:
        if is_unsupported_object(model):
            return None

        config = getattr(model, "config", None)
        # The config must be present, truthy and dict-like.
        if not config or not hasattr(config, "get"):
            return None

        value = config.get(setting)
        if not value:
            return None

        # Only strings can be lower-cased. A non-string value (e.g. a bool)
        # has no ``lower`` method; return it unchanged so it is not lost to
        # the broad ``except`` below and reported as None.
        if case_sensitive or not isinstance(value, str):
            return value
        return value.lower()
    except Exception as e:
        logger.debug(f"Failed to get {setting} from model config: {e}")
        return None
Comment on lines +78 to +96
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue

Fix inconsistent parameter usage in safe_get function.

The case_sensitive parameter defaults to False but is used in a way that suggests it should default to True or None. When case_sensitive=False, the function calls .lower() on the value, but this could cause issues if the value is not a string.

def safe_get(
-    model: Any, setting: str, case_sensitive: Union[bool, None] = False
+    model: Any, setting: str, case_sensitive: bool = True
 ) -> Union[str, None]:
     try:
         if is_unsupported_object(model):
             return None
         # Check if model has config attribute
         if not hasattr(model, "config") or not model.config:
             return None
         # Check if config has get method
         if not hasattr(model.config, "get"):
             return None
         value = model.config.get(setting)
         if value:
-            return value if case_sensitive else value.lower()
+            if isinstance(value, str) and not case_sensitive:
+                return value.lower()
+            return str(value) if value is not None else None
         return None
     except Exception as e:
         logger.debug(f"Failed to get {setting} from model config: {e}")
         return None
📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
def safe_get(
model: Any, setting: str, case_sensitive: Union[bool, None] = False
) -> Union[str, None]:
try:
if is_unsupported_object(model):
return None
# Check if model has config attribute
if not hasattr(model, "config") or not model.config:
return None
# Check if config has get method
if not hasattr(model.config, "get"):
return None
value = model.config.get(setting)
if value:
return value if case_sensitive else value.lower()
return None
except Exception as e:
logger.debug(f"Failed to get {setting} from model config: {e}")
return None
def safe_get(
model: Any, setting: str, case_sensitive: bool = True
) -> Union[str, None]:
try:
if is_unsupported_object(model):
return None
# Check if model has config attribute
if not hasattr(model, "config") or not model.config:
return None
# Check if config has get method
if not hasattr(model.config, "get"):
return None
value = model.config.get(setting)
if value:
if isinstance(value, str) and not case_sensitive:
return value.lower()
return str(value) if value is not None else None
return None
except Exception as e:
logger.debug(f"Failed to get {setting} from model config: {e}")
return None
🤖 Prompt for AI Agents
In elementary/clients/dbt/databricks_patch.py between lines 78 and 96, the
safe_get function uses the case_sensitive parameter inconsistently by defaulting
it to False and then calling .lower() on the value, which may not be a string.
To fix this, change the default of case_sensitive to True or None to reflect
that values should be returned as-is by default, and add a type check before
calling .lower() to ensure the value is a string when case_sensitive is False.



def apply_databricks_patch() -> bool:
    """Monkey patch ``dbt.adapters.databricks.parse_model`` with safe helpers.

    Each patched attribute of the ``parse_model`` module is rebound to the
    corresponding ``safe_*`` function defined in this module.

    Returns:
        bool: True if every replacement was installed; False if the module
        could not be imported or any other error occurred.
    """
    try:
        from dbt.adapters.databricks import parse_model  # type: ignore

        # (attribute on parse_model, safe replacement), applied in order.
        replacements = (
            ("catalog_name", safe_catalog_name),
            ("file_format", safe_file_format),
            ("location_path", safe_location_path),
            ("location_root", safe_location_root),
            ("table_format", safe_table_format),
            ("_get", safe_get),
        )
        for attr_name, safe_impl in replacements:
            setattr(parse_model, attr_name, safe_impl)

        logger.debug("Applied dbt-databricks 1.10.2 compatibility patch")
        return True

    except ImportError:
        # parse_model module doesn't exist in older versions
        return False
    except Exception as e:
        logger.debug(f"Failed to apply dbt-databricks compatibility patch: {e}")
        return False