18 changes: 9 additions & 9 deletions metadata-ingestion/docs/sources/openapi/openapi.md
@@ -2,11 +2,11 @@ The dataset metadata should be defined directly in the Swagger file, section `["

## Capabilities

-The plugin read the swagger file where the endopints are defined and searches for the ones which accept
-a `GET` call: those are the ones supposed to give back the datasets.
+This plugin reads the swagger file where the endpoints are defined, reads example data if provided (for any method), or searches for
+data for the endpoints which do not have example data and accept a `GET` call.

For every selected endpoint defined in the `paths` section,
-the tool searches whether the medatada are already defined in there.
+the tool searches whether the metadata are already defined.
As example, if in your swagger file there is the `/api/users/` defined as follows:

@@ -27,7 +27,7 @@ paths:

then this plugin has all the information needed to create the dataset in DataHub.
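
For illustration only, the relevant piece of a parsed specification might look like the following (a hedged sketch with a made-up endpoint and fields, not content taken from the file above):

```python
# Hedged illustration: the parsed "paths" entry the plugin can consume directly,
# because an example is declared for the 200 response.
parsed_paths = {
    "/api/users/": {
        "get": {
            "responses": {
                "200": {
                    "content": {
                        "application/json": {
                            "example": {"id": 1, "name": "Alice", "email": "alice@example.com"}
                        }
                    }
                }
            }
        }
    }
}
```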

-In case there is no example defined, the plugin will try to get the metadata directly from the endpoint.
+In case there is no example defined, the plugin will try to get the metadata directly from the endpoint, if it is a `GET` method.
So, if in your swagger file you have

@@ -42,7 +42,7 @@
```yaml
description: Return the list of colors
```

-the tool will make a `GET` call to `https:///test_endpoint.com/colors`
+the tool will make a `GET` call to `https://test_endpoint.com/colors`
and parse the response obtained.
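
For illustration, that behaviour is roughly the following (a hedged Python sketch, not the plugin's actual implementation; the function name and parameters are made up):

```python
# Rough sketch: call the GET endpoint once and derive a flat list of field names
# from the first JSON object found in the response.
import requests


def guess_fields_from_endpoint(base_url: str, path: str, timeout: int = 5) -> list:
    response = requests.get(base_url.rstrip("/") + path, timeout=timeout)
    response.raise_for_status()
    payload = response.json()
    # If the endpoint returns a list, sample its first element.
    sample = payload[0] if isinstance(payload, list) and payload else payload
    return sorted(sample.keys()) if isinstance(sample, dict) else []


# e.g. guess_fields_from_endpoint("https://test_endpoint.com", "/colors")
```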

### Automatically recorded examples
@@ -53,7 +53,7 @@ Sometimes you can have an endpoint which wants a parameter to work, like
Since in the OpenApi specifications the listing endpoints are specified
just before the detailed ones, in the list of the paths, you will find

-https:///test_endpoint.com/colors
+https://test_endpoint.com/colors

defined before

@@ -80,7 +80,7 @@ and this last URL will be called to get back the needed metadata.
If no useful example is found, a second procedure will try to guess a numerical ID.
So if we have:

-https:///test_endpoint.com/colors/{colorID}
+https://test_endpoint.com/colors/{colorID}

and there is no `colorID` example already found by the plugin,
it will try to put a number one (1) at the parameter place
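
A minimal sketch of that fallback (illustrative only, not the plugin's exact code):

```python
# Replace any path parameter that has no recorded example with the number 1,
# so that the endpoint can still be probed with a GET call.
import re


def guess_numeric_id(path: str) -> str:
    return re.sub(r"\{[^}]+\}", "1", path)


assert guess_numeric_id("/colors/{colorID}") == "/colors/1"
```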
@@ -120,8 +120,8 @@ paths:
```yaml
description: Return details about the group
```

-and the plugin did not found an example in its previous calls,
-so the tool have no idea about what substitute to the `{name}` part.
+and the plugin did not find an example in its previous calls,
+the tool has no idea about what to substitute for the `{name}` part.

By specifying in the configuration file

@@ -225,7 +225,6 @@ def _generate_properties_mcp(
def generate_mcp(
self, upsert: bool
) -> Iterable[Union[MetadataChangeProposalWrapper, MetadataChangeProposalClass]]:

if self._resolved_domain_urn is None:
raise Exception(
f"Unable to generate MCP-s because we were unable to resolve the domain {self.domain} to an urn."
@@ -440,7 +439,6 @@ def patch_yaml(
original_dataproduct: DataProduct,
output_file: Path,
) -> bool:

update_needed = False
if not original_dataproduct._original_yaml_dict:
raise Exception("Original Data Product was not loaded from yaml")
@@ -523,7 +521,6 @@ def to_yaml(
self,
file: Path,
) -> None:

with open(file, "w") as fp:
yaml = YAML(typ="rt") # default, if not specfied, is 'rt' (round-trip)
yaml.indent(mapping=2, sequence=4, offset=2)
@@ -56,7 +56,6 @@ def _abort_if_non_existent_urn(graph: DataHubGraph, urn: str, operation: str) ->


def _print_diff(orig_file, new_file):

with open(orig_file) as fp:
orig_lines = fp.readlines()
with open(new_file) as fp:
@@ -599,7 +599,6 @@ def from_query_event(
query_event: QueryEvent,
debug_include_full_payloads: bool = False,
) -> "ReadEvent":

readEvent = ReadEvent(
actor_email=query_event.actor_email,
timestamp=query_event.timestamp,
@@ -664,7 +664,6 @@ def _get_exported_bigquery_audit_metadata(
def _get_bigquery_log_entries_via_gcp_logging(
self, client: GCPLoggingClient, limit: Optional[int] = None
) -> Iterable[AuditLogEntry]:

filter = self._generate_filter(BQ_AUDIT_V2)
logger.debug(filter)

@@ -7,7 +7,6 @@


class PathSpecsConfigMixin(ConfigModel):

path_specs: List[PathSpec] = Field(
description="List of PathSpec. See [below](#path-spec) the details about PathSpec"
)
@@ -202,7 +202,6 @@ def external_url_defaults_to_api_config_base_url(
def stateful_ingestion_should_be_enabled(
cls, v: Optional[bool], *, values: Dict[str, Any], **kwargs: Dict[str, Any]
) -> Optional[bool]:

stateful_ingestion: StatefulStaleMetadataRemovalConfig = cast(
StatefulStaleMetadataRemovalConfig, values.get("stateful_ingestion")
)
@@ -1128,7 +1128,6 @@ def get_workunit_processors(self) -> List[Optional[MetadataWorkUnitProcessor]]:
def emit_independent_looks_mcp(
self, dashboard_element: LookerDashboardElement
) -> Iterable[MetadataWorkUnit]:

yield from auto_workunit(
stream=self._make_chart_metadata_events(
dashboard_element=dashboard_element,
1 change: 0 additions & 1 deletion metadata-ingestion/src/datahub/ingestion/source/nifi.py
@@ -142,7 +142,6 @@ def validate_auth_params(cla, values):

@root_validator(pre=False)
def validator_site_url_to_site_name(cls, values):

site_url_to_site_name = values.get("site_url_to_site_name")
site_url = values.get("site_url")
site_name = values.get("site_name")
6 changes: 6 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/source/openapi.py
@@ -250,6 +250,12 @@ def get_workunits_internal(self) -> Iterable[ApiWorkUnit]:  # noqa: C901
schema_metadata = set_metadata(dataset_name, endpoint_dets["data"])
dataset_snapshot.aspects.append(schema_metadata)
yield self.build_wu(dataset_snapshot, dataset_name)
elif endpoint_dets["method"] != "get":
self.report.report_warning(
key=endpoint_k,
reason=f"No example provided for {endpoint_dets['method']}",
)
continue # Only test endpoints if they're GETs
elif (
"{" not in endpoint_k
): # if the API does not explicitly require parameters
130 changes: 65 additions & 65 deletions metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py
@@ -89,10 +89,13 @@ def get_swag_json(


def get_url_basepath(sw_dict: dict) -> str:
try:
if "basePath" in sw_dict:
return sw_dict["basePath"]
except KeyError: # no base path defined
return ""
if "servers" in sw_dict:
# When the API path doesn't match the OAS path
return sw_dict["servers"][0]["url"]

return ""


def check_sw_version(sw_dict: dict) -> None:
@@ -111,70 +114,69 @@ def check_sw_version(sw_dict: dict) -> None:

def get_endpoints(sw_dict: dict) -> dict: # noqa: C901
"""
Get all the URLs accepting the "GET" method, together with their description and the tags
Get all the URLs, together with their description and the tags
"""
url_details = {}

check_sw_version(sw_dict)

for p_k, p_o in sw_dict["paths"].items():
# will track only the "get" methods, which are the ones that give us data
if "get" in p_o.keys():
if "200" in p_o["get"]["responses"].keys():
base_res = p_o["get"]["responses"]["200"]
elif 200 in p_o["get"]["responses"].keys():
# if you read a plain yml file the 200 will be an integer
base_res = p_o["get"]["responses"][200]
else:
# the endpoint does not have a 200 response
continue

if "description" in p_o["get"].keys():
desc = p_o["get"]["description"]
elif "summary" in p_o["get"].keys():
desc = p_o["get"]["summary"]
else: # still testing
desc = ""

try:
tags = p_o["get"]["tags"]
except KeyError:
tags = []

url_details[p_k] = {"description": desc, "tags": tags}

# trying if dataset is defined in swagger...
if "content" in base_res.keys():
res_cont = base_res["content"]
if "application/json" in res_cont.keys():
ex_field = None
if "example" in res_cont["application/json"]:
ex_field = "example"
elif "examples" in res_cont["application/json"]:
ex_field = "examples"

if ex_field:
if isinstance(res_cont["application/json"][ex_field], dict):
url_details[p_k]["data"] = res_cont["application/json"][
ex_field
]
elif isinstance(res_cont["application/json"][ex_field], list):
# taking the first example
url_details[p_k]["data"] = res_cont["application/json"][
ex_field
][0]
else:
logger.warning(
f"Field in swagger file does not give consistent data --- {p_k}"
)
elif "text/csv" in res_cont.keys():
url_details[p_k]["data"] = res_cont["text/csv"]["schema"]
elif "examples" in base_res.keys():
url_details[p_k]["data"] = base_res["examples"]["application/json"]

# checking whether there are defined parameters to execute the call...
if "parameters" in p_o["get"].keys():
url_details[p_k]["parameters"] = p_o["get"]["parameters"]
method = list(p_o)[0]
if "200" in p_o[method]["responses"].keys():
base_res = p_o[method]["responses"]["200"]
elif 200 in p_o[method]["responses"].keys():
# if you read a plain yml file the 200 will be an integer
base_res = p_o[method]["responses"][200]
else:
# the endpoint does not have a 200 response
continue

if "description" in p_o[method].keys():
desc = p_o[method]["description"]
elif "summary" in p_o[method].keys():
desc = p_o[method]["summary"]
else: # still testing
desc = ""

try:
tags = p_o[method]["tags"]
except KeyError:
tags = []

url_details[p_k] = {"description": desc, "tags": tags, "method": method}

# trying if dataset is defined in swagger...
if "content" in base_res.keys():
res_cont = base_res["content"]
if "application/json" in res_cont.keys():
ex_field = None
if "example" in res_cont["application/json"]:
ex_field = "example"
elif "examples" in res_cont["application/json"]:
ex_field = "examples"

if ex_field:
if isinstance(res_cont["application/json"][ex_field], dict):
url_details[p_k]["data"] = res_cont["application/json"][
ex_field
]
elif isinstance(res_cont["application/json"][ex_field], list):
# taking the first example
url_details[p_k]["data"] = res_cont["application/json"][
ex_field
][0]
else:
logger.warning(
f"Field in swagger file does not give consistent data --- {p_k}"
)
elif "text/csv" in res_cont.keys():
url_details[p_k]["data"] = res_cont["text/csv"]["schema"]
elif "examples" in base_res.keys():
url_details[p_k]["data"] = base_res["examples"]["application/json"]

# checking whether there are defined parameters to execute the call...
if "parameters" in p_o[method].keys():
url_details[p_k]["parameters"] = p_o[method]["parameters"]

return dict(sorted(url_details.items()))
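
# For reference, a hedged sketch of the structure this function returns for a spec
# with a single GET path (shape inferred from the code above, values made up):
#   {
#       "/colors": {
#           "description": "Return the list of colors",
#           "tags": [],
#           "method": "get",
#           "data": {...},          # present only when an example was found
#           "parameters": [...],    # present only when parameters are declared
#       }
#   }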

@@ -314,12 +316,10 @@ def extract_fields(
return ["contains_a_string"], {"contains_a_string": dict_data[0]}
else:
raise ValueError("unknown format")
if len(dict_data.keys()) > 1:
if len(dict_data) > 0:
# the elements are directly inside the dict
return flatten2list(dict_data), dict_data
dst_key = list(dict_data.keys())[
0
] # the first and unique key is the dataset's name
dst_key = list(dict_data)[0] # the first and unique key is the dataset's name

try:
return flatten2list(dict_data[dst_key]), dict_data[dst_key]
@@ -256,7 +256,6 @@ def to_datahub_schema(
self,
table: powerbi_data_classes.Table,
) -> SchemaMetadataClass:

fields = []
table_fields = (
[self.to_datahub_schema_field(column) for column in table.columns]
@@ -158,7 +158,6 @@ def get_all_tags_on_object_with_propagation(

@staticmethod
def get_all_tags_in_database_without_propagation(db_name: str) -> str:

allowed_object_domains = (
"("
f"'{SnowflakeObjectDomain.DATABASE.upper()}',"
@@ -135,7 +135,11 @@ def get_percent_entities_changed(
def compute_percent_entities_changed(
new_entities: List[str], old_entities: List[str]
) -> float:
(overlap_count, old_count, _,) = _get_entity_overlap_and_cardinalities(
(
overlap_count,
old_count,
_,
) = _get_entity_overlap_and_cardinalities(
new_entities=new_entities, old_entities=old_entities
)

@@ -38,7 +38,8 @@ def transform_generic_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[GenericAspectClass]
) -> Optional[GenericAspectClass]:
"""Implement this method to transform the single custom aspect for an entity.
The purpose of this abstract method is to reinforce the use of GenericAspectClass."""
The purpose of this abstract method is to reinforce the use of GenericAspectClass.
"""
pass

def _transform_or_record_mcpc(
@@ -68,7 +68,6 @@ def run_ingest(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
mock_datahub_graph,
) as mock_checkpoint:

mock_checkpoint.return_value = mock_datahub_graph

mocked_functions_reference(
1 change: 0 additions & 1 deletion metadata-ingestion/tests/integration/looker/test_looker.py
@@ -914,7 +914,6 @@ def test_independent_soft_deleted_looks(
mocked_client = mock.MagicMock()

with mock.patch("looker_sdk.init40") as mock_sdk:

mock_sdk.return_value = mocked_client
setup_mock_look(mocked_client)
setup_mock_soft_deleted_look(mocked_client)
3 changes: 0 additions & 3 deletions metadata-ingestion/tests/integration/okta/test_okta.py
@@ -58,14 +58,12 @@ def run_ingest(
mocked_functions_reference,
recipe,
):

with patch(
"datahub.ingestion.source.identity.okta.OktaClient"
) as MockClient, patch(
"datahub.ingestion.source.state_provider.datahub_ingestion_checkpointing_provider.DataHubGraph",
mock_datahub_graph,
) as mock_checkpoint:

mock_checkpoint.return_value = mock_datahub_graph

mocked_functions_reference(MockClient=MockClient)
@@ -277,7 +275,6 @@ def overwrite_group_in_mocked_data(test_resources_dir, MockClient):
def _init_mock_okta_client(
test_resources_dir, MockClient, mock_users_json=None, mock_groups_json=None
):

okta_users_json_file = (
test_resources_dir / "okta_users.json"
if mock_users_json is None