Skip to content

Commit 843f82b

Browse files
authored
feat(presto-on-hive): allow v1 fieldpaths in the presto-on-hive source (#8474)
1 parent a4a8182 commit 843f82b

File tree

8 files changed

+1754
-9
lines changed

8 files changed

+1754
-9
lines changed

metadata-ingestion/src/datahub/ingestion/source/csv_enricher.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -438,7 +438,7 @@ def process_sub_resource_row(
438438
field_match = False
439439
for field_info in current_editable_schema_metadata.editableSchemaFieldInfo:
440440
if (
441-
DatasetUrn._get_simple_field_path_from_v2_field_path(
441+
DatasetUrn.get_simple_field_path_from_v2_field_path(
442442
field_info.fieldPath
443443
)
444444
== field_path

metadata-ingestion/src/datahub/ingestion/source/sql/presto_on_hive.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,11 @@ class PrestoOnHiveConfig(BasicSQLAlchemyConfig):
134134
description="By default, the connector overwrites properties every time. Set this to True to enable merging of properties with what exists on the server.",
135135
)
136136

137+
simplify_nested_field_paths: bool = Field(
138+
default=False,
139+
description="Simplify v2 field paths to v1 by default. If the schema has Union or Array types, still falls back to v2",
140+
)
141+
137142
def get_sql_alchemy_url(
138143
self, uri_opts: Optional[Dict[str, Any]] = None, database: Optional[str] = None
139144
) -> str:
@@ -527,6 +532,7 @@ def loop_tables(
527532
None,
528533
None,
529534
schema_fields,
535+
self.config.simplify_nested_field_paths,
530536
)
531537
dataset_snapshot.aspects.append(schema_metadata)
532538

@@ -756,6 +762,7 @@ def loop_views(
756762
self.platform,
757763
dataset.columns,
758764
canonical_schema=schema_fields,
765+
simplify_nested_field_paths=self.config.simplify_nested_field_paths,
759766
)
760767
dataset_snapshot.aspects.append(schema_metadata)
761768

metadata-ingestion/src/datahub/ingestion/source/sql/sql_common.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,11 +41,13 @@
4141
from datahub.ingestion.source.sql.sql_config import SQLAlchemyConfig
4242
from datahub.ingestion.source.sql.sql_utils import (
4343
add_table_to_schema_container,
44+
downgrade_schema_from_v2,
4445
gen_database_container,
4546
gen_database_key,
4647
gen_schema_container,
4748
gen_schema_key,
4849
get_domain_wu,
50+
schema_requires_v2,
4951
)
5052
from datahub.ingestion.source.state.stale_entity_removal_handler import (
5153
StaleEntityRemovalHandler,
@@ -287,7 +289,15 @@ def get_schema_metadata(
287289
pk_constraints: Optional[dict] = None,
288290
foreign_keys: Optional[List[ForeignKeyConstraint]] = None,
289291
canonical_schema: Optional[List[SchemaField]] = None,
292+
simplify_nested_field_paths: bool = False,
290293
) -> SchemaMetadata:
294+
if (
295+
simplify_nested_field_paths
296+
and canonical_schema is not None
297+
and not schema_requires_v2(canonical_schema)
298+
):
299+
canonical_schema = downgrade_schema_from_v2(canonical_schema)
300+
291301
schema_metadata = SchemaMetadata(
292302
schemaName=dataset_name,
293303
platform=make_data_platform_urn(platform),

metadata-ingestion/src/datahub/ingestion/source/sql/sql_utils.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,16 @@
1717
)
1818
from datahub.ingestion.api.workunit import MetadataWorkUnit
1919
from datahub.metadata.com.linkedin.pegasus2avro.dataset import UpstreamLineage
20+
from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField
2021
from datahub.metadata.schema_classes import DataPlatformInstanceClass
2122
from datahub.specific.dataset import DatasetPatchBuilder
2223
from datahub.utilities.registries.domain_registry import DomainRegistry
24+
from datahub.utilities.urns.dataset_urn import DatasetUrn
25+
26+
ARRAY_TOKEN = "[type=array]"
27+
UNION_TOKEN = "[type=union]"
28+
KEY_SCHEMA_PREFIX = "[key=True]."
29+
VERSION_PREFIX = "[version=2.0]."
2330

2431

2532
def gen_schema_key(
@@ -223,3 +230,27 @@ def gen_lineage(
223230

224231
for wu in lineage_workunits:
225232
yield wu
233+
234+
235+
def downgrade_schema_field_from_v2(field: SchemaField) -> SchemaField:
    """Downgrade a single schema field's path to the simple form.

    The current ``field.fieldPath`` is passed through
    ``DatasetUrn.get_simple_field_path_from_v2_field_path`` and the result is
    written back onto the field.

    Note that ``field`` is modified in place: the object returned is the very
    same object that was passed in, not a copy.
    """
    simple_path = DatasetUrn.get_simple_field_path_from_v2_field_path(
        field.fieldPath
    )
    field.fieldPath = simple_path
    return field
241+
242+
243+
def downgrade_schema_from_v2(
    canonical_schema: List[SchemaField],
) -> List[SchemaField]:
    """Downgrade every field of a schema, preserving order.

    Each element of ``canonical_schema`` is handed to
    ``downgrade_schema_field_from_v2`` and the results are collected into a
    new list, in the same order as the input.
    """
    downgraded: List[SchemaField] = []
    for schema_field in canonical_schema:
        downgraded.append(downgrade_schema_field_from_v2(schema_field))
    return downgraded
248+
249+
250+
def schema_requires_v2(canonical_schema: List[SchemaField]) -> bool:
    """Report whether any field path in the schema needs the v2 format.

    v2 is only required when UNION or ARRAY types are present; all other
    types can be represented with v1 paths. A field counts as requiring v2
    when its ``fieldPath`` contains ``ARRAY_TOKEN`` or ``UNION_TOKEN``.

    Returns ``False`` for an empty schema.
    """
    field_paths = (schema_field.fieldPath for schema_field in canonical_schema)
    # any() stops at the first matching path, like an early return would.
    return any(
        ARRAY_TOKEN in path or UNION_TOKEN in path for path in field_paths
    )

metadata-ingestion/src/datahub/utilities/sqlglot_lineage.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -328,15 +328,15 @@ def _convert_schema_aspect_to_info(
328328
cls, schema_metadata: SchemaMetadataClass
329329
) -> SchemaInfo:
330330
return {
331-
DatasetUrn._get_simple_field_path_from_v2_field_path(col.fieldPath): (
331+
DatasetUrn.get_simple_field_path_from_v2_field_path(col.fieldPath): (
332332
# The actual types are more of a "nice to have".
333333
col.nativeDataType
334334
or "str"
335335
)
336336
for col in schema_metadata.fields
337337
# TODO: We can't generate lineage to columns nested within structs yet.
338338
if "."
339-
not in DatasetUrn._get_simple_field_path_from_v2_field_path(col.fieldPath)
339+
not in DatasetUrn.get_simple_field_path_from_v2_field_path(col.fieldPath)
340340
}
341341

342342
# TODO add a method to load all from graphql

metadata-ingestion/src/datahub/utilities/urns/dataset_urn.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -97,7 +97,7 @@ def _validate_entity_id(entity_id: List[str]) -> None:
9797
"""A helper function to extract simple . path notation from the v2 field path"""
9898

9999
@staticmethod
100-
def _get_simple_field_path_from_v2_field_path(field_path: str) -> str:
100+
def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
101101
if field_path.startswith("[version=2.0]"):
102102
# this is a v2 field path
103103
tokens = [

0 commit comments

Comments
 (0)