diff --git a/metadata-ingestion/docs/sources/openapi/openapi.md b/metadata-ingestion/docs/sources/openapi/openapi.md
index b3231b018bdde..badd9601fa197 100644
--- a/metadata-ingestion/docs/sources/openapi/openapi.md
+++ b/metadata-ingestion/docs/sources/openapi/openapi.md
@@ -6,9 +6,40 @@
-The plugin read the swagger file where the endopints are defined and searches fo
+The plugin reads the swagger file where the endpoints are defined and searches for
 a `GET` call: those are the ones supposed to give back the datasets.
 For every selected endpoint defined in the `paths` section,
-the tool searches whether the medatada are already defined in there.
-As example, if in your swagger file there is the `/api/users/` defined as follows:
+the tool searches whether the schema or metadata is already defined there.
+For example, if your swagger file defines `/api/pets/` as follows:
+```yaml
+components:
+  schemas:
+    Pet:
+      type: "object"
+      properties:
+        id:
+          type: "string"
+          format: "uuid"
+          minLength: 36
+          maxLength: 36
+        name:
+          type: "string"
+          minLength: 10
+          maxLength: 2048
+
+paths:
+  /api/pets/:
+    get:
+      tags: [ "Pets" ]
+      operationId: GetPets
+      description: Retrieve pets data
+      responses:
+        200:
+          description: Return the list of pets
+          content:
+            application/json:
+              schema:
+                $ref: "#/components/schemas/Pet"
+```
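+
+Schemas referenced through `$ref` are resolved against the rest of the
+specification, including nested references (with a guard against circular
+ones). As a sketch, assuming a hypothetical `Owner` schema alongside the `Pet`
+schema above, a nested reference like the following is flattened into dotted
+field paths such as `pets` and `pets.id`:
+```yaml
+components:
+  schemas:
+    Owner:
+      type: "object"
+      properties:
+        name:
+          type: "string"
+        pets:
+          type: "array"
+          items:
+            $ref: "#/components/schemas/Pet"
+```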
+or, if there is no schema defined but an example is present:
 ```yaml
 paths:
   /api/users/:
@@ -27,7 +58,7 @@ paths:
 then this plugin has all the information needed to create the dataset in DataHub.
 
-In case there is no example defined, the plugin will try to get the metadata directly from the endpoint.
+In case there is no schema or example defined, the plugin will try to get the metadata directly from the endpoint.
 
 So, if in your swagger file you have
 ```yaml
diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py
index 78570a2a4ceca..2e825954c77bb 100755
--- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py
@@ -20,13 +20,13 @@
 from datahub.ingestion.api.source import Source, SourceReport
 from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.openapi_parser import (
+    SchemaMetadataExtractor,
     clean_url,
     compose_url_attr,
     extract_fields,
     get_endpoints,
     get_swag_json,
     get_tok,
-    get_url_basepath,
     request_call,
     set_metadata,
     try_guessing,
@@ -139,25 +139,18 @@ def __init__(self, config: OpenApiConfig, ctx: PipelineContext, platform: str):
         self.report = SourceReport()
         self.url_basepath = ""
 
-    def report_bad_responses(self, status_code: int, key: str) -> None:
-        if status_code == 400:
-            self.report.report_warning(
-                key=key, reason="Unknown error for reaching endpoint"
-            )
-        elif status_code == 403:
-            self.report.report_warning(key=key, reason="Not authorised to get endpoint")
-        elif status_code == 404:
-            self.report.report_warning(
-                key=key,
-                reason="Unable to find an example for endpoint. Please add it to the list of forced examples.",
-            )
-        elif status_code == 500:
-            self.report.report_warning(
-                key=key, reason="Server error for reaching endpoint"
-            )
-        elif status_code == 504:
-            self.report.report_warning(key=key, reason="Timeout for reaching endpoint")
-        else:
+    def report_bad_response(self, status_code: int, key: str) -> None:
+        codes_mapping = {
+            400: "Unknown error for reaching endpoint",
+            403: "Not authorised to get endpoint",
+            404: "Unable to find an example for endpoint. Please add it to the list of forced examples.",
+            500: "Server error for reaching endpoint",
+            504: "Timeout for reaching endpoint",
+        }
+
+        reason = codes_mapping.get(status_code)
+        if reason is None:
             raise Exception(
                 f"Unable to retrieve endpoint, response code {status_code}, key {key}"
             )
+        self.report.report_warning(key=key, reason=reason)
@@ -187,13 +179,13 @@ def init_dataset(
         dataset_snapshot.aspects.append(dataset_properties)
 
         # adding tags
-        tags_str = [make_tag_urn(t) for t in endpoint_dets["tags"]]
+        tags_str = (make_tag_urn(t) for t in endpoint_dets["tags"])
         tags_tac = [TagAssociationClass(t) for t in tags_str]
         gtc = GlobalTagsClass(tags_tac)
         dataset_snapshot.aspects.append(gtc)
 
         # the link will appear in the "documentation"
-        link_url = clean_url(config.url + self.url_basepath + endpoint_k)
+        link_url = clean_url(f"{config.url}{self.url_basepath}{endpoint_k}")
         link_description = "Link to call for the dataset."
         creation = AuditStampClass(
             time=int(time.time()), actor="urn:li:corpuser:etl", impersonator=None
@@ -215,18 +207,18 @@ def build_wu(
 
     def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:  # noqa: C901
         config = self.config
 
-        sw_dict = self.config.get_swagger()
+        specification = self.config.get_swagger()
 
-        self.url_basepath = get_url_basepath(sw_dict)
+        self.url_basepath = specification.get("basePath", "")
 
         # Getting all the URLs accepting the "GET" method
         with warnings.catch_warnings(record=True) as warn_c:
-            url_endpoints = get_endpoints(sw_dict)
+            url_endpoints = get_endpoints(specification)
 
         for w in warn_c:
             w_msg = w.message
-            w_spl = w_msg.args[0].split(" --- ")  # type: ignore
-            self.report.report_warning(key=w_spl[1], reason=w_spl[0])
+            w_spl_reason, w_spl_key, *_ = w_msg.args[0].split(" --- ")  # type: ignore
+            self.report.report_warning(key=w_spl_key, reason=w_spl_reason)
 
-        # here we put a sample from the "listing endpoint". To be used for later guessing of comosed endpoints.
+        # here we put a sample from the "listing endpoint". To be used for later guessing of composed endpoints.
         root_dataset_samples = {}
@@ -241,7 +233,23 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:  # noqa: C901
             )
 
             # adding dataset fields
-            if "data" in endpoint_dets.keys():
+            if (
+                endpoint_dets.get("schema")
+                and endpoint_dets.get("schema", {}).get("AnyValue") is None
+            ):
+                metadata_extractor = SchemaMetadataExtractor(
+                    dataset_name,
+                    endpoint_dets["schema"],
+                    specification,
+                )
+                schema_metadata = metadata_extractor.extract_metadata()
+                if schema_metadata:
+                    dataset_snapshot.aspects.append(schema_metadata)
+                    yield self.build_wu(dataset_snapshot, dataset_name)
+                    continue
+
+            if endpoint_dets.get("data"):
                 # we are lucky! data is defined in the swagger for this endpoint
                 schema_metadata = set_metadata(dataset_name, endpoint_dets["data"])
                 dataset_snapshot.aspects.append(schema_metadata)
@@ -249,7 +257,7 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:  # noqa: C901
             elif (
                 "{" not in endpoint_k
             ):  # if the API does not explicitly require parameters
-                tot_url = clean_url(config.url + self.url_basepath + endpoint_k)
+                tot_url = clean_url(f"{config.url}{self.url_basepath}{endpoint_k}")
 
                 if config.token:
                     response = request_call(tot_url, token=config.token)
@@ -268,12 +276,12 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:  # noqa: C901
                     yield self.build_wu(dataset_snapshot, dataset_name)
 
                 else:
-                    self.report_bad_responses(response.status_code, key=endpoint_k)
+                    self.report_bad_response(response.status_code, key=endpoint_k)
             else:
                 if endpoint_k not in config.forced_examples.keys():
                     # start guessing...
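+                    # Guess the path parameters from the samples collected at the
+                    # listing endpoints (root_dataset_samples above).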
url_guess = try_guessing(endpoint_k, root_dataset_samples) - tot_url = clean_url(config.url + self.url_basepath + url_guess) + tot_url = clean_url(f"{config.url}{self.url_basepath}{url_guess}") if config.token: response = request_call(tot_url, token=config.token) else: @@ -291,12 +299,14 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 yield self.build_wu(dataset_snapshot, dataset_name) else: - self.report_bad_responses(response.status_code, key=endpoint_k) + self.report_bad_response(response.status_code, key=endpoint_k) else: composed_url = compose_url_attr( raw_url=endpoint_k, attr_list=config.forced_examples[endpoint_k] ) - tot_url = clean_url(config.url + self.url_basepath + composed_url) + tot_url = clean_url( + f"{config.url}{self.url_basepath}{composed_url}" + ) if config.token: response = request_call(tot_url, token=config.token) else: @@ -314,7 +324,7 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]: # noqa: C901 yield self.build_wu(dataset_snapshot, dataset_name) else: - self.report_bad_responses(response.status_code, key=endpoint_k) + self.report_bad_response(response.status_code, key=endpoint_k) def get_report(self): return self.report diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py index 14597618a9d51..17295ffc07f79 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py @@ -1,7 +1,7 @@ import json import logging import re -from typing import Any, Dict, Generator, List, Optional, Tuple +from typing import Any, Dict, Generator, List, Optional, Tuple, Union import requests import yaml @@ -12,10 +12,22 @@ SchemaField, SchemaMetadata, ) -from datahub.metadata.schema_classes import SchemaFieldDataTypeClass, StringTypeClass +from datahub.metadata.schema_classes import ( + ArrayTypeClass, + BooleanTypeClass, + MapTypeClass, + NullTypeClass, + NumberTypeClass, + SchemaFieldDataTypeClass, + StringTypeClass, +) logger = logging.getLogger(__name__) +GET_METHOD = "get" +CONTENT_TYPE_JSON = "application/json" +CONTENT_TYPE_CSV = "text/csv" + def flatten(d: dict, prefix: str = "") -> Generator: for k, v in d.items(): @@ -42,8 +54,7 @@ def flatten2list(d: dict) -> list: "anotherone.third_a.last" ] """ - fl_l = list(flatten(d)) - return [d[1:] if d[0] == "-" else d for d in fl_l] + return [d[1:] if d[0] == "-" else d for d in flatten(d)] def request_call( @@ -88,93 +99,85 @@ def get_swag_json( return dict_data -def get_url_basepath(sw_dict: dict) -> str: - try: - return sw_dict["basePath"] - except KeyError: # no base path defined - return "" - - def check_sw_version(sw_dict: dict) -> None: - if "swagger" in sw_dict: - v_split = sw_dict["swagger"].split(".") - else: - v_split = sw_dict["openapi"].split(".") + v_split = ( + sw_dict["swagger"].split(".") + if sw_dict.get("swagger", "") + else sw_dict["openapi"].split(".") + ) - version = [int(v) for v in v_split] + version_major, version_minor, *_ = tuple(int(v) for v in v_split) - if version[0] == 3 and version[1] > 0: + if version_major == 3 and version_minor > 0: raise NotImplementedError( "This plugin is not compatible with Swagger version >3.0" ) -def get_endpoints(sw_dict: dict) -> dict: # noqa: C901 +def get_endpoints(specification: dict) -> dict: # noqa: C901 """ Get all the URLs accepting the "GET" method, together with their description and the tags """ url_details = {} - check_sw_version(sw_dict) + 
check_sw_version(specification) - for p_k, p_o in sw_dict["paths"].items(): + for api_path, path_details in specification["paths"].items(): # will track only the "get" methods, which are the ones that give us data - if "get" in p_o.keys(): - if "200" in p_o["get"]["responses"].keys(): - base_res = p_o["get"]["responses"]["200"] - elif 200 in p_o["get"]["responses"].keys(): - # if you read a plain yml file the 200 will be an integer - base_res = p_o["get"]["responses"][200] - else: - # the endpoint does not have a 200 response + if path_get_details := path_details.get(GET_METHOD): + api_response = path_get_details["responses"].get("200") or path_get_details[ + "responses" + ].get(200) + if api_response is None: continue - if "description" in p_o["get"].keys(): - desc = p_o["get"]["description"] - elif "summary" in p_o["get"].keys(): - desc = p_o["get"]["summary"] - else: # still testing - desc = "" + desc = path_get_details.get("description") or path_get_details.get( + "summary", "" + ) - try: - tags = p_o["get"]["tags"] - except KeyError: - tags = [] + tags = path_get_details.get("tags", []) - url_details[p_k] = {"description": desc, "tags": tags} + url_details[api_path] = { + "description": desc, + "tags": tags, + } + + if api_response.get("schema"): + url_details[api_path]["schema"] = api_response["schema"] # trying if dataset is defined in swagger... - if "content" in base_res.keys(): - res_cont = base_res["content"] - if "application/json" in res_cont.keys(): - ex_field = None - if "example" in res_cont["application/json"]: - ex_field = "example" - elif "examples" in res_cont["application/json"]: - ex_field = "examples" - - if ex_field: - if isinstance(res_cont["application/json"][ex_field], dict): - url_details[p_k]["data"] = res_cont["application/json"][ - ex_field - ] - elif isinstance(res_cont["application/json"][ex_field], list): + if response_content := api_response.get("content"): + if json_schema := response_content.get(CONTENT_TYPE_JSON, {}).get( + "schema" + ): + url_details[api_path]["schema"] = json_schema + elif response_content.get(CONTENT_TYPE_JSON): + example = response_content[CONTENT_TYPE_JSON].get( + "example" + ) or response_content[CONTENT_TYPE_JSON].get("examples") + if example: + if isinstance(example, dict): + url_details[api_path]["data"] = example + elif isinstance(example, list): # taking the first example - url_details[p_k]["data"] = res_cont["application/json"][ - ex_field - ][0] + url_details[api_path]["data"], *_ = example else: logger.warning( - f"Field in swagger file does not give consistent data --- {p_k}" + f"Field in swagger file does not give consistent data --- {api_path}" ) - elif "text/csv" in res_cont.keys(): - url_details[p_k]["data"] = res_cont["text/csv"]["schema"] - elif "examples" in base_res.keys(): - url_details[p_k]["data"] = base_res["examples"]["application/json"] + elif response_content.get(CONTENT_TYPE_CSV): + url_details[api_path]["data"] = response_content[CONTENT_TYPE_CSV][ + "schema" + ] + elif api_response.get("examples"): + url_details[api_path]["data"] = ( + api_response["examples"].get(CONTENT_TYPE_JSON) + or api_response["examples"] + ) # checking whether there are defined parameters to execute the call... 
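+        # Endpoints with a "schema" entry are handled by SchemaMetadataExtractor in
+        # openapi.py; inline "data" examples are only used when no schema is found.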
- if "parameters" in p_o["get"].keys(): - url_details[p_k]["parameters"] = p_o["get"]["parameters"] + if path_get_details.get("parameters"): + url_details[api_path]["parameters"] = path_get_details["parameters"] return dict(sorted(url_details.items())) @@ -224,7 +227,7 @@ def guessing_url_name(url: str, examples: dict) -> str: # substituting the parameter's name w the value for name, clean_name in zip(needed_n, cleaned_needed_n): - if clean_name in examples[ex2use].keys(): + if examples[ex2use].get(clean_name): guessed_url = re.sub(name, str(examples[ex2use][clean_name]), guessed_url) return guessed_url @@ -242,7 +245,7 @@ def compose_url_attr(raw_url: str, attr_list: list) -> str: attr_list=["2",]) asd2 == "http://asd.com/2" """ - splitted = re.split(r"\{[^}]+\}", raw_url) + splitted = re.split(r"\{[^}]+}", raw_url) if splitted[-1] == "": # it can happen that the last element is empty splitted = splitted[:-1] composed_url = "" @@ -256,13 +259,13 @@ def compose_url_attr(raw_url: str, attr_list: list) -> str: def maybe_theres_simple_id(url: str) -> str: - dets = re.findall(r"(\{[^}]+\})", url) # searching the fields between parenthesis - if len(dets) == 0: + dets = re.findall(r"(\{[^}]+})", url) # searching the fields between parenthesis + if not dets: return url dets_w_id = [det for det in dets if "id" in det] # the fields containing "id" if len(dets) == len(dets_w_id): # if we only have fields containing IDs, we guess to use "1"s - return compose_url_attr(url, ["1" for _ in dets_w_id]) + return compose_url_attr(url, ["1"] * len(dets_w_id)) else: return url @@ -302,24 +305,23 @@ def extract_fields( return [], {} elif isinstance(dict_data, list): # it's maybe just a list - if len(dict_data) == 0: + if not dict_data: logger.warning(f"Empty data --- {dataset_name}") return [], {} # so we take the fields of the first element, # if it's a dict - if isinstance(dict_data[0], dict): - return flatten2list(dict_data[0]), dict_data[0] - elif isinstance(dict_data[0], str): + response_data, *_ = dict_data + if isinstance(response_data, dict): + return flatten2list(response_data), response_data + elif isinstance(response_data, str): # this is actually data - return ["contains_a_string"], {"contains_a_string": dict_data[0]} + return ["contains_a_string"], {"contains_a_string": response_data} else: raise ValueError("unknown format") if len(dict_data.keys()) > 1: # the elements are directly inside the dict return flatten2list(dict_data), dict_data - dst_key = list(dict_data.keys())[ - 0 - ] # the first and unique key is the dataset's name + dst_key, *_ = dict_data.keys() # the first and unique key is the dataset's name try: return flatten2list(dict_data[dst_key]), dict_data[dst_key] @@ -327,10 +329,10 @@ def extract_fields( # if the content is a list, we should treat each element as a dataset. # ..but will take the keys of the first element (to be improved) if isinstance(dict_data[dst_key], list): - if len(dict_data[dst_key]) > 0: - return flatten2list(dict_data[dst_key][0]), dict_data[dst_key][0] - else: + if not dict_data[dst_key]: return [], {} # it's empty! 
+ else: + return flatten2list(dict_data[dst_key][0]), dict_data[dst_key][0] else: logger.warning(f"Unable to get the attributes --- {dataset_name}") return [], {} @@ -394,3 +396,236 @@ def set_metadata( fields=canonical_schema, ) return schema_metadata + + +OBJECT_TYPE = "object" +ARRAY_TYPE = "array" +UNKNOWN_TYPE = "unknown" + +TYPES_MAPPING = { + "string": StringTypeClass, + "integer": NumberTypeClass, + "number": NumberTypeClass, + "boolean": BooleanTypeClass, + ARRAY_TYPE: ArrayTypeClass, + OBJECT_TYPE: MapTypeClass, + UNKNOWN_TYPE: NullTypeClass, +} + + +class SchemaMetadataExtractor: + """ + Class for extracting metadata from schemas, defined in definitions + Recursively going through all fields without max depth limitations, avoiding circle dependencies + """ + + def __init__( + self, + dataset_name: str, + endpoint_schema: dict, + full_specification: dict, + platform: str = "api", + ) -> None: + self.dataset_name = dataset_name + self.endpoint_schema = endpoint_schema + self.full_specification = full_specification + self.platform = platform + self.canonical_schema: list[SchemaField] = [] + self.schemas_stack: list[str] = [] + + def get_schema_by_ref(self, schema_ref: str) -> tuple[str, dict]: + _, *schema_path_parts = schema_ref.split("/") + schema_name = "" + schema_data = self.full_specification + for part in schema_path_parts: + schema_name = part + schema_data = schema_data.get(part, {}) + if not schema_data: + logger.warning(f"Schema is empty --- {schema_name}") + return schema_name, schema_data + + def parse_nested_schema( + self, + schema_ref: str, + field_path: str = "", + ) -> None: + schema_name, inner_schema = self.get_schema_by_ref(schema_ref) + if schema_name in self.schemas_stack: + return + self.parse_schema(inner_schema, field_path, current_schema_name=schema_name) + + def parse_array_type( + self, + items: Union[dict, str], + field_path: str = "", + description: str = "", + current_schema_name: str = "", + ) -> None: + if isinstance(items, dict) and items.get("$ref", ""): + _, inner_schema = self.get_schema_by_ref(items.get("$ref", "")) + if inner_schema.get("oneOf") or inner_schema.get("allOf"): + nested_type = OBJECT_TYPE + else: + nested_type = inner_schema.get("type", UNKNOWN_TYPE) + else: + nested_type = ( + items.get("type", OBJECT_TYPE) if isinstance(items, dict) else items + ) + if field_path: + field = SchemaField( + fieldPath=field_path, + type=SchemaFieldDataTypeClass( + type=ArrayTypeClass(nestedType=[nested_type]) + ), + nativeDataType=repr(ARRAY_TYPE), + description=description, + recursive=False, + ) + self.canonical_schema.append(field) + if not isinstance(items, dict) or nested_type != OBJECT_TYPE: + return + if items.get("$ref"): + self.schemas_stack.append(current_schema_name) + self.parse_nested_schema(items["$ref"], field_path) + self.schemas_stack.pop() + if items.get("properties"): + self.parse_schema(items, field_path, current_schema_name) + + def parse_all_one_of( + self, schemas: list[dict[str, Any]] + ) -> tuple[dict[str, dict[str, Any]], int]: + result_schema: dict[str, dict[str, Any]] = {} + schemas_added_to_stack = 0 + for schema in schemas: + if schema.get("$ref", ""): + schema_name, schema = self.get_schema_by_ref(schema.get("$ref", "")) + self.schemas_stack.append(schema_name) + schemas_added_to_stack += 1 + if schema.get("properties", {}): + result_schema["properties"] = { + **result_schema.get("properties", {}), + **schema["properties"], + } + else: + result_schema = {**result_schema, **schema} + return result_schema, 
schemas_added_to_stack + + def parse_properties( + self, + properties: dict, + base_path: str = "", + current_schema_name: str = "", + ) -> None: + for column_name, column_props in properties.items(): + field_path = f"{base_path}.{column_name}" if base_path else column_name + if schema_with_of := ( + column_props.get("allOf", []) or column_props.get("oneOf", []) + ): + schema, schemas_in_stack = self.parse_all_one_of(schema_with_of) + self.parse_schema(schema, field_path, current_schema_name) + if schemas_in_stack > 0: + self.schemas_stack = self.schemas_stack[:-schemas_in_stack] + continue + else: + column_type = column_props.get("type", UNKNOWN_TYPE) + + if column_props.get("$ref"): + self.schemas_stack.append(current_schema_name) + self.parse_nested_schema( + column_props["$ref"], + field_path, + ) + self.schemas_stack.pop() + elif column_type == OBJECT_TYPE and column_props.get("properties"): + self.parse_schema(column_props, field_path, current_schema_name) + elif column_type == ARRAY_TYPE: + self.parse_array_type( + column_props.get("items", ""), + field_path, + column_props.get("description", ""), + current_schema_name, + ) + else: + if column_type == UNKNOWN_TYPE: + logger.warning( + f"Unknown type \"{column_props.get('type')}\" for field --- {field_path}" + ) + field = SchemaField( + fieldPath=field_path, + type=SchemaFieldDataTypeClass( + type=TYPES_MAPPING.get(column_type, NullTypeClass)() + ), + nativeDataType=repr(column_type), + description=column_props.get("description", ""), + recursive=False, + ) + self.canonical_schema.append(field) + + def parse_ref(self, ref: str, base_path: str, current_schema_name: str) -> None: + if not ref: + return + self.schemas_stack.append(current_schema_name) + self.parse_nested_schema(ref, base_path) + self.schemas_stack.pop() + + def parse_flatten_schema(self, schema_to_parse: dict, field_path: str) -> None: + field_type = schema_to_parse.get("type", "") + if field_type in (ARRAY_TYPE, OBJECT_TYPE, "") or schema_to_parse.get( + "properties", "" + ): + return + field = SchemaField( + fieldPath=field_path, + type=SchemaFieldDataTypeClass( + type=TYPES_MAPPING.get(field_type, NullTypeClass)() + ), + nativeDataType=repr(field_type), + description=schema_to_parse.get("description", ""), + recursive=False, + ) + self.canonical_schema.append(field) + + def parse_schema( + self, + schema_to_parse: dict, + base_path: str = "", + current_schema_name: str = "", + ) -> None: + self.parse_ref(schema_to_parse.get("$ref", ""), base_path, current_schema_name) + self.parse_properties( + schema_to_parse.get("properties", {}), base_path, current_schema_name + ) + if schema_to_parse.get("type") == ARRAY_TYPE or schema_to_parse.get("items"): + self.parse_array_type( + schema_to_parse["items"], + base_path, + schema_to_parse.get("description", ""), + current_schema_name, + ) + if schema_with_of := ( + schema_to_parse.get("allOf", []) or schema_to_parse.get("oneOf", []) + ): + schema, number_of_schemas_in_stack = self.parse_all_one_of(schema_with_of) + self.parse_schema(schema, base_path, current_schema_name) + if number_of_schemas_in_stack > 0: + self.schemas_stack = self.schemas_stack[:-number_of_schemas_in_stack] + self.parse_flatten_schema(schema_to_parse, base_path) + + def extract_metadata(self) -> Optional[SchemaMetadata]: + self.parse_schema(self.endpoint_schema) + if self.canonical_schema: + if (num_fields := len(self.canonical_schema)) > 1000: + logger.warning( + f"Dataset {self.dataset_name} contains {num_fields} fields" + ) + + schema_metadata = 
SchemaMetadata( + schemaName=self.dataset_name, + platform=f"urn:li:dataPlatform:{self.platform}", + version=0, + hash="", + platformSchema=OtherSchemaClass(rawSchema=""), + fields=self.canonical_schema, + ) + return schema_metadata + return None diff --git a/metadata-ingestion/tests/unit/test_openapi.py b/metadata-ingestion/tests/unit/test_openapi.py index 64edd7fab2158..5adf506918715 100644 --- a/metadata-ingestion/tests/unit/test_openapi.py +++ b/metadata-ingestion/tests/unit/test_openapi.py @@ -1,14 +1,26 @@ import unittest +import pytest import yaml from datahub.ingestion.source.openapi_parser import ( + SchemaMetadataExtractor, flatten2list, get_endpoints, guessing_url_name, maybe_theres_simple_id, try_guessing, ) +from datahub.metadata.com.linkedin.pegasus2avro.schema import SchemaField +from datahub.metadata.schema_classes import ( + ArrayTypeClass, + BooleanTypeClass, + MapTypeClass, + NullTypeClass, + NumberTypeClass, + SchemaFieldDataTypeClass, + StringTypeClass, +) class TestGetEndpoints(unittest.TestCase): @@ -455,3 +467,716 @@ def test_no_good_guesses(self): url2complete = "/advancedcomputersearches/name/{nasde}/asd/{asd}/jhg" guessed_url = try_guessing(url2complete, self.extr_data2) self.assertEqual(guessed_url, url2complete) + + +NESTED_SCHEMAS_DEFINITIONS = { + "definitions": { + "FirstSchema": { + "properties": { + "fs_first_field": {"type": "string"}, + "fs_second_field": {"type": "boolean"}, + "fs_third_field": {"items": {"type": "string"}, "type": "array"}, + "fs_fourth_field": {"format": "int32", "type": "integer"}, + }, + "type": "object", + }, + "SecondSchema": { + "properties": { + "sc_first_field": { + "format": "int32", + "readOnly": True, + "type": "integer", + }, + "sc_second_field": { + "items": {"type": "string"}, + "readOnly": True, + "type": "array", + }, + "sc_third_field": {"type": "string"}, + "sc_fourth_field": { + "items": {"$ref": "#/definitions/SecondSchema"}, + "type": "array", + }, + "sc_fifth_field": { + "items": {"$ref": "#/definitions/ThirdSchema"}, + "type": "array", + }, + }, + "required": [ + "name", + ], + "type": "object", + }, + "ThirdSchema": { + "properties": { + "ts_first_field": {"$ref": "#/definitions/SecondSchema"}, + "ts_second_field": {"format": "int32", "type": "integer"}, + "ts_third_field": {"type": "string"}, + }, + "required": [ + "model", + ], + "type": "object", + }, + "FourthSchema": { + "properties": { + "os_first_field": { + "items": {"$ref": "#/definitions/FirstSchema"}, + "type": "array", + }, + "os_second_field": {"$ref": "#/definitions/ThirdSchema"}, + "os_third_field": {"type": "string"}, + "oc_fourth_field": {"format": "int32", "type": "integer"}, + }, + "type": "object", + }, + } +} + +SCHEMAS_WITH_ONE_OF_DEFINITIONS = { + "definitions": { + "OneOfFirst": { + "properties": { + "one_non_common": {"type": "string"}, + "common": {"type": "boolean"}, + "common_1": {"type": "string"}, + }, + "type": "object", + }, + "OneOfSecond": { + "properties": { + "two_non_common": { + "format": "int32", + "readOnly": True, + "type": "integer", + }, + "common": {"type": "boolean"}, + "common_1": {"type": "string"}, + }, + "type": "object", + }, + "OneOfThird": {"$ref": "#/definitions/OneOfFirst"}, + } +} + +SCHEMAS_WITH_ONE_OF = [ + { + "properties": { + "one_of": { + "description": "Contains one of", + "oneOf": [ + {"$ref": "#/definitions/OneOfFirst"}, + {"$ref": "#/definitions/OneOfSecond"}, + {"$ref": "#/definitions/OneOfThird"}, + ], + }, + "regular": { + "type": "string", + }, + }, + "type": "object", + } +] + 
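+# The definitions below resolve `allOf` chains down to flat scalar types. Note
+# that the flatten_* entries reference the "fatten_string" spelling defined
+# here, while MIXED_FLATTEN_SCHEMAS' "array" property points at an undefined
+# "#/definitions/flatten_string" and so resolves to an unknown nested type.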
+MIXED_FLATTEN_DEFINITIONS = { + "definitions": { + "flatten_array": { + "items": {}, + "maxItems": 2147483647, + "minItems": 0, + "type": "array", + }, + "flatten_1": { + "allOf": [{"$ref": "#/definitions/fatten_string"}], + }, + "flatten_2": { + "allOf": [{"$ref": "#/definitions/fatten_string"}], + }, + "flatten_3": { + "allOf": [{"$ref": "#/definitions/fatten_string"}], + }, + "fatten_string": {"maxLength": 65535, "minLength": 0, "type": "string"}, + "flatten_with_allof": { + "type": "object", + "allOf": [{"$ref": "#/definitions/fatten_string"}], + }, + } +} + +MIXED_FLATTEN_SCHEMAS = [ + { + "properties": { + "items": { + "allOf": [ + { + "items": {}, + "maxItems": 2147483647, + "minItems": 0, + "type": "array", + }, + { + "items": { + "properties": { + "flatten_1": {"$ref": "#/definitions/flatten_1"}, + "flatten_2": {"$ref": "#/definitions/flatten_2"}, + }, + "type": "object", + }, + "type": "array", + }, + ] + }, + "flatten_3": {"$ref": "#/definitions/flatten_3"}, + "array": { + "type": "array", + "items": {"$ref": "#/definitions/flatten_string"}, + }, + }, + "type": "object", + }, + {"$ref": "#/definitions/flatten_with_allof"}, +] + +SCHEMAS = [ + { + "properties": { + "actions": {"readOnly": True}, + "id": {"type": "string"}, + }, + "type": "object", + }, + { + "properties": { + "e1_f1": {"type": "string"}, + "e2_f2": { + "items": { + "properties": { + "e1_i_f1": {"type": "boolean"}, + "e1_i_f2": {"type": "string"}, + "e1_i_f3": {"type": "object"}, + }, + "type": "object", + }, + "type": "array", + }, + }, + "type": "object", + }, + { + "properties": { + "e2_f1": { + "properties": { + "e2_i_f1": {"items": {"type": "string"}, "type": "array"}, + }, + "type": "object", + } + }, + "type": "object", + }, + { + "properties": { + "e3_f1": { + "items": "string", + "type": "array", + }, + }, + "type": "object", + }, + { + "items": { + "$ref": "#/definitions/FirstSchema", + }, + "type": "array", + }, + { + "properties": { + "e5_f1": { + "items": {"$ref": "#/definitions/MissingSchema"}, + "type": "array", + }, + }, + "type": "object", + }, + { + "items": { + "$ref": "#/definitions/MissingSchema", + "type": "object", + }, + }, + { + "properties": { + "e7_f1": { + "items": "string", + "type": "array", + }, + }, + "type": "object", + }, + { + "properties": { + "e8_f1": { + "type": "array", + }, + }, + "type": "object", + }, + { + "properties": { + "e9_f1": { + "type": "string", + }, + "e9_f2": { + "type": "integer", + "format": "int64", + }, + "e9_f3": { + "type": "array", + "items": { + "$ref": "#/components/schemas/MissingSchema", + }, + }, + }, + }, +] + +FLATTEN_SCHEMAS = [ + { + "type": "array", + "nullable": True, + "minItems": 0, + "maxItems": 99, + "items": { + "type": "string", + "nullable": True, + "minLength": 11, + "maxLength": 11, + "example": "79151234567", + }, + }, + { + "format": "int32", + "type": "integer", + }, + { + "format": "binary", + "type": "string", + }, +] + +EXPECTED_FIELDS = [ + [ + SchemaField( + fieldPath="actions", + type=SchemaFieldDataTypeClass(type=NullTypeClass()), + nativeDataType="'unknown'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="id", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + ], + [ + SchemaField( + fieldPath="e1_f1", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="e2_f2", + 
type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["object"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="e2_f2.e1_i_f1", + type=SchemaFieldDataTypeClass(type=BooleanTypeClass()), + nativeDataType="'boolean'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="e2_f2.e1_i_f2", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="e2_f2.e1_i_f3", + type=SchemaFieldDataTypeClass(type=MapTypeClass()), + nativeDataType="'object'", + description="", + recursive=False, + ), + ], + [ + SchemaField( + fieldPath="e2_f1.e2_i_f1", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["string"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + ], + [ + SchemaField( + fieldPath="e3_f1", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["string"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + ], + [ + SchemaField( + fieldPath="fs_first_field", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="fs_second_field", + type=SchemaFieldDataTypeClass(type=BooleanTypeClass()), + nativeDataType="'boolean'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="fs_third_field", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["string"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="fs_fourth_field", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + nativeDataType="'integer'", + description="", + recursive=False, + ), + ], + [ + SchemaField( + fieldPath="e5_f1", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["unknown"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + ], + [], + [ + SchemaField( + fieldPath="e7_f1", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["string"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + ], + [ + SchemaField( + fieldPath="e8_f1", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=[""])), + nativeDataType="'array'", + description="", + recursive=False, + ), + ], + [ + SchemaField( + fieldPath="e9_f1", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="e9_f2", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + nativeDataType="'integer'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="e9_f3", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["unknown"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + ], +] + +NESTED_EXPECTED_FIELDS = [ + SchemaField( + fieldPath="os_first_field", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["object"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_first_field.fs_first_field", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_first_field.fs_second_field", + type=SchemaFieldDataTypeClass(type=BooleanTypeClass()), + nativeDataType="'boolean'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_first_field.fs_third_field", + 
type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["string"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_first_field.fs_fourth_field", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + nativeDataType="'integer'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_second_field.ts_first_field.sc_first_field", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + nativeDataType="'integer'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_second_field.ts_first_field.sc_second_field", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["string"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_second_field.ts_first_field.sc_third_field", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_second_field.ts_first_field.sc_fourth_field", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["object"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_second_field.ts_first_field.sc_fifth_field", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["object"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_second_field.ts_second_field", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + nativeDataType="'integer'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_second_field.ts_third_field", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="os_third_field", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="oc_fourth_field", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + nativeDataType="'integer'", + description="", + recursive=False, + ), +] + +FLATTEN_EXPECTED_FIELDS = [ + [], + [ + SchemaField( + fieldPath="", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + nativeDataType="'integer'", + description="", + recursive=False, + ), + ], + [ + SchemaField( + fieldPath="", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + ], +] + +ONE_OF_EXPECTED_FIELDS = [ + [ + SchemaField( + fieldPath="one_of.one_non_common", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="one_of.common", + type=SchemaFieldDataTypeClass(type=BooleanTypeClass()), + nativeDataType="'boolean'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="one_of.common_1", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="one_of.two_non_common", + type=SchemaFieldDataTypeClass(type=NumberTypeClass()), + nativeDataType="'integer'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="regular", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + ] +] + +EXPECTED_MIXED_FLATTEN = [ + [ + SchemaField( + fieldPath="items", + 
type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["object"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="items.flatten_1", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="items.flatten_2", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="flatten_3", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + SchemaField( + fieldPath="array", + type=SchemaFieldDataTypeClass(type=ArrayTypeClass(nestedType=["unknown"])), + nativeDataType="'array'", + description="", + recursive=False, + ), + ], + [ + SchemaField( + fieldPath="", + type=SchemaFieldDataTypeClass(type=StringTypeClass()), + nativeDataType="'string'", + description="", + recursive=False, + ), + ], +] + + +@pytest.mark.parametrize("schema, expected_fields", zip(SCHEMAS, EXPECTED_FIELDS)) +def test_schemas_parsing(schema, expected_fields): + metadata_extractor = SchemaMetadataExtractor("", schema, NESTED_SCHEMAS_DEFINITIONS) + metadata_extractor.extract_metadata() + assert metadata_extractor.canonical_schema == expected_fields + + +@pytest.mark.parametrize( + "schema, expected_fields", + ( + ( + NESTED_SCHEMAS_DEFINITIONS["definitions"]["FourthSchema"], + NESTED_EXPECTED_FIELDS, + ), + ), +) +def test_parse_nested_schema(schema, expected_fields): + metadata_extractor = SchemaMetadataExtractor("", {}, NESTED_SCHEMAS_DEFINITIONS) + metadata_extractor.parse_schema(schema) + assert metadata_extractor.canonical_schema == expected_fields + + +@pytest.mark.parametrize( + "schema, expected_fields", zip(FLATTEN_SCHEMAS, FLATTEN_EXPECTED_FIELDS) +) +def test_parse_fatten_responses(schema, expected_fields): + metadata_extractor = SchemaMetadataExtractor("", {}, {}) + metadata_extractor.parse_flatten_schema(schema, "") + assert metadata_extractor.canonical_schema == expected_fields + + +@pytest.mark.parametrize( + "schema, expected_fields", zip(SCHEMAS_WITH_ONE_OF, ONE_OF_EXPECTED_FIELDS) +) +def test_parse_with_one_of(schema, expected_fields): + metadata_extractor = SchemaMetadataExtractor( + "", {}, SCHEMAS_WITH_ONE_OF_DEFINITIONS + ) + metadata_extractor.parse_schema(schema) + assert metadata_extractor.canonical_schema == expected_fields + + +@pytest.mark.parametrize( + "schema, expected_fields", zip(MIXED_FLATTEN_SCHEMAS, EXPECTED_MIXED_FLATTEN) +) +def test_parse_mixed(schema, expected_fields): + metadata_extractor = SchemaMetadataExtractor("", {}, MIXED_FLATTEN_DEFINITIONS) + metadata_extractor.parse_schema(schema) + assert metadata_extractor.canonical_schema == expected_fields
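
As a quick reference for the parser API added in this change, here is a minimal usage sketch of `SchemaMetadataExtractor`. The `spec` dict is hypothetical, mirroring the `/api/pets/` example from the docs hunk above:

```python
from datahub.ingestion.source.openapi_parser import SchemaMetadataExtractor

# Hypothetical specification mirroring the /api/pets/ docs example.
spec = {
    "components": {
        "schemas": {
            "Pet": {
                "type": "object",
                "properties": {
                    "id": {"type": "string", "format": "uuid"},
                    "name": {"type": "string"},
                },
            }
        }
    }
}

# The endpoint schema is just a $ref; the extractor resolves it against spec.
extractor = SchemaMetadataExtractor(
    dataset_name="pets",
    endpoint_schema={"$ref": "#/components/schemas/Pet"},
    full_specification=spec,
)
metadata = extractor.extract_metadata()  # SchemaMetadata, or None if no fields
if metadata:
    print([f.fieldPath for f in metadata.fields])  # ['id', 'name']
```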