
Commit 4e338af: init commit

Signed-off-by: Massy Bourennani <[email protected]>
Parent: 3e59360

32 files changed (+16,575 −11 lines)

core/dbt/cli/options.py (3 additions, 2 deletions)
```diff
@@ -33,8 +33,9 @@ def __init__(self, *args, **kwargs) -> None:
         else:
             assert isinstance(option_type, ChoiceTuple), msg
 
-    def add_to_parser(self, parser: OptionParser, ctx: Context):
-        def parser_process(value: str, state: ParsingState):
+    @t.no_type_check
+    def add_to_parser(self, parser: OptionParser, ctx: Context):  # type: ignore[valid-type]
+        def parser_process(value: str, state: ParsingState):  # type: ignore[valid-type]
             # method to hook to the parser.process
             done = False
             value_list = str.split(value, " ")
```
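For context, the `parser_process` hook splits one space-delimited option value into a list. A minimal, self-contained illustration of that `str.split` call (the model names are hypothetical):

```python
# Hypothetical values: this only illustrates the str.split(value, " ") call above.
value_list = str.split("model_a model_b model_c", " ")
assert value_list == ["model_a", "model_b", "model_c"]
```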

core/dbt/cli/requires.py (8 additions, 1 deletion)
```diff
@@ -31,6 +31,8 @@
 from dbt.exceptions import DbtProjectError, FailFastError
 from dbt.flags import get_flag_dict, get_flags, set_flags
 from dbt.mp_context import get_mp_context
+from dbt.openlineage.common.utils import is_runnable_dbt_command
+from dbt.openlineage.handler import OpenLineageHandler
 from dbt.parser.manifest import parse_manifest
 from dbt.plugins import set_up_plugin_manager
 from dbt.profiler import profiler
@@ -82,8 +84,13 @@ def wrapper(*args, **kwargs):
         # Reset invocation_id for each 'invocation' of a dbt command (can happen multiple times in a single process)
         reset_invocation_id()
 
-        # Logging
+        # OpenLineage
+        ol_handler = OpenLineageHandler(ctx)
         callbacks = ctx.obj.get("callbacks", [])
+        if is_runnable_dbt_command(flags):
+            callbacks.append(ol_handler.handle)
+
+        # Logging
         setup_event_logger(flags=flags, callbacks=callbacks)
 
         # Tracking
```
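The registration above implies a callback contract: `setup_event_logger` invokes each callback with every emitted event message. A hedged sketch of the handler shape this implies; `OpenLineageHandler`'s real definition lives in `core/dbt/openlineage/handler.py`, which this excerpt does not show, so everything below is an assumption:

```python
# Hedged sketch, not from this commit: the shape implied by
# callbacks.append(ol_handler.handle) and setup_event_logger(..., callbacks=callbacks).
from dbt_common.events.base_types import EventMsg


class OpenLineageHandler:
    def __init__(self, ctx) -> None:
        # ctx is the click Context passed in requires.py above
        self.ctx = ctx

    def handle(self, msg: EventMsg) -> None:
        # Called once per emitted dbt event; translate the relevant ones
        # (e.g. node start/finish) into OpenLineage run events here.
        ...
```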

core/dbt/events/__init__.py (10 additions, 0 deletions)
```diff
@@ -1,8 +1,11 @@
 from typing import Any, Dict, Set
 
+import dbt.adapters.events.adapter_types_pb2 as adapter_types_pb2
 import dbt.adapters.events.types as adapter_dbt_event_types
+import dbt.events.core_types_pb2 as core_dbt_event_types_pb2
 import dbt.events.types as core_dbt_event_types
 import dbt_common.events.types as dbt_event_types
+import dbt_common.events.types_pb2 as dbt_event_types_pb2
 
 ALL_EVENT_TYPES: Dict[str, Any] = {
     **dbt_event_types.__dict__,
@@ -13,3 +16,10 @@
 ALL_EVENT_NAMES: Set[str] = set(
     [name for name, cls in ALL_EVENT_TYPES.items() if isinstance(cls, type)]
 )
+
+
+ALL_PROTO_TYPES: Dict[str, Any] = {
+    **dbt_event_types_pb2.__dict__,
+    **core_dbt_event_types_pb2.__dict__,
+    **adapter_types_pb2.__dict__,
+}
```
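Since `ALL_PROTO_TYPES` merges the module dictionaries of the three generated `*_pb2` modules, a protobuf message class can be looked up by name just like an event class. A hypothetical lookup sketch (assuming, per dbt convention, that proto messages share the event's name):

```python
# Hypothetical lookup sketch; event and proto classes are keyed by name.
from dbt.events import ALL_EVENT_TYPES, ALL_PROTO_TYPES

event_cls = ALL_EVENT_TYPES.get("MainReportVersion")
proto_cls = ALL_PROTO_TYPES.get("MainReportVersion")
```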

core/dbt/events/types.py (11 additions, 0 deletions)
```diff
@@ -2416,3 +2416,14 @@ def code(self) -> str:
 
     def message(self) -> str:
         return f"Artifacts skipped for command : {self.msg}"
+
+
+class OpenLineageException(WarnLevel):
+    def code(self) -> str:
+        return "Z064"
+
+    def message(self):
+        return (
+            f"Encountered an error while creating OpenLineageEvent: {self.exc}\n"
+            f"{self.exc_info}"
+        )
```
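`message()` reads `self.exc` and `self.exc_info`, fields that would come from the generated proto message rather than being declared in this class. A hedged sketch of how such a warning event is typically fired with dbt's standard `fire_event` helper; the call site below is an assumption, not part of this commit:

```python
# Hedged sketch: firing the new warning event from an exception handler.
import traceback

from dbt.events.types import OpenLineageException
from dbt_common.events.functions import fire_event


def emit_openlineage_event() -> None:
    raise RuntimeError("simulated failure")  # hypothetical stand-in


try:
    emit_openlineage_event()
except Exception as e:
    fire_event(OpenLineageException(exc=str(e), exc_info=traceback.format_exc()))
```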

core/dbt/openlineage/__init__.py

Whitespace-only changes.

core/dbt/openlineage/common/__init__.py

Whitespace-only changes.
New file (196 additions, 0 deletions)

```python
import enum
from typing import List, Optional, Union, no_type_check

from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import (
    datasource_dataset,
    documentation_dataset,
    schema_dataset,
)

from dbt.adapters.contracts.connection import Credentials
from dbt.artifacts.resources.types import NodeType
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import (
    GenericTestNode,
    ManifestNode,
    SeedNode,
    SourceDefinition,
)


class Adapter(enum.Enum):
    # supported adapters
    BIGQUERY = "bigquery"
    SNOWFLAKE = "snowflake"
    REDSHIFT = "redshift"
    SPARK = "spark"
    POSTGRES = "postgres"
    DATABRICKS = "databricks"
    SQLSERVER = "sqlserver"
    DREMIO = "dremio"
    ATHENA = "athena"
    DUCKDB = "duckdb"
    TRINO = "trino"

    @staticmethod
    def adapters() -> str:
        # String representation of all supported adapter names
        return ",".join([f"`{x.value}`" for x in list(Adapter)])


class SparkConnectionMethod(enum.Enum):
    THRIFT = "thrift"
    ODBC = "odbc"
    HTTP = "http"

    @staticmethod
    def methods():
        return [x.value for x in SparkConnectionMethod]


def extract_schema_dataset_facet(
    node: Union[ManifestNode, SourceDefinition],
) -> List[schema_dataset.SchemaDatasetFacetFields]:
    if node.resource_type == NodeType.Seed:
        return _extract_schema_dataset_from_seed(node)
    else:
        return _extract_schema_dataset_facet_from_manifest_sql_node(node)


def _extract_schema_dataset_facet_from_manifest_sql_node(
    manifest_sql_node: Union[ManifestNode, SourceDefinition],
) -> List[schema_dataset.SchemaDatasetFacetFields]:
    schema_fields = []
    for column_info in manifest_sql_node.columns.values():
        description = column_info.description
        name = column_info.name
        data_type = column_info.data_type or ""
        schema_fields.append(
            schema_dataset.SchemaDatasetFacetFields(
                name=name, description=description, type=data_type
            )
        )
    return schema_fields


def _extract_schema_dataset_from_seed(
    seed: SeedNode,
) -> List[schema_dataset.SchemaDatasetFacetFields]:
    schema_fields = []
    for col_name in seed.config.column_types:
        col_type = seed.config.column_types[col_name]
        schema_fields.append(schema_dataset.SchemaDatasetFacetFields(name=col_name, type=col_type))
    return schema_fields


def get_model_inputs(
    node_unique_id: str, manifest: Manifest
) -> List[ManifestNode | SourceDefinition]:
    upstreams: List[ManifestNode | SourceDefinition] = []
    input_node_ids = manifest.parent_map.get(node_unique_id, [])
    for input_node_id in input_node_ids:
        if input_node_id.startswith("source."):
            upstreams.append(manifest.sources[input_node_id])
        else:
            upstreams.append(manifest.nodes[input_node_id])
    return upstreams


def node_to_dataset(
    node: Union[ManifestNode, SourceDefinition], dataset_namespace: str
) -> Dataset:
    facets = {
        "dataSource": datasource_dataset.DatasourceDatasetFacet(
            name=dataset_namespace, uri=dataset_namespace
        ),
        "schema": schema_dataset.SchemaDatasetFacet(fields=extract_schema_dataset_facet(node)),
        "documentation": documentation_dataset.DocumentationDatasetFacet(
            description=node.description
        ),
    }
    node_fqn = ".".join(node.fqn)
    return Dataset(namespace=dataset_namespace, name=node_fqn, facets=facets)


def get_test_column(test_node: GenericTestNode) -> Optional[str]:
    return test_node.test_metadata.kwargs.get("column_name")


@no_type_check
def extract_namespace(adapter: Credentials) -> str:
    # Extract namespace from profile's type
    if adapter.type == Adapter.SNOWFLAKE.value:
        return f"snowflake://{_fix_account_name(adapter.account)}"
    elif adapter.type == Adapter.BIGQUERY.value:
        return "bigquery"
    elif adapter.type == Adapter.REDSHIFT.value:
        return f"redshift://{adapter.host}:{adapter.port}"
    elif adapter.type == Adapter.POSTGRES.value:
        return f"postgres://{adapter.host}:{adapter.port}"
    elif adapter.type == Adapter.TRINO.value:
        return f"trino://{adapter.host}:{adapter.port}"
    elif adapter.type == Adapter.DATABRICKS.value:
        return f"databricks://{adapter.host}"
    elif adapter.type == Adapter.SQLSERVER.value:
        return f"mssql://{adapter.server}:{adapter.port}"
    elif adapter.type == Adapter.DREMIO.value:
        return f"dremio://{adapter.software_host}:{adapter.port}"
    elif adapter.type == Adapter.ATHENA.value:
        return f"awsathena://athena.{adapter.region_name}.amazonaws.com"
    elif adapter.type == Adapter.DUCKDB.value:
        return f"duckdb://{adapter.path}"
    elif adapter.type == Adapter.SPARK.value:
        port = ""
        if hasattr(adapter, "port"):
            port = f":{adapter.port}"
        elif adapter.method in [
            SparkConnectionMethod.HTTP.value,
            SparkConnectionMethod.ODBC.value,
        ]:
            port = "443"
        elif adapter.method == SparkConnectionMethod.THRIFT.value:
            port = "10001"

        if adapter.method in SparkConnectionMethod.methods():
            return f"spark://{adapter.host}{port}"
        else:
            raise NotImplementedError(
                f"Connection method `{adapter.method}` is not supported for spark adapter."
            )
    else:
        raise NotImplementedError(
            f"Only {Adapter.adapters()} adapters are supported right now. "
            f"Passed {adapter.type}"
        )


def _fix_account_name(name: str) -> str:
    if not any(word in name for word in ["-", "_"]):
        # If there is neither '-' nor '_' in the name, we append `.us-west-1.aws`
        return f"{name}.us-west-1.aws"

    if "." in name:
        # Logic for account locator with dots remains unchanged
        spl = name.split(".")
        if len(spl) == 1:
            account = spl[0]
            region, cloud = "us-west-1", "aws"
        elif len(spl) == 2:
            account, region = spl
            cloud = "aws"
        else:
            account, region, cloud = spl
        return f"{account}.{region}.{cloud}"

    # Check for existing accounts with cloud names
    if cloud := next((c for c in ["aws", "gcp", "azure"] if c in name), ""):
        parts = name.split(cloud)
        account = parts[0].strip("-_.")

        if not (region := parts[1].strip("-_.").replace("_", "-")):
            return name
        return f"{account}.{region}.{cloud}"

    # Default case, return the original name
    return name
```
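Putting the helpers together: a hypothetical glue function (not part of this commit) that resolves the namespace from the connection credentials and maps a model's upstream nodes to OpenLineage input datasets. It assumes the helpers above are in scope:

```python
from dbt.adapters.contracts.connection import Credentials
from dbt.contracts.graph.manifest import Manifest
from openlineage.client.event_v2 import Dataset


def model_input_datasets(
    unique_id: str, manifest: Manifest, credentials: Credentials
) -> list[Dataset]:
    # Hypothetical glue code: namespace like "postgres://localhost:5432",
    # one input Dataset per upstream node of `unique_id`.
    namespace = extract_namespace(credentials)
    return [
        node_to_dataset(upstream, namespace)
        for upstream in get_model_inputs(unique_id, manifest)
    ]
```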
