Skip to content

Commit f9d5dfb

Browse files
Chief-Rishabravisuhag
authored andcommitted
feat: add upstream lineage dependencies for view/materialized view in bigquery extractor
1 parent 6b2ad35 commit f9d5dfb

File tree

5 files changed

+601
-2
lines changed

5 files changed

+601
-2
lines changed

plugins/extractors/bigquery/README.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ source:
1616
- dataset_c.table_a
1717
max_page_size: 100
1818
profile_column: true
19+
build_view_lineage: true
1920
# Only one of service_account_base64 / service_account_json is needed.
2021
# If both are present, service_account_base64 takes precedence
2122
service_account_base64: _________BASE64_ENCODED_SERVICE_ACCOUNT_________________

plugins/extractors/bigquery/bigquery.go

Lines changed: 41 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
v1beta2 "github.com/raystack/meteor/models/raystack/assets/v1beta2"
2121
"github.com/raystack/meteor/plugins"
2222
"github.com/raystack/meteor/plugins/extractors/bigquery/auditlog"
23+
"github.com/raystack/meteor/plugins/extractors/bigquery/upstream"
2324
"github.com/raystack/meteor/registry"
2425
"github.com/raystack/meteor/utils"
2526
"github.com/raystack/salt/log"
@@ -53,6 +54,7 @@ type Config struct {
5354
IsCollectTableUsage bool `json:"collect_table_usage" yaml:"collect_table_usage" mapstructure:"collect_table_usage" default:"false"`
5455
UsagePeriodInDay int64 `json:"usage_period_in_day" yaml:"usage_period_in_day" mapstructure:"usage_period_in_day" default:"7"`
5556
UsageProjectIDs []string `json:"usage_project_ids" yaml:"usage_project_ids" mapstructure:"usage_project_ids"`
57+
BuildViewLineage bool `json:"build_view_lineage" yaml:"build_view_lineage" mapstructure:"build_view_lineage" default:"false"`
5658
}
5759

5860
type Exclude struct {
@@ -83,6 +85,7 @@ exclude:
8385
- dataset_c.table_a
8486
max_page_size: 100
8587
include_column_profile: true
88+
build_view_lineage: true
8689
# Only one of service_account_base64 / service_account_json is needed.
8790
# If both are present, service_account_base64 takes precedence
8891
service_account_base64: ____base64_encoded_service_account____
@@ -432,15 +435,25 @@ func (e *Extractor) buildAsset(ctx context.Context, t *bigquery.Table, md *bigqu
432435
e.logger.Warn("error creating Any struct", "error", err)
433436
}
434437

435-
return &v1beta2.Asset{
438+
asset := &v1beta2.Asset{
436439
Urn: tableURN,
437440
Name: t.TableID,
438441
Type: "table",
439442
Description: md.Description,
440443
Service: "bigquery",
441444
Data: table,
442445
Labels: md.Labels,
443-
}, nil
446+
}
447+
448+
if e.config.BuildViewLineage && (md.Type == bigquery.ViewTable || md.Type == bigquery.MaterializedView) {
449+
query := getViewQuery(md)
450+
upstreamResources := getUpstreamResources(query)
451+
asset.Lineage = &v1beta2.Lineage{
452+
Upstreams: upstreamResources,
453+
}
454+
}
455+
456+
return asset, nil
444457
}
445458

446459
// Extract table schema
@@ -744,6 +757,32 @@ func (e *Extractor) fetchTableMetadata(ctx context.Context, tbl *bigquery.Table)
744757
return tbl.Metadata(ctx)
745758
}
746759

760+
func getViewQuery(md *bigquery.TableMetadata) string {
761+
switch md.Type {
762+
case bigquery.ViewTable:
763+
return md.ViewQuery
764+
case bigquery.MaterializedView:
765+
return md.MaterializedView.Query
766+
}
767+
return ""
768+
}
769+
770+
func getUpstreamResources(query string) []*v1beta2.Resource {
771+
upstreamDependencies := upstream.ParseTopLevelUpstreamsFromQuery(query)
772+
uniqueUpstreamDependencies := upstream.UniqueFilterResources(upstreamDependencies)
773+
var upstreams []*v1beta2.Resource
774+
for _, dependency := range uniqueUpstreamDependencies {
775+
urn := plugins.BigQueryURN(dependency.Project, dependency.Dataset, dependency.Name)
776+
upstreams = append(upstreams, &v1beta2.Resource{
777+
Urn: urn,
778+
Name: dependency.Name,
779+
Type: "table",
780+
Service: "bigquery",
781+
})
782+
}
783+
return upstreams
784+
}
785+
747786
// Register the extractor to catalog
748787
func init() {
749788
if err := registry.Extractors.Register("bigquery", func() plugins.Extractor {
Lines changed: 106 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,106 @@
1+
package upstream
2+
3+
import (
4+
"regexp"
5+
"strings"
6+
)
7+
8+
type QueryParser func(query string) []Resource
9+
10+
var (
11+
topLevelUpstreamsPattern = regexp.MustCompile("" +
12+
"(?i)(?:FROM)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-\\*?]+)`?" +
13+
"|" +
14+
"(?i)(?:JOIN)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
15+
"|" +
16+
"(?i)(?:WITH)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?\\s+(?:AS)" +
17+
"|" +
18+
"(?i)(?:VIEW)\\s*(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`?([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`?" +
19+
"|" +
20+
"(?i)(?:/\\*\\s*([a-zA-Z0-9@_-]*)\\s*\\*/)?\\s+`([\\w-]+)\\.([\\w-]+)\\.([\\w-]+)`\\s*(?:AS)?")
21+
22+
singleLineCommentsPattern = regexp.MustCompile(`(--.*)`)
23+
multiLineCommentsPattern = regexp.MustCompile(`(((/\*)+?[\w\W]*?(\*/)+))`)
24+
specialCommentPattern = regexp.MustCompile(`(/\*\s*(@[a-zA-Z0-9_-]+)\s*\*/)`)
25+
)
26+
27+
func ParseTopLevelUpstreamsFromQuery(query string) []Resource {
28+
cleanedQuery := cleanQueryFromComment(query)
29+
30+
resourcesFound := make(map[Resource]bool)
31+
pseudoResources := make(map[Resource]bool)
32+
33+
matches := topLevelUpstreamsPattern.FindAllStringSubmatch(cleanedQuery, -1)
34+
35+
for _, match := range matches {
36+
var projectIdx, datasetIdx, nameIdx, ignoreUpstreamIdx int
37+
tokens := strings.Fields(match[0])
38+
clause := strings.ToLower(tokens[0])
39+
40+
switch clause {
41+
case "from":
42+
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 1, 2, 3, 4
43+
case "join":
44+
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 5, 6, 7, 8
45+
case "with":
46+
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 9, 10, 11, 12
47+
case "view":
48+
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 13, 14, 15, 16
49+
default:
50+
ignoreUpstreamIdx, projectIdx, datasetIdx, nameIdx = 17, 18, 19, 20
51+
}
52+
53+
project := match[projectIdx]
54+
dataset := match[datasetIdx]
55+
name := match[nameIdx]
56+
57+
if project == "" || dataset == "" || name == "" {
58+
continue
59+
}
60+
61+
if strings.TrimSpace(match[ignoreUpstreamIdx]) == "@ignoreupstream" {
62+
continue
63+
}
64+
65+
if clause == "view" {
66+
continue
67+
}
68+
69+
resource := Resource{
70+
Project: project,
71+
Dataset: dataset,
72+
Name: name,
73+
}
74+
75+
if clause == "with" {
76+
pseudoResources[resource] = true
77+
} else {
78+
resourcesFound[resource] = true
79+
}
80+
}
81+
82+
var output []Resource
83+
84+
for resource := range resourcesFound {
85+
if pseudoResources[resource] {
86+
continue
87+
}
88+
output = append(output, resource)
89+
}
90+
91+
return output
92+
}
93+
94+
func cleanQueryFromComment(query string) string {
95+
cleanedQuery := singleLineCommentsPattern.ReplaceAllString(query, "")
96+
97+
matches := multiLineCommentsPattern.FindAllString(query, -1)
98+
for _, match := range matches {
99+
if specialCommentPattern.MatchString(match) {
100+
continue
101+
}
102+
cleanedQuery = strings.ReplaceAll(cleanedQuery, match, "")
103+
}
104+
105+
return cleanedQuery
106+
}

0 commit comments

Comments
 (0)