|
1 | 1 | from datetime import datetime, timezone |
2 | 2 | from functools import lru_cache |
3 | | -from typing import Dict, Iterable, List, Optional |
| 3 | +from typing import Dict, Iterable, List, Optional, Union |
4 | 4 |
|
5 | 5 | import dateutil.parser as dp |
6 | 6 | import pydantic |
@@ -60,6 +60,10 @@ class MetabaseConfig(DatasetLineageProviderConfigBase): |
60 | 60 | default=None, |
61 | 61 | description="Custom mappings between metabase database engines and DataHub platforms", |
62 | 62 | ) |
| 63 | + database_id_to_instance_map: Optional[Dict[str, str]] = Field( |
| 64 | + default=None, |
| 65 | + description="Custom mappings between metabase database id and DataHub platform instance", |
| 66 | + ) |
63 | 67 | default_schema: str = Field( |
64 | 68 | default="public", |
65 | 69 | description="Default schema name to use when schema is not provided in an SQL query", |
@@ -122,7 +126,9 @@ def __init__(self, ctx: PipelineContext, config: MetabaseConfig): |
122 | 126 | super().__init__(ctx) |
123 | 127 | self.config = config |
124 | 128 | self.report = SourceReport() |
| 129 | + self.setup_session() |
125 | 130 |
|
| 131 | + def setup_session(self) -> None: |
126 | 132 | login_response = requests.post( |
127 | 133 | f"{self.config.connect_uri}/api/session", |
128 | 134 | None, |
@@ -272,6 +278,16 @@ def _get_ownership(self, creator_id: int) -> Optional[OwnershipClass]: |
272 | 278 | user_info_response.raise_for_status() |
273 | 279 | user_details = user_info_response.json() |
274 | 280 | except HTTPError as http_error: |
| 281 | + if ( |
| 282 | + http_error.response is not None |
| 283 | + and http_error.response.status_code == 404 |
| 284 | + ): |
| 285 | + self.report.report_warning( |
| 286 | + key=f"metabase-user-{creator_id}", |
| 287 | + reason=f"User {creator_id} is blocked in Metabase or missing", |
| 288 | + ) |
| 289 | + return None |
| 290 | + # For cases when the error is not 404 but something else |
275 | 291 | self.report.report_failure( |
276 | 292 | key=f"metabase-user-{creator_id}", |
277 | 293 | reason=f"Unable to retrieve User info. " f"Reason: {str(http_error)}", |
@@ -524,6 +540,36 @@ def get_source_table_from_id(self, table_id): |
524 | 540 |
|
525 | 541 | return None, None |
526 | 542 |
|
| 543 | + @lru_cache(maxsize=None) |
| 544 | + def get_platform_instance( |
| 545 | + self, platform: Union[str, None] = None, datasource_id: Union[int, None] = None |
| 546 | + ) -> Union[str, None]: |
| 547 | + """ |
| 548 | + Method will attempt to detect `platform_instance` by checking |
| 549 | + `database_id_to_instance_map` and `platform_instance_map` mappings. |
| 550 | + If `database_id_to_instance_map` is defined, it is checked first for the |
| 551 | + `datasource_id` extracted from Metabase. If this mapping is not defined |
| 552 | + or the corresponding key is not found, the `platform_instance_map` mapping |
| 553 | + is checked for the datasource platform. If no mapping is found, `None` |
| 554 | + is returned. |
| 555 | + :param str platform: DataHub platform name (e.g. `postgres` or `clickhouse`) |
| 556 | + :param int datasource_id: Numeric datasource ID received from Metabase API |
| 557 | + :return: platform instance name or None |
| 558 | + """ |
| 559 | + platform_instance = None |
| 560 | + # For cases when metabase has several platform instances (e.g. several individual ClickHouse clusters) |
| 561 | + if datasource_id is not None and self.config.database_id_to_instance_map: |
| 562 | + platform_instance = self.config.database_id_to_instance_map.get( |
| 563 | + str(datasource_id) |
| 564 | + ) |
| 565 | + |
| 566 | + # If Metabase datasource ID is not mapped to platform instance, fall back to platform mapping |
| 567 | + # Set platform_instance if configuration provides a mapping from platform name to instance |
| 568 | + if platform and self.config.platform_instance_map and platform_instance is None: |
| 569 | + platform_instance = self.config.platform_instance_map.get(platform) |
| 570 | + |
| 571 | + return platform_instance |
| 572 | + |
527 | 573 | @lru_cache(maxsize=None) |
528 | 574 | def get_datasource_from_id(self, datasource_id): |
529 | 575 | try: |
@@ -564,11 +610,8 @@ def get_datasource_from_id(self, datasource_id): |
564 | 610 | reason=f"Platform was not found in DataHub. Using {platform} name as is", |
565 | 611 | ) |
566 | 612 |
|
567 | | - # Set platform_instance if configuration provides a mapping from platform name to instance |
568 | | - platform_instance = ( |
569 | | - self.config.platform_instance_map.get(platform) |
570 | | - if self.config.platform_instance_map |
571 | | - else None |
| 613 | + platform_instance = self.get_platform_instance( |
| 614 | + platform, dataset_json.get("id", None) |
572 | 615 | ) |
573 | 616 |
|
574 | 617 | field_for_dbname_mapping = { |
|
0 commit comments