diff --git a/metadata-ingestion/docs/sources/glue/README.md b/metadata-ingestion/docs/sources/glue/README.md
index 6343781346da21..606792e4861056 100644
--- a/metadata-ingestion/docs/sources/glue/README.md
+++ b/metadata-ingestion/docs/sources/glue/README.md
@@ -19,3 +19,79 @@ If you also have files in S3 that you'd like to ingest, we recommend you use Glu
 | Glue Job Transform | [Data Job](../../metamodel/entities/dataJob.md) | |
 | Glue Job Data source | [Dataset](../../metamodel/entities/dataset.md) | |
 | Glue Job Data sink | [Dataset](../../metamodel/entities/dataset.md) | |
+
+### Compatibility
+
+To capture lineage across Glue jobs and databases, a requirement must be met; otherwise the AWS API is unable to report any lineage. The job must be created in Glue Studio with the "Generate classic script" option turned on (this option is found in the "Script" tab). Custom scripts without the proper annotations will not report lineage.
+
+### JDBC Lineage
+
+DataHub extracts upstream lineage for Glue job nodes that read from JDBC databases. Two node styles are supported:
+
+#### Named Glue Connections (Visual Editor)
+
+Glue Studio's visual editor stores connection references as `connection_options.connectionName`. DataHub calls the `GetConnection` API to resolve the connection and determine the platform and database.
+
+Supported connection types:
+
+| Glue `ConnectionType` | DataHub Platform |
+| --------------------- | -------------------------------- |
+| `JDBC` | Parsed from JDBC URL (see below) |
+| `POSTGRESQL` | `postgres` |
+| `MYSQL` | `mysql` |
+| `REDSHIFT` | `redshift` |
+| `ORACLE` | `oracle` |
+| `SQLSERVER` | `mssql` |
+
+The table name is read from `connection_options.dbtable`. If `dbtable` is absent, DataHub falls back to parsing `connection_options.query` (see [SQL Query Lineage](#sql-query-lineage) below).
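For connections of type `JDBC`, the platform and database are derived from the connection's JDBC URL. A minimal sketch of this parsing, mirroring the `_parse_jdbc_url` helper added in this change (including the SQL Server `;databaseName=` fallback):

```python
from urllib.parse import urlparse

# Mirrors JDBC_PLATFORM_MAP from this change: JDBC protocol -> DataHub platform
JDBC_PLATFORM_MAP = {
    "postgresql": "postgres",
    "mysql": "mysql",
    "mariadb": "mysql",
    "redshift": "redshift",
    "oracle": "oracle",
    "sqlserver": "mssql",
}

def parse_jdbc_url(jdbc_url: str):
    """Return (platform, database) from a URL like jdbc:postgresql://host:5432/mydb."""
    if not jdbc_url.startswith("jdbc:"):
        raise ValueError("not a JDBC URL")
    url = urlparse(jdbc_url[len("jdbc:"):])
    platform = JDBC_PLATFORM_MAP.get(url.scheme.lower(), url.scheme.lower())
    # Database is the URL path; query parameters are dropped.
    database = url.path.lstrip("/").split("?")[0]
    if not database:
        # SQL Server style: jdbc:sqlserver://host:1433;databaseName=mydb
        props = dict(p.split("=", 1) for p in url.netloc.split(";")[1:] if "=" in p)
        database = props.get("databaseName", "")
    return platform, database
```

For example, `jdbc:mariadb://myhost:3306/mydb` resolves to `("mysql", "mydb")`, matching the mapping table above.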
+
+#### Inline JDBC Nodes (Script Style)
+
+Script-style nodes set `connection_type` to the database protocol and pass the JDBC URL inline via `connection_options.url`. Supported protocols:
+
+| `connection_type` | DataHub Platform | Default schema |
+| ----------------- | ---------------- | -------------- |
+| `postgresql` | `postgres` | `public` |
+| `mysql` | `mysql` | — |
+| `mariadb` | `mysql` | — |
+| `redshift` | `redshift` | `public` |
+| `oracle` | `oracle` | — |
+| `sqlserver` | `mssql` | `dbo` |
+
+Example job script arguments that DataHub can parse:
+
+```python
+datasource = glueContext.create_dynamic_frame.from_options(
+    connection_type="postgresql",
+    connection_options={
+        "url": "jdbc:postgresql://myhost:5432/mydb",
+        "dbtable": "public.orders",
+        # or: "query": "SELECT * FROM public.orders WHERE region = 'US'"
+    },
+)
+```
+
+#### Dataset Name Construction
+
+Given a `dbtable` value and the resolved `(platform, database)`:
+
+- `dbtable = "schema.table"` → `database.schema.table`
+- `dbtable = "table"` (no schema) → `database.<default_schema>.table` if the platform has a default schema (see the table above), otherwise `database.table`
+
+#### SQL Query Lineage
+
+When `dbtable` is absent and `connection_options.query` is set, DataHub uses [sqlglot](https://github.com/tobymao/sqlglot) to extract table references from the SQL string.
+
+**Supported:** single-table queries, JOINs, CTEs, and subqueries; all referenced tables are emitted as upstream datasets.
+
+```sql
+-- All three tables become upstream lineage inputs
+SELECT o.id, c.name, p.price
+FROM orders o
+JOIN customers c ON o.customer_id = c.id
+JOIN products p ON o.product_id = p.id
+```
+
+**Not supported:** queries that fail to parse, or queries with no table references (e.g. `SELECT 1`). These produce a warning, and the node is skipped.
+
+> **Note:** `query`-based lineage reflects the tables referenced in the SQL at ingestion time. Dynamic SQL, parameterized queries, or queries built at runtime cannot be statically analyzed.
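The dataset-name construction rules above can be sketched as a small helper, mirroring `_build_jdbc_dataset_name` from this change (default schemas per the protocol table):

```python
# Mirrors JDBC_DEFAULT_SCHEMA from this change: platform -> default schema
JDBC_DEFAULT_SCHEMA = {"postgres": "public", "redshift": "public", "mssql": "dbo"}

def build_jdbc_dataset_name(platform: str, database: str, dbtable: str) -> str:
    """Qualify a Glue dbtable value into a database.schema.table dataset name."""
    if "." in dbtable:
        # dbtable already carries a schema: keep it as-is.
        schema, table = dbtable.rsplit(".", 1)
        return f"{database}.{schema}.{table}"
    default_schema = JDBC_DEFAULT_SCHEMA.get(platform)
    if default_schema:
        # No schema in dbtable: fill in the platform's default schema.
        return f"{database}.{default_schema}.{dbtable}"
    # Platforms without a default schema (e.g. mysql) use two-part names.
    return f"{database}.{dbtable}"
```

So for PostgreSQL, `dbtable="orders"` with database `mydb` yields `mydb.public.orders`, while for MySQL the same input yields `mydb.orders`.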
diff --git a/metadata-ingestion/docs/sources/glue/glue_post.md b/metadata-ingestion/docs/sources/glue/glue_post.md
index 22994a732d51f7..db02710b0080fd 100644
--- a/metadata-ingestion/docs/sources/glue/glue_post.md
+++ b/metadata-ingestion/docs/sources/glue/glue_post.md
@@ -6,10 +6,41 @@ Use the **Important Capabilities** table above as the source of truth for suppor
 To capture lineage across Glue jobs and databases, a requirement must be met; otherwise the AWS API is unable to report any lineage. The job must be created in Glue Studio with the "Generate classic script" option turned on (this option is found in the "Script" tab). Custom scripts without the proper annotations will not report lineage.
+#### JDBC Upstream Lineage
+
+When a Glue job reads from a JDBC source (e.g. PostgreSQL, MySQL, Redshift, Oracle, SQL Server), the plugin automatically extracts upstream lineage to the referenced tables. This works for both:
+
+- **Direct JDBC connections** specified inline in the job script (via `connection_type` and `connection_options`)
+- **Named Glue connections** configured in the Glue console and referenced by `connectionName`
+
+Supported JDBC platforms: PostgreSQL, MySQL, MariaDB, Redshift, Oracle, SQL Server.
+
+The plugin resolves table references either from the `dbtable` parameter or by parsing the SQL in the `query` parameter.
+
+#### Aligning URNs with Target Platform Connectors
+
+If you also ingest the JDBC source separately (e.g.
using the `postgres` or `mysql` connector) and that connector uses a `platform_instance` or a different `env`, you should configure `target_platform_configs` so the URNs match: + +```yaml +source: + type: glue + config: + target_platform_configs: + postgres: + platform_instance: prod-postgres + env: PROD + mysql: + platform_instance: prod-mysql +``` + +When this is configured, dataset URNs produced by the Glue connector will include the same `platform_instance` and `env` as the target platform's connector, ensuring entities merge correctly in DataHub. If the target platform connector does not use a `platform_instance`, no configuration is needed — URNs will match by default. + ### Limitations Module behavior is constrained by source APIs, permissions, and metadata exposed by the platform. Refer to capability notes for unsupported or conditional features. +JDBC upstream lineage from SQL queries (`query` parameter) does not currently apply `target_platform_configs`. Only the `dbtable` code path uses the configured `platform_instance` and `env`. + ### Troubleshooting If ingestion fails, validate credentials, permissions, connectivity, and scope filters first. Then review ingestion logs for source-specific errors and adjust configuration accordingly. diff --git a/metadata-ingestion/docs/sources/glue/glue_pre.md b/metadata-ingestion/docs/sources/glue/glue_pre.md index 77643accd70b4c..3ae3b06b1cab64 100644 --- a/metadata-ingestion/docs/sources/glue/glue_pre.md +++ b/metadata-ingestion/docs/sources/glue/glue_pre.md @@ -8,6 +8,7 @@ This plugin extracts the following: - Column types associated with each table - Table metadata, such as owner, description and parameters - Jobs and their component transformations, data sources, and data sinks +- Upstream lineage from JDBC sources (e.g. 
PostgreSQL, MySQL, Redshift) referenced by Glue jobs ### Prerequisites @@ -40,12 +41,15 @@ For ingesting jobs (extract_transforms: True), the following additional permissi "Action": [ "glue:GetDataflowGraph", "glue:GetJobs", + "glue:GetConnection", "s3:GetObject", ], "Resource": "*" } ``` +The `glue:GetConnection` permission is required when Glue jobs reference named connections (e.g. JDBC connections configured in the Glue console). If your jobs only use inline connection parameters, this permission is not needed. + For profiling datasets, the following additional permissions are required: ``` diff --git a/metadata-ingestion/pyproject.toml b/metadata-ingestion/pyproject.toml index 466f643a23ed11..2320137cf5f3cf 100644 --- a/metadata-ingestion/pyproject.toml +++ b/metadata-ingestion/pyproject.toml @@ -631,6 +631,8 @@ glue = [ "boto3>=1.35.0,<2.0.0", "botocore!=1.23.0,<2.0.0", "cachetools<6.0.0", + "patchy==2.8.0", + "sqlglot[c]==30.0.3", "urllib3>=1.26,<3.0", ] diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py index 88719951907b37..e198136e9bc1f9 100644 --- a/metadata-ingestion/setup.py +++ b/metadata-ingestion/setup.py @@ -633,7 +633,7 @@ }, "flink": {"requests<3.0.0", "tenacity>=8.0.1,<9.0.0"}, "grafana": {"requests<3.0.0", *sqlglot_lib}, - "glue": aws_common | cachetools_lib, + "glue": aws_common | cachetools_lib | sqlglot_lib, # hdbcli is supported officially by SAP, sqlalchemy-hana is built on top but not officially supported "hana": sql_common | { diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index 9868122b35e602..f12564561c698f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -29,7 +29,7 @@ from datahub.api.entities.external.lake_formation_external_entites import ( LakeFormationTag, ) -from datahub.configuration.common import AllowDenyPattern +from 
datahub.configuration.common import AllowDenyPattern, ConfigModel from datahub.configuration.source_common import DatasetSourceConfigMixin from datahub.configuration.validate_field_rename import pydantic_renamed_field from datahub.emitter import mce_builder @@ -125,6 +125,7 @@ UpstreamClass, UpstreamLineageClass, ) +from datahub.sql_parsing.sqlglot_lineage import create_lineage_sql_parsed_result from datahub.utilities.delta import delta_type_to_hive_type from datahub.utilities.hive_schema_to_avro import get_schema_fields_for_hive_column from datahub.utilities.lossy_collections import LossyList @@ -136,6 +137,55 @@ VALID_PLATFORMS = [DEFAULT_PLATFORM, "athena"] GLUE_TABLE_TYPE_ICEBERG = "ICEBERG" +JDBC_PLATFORM_MAP: Dict[str, str] = { + "postgresql": "postgres", + "mysql": "mysql", + "mariadb": "mysql", + "redshift": "redshift", + "oracle": "oracle", + "sqlserver": "mssql", +} + +JDBC_DEFAULT_SCHEMA: Dict[str, str] = { + "postgres": "public", + "redshift": "public", + "mssql": "dbo", +} + +GLUE_NATIVE_CONNECTION_TYPE_MAP: Dict[str, str] = { + "POSTGRESQL": "postgres", + "MYSQL": "mysql", + "REDSHIFT": "redshift", + "ORACLE": "oracle", + "SQLSERVER": "mssql", +} + +JDBC_PREFIX = "jdbc:" + + +def _sanitize_jdbc_url(jdbc_url: str) -> str: + """Strip credentials and query parameters from a JDBC URL for safe logging.""" + inner = ( + jdbc_url[len(JDBC_PREFIX) :] if jdbc_url.startswith(JDBC_PREFIX) else jdbc_url + ) + parsed = urlparse(inner) + safe_netloc = parsed.hostname or "" + if parsed.port: + safe_netloc = f"{safe_netloc}:{parsed.port}" + return f"{JDBC_PREFIX}{parsed.scheme}://{safe_netloc}{parsed.path}" + + +class TargetPlatformConfig(ConfigModel): + """Config for aligning dataset URNs with a separately ingested platform.""" + + platform_instance: Optional[str] = Field( + default=None, + description="Platform instance used by the separate ingestion of this platform.", + ) + env: Optional[str] = Field( + default=None, + description="Environment used by the 
separate ingestion of this platform. Defaults to the Glue source env.", + ) class GlueSourceConfig( @@ -219,6 +269,17 @@ class GlueSourceConfig( description="When enabled, column-level lineage will be extracted between Glue table columns and storage location fields.", ) + target_platform_configs: Dict[str, TargetPlatformConfig] = Field( + default_factory=dict, + description=( + "Optional per-platform config for aligning dataset URNs with separately ingested platforms. " + "Keys are DataHub platform names (e.g. 'postgres', 'mysql', 'redshift'). " + "When provided, the platform_instance and env are applied to dataset URNs so they match " + "the URNs produced by the platform's own connector. " + "Only needed when the target platform's connector uses a platform_instance or a different env." + ), + ) + def is_profiling_enabled(self) -> bool: return self.profiling.enabled and is_profiling_enabled( self.profiling.operation_config @@ -328,6 +389,7 @@ def __init__(self, config: GlueSourceConfig, ctx: PipelineContext): self.lf_client = config.lakeformation_client self.extract_transforms = config.extract_transforms self.env = config.env + self._glue_connection_cache: Dict[str, Optional[Tuple[str, str]]] = {} self.platform_resource_repository: Optional[ "GluePlatformResourceRepository" @@ -551,6 +613,118 @@ def get_s3_uri(self, node_args): return s3_uri + def _parse_jdbc_url(self, jdbc_url: str) -> Tuple[str, str]: + """Parse a JDBC URL and return (platform, database). + + Strips the "jdbc:" prefix then uses urlparse to extract the protocol + (mapped to a DataHub platform name) and the database from the path. 
+ """ + if not jdbc_url.startswith(JDBC_PREFIX): + raise ValueError(f"Not a valid JDBC URL: {_sanitize_jdbc_url(jdbc_url)}") + url = urlparse(jdbc_url[len(JDBC_PREFIX) :]) + protocol = url.scheme.lower() + platform = JDBC_PLATFORM_MAP.get(protocol, protocol) + database = url.path.lstrip("/").split("?")[0] + if not database: + props = dict( + part.split("=", 1) for part in url.netloc.split(";")[1:] if "=" in part + ) + database = props.get("databaseName", "") + return platform, database + + def _extract_urns_from_query( + self, query: str, platform: str, database: str, flow_urn: str, node_label: str + ) -> Optional[List[str]]: + """Parse a SQL query and return DataHub URNs for all referenced input tables. + + Uses dialect-aware parsing via sqlglot_lineage with default schema resolution. + Returns None on parse failure or when no tables are found. + """ + result = create_lineage_sql_parsed_result( + query=query, + default_db=database, + platform=platform, + platform_instance=None, + env=self.env, + default_schema=JDBC_DEFAULT_SCHEMA.get(platform), + schema_aware=False, + generate_column_lineage=False, + ) + if result.debug_info.error: + self.report_warning( + flow_urn, + f"Failed to parse SQL query for node {node_label}: {result.debug_info.error}. Skipping", + ) + return None + if not result.in_tables: + self.report_warning( + flow_urn, + f"No tables found in SQL query for node {node_label}. 
Skipping", + ) + return None + return result.in_tables + + def _resolve_glue_connection( + self, connection_name: str, flow_urn: str + ) -> Optional[Tuple[str, str]]: + """Resolve a named Glue connection to (platform, database).""" + if connection_name in self._glue_connection_cache: + return self._glue_connection_cache[connection_name] + + result: Optional[Tuple[str, str]] = None + try: + kwargs: Dict[str, Any] = {"Name": connection_name, "HidePassword": True} + if self.source_config.catalog_id: + kwargs["CatalogId"] = self.source_config.catalog_id + response = self.glue_client.get_connection(**kwargs) + connection = response["Connection"] + conn_type = connection.get("ConnectionType", "") + props = connection.get("ConnectionProperties", {}) + spark_props = connection.get("SparkProperties", {}) + + if conn_type == "JDBC": + jdbc_url = props.get("JDBC_CONNECTION_URL") or spark_props.get( + "JDBC_CONNECTION_URL" + ) + if not jdbc_url: + self.report_warning( + flow_urn, + f"Glue connection {connection_name!r} has no JDBC_CONNECTION_URL. Skipping", + ) + else: + try: + result = self._parse_jdbc_url(jdbc_url) + except Exception as e: + self.report_warning( + flow_urn, + f"Failed to parse JDBC URL for connection {connection_name!r}: {e}. Skipping", + ) + result = None + elif conn_type in GLUE_NATIVE_CONNECTION_TYPE_MAP: + platform = GLUE_NATIVE_CONNECTION_TYPE_MAP[conn_type] + database = props.get("DATABASE") + if not database: + self.report_warning( + flow_urn, + f"Glue connection {connection_name!r} has no DATABASE property. Skipping", + ) + else: + result = (platform, database) + else: + self.report_warning( + flow_urn, + f"Unsupported Glue connection type {conn_type!r} for connection {connection_name!r}. Skipping", + ) + except Exception as e: + self.report_warning( + flow_urn, + f"Failed to fetch Glue connection {connection_name!r}: {e}. 
Skipping", + ) + return None + + self._glue_connection_cache[connection_name] = result + return result + def get_dataflow_s3_names( self, dataflow_graph: Dict[str, Any] ) -> Iterator[Tuple[str, Optional[str]]]: @@ -575,6 +749,110 @@ def get_dataflow_s3_names( yield s3_uri, extension + def _make_dataset_urn_for_platform(self, platform: str, dataset_name: str) -> str: + """Build a dataset URN using target_platform_configs if available. + + If target_platform_configs has an entry for this platform, applies its + platform_instance and env so the URN matches what the platform's own + connector produces. + """ + target_config = self.source_config.target_platform_configs.get(platform) + return make_dataset_urn_with_platform_instance( + platform=platform, + name=dataset_name, + env=target_config.env if target_config and target_config.env else self.env, + platform_instance=( + target_config.platform_instance if target_config else None + ), + ) + + def _build_jdbc_dataset_name( + self, platform: str, database: str, dbtable: str + ) -> str: + if "." 
in dbtable: + schema, table = dbtable.rsplit(".", 1) + return f"{database}.{schema}.{table}" + default_schema = JDBC_DEFAULT_SCHEMA.get(platform) + if default_schema: + return f"{database}.{default_schema}.{dbtable}" + return f"{database}.{dbtable}" + + def _process_glue_connection_node( + self, node: Dict[str, Any], node_args: Dict[str, Any], flow_urn: str + ) -> Optional[List[str]]: + connection_options = node_args.get("connection_options", {}) + connection_name = connection_options.get("connectionName") + if not connection_name: + return None + node_label = f"{node['NodeType']}-{node['Id']}" + + resolved = self._resolve_glue_connection(connection_name, flow_urn) + if resolved is None: + return None + platform, database = resolved + + dbtable = connection_options.get("dbtable") + if dbtable: + return [ + self._make_dataset_urn_for_platform( + platform, + self._build_jdbc_dataset_name(platform, database, dbtable), + ) + ] + + query = connection_options.get("query") + if query: + return self._extract_urns_from_query( + query, platform, database, flow_urn, node_label + ) + + self.report_warning( + flow_urn, f"Missing dbtable or query for node {node_label}. Skipping" + ) + return None + + def _process_jdbc_node( + self, node: Dict[str, Any], node_args: Dict[str, Any], flow_urn: str + ) -> Optional[List[str]]: + connection_options = node_args.get("connection_options", {}) + jdbc_url = connection_options.get("url") + node_label = f"{node['NodeType']}-{node['Id']}" + + if not jdbc_url: + self.report_warning( + flow_urn, f"Missing JDBC URL for node {node_label}. Skipping" + ) + return None + + try: + platform, database = self._parse_jdbc_url(jdbc_url) + except ValueError as e: + self.report_warning( + flow_urn, + f"Failed to parse JDBC URL for node {node_label}: {e}. 
Skipping", + ) + return None + + dbtable = connection_options.get("dbtable") + if dbtable: + return [ + self._make_dataset_urn_for_platform( + platform, + self._build_jdbc_dataset_name(platform, database, dbtable), + ) + ] + + query = connection_options.get("query") + if query: + return self._extract_urns_from_query( + query, platform, database, flow_urn, node_label + ) + + self.report_warning( + flow_urn, f"Missing dbtable or query for node {node_label}. Skipping" + ) + return None + def process_dataflow_node( self, node: Dict[str, Any], @@ -584,6 +862,7 @@ def process_dataflow_node( s3_formats: DefaultDict[str, Set[Union[str, None]]], ) -> Optional[Dict[str, Any]]: node_type = node["NodeType"] + dataset_urns: Optional[List[str]] = None # for nodes representing datasets, we construct a dataset URN accordingly if node_type in ["DataSource", "DataSink"]: @@ -640,6 +919,24 @@ def process_dataflow_node( ) new_dataset_ids.append(f"{node['NodeType']}-{node['Id']}") + # if data object references a named Glue connection (visual editor style) + elif (node_args.get("connection_options") or {}).get("connectionName"): + _urns = self._process_glue_connection_node(node, node_args, flow_urn) + if not _urns: + return None + node_urn = _urns[0] + if len(_urns) > 1: + dataset_urns = _urns + + # if data object is a JDBC source (e.g. 
Postgres, MySQL, Redshift) + elif node_args.get("connection_type") in JDBC_PLATFORM_MAP: + _urns = self._process_jdbc_node(node, node_args, flow_urn) + if not _urns: + return None + node_urn = _urns[0] + if len(_urns) > 1: + dataset_urns = _urns + else: if self.source_config.ignore_unsupported_connectors: self.report_warning( @@ -656,7 +953,7 @@ def process_dataflow_node( flow_urn, job_id=f"{node['NodeType']}-{node['Id']}" ) - return { + result: Dict[str, Any] = { **node, "urn": node_urn, # to be filled in after traversing edges @@ -664,6 +961,9 @@ def process_dataflow_node( "inputDatasets": [], "outputDatasets": [], } + if dataset_urns is not None: + result["dataset_urns"] = dataset_urns + return result def process_dataflow_graph( self, @@ -718,7 +1018,9 @@ def process_dataflow_graph( # note that source nodes can't be data sinks if source_node_type == "DataSource": - target_node["inputDatasets"].append(source_node["urn"]) + target_node["inputDatasets"].extend( + source_node.get("dataset_urns", [source_node["urn"]]) + ) # keep track of input data jobs (as defined in schemas) else: target_node["inputDatajobs"].append(source_node["urn"]) diff --git a/metadata-ingestion/tests/unit/glue/test_glue_source.py b/metadata-ingestion/tests/unit/glue/test_glue_source.py index af7d948ec1469b..6114dec223e8e9 100644 --- a/metadata-ingestion/tests/unit/glue/test_glue_source.py +++ b/metadata-ingestion/tests/unit/glue/test_glue_source.py @@ -1,6 +1,7 @@ import json +from collections import defaultdict from pathlib import Path -from typing import Any, Dict, Optional, Tuple, Type, cast +from typing import Any, DefaultDict, Dict, List, Optional, Set, Tuple, Type, cast from unittest.mock import patch import pydantic @@ -14,9 +15,12 @@ from datahub.ingestion.graph.client import DataHubGraph from datahub.ingestion.sink.file import write_metadata_file from datahub.ingestion.source.aws.glue import ( + GLUE_NATIVE_CONNECTION_TYPE_MAP, + JDBC_PLATFORM_MAP, GlueProfilingConfig, GlueSource, 
GlueSourceConfig, + _sanitize_jdbc_url, ) from datahub.ingestion.source.state.sql_common_state import ( BaseSQLAlchemyCheckpointState, @@ -950,3 +954,483 @@ def mock_search_by_filter(*args, **kwargs): output_path=tmp_path / mce_file, golden_path=test_resources_dir / mce_golden_file, ) + + +def _make_jdbc_node( + node_id: str, + node_type: str, + connection_type: str, + jdbc_url: str, + dbtable: str, +) -> Dict[str, Any]: + return { + "Id": node_id, + "NodeType": node_type, + "Args": [ + { + "Name": "connection_type", + "Value": f'"{connection_type}"', + "Param": False, + }, + { + "Name": "connection_options", + "Value": f'{{"url": "{jdbc_url}", "dbtable": "{dbtable}"}}', + "Param": False, + }, + ], + "LineNumber": 1, + } + + +@pytest.mark.parametrize( + "jdbc_url, expected_platform, expected_database", + [ + ("jdbc:postgresql://myhost:5432/mydb", "postgres", "mydb"), + ("jdbc:mysql://myhost:3306/mydb", "mysql", "mydb"), + ("jdbc:mariadb://myhost:3306/mydb", "mysql", "mydb"), + ("jdbc:redshift://myhost:5439/mydb", "redshift", "mydb"), + ("jdbc:oracle://myhost:1521/mydb", "oracle", "mydb"), + ("jdbc:sqlserver://myhost:1433/mydb", "mssql", "mydb"), + ("jdbc:sqlserver://myhost:1433;databaseName=mydb", "mssql", "mydb"), + ("jdbc:postgresql://myhost:5432/mydb?sslmode=require", "postgres", "mydb"), + ], +) +def test_parse_jdbc_url( + jdbc_url: str, expected_platform: str, expected_database: str +) -> None: + source = glue_source() + platform, database = source._parse_jdbc_url(jdbc_url) + assert platform == expected_platform + assert database == expected_database + + +def test_parse_jdbc_url_invalid() -> None: + source = glue_source() + with pytest.raises(ValueError, match="Not a valid JDBC URL"): + source._parse_jdbc_url("postgresql://myhost/mydb") + + +@pytest.mark.parametrize( + "connection_type, jdbc_url, dbtable, expected_urn", + [ + ( + "postgresql", + "jdbc:postgresql://myhost:5432/mydb", + "public.customers", + 
"urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.customers,PROD)", + ), + ( + "postgresql", + "jdbc:postgresql://myhost:5432/mydb", + "customers", + "urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.customers,PROD)", + ), + ( + "mysql", + "jdbc:mysql://myhost:3306/mydb", + "myschema.orders", + "urn:li:dataset:(urn:li:dataPlatform:mysql,mydb.myschema.orders,PROD)", + ), + ( + "mysql", + "jdbc:mysql://myhost:3306/mydb", + "orders", + "urn:li:dataset:(urn:li:dataPlatform:mysql,mydb.orders,PROD)", + ), + ], +) +def test_process_dataflow_node_jdbc( + connection_type: str, + jdbc_url: str, + dbtable: str, + expected_urn: str, +) -> None: + source = glue_source() + flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)" + node = _make_jdbc_node( + node_id="DataSource0", + node_type="DataSource", + connection_type=connection_type, + jdbc_url=jdbc_url, + dbtable=dbtable, + ) + + new_dataset_ids: List[str] = [] + new_dataset_mces: List[Any] = [] + s3_formats: DefaultDict[str, Set[Any]] = defaultdict(set) + + result = source.process_dataflow_node( + node, flow_urn, new_dataset_ids, new_dataset_mces, s3_formats + ) + + assert result is not None + assert result["urn"] == expected_urn + assert new_dataset_mces == [] + + +def test_process_dataflow_node_jdbc_missing_url() -> None: + source = glue_source() + flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)" + node = { + "Id": "DataSource0", + "NodeType": "DataSource", + "Args": [ + {"Name": "connection_type", "Value": '"postgresql"', "Param": False}, + { + "Name": "connection_options", + "Value": '{"dbtable": "public.customers"}', + "Param": False, + }, + ], + "LineNumber": 1, + } + + result = source.process_dataflow_node(node, flow_urn, [], [], defaultdict(set)) + + assert result is None + assert source.report.warnings + + +def test_jdbc_platform_map_coverage() -> None: + source = glue_source() + for jdbc_protocol, expected_platform in JDBC_PLATFORM_MAP.items(): + url = f"jdbc:{jdbc_protocol}://host:1234/db" + platform, 
database = source._parse_jdbc_url(url) + assert platform == expected_platform, f"Failed for {jdbc_protocol}" + assert database == "db" + + +def _make_glue_connection_node( + node_id: str, + node_type: str, + connection_name: str, + dbtable: str, +) -> Dict[str, Any]: + return { + "Id": node_id, + "NodeType": node_type, + "Args": [ + { + "Name": "connection_options", + "Value": f'{{"connectionName": "{connection_name}", "dbtable": "{dbtable}"}}', + "Param": False, + }, + ], + "LineNumber": 1, + } + + +@pytest.mark.parametrize( + "dbtable, expected_urn", + [ + ( + "public.customers", + "urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.customers,PROD)", + ), + ( + "customers", + "urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.customers,PROD)", + ), + ], +) +def test_process_dataflow_node_glue_connection_jdbc( + dbtable: str, expected_urn: str +) -> None: + source = glue_source() + source.glue_client.get_connection = lambda **kw: { # type: ignore[method-assign] + "Connection": { + "ConnectionType": "JDBC", + "ConnectionProperties": { + "JDBC_CONNECTION_URL": "jdbc:postgresql://myhost:5432/mydb", + }, + } + } + flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)" + node = _make_glue_connection_node( + "DataSource0", "DataSource", "My PG Connection", dbtable + ) + + result = source.process_dataflow_node(node, flow_urn, [], [], defaultdict(set)) + + assert result is not None + assert result["urn"] == expected_urn + + +@pytest.mark.parametrize( + "conn_type, dbtable, expected_urn", + [ + ( + "POSTGRESQL", + "public.orders", + "urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.orders,PROD)", + ), + ( + "POSTGRESQL", + "orders", + "urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.orders,PROD)", + ), + ( + "MYSQL", + "orders", + "urn:li:dataset:(urn:li:dataPlatform:mysql,mydb.orders,PROD)", + ), + ], +) +def test_process_dataflow_node_glue_connection_native( + conn_type: str, dbtable: str, expected_urn: str +) -> None: + source = glue_source() + 
+    source.glue_client.get_connection = lambda **kw: {  # type: ignore[method-assign]
+        "Connection": {
+            "ConnectionType": conn_type,
+            "ConnectionProperties": {
+                "HOST": "myhost",
+                "DATABASE": "mydb",
+            },
+        }
+    }
+    flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)"
+    node = _make_glue_connection_node(
+        "DataSource0", "DataSource", "My Connection", dbtable
+    )
+
+    result = source.process_dataflow_node(node, flow_urn, [], [], defaultdict(set))
+
+    assert result is not None
+    assert result["urn"] == expected_urn
+
+
+def test_process_dataflow_node_glue_connection_missing_dbtable() -> None:
+    source = glue_source()
+    flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)"
+    node = {
+        "Id": "DataSource0",
+        "NodeType": "DataSource",
+        "Args": [
+            {
+                "Name": "connection_options",
+                "Value": '{"connectionName": "My Connection"}',
+                "Param": False,
+            },
+        ],
+        "LineNumber": 1,
+    }
+
+    result = source.process_dataflow_node(node, flow_urn, [], [], defaultdict(set))
+
+    assert result is None
+    assert source.report.warnings
+
+
+def test_process_dataflow_node_glue_connection_fetch_failure() -> None:
+    source = glue_source()
+
+    def _raise(**kw: Any) -> None:
+        raise Exception("Connection not found")
+
+    source.glue_client.get_connection = _raise  # type: ignore[method-assign]
+    flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)"
+    node = _make_glue_connection_node("DataSource0", "DataSource", "Missing", "mytable")
+
+    result = source.process_dataflow_node(node, flow_urn, [], [], defaultdict(set))
+
+    assert result is None
+    assert source.report.warnings
+
+
+def test_resolve_glue_connection_caching() -> None:
+    source = glue_source()
+    call_count = 0
+
+    def _get_connection(**kw: Any) -> Dict[str, Any]:
+        nonlocal call_count
+        call_count += 1
+        return {
+            "Connection": {
+                "ConnectionType": "POSTGRESQL",
+                "ConnectionProperties": {"HOST": "h", "DATABASE": "db"},
+            }
+        }
+
+    source.glue_client.get_connection = _get_connection  # type: ignore[method-assign]
+    flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)"
+
+    source._resolve_glue_connection("My Connection", flow_urn)
+    source._resolve_glue_connection("My Connection", flow_urn)
+
+    assert call_count == 1
+
+
+def test_glue_native_connection_type_map_coverage() -> None:
+    source = glue_source()
+    flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)"
+    for conn_type, expected_platform in GLUE_NATIVE_CONNECTION_TYPE_MAP.items():
+        source._glue_connection_cache.clear()
+
+        def _make_get_connection(ct: str) -> Any:
+            def _get_connection(**kw: Any) -> Dict[str, Any]:
+                return {
+                    "Connection": {
+                        "ConnectionType": ct,
+                        "ConnectionProperties": {"HOST": "h", "DATABASE": "db"},
+                    }
+                }
+
+            return _get_connection
+
+        source.glue_client.get_connection = _make_get_connection(conn_type)  # type: ignore[method-assign]
+        result = source._resolve_glue_connection(conn_type, flow_urn)
+        assert result is not None, f"Failed for {conn_type}"
+        assert result[0] == expected_platform
+
+
+def test_resolve_glue_connection_spark_properties_fallback() -> None:
+    """SparkProperties (v2 schema) is used when ConnectionProperties has no JDBC_CONNECTION_URL."""
+    source = glue_source()
+    source.glue_client.get_connection = lambda **kw: {  # type: ignore[method-assign]
+        "Connection": {
+            "ConnectionType": "JDBC",
+            "ConnectionProperties": {},
+            "SparkProperties": {
+                "JDBC_CONNECTION_URL": "jdbc:postgresql://myhost:5432/mydb"
+            },
+        }
+    }
+    flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)"
+
+    result = source._resolve_glue_connection("My Connection", flow_urn)
+
+    assert result == ("postgres", "mydb")
+
+
+@pytest.mark.parametrize(
+    "query, expected_dbtable",
+    [
+        ("SELECT * FROM public.customers WHERE id = 1", "public.customers"),
+        ("SELECT id, name FROM orders", "orders"),
+    ],
+)
+def test_process_dataflow_node_glue_connection_query_fallback(
+    query: str, expected_dbtable: str
+) -> None:
+    """query ConnectionOption is used when dbtable is absent (single-table queries)."""
+    source = glue_source()
+    source.glue_client.get_connection = lambda **kw: {  # type: ignore[method-assign]
+        "Connection": {
+            "ConnectionType": "JDBC",
+            "ConnectionProperties": {
+                "JDBC_CONNECTION_URL": "jdbc:postgresql://myhost:5432/mydb",
+            },
+        }
+    }
+    flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)"
+    node = {
+        "Id": "DataSource0",
+        "NodeType": "DataSource",
+        "Args": [
+            {
+                "Name": "connection_options",
+                "Value": f'{{"connectionName": "My PG Connection", "query": "{query}"}}',
+                "Param": False,
+            },
+        ],
+        "LineNumber": 1,
+    }
+
+    result = source.process_dataflow_node(node, flow_urn, [], [], defaultdict(set))
+
+    assert result is not None
+    assert expected_dbtable in result["urn"]
+
+
+def test_process_dataflow_node_glue_connection_query_multi_table() -> None:
+    """Multi-table JOIN queries produce multiple dataset URNs via dataset_urns."""
+    source = glue_source()
+    source.glue_client.get_connection = lambda **kw: {  # type: ignore[method-assign]
+        "Connection": {
+            "ConnectionType": "JDBC",
+            "ConnectionProperties": {
+                "JDBC_CONNECTION_URL": "jdbc:postgresql://myhost:5432/mydb",
+            },
+        }
+    }
+    flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)"
+    node = {
+        "Id": "DataSource0",
+        "NodeType": "DataSource",
+        "Args": [
+            {
+                "Name": "connection_options",
+                "Value": '{"connectionName": "My PG Connection", "query": "SELECT a.id FROM orders a JOIN customers b ON a.cid = b.id"}',
+                "Param": False,
+            },
+        ],
+        "LineNumber": 1,
+    }
+
+    result = source.process_dataflow_node(node, flow_urn, [], [], defaultdict(set))
+
+    assert result is not None
+    assert not source.report.warnings
+    orders_urn = "urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.orders,PROD)"
+    customers_urn = (
+        "urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.customers,PROD)"
+    )
+    assert result["urn"] in (orders_urn, customers_urn)
+    assert set(result["dataset_urns"]) == {orders_urn, customers_urn}
+
+
+def test_process_dataflow_node_jdbc_query_fallback() -> None:
+    """query ConnectionOption is used in the direct JDBC path when dbtable is absent."""
+    source = glue_source()
+    flow_urn = "urn:li:dataFlow:(glue,test-job,PROD)"
+    node = {
+        "Id": "DataSource0",
+        "NodeType": "DataSource",
+        "Args": [
+            {
+                "Name": "connection_type",
+                "Value": '"postgresql"',
+                "Param": False,
+            },
+            {
+                "Name": "connection_options",
+                "Value": '{"url": "jdbc:postgresql://myhost:5432/mydb", "query": "SELECT * FROM public.orders"}',
+                "Param": False,
+            },
+        ],
+        "LineNumber": 1,
+    }
+
+    result = source.process_dataflow_node(node, flow_urn, [], [], defaultdict(set))
+
+    assert result is not None
+    assert (
+        result["urn"]
+        == "urn:li:dataset:(urn:li:dataPlatform:postgres,mydb.public.orders,PROD)"
+    )
+
+
+@pytest.mark.parametrize(
+    "raw_url, expected_safe",
+    [
+        (
+            "jdbc:postgresql://myhost:5432/mydb",
+            "jdbc:postgresql://myhost:5432/mydb",
+        ),
+        (
+            "jdbc:postgresql://admin:secret123@myhost:5432/mydb",
+            "jdbc:postgresql://myhost:5432/mydb",
+        ),
+        (
+            "jdbc:postgresql://myhost:5432/mydb?user=admin&password=secret",
+            "jdbc:postgresql://myhost:5432/mydb",
+        ),
+        (
+            "jdbc:postgresql://admin:secret@myhost:5432/mydb?ssl=true&password=extra",
+            "jdbc:postgresql://myhost:5432/mydb",
+        ),
+    ],
+)
+def test_sanitize_jdbc_url(raw_url: str, expected_safe: str) -> None:
+    assert _sanitize_jdbc_url(raw_url) == expected_safe
diff --git a/metadata-ingestion/uv.lock b/metadata-ingestion/uv.lock
index 65c2c5b0350c54..179b6613d2ffcb 100644
--- a/metadata-ingestion/uv.lock
+++ b/metadata-ingestion/uv.lock
@@ -1082,6 +1082,8 @@ glue = [
     { name = "boto3" },
     { name = "botocore" },
     { name = "cachetools" },
+    { name = "patchy" },
+    { name = "sqlglot", extra = ["c"] },
     { name = "urllib3" },
 ]
 grafana = [
@@ -2667,6 +2669,7 @@ requires-dist = [
     { name = "patchy", marker = "extra == 'dremio'", specifier = "==2.8.0" },
     { name = "patchy", marker = "extra == 'druid'", specifier = "==2.8.0" },
     { name = "patchy", marker = "extra == 'fivetran'", specifier = "==2.8.0" },
+    { name = "patchy", marker = "extra == 'glue'", specifier = "==2.8.0" },
     { name = "patchy", marker = "extra == 'grafana'", specifier = "==2.8.0" },
     { name = "patchy", marker = "extra == 'hana'", specifier = "==2.8.0" },
     { name = "patchy", marker = "extra == 'hive'", specifier = "==2.8.0" },
@@ -3176,6 +3179,7 @@ requires-dist = [
     { name = "sqlglot", extras = ["c"], marker = "extra == 'dremio'", specifier = "==30.0.3" },
     { name = "sqlglot", extras = ["c"], marker = "extra == 'druid'", specifier = "==30.0.3" },
     { name = "sqlglot", extras = ["c"], marker = "extra == 'fivetran'", specifier = "==30.0.3" },
+    { name = "sqlglot", extras = ["c"], marker = "extra == 'glue'", specifier = "==30.0.3" },
     { name = "sqlglot", extras = ["c"], marker = "extra == 'grafana'", specifier = "==30.0.3" },
     { name = "sqlglot", extras = ["c"], marker = "extra == 'hana'", specifier = "==30.0.3" },
     { name = "sqlglot", extras = ["c"], marker = "extra == 'hive'", specifier = "==30.0.3" },