diff --git a/dbt/adapters/databricks/impl.py b/dbt/adapters/databricks/impl.py index 21285bbb3..f25376a4b 100644 --- a/dbt/adapters/databricks/impl.py +++ b/dbt/adapters/databricks/impl.py @@ -6,9 +6,10 @@ from concurrent.futures import Future from contextlib import contextmanager from dataclasses import dataclass + from importlib import metadata from multiprocessing.context import SpawnContext -from typing import TYPE_CHECKING, Any, ClassVar, Generic, Optional, Union, cast +from typing import TYPE_CHECKING, Any, ClassVar, Generic, NamedTuple, Optional, Union, cast from uuid import uuid4 from dbt_common.behavior_flags import BehaviorFlag @@ -20,6 +21,7 @@ from dbt.adapters.base import AdapterConfig, PythonJobHelper from dbt.adapters.base.impl import catch_as_completed, log_code_execution + from dbt.adapters.base.meta import available from dbt.adapters.base.relation import BaseRelation from dbt.adapters.capability import Capability, CapabilityDict, CapabilitySupport, Support @@ -101,6 +103,14 @@ ) # type: ignore[typeddict-item] +class DatabricksRelationInfo(NamedTuple): + table_name: str + table_type: str + file_format: Optional[str] + table_owner: Optional[str] + databricks_table_type: Optional[str] + + @dataclass class DatabricksConfig(AdapterConfig): file_format: str = "delta" @@ -294,23 +304,31 @@ def execute( def list_relations_without_caching( # type: ignore[override] self, schema_relation: DatabricksRelation ) -> list[DatabricksRelation]: - empty: list[tuple[Optional[str], Optional[str], Optional[str], Optional[str]]] = [] + empty: list[DatabricksRelationInfo] = [] + results = handle_missing_objects( lambda: self.get_relations_without_caching(schema_relation), empty ) relations = [] for row in results: - name, kind, file_format, owner = row + name, kind, file_format, owner, table_type = row metadata = None if file_format: metadata = {KEY_TABLE_OWNER: owner, KEY_TABLE_PROVIDER: file_format} + + if table_type: + databricks_table_type = DatabricksRelation.get_databricks_table_type(table_type) + else: + databricks_table_type = None + relations.append( self.Relation.create( database=schema_relation.database, schema=schema_relation.schema, identifier=name, type=self.Relation.get_relation_type(kind), + databricks_table_type=databricks_table_type, metadata=metadata, is_delta=file_format == "delta", ) @@ -320,24 +338,28 @@ def list_relations_without_caching( # type: ignore[override] def get_relations_without_caching( self, relation: DatabricksRelation - ) -> list[tuple[Optional[str], Optional[str], Optional[str], Optional[str]]]: + ) -> list[DatabricksRelationInfo]: + if relation.is_hive_metastore(): return self._get_hive_relations(relation) return self._get_uc_relations(relation) - def _get_uc_relations( - self, relation: DatabricksRelation - ) -> list[tuple[Optional[str], Optional[str], Optional[str], Optional[str]]]: + + def _get_uc_relations(self, relation: DatabricksRelation) -> list[DatabricksRelationInfo]: kwargs = {"relation": relation} results = self.execute_macro("get_uc_tables", kwargs=kwargs) return [ - (row["table_name"], row["table_type"], row["file_format"], row["table_owner"]) + DatabricksRelationInfo( + row["table_name"], + row["table_type"], + row["file_format"], + row["table_owner"], + row["databricks_table_type"], + ) for row in results ] - def _get_hive_relations( - self, relation: DatabricksRelation - ) -> list[tuple[Optional[str], Optional[str], Optional[str], Optional[str]]]: + def _get_hive_relations(self, relation: DatabricksRelation) -> list[DatabricksRelationInfo]: kwargs = {"relation": relation} new_rows: list[tuple[str, Optional[str]]] @@ -351,8 +373,8 @@ def _get_hive_relations( for row in tables: # list_tables returns TABLE_TYPE as view for both materialized views and for # streaming tables. Set type to "" in this case and it will be resolved below. - type = row["TABLE_TYPE"].lower() if row["TABLE_TYPE"] else None - row = (row["TABLE_NAME"], type) + rel_type = row["TABLE_TYPE"].lower() if row["TABLE_TYPE"] else None + row = (row["TABLE_NAME"], rel_type) new_rows.append(row) else: @@ -369,7 +391,16 @@ def _get_hive_relations( for row in new_rows ] - return [(row[0], row[1], None, None) for row in new_rows] + return [ + DatabricksRelationInfo( + row[0], + row[1], # type: ignore[arg-type] + None, + None, + None, + ) + for row in new_rows + ] @available.parse(lambda *a, **k: []) def get_column_schema_from_query(self, sql: str) -> list[DatabricksColumn]: @@ -463,6 +494,7 @@ def _get_updated_relation( schema=relation.schema, identifier=relation.identifier, type=relation.type, # type: ignore + databricks_table_type=relation.databricks_table_type, metadata=metadata, is_delta=metadata.get(KEY_TABLE_PROVIDER) == "delta", ), diff --git a/dbt/adapters/databricks/relation.py b/dbt/adapters/databricks/relation.py index 8518b7aee..b28487f71 100644 --- a/dbt/adapters/databricks/relation.py +++ b/dbt/adapters/databricks/relation.py @@ -38,12 +38,20 @@ class DatabricksRelationType(StrEnum): MaterializedView = "materialized_view" Foreign = "foreign" StreamingTable = "streaming_table" + MetricView = "metric_view" External = "external" ManagedShallowClone = "managed_shallow_clone" ExternalShallowClone = "external_shallow_clone" Unknown = "unknown" +class DatabricksTableType(StrEnum): + External = "external" + Managed = "managed" + ManagedShallowClone = "managed_shallow_clone" + ExternalShallowClone = "external_shallow_clone" + + @dataclass(frozen=True, eq=False, repr=False) class DatabricksInformationSchema(InformationSchema): quote_policy: Policy = field(default_factory=lambda: DatabricksQuotePolicy()) @@ -63,6 +71,8 @@ class DatabricksRelation(BaseRelation): is_delta: Optional[bool] = None metadata: Optional[dict[str, Any]] = None + databricks_table_type: Optional[DatabricksTableType] = None + @classmethod def __pre_deserialize__(cls, data: dict[Any, Any]) -> dict[Any, Any]: data = super().__pre_deserialize__(data) @@ -85,6 +95,10 @@ def is_materialized_view(self) -> bool: @property def is_streaming_table(self) -> bool: return self.type == DatabricksRelationType.StreamingTable + + @property + def is_external_table(self) -> bool: + return self.databricks_table_type == DatabricksTableType.External @property def is_hudi(self) -> bool: @@ -130,6 +144,10 @@ def matches( @classproperty def get_relation_type(cls) -> Type[DatabricksRelationType]: # noqa return DatabricksRelationType + + @classproperty + def get_databricks_table_type(cls) -> Type[DatabricksTableType]: # noqa + return DatabricksTableType def information_schema(self, view_name: Optional[str] = None) -> InformationSchema: # some of our data comes from jinja, where things can be `Undefined`. diff --git a/dbt/include/databricks/macros/adapters/metadata.sql b/dbt/include/databricks/macros/adapters/metadata.sql index a63cbb753..4d4d5c8af 100644 --- a/dbt/include/databricks/macros/adapters/metadata.sql +++ b/dbt/include/databricks/macros/adapters/metadata.sql @@ -90,7 +90,17 @@ table_name, if(table_type in ('EXTERNAL', 'MANAGED', 'MANAGED_SHALLOW_CLONE', 'EXTERNAL_SHALLOW_CLONE'), 'table', lower(table_type)) as table_type, lower(data_source_format) as file_format, - table_owner + table_owner, + if( + table_type IN ( + 'EXTERNAL', + 'MANAGED', + 'MANAGED_SHALLOW_CLONE', + 'EXTERNAL_SHALLOW_CLONE' + ), + lower(table_type), + NULL + ) AS databricks_table_type from `system`.`information_schema`.`tables` where table_catalog = '{{ relation.database|lower }}' and table_schema = '{{ relation.schema|lower }}' @@ -100,4 +110,4 @@ {% endcall %} {% do return(load_result('get_uc_tables').table) %} -{% endmacro %} \ No newline at end of file +{% endmacro %} diff --git a/dbt/include/databricks/macros/materializations/clone.sql b/dbt/include/databricks/macros/materializations/clone/clone.sql similarity index 86% rename from dbt/include/databricks/macros/materializations/clone.sql rename to dbt/include/databricks/macros/materializations/clone/clone.sql index 74cb11fdf..c9a283e12 100644 --- a/dbt/include/databricks/macros/materializations/clone.sql +++ b/dbt/include/databricks/macros/materializations/clone/clone.sql @@ -2,11 +2,6 @@ {{ return(True) }} {% endmacro %} -{% macro databricks__create_or_replace_clone(this_relation, defer_relation) %} - create or replace - table {{ this_relation }} - shallow clone {{ defer_relation }} -{% endmacro %} {%- materialization clone, adapter='databricks' -%} @@ -43,9 +38,15 @@ {% endif %} -- as a general rule, data platforms that can clone tables can also do atomic 'create or replace' - {% call statement('main') %} - {{ create_or_replace_clone(target_relation, defer_relation) }} - {% endcall %} + {% if other_existing_relation.is_external_table %} + {% call statement('main') %} + {{ create_or_replace_clone_external(target_relation, defer_relation) }} + {% endcall %} + {% else %} + {% call statement('main') %} + {{ create_or_replace_clone(target_relation, defer_relation) }} + {% endcall %} + {% endif %} {% set should_revoke = should_revoke(existing_relation, full_refresh_mode=True) %} {% do apply_grants(target_relation, grant_config, should_revoke=should_revoke) %} diff --git a/dbt/include/databricks/macros/materializations/clone/strategies.sql b/dbt/include/databricks/macros/materializations/clone/strategies.sql new file mode 100644 index 000000000..5beb7c854 --- /dev/null +++ b/dbt/include/databricks/macros/materializations/clone/strategies.sql @@ -0,0 +1,15 @@ +{% macro databricks__create_or_replace_clone(this_relation, defer_relation) %} + create or replace + table {{ this_relation.render() }} + shallow clone {{ defer_relation.render() }} +{% endmacro %} + +{% macro create_or_replace_clone_external(this_relation, defer_relation) %} + {%- set catalog_relation = adapter.build_catalog_relation(config.model) -%} + + create or replace + table {{ this_relation.render() }} + shallow clone {{ defer_relation.render() }} + {{ location_clause(catalog_relation) }} + +{% endmacro %} \ No newline at end of file diff --git a/tests/unit/macros/adapters/test_metadata_macros.py b/tests/unit/macros/adapters/test_metadata_macros.py new file mode 100644 index 000000000..1b8741fd0 --- /dev/null +++ b/tests/unit/macros/adapters/test_metadata_macros.py @@ -0,0 +1,188 @@ +from unittest.mock import Mock + +import pytest + +from tests.unit.macros.base import MacroTestBase + + +class TestMetadataMacros(MacroTestBase): + @pytest.fixture(scope="class") + def template_name(self) -> str: + return "adapters/metadata.sql" + + @pytest.fixture(scope="class") + def macro_folders_to_load(self) -> list: + return ["macros", "macros/adapters"] + + @pytest.fixture + def mock_schema_relation(self): + relation = Mock() + relation.database = "test_db" + relation.schema = "test_schema" + relation.render = Mock(return_value="`test_db`.`test_schema`") + return relation + + @pytest.fixture + def mock_information_schema(self): + info_schema = Mock() + info_schema.database = "test_db" + info_schema.is_hive_metastore = Mock(return_value=False) + return info_schema + + @pytest.fixture + def mock_relations_list(self): + relation1 = Mock() + relation1.schema = "test_schema1" + relation1.identifier = "test_table1" + + relation2 = Mock() + relation2.schema = "test_schema2" + relation2.identifier = "test_table2" + + return [relation1, relation2] + + def test_show_table_extended_sql(self, template_bundle, relation): + result = self.run_macro(template_bundle.template, "show_table_extended_sql", relation) + + expected_sql = "SHOW TABLE EXTENDED IN `some_database`.`some_schema` LIKE 'some_table'" + self.assert_sql_equal(result, expected_sql) + + def test_show_tables_sql(self, template_bundle, mock_schema_relation): + result = self.run_macro(template_bundle.template, "show_tables_sql", mock_schema_relation) + + expected_sql = "SHOW TABLES IN `test_db`.`test_schema`" + self.assert_sql_equal(result, expected_sql) + + def test_show_views_sql(self, template_bundle, mock_schema_relation): + result = self.run_macro(template_bundle.template, "show_views_sql", mock_schema_relation) + + expected_sql = "SHOW VIEWS IN `test_db`.`test_schema`" + self.assert_sql_equal(result, expected_sql) + + def test_get_relation_last_modified_sql_unity_catalog( + self, context, template_bundle, mock_information_schema, mock_relations_list + ): + context["current_timestamp"] = Mock(return_value="current_timestamp()") + mock_information_schema.is_hive_metastore.return_value = False + + result = self.run_macro( + template_bundle.template, + "get_relation_last_modified_sql", + mock_information_schema, + mock_relations_list, + ) + + expected_sql = """ + SELECT + table_schema AS schema, + table_name AS identifier, + last_altered AS last_modified, + current_timestamp() AS snapshotted_at + FROM `system`.`information_schema`.`tables` + WHERE table_catalog = 'test_db' + AND ( + (table_schema = 'test_schema1' AND table_name = 'test_table1') OR + (table_schema = 'test_schema2' AND table_name = 'test_table2') + ) + """ + self.assert_sql_equal(result, expected_sql) + + def test_get_relation_last_modified_sql_hive_metastore( + self, context, template_bundle, mock_information_schema, mock_relations_list + ): + context["current_timestamp"] = Mock(return_value="current_timestamp()") + mock_information_schema.is_hive_metastore.return_value = True + + result = self.run_macro( + template_bundle.template, + "get_relation_last_modified_sql", + mock_information_schema, + mock_relations_list, + ) + + expected_sql = """ + SELECT + 'test_schema1' AS schema, + 'test_table1' AS identifier, + max(timestamp) AS last_modified, + current_timestamp() AS snapshotted_at + FROM (DESCRIBE HISTORY test_schema1.test_table1) + UNION ALL + SELECT + 'test_schema2' AS schema, + 'test_table2' AS identifier, + max(timestamp) AS last_modified, + current_timestamp() AS snapshotted_at + FROM (DESCRIBE HISTORY test_schema2.test_table2) + """ + self.assert_sql_equal(result, expected_sql) + + def test_get_view_description_sql(self, template_bundle, relation): + result = self.run_macro(template_bundle.template, "get_view_description_sql", relation) + + expected_sql = """ + SELECT * + FROM `system`.`information_schema`.`views` + WHERE table_catalog = 'some_database' + AND table_schema = 'some_schema' + AND table_name = 'some_table' + """ + self.assert_sql_equal(result, expected_sql) + + def test_get_uc_tables_sql_with_identifier(self, template_bundle, relation): + result = self.run_macro(template_bundle.template, "get_uc_tables_sql", relation) + + expected_sql = """ + SELECT + table_name, + if(table_type IN ('EXTERNAL', 'MANAGED', 'MANAGED_SHALLOW_CLONE', 'EXTERNAL_SHALLOW_CLONE'), 'table', lower(table_type)) AS table_type, + lower(data_source_format) AS file_format, + table_owner, + if(table_type IN ('EXTERNAL', 'MANAGED', 'MANAGED_SHALLOW_CLONE', 'EXTERNAL_SHALLOW_CLONE'), lower(table_type), null) AS databricks_table_type + FROM `system`.`information_schema`.`tables` + WHERE table_catalog = 'some_database' + AND table_schema = 'some_schema' + AND table_name = 'some_table' + """ # noqa + self.assert_sql_equal(result, expected_sql) + + def test_get_uc_tables_sql_without_identifier(self, template_bundle, mock_schema_relation): + mock_schema_relation.identifier = None + + result = self.run_macro(template_bundle.template, "get_uc_tables_sql", mock_schema_relation) + + expected_sql = """ + SELECT + table_name, + if(table_type IN ('EXTERNAL', 'MANAGED', 'MANAGED_SHALLOW_CLONE', 'EXTERNAL_SHALLOW_CLONE'), 'table', lower(table_type)) AS table_type, + lower(data_source_format) AS file_format, + table_owner, + if(table_type IN ('EXTERNAL', 'MANAGED', 'MANAGED_SHALLOW_CLONE', 'EXTERNAL_SHALLOW_CLONE'), lower(table_type), null) AS databricks_table_type + FROM `system`.`information_schema`.`tables` + WHERE table_catalog = 'test_db' + AND table_schema = 'test_schema' + """ # noqa + self.assert_sql_equal(result, expected_sql) + + def test_case_sensitivity(self, template_bundle): + relation = Mock() + relation.database = "TEST_DB" + relation.schema = "TEST_SCHEMA" + relation.identifier = "TEST_TABLE" + relation.render = Mock(return_value="`TEST_DB`.`TEST_SCHEMA`.`TEST_TABLE`") + + result = self.run_macro(template_bundle.template, "get_uc_tables_sql", relation) + + expected_sql = """ + SELECT + table_name, + if(table_type IN ('EXTERNAL', 'MANAGED', 'MANAGED_SHALLOW_CLONE', 'EXTERNAL_SHALLOW_CLONE'), 'table', lower(table_type)) AS table_type, + lower(data_source_format) AS file_format, + table_owner, + if(table_type IN ('EXTERNAL', 'MANAGED', 'MANAGED_SHALLOW_CLONE', 'EXTERNAL_SHALLOW_CLONE'), lower(table_type), null) AS databricks_table_type + FROM `system`.`information_schema`.`tables` + WHERE table_catalog = 'test_db' + AND table_schema = 'test_schema' + AND table_name = 'test_table' + """ # noqa + self.assert_sql_equal(result, expected_sql) \ No newline at end of file diff --git a/tests/unit/macros/materializations/test_clone_macros.py b/tests/unit/macros/materializations/test_clone_macros.py new file mode 100644 index 000000000..47af5dcef --- /dev/null +++ b/tests/unit/macros/materializations/test_clone_macros.py @@ -0,0 +1,89 @@ +from unittest.mock import Mock + +import pytest + +from dbt.adapters.databricks import constants +from tests.unit.macros.base import MacroTestBase +from tests.unit.utils import unity_relation + + +class TestCloneStrategies(MacroTestBase): + t_location_root = "/mnt/root_dev/" + + def render_clone_macro(self, template_bundle, macro, s_relation, t_relation) -> str: + external_path = f"{self.t_location_root}{template_bundle.relation.identifier}" + adapter_mock = template_bundle.template.globals["adapter"] + adapter_mock.compute_external_path.return_value = external_path + return self.run_macro( + template_bundle.template, + macro, + t_relation, + s_relation, + ) + + @pytest.fixture(scope="class") + def template_name(self) -> str: + return "strategies.sql" + + @pytest.fixture(scope="class") + def macro_folders_to_load(self) -> list: + return ["macros/materializations/clone", "macros/relations", "macros"] + + @pytest.fixture(scope="class") + def databricks_template_names(self) -> list: + return ["location.sql"] + + @pytest.fixture + def s_relation(self): + relation = Mock() + relation.database = "some_database" + relation.schema = "some_schema_prod" + relation.identifier = "some_table" + relation.render = Mock(return_value="`some_database`.`some_schema_prod`.`some_table`") + relation.without_identifier = Mock(return_value="`some_database`.`some_schema_prod`") + relation.type = "table" + return relation + + @pytest.fixture + def catalog_relation(self, template_bundle): + t_relation = unity_relation( + file_format=constants.DELTA_FILE_FORMAT, + location_root=self.t_location_root, + location_path=template_bundle.relation.identifier, + ) + template_bundle.context["adapter"].build_catalog_relation.return_value = t_relation + return t_relation + + def test_create_or_replace_clone(self, template_bundle, s_relation): + sql = self.render_clone_macro( + template_bundle, + # TODO: will dispatch work here? + "databricks__create_or_replace_clone", + s_relation, + template_bundle.relation, + ) + + expected = self.clean_sql( + "create or replace " + f"table {template_bundle.relation.render()} " + f"shallow clone {s_relation.render()}" + ) + + assert expected == sql + + def test_create_or_replace_clone_external(self, template_bundle, catalog_relation, s_relation): + sql = self.render_clone_macro( + template_bundle, + "create_or_replace_clone_external", + s_relation, + template_bundle.relation, + ) + + expected = self.clean_sql( + "create or replace " + f"table {template_bundle.relation.render()} " + f"shallow clone {s_relation.render()} " + f"location '{catalog_relation.location}'" + ) + + assert expected == sql \ No newline at end of file diff --git a/tests/unit/test_adapter.py b/tests/unit/test_adapter.py index f8c3b4a17..90aed1a4d 100644 --- a/tests/unit/test_adapter.py +++ b/tests/unit/test_adapter.py @@ -11,10 +11,18 @@ from dbt.adapters.databricks.column import DatabricksColumn from dbt.adapters.databricks.credentials import ( CATALOG_KEY_IN_SESSION_PROPERTIES, + DBT_DATABRICKS_HTTP_SESSION_HEADERS, + DBT_DATABRICKS_INVOCATION_ENV, ) from dbt.adapters.databricks.impl import get_identifier_list_string -from dbt.adapters.databricks.relation import DatabricksRelation, DatabricksRelationType +from dbt.adapters.databricks.relation import ( + DatabricksRelation, + DatabricksRelationType, + DatabricksTableType, +) + from dbt.adapters.databricks.utils import check_not_found_error + from dbt.config import RuntimeConfig from tests.unit.utils import config_from_parts_or_dicts @@ -352,7 +360,9 @@ def test_list_relations_without_caching__no_relations(self, _): @patch("dbt.adapters.databricks.api_client.DatabricksApiClient.create") def test_list_relations_without_caching__some_relations(self, _): with patch.object(DatabricksAdapter, "get_relations_without_caching") as mocked: - mocked.return_value = [("name", "table", "hudi", "owner")] + mocked.return_value = [ + DatabricksRelationInfo("name", "table", "hudi", "owner", "external") + ] adapter = DatabricksAdapter(Mock(flags={}), get_context("spawn")) relations = adapter.list_relations("database", "schema") assert len(relations) == 1 @@ -361,13 +371,15 @@ def test_list_relations_without_caching__some_relations(self, _): assert relation.database == "database" assert relation.schema == "schema" assert relation.type == DatabricksRelationType.Table + assert relation.databricks_table_type == DatabricksTableType.External assert relation.owner == "owner" + assert relation.is_external_table assert relation.is_hudi @patch("dbt.adapters.databricks.api_client.DatabricksApiClient.create") def test_list_relations_without_caching__hive_relation(self, _): with patch.object(DatabricksAdapter, "get_relations_without_caching") as mocked: - mocked.return_value = [("name", "table", None, None)] + mocked.return_value = [DatabricksRelationInfo("name", "table", None, None, None)] adapter = DatabricksAdapter(Mock(flags={}), get_context("spawn")) relations = adapter.list_relations("database", "schema") assert len(relations) == 1 diff --git a/tests/unit/test_relation.py b/tests/unit/test_relation.py index 5bde2fa5e..c772cd076 100644 --- a/tests/unit/test_relation.py +++ b/tests/unit/test_relation.py @@ -178,6 +178,12 @@ class TestRelationsFunctions: def test_is_hive_metastore(self, database, expected): assert relation.is_hive_metastore(database) is expected + def test_is_external_table(self): + relation = DatabricksRelation.create( + identifier="external_table", databricks_table_type="external" + ) + assert relation.is_external_table is True + @pytest.mark.parametrize( "input, expected", [ diff --git a/tests/unit/utils.py b/tests/unit/utils.py index 6fdf86cdf..e66e03a13 100644 --- a/tests/unit/utils.py +++ b/tests/unit/utils.py @@ -1,5 +1,8 @@ import os from copy import deepcopy +from typing import Optional + +from dbt.adapters.databricks import catalogs, constants from dbt.config import Profile, Project, RuntimeConfig from dbt.config.project import PartialProject @@ -78,3 +81,21 @@ def config_from_parts_or_dicts(project, profile, packages=None, selectors=None, args.vars = cli_vars args.profile_dir = "/dev/null" return RuntimeConfig.from_parts(project=project, profile=profile, args=args) + +def unity_relation( + table_format: Optional[str] = None, + file_format: Optional[str] = None, + location_root: Optional[str] = None, + location_path: Optional[str] = None, +) -> catalogs.DatabricksCatalogRelation: + + catalog_integration = constants.DEFAULT_UNITY_CATALOG + + return catalogs.DatabricksCatalogRelation( + catalog_type=catalog_integration.catalog_type, + catalog_name=catalog_integration.catalog_name, + table_format=table_format or catalog_integration.table_format, + file_format=file_format or catalog_integration.file_format, + external_volume=location_root or catalog_integration.external_volume, + location_path=location_path, + ) \ No newline at end of file