@@ -78,6 +78,7 @@
ConnectionWrapper,
FileBackedList,
)
from datahub.utilities.lossy_collections import LossyList
from datahub.utilities.perf_timer import PerfTimer

logger = logging.getLogger(__name__)
@@ -169,6 +170,10 @@ class SnowflakeQueriesExtractorReport(Report):
num_stream_queries_observed: int = 0
num_create_temp_view_queries_observed: int = 0
num_users: int = 0
num_queries_with_empty_column_name: int = 0
queries_with_empty_column_name: LossyList[str] = dataclasses.field(
default_factory=LossyList
)


@dataclass
@@ -626,9 +631,28 @@ def _parse_audit_log_row(

columns = set()
for modified_column in obj["columns"]:
columns.add(
self.identifiers.snowflake_identifier(modified_column["columnName"])
)
column_name = modified_column["columnName"]
Collaborator
We need to make it very visible that we decided to parse a query for which we would otherwise use info coming directly from the audit log. This is for 2 reasons:

  1. We want to understand why Snowflake would produce such a, from our perspective, malformed audit log. Ideally we should also be able to pinpoint the query involved.
  2. Parsing queries takes much longer than just copying information from the audit log, so this change has potential adverse effects on overall ingestion performance. We need to be aware of how many queries had to be parsed by us.

So to meet the above conditions we need to:

  1. Extend the report object for the Snowflake source so that we can keep a count of such queries. Saving the query_id of each query that had to be parsed would also be a good idea - use LossyList so we don't store too many (a rough sketch is below). Such a query_id could be used to retrieve the actual query from the warehouse.
  2. Print the information that this happened. I think at least info level should be used, maybe even warning. It is an open question whether we should go as far as self.report.warning - in that case the message would appear in the Managed Ingestion UI, which might be overkill. WDYT?
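
Roughly what I have in mind (just a sketch - nothing here is final, and whether we log at info or warning is still open):

```python
import dataclasses
import logging

from datahub.ingestion.api.report import Report  # as used by other source reports
from datahub.utilities.lossy_collections import LossyList

logger = logging.getLogger(__name__)


@dataclasses.dataclass
class SnowflakeQueriesExtractorReport(Report):
    # ... existing fields ...

    # How many audit log rows had an empty column name and were parsed instead.
    num_queries_with_empty_column_name: int = 0
    # Bounded sample of the affected query IDs; LossyList keeps only a limited
    # number of elements, so the report cannot grow without bound.
    queries_with_empty_column_name: LossyList[str] = dataclasses.field(
        default_factory=LossyList
    )


# At the point where the empty column name is detected, something like:
#   self.report.num_queries_with_empty_column_name += 1
#   self.report.queries_with_empty_column_name.append(query_id)
#   logger.info(f"Query {query_id} has an empty column name; parsing it instead.")
```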

Contributor Author

Thank you for your suggestion. It makes sense to me. I've extended the SnowflakeQueriesExtractorReport to track queries with empty column names. Specifically, I've added a counter (num_queries_with_empty_column_name) and a LossyList (queries_with_empty_column_name) to store the IDs of affected queries. When an empty column name is detected, the source now logs an informational message with the query ID and updates the new report fields before falling back to SQL parsing.
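
As a side note, a query_id recorded in queries_with_empty_column_name can be used to pull the original query text back from the warehouse, roughly like this (connection parameters and the query ID are placeholders; ACCOUNT_USAGE views can lag behind real time):

```python
import snowflake.connector

# Placeholder credentials - in practice reuse an existing Snowflake connection.
conn = snowflake.connector.connect(
    account="<account>", user="<user>", password="<password>"
)
try:
    cur = conn.cursor()
    # "<query_id>" stands in for an ID taken from queries_with_empty_column_name.
    cur.execute(
        "SELECT query_text FROM snowflake.account_usage.query_history "
        "WHERE query_id = %s",
        ("<query_id>",),
    )
    row = cur.fetchone()
    print(row[0] if row else "Query not (yet) visible in ACCOUNT_USAGE")
finally:
    conn.close()
```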

# An empty column name in the audit log would cause an error when creating column URNs.
# To avoid this and still extract lineage, the raw query text is parsed as a fallback.
if not column_name or not column_name.strip():
Collaborator
Thank you for also addressing the case of column names that are non-empty but contain only whitespace!

query_id = res["query_id"]
self.report.num_queries_with_empty_column_name += 1
self.report.queries_with_empty_column_name.append(query_id)
logger.info(f"Query {query_id} has empty column name in audit log.")

return ObservedQuery(
query=query_text,
session_id=res["session_id"],
timestamp=timestamp,
user=user,
default_db=res["default_db"],
default_schema=res["default_schema"],
query_hash=get_query_fingerprint(
query_text, self.identifiers.platform, fast=True
),
extra_info=extra_info,
)
columns.add(self.identifiers.snowflake_identifier(column_name))

upstreams.append(dataset)
column_usage[dataset] = columns
@@ -168,13 +168,28 @@ def get_subjects(
query_subject_urns.add(upstream)
if include_fields:
for column in sorted(self.column_usage.get(upstream, [])):
# Skip empty column names to avoid creating invalid URNs
Collaborator
I think we need to print a message here, either warning or info. Same as below.

Contributor Author
Warning logs added. Thank you for the suggestion.

if not column or not column.strip():
logger.warning(
f"Skipping empty upstream column name for query {self.query_id} on upstream {upstream}"
)
continue
query_subject_urns.add(
builder.make_schema_field_urn(upstream, column)
)
if downstream_urn:
query_subject_urns.add(downstream_urn)
if include_fields:
for column_lineage in self.column_lineage:
# Skip empty downstream columns to avoid creating invalid URNs
if (
not column_lineage.downstream.column
or not column_lineage.downstream.column.strip()
):
logger.warning(
f"Skipping empty downstream column name for query {self.query_id} on downstream {downstream_urn}"
)
continue
query_subject_urns.add(
builder.make_schema_field_urn(
downstream_urn, column_lineage.downstream.column