37 changes: 34 additions & 3 deletions metadata-ingestion/docs/sources/openapi/openapi.md
@@ -6,9 +6,40 @@
The plugin reads the swagger file where the endpoints are defined and searches for
a `GET` call: those are the ones expected to return the datasets.

For every selected endpoint defined in the `paths` section,
the tool checks whether the metadata are already defined there.
For example, if in your swagger file there is the `/api/users/` defined as follows:
the tool checks whether the schema or metadata are already defined there.
For example, if in your swagger file there is the `/api/pets/` defined as follows:

```yaml
components:
  schemas:
    Pet:
      type: "object"
      properties:
        id:
          type: "string"
          format: "uuid"
          minLength: 36
          maxLength: 36
        name:
          type: "string"
          minLength: 10
          maxLength: 2048

paths:
  /api/pets/:
    get:
      tags: [ "Pets" ]
      operationId: GetPets
      description: Retrieve pets data
      responses:
        200:
          description: Return the list of pets
          content:
            application/json:
              schema:
                $ref: "#/components/schemas/Pet"
```
or, if there is no schema defined but an example is present:
```yaml
paths:
  /api/users/:
    # …
```
@@ -27,7 +58,7 @@

then this plugin has all the information needed to create the dataset in DataHub.
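
To make the schema lookup concrete, here is a minimal, hypothetical sketch (not the plugin's actual code) of resolving the `$ref` from the example above against the `components/schemas` section; the helper name `resolve_ref` is an assumption for illustration:

```python
def resolve_ref(spec: dict, ref: str) -> dict:
    """Follow a local reference such as '#/components/schemas/Pet'."""
    node = spec
    for part in ref.lstrip("#/").split("/"):
        node = node[part]
    return node

# With the spec above: resolve_ref(spec, "#/components/schemas/Pet")["properties"]
# yields the "id" and "name" field definitions.
```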

In case there is no example defined, the plugin will try to get the metadata directly from the endpoint.
In case there is no schema or example defined, the plugin will try to get the metadata directly from the endpoint.
So, if in your swagger file you have

```yaml
# …
```
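
As a rough illustration of this fallback, the sketch below (an assumption, not the plugin's implementation) fetches one record from a `GET` endpoint and derives field names from the JSON payload; `guess_fields_from_endpoint` and the wrapping logic are hypothetical:

```python
import requests

def guess_fields_from_endpoint(url: str, token: str | None = None) -> list[str]:
    # Hypothetical sketch: call the endpoint and read keys off the first record.
    headers = {"Authorization": f"Bearer {token}"} if token else {}
    response = requests.get(url, headers=headers, timeout=10)
    response.raise_for_status()
    payload = response.json()
    # List endpoints may return a JSON array or an object wrapping one.
    first = payload[0] if isinstance(payload, list) and payload else payload
    return sorted(first.keys()) if isinstance(first, dict) else []
```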
78 changes: 44 additions & 34 deletions metadata-ingestion/src/datahub/ingestion/source/openapi.py
@@ -20,13 +20,13 @@
from datahub.ingestion.api.source import Source, SourceReport
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.openapi_parser import (
    SchemaMetadataExtractor,
    clean_url,
    compose_url_attr,
    extract_fields,
    get_endpoints,
    get_swag_json,
    get_tok,
    get_url_basepath,
    request_call,
    set_metadata,
    try_guessing,
@@ -139,25 +139,17 @@ def __init__(self, config: OpenApiConfig, ctx: PipelineContext, platform: str):
        self.report = SourceReport()
        self.url_basepath = ""

    def report_bad_responses(self, status_code: int, key: str) -> None:
        if status_code == 400:
            self.report.report_warning(
                key=key, reason="Unknown error for reaching endpoint"
            )
        elif status_code == 403:
            self.report.report_warning(key=key, reason="Not authorised to get endpoint")
        elif status_code == 404:
            self.report.report_warning(
                key=key,
                reason="Unable to find an example for endpoint. Please add it to the list of forced examples.",
            )
        elif status_code == 500:
            self.report.report_warning(
                key=key, reason="Server error for reaching endpoint"
            )
        elif status_code == 504:
            self.report.report_warning(key=key, reason="Timeout for reaching endpoint")
        else:
    def report_bad_response(self, status_code: int, key: str) -> None:
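        # Known status codes map to warning reasons (the old if/elif chain,
        # collapsed into a lookup table); anything unmapped raises below.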
        codes_mapping = {
            400: "Unknown error for reaching endpoint",
            403: "Not authorised to get endpoint",
            404: "Unable to find an example for endpoint. Please add it to the list of forced examples.",
            500: "Server error for reaching endpoint",
            504: "Timeout for reaching endpoint",
        }

        reason = codes_mapping.get(status_code)
        if reason is None:
            raise Exception(
                f"Unable to retrieve endpoint, response code {status_code}, key {key}"
            )
@@ -187,13 +179,13 @@ def init_dataset(
        dataset_snapshot.aspects.append(dataset_properties)

        # adding tags
        tags_str = [make_tag_urn(t) for t in endpoint_dets["tags"]]
        tags_str = (make_tag_urn(t) for t in endpoint_dets["tags"])
        tags_tac = [TagAssociationClass(t) for t in tags_str]
        gtc = GlobalTagsClass(tags_tac)
        dataset_snapshot.aspects.append(gtc)

        # the link will appear in the "documentation"
        link_url = clean_url(config.url + self.url_basepath + endpoint_k)
        link_url = clean_url(f"{config.url}{self.url_basepath}{endpoint_k}")
        link_description = "Link to call for the dataset."
        creation = AuditStampClass(
            time=int(time.time()), actor="urn:li:corpuser:etl", impersonator=None
@@ -215,18 +207,18 @@ def build_wu(
    def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:  # noqa: C901
        config = self.config

        sw_dict = self.config.get_swagger()
        specification = self.config.get_swagger()

        self.url_basepath = get_url_basepath(sw_dict)
        self.url_basepath = specification.get("basePath", "")
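        # "basePath" is a Swagger/OpenAPI 2.0 field; an OpenAPI 3.x spec would
        # carry its base URL under "servers" instead, e.g.
        # specification.get("servers", [{}])[0].get("url", "") (illustrative).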

        # Getting all the URLs accepting the "GET" method
        with warnings.catch_warnings(record=True) as warn_c:
            url_endpoints = get_endpoints(sw_dict)
            url_endpoints = get_endpoints(specification)

            for w in warn_c:
                w_msg = w.message
                w_spl = w_msg.args[0].split(" --- ")  # type: ignore
                self.report.report_warning(key=w_spl[1], reason=w_spl[0])
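                # The warning payload is assumed to be "<reason> --- <key>";
                # the unpacking below tolerates extra " --- " separators via *_.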
                w_spl_reason, w_spl_key, *_ = w_msg.args[0].split(" --- ")  # type: ignore
                self.report.report_warning(key=w_spl_key, reason=w_spl_reason)

        # here we put a sample from the "listing endpoint". To be used for later guessing of composed endpoints.
        root_dataset_samples = {}
@@ -241,15 +233,31 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:  # noqa: C901
            )

            # adding dataset fields
            if "data" in endpoint_dets.keys():
            if (
                endpoint_dets.get("schema")
                and endpoint_dets.get("schema", {}).get("AnyValue") is None
            ):
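                # Resolve the endpoint's response schema (typically a $ref into
                # components/schemas, as in the docs example) into a schema
                # aspect; "AnyValue" appears to flag schemas too loose to use.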
                metadata_extractor = SchemaMetadataExtractor(
                    dataset_name,
                    endpoint_dets["schema"],
                    specification,
                )
                schema_metadata = metadata_extractor.extract_metadata()
                if schema_metadata:
                    dataset_snapshot.aspects.append(schema_metadata)
                    yield self.build_wu(dataset_snapshot, dataset_name)
                    continue

            if endpoint_dets.get("data", {}):
                # we are lucky! data is defined in the swagger for this endpoint
                schema_metadata = set_metadata(dataset_name, endpoint_dets["data"])
                dataset_snapshot.aspects.append(schema_metadata)
                yield self.build_wu(dataset_snapshot, dataset_name)
            elif (
                "{" not in endpoint_k
            ):  # if the API does not explicitly require parameters
                tot_url = clean_url(config.url + self.url_basepath + endpoint_k)
                tot_url = clean_url(f"{config.url}{self.url_basepath}{endpoint_k}")

                if config.token:
                    response = request_call(tot_url, token=config.token)
@@ -268,12 +276,12 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:  # noqa: C901

                    yield self.build_wu(dataset_snapshot, dataset_name)
                else:
                    self.report_bad_responses(response.status_code, key=endpoint_k)
                    self.report_bad_response(response.status_code, key=endpoint_k)
            else:
                if endpoint_k not in config.forced_examples.keys():
                    # start guessing...
                    url_guess = try_guessing(endpoint_k, root_dataset_samples)
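                    # e.g. "/api/users/{id}" plus a sample captured earlier from
                    # the listing endpoint might become "/api/users/42"
                    # (illustrative values only).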
                    tot_url = clean_url(config.url + self.url_basepath + url_guess)
                    tot_url = clean_url(f"{config.url}{self.url_basepath}{url_guess}")
                    if config.token:
                        response = request_call(tot_url, token=config.token)
                    else:
@@ -291,12 +299,14 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:  # noqa: C901

                        yield self.build_wu(dataset_snapshot, dataset_name)
                    else:
                        self.report_bad_responses(response.status_code, key=endpoint_k)
                        self.report_bad_response(response.status_code, key=endpoint_k)
                else:
                    composed_url = compose_url_attr(
                        raw_url=endpoint_k, attr_list=config.forced_examples[endpoint_k]
                    )
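                    # Presumably fills the URL placeholders from the forced
                    # example, e.g. "/api/users/{name}" + ["demo"] gives
                    # "/api/users/demo" (illustrative).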
                    tot_url = clean_url(config.url + self.url_basepath + composed_url)
                    tot_url = clean_url(
                        f"{config.url}{self.url_basepath}{composed_url}"
                    )
                    if config.token:
                        response = request_call(tot_url, token=config.token)
                    else:
@@ -314,7 +324,7 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:  # noqa: C901

                        yield self.build_wu(dataset_snapshot, dataset_name)
                    else:
                        self.report_bad_responses(response.status_code, key=endpoint_k)
                        self.report_bad_response(response.status_code, key=endpoint_k)

    def get_report(self):
        return self.report