import enum
from typing import Any, List, Optional, Union, no_type_check

from openlineage.client.event_v2 import Dataset
from openlineage.client.facet_v2 import (
    datasource_dataset,
    documentation_dataset,
    schema_dataset,
)

from dbt.adapters.contracts.connection import Credentials
from dbt.artifacts.resources.types import NodeType
from dbt.contracts.graph.manifest import Manifest
from dbt.contracts.graph.nodes import (
    GenericTestNode,
    ManifestNode,
    ManifestSQLNode,
    ModelNode,
    SeedNode,
    SingularTestNode,
    SourceDefinition,
)


class Adapter(enum.Enum):
    # supported adapters
    BIGQUERY = "bigquery"
    SNOWFLAKE = "snowflake"
    REDSHIFT = "redshift"
    SPARK = "spark"
    POSTGRES = "postgres"
    DATABRICKS = "databricks"
    SQLSERVER = "sqlserver"
    DREMIO = "dremio"
    ATHENA = "athena"
    DUCKDB = "duckdb"
    TRINO = "trino"

    @staticmethod
    def adapters() -> str:
        # String representation of all supported adapter names
        return ",".join([f"`{x.value}`" for x in list(Adapter)])


class SparkConnectionMethod(enum.Enum):
    THRIFT = "thrift"
    ODBC = "odbc"
    HTTP = "http"

    @staticmethod
    def methods() -> List[str]:
        return [x.value for x in SparkConnectionMethod]


def extract_schema_dataset_facet(
    node: Union[ManifestNode, SourceDefinition],
) -> List[schema_dataset.SchemaDatasetFacetFields]:
    # Seeds keep their column types in the node config; other nodes expose
    # column metadata directly on `columns`.
    if node.resource_type == NodeType.Seed:
        return _extract_schema_dataset_from_seed(node)
    else:
        return _extract_schema_dataset_facet_from_manifest_sql_node(node)


def _extract_schema_dataset_facet_from_manifest_sql_node(
    manifest_sql_node: Union[ManifestNode, SourceDefinition],
) -> List[schema_dataset.SchemaDatasetFacetFields]:
    schema_fields = []
    for column_info in manifest_sql_node.columns.values():
        description = column_info.description
        name = column_info.name
        data_type = column_info.data_type or ""
        schema_fields.append(
            schema_dataset.SchemaDatasetFacetFields(
                name=name, description=description, type=data_type
            )
        )
    return schema_fields


def _extract_schema_dataset_from_seed(
    seed: SeedNode,
) -> List[schema_dataset.SchemaDatasetFacetFields]:
    schema_fields = []
    for col_name, col_type in seed.config.column_types.items():
        schema_fields.append(schema_dataset.SchemaDatasetFacetFields(name=col_name, type=col_type))
    return schema_fields
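
# For illustration, a hypothetical seed configured with
# `column_types: {"id": "integer", "name": "varchar"}` would yield
#   [SchemaDatasetFacetFields(name="id", type="integer"),
#    SchemaDatasetFacetFields(name="name", type="varchar")]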


def get_model_inputs(
    node_unique_id: str, manifest: Manifest
) -> List[Union[ManifestNode, SourceDefinition]]:
    # Resolve a node's direct upstreams (models, seeds, sources, ...) from the
    # manifest's parent map.
    upstreams: List[Union[ManifestNode, SourceDefinition]] = []
    input_node_ids = manifest.parent_map.get(node_unique_id, [])
    for input_node_id in input_node_ids:
        if input_node_id.startswith("source."):
            upstreams.append(manifest.sources[input_node_id])
        else:
            upstreams.append(manifest.nodes[input_node_id])
    return upstreams


def node_to_dataset(
    node: Union[ManifestNode, SourceDefinition], dataset_namespace: str
) -> Dataset:
    # Map a dbt node to an OpenLineage Dataset with data source, schema and
    # documentation facets.
    facets = {
        "dataSource": datasource_dataset.DatasourceDatasetFacet(
            name=dataset_namespace, uri=dataset_namespace
        ),
        "schema": schema_dataset.SchemaDatasetFacet(fields=extract_schema_dataset_facet(node)),
        "documentation": documentation_dataset.DocumentationDatasetFacet(
            description=node.description
        ),
    }
    node_fqn = ".".join(node.fqn)
    return Dataset(namespace=dataset_namespace, name=node_fqn, facets=facets)
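
# Usage sketch (variable names `credentials`, `model_unique_id` and `manifest` are
# hypothetical placeholders, shown for illustration only):
#
#     namespace = extract_namespace(credentials)
#     inputs = [
#         node_to_dataset(upstream, dataset_namespace=namespace)
#         for upstream in get_model_inputs(model_unique_id, manifest)
#     ]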


def get_test_column(test_node: GenericTestNode) -> Optional[str]:
    # Column-level generic tests carry the tested column in `test_metadata.kwargs`;
    # it may be absent for model-level tests, hence the Optional return.
    return test_node.test_metadata.kwargs.get("column_name")


@no_type_check
def extract_namespace(adapter: Credentials) -> str:
    # Build the OpenLineage dataset namespace from the profile's adapter type
    if adapter.type == Adapter.SNOWFLAKE.value:
        return f"snowflake://{_fix_account_name(adapter.account)}"
    elif adapter.type == Adapter.BIGQUERY.value:
        return "bigquery"
    elif adapter.type == Adapter.REDSHIFT.value:
        return f"redshift://{adapter.host}:{adapter.port}"
    elif adapter.type == Adapter.POSTGRES.value:
        return f"postgres://{adapter.host}:{adapter.port}"
    elif adapter.type == Adapter.TRINO.value:
        return f"trino://{adapter.host}:{adapter.port}"
    elif adapter.type == Adapter.DATABRICKS.value:
        return f"databricks://{adapter.host}"
    elif adapter.type == Adapter.SQLSERVER.value:
        return f"mssql://{adapter.server}:{adapter.port}"
    elif adapter.type == Adapter.DREMIO.value:
        return f"dremio://{adapter.software_host}:{adapter.port}"
    elif adapter.type == Adapter.ATHENA.value:
        return f"awsathena://athena.{adapter.region_name}.amazonaws.com"
    elif adapter.type == Adapter.DUCKDB.value:
        return f"duckdb://{adapter.path}"
    elif adapter.type == Adapter.SPARK.value:
        # Default ports include the ":" separator so they concatenate cleanly with the host
        port = ""
        if hasattr(adapter, "port"):
            port = f":{adapter.port}"
        elif adapter.method in [
            SparkConnectionMethod.HTTP.value,
            SparkConnectionMethod.ODBC.value,
        ]:
            port = ":443"
        elif adapter.method == SparkConnectionMethod.THRIFT.value:
            port = ":10001"

        if adapter.method in SparkConnectionMethod.methods():
            return f"spark://{adapter.host}{port}"
        else:
            raise NotImplementedError(
                f"Connection method `{adapter.method}` is not supported for the Spark adapter."
            )
    else:
        raise NotImplementedError(
            f"Only {Adapter.adapters()} adapters are supported right now. "
            f"Passed {adapter.type}"
        )
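
# Namespace examples derived from the branches above (host/region values are illustrative):
#   postgres, host "db.example.com", port 5432  ->  "postgres://db.example.com:5432"
#   athena, region_name "eu-west-1"             ->  "awsathena://athena.eu-west-1.amazonaws.com"
#   bigquery                                    ->  "bigquery"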


def _fix_account_name(name: str) -> str:
    # Normalize a Snowflake account identifier to the `account.region.cloud` form.
    if not any(word in name for word in ["-", "_"]):
        # If there is neither '-' nor '_' in the name, append the default `.us-west-1.aws`
        return f"{name}.us-west-1.aws"

    if "." in name:
        # Account locator already uses dots; fill in missing region/cloud defaults
        spl = name.split(".")
        if len(spl) == 1:
            account = spl[0]
            region, cloud = "us-west-1", "aws"
        elif len(spl) == 2:
            account, region = spl
            cloud = "aws"
        else:
            account, region, cloud = spl
        return f"{account}.{region}.{cloud}"

    # Check for existing accounts with cloud names
    if cloud := next((c for c in ["aws", "gcp", "azure"] if c in name), ""):
        parts = name.split(cloud)
        account = parts[0].strip("-_.")

        if not (region := parts[1].strip("-_.").replace("_", "-")):
            return name
        return f"{account}.{region}.{cloud}"

    # Default case, return the original name
    return name
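
# Illustrative expectations traced from the logic above (account names are hypothetical):
#   "xy12345"                     ->  "xy12345.us-west-1.aws"      (bare locator, defaults added)
#   "xy12345.eu-central-1"        ->  "xy12345.eu-central-1.aws"   (cloud defaulted to aws)
#   "myaccount_gcp_europe-west2"  ->  "myaccount.europe-west2.gcp" (cloud keyword split out)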