Skip to content

Commit 547e1f4

Browse files
feat(ingestion/tableau): support column level lineage for custom sql (#8466)
Co-authored-by: MohdSiddiqueBagwan <[email protected]>
1 parent 6607434 commit 547e1f4

12 files changed

+44478
-95
lines changed

metadata-ingestion/src/datahub/ingestion/source/tableau.py

Lines changed: 220 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -4,14 +4,24 @@
44
from dataclasses import dataclass
55
from datetime import datetime
66
from functools import lru_cache
7-
from typing import Any, Dict, Iterable, List, Optional, Set, Tuple, Union, cast
7+
from typing import (
8+
Any,
9+
Callable,
10+
Dict,
11+
Iterable,
12+
List,
13+
Optional,
14+
Set,
15+
Tuple,
16+
Union,
17+
cast,
18+
)
819

920
import dateutil.parser as dp
1021
import tableauserverclient as TSC
1122
from pydantic import root_validator, validator
1223
from pydantic.fields import Field
1324
from requests.adapters import ConnectionError
14-
from sqllineage.runner import LineageRunner
1525
from tableauserverclient import (
1626
PersonalAccessTokenAuth,
1727
Server,
@@ -71,8 +81,11 @@
7181
dashboard_graphql_query,
7282
database_tables_graphql_query,
7383
embedded_datasource_graphql_query,
84+
get_overridden_info,
7485
get_unique_custom_sql,
86+
make_fine_grained_lineage_class,
7587
make_table_urn,
88+
make_upstream_class,
7689
published_datasource_graphql_query,
7790
query_metadata,
7891
sheet_graphql_query,
@@ -120,10 +133,15 @@
120133
OwnershipClass,
121134
OwnershipTypeClass,
122135
SubTypesClass,
123-
UpstreamClass,
124136
ViewPropertiesClass,
125137
)
126138
from datahub.utilities import config_clean
139+
from datahub.utilities.sqlglot_lineage import (
140+
ColumnLineageInfo,
141+
SchemaResolver,
142+
SqlParsingResult,
143+
sqlglot_lineage,
144+
)
127145

128146
logger: logging.Logger = logging.getLogger(__name__)
129147

@@ -870,14 +888,14 @@ def _create_upstream_table_lineage(
870888
f"A total of {len(upstream_tables)} upstream table edges found for datasource {datasource[tableau_constant.ID]}"
871889
)
872890

873-
if datasource.get(tableau_constant.FIELDS):
874-
datasource_urn = builder.make_dataset_urn_with_platform_instance(
875-
platform=self.platform,
876-
name=datasource[tableau_constant.ID],
877-
platform_instance=self.config.platform_instance,
878-
env=self.config.env,
879-
)
891+
datasource_urn = builder.make_dataset_urn_with_platform_instance(
892+
platform=self.platform,
893+
name=datasource[tableau_constant.ID],
894+
platform_instance=self.config.platform_instance,
895+
env=self.config.env,
896+
)
880897

898+
if datasource.get(tableau_constant.FIELDS):
881899
if self.config.extract_column_level_lineage:
882900
# Find fine grained lineage for datasource column to datasource column edge,
883901
# upstream columns may be from same datasource
@@ -1140,6 +1158,57 @@ def get_upstream_fields_of_field_in_datasource(self, datasource, datasource_urn)
11401158
)
11411159
return fine_grained_lineages
11421160

1161+
def get_upstream_fields_from_custom_sql(
1162+
self, datasource: dict, datasource_urn: str
1163+
) -> List[FineGrainedLineage]:
1164+
fine_grained_lineages: List[FineGrainedLineage] = []
1165+
1166+
parsed_result = self.parse_custom_sql(
1167+
datasource=datasource,
1168+
datasource_urn=datasource_urn,
1169+
env=self.config.env,
1170+
platform=self.platform,
1171+
platform_instance=self.config.platform_instance,
1172+
func_overridden_info=None, # Here we don't want to override any information from configuration
1173+
)
1174+
1175+
if parsed_result is None:
1176+
logger.info(
1177+
f"Failed to extract column level lineage from datasource {datasource_urn}"
1178+
)
1179+
return fine_grained_lineages
1180+
1181+
cll: List[ColumnLineageInfo] = (
1182+
parsed_result.column_lineage
1183+
if parsed_result.column_lineage is not None
1184+
else []
1185+
)
1186+
for cll_info in cll:
1187+
downstream = (
1188+
[
1189+
builder.make_schema_field_urn(
1190+
datasource_urn, cll_info.downstream.column
1191+
)
1192+
]
1193+
if cll_info.downstream is not None
1194+
and cll_info.downstream.column is not None
1195+
else []
1196+
)
1197+
upstreams = [
1198+
builder.make_schema_field_urn(column_ref.table, column_ref.column)
1199+
for column_ref in cll_info.upstreams
1200+
]
1201+
fine_grained_lineages.append(
1202+
FineGrainedLineage(
1203+
downstreamType=FineGrainedLineageDownstreamType.FIELD,
1204+
downstreams=downstream,
1205+
upstreamType=FineGrainedLineageUpstreamType.FIELD_SET,
1206+
upstreams=upstreams,
1207+
)
1208+
)
1209+
1210+
return fine_grained_lineages
1211+
11431212
def get_transform_operation(self, field):
11441213
field_type = field[tableau_constant.TYPE_NAME]
11451214
if field_type in (
@@ -1176,6 +1245,7 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
11761245
platform_instance=self.config.platform_instance,
11771246
env=self.config.env,
11781247
)
1248+
11791249
dataset_snapshot = DatasetSnapshot(
11801250
urn=csql_urn,
11811251
aspects=[self.get_data_platform_instance()],
@@ -1225,14 +1295,20 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
12251295
csql_urn, tables, datasource
12261296
)
12271297
elif self.config.extract_lineage_from_unsupported_custom_sql_queries:
1298+
logger.debug("Extracting TLL & CLL from custom sql")
12281299
# custom sql tables may contain unsupported sql, causing incomplete lineage
12291300
# we extract the lineage from the raw queries
12301301
yield from self._create_lineage_from_unsupported_csql(
12311302
csql_urn, csql
12321303
)
1233-
12341304
# Schema Metadata
1235-
columns = csql.get(tableau_constant.COLUMNS, [])
1305+
# if condition is needed as graphQL return "cloumns": None
1306+
columns: List[Dict[Any, Any]] = (
1307+
cast(List[Dict[Any, Any]], csql.get(tableau_constant.COLUMNS))
1308+
if tableau_constant.COLUMNS in csql
1309+
and csql.get(tableau_constant.COLUMNS) is not None
1310+
else []
1311+
)
12361312
schema_metadata = self.get_schema_metadata_for_custom_sql(columns)
12371313
if schema_metadata is not None:
12381314
dataset_snapshot.aspects.append(schema_metadata)
@@ -1450,53 +1526,143 @@ def _create_lineage_to_upstream_tables(
14501526
aspect=upstream_lineage,
14511527
)
14521528

1453-
def _create_lineage_from_unsupported_csql(
1454-
self, csql_urn: str, csql: dict
1455-
) -> Iterable[MetadataWorkUnit]:
1456-
database = csql.get(tableau_constant.DATABASE) or {}
1529+
def parse_custom_sql(
1530+
self,
1531+
datasource: dict,
1532+
datasource_urn: str,
1533+
platform: str,
1534+
env: str,
1535+
platform_instance: Optional[str],
1536+
func_overridden_info: Optional[
1537+
Callable[
1538+
[
1539+
str,
1540+
Optional[str],
1541+
Optional[Dict[str, str]],
1542+
Optional[TableauLineageOverrides],
1543+
],
1544+
Tuple[Optional[str], Optional[str], str, str],
1545+
]
1546+
],
1547+
) -> Optional["SqlParsingResult"]:
1548+
1549+
database_info = datasource.get(tableau_constant.DATABASE) or {}
1550+
1551+
if datasource.get(tableau_constant.IS_UNSUPPORTED_CUSTOM_SQL) in (None, False):
1552+
logger.debug(f"datasource {datasource_urn} is not created from custom sql")
1553+
return None
1554+
14571555
if (
1458-
csql.get(tableau_constant.IS_UNSUPPORTED_CUSTOM_SQL, False)
1459-
and tableau_constant.NAME in database
1460-
and tableau_constant.CONNECTION_TYPE in database
1556+
tableau_constant.NAME not in database_info
1557+
or tableau_constant.CONNECTION_TYPE not in database_info
14611558
):
1462-
upstream_tables = []
1463-
query = csql.get(tableau_constant.QUERY)
1464-
parser = LineageRunner(query)
1465-
1466-
try:
1467-
for table in parser.source_tables:
1468-
split_table = str(table).split(".")
1469-
if len(split_table) == 2:
1470-
datset = make_table_urn(
1471-
env=self.config.env,
1472-
upstream_db=database.get(tableau_constant.NAME),
1473-
connection_type=database.get(
1474-
tableau_constant.CONNECTION_TYPE, ""
1475-
),
1476-
schema=split_table[0],
1477-
full_name=split_table[1],
1478-
platform_instance_map=self.config.platform_instance_map,
1479-
lineage_overrides=self.config.lineage_overrides,
1480-
)
1481-
upstream_tables.append(
1482-
UpstreamClass(
1483-
type=DatasetLineageType.TRANSFORMED, dataset=datset
1484-
)
1485-
)
1486-
except Exception as e:
1487-
self.report.report_warning(
1488-
key="csql-lineage",
1489-
reason=f"Unable to retrieve lineage from query. "
1490-
f"Query: {query} "
1491-
f"Reason: {str(e)} ",
1559+
logger.debug(
1560+
f"database information is missing from datasource {datasource_urn}"
1561+
)
1562+
return None
1563+
1564+
query = datasource.get(tableau_constant.QUERY)
1565+
if query is None:
1566+
logger.debug(
1567+
f"raw sql query is not available for datasource {datasource_urn}"
1568+
)
1569+
return None
1570+
1571+
logger.debug(f"Parsing sql={query}")
1572+
1573+
upstream_db = database_info.get(tableau_constant.NAME)
1574+
1575+
if func_overridden_info is not None:
1576+
# Override the information as per configuration
1577+
upstream_db, platform_instance, platform, _ = func_overridden_info(
1578+
database_info[tableau_constant.CONNECTION_TYPE],
1579+
database_info.get(tableau_constant.NAME),
1580+
self.config.platform_instance_map,
1581+
self.config.lineage_overrides,
1582+
)
1583+
1584+
logger.debug(
1585+
f"Overridden info upstream_db={upstream_db}, platform_instance={platform_instance}, platform={platform}"
1586+
)
1587+
1588+
parsed_result: Optional["SqlParsingResult"] = None
1589+
try:
1590+
schema_resolver = (
1591+
self.ctx.graph._make_schema_resolver(
1592+
platform=platform,
1593+
platform_instance=platform_instance,
1594+
env=env,
1595+
)
1596+
if self.ctx.graph is not None
1597+
else SchemaResolver(
1598+
platform=platform,
1599+
platform_instance=platform_instance,
1600+
env=env,
1601+
graph=None,
14921602
)
1493-
upstream_lineage = UpstreamLineage(upstreams=upstream_tables)
1494-
yield self.get_metadata_change_proposal(
1495-
csql_urn,
1496-
aspect_name=tableau_constant.UPSTREAM_LINEAGE,
1497-
aspect=upstream_lineage,
14981603
)
14991604

1605+
if schema_resolver.graph is None:
1606+
logger.warning(
1607+
"Column Level Lineage extraction would not work as DataHub graph client is None."
1608+
)
1609+
1610+
parsed_result = sqlglot_lineage(
1611+
query,
1612+
schema_resolver=schema_resolver,
1613+
default_db=upstream_db,
1614+
)
1615+
except Exception as e:
1616+
self.report.report_warning(
1617+
key="csql-lineage",
1618+
reason=f"Unable to retrieve lineage from query. "
1619+
f"Query: {query} "
1620+
f"Reason: {str(e)} ",
1621+
)
1622+
1623+
return parsed_result
1624+
1625+
def _create_lineage_from_unsupported_csql(
1626+
self, csql_urn: str, csql: dict
1627+
) -> Iterable[MetadataWorkUnit]:
1628+
1629+
parsed_result = self.parse_custom_sql(
1630+
datasource=csql,
1631+
datasource_urn=csql_urn,
1632+
env=self.config.env,
1633+
platform=self.platform,
1634+
platform_instance=self.config.platform_instance,
1635+
func_overridden_info=get_overridden_info,
1636+
)
1637+
1638+
if parsed_result is None:
1639+
logger.info(
1640+
f"Failed to extract table level lineage for datasource {csql_urn}"
1641+
)
1642+
return
1643+
1644+
upstream_tables = make_upstream_class(parsed_result)
1645+
1646+
logger.debug(f"Upstream tables = {upstream_tables}")
1647+
1648+
fine_grained_lineages: List[FineGrainedLineage] = []
1649+
if self.config.extract_column_level_lineage:
1650+
logger.info("Extracting CLL from custom sql")
1651+
fine_grained_lineages = make_fine_grained_lineage_class(
1652+
parsed_result, csql_urn
1653+
)
1654+
1655+
upstream_lineage = UpstreamLineage(
1656+
upstreams=upstream_tables,
1657+
fineGrainedLineages=fine_grained_lineages,
1658+
)
1659+
1660+
yield self.get_metadata_change_proposal(
1661+
csql_urn,
1662+
aspect_name=tableau_constant.UPSTREAM_LINEAGE,
1663+
aspect=upstream_lineage,
1664+
)
1665+
15001666
def _get_schema_metadata_for_datasource(
15011667
self, datasource_fields: List[dict]
15021668
) -> Optional[SchemaMetadata]:

0 commit comments

Comments
 (0)