Skip to content

Commit b3d80e5

Browse files
mayurinehate, treff7es, and asikowitz
authored
feat(ingest/bigquery): usage for views (#8046)
Co-authored-by: Tamas Nemeth <[email protected]> Co-authored-by: Andrew Sikowitz <[email protected]>
1 parent 1f67463 commit b3d80e5

File tree

8 files changed

+494
-43
lines changed

8 files changed

+494
-43
lines changed

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_audit.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -261,6 +261,8 @@ class QueryEvent:
261261
default_dataset: Optional[str] = None
262262
numAffectedRows: Optional[int] = None
263263

264+
query_on_view: bool = False
265+
264266
@staticmethod
265267
def get_missing_key_entry(entry: AuditLogEntry) -> Optional[str]:
266268
return get_first_missing_key(
@@ -344,6 +346,8 @@ def from_entry(
344346
BigQueryTableRef.from_spec_obj(spec).get_sanitized_table_ref()
345347
for spec in raw_ref_views
346348
]
349+
query_event.query_on_view = True
350+
347351
# payload
348352
query_event.payload = entry.payload if debug_include_full_payloads else None
349353
if not query_event.job_name:
@@ -420,6 +424,8 @@ def from_exported_bigquery_audit_metadata(
420424
BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
421425
for spec in raw_ref_views
422426
]
427+
query_event.query_on_view = True
428+
423429
# payload
424430
query_event.payload = payload if debug_include_full_payloads else None
425431

@@ -487,6 +493,8 @@ def from_entry_v2(
487493
BigQueryTableRef.from_string_name(spec).get_sanitized_table_ref()
488494
for spec in raw_ref_views
489495
]
496+
query_event.query_on_view = True
497+
490498
# payload
491499
query_event.payload = payload if debug_include_full_payloads else None
492500

@@ -519,6 +527,8 @@ class ReadEvent:
519527

520528
payload: Any
521529

530+
from_query: bool = False
531+
522532
# We really should use composition here since the query isn't actually
523533
# part of the read event, but this solution is just simpler.
524534
# query: Optional["QueryEvent"] = None # populated via join
@@ -582,6 +592,27 @@ def from_entry(
582592
)
583593
return readEvent
584594

595+
@classmethod
596+
def from_query_event(
597+
cls,
598+
read_resource: BigQueryTableRef,
599+
query_event: QueryEvent,
600+
debug_include_full_payloads: bool = False,
601+
) -> "ReadEvent":
602+
603+
readEvent = ReadEvent(
604+
actor_email=query_event.actor_email,
605+
timestamp=query_event.timestamp,
606+
resource=read_resource,
607+
fieldsRead=[],
608+
readReason="JOB",
609+
jobName=query_event.job_name,
610+
payload=query_event.payload if debug_include_full_payloads else None,
611+
from_query=True,
612+
)
613+
614+
return readEvent
615+
585616
@classmethod
586617
def from_exported_bigquery_audit_metadata(
587618
cls, row: BigQueryAuditMetadata, debug_include_full_payloads: bool = False

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_config.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,11 @@ class BigQueryUsageConfig(BaseUsageConfig):
2828
description="Correction to pad start_time and end_time with. For handling the case where the read happens within our time range but the query completion event is delayed and happens after the configured end time.",
2929
)
3030

31+
apply_view_usage_to_tables: bool = Field(
32+
default=False,
33+
description="Whether to apply view's usage to its base tables. If set to False, uses sql parser and applies usage to views / tables mentioned in the query. If set to True, usage is applied to base tables only.",
34+
)
35+
3136

3237
class BigQueryV2Config(
3338
BigQueryBaseConfig,

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,9 @@ class BigQueryV2Report(ProfilingSqlReport):
6868
total_query_log_entries: int = 0
6969
num_read_events: int = 0
7070
num_query_events: int = 0
71+
num_view_query_events: int = 0
72+
num_view_query_events_failed_sql_parsing: int = 0
73+
num_view_query_events_failed_table_identification: int = 0
7174
num_filtered_read_events: int = 0
7275
num_filtered_query_events: int = 0
7376
num_usage_query_hash_collisions: int = 0

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/usage.py

Lines changed: 119 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
AuditEvent,
3737
AuditLogEntry,
3838
BigQueryAuditMetadata,
39+
BigqueryTableIdentifier,
3940
BigQueryTableRef,
4041
QueryEvent,
4142
ReadEvent,
@@ -53,6 +54,7 @@
5354
make_usage_workunit,
5455
)
5556
from datahub.metadata.schema_classes import OperationClass, OperationTypeClass
57+
from datahub.utilities.bigquery_sql_parser import BigQuerySQLParser
5658
from datahub.utilities.file_backed_collections import ConnectionWrapper, FileBackedDict
5759
from datahub.utilities.perf_timer import PerfTimer
5860

@@ -142,7 +144,7 @@ def bigquery_audit_metadata_query_template(
142144
AND
143145
(
144146
(
145-
JSON_EXTRACT_SCALAR(protopayload_auditlog.methodName) IN
147+
protopayload_auditlog.methodName IN
146148
(
147149
"google.cloud.bigquery.v2.JobService.Query",
148150
"google.cloud.bigquery.v2.JobService.InsertJob"
@@ -184,6 +186,7 @@ def __init__(self, config: BigQueryV2Config):
184186
e.timestamp, config.bucket_duration
185187
),
186188
"user": lambda e: e.actor_email,
189+
"from_query": lambda e: int(e.from_query),
187190
},
188191
cache_max_size=config.file_backed_cache_size,
189192
# Evict entire cache to reduce db calls.
@@ -198,6 +201,7 @@ def __init__(self, config: BigQueryV2Config):
198201
extra_columns={
199202
"query": lambda e: e.query,
200203
"is_read": lambda e: int(e.statementType in READ_STATEMENT_TYPES),
204+
"on_view": lambda e: int(e.query_on_view),
201205
},
202206
cache_max_size=config.file_backed_cache_size,
203207
cache_eviction_batch_size=max(int(config.file_backed_cache_size * 0.9), 1),
@@ -328,6 +332,20 @@ def usage_statistics(self, top_n: int) -> Iterator[UsageStatistic]:
328332
column_freq=json.loads(row["column_freq"] or "[]"),
329333
)
330334

335+
def delete_original_read_events_for_view_query_events(self) -> None:
336+
self.read_events.sql_query(
337+
"""
338+
DELETE FROM
339+
read_events
340+
WHERE
341+
read_events.from_query = 0 AND
342+
read_events.name in (
343+
SELECT q.key FROM query_events q WHERE q.on_view = 1
344+
)
345+
""",
346+
refs=[self.query_events],
347+
)
348+
331349
def report_disk_usage(self, report: BigQueryV2Report) -> None:
332350
report.usage_state_size = str(
333351
{
@@ -342,7 +360,7 @@ def report_disk_usage(self, report: BigQueryV2Report) -> None:
342360
class BigQueryUsageExtractor:
343361
"""
344362
This plugin extracts the following:
345-
* Statistics on queries issued and tables and columns accessed (excludes views)
363+
* Statistics on queries issued and tables and columns accessed
346364
* Aggregation of these statistics into buckets, by day or hour granularity
347365
348366
:::note
@@ -389,6 +407,26 @@ def _run(
389407
logger.error("Error processing usage", exc_info=True)
390408
self.report.report_warning("usage-ingestion", str(e))
391409

410+
def generate_read_events_from_query(
411+
self, query_event_on_view: QueryEvent
412+
) -> Iterable[AuditEvent]:
413+
try:
414+
tables = self.get_tables_from_query(
415+
query_event_on_view.project_id,
416+
query_event_on_view.query,
417+
)
418+
assert tables is not None and len(tables) != 0
419+
for table in tables:
420+
yield AuditEvent.create(
421+
ReadEvent.from_query_event(table, query_event_on_view)
422+
)
423+
except Exception as ex:
424+
logger.debug(
425+
f"Generating read events failed for this query on view: {query_event_on_view.query}. "
426+
f"Usage won't be added. The error was {ex}."
427+
)
428+
self.report.num_view_query_events_failed_sql_parsing += 1
429+
392430
def _ingest_events(
393431
self,
394432
events: Iterable[AuditEvent],
@@ -397,8 +435,33 @@ def _ingest_events(
397435
) -> None:
398436
"""Read log and store events in usage_state."""
399437
num_aggregated = 0
438+
num_generated = 0
400439
for audit_event in events:
401440
try:
441+
# Note for View Usage:
442+
# If Query Event references a view, bigquery audit logs do not contain Read Event for view
443+
# in its audit logs, but only for it base tables. To extract usage for views, we parse the
444+
# sql query to find bigquery tables and views read in the query and generate Read Events
445+
# for them in our code (`from_query`=True). For such Query Events, we delete the original
446+
# Read Events coming from Bigquery audit logs and keep only generated ones.
447+
448+
# Caveats of SQL parsing approach used here:
449+
# 1. If query parsing fails, usage for such query is not considered/counted.
450+
# 2. Due to limitations of query parsing, field level usage is not available.
451+
# To limit the impact, we use query parsing only for those queries that reference at least
452+
# one view. For all other queries, field level usage is available through bigquery audit logs.
453+
if (
454+
audit_event.query_event
455+
and audit_event.query_event.query_on_view
456+
and not self.config.usage.apply_view_usage_to_tables
457+
):
458+
query_event = audit_event.query_event
459+
self.report.num_view_query_events += 1
460+
461+
for new_event in self.generate_read_events_from_query(query_event):
462+
num_generated += self._store_usage_event(
463+
new_event, usage_state, table_refs
464+
)
402465
num_aggregated += self._store_usage_event(
403466
audit_event, usage_state, table_refs
404467
)
@@ -409,6 +472,10 @@ def _ingest_events(
409472
self._report_error("store-event", e)
410473
logger.info(f"Total number of events aggregated = {num_aggregated}.")
411474

475+
if self.report.num_view_query_events > 0:
476+
logger.info(f"Total number of read events generated = {num_generated}.")
477+
usage_state.delete_original_read_events_for_view_query_events()
478+
412479
def _generate_operational_workunits(
413480
self, usage_state: BigQueryUsageState, table_refs: Collection[str]
414481
) -> Iterable[MetadataWorkUnit]:
@@ -903,6 +970,56 @@ def _get_parsed_bigquery_log_events(
903970
f"log-parse-{project_id}", e, group="usage-log-parse"
904971
)
905972

973+
def get_tables_from_query(
974+
self, default_project: str, query: str
975+
) -> Optional[List[BigQueryTableRef]]:
976+
"""
977+
This method attempts to parse bigquery objects read in the query
978+
"""
979+
if not query:
980+
return None
981+
982+
parsed_tables = set()
983+
try:
984+
parser = BigQuerySQLParser(
985+
query,
986+
self.config.sql_parser_use_external_process,
987+
use_raw_names=self.config.lineage_sql_parser_use_raw_names,
988+
)
989+
tables = parser.get_tables()
990+
except Exception as ex:
991+
logger.debug(
992+
f"Sql parsing failed on this query on view: {query}. "
993+
f"Usage won't be added. The error was {ex}."
994+
)
995+
return None
996+
997+
for table in tables:
998+
parts = table.split(".")
999+
if len(parts) == 2:
1000+
parsed_tables.add(
1001+
BigQueryTableRef(
1002+
BigqueryTableIdentifier(
1003+
project_id=default_project, dataset=parts[0], table=parts[1]
1004+
)
1005+
).get_sanitized_table_ref()
1006+
)
1007+
elif len(parts) == 3:
1008+
parsed_tables.add(
1009+
BigQueryTableRef(
1010+
BigqueryTableIdentifier(
1011+
project_id=parts[0], dataset=parts[1], table=parts[2]
1012+
)
1013+
).get_sanitized_table_ref()
1014+
)
1015+
else:
1016+
logger.debug(
1017+
f"Invalid table identifier {table} when parsing query on view {query}"
1018+
)
1019+
self.report.num_view_query_events_failed_table_identification += 1
1020+
1021+
return list(parsed_tables)
1022+
9061023
def _report_error(
9071024
self, label: str, e: Exception, group: Optional[str] = None
9081025
) -> None:

metadata-ingestion/src/datahub/ingestion/source_config/usage/snowflake_usage.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,5 +15,5 @@ class SnowflakeUsageConfig(BaseUsageConfig):
1515
)
1616
apply_view_usage_to_tables: bool = pydantic.Field(
1717
default=False,
18-
description="Allow/deny patterns for views in snowflake dataset names.",
18+
description="Whether to apply view's usage to its base tables. If set to True, usage is applied to base tables only.",
1919
)

metadata-ingestion/tests/performance/bigquery.py

Lines changed: 29 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
import random
33
import uuid
44
from collections import defaultdict
5-
from typing import Dict, Iterable, List
5+
from typing import Dict, Iterable, List, cast
66

77
from typing_extensions import get_args
88

@@ -15,7 +15,7 @@
1515
)
1616
from datahub.ingestion.source.bigquery_v2.bigquery_config import BigQueryV2Config
1717
from datahub.ingestion.source.bigquery_v2.usage import OPERATION_STATEMENT_TYPES
18-
from tests.performance.data_model import Query, StatementType, Table
18+
from tests.performance.data_model import Query, StatementType, Table, View
1919

2020
# https://cloud.google.com/bigquery/docs/reference/auditlogs/rest/Shared.Types/BigQueryAuditMetadata.TableDataRead.Reason
2121
READ_REASONS = [
@@ -55,6 +55,14 @@ def generate_events(
5555
else random.choice(projects)
5656
)
5757
job_name = str(uuid.uuid4())
58+
referencedViews = list(
59+
dict.fromkeys(
60+
ref_from_table(field.table, table_to_project)
61+
for field in query.fields_accessed
62+
if field.table.is_view()
63+
)
64+
)
65+
5866
yield AuditEvent.create(
5967
QueryEvent(
6068
job_name=job_name,
@@ -72,24 +80,34 @@ def generate_events(
7280
for field in query.fields_accessed
7381
if not field.table.is_view()
7482
)
75-
),
76-
referencedViews=list(
77-
dict.fromkeys(
78-
ref_from_table(field.table, table_to_project)
83+
)
84+
+ list(
85+
dict.fromkeys( # Preserve order
86+
ref_from_table(parent, table_to_project)
7987
for field in query.fields_accessed
8088
if field.table.is_view()
89+
for parent in cast(View, field.table).parents
8190
)
8291
),
92+
referencedViews=referencedViews,
8393
payload=dataclasses.asdict(query)
8494
if config.debug_include_full_payloads
8595
else None,
96+
query_on_view=True if referencedViews else False,
8697
)
8798
)
88-
table_accesses = defaultdict(list)
99+
table_accesses = defaultdict(set)
89100
for field in query.fields_accessed:
90-
table_accesses[ref_from_table(field.table, table_to_project)].append(
91-
field.column
92-
)
101+
if not field.table.is_view():
102+
table_accesses[ref_from_table(field.table, table_to_project)].add(
103+
field.column
104+
)
105+
else:
106+
# assuming that same fields are accessed in parent tables
107+
for parent in cast(View, field.table).parents:
108+
table_accesses[ref_from_table(parent, table_to_project)].add(
109+
field.column
110+
)
93111

94112
for ref, columns in table_accesses.items():
95113
yield AuditEvent.create(
@@ -98,7 +116,7 @@ def generate_events(
98116
timestamp=query.timestamp,
99117
actor_email=query.actor,
100118
resource=ref,
101-
fieldsRead=columns,
119+
fieldsRead=list(columns),
102120
readReason=random.choice(READ_REASONS),
103121
payload=dataclasses.asdict(query)
104122
if config.debug_include_full_payloads

metadata-ingestion/tests/performance/test_bigquery_usage.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,11 @@ def run_test():
3737
config = BigQueryV2Config(
3838
start_time=seed_metadata.start_time,
3939
end_time=seed_metadata.end_time,
40-
usage=BigQueryUsageConfig(include_top_n_queries=True, top_n_queries=10),
40+
usage=BigQueryUsageConfig(
41+
include_top_n_queries=True,
42+
top_n_queries=10,
43+
apply_view_usage_to_tables=True,
44+
),
4145
file_backed_cache_size=1000,
4246
)
4347
usage_extractor = BigQueryUsageExtractor(config, report)

0 commit comments

Comments (0)