Contribute Openlineage to dbt-core #11688 (base: main)
Changelog entry (new file):

```yaml
kind: Features
body: Add Openlineage to dbt-core
time: 2025-06-10T11:30:00+02:00
custom:
  Author: MassyB
```
CLI setup, new imports:

```diff
@@ -31,6 +31,8 @@
 from dbt.exceptions import DbtProjectError, FailFastError
 from dbt.flags import get_flag_dict, get_flags, set_flags
 from dbt.mp_context import get_mp_context
+from dbt.openlineage.common.utils import is_runnable_dbt_command
+from dbt.openlineage.handler import OpenLineageHandler
 from dbt.parser.manifest import parse_manifest
 from dbt.plugins import set_up_plugin_manager
 from dbt.profiler import profiler
```
Callback registration in the command wrapper:

```diff
@@ -82,8 +84,13 @@ def wrapper(*args, **kwargs):
         # Reset invocation_id for each 'invocation' of a dbt command (can happen multiple times in a single process)
         reset_invocation_id()

-        # Logging
+        # OpenLineage
+        ol_handler = OpenLineageHandler(ctx)
         callbacks = ctx.obj.get("callbacks", [])
+        if is_runnable_dbt_command(flags):
+            callbacks.append(ol_handler.handle)
+
+        # Logging
         setup_event_logger(flags=flags, callbacks=callbacks)

         # Tracking
```

Author: this is where the OL callback is added.
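For reference, this is the same callback mechanism dbt already exposes through `dbtRunner`. A minimal sketch of a handler with the same shape as `ol_handler.handle` — the event names and fields below are standard dbt events, not part of this PR:

```python
# Minimal sketch of a dbt event callback, registered the same way this PR
# appends ol_handler.handle to ctx.obj["callbacks"].
from dbt.cli.main import dbtRunner
from dbt_common.events.base_types import EventMsg


def handle(event: EventMsg) -> None:
    # NodeStart / NodeFinished are the events an OpenLineage handler would
    # typically map to START / COMPLETE run events.
    if event.info.name in ("NodeStart", "NodeFinished"):
        print(event.info.name, event.info.invocation_id)


dbt = dbtRunner(callbacks=[handle])
dbt.invoke(["run"])
```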
Event registry, exposing the protobuf modules alongside the event classes:

```diff
@@ -1,8 +1,11 @@
 from typing import Any, Dict, Set

+import dbt.adapters.events.adapter_types_pb2 as adapter_types_pb2
 import dbt.adapters.events.types as adapter_dbt_event_types
+import dbt.events.core_types_pb2 as core_dbt_event_types_pb2
 import dbt.events.types as core_dbt_event_types
 import dbt_common.events.types as dbt_event_types
+import dbt_common.events.types_pb2 as dbt_event_types_pb2

 ALL_EVENT_TYPES: Dict[str, Any] = {
     **dbt_event_types.__dict__,
```
```diff
@@ -13,3 +16,10 @@
 ALL_EVENT_NAMES: Set[str] = set(
     [name for name, cls in ALL_EVENT_TYPES.items() if isinstance(cls, type)]
 )
+
+
+ALL_PROTO_TYPES: Dict[str, Any] = {
+    **dbt_event_types_pb2.__dict__,
+    **core_dbt_event_types_pb2.__dict__,
+    **adapter_types_pb2.__dict__,
+}
```

Author: useful to convert a dict to an actual type defined in proto.
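A hedged sketch of that conversion, using protobuf's `ParseDict`; the `dict_to_proto` helper and its surrounding import are illustrative, not part of the PR:

```python
from typing import Any, Dict

from google.protobuf.json_format import ParseDict
from google.protobuf.message import Message

# assumes ALL_PROTO_TYPES from the module above is in scope


def dict_to_proto(event_name: str, event_dict: Dict[str, Any]) -> Message:
    # protoc-generated message classes are keyed by their class name,
    # e.g. "MainReportVersion" -> the MainReportVersion proto message.
    msg_cls = ALL_PROTO_TYPES[event_name]
    return ParseDict(event_dict, msg_cls(), ignore_unknown_fields=True)
```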
Event definitions, a new warning-level event:

```diff
@@ -2425,3 +2425,14 @@ def code(self) -> str:

     def message(self) -> str:
         return f"Artifacts skipped for command : {self.msg}"
+
+
+class OpenLineageException(WarnLevel):
+    def code(self) -> str:
+        return "Z064"
+
+    def message(self):
+        return (
+            f"Encountered an error while creating OpenLineageEvent: {self.exc}\n"
+            f"{self.exc_info}"
+        )
```

Author: now that all the events have been moved to https://github.com/dbt-labs/proto-python-public …
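For context, a sketch of how a `WarnLevel` event like this is typically fired; `fire_event` is dbt's standard emitter, while the `exc`/`exc_info` keyword names are inferred from `message()` above and `safe_emit` is a hypothetical wrapper:

```python
import traceback

from dbt_common.events.functions import fire_event


def safe_emit(emit) -> None:
    # Wrap OpenLineage emission so a lineage failure never fails the dbt run;
    # it surfaces as a warning (code Z064) instead.
    try:
        emit()
    except Exception as e:
        fire_event(OpenLineageException(exc=str(e), exc_info=traceback.format_exc()))
```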
New file with the OpenLineage helper utilities; first, the supported adapters and Spark connection methods:

```python
import enum
from typing import List, Optional, Union, no_type_check

from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import (
    datasource_dataset,
    documentation_dataset,
    schema_dataset,
)

from dbt.adapters.contracts.connection import Credentials
from dbt.artifacts.resources.types import NodeType
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import (
    GenericTestNode,
    ManifestNode,
    SeedNode,
    SourceDefinition,
)


class Adapter(enum.Enum):
    # supported adapters
    BIGQUERY = "bigquery"
    SNOWFLAKE = "snowflake"
    REDSHIFT = "redshift"
    SPARK = "spark"
    POSTGRES = "postgres"
    DATABRICKS = "databricks"
    SQLSERVER = "sqlserver"
    DREMIO = "dremio"
    ATHENA = "athena"
    DUCKDB = "duckdb"
    TRINO = "trino"

    @staticmethod
    def adapters() -> str:
        # String representation of all supported adapter names
        return ",".join([f"`{x.value}`" for x in list(Adapter)])


class SparkConnectionMethod(enum.Enum):
    THRIFT = "thrift"
    ODBC = "odbc"
    HTTP = "http"

    @staticmethod
    def methods():
        return [x.value for x in SparkConnectionMethod]
```
Schema facet extraction for models, sources, and seeds:

```python
def extract_schema_dataset_facet(
    node: Union[ManifestNode, SourceDefinition],
) -> List[schema_dataset.SchemaDatasetFacetFields]:
    if node.resource_type == NodeType.Seed:
        return _extract_schema_dataset_from_seed(node)
    else:
        return _extract_schema_dataset_facet_from_manifest_sql_node(node)


def _extract_schema_dataset_facet_from_manifest_sql_node(
    manifest_sql_node: Union[ManifestNode, SourceDefinition],
) -> List[schema_dataset.SchemaDatasetFacetFields]:
    schema_fields = []
    for column_info in manifest_sql_node.columns.values():
        description = column_info.description
        name = column_info.name
        data_type = column_info.data_type or ""
        schema_fields.append(
            schema_dataset.SchemaDatasetFacetFields(
                name=name, description=description, type=data_type
            )
        )
    return schema_fields


def _extract_schema_dataset_from_seed(
    seed: SeedNode,
) -> List[schema_dataset.SchemaDatasetFacetFields]:
    schema_fields = []
    for col_name, col_type in seed.config.column_types.items():
        schema_fields.append(
            schema_dataset.SchemaDatasetFacetFields(name=col_name, type=col_type)
        )
    return schema_fields
```
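For illustration, the facet these extractors feed, built here from a plain column-to-type mapping (the same shape as a seed's `config.column_types`); the sample columns are made up:

```python
from openlineage.client.facet_v2 import schema_dataset

column_types = {"id": "integer", "email": "varchar"}  # illustrative columns
facet = schema_dataset.SchemaDatasetFacet(
    fields=[
        schema_dataset.SchemaDatasetFacetFields(name=name, type=dtype)
        for name, dtype in column_types.items()
    ]
)
```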
Upstream resolution and dataset construction:

```python
def get_model_inputs(
    node_unique_id: str, manifest: Manifest
) -> List[ManifestNode | SourceDefinition]:
    upstreams: List[ManifestNode | SourceDefinition] = []
    input_node_ids = manifest.parent_map.get(node_unique_id, [])
    for input_node_id in input_node_ids:
        if input_node_id.startswith("source."):
            # sources live in manifest.sources; models, seeds, and snapshots
            # live in manifest.nodes
            upstreams.append(manifest.sources[input_node_id])
        else:
            upstreams.append(manifest.nodes[input_node_id])
    return upstreams


def node_to_dataset(
    node: Union[ManifestNode, SourceDefinition], dataset_namespace: str
) -> Dataset:
    facets = {
        "dataSource": datasource_dataset.DatasourceDatasetFacet(
            name=dataset_namespace, uri=dataset_namespace
        ),
        "schema": schema_dataset.SchemaDatasetFacet(fields=extract_schema_dataset_facet(node)),
        "documentation": documentation_dataset.DocumentationDatasetFacet(
            description=node.description
        ),
    }
    node_fqn = ".".join(node.fqn)
    return Dataset(namespace=dataset_namespace, name=node_fqn, facets=facets)


def get_test_column(test_node: GenericTestNode) -> Optional[str]:
    return test_node.test_metadata.kwargs.get("column_name")
```
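A hedged sketch of how these two helpers might compose to build the OpenLineage inputs for one node; the `build_input_datasets` wrapper and its arguments are illustrative, not from the PR:

```python
from typing import List

from openlineage.client.event_v2 import Dataset

from dbt.contracts.graph.manifest import Manifest


def build_input_datasets(manifest: Manifest, unique_id: str, namespace: str) -> List[Dataset]:
    # Each upstream node (model, seed, snapshot, or source) becomes one
    # OpenLineage input dataset in the given namespace.
    return [
        node_to_dataset(parent, namespace)
        for parent in get_model_inputs(unique_id, manifest)
    ]
```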
Namespace resolution per adapter type:

```python
@no_type_check
def extract_namespace(adapter: Credentials) -> str:
    # Extract namespace from profile's type
    if adapter.type == Adapter.SNOWFLAKE.value:
        return f"snowflake://{_fix_account_name(adapter.account)}"
    elif adapter.type == Adapter.BIGQUERY.value:
        return "bigquery"
    elif adapter.type == Adapter.REDSHIFT.value:
        return f"redshift://{adapter.host}:{adapter.port}"
    elif adapter.type == Adapter.POSTGRES.value:
        return f"postgres://{adapter.host}:{adapter.port}"
    elif adapter.type == Adapter.TRINO.value:
        return f"trino://{adapter.host}:{adapter.port}"
    elif adapter.type == Adapter.DATABRICKS.value:
        return f"databricks://{adapter.host}"
    elif adapter.type == Adapter.SQLSERVER.value:
        return f"mssql://{adapter.server}:{adapter.port}"
    elif adapter.type == Adapter.DREMIO.value:
        return f"dremio://{adapter.software_host}:{adapter.port}"
    elif adapter.type == Adapter.ATHENA.value:
        return f"awsathena://athena.{adapter.region_name}.amazonaws.com"
    elif adapter.type == Adapter.DUCKDB.value:
        return f"duckdb://{adapter.path}"
    elif adapter.type == Adapter.SPARK.value:
        port = ""
        if hasattr(adapter, "port"):
            port = f":{adapter.port}"
        elif adapter.method in [
            SparkConnectionMethod.HTTP.value,
            SparkConnectionMethod.ODBC.value,
        ]:
            # leading colon keeps the URI well-formed (spark://host:443)
            port = ":443"
        elif adapter.method == SparkConnectionMethod.THRIFT.value:
            port = ":10001"

        if adapter.method in SparkConnectionMethod.methods():
            return f"spark://{adapter.host}{port}"
        else:
            raise NotImplementedError(
                f"Connection method `{adapter.method}` is not supported for spark adapter."
            )
    else:
        raise NotImplementedError(
            f"Only {Adapter.adapters()} adapters are supported right now. "
            f"Passed {adapter.type}"
        )


def _fix_account_name(name: str) -> str:
    if not any(word in name for word in ["-", "_"]):
        # If there is neither '-' nor '_' in the name, we append `.us-west-1.aws`
        return f"{name}.us-west-1.aws"

    if "." in name:
        # Account locators that already contain dots keep their structure,
        # with region/cloud defaults filled in
        spl = name.split(".")
        if len(spl) == 1:
            account = spl[0]
            region, cloud = "us-west-1", "aws"
        elif len(spl) == 2:
            account, region = spl
            cloud = "aws"
        else:
            account, region, cloud = spl
        return f"{account}.{region}.{cloud}"

    # Check for existing accounts with cloud names
    if cloud := next((c for c in ["aws", "gcp", "azure"] if c in name), ""):
        parts = name.split(cloud)
        account = parts[0].strip("-_.")

        if not (region := parts[1].strip("-_.").replace("_", "-")):
            return name
        return f"{account}.{region}.{cloud}"

    # Default case, return the original name
    return name
```
Author: my pre-commit mypy step was failing on this file, so I added some annotations and mypy ignore comments.
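To make the namespace rules concrete, a quick sketch of the outputs; the credentials stand-in is illustrative (real calls receive the adapter's `Credentials` object, which `@no_type_check` lets duck-type here):

```python
class FakePostgresCreds:
    # stand-in for a postgres Credentials object; attribute names assumed
    type = "postgres"
    host = "db.internal"
    port = 5432


assert extract_namespace(FakePostgresCreds()) == "postgres://db.internal:5432"

# Snowflake account locators are normalized before being embedded:
assert _fix_account_name("myaccount") == "myaccount.us-west-1.aws"
assert _fix_account_name("myaccount.eu-central-1") == "myaccount.eu-central-1.aws"
```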