diff --git a/metadata-ingestion/setup.py b/metadata-ingestion/setup.py
index 0c1813da1d6a96..b75398a10005aa 100644
--- a/metadata-ingestion/setup.py
+++ b/metadata-ingestion/setup.py
@@ -263,6 +263,14 @@
 slack = {"slack-sdk==3.18.1"}
 
+ssas = {
+    "xmltodict==0.13.0",
+    "requests==2.28.1",
+    "beautifulsoup4==4.11.1",
+    "lxml==4.9.1",
+    "requests-kerberos==0.14.0",
+}
+
 databricks = {
     # 0.1.11 appears to have authentication issues with azure databricks
     "databricks-sdk>=0.9.0",
@@ -393,6 +401,7 @@
     "sqlalchemy": sql_common,
     "sql-queries": usage_common | sqlglot_lib,
     "slack": slack,
+    "ssas": ssas,
     "superset": {
         "requests",
         "sqlalchemy",
@@ -538,6 +547,7 @@
     "s3",
     "snowflake",
     "slack",
+    "ssas",
     "tableau",
     "teradata",
     "trino",
@@ -580,6 +590,7 @@
     "ldap",
     "mongodb",
     "slack",
+    # "ssas",
     "mssql",
     "mysql",
     "mariadb",
@@ -637,6 +648,8 @@
     "redash = datahub.ingestion.source.redash:RedashSource",
     "redshift = datahub.ingestion.source.redshift.redshift:RedshiftSource",
     "slack = datahub.ingestion.source.slack.slack:SlackSource",
+    "ssas_multidimension = datahub.ingestion.source.ssas.ssas_multidimension.ssas_multidimension:SsasMultidimensionSource",
+    "ssas_tabular = datahub.ingestion.source.ssas.ssas_tabular.ssas_tabular:SsasTabularSource",
     "snowflake = datahub.ingestion.source.snowflake.snowflake_v2:SnowflakeV2Source",
     "superset = datahub.ingestion.source.superset:SupersetSource",
     "tableau = datahub.ingestion.source.tableau:TableauSource",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/api.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/api.py
new file mode 100644
index 00000000000000..429280d7a5ab64
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/api.py
@@ -0,0 +1,54 @@
+"""
+Module with XMLA communication classes.
+"""
+
+from abc import ABC, abstractmethod
+from typing import Any, Dict, Union
+
+import xmltodict
+from requests.auth import HTTPBasicAuth
+from requests_kerberos import HTTPKerberosAuth
+
+from .config import SsasServerHTTPConfig
+from .tools import MsXmlaTemplates
+from .xmlaclient import XmlaClient
+
+
+class ISsasAPI(ABC):
+    @abstractmethod
+    def get_server(self):
+        pass
+
+    @property
+    @abstractmethod
+    def auth_credentials(self):
+        pass
+
+
+class SsasXmlaAPI:
+    """
+    Class that parses the SSAS XMLA server response.
+    """
+
+    def __init__(
+        self, config: SsasServerHTTPConfig, auth: Union[HTTPKerberosAuth, HTTPBasicAuth]
+    ):
+        self.__config = config
+        self.__auth = auth
+        self.__client = XmlaClient(config=config, auth=self.__auth)
+
+    def get_server_info(self) -> Dict[str, Any]:
+        """
+        Extract server metadata info from the response.
+        """
+
+        server_data_xml = xmltodict.parse(self.get_server_metadata())
+
+        return server_data_xml["soap:Envelope"]["soap:Body"]["DiscoverResponse"][
+            "return"
+        ]["root"]["row"]["xars:METADATA"]["Server"]
+
+    def get_server_metadata(self) -> str:
+        """
+        Get SSAS server metadata.
+        """
+
+        return str(self.__client.discover(query=MsXmlaTemplates.QUERY_METADATA))
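Reviewer note: `get_server_info` above just walks the SOAP envelope returned by the Discover call. A minimal sketch of that traversal; the envelope below is hand-built from the key path in the code, not a captured server payload:

# Sketch: the dict path get_server_info() walks. The XML shape is an
# assumption inferred from the keys used above.
import xmltodict

sample_response = """
<soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
  <soap:Body>
    <DiscoverResponse>
      <return>
        <root>
          <row>
            <xars:METADATA xmlns:xars="urn:example">
              <Server><Name>SSAS01</Name><Version>15.0</Version></Server>
            </xars:METADATA>
          </row>
        </root>
      </return>
    </DiscoverResponse>
  </soap:Body>
</soap:Envelope>
"""

doc = xmltodict.parse(sample_response)
server = doc["soap:Envelope"]["soap:Body"]["DiscoverResponse"]["return"]["root"][
    "row"
]["xars:METADATA"]["Server"]
print(server["Name"])  # -> SSAS01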
+""" + +from abc import ABC, abstractmethod +from typing import Any, Dict, Union + +import xmltodict +from requests.auth import HTTPBasicAuth +from requests_kerberos import HTTPKerberosAuth + +from .config import SsasServerHTTPConfig +from .tools import MsXmlaTemplates +from .xmlaclient import XmlaClient + + +class ISsasAPI(ABC): + @abstractmethod + def get_server(self): + pass + + @property + @abstractmethod + def auth_credentials(self): + pass + + +class SsasXmlaAPI: + """ + Class for parse ssas xmla server response + """ + + def __init__(self, config: SsasServerHTTPConfig, auth: Union[HTTPKerberosAuth, HTTPBasicAuth]): + self.__config = config + self.__auth = auth + self.__client = XmlaClient(config=config, auth=self.__auth) + + def get_server_info(self) -> Dict[str, Any]: + """ + Extract server metadata info from response + """ + + server_data_xml = xmltodict.parse(self.get_server_metadata()) + + return server_data_xml["soap:Envelope"]["soap:Body"]["DiscoverResponse"][ + "return" + ]["root"]["row"]["xars:METADATA"]["Server"] + + def get_server_metadata(self) -> str: + """ + Get ssas server metadata + """ + + return str(self.__client.discover(query=MsXmlaTemplates.QUERY_METADATA)) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/config.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/config.py new file mode 100644 index 00000000000000..fbd62c3c8c2db7 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/config.py @@ -0,0 +1,68 @@ +from typing import List, Dict + +import datahub.emitter.mce_builder as builder +import pydantic +from datahub.configuration.common import AllowDenyPattern +from datahub.configuration.source_common import EnvConfigMixin + + +class SsasServerHTTPConfig(EnvConfigMixin): + """ + Class represent config object. + Contains parameters for connect to ssas over http (HTTP Access to Analysis Services) + https://learn.microsoft.com/en-us/analysis-services/instances/configure-http-access-to-analysis-services-on-iis-8-0?view=asallproducts-allversions + + username - Active Directory user login + username - Active Directory user password + host_port - XMLA gateway url (format - url:port) + server_alias - XMLA gateway server alias + virtual_directory_name - + instance - not used ??? + use_https - set true if use XMLA gateway over https + dns_suffixes - list dns zone if use ssas servers in different domains. 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/domains.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/domains.py
new file mode 100644
index 00000000000000..cd13d9d4c0c0dd
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/domains.py
@@ -0,0 +1,360 @@
+from dataclasses import asdict, dataclass, field, fields
+from typing import Any, Dict, List, Optional, Union
+
+from datahub.emitter.mce_builder import (
+    make_data_platform_urn,
+    make_dataplatform_instance_urn,
+    make_dataset_urn,
+)
+from datahub.metadata.schema_classes import (
+    DataPlatformInstanceClass,
+    DatasetPropertiesClass,
+    UpstreamClass,
+    UpstreamLineageClass,
+)
+
+# pylint: disable=C0103
+
+STATIC_TYPES = ("TABLE_TYPE", "VIEW", "USER_TABLE")
+
+
+@dataclass
+class Server:
+    """
+    Server information.
+    """
+
+    name: str
+    id: str
+    version: str
+    type: str
+    default_compatibility_level: str
+    supported_compatibility_levels: str
+
+    @property
+    def as_dict(self) -> Dict[str, Any]:
+        """
+        Get representation as a dictionary.
+
+        :return: data in dictionary.
+        """
+        return asdict(self)
+
+
+@dataclass
+class Catalog:
+    """
+    Catalog information.
+    """
+
+    instance: str
+    host_port: str
+    name: str
+    env: str
+    source: str = "ssas"
+
+    @property
+    def formatted_name(self) -> str:
+        """
+        Get formatted catalog name.
+
+        :return: string name.
+        """
+        return self.name.replace(",", "-")
+
+    @property
+    def formatted_instance(self) -> str:
+        """
+        Get formatted catalog instance.
+
+        :return: string instance.
+        """
+        return self.instance.replace("\\", "|")
+
+    @property
+    def full_type(self) -> str:
+        """
+        Get catalog type.
+
+        :return: string type.
+        """
+        return f"({self.source},{self.formatted_name},{self.env})"
+
+    @property
+    def orchestrator(self) -> str:
+        """
+        Get catalog orchestrator.
+
+        :return: string orchestrator.
+        """
+        return self.source
+
+    @property
+    def cluster(self) -> str:
+        """
+        Get catalog cluster.
+
+        :return: string cluster.
+        """
+        return f"{self.env}/{self.host_port}/{self.formatted_instance}"
+
+
+@dataclass
+class CubeDependency:
+    """
+    Cube dependency.
+    """
+
+    db: str
+    schema: str
+    name: str
+    env: str
+    type: str
+    server: str
+    source: str
+    instance: Optional[str]
+
+    def get_instance(self) -> str:
+        """
+        Get dependency instance.
+
+        :return: string instance.
+        """
+        return "default" if self.instance is None else self.instance.lower()
+
+
+@dataclass
+class OLAPLineageStream:
+    """
+    Lineage representation.
+    """
+
+    dependencies: List[CubeDependency]
+
+    @property
+    def as_datasets_urn_list(self) -> List[str]:
+        """
+        Get representation as a list of lineage urns.
+
+        :return: list of urns.
+        """
+        return [
+            make_dataset_urn(
+                platform=dep.source,
+                name=f"{dep.server}.{dep.get_instance()}.{dep.db}.{dep.schema}.{dep.name}",
+                env=dep.env,
+            )
+            for dep in self.dependencies
+            if dep.type in STATIC_TYPES
+        ]
+
+    @property
+    def as_property(self) -> Dict[str, str]:
+        """
+        Get representation as a dictionary.
+
+        :return: dictionary of properties.
+        """
+        return {
+            f"{dep.db}.{dep.schema}.{dep.name}": dep.type for dep in self.dependencies
+        }
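Reviewer note: with invented server/database names, a dependency row resolves to an upstream dataset urn like this:

# Sketch: how a CubeDependency becomes an upstream dataset urn
# via as_datasets_urn_list. All names are invented.
dep = CubeDependency(
    db="SalesDW",
    schema="dbo",
    name="FactSales",
    env="PROD",
    type="USER_TABLE",  # in STATIC_TYPES, so it is kept
    server="sqlprod01",
    source="mssql",
    instance=None,  # becomes "default" in the urn
)
stream = OLAPLineageStream(dependencies=[dep])
print(stream.as_datasets_urn_list[0])
# urn:li:dataset:(urn:li:dataPlatform:mssql,sqlprod01.default.SalesDW.dbo.FactSales,PROD)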
+ """ + return f"({self.source},{self.formatted_name},{self.env})" + + @property + def orchestrator(self) -> str: + """ + Get catalog orchestrator. + + :return: string orchestrator. + """ + return self.source + + @property + def cluster(self) -> str: + """ + Get catalog cluster. + + :return: string cluster. + """ + return f"{self.env}/{self.host_port}/{self.formatted_instance}" + + +@dataclass +class CubeDependency: + """ + Cube dependency. + """ + + db: str + schema: str + name: str + env: str + type: str + server: str + source: str + instance: Optional[str] + + def get_instance(self) -> str: + """ + Get dependency instance. + + :return: string instance. + """ + return "default" if self.instance is None else self.instance.lower() + + +@dataclass +class OLAPLineageStream: + """ + Lineage representation. + """ + + dependencies: List[CubeDependency] + + @property + def as_datasets_urn_list(self) -> List[str]: + """ + Get representation as list of lineage urns. + + :return: list of urns. + """ + return [ + make_dataset_urn( + platform=dep.source, + name=f"{dep.server}.{dep.get_instance()}.{dep.db}.{dep.schema}.{dep.name}", + env=dep.env, + ) + for dep in self.dependencies + if dep.type in STATIC_TYPES + ] + + @property + def as_property(self) -> Dict[str, str]: + """ + Get representation as dictionary. + + :return: dictionary of properties. + """ + return { + f"{dep.db}.{dep.schema}.{dep.name}": dep.type for dep in self.dependencies + } + + +@dataclass +class Cube: + """ + Datahub cube. + """ + + instance: str + host_port: str + name: str + env: str + flow: Catalog + type: str = "CUBE" + source: str = "ssas" + + @property + def full_type(self) -> str: + """ + Get cube type. + + :return: string type. + """ + return self.source.upper() + "_" + self.type + + @property + def formatted_name(self) -> str: + """ + Get cube formatted name. + + :return: string name. + """ + return self.name.replace(",", "-") + + @property + def full_name(self) -> str: + """ + Get cube full name. + + :return: string name. + """ + return f"{self.flow.formatted_instance}.{self.flow.formatted_name}.{self.formatted_name}" + + +@dataclass +class SSASDataSet: + """ + Datahub dataset. + """ + + entity: Cube + platform: str = "ssas" + type: str = "dataset" + external_url: str = "" + incoming: List[str] = field(default_factory=list) + set_properties: Dict[str, str] = field(default_factory=dict) + + @property + def name(self) -> str: + """ + Get dataset name. + + :return: string name. + """ + # TODO: add server to urn + # return self.entity.formatted_name + return self.entity.full_name + + def add_property(self, name: str, value: Union[str, float, int]) -> None: + """ + Add property to dataset. + + :param name: property name. + :param value: propery value + """ + self.set_properties[name] = str(value) if value is not None else "" + + @property + def data_platform(self) -> str: + """ + Get dataplatform of object. + + :return: string dataplatform. + """ + return ( + self.platform[self.platform.rindex(":") + 1:] + if self.platform.startswith("urn:") + else self.platform + ) + + @property + def dataplatform_urn(self) -> str: + """ + Get dataplatform urn of object. + + :return: string dataplatform urn. + """ + return make_data_platform_urn(self.data_platform) + + @property + def urn(self) -> str: + """ + Get urn of object. + + :return: string urn. 
+ """ + return make_dataset_urn(self.data_platform, self.name, self.entity.env) + + @property + def as_dataplatform_data(self) -> Dict[str, Any]: + """ + Get data representation for dataPlatformInstance aspect. + + :return: data in dictionary. + """ + + return dict( + aspectName="dataPlatformInstance", + aspect=DataPlatformInstanceClass( + platform=self.dataplatform_urn, + instance=make_dataplatform_instance_urn( + self.platform, + self.entity.instance + ) if self.entity.instance else None, + ) + ) + + @property + def as_upstream_lineage_aspect_data(self) -> Dict[str, Any]: + """ + Get data representation for upstreamLineage aspect. + + :return: data in dictionary. + """ + return dict( + aspectName="upstreamLineage", + aspect=UpstreamLineageClass( + upstreams=[ + UpstreamClass(dataset, "VIEW") for dataset in sorted(self.incoming) + ], + ), + ) + + @property + def as_dataset_properties_data(self) -> Dict[str, Any]: + """ + Get data representation for datasetProperties aspect. + + :return: data in dictionary. + """ + return dict( + aspectName="datasetProperties", + aspect=DatasetPropertiesClass( + externalUrl=self.external_url, + customProperties=self.set_properties, + name=self.entity.formatted_name, + ), + ) + + +@dataclass +class Datasource: + """ + Datasource information. + """ + + name: str + db_table_name: str + friendly_name: str + db_schema_name: str + table_type: str + + +@dataclass +class DSConnection: + """ + Connection information. + """ + + provider: str + data_source: str + integrated_security: str + initial_catalog: str + + @property + def type(self) -> str: + """ + Get type of connection. + + :return: string type name. + """ + if "sql" in self.provider.lower(): + return "mssql" + return "unknown" + + @classmethod + def from_tuple(cls, data: List[str]) -> "DSConnection": + """ + Create instance from tuple. + + :param data: data in tuple. + :return: DSConnection object. + """ + return cls(**{key.name: data[i] for i, key in enumerate(fields(cls))}) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/parser.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/parser.py new file mode 100644 index 00000000000000..ff28dcbec8e58d --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/parser.py @@ -0,0 +1,23 @@ +""" +Module of parser class. +""" +from typing import Any, Dict + +import xmltodict +from bs4 import BeautifulSoup as bs + + +class MdXmlaParser: + """ + Multidimensional server parser + """ + + def get_server(self, xmla_str: str) -> Dict[str, Any]: + """ + Get server data from xmla structure. + + :param xmla_str: string xmla data. + :return: server data in dictionary. + """ + bs_content = bs(xmla_str, "xml") + return xmltodict.parse(str(bs_content.find("Server")))["Server"] diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_core.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_core.py new file mode 100644 index 00000000000000..fd7ec4429160cf --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_core.py @@ -0,0 +1,186 @@ +""" +MetaData Ingestion From the Microsoft SSAS Server. 
+""" + +import logging +from dataclasses import dataclass, field as dataclass_field +from typing import Any, Dict, Iterable, List, Optional + +from datahub.emitter.mcp import MetadataChangeProposalWrapper +from datahub.emitter.mcp_builder import PlatformKey, gen_containers +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SourceCapability, + SupportStatus, + capability, + config_class, + platform_name, + support_status, +) +from datahub.ingestion.api.source import Source, SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.schema_classes import ChangeTypeClass + +from .config import SsasServerHTTPSourceConfig +from .domains import SSASDataSet + +LOGGER = logging.getLogger(__name__) +LOGGER.setLevel(logging.DEBUG) + + +class SSASContainerKey(PlatformKey): + name: str + + +class Mapper: + """ """ + + class EquableMetadataWorkUnit(MetadataWorkUnit): + """ """ + + def __eq__(self, instance): + return self.id == self.id + + def __hash__(self): + return id(self.id) + + def __init__(self, config: SsasServerHTTPSourceConfig): + self.__config = config + + @staticmethod + def new_mcp( + entity_type, + entity_urn, + aspect_name, + aspect, + change_type=ChangeTypeClass.UPSERT, + ): + """ + Create MCP + """ + return MetadataChangeProposalWrapper( + entityType=entity_type, + changeType=change_type, + entityUrn=entity_urn, + aspectName=aspect_name, + aspect=aspect, + ) + + def __to_work_unit( + self, mcp: MetadataChangeProposalWrapper + ) -> EquableMetadataWorkUnit: + return Mapper.EquableMetadataWorkUnit( + id="{PLATFORM}-{ENTITY_URN}-{ASPECT_NAME}".format( + PLATFORM=self.__config.platform_name, + ENTITY_URN=mcp.entityUrn, + ASPECT_NAME=mcp.aspectName, + ), + mcp=mcp, + ) + + def construct_set_workunits( + self, + data_set: SSASDataSet, + ) -> Iterable[MetadataWorkUnit]: + + mcp = MetadataChangeProposalWrapper( + entityType=data_set.type, + entityUrn=data_set.urn, + changeType=ChangeTypeClass.UPSERT, + **data_set.as_dataplatform_data, + ) + + wu = MetadataWorkUnit( + id=f"{data_set.platform}.{data_set.name}.{mcp.aspectName}", mcp=mcp + ) + LOGGER.debug(f"as_dataplatform_data methadata: {wu.get_metadata()}") + yield wu + + mcp = MetadataChangeProposalWrapper( + entityType=data_set.type, + entityUrn=data_set.urn, + changeType=ChangeTypeClass.UPSERT, + **data_set.as_dataset_properties_data, + ) + + wu = MetadataWorkUnit( + id=f"{data_set.platform}.{data_set.name}.{mcp.aspectName}", mcp=mcp + ) + LOGGER.debug(f"as_dataset_properties_data methadata: {wu.get_metadata()}") + yield wu + mcp = MetadataChangeProposalWrapper( + entityType=data_set.type, + entityUrn=data_set.urn, + changeType=ChangeTypeClass.UPSERT, + **data_set.as_upstream_lineage_aspect_data, + ) + + wu = MetadataWorkUnit( + id=f"{data_set.platform}.{data_set.name}.{mcp.aspectName}", mcp=mcp + ) + LOGGER.debug(f"as_upstream_lineage_aspect_data methadata: {wu.get_metadata()}") + yield wu + + +@dataclass +class SsasSourceReport(SourceReport): + scanned_report: int = 0 + filtered_reports: List[str] = dataclass_field(default_factory=list) + + def report_scanned(self, count: int = 1) -> None: + self.scanned_report += count + + def report_dropped(self, view: str) -> None: + self.filtered_reports.append(view) + + +@platform_name("SSAS") +@config_class(SsasServerHTTPSourceConfig) +@support_status(SupportStatus.UNKNOWN) +@capability(SourceCapability.OWNERSHIP, "Enabled by default") +class SsasSource(Source): + source_config: SsasServerHTTPSourceConfig + 
+
+    def __init__(self, config: SsasServerHTTPSourceConfig, ctx: PipelineContext):
+        super().__init__(ctx)
+        self.source_config = config
+        self.report = SsasSourceReport()
+        self.mapper = Mapper(config)
+        self.processed_containers: List[str] = []
+
+    @classmethod
+    def create(cls, config_dict: Dict[str, Any], ctx: PipelineContext):
+        config = SsasServerHTTPSourceConfig.parse_obj(config_dict)
+        return cls(config, ctx)
+
+    def create_emit_containers(
+        self,
+        container_key,
+        name: str,
+        sub_types: List[str],
+        parent_container_key: Optional[PlatformKey] = None,
+        domain_urn: Optional[str] = None,
+    ) -> Iterable[MetadataWorkUnit]:
+        if container_key.guid() not in self.processed_containers:
+            container_wus = gen_containers(
+                container_key=container_key,
+                name=name,
+                sub_types=sub_types,
+                parent_container_key=parent_container_key,
+                domain_urn=domain_urn,
+            )
+            self.processed_containers.append(container_key.guid())
+            LOGGER.debug(f"Creating container with key: {container_key}")
+            for wu in container_wus:
+                self.report.report_workunit(wu)
+                yield wu
+
+    def gen_key(self, name):
+        return SSASContainerKey(
+            platform="ssas",
+            name=name,
+            instance=self.source_config.ssas_instance,
+        )
+
+    def get_report(self) -> SsasSourceReport:
+        return self.report
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/__init__.py
new file mode 100644
index 00000000000000..e69de29bb2d1d6
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/api.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/api.py
new file mode 100644
index 00000000000000..800aeb7b6fbb2c
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/api.py
@@ -0,0 +1,61 @@
+"""
+Module for the DAO layer of multidimensional MS SSAS.
+"""
+from typing import Any, Dict, Union
+
+from requests.auth import HTTPBasicAuth
+from requests_kerberos import HTTPKerberosAuth
+
+from datahub.ingestion.source.ssas.api import ISsasAPI, SsasXmlaAPI
+from datahub.ingestion.source.ssas.config import SsasServerHTTPConfig
+from datahub.ingestion.source.ssas.parser import MdXmlaParser
+
+from .domains import XMLAServer
+
+
+class MultidimensionSsasAPI(ISsasAPI):
+    """
+    API endpoints to fetch catalogs, cubes, dimensions, measures.
+    """
+
+    def __init__(self, config: SsasServerHTTPConfig):
+        self.__config = config
+
+        self.__auth = self.__get_auth()
+        self.__xmla_api = SsasXmlaAPI(config=self.__config, auth=self.__auth)
+        self.__xmla_parser = MdXmlaParser()
+
+    def __get_auth(self) -> Union[HTTPBasicAuth, HTTPKerberosAuth]:
+        if self.__config.ssas_instance_auth_type == "HTTPBasicAuth":
+            return HTTPBasicAuth(
+                username=self.__config.username,
+                password=self.__config.password,
+            )
+        if self.__config.ssas_instance_auth_type == "HTTPKerberosAuth":
+            return HTTPKerberosAuth()
+        raise TypeError(
+            f"Unsupported auth type: {self.__config.ssas_instance_auth_type}"
+        )
+
+    def get_server(self) -> XMLAServer:
+        """
+        Get server metadata.
+
+        :return: structured metadata dataclass.
+        """
+        xmla_data = self.__xmla_api.get_server_metadata()
+        return XMLAServer(**self.__xmla_parser.get_server(xmla_data))
+
+    @property
+    def auth_credentials(self) -> Union[HTTPBasicAuth, HTTPKerberosAuth]:
+        """
+        Get authorization credentials.
+
+        :return: authorization object.
+        """
+        return self.__auth
+
+    def get_server_info(self) -> Dict[str, Any]:
+        """
+        Get server information from metadata.
+
+        :return: server metadata as dictionary.
+ """ + return self.__xmla_api.get_server_info() diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/domains.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/domains.py new file mode 100644 index 00000000000000..63055f1f9f08ad --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/domains.py @@ -0,0 +1,672 @@ +""" +Module for domain layer of multidimension ms ssas. +""" +from dataclasses import dataclass +from typing import Any, Dict, List, Optional, Union + +from pydantic import BaseModel, Field, validator + +from ssas.domains import DSConnection + +# pylint: disable=C0103 + + +class XMLAAttribute(BaseModel): + """ + Class representation of xmla attribute. + """ + + attribute_id: Optional[str] = Field(alias="AttributeID") + canonic_id: Optional[str] = Field(alias="ID") + source: Optional[Union[str, List[str]]] = Field(alias="KeyColumns") + + @property + def id(self) -> Optional[str]: + """ + Get identifier. + + :return: entity identifier. + """ + return self.canonic_id or self.attribute_id + + @validator("source", pre=True) + def validate_source(cls, value): + """ + Validate datasource value. + Extract data from xmla structure. + + :return: string datasource or list of datasources. + """ + if value is None: + return value + res = [] + if isinstance(value, list): + for column in value: + key_column = column["KeyColumn"] + if isinstance(key_column, list): + for source in key_column: + table_id = source["Source"].get("TableID") + if table_id: + res.append(table_id) + continue + table_id = key_column["Source"].get("TableID") + if table_id: + res.append(table_id) + return res or None + key_column = value["KeyColumn"] + if isinstance(key_column, list): + for source in key_column: + table_id = source["Source"].get("TableID") + if table_id: + res.append(table_id) + return res or None + return key_column["Source"].get("TableID") + + +class XMLADimension(BaseModel): + """ + Class representation of xmla dimension. + """ + + canonic_name: Optional[str] = Field(alias="Name") + incube_name: Optional[str] = Field(alias="CubeDimensionID") + canonic_id: Optional[str] = Field(alias="ID") + dimension_id: Optional[str] = Field(alias="DimensionID") + attributes: List[XMLAAttribute] = Field(default_factory=list, alias="Attributes") + + @property + def sources(self) -> List[str]: + """ + Get list of datasources. + + :return: list of string datasources. + """ + sources = [] + for attribute in self.attributes: + current_source = attribute.source + if current_source: + if isinstance(current_source, list): + sources += current_source + continue + sources.append(current_source) + + return sources + + @validator("attributes", pre=True) + def validate_attributes(cls, value): + """ + Validate attributes value. + Extract data from xmla structure. + Convert to list if it is not list instance. + + :return: list type value. + """ + attributes = value["Attribute"] + if isinstance(attributes, list): + return attributes + return [attributes] + + @property + def name(self) -> Optional[str]: + """ + Get name. + + :return: entity name. + """ + return self.canonic_name or self.incube_name + + @property + def id(self) -> Optional[str]: + """ + Get identifier. + + :return: entity identifier. + """ + return self.canonic_id or self.dimension_id + + @property + def additional_info(self) -> Dict[str, Any]: + """ + Group of additional information of entity. + + :return: dictionary with information. 
+ """ + return dict( + dimension_id=self.dimension_id, + ) + + +class XMLADimensionsContainer(BaseModel): + """ + Class representation of xmla dimensions group. + """ + + dimensions: List[XMLADimension] = Field(alias="Dimension") + current_index: int = 0 + + @property + def sources(self) -> List[str]: + """ + Get list of datasources. + + :return: list of string datasources. + """ + sources = [] + for dimension in self.dimensions: + sources += dimension.sources + return sources + + @validator("dimensions", pre=True) + def validate_dimensions(cls, value): + """ + Validate dimensions value. + Convert to list if it is not list instance. + + :return: list type value. + """ + if isinstance(value, list): + return value + return [value] + + def __iter__(self) -> "XMLADimensionsContainer": # type: ignore + self.current_index = 0 + return self + + def __next__(self) -> XMLADimension: + if self.current_index < len(self.dimensions): + next_item = self.dimensions[self.current_index] + self.current_index += 1 + return next_item + raise StopIteration + + +class XMLAMeasure(BaseModel): + """ + Class representation of xmla measure. + """ + + name: str = Field(alias="Name") + id: str = Field(alias="ID") + type: str = Field(alias="DataType") + source: Optional[Union[str, List[str]]] = Field(alias="Source") + + @validator("source", pre=True) + def validate_source(cls, value): + """ + Validate datasources value. + Extract data from xmla structure. + + :return: string datasource or list of datasources. + """ + source_data = value["Source"] + if isinstance(source_data, list): + res = [] + for source in source_data: + table_id = source.get("TableID") + if table_id: + res.append(table_id) + return res + return source_data.get("TableID") + + @property + def additional_info(self) -> Dict[str, Any]: + """ + Group of additional information of entity. + + :return: dictionary with information. + """ + return dict( + id=self.id, + type=self.type, + ) + + +class XMLAMeasures(BaseModel): + """ + Class representation of xmla measures group. + """ + + measures: List[XMLAMeasure] = Field(alias="MeasureGroup") + dimensions: List[XMLADimensionsContainer] = Field(alias="MeasureGroup") + current_index: int = 0 + + @property + def sources(self) -> List[str]: + """ + Get list of datasources. + + :return: list of string datasources. + """ + sources = [] + for measure in self.measures: + source = measure.source + if source is None: + continue + if isinstance(source, list): + sources += source + continue + sources.append(source) + for dimension in self.dimensions: + sources += dimension.sources + return sources + + @validator("dimensions", pre=True) + def validate_dimensions(cls, value): + """ + Validate dimensions value. + Extract data from xmla structure. + Convert to list if it is not list instance. + + :return: list type value. + """ + res = [] + if isinstance(value, dict): + value = [value] + for item in value: + dimensions = item.get("Dimensions") + if dimensions: + res.append(dimensions) + return res + + @validator("measures", pre=True) + def validate_measures(cls, value): + """ + Validate measures value. + Extract measures from xmla stucture. + Convert to list if it is not list instance. + + :return: list type value. 
+ """ + res = [] + if isinstance(value, dict): + value = [value] + for item in value: + measures_group = item["Measures"] + if not isinstance(measures_group, list): + measures_group = [measures_group] + for val in measures_group: + measures = val["Measure"] + if isinstance(measures, list): + res += measures + continue + if isinstance(measures, dict): + res += [measures] + return res + + def __iter__(self) -> "XMLAMeasures": + self.current_index = 0 + return self + + def __next__(self) -> XMLAMeasure: + if self.current_index < len(self.measures): + next_item = self.measures[self.current_index] + self.current_index += 1 + return next_item + raise StopIteration + + +class XMLASchemaBinding(BaseModel): + """ + Class representation of xmla schemabinding. + """ + + name: str = Field(alias="@name") + table_name: str = Field(alias="@msprop:DbTableName") + schema_name: Optional[str] = Field(alias="@msprop:DbSchemaName") + type: str = Field(alias="@msprop:TableType") + + +class XMLADataSourceView(BaseModel): + """ + Class representation of xmla datasourceview. + """ + + name: str = Field(alias="Name") + id: Optional[str] = Field(alias="ID") + source_id: Optional[str] = Field(alias="DataSourceID") + sources: List[XMLASchemaBinding] = Field(alias="Schema") + + @property + def datasource_id(self) -> Optional[str]: + """ + Get identifier. + + :return: identifier of source. + """ + return self.source_id or self.id + + @validator("sources", pre=True) + def validate_sources(cls, value): + """ + Validate datasources value. + Extract data from xmla structure. + Convert to list if it is not list instance. + + :return: list type value. + """ + res = value["xs:schema"]["xs:element"]["xs:complexType"]["xs:choice"][ + "xs:element" + ] + if isinstance(res, dict): + return [res] + return res + + +class XMLDataSourceViewsContainer(BaseModel): + """ + Class representation of xmla datasourcesview group. + """ + + datasources: List[XMLADataSourceView] = Field(alias="DataSourceView") + current_index: int = 0 + + @validator("datasources", pre=True) + def validate_datasources(cls, value): + """ + Validate datasources value. + Convert to list if it is not list instance. + + :return: list type value. + """ + if isinstance(value, list): + return value + return [value] + + def __iter__(self) -> "XMLDataSourceViewsContainer": # type: ignore + self.current_index = 0 + return self + + def __next__(self) -> XMLADataSourceView: + if self.current_index < len(self.datasources): + next_item = self.datasources[self.current_index] + self.current_index += 1 + return next_item + raise StopIteration + + +class XMLADataSource(BaseModel): + """ + Class representation of xmla datasource. + """ + + name: str = Field(alias="Name") + id: str = Field(alias="ID") + connection_string: str = Field(alias="ConnectionString") + + @property + def connection(self) -> DSConnection: + """ + Get connection representation. + + :return: connection of datasource. + """ + return DSConnection.from_tuple( + [ + item.split("=")[1] + for item in self.connection_string.replace("\n", "").split(";") + ] + ) + + +class XMLADataSourcesContainer(BaseModel): + """ + Class representation of xmla datasources group. + """ + + datasources: List[XMLADataSource] = Field(alias="DataSource") + current_index: int = 0 + + @validator("datasources", pre=True) + def validate_datasources(cls, value): + """ + Validate datasources value. + Convert to list if it is not list instance. + + :return: list type value. 
+ """ + if isinstance(value, list): + return value + return [value] + + def __iter__(self) -> "XMLADataSourcesContainer": # type: ignore + self.current_index = 0 + return self + + def __next__(self) -> XMLADataSource: + if self.current_index < len(self.datasources): + next_item = self.datasources[self.current_index] + self.current_index += 1 + return next_item + raise StopIteration + + def get_source_by_id(self, id: str) -> Optional[XMLADataSource]: + """ + Find source by id. + + :param id: source identifier. + :return: source if id exists else None. + """ + for source in self.datasources: + if source.id == id: + return source + return None + + +class XMLACube(BaseModel): + """ + Class representation of xmla cube. + """ + + name: str = Field(alias="Name") + id: str = Field(alias="ID") + created: Optional[str] = Field(alias="CreatedTimestamp") + last_schema_update: Optional[str] = Field(alias="LastSchemaUpdate") + last_processed: Optional[str] = Field(alias="LastProcessed") + dimensions: XMLADimensionsContainer = Field(alias="Dimensions") + measures: XMLAMeasures = Field(alias="MeasureGroups") + + @property + def sources_ids(self) -> List[str]: + """ + Get list of unique sources. + + :return: list with unique entities of string sources ids. + """ + return list(set(self.measures.sources)) + + @property + def additional_info(self) -> Dict[str, Any]: + """ + Group of additional information of entity. + + :return: dictionary with information. + """ + return dict( + created=self.created, + last_schema_update=self.last_schema_update, + last_processed=self.last_processed, + ) + + +class XMLACubesContainer(BaseModel): + """ + Class representation of xmla cubes group. + """ + + cubes: List[XMLACube] = Field(alias="Cube") + current_index: int = 0 + + @validator("cubes", pre=True) + def validate_cubes(cls, value): + """ + Validate cubes value. + Convert to list if it is not list instance. + + :return: list type value. + """ + if isinstance(value, list): + return value + return [value] + + def __iter__(self) -> "XMLACubesContainer": # type: ignore + self.current_index = 0 + return self + + def __next__(self) -> XMLACube: + if self.current_index < len(self.cubes): + next_item = self.cubes[self.current_index] + self.current_index += 1 + return next_item + raise StopIteration + + +@dataclass +class DataSource: + """ + Datasource representation. + """ + + id: str + source: str + server: str + instance: Optional[str] + db: str + schema: str + name: str + type: str + + +class XMLADataBase(BaseModel): + """ + Class representation of xmla database. + """ + + name: str = Field(alias="Name") + created: Optional[str] = Field(alias="CreatedTimestamp") + last_update: Optional[str] = Field(alias="LastUpdate") + last_schema_update: Optional[str] = Field(alias="LastUpdate") + last_processed: Optional[str] = Field(alias="LastSchemaUpdate") + dimensions: XMLADimensionsContainer = Field( + alias="Dimensions", + default_factory=lambda: XMLADimensionsContainer(Dimension=[]), + ) + cubes: XMLACubesContainer = Field( + alias="Cubes", default_factory=lambda: XMLACubesContainer(Cube=[]) + ) + datasources: XMLADataSourcesContainer = Field( + alias="DataSources", + default_factory=lambda: XMLADataSourcesContainer(DataSource=[]), + ) + datasourceviews: XMLDataSourceViewsContainer = Field( + alias="DataSourceViews", + default_factory=lambda: XMLDataSourceViewsContainer(DataSourceView=[]), + ) + + @property + def sources(self) -> List[DataSource]: + """ + Get database sources. + + :return: list of sources. 
+ """ + sources: List[DataSource] = [] + for source_view in self.datasourceviews: + datasource_id = source_view.datasource_id + if datasource_id is None: + continue + datasource = self.datasources.get_source_by_id(datasource_id) + if not datasource: + continue + connection = datasource.connection + server = connection.data_source + instance = None + + try: + server, instance = connection.data_source.split("\\") + except ValueError: + pass + for source in source_view.sources: + if not source.schema_name: + continue + sources.append( + DataSource( + id=source.name, + source=connection.type, + server=server, + instance=instance, + db=connection.initial_catalog, + schema=source.schema_name, + name=source.table_name, + type=source.type, + ) + ) + return sources + + @property + def additional_info(self) -> Dict[str, Any]: + """ + Group of additional information of entity. + + :return: dictionary with information. + """ + return dict( + created=self.created, + last_update=self.last_update, + last_schema_update=self.last_schema_update, + last_processed=self.last_processed, + ) + + +class XMLADataBasesContainer(BaseModel): + """ + Class representation of xmla databases group. + """ + + databases: List[XMLADataBase] = Field(alias="Database") + current_index: int = 0 + + @validator("databases", pre=True) + def validate_databases(cls, value): + """ + Validate databases value. + Convert to list if it is not list instance. + + :return: list type value. + """ + if isinstance(value, list): + return value + return [value] + + def __iter__(self) -> "XMLADataBasesContainer": # type: ignore + self.current_index = 0 + return self + + def __next__(self) -> XMLADataBase: + if self.current_index < len(self.databases): + next_item = self.databases[self.current_index] + self.current_index += 1 + return next_item + raise StopIteration + + +class XMLAServer(BaseModel): + """ + Class representation of xmla server. + """ + + name: str = Field(alias="Name") + id: str = Field(alias="ID") + version: str = Field(alias="Version") + databases: XMLADataBasesContainer = Field( + alias="Databases", + default_factory=lambda: XMLADataBasesContainer(Database=[]), + ) + + @property + def databases_names(self) -> List[str]: + """ + Get list of databases. + + :return: list of string databases names. 
+ """ + return [database.name for database in self.databases.databases] diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/ssas_multidimension.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/ssas_multidimension.py new file mode 100644 index 00000000000000..14b5246ea801fc --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_multidimension/ssas_multidimension.py @@ -0,0 +1,221 @@ +""" +Meta Data Ingestion From the Microsoft OLAP Cubes +""" +import json +import logging +from typing import Iterable, List, Tuple + +from datahub.emitter.mcp_builder import add_dataset_to_container +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SourceCapability, + SupportStatus, + capability, + platform_name, + support_status, +) +from datahub.ingestion.api.workunit import MetadataWorkUnit + +from ssas.config import SsasServerHTTPSourceConfig +from ssas.domains import ( + Catalog, + Cube, + CubeDependency, + OLAPLineageStream, + SSASDataSet, +) +from ssas.ssas_core import SsasSource +from ssas.utils import DNSHostNameResolver +from ssas.xmla_server_response_error import XMLAServerResponseError + +from .api import MultidimensionSsasAPI +from .domains import ( + DataSource, + XMLACube, + XMLACubesContainer, + XMLADataBase, + XMLADataBasesContainer, + XMLADimensionsContainer, + XMLAMeasures, +) + +LOGGER = logging.getLogger(__name__) + + +@platform_name("SSAS Multidimension") +@support_status(SupportStatus.UNKNOWN) +@capability(SourceCapability.OWNERSHIP, "Enabled by default") +class SsasMultidimensionSource(SsasSource): + """ + This plugin extracts: + - MS SSAS multidimension cubes; + - dimensions and measures of cubes; + - additional information as properties. + """ + + def __init__(self, config: SsasServerHTTPSourceConfig, ctx: PipelineContext): + super().__init__(config, ctx) + self.ssas_client = MultidimensionSsasAPI(config) + self.auth = self.ssas_client.auth_credentials + self.config = config + + def _get_default_ssas_instance_from_config(self, server: str): + return self.config.default_ssas_instances_by_server.get(server, None) + + def _get_catalog(self, database: XMLADataBase) -> Catalog: + """ + Build datahub catalog entity. + + :param database: database representation from xmla response. + :return: datahub catalog entity. + """ + return Catalog( + host_port=self.source_config.host, + instance=self.source_config.ssas_instance, + name=database.name, + env=self.source_config.env, + ) + + def _get_olap_cube( + self, cube: XMLACube, database: XMLADataBase + ) -> Tuple[Cube, OLAPLineageStream]: + """ + Build datahub cube entity. + + :param cube: cube representation from xmla response. + :param catalog: datahub catalog entity for binding. + :return: datahub cube entity. + """ + catalog = self._get_catalog(database) + database_sources = database.sources + olap_cube = Cube( + instance=self.source_config.instance, + host_port=self.source_config.host, + name=cube.name, + env=self.source_config.env, + flow=catalog, + ) + datasets_stream = self._get_cube_dependency(cube, database_sources) + return olap_cube, datasets_stream + + def _get_cube_dependency( + self, cube: XMLACube, catalog_sources: List[DataSource] + ) -> OLAPLineageStream: + """ + Build cube lineage entity. + + :param cube: cube representation from xmla response. + :param catalog_sources: list of catalog data sources. + :return: datahub lineage entity. 
+ """ + upstream_dependencies = [] + cube_sources_ids = cube.sources_ids + cube_sources = [ + source + for source in catalog_sources + if source.name in cube_sources_ids or source.id in cube_sources_ids + ] + + for dependency in cube_sources: + + server = dependency.server + + if self.source_config.use_dns_resolver: + resolver = DNSHostNameResolver( + hostname=server, dns_suffix_list=self.source_config.dns_suffixes + ) + server = resolver.primary_hostname + + upstream_dependencies.append( + CubeDependency( + source=dependency.source, + server=server, + instance=dependency.instance if dependency.instance is not None else self._get_default_ssas_instance_from_config(server=server), + db=dependency.db, + schema=dependency.schema, + name=dependency.name, + type=dependency.type.upper(), + env=self.source_config.env, + ) + ) + return OLAPLineageStream(dependencies=upstream_dependencies) + + def get_dimensions(self, cube: XMLACube) -> XMLADimensionsContainer: + """ + Get list of dimensions for current cube. + + :param cube: cube entity. + :return: cube dimensions representation. + """ + return cube.dimensions + + def get_measures(self, cube: XMLACube) -> XMLAMeasures: + """ + Get list of measures for current cube. + + :param cube: cube entity. + :return: cube measures representation. + """ + return cube.measures + + def get_cubes(self, database: XMLADataBase) -> XMLACubesContainer: + """ + Get list of cubes for current database. + + :param database: database entity. + :return: database cubes representation. + """ + return database.cubes + + def get_databases(self) -> XMLADataBasesContainer: + """ + Get list of available databases. + + :return: server databases representation. + """ + xmla_server = self.ssas_client.get_server() + return xmla_server.databases + + def get_workunits(self) -> Iterable[MetadataWorkUnit]: + for database in self.get_databases(): + catalog = self._get_catalog(database) + + container_key = self.gen_key(catalog.formatted_name) + + yield from self.create_emit_containers( + container_key=container_key, + name=catalog.formatted_name, + sub_types=["SSAS catalog"], + parent_container_key=None, + ) + + for cube in self.get_cubes(database): + olap_cube, datasets_stream = self._get_olap_cube(cube, database) + data_set = SSASDataSet( + entity=olap_cube, incoming=datasets_stream.as_datasets_urn_list + ) + + for name, value in cube.additional_info.items(): + data_set.add_property(name, value) + + try: + for dimension in self.get_dimensions(cube): + data_set.add_property( + f"Dimension {dimension.name}", + json.dumps(dimension.additional_info, ensure_ascii=False), + ) + except XMLAServerResponseError as e: + LOGGER.critical(f"{XMLAServerResponseError.__name__}Getting dimension for database: {database.name}; cube: {cube.name} failed: {e}.{XMLAServerResponseError.__name__}") + try: + for measure in self.get_measures(cube): + data_set.add_property( + f"Measure {measure.name}", + json.dumps(measure.additional_info, ensure_ascii=False), + ) + except XMLAServerResponseError as e: + LOGGER.critical(f"{XMLAServerResponseError.__name__}Getting measure for database: {database.name}; cube: {cube.name} failed: {e}.{XMLAServerResponseError.__name__}") + + yield from self.mapper.construct_set_workunits(data_set) + yield from add_dataset_to_container( + container_key=container_key, dataset_urn=data_set.urn + ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/__init__.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/__init__.py new file mode 100644 
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/api.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/api.py
new file mode 100644
index 00000000000000..4fc3b93520877e
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/api.py
@@ -0,0 +1,240 @@
+"""
+Module for the DAO layer of tabular MS SSAS.
+"""
+
+import logging
+from typing import Any, Dict, List, Union
+
+from requests.auth import HTTPBasicAuth
+from requests_kerberos import HTTPKerberosAuth
+
+from datahub.ingestion.source.ssas.api import ISsasAPI, SsasXmlaAPI
+from datahub.ingestion.source.ssas.config import (
+    SsasServerHTTPConfig,
+    SsasServerHTTPSourceConfig,
+)
+from datahub.ingestion.source.ssas.parser import MdXmlaParser
+from datahub.ingestion.source.ssas.xmlaclient import XmlaClient
+
+from .domains import XMLACube, XMLADataBase, XMLADimension, XMLAMeasure, XMLAServer
+from .tools import DvmQueries
+
+logger = logging.getLogger(__name__)
+
+
+class SsasDvmAPI(ISsasAPI):
+    """
+    API endpoints to fetch catalogs, cubes, dimensions, measures via DMV queries.
+    """
+
+    def __init__(
+        self,
+        config: SsasServerHTTPConfig,
+        auth: Union[HTTPBasicAuth, HTTPKerberosAuth],
+    ) -> None:
+        self._config: SsasServerHTTPConfig = config
+        self._auth = auth
+        # Single underscore (no name mangling) so that subclasses such as
+        # SsasTabularDvmAPI can reach the client.
+        self._client = XmlaClient(config=self._config, auth=self._auth)
+
+    def get_server(self):
+        pass
+
+    def get_catalogs(self):
+        """
+        Get the list of catalogs from a DMV query.
+
+        :return: list of catalogs.
+        """
+        response = self._client.execute(query=DvmQueries.SELECT_CATALOGS)
+        return response.get_node()
+
+    def get_cubes_by_catalog(self, catalog: str):
+        """
+        Get the list of cubes for a catalog from a DMV query.
+
+        :return: list of cubes.
+        """
+
+        response = self._client.execute(
+            query=DvmQueries.SELECT_CUBES_BY_CATALOG.format(catalog=catalog),  # type: ignore
+            catalog_name=catalog,
+        )
+        return response.get_node()
+
+    def get_dimensions_by_cube(self, catalog_name: str, cube_name: str):
+        """
+        Get dimensions from a DMV query.
+
+        :return: dimensions as dict.
+        """
+
+        response = self._client.execute(
+            query=DvmQueries.SELECT_DIMENSIONS_BY_CUBE.format(name=cube_name),  # type: ignore
+            catalog_name=catalog_name,
+        )
+        return response.get_node()
+
+    def get_dimensions_additional_info(self, catalog_name: str, dimension_name: str):
+        """
+        Get dimension additional info from a DMV query.
+
+        :return: dimension additional info as dict.
+        """
+
+        response = self._client.execute(
+            query=DvmQueries.SELECT_QUERY_DEFINITION.format(name=dimension_name),
+            catalog_name=catalog_name,
+        )
+        return response.get_node()
+
+    def get_measures_by_cube(self, catalog_name: str, cube_name: str):
+        """
+        Get measures from a DMV query.
+
+        :return: measures as dict.
+        """
+        response = self._client.execute(
+            query=DvmQueries.SELECT_MEASURES_BY_CUBE.format(name=cube_name),  # type: ignore
+            catalog_name=catalog_name,
+        )
+        return response.get_node()
+
+    @property
+    def auth_credentials(self):
+        """
+        Get authorization credentials.
+
+        :return: authorization object.
+        """
+        return self._auth
+
+
+class SsasTabularDvmAPI(SsasDvmAPI):
+    """
+    API endpoints to fetch catalogs, cubes, dimensions, measures for tabular ssas servers.
+    """
+
+    def get_catalog_sources(self) -> Dict[str, Any]:
+        """
+        Get the list of data sources from a DMV query.
+
+        :return: data sources as dict.
+        """
+
+        response = self._client.execute(query=DvmQueries.SELECT_DATA_SOURCES)
+        return response.as_dict()
+
+
+class TabularSsasAPI(ISsasAPI):
+    """
+    API endpoints to fetch catalogs, cubes, dimensions, measures for tabular ssas servers.
+ """ + + def __init__(self, config: SsasServerHTTPSourceConfig): + self.__config = config + self.__auth = self.__get_auth() + self.__xmla_api = SsasXmlaAPI(config=self.__config, auth=self.__auth) + self.__dvm_api = SsasTabularDvmAPI(config=self.__config, auth=self.__auth) + self.__xmla_parser = MdXmlaParser() + + def __get_auth(self) -> Union[HTTPBasicAuth, HTTPKerberosAuth]: + if self.__config.ssas_instance_auth_type == "HTTPBasicAuth": + return HTTPBasicAuth( + username=self.__config.username, + password=self.__config.password, + ) + if self.__config.ssas_instance_auth_type == "HTTPKerberosAuth": + return HTTPKerberosAuth() + raise TypeError(f"Unsupported auth type: {self.__config.ssas_instance_auth_type}") + + def get_server(self) -> XMLAServer: + """ + Get server metadata. + + :return: structured metadata dataclass. + """ + + xmla_data = self.__xmla_api.get_server_metadata() + return XMLAServer(**self.__xmla_parser.get_server(xmla_data)) + + def get_catalogs(self) -> List[XMLADataBase]: + """ + Get list of catalogs from dvm query. + + :return: list of catalogs. + """ + return [XMLADataBase(**item) for item in self.__dvm_api.get_catalogs()] + + def get_cubes_by_catalog(self, catalog: str) -> List[XMLACube]: + """ + Get list of cubes from dvm query. + + :return: list of cubes. + """ + return [XMLACube(**item) for item in self.__dvm_api.get_cubes_by_catalog(catalog=catalog)] + + def add_dimension_additional_info( + self, catalog_name: str, dimension: XMLADimension + ): + """ + Add additional info to dimension. + + :return: dimension. + """ + dimension_name = dimension.name + if dimension_name is None: + return dimension + + info = self.__dvm_api.get_dimensions_additional_info( + dimension_name=dimension_name, catalog_name=catalog_name + ) + + for item in info: + dimension.query_definition = item["QueryDefinition"] + + return dimension + + def get_dimensions_by_cube( + self, catalog_name: str, cube_name: str + ) -> List[XMLADimension]: + """ + Get list of dimensions from qube dvm query. + + :return: list of dimensions. + """ + + result = [] + + dimensions = [XMLADimension(**item) for item in self.__dvm_api.get_dimensions_by_cube(catalog_name=catalog_name, + cube_name=cube_name)] + + for dimension in dimensions: + dimension = self.add_dimension_additional_info( + catalog_name=catalog_name, dimension=dimension + ) + result.append(dimension) + + return result + + def get_measures_by_cube( + self, catalog_name: str, cube_name: str + ) -> List[XMLAMeasure]: + """ + Get list of measures from qube dvm query. + + :return: list of measures. + """ + return [XMLAMeasure(**item) for item in self.__dvm_api.get_measures_by_cube(catalog_name=catalog_name, + cube_name=cube_name)] + + @property + def auth_credentials(self) -> HTTPBasicAuth: + """ + Get authorization credentials. + + :return: authorization dataclass. + """ + + return self.__auth + + def get_server_info(self) -> Dict[str, Any]: + """ + Get server metadata from XMLA request. + + :return: structured metadata as dict. + """ + + return self.__xmla_api.get_server_info() diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/domains.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/domains.py new file mode 100644 index 00000000000000..309a822958550e --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/domains.py @@ -0,0 +1,183 @@ +""" +Module for domain layer of tabular ms ssas. 
+""" + +from typing import Any, Dict, Optional + +from pydantic import BaseModel, Field + +from ssas.domains import DSConnection + + +class XMLACube(BaseModel): + """Class for represent OLAP cube""" + + name: str = Field(alias="CUBE_NAME") + created: Optional[str] = Field(alias="CREATED_ON") + last_schema_update: Optional[str] = Field(alias="LAST_SCHEMA_UPDATE") + last_processed: Optional[str] = Field(alias="LAST_DATA_UPDATE") + schema_updated_by: Optional[str] = Field(alias="SCHEMA_UPDATED_BY") + data_updated_by: Optional[str] = Field(alias="DATA_UPDATED_BY") + description: Optional[str] = Field(alias="DESCRIPTION") + + @property + def additional_info(self) -> Dict[str, Any]: + """ + Group of additional information of entity. + + :return: dictionary with information. + """ + + return dict( + created=self.created, + last_schema_update=self.last_schema_update, + last_processed=self.last_processed, + schema_updated_by=self.schema_updated_by, + data_updated_by=self.data_updated_by, + description=self.description, + ) + + +class XMLADimension(BaseModel): + """Class for represent dimension in OLAP cube""" + + dimension_name: Optional[str] = Field(alias="DIMENSION_NAME") + dimension_unique_name: Optional[str] = Field(alias="DIMENSION_UNIQUE_NAME") + dimension_caption: Optional[str] = Field(alias="DIMENSION_CAPTION") + dimension_type: Optional[str] = Field(alias="DIMENSION_TYPE") + default_hierarchy: Optional[str] = Field(alias="DEFAULT_HIERARCHY") + description: Optional[str] = Field(alias="DESCRIPTION") + query_definition: Optional[str] + + @property + def name(self) -> Optional[str]: + """ + Get name. + + :return: entity name. + """ + return self.dimension_name or self.dimension_unique_name + + @property + def additional_info(self) -> Dict[str, Any]: + """ + Group of additional information of entity. + + :return: dictionary with information. + """ + return dict( + dimension_caption=self.dimension_caption, + dimension_type=self.dimension_type, + default_hierarchy=self.default_hierarchy, + description=self.description, + query_definition=self.query_definition, + ) + + +class XMLAMeasure(BaseModel): + """ + Class representation of xmla measure. + """ + + measure_name: str = Field(alias="MEASURE_NAME") + measure_unique_name: Optional[str] = Field(alias="MEASURE_UNIQUE_NAME") + measure_caption: Optional[str] = Field(alias="MEASURE_CAPTION") + description: Optional[str] = Field(alias="DESCRIPTION") + expression: Optional[str] = Field(alias="EXPRESSION") + + @property + def name(self) -> Optional[str]: + """ + Build measure name. + + :return: measure name. + """ + + return self.measure_name or self.measure_unique_name + + @property + def formatted_name(self) -> Optional[str]: + """ + Reformat measure name. + + :return: reformatted measure name. + """ + + if self.name is None: + return None + return self.name.format(",", "-") + + @property + def additional_info(self) -> Dict[str, Any]: + """ + Group of additional information of entity. + + :return: dictionary with information. + """ + return dict( + measure_unique_name=self.measure_unique_name, + id=self.measure_caption, + description=self.description, + expression=self.expression, + ) + + +class XMLADataSource(BaseModel): + """ + Class representation of xmla datasource. + """ + + name: str = Field(alias="Name") + id: str = Field(alias="ID") + connection_string: str = Field(alias="ConnectionString") + + @property + def connection(self) -> DSConnection: + """ + Get datasource for OLAP cube from connection string . + + :return: datasource name. 
+ """ + + return DSConnection.from_tuple( + [ + item.split("=")[1] + for item in self.connection_string.replace("\n", "").split(";") + ] + ) + + +class XMLADataBase(BaseModel): + """ + Class representation of xmla database. + """ + + name: str = Field(alias="CATALOG_NAME") + description: Optional[str] = Field(alias="DESCRIPTION") + last_update: str = Field(alias="DATE_MODIFIED") + compatibility_level: str = Field(alias="COMPATIBILITY_LEVEL") + database_id: str = Field(alias="DATABASE_ID") + + @property + def additional_info(self) -> Dict[str, Any]: + """ + Group of additional information of entity. + + :return: dictionary with information. + """ + return dict( + description=self.description, + last_update=self.last_update, + last_schema_update=self.compatibility_level, + last_processed=self.database_id, + ) + + +class XMLAServer(BaseModel): + """ + Class representation of xmla server. + """ + + name: str = Field(alias="Name") + id: str = Field(alias="ID") + version: str = Field(alias="Version") diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/ssas_tabular.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/ssas_tabular.py new file mode 100644 index 00000000000000..67d59ee760e6f1 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/ssas_tabular.py @@ -0,0 +1,188 @@ +""" + Meta Data Ingestion from the Microsoft OLAP Cubes +""" + +import json +import logging +from dataclasses import dataclass, field as dataclass_field +from typing import Iterable, List, Tuple + +from datahub.emitter.mcp_builder import add_dataset_to_container +from datahub.ingestion.api.common import PipelineContext +from datahub.ingestion.api.decorators import ( + SourceCapability, + SupportStatus, + capability, + platform_name, + support_status, +) +from datahub.ingestion.api.source import SourceReport +from datahub.ingestion.api.workunit import MetadataWorkUnit + +from ssas.config import SsasServerHTTPSourceConfig +from ssas.domains import Catalog, Cube, OLAPLineageStream, SSASDataSet +from ssas.ssas_core import SsasSource +from ssas.xmla_server_response_error import XMLAServerResponseError + +from .api import TabularSsasAPI +from .domains import XMLACube, XMLADataBase, XMLADimension, XMLAMeasure + +logging.basicConfig(level=logging.INFO) +LOGGER = logging.getLogger(__name__) + + +@dataclass +class SsasSourceReport(SourceReport): + """Class for source report""" + + scanned_report: int = 0 + filtered_reports: List[str] = dataclass_field(default_factory=list) + + def report_scanned(self, count: int = 1) -> None: + self.scanned_report += count + + def report_dropped(self, view: str) -> None: + self.filtered_reports.append(view) + + +@platform_name("SSAS Tabular") +@support_status(SupportStatus.UNKNOWN) +@capability(SourceCapability.OWNERSHIP, "Enabled by default") +class SsasTabularSource(SsasSource): + """Class build datahub entities from tabular SSAS""" + + def __init__(self, config: SsasServerHTTPSourceConfig, ctx: PipelineContext): + super().__init__(config, ctx) + self.ssas_client = TabularSsasAPI(config) + self.auth = self.ssas_client.auth_credentials + + def _get_catalog(self, database: XMLADataBase) -> Catalog: + """ + Build datahub catalog entity. + + :param database: database representation from xmla response. + :return: datahub catalog entity. 
+        """
+        return Catalog(
+            host_port=self.source_config.host,
+            instance=self.source_config.ssas_instance,
+            name=database.name,
+            env=self.source_config.env,
+        )
+
+    def _get_olap_cube(
+        self, cube: XMLACube, database: XMLADataBase
+    ) -> Tuple[Cube, OLAPLineageStream]:
+        """
+        Build datahub cube entity.
+
+        :param cube: cube representation from xmla response.
+        :param database: database representation from xmla response.
+        :return: datahub cube entity together with its (currently empty) lineage stream.
+        """
+        catalog = self._get_catalog(database)
+        return (
+            Cube(
+                instance=self.source_config.ssas_instance,
+                host_port=self.source_config.host,
+                name=cube.name,
+                env=self.source_config.env,
+                flow=catalog,
+            ),
+            OLAPLineageStream(dependencies=[]),
+        )
+
+    def get_dimensions(
+        self, cube: XMLACube, database: XMLADataBase
+    ) -> List[XMLADimension]:
+        """
+        Get the dimension entities of a cube.
+
+        :param cube: cube representation from xmla response.
+        :param database: database representation from xmla response.
+        :return: list of dimension entities.
+        """
+
+        return self.ssas_client.get_dimensions_by_cube(
+            catalog_name=database.name, cube_name=cube.name
+        )
+
+    def get_measures(self, cube: XMLACube, database: XMLADataBase) -> List[XMLAMeasure]:
+        """
+        Get the measure entities of a cube.
+
+        :param cube: cube representation from xmla response.
+        :param database: database representation from xmla response.
+        :return: list of measure entities.
+        """
+
+        return self.ssas_client.get_measures_by_cube(
+            catalog_name=database.name, cube_name=cube.name
+        )
+
+    def get_cubes(self, database: XMLADataBase) -> List[XMLACube]:
+        """
+        Get the OLAP cube entities of a database.
+
+        :param database: database representation from xmla response.
+        :return: list of OLAP cube entities.
+        """
+
+        return self.ssas_client.get_cubes_by_catalog(catalog=database.name)
+
+    def get_databases(self) -> List[XMLADataBase]:
+        """
+        Get the SSAS catalog (database) entities.
+
+        :return: list of catalog entities.
+        """
+
+        return self.ssas_client.get_catalogs()
+
+    def get_workunits(self) -> Iterable[MetadataWorkUnit]:
+        """
+        Build metadata workunits representing the OLAP cube structure.
+
+        :return: generator of metadata workunits.
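+
+        For every catalog a container is emitted first; each cube then
+        becomes a dataset inside that container, with its dimensions and
+        measures attached as JSON-encoded custom properties.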
+        """
+
+        for database in self.get_databases():
+            catalog = self._get_catalog(database)
+
+            container_key = self.gen_key(catalog.formatted_name)
+
+            yield from self.create_emit_containers(
+                container_key=container_key,
+                name=catalog.formatted_name,
+                sub_types=["SSAS catalog"],
+                parent_container_key=None,
+            )
+
+            for cube in self.get_cubes(database):
+                olap_cube, datasets_stream = self._get_olap_cube(cube, database)
+                data_set = SSASDataSet(
+                    entity=olap_cube, incoming=datasets_stream.as_datasets_urn_list
+                )
+
+                for name, value in cube.additional_info.items():
+                    data_set.add_property(name, value)
+
+                try:
+                    for dimension in self.get_dimensions(cube, database):
+                        data_set.add_property(
+                            f"Dimension {dimension.name}",
+                            json.dumps(dimension.additional_info, ensure_ascii=False),
+                        )
+                except XMLAServerResponseError as e:
+                    LOGGER.error(
+                        f"Getting dimensions for database {database.name}, "
+                        f"cube {cube.name} failed: {e}"
+                    )
+
+                try:
+                    for measure in self.get_measures(cube, database):
+                        data_set.add_property(
+                            f"Measure {measure.name}",
+                            json.dumps(measure.additional_info, ensure_ascii=False),
+                        )
+                except XMLAServerResponseError as e:
+                    LOGGER.error(
+                        f"Getting measures for database {database.name}, "
+                        f"cube {cube.name} failed: {e}"
+                    )
+
+                yield from self.mapper.construct_set_workunits(data_set)
+                yield from add_dataset_to_container(
+                    container_key=container_key, dataset_urn=data_set.urn
+                )
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/tools.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/tools.py
new file mode 100644
index 00000000000000..ebe44690e64f04
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/ssas_tabular/tools.py
@@ -0,0 +1,61 @@
+"""
+Module containing DMV (Dynamic Management View) queries for tabular SSAS.
+"""
+from enum import Enum
+
+
+class DvmQueries(str, Enum):
+    """Container for DMV queries."""
+
+    SELECT_CATALOGS: str = """
+        select [CATALOG_NAME],
+            [DESCRIPTION],
+            [DATE_MODIFIED],
+            [COMPATIBILITY_LEVEL],
+            [DATABASE_ID]
+        from $System.DBSCHEMA_CATALOGS"""
+    SELECT_CUBES_BY_CATALOG: str = """
+        select [CUBE_NAME],
+            [CREATED_ON],
+            [LAST_SCHEMA_UPDATE],
+            [LAST_DATA_UPDATE],
+            [SCHEMA_UPDATED_BY],
+            [DATA_UPDATED_BY],
+            [DESCRIPTION]
+        from $system.mdschema_cubes
+        where [CATALOG_NAME] = '{catalog}'"""
+    SELECT_DIMENSIONS: str = "select * from $system.mdschema_dimensions"
+    SELECT_DIMENSIONS_BY_CUBE: str = """
+        select [DIMENSION_NAME],
+            [DIMENSION_UNIQUE_NAME],
+            [DIMENSION_CAPTION],
+            [DIMENSION_TYPE],
+            [DEFAULT_HIERARCHY],
+            [DESCRIPTION]
+        from $system.mdschema_dimensions
+        where [CUBE_NAME] = '{name}'
+    """
+    SELECT_HIERARCHIES: str = "select * from $system.mdschema_hierarchies"
+    SELECT_HIERARCHIE_BY_NAME: str = (
+        "select * from $system.mdschema_hierarchies where [HIERARCHY_NAME] = '{name}'"
+    )
+    SELECT_MEASURES_BY_CUBE: str = """
+        select [MEASURE_NAME],
+            [MEASURE_UNIQUE_NAME],
+            [MEASURE_CAPTION],
+            [DESCRIPTION],
+            [EXPRESSION]
+        from $system.mdschema_measures
+        where [CUBE_NAME] = '{name}'"""
+    SELECT_MEASURES: str = "select * from $system.mdschema_measures"
+    SELECT_MEASURE_BY_NAME: str = (
+        "select * from $system.mdschema_measures where MEASURE_NAME = '{name}'"
+    )
+    SELECT_DATA_SOURCES: str = "select * from $system.TMSCHEMA_DATA_SOURCES"
+    SELECT_QUERY_DEFINITION: str = """
+        select [QueryDefinition]
+        from $system.TMSCHEMA_PARTITIONS
+        WHERE [Name] = '{name}'
+    """
+
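+    # These members are plain format strings evaluated against the SSAS
+    # Dynamic Management Views, e.g. (hypothetical call, client name assumed):
+    #   client.execute(
+    #       DvmQueries.SELECT_CUBES_BY_CATALOG.format(catalog="AdventureWorks"),
+    #       catalog_name="AdventureWorks",
+    #   )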
+    SELECT_ANNOTATIONS: str = "select * from $system.TMSCHEMA_ANNOTATIONS"
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/tools.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/tools.py
new file mode 100644
index 00000000000000..560b5c80846dcd
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/tools.py
@@ -0,0 +1,62 @@
+"""
+Module with reusable tools.
+"""
+from enum import Enum
+
+
+class MsXmlaTemplates(str, Enum):
+    """
+    Query templates.
+    """
+
+    REQUEST_TEMPLATE: str = """<?xml version="1.0" encoding="UTF-8"?>
+    <soap:Envelope xmlns:soap="http://schemas.xmlsoap.org/soap/envelope/">
+        <soap:Body>
+            {xmla_query}
+        </soap:Body>
+    </soap:Envelope>"""
+    QUERY_DISCOVER: str = """<Discover xmlns="urn:schemas-microsoft-com:xml-analysis">
+        <RequestType>{query}</RequestType>
+        <Restrictions/>
+        <Properties>
+            <PropertyList>
+                <Content>ExpandFull</Content>
+            </PropertyList>
+        </Properties>
+    </Discover>"""
+    QUERY_EXECUTE: str = """<Execute xmlns="urn:schemas-microsoft-com:xml-analysis">
+        <Command>
+            <Statement>{query}</Statement>
+        </Command>
+        <Properties>
+        </Properties>
+    </Execute>"""
+    QUERY_EXECUTE_TO_CATALOG: str = """<Execute xmlns="urn:schemas-microsoft-com:xml-analysis">
+        <Command>
+            <Statement>{query}</Statement>
+        </Command>
+        <Properties>
+            <PropertyList>
+                <Catalog>{catalog_name}</Catalog>
+            </PropertyList>
+        </Properties>
+    </Execute>"""
+
+    QUERY_METADATA: str = "DISCOVER_XML_METADATA"
+    # Marker for the <Error .../> element in an XMLA fault response;
+    # xmlaclient.__request uses it to extract the error message.
+    ERROR_CODE: str = "<Error "
+
+    def __has_domain(self) -> bool:
+        return len(self.hostname.split(".")) > 1
+
+    @property
+    def primary_hostname(self) -> str:
+        """
+        Get primary hostname.
+
+        :return: primary hostname as str.
+        """
+
+        primary_host_name = self.hostname
+        if self.__has_domain():
+            primary_host_name = DNSHostName(
+                hostname=self.hostname
+            ).get_primary_server_name()
+        else:
+            for suffix in self.dns_suffix_list:
+                try:
+                    hostname = ".".join([self.hostname, suffix])
+                    primary_host_name = DNSHostName(
+                        hostname=hostname
+                    ).get_primary_server_name()
+                    if primary_host_name:
+                        break
+                except Exception as e:
+                    print(e)
+        return primary_host_name
+
+
+class DNSHostName:
+    """
+    Class for working with DNS.
+    """
+
+    def __init__(self, hostname: str):
+        self.hostname = hostname
+        self.primary_hostname: str = HostDefaults.NAME
+        self.aliaslist: List[str] = []
+        self.ipaddrlist: List[str] = []
+        self.__parse_dns_info()
+
+    def is_primary_name(self) -> bool:
+        """
+        Check whether hostname is the primary name.
+        """
+        return self.hostname == self.get_primary_server_name()
+
+    def get_primary_host_name(self) -> str:
+        """
+        Get the full primary hostname.
+        """
+
+        return self.primary_hostname
+
+    def get_domain(self) -> str:
+        """
+        Extract domain from primary hostname.
+
+        :return: host domain as str.
+        """
+
+        if self.primary_hostname:
+            domain_part = self.primary_hostname.split(HostDefaults.SEPARATOR)[1:]
+            if domain_part:
+                return HostDefaults.SEPARATOR.join(domain_part)
+        return HostDefaults.DOMAIN
+
+    def get_primary_server_name(self) -> str:
+        """
+        Get the short (first-label) primary server name.
+        """
+
+        if self.primary_hostname:
+            return self.primary_hostname.split(HostDefaults.SEPARATOR)[0]
+        return HostDefaults.SERVER
+
+    def is_alias(self) -> bool:
+        """
+        Check whether hostname is an alias.
+        """
+
+        return any(self.hostname in item for item in self.aliaslist)
+
+    def __parse_dns_info(self) -> None:
+        """
+        Issue a DNS request and parse the response.
+        """
+
+        (
+            self.primary_hostname,
+            self.aliaslist,
+            self.ipaddrlist,
+        ) = socket.gethostbyname_ex(self.hostname)
+
+    def get_aliases(self) -> List[str]:
+        """
+        Get hostname aliases list.
+        """
+
+        return self.aliaslist
+
+    def get_ipaddrlist(self) -> List[str]:
+        """
+        Get hostname ipaddress list.
+        """
+
+        return self.ipaddrlist
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/xmla_server_response_error.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/xmla_server_response_error.py
new file mode 100644
index 00000000000000..72c489b917a6fb
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/xmla_server_response_error.py
@@ -0,0 +1,5 @@
+
+class XMLAServerResponseError(Exception):
+    """
+    Raised when ErrorCode elements occur in an XMLA ssas server response.
+    """
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ssas/xmlaclient.py b/metadata-ingestion/src/datahub/ingestion/source/ssas/xmlaclient.py
new file mode 100644
index 00000000000000..742a5a40d415ca
--- /dev/null
+++ b/metadata-ingestion/src/datahub/ingestion/source/ssas/xmlaclient.py
@@ -0,0 +1,157 @@
+"""
+Module with XMLA client.
+"""
+
+import logging
+import re
+from typing import Any, Dict, List, Optional
+
+import requests
+import xmltodict
+
+from .tools import MsXmlaTemplates
+from .xmla_server_response_error import XMLAServerResponseError
+
+logging.basicConfig(level=logging.INFO)
+logger = logging.getLogger(__name__)
+
+
+class XmlaResponse:
+    """
+    Class for parsing XMLA ssas server responses.
+    """
+
+    def __init__(self, response: str):
+        self.item = response
+
+    def as_dict(self) -> Dict[str, Any]:
+        """
+        Represent the xml as a dict.
+
+        :return: xml data as dict.
+        """
+        return xmltodict.parse(self.item)
+
+    def __repr__(self):
+        return self.item
+
+    def get_node(self, node_name: str = "row") -> List[Any]:
+        """
+        Get a custom node from the xml tree.
+
+        :return: matching nodes as list.
+        """
+
+        return self.find_node(tree=self.as_dict(), search_term=node_name)
+
+    def find_node(self, tree: Dict[str, Any], search_term: str) -> List[Any]:
+        """
+        Breadth-first search for a custom node in the xml tree.
+
+        Note: flattens `tree` in place while searching, so pass a fresh
+        dict (get_node always does).
+
+        :return: matching nodes as list.
+        """
+        keys = [*tree.keys()]
+        try:
+            while len(keys) > 0:
+                key = keys.pop(0)
+                # search term has been found
+                if key == search_term:
+                    result = tree[key]
+                    if isinstance(result, list):
+                        return result
+                    return [result]
+                if isinstance(tree[key], dict):
+                    # flatten the nested dict into the work queue
+                    nested = tree.pop(key)
+                    keys.extend(nested.keys())
+                    tree.update(nested)
+        except Exception:
+            # malformed tree: fall through and return an empty result
+            pass
+        return []
+
+
+class XmlaClient:
+    """
+    Class for communicating with an ssas server via xmla over http.
+    """
+
+    def __init__(self, config, auth):
+        self.cfg = config
+        self.auth = auth
+        self.res = {}
+
+    def __template(self, xmla_query: str) -> str:
+        """
+        Fill the SOAP request template.
+
+        :return: xmla request as str.
+        """
+
+        return MsXmlaTemplates.REQUEST_TEMPLATE.format(xmla_query=xmla_query)  # type: ignore
+
+    def discover(self, query: str) -> XmlaResponse:
+        """
+        Send a DISCOVER xmla request with the given query.
+
+        :return: ssas xmla server response as an XmlaResponse object.
+        """
+
+        response = self.__request(
+            data=self.__template(
+                xmla_query=MsXmlaTemplates.QUERY_DISCOVER.format(query=query)  # type: ignore
+            )
+        )
+        logger.debug(response)
+        return XmlaResponse(response)
+
+    def execute(self, query: str, catalog_name: Optional[str] = None) -> XmlaResponse:
+        """
+        Send an EXECUTE xmla request with the given query.
+
+        :return: ssas xmla server response as an XmlaResponse object.
+        """
+
+        template = MsXmlaTemplates.QUERY_EXECUTE
+        params = dict(
+            query=query,
+        )
+
+        if catalog_name:
+            template = MsXmlaTemplates.QUERY_EXECUTE_TO_CATALOG
+            params["catalog_name"] = catalog_name
+
+        response = self.__request(
+            data=self.__template(xmla_query=template.format(**params))
+        )
+
+        return XmlaResponse(response)
+
+    def __request(self, data: str) -> str:
+        """
+        Send the request to the server.
+
+        :return: server response body as str.
+        """
+
+        try:
+            logger.debug(f"REQ_AUTH - {self.auth}")
+
+            res = requests.post(
+                url=self.cfg.base_api_url,
+                data=data.encode("utf-8"),
+                auth=self.auth,
+            )
+
+            res.raise_for_status()
+
+            res.encoding = "utf-8"
+
+            if MsXmlaTemplates.ERROR_CODE in res.text:
+                # Extract the message between the error marker and the
+                # "Source=" attribute; fall back to the end of the body.
+                start_match = re.search(MsXmlaTemplates.ERROR_CODE, res.text)
+                error_msg_body_start = start_match.span()[0] + len(
+                    MsXmlaTemplates.ERROR_CODE
+                )
+                end_match = re.search("Source=", res.text[error_msg_body_start:])
+                error_msg_body_end = (
+                    error_msg_body_start + end_match.span()[0]
+                    if end_match
+                    else len(res.text)
+                )
+                raise XMLAServerResponseError(
+                    res.text[error_msg_body_start:error_msg_body_end]
+                )
+            return res.text
+        except XMLAServerResponseError as e:
+            logger.error(f"XMLA error code was received: {e}")
+            raise e
+        except Exception as excp:
+            logger.error(f"Error occurred while sending '{data}' to server: {excp}")
+            raise
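A minimal usage sketch for the client above. The config construction is illustrative: field names follow the SsasServerHTTPConfig docstring and actual values (host, credentials) are assumptions, not taken from this patch.

    from requests.auth import HTTPBasicAuth

    from datahub.ingestion.source.ssas.config import SsasServerHTTPConfig
    from datahub.ingestion.source.ssas.tools import MsXmlaTemplates
    from datahub.ingestion.source.ssas.xmlaclient import XmlaClient

    # Illustrative values; the client only relies on config.base_api_url.
    config = SsasServerHTTPConfig(
        username="AD_user",
        password="secret",
        host_port="ssas-gateway.corp.example:443",
        server_alias="olap01",
        use_https=True,
    )

    client = XmlaClient(config=config, auth=HTTPBasicAuth("AD_user", "secret"))

    # DISCOVER request for the server metadata document, then pull the
    # <row> nodes out of the SOAP envelope as plain dicts.
    response = client.discover(query=MsXmlaTemplates.QUERY_METADATA)
    for row in response.get_node("row"):
        print(row)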