Skip to content

Commit 3cc3b70

Browse files
sid-acryl, hsheth2, anshbansal
authored and committed
fix(ingest/powerbi): m-query fixes (datahub-project#11906)
Co-authored-by: Harshal Sheth <[email protected]> Co-authored-by: Aseem Bansal <[email protected]>
1 parent d1bf08e commit 3cc3b70

File tree

3 files changed

+78
-46
lines changed

3 files changed

+78
-46
lines changed

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/native_sql_parser.py

Lines changed: 17 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -72,10 +72,24 @@ def get_tables(native_query: str) -> List[str]:
7272
def remove_drop_statement(query: str) -> str:
    """Strip ``DROP TABLE IF EXISTS ...`` statements from a native SQL query.

    Certain PowerBI M-Queries contain a combination of DROP and SELECT
    statements within SQL, causing SQLParser to fail on these queries.
    Therefore, these occurrences are removed before parsing.

    Args:
        query: Native SQL text as found in the M-Query.

    Returns:
        The query with every matched DROP statement removed. Note that all
        runs of whitespace in the result are collapsed to a single space and
        the result is stripped of leading/trailing whitespace.
    """
    patterns = [
        # Regular expression to match patterns like:
        #   "DROP TABLE IF EXISTS #<identifier>;"
        #   "DROP TABLE IF EXISTS #<identifier>, <identifier2>, ...;"
        #   "DROP TABLE IF EXISTS #<identifier>, <identifier2>, ...\n"
        #   "DROP TABLE IF EXISTS #<identifier>"   (last statement, unterminated)
        # The "$" alternative keeps a trailing DROP without ";" or a newline
        # from slipping through to the SQL parser.
        r"DROP\s+TABLE\s+IF\s+EXISTS\s+(?:#?\w+(?:,\s*#?\w+)*)(?:[;\n]|$)",
    ]

    new_query = query

    for pattern in patterns:
        new_query = re.sub(pattern, "", new_query, flags=re.IGNORECASE)

    # Remove extra spaces caused by consecutive replacements
    return re.sub(r"\s+", " ", new_query).strip()
7993

8094

8195
def parse_custom_sql(

metadata-ingestion/src/datahub/ingestion/source/powerbi/m_query/resolver.py

Lines changed: 51 additions & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,16 @@ def urn_creator(
8383
)
8484

8585

86+
def get_next_item(items: List[str], item: str) -> Optional[str]:
    """Return the element that directly follows the first ``item`` in ``items``.

    Args:
        items: Flat list of tokens to search.
        item: Token whose successor is wanted.

    Returns:
        The element right after the first occurrence of ``item``, or ``None``
        when ``item`` is absent from ``items`` or is its final element.
    """
    if item not in items:
        return None

    successor_pos = items.index(item) + 1
    if successor_pos >= len(items):
        # ``item`` is the last entry, so there is nothing after it to return.
        logger.debug(f'item:"{item}", not found in item-list: {items}')
        return None

    return items[successor_pos]
94+
95+
8696
class AbstractDataPlatformTableCreator(ABC):
8797
"""
8898
Base class to share common functionalities among different dataplatform for M-Query parsing.
@@ -675,7 +685,7 @@ def two_level_access_pattern(
675685
data_access_func_detail.arg_list
676686
)
677687
if server is None or db_name is None:
678-
return Lineage.empty() # Return empty list
688+
return Lineage.empty() # Return an empty list
679689

680690
schema_name: str = cast(
681691
IdentifierAccessor, data_access_func_detail.identifier_accessor
@@ -782,32 +792,38 @@ def create_lineage(
782792
),
783793
)
784794

785-
if len(arguments) == 2:
786-
# It is a regular case of MS-SQL
787-
logger.debug("Handling with regular case")
788-
return self.two_level_access_pattern(data_access_func_detail)
789-
790-
if len(arguments) >= 4 and arguments[2] != "Query":
791-
logger.debug("Unsupported case is found. Second index is not the Query")
792-
return Lineage.empty()
795+
server, database = self.get_db_detail_from_argument(
796+
data_access_func_detail.arg_list
797+
)
798+
if server is None or database is None:
799+
return Lineage.empty() # Return an empty list
800+
801+
assert server
802+
assert database # to silent the lint
803+
804+
query: Optional[str] = get_next_item(arguments, "Query")
805+
if query:
806+
if self.config.enable_advance_lineage_sql_construct is False:
807+
# Use previous parser to generate URN to keep backward compatibility
808+
return Lineage(
809+
upstreams=self.create_urn_using_old_parser(
810+
query=query,
811+
db_name=database,
812+
server=server,
813+
),
814+
column_lineage=[],
815+
)
793816

794-
if self.config.enable_advance_lineage_sql_construct is False:
795-
# Use previous parser to generate URN to keep backward compatibility
796-
return Lineage(
797-
upstreams=self.create_urn_using_old_parser(
798-
query=arguments[3],
799-
db_name=arguments[1],
800-
server=arguments[0],
801-
),
802-
column_lineage=[],
817+
return self.parse_custom_sql(
818+
query=query,
819+
database=database,
820+
server=server,
821+
schema=MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA,
803822
)
804823

805-
return self.parse_custom_sql(
806-
query=arguments[3],
807-
database=arguments[1],
808-
server=arguments[0],
809-
schema=MSSqlDataPlatformTableCreator.DEFAULT_SCHEMA,
810-
)
824+
# It is a regular case of MS-SQL
825+
logger.debug("Handling with regular case")
826+
return self.two_level_access_pattern(data_access_func_detail)
811827

812828

813829
class OracleDataPlatformTableCreator(AbstractDataPlatformTableCreator):
@@ -1154,27 +1170,19 @@ def get_db_name(self, data_access_tokens: List[str]) -> Optional[str]:
11541170
!= SupportedDataPlatform.DatabricksMultiCloud_SQL.value.powerbi_data_platform_name
11551171
):
11561172
return None
1157-
try:
1158-
if "Database" in data_access_tokens:
1159-
index = data_access_tokens.index("Database")
1160-
if data_access_tokens[index + 1] != Constant.M_QUERY_NULL:
1161-
# Database name is explicitly set in argument
1162-
return data_access_tokens[index + 1]
11631173

1164-
if "Name" in data_access_tokens:
1165-
index = data_access_tokens.index("Name")
1166-
# Next element is value of the Name. It is a database name
1167-
return data_access_tokens[index + 1]
1174+
database: Optional[str] = get_next_item(data_access_tokens, "Database")
11681175

1169-
if "Catalog" in data_access_tokens:
1170-
index = data_access_tokens.index("Catalog")
1171-
# Next element is value of the Catalog. In Databricks Catalog can also be used in place of a database.
1172-
return data_access_tokens[index + 1]
1173-
1174-
except IndexError as e:
1175-
logger.debug("Database name is not available", exc_info=e)
1176-
1177-
return None
1176+
if (
1177+
database and database != Constant.M_QUERY_NULL
1178+
): # database name is explicitly set
1179+
return database
1180+
1181+
return get_next_item( # database name is set in Name argument
1182+
data_access_tokens, "Name"
1183+
) or get_next_item( # If both above arguments are not available, then try Catalog
1184+
data_access_tokens, "Catalog"
1185+
)
11781186

11791187
def create_lineage(
11801188
self, data_access_func_detail: DataAccessFunctionDetail

metadata-ingestion/tests/integration/powerbi/test_native_sql_parser.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,3 +19,13 @@ def test_simple_from():
1919

2020
assert len(tables) == 1
2121
assert tables[0] == "OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4"
22+
23+
24+
def test_drop_statement():
    """DROP TABLE IF EXISTS statements are stripped, leaving only the SELECT."""
    # The SELECT that must survive untouched once the DROP statements are gone.
    select_sql: str = (
        "SELECT#(lf)concat((UPPER(REPLACE(SELLER,'-',''))), MONTHID) as AGENT_KEY,"
        "#(lf)concat((UPPER(REPLACE(CLIENT_DIRECTOR,'-',''))), MONTHID) as CD_AGENT_KEY,"
        "#(lf) *#(lf)FROM#(lf)OPERATIONS_ANALYTICS.TRANSFORMED_PROD.V_APS_SME_UNITS_V4"
    )

    # Covers: single temp table, multiple temp tables, a regular table, and a
    # mix of regular and temp tables.
    drop_prefix: str = (
        "DROP TABLE IF EXISTS #table1; "
        "DROP TABLE IF EXISTS #table1,#table2; "
        "DROP TABLE IF EXISTS table1; "
        "DROP TABLE IF EXISTS table1, #table2;"
    )

    cleaned: str = native_sql_parser.remove_drop_statement(drop_prefix + select_sql)

    assert cleaned == select_sql

0 commit comments

Comments
 (0)