Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -120,14 +120,6 @@ def propagate_types_and_parameters(
if found_type:
propagated_component["type"] = found_type

# When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
# When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
# be json_schema are not objects but we believe this is not likely in our case because:
# * records are Mapping so objects hence SchemaLoader root should be an object
# * connection_specification is a Mapping
if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
return propagated_component

# Combines parameters defined at the current level with parameters from parent components. Parameters at the current
# level take precedence
current_parameters = dict(copy.deepcopy(parent_parameters))
Expand All @@ -138,6 +130,29 @@ def propagate_types_and_parameters(
else {**current_parameters, **component_parameters}
)

# Request parameters are a plain object without a type, so $parameters would normally not be passed down to it.
# However, request parameters can contain a QueryProperties component (which holds the PropertyChunking configuration) that needs the parent $parameters.
# When such a component is present, _process_property_chunking_property() propagates $parameters into it
# and stores the updated object back on propagated_component, which is then returned below without any further propagation.
if "type" not in propagated_component and self._is_property_chunking_component(
propagated_component
):
propagated_component = self._process_property_chunking_property(
propagated_component,
parent_field_identifier,
current_parameters,
use_parent_parameters,
)

# When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
# When the type refers to a JSON schema, we're not processing a component either. This check is currently imperfect as there could
# be JSON schemas that are not objects, but we believe this is unlikely in our case because:
# * records are Mapping so objects hence SchemaLoader root should be an object
# * connection_specification is a Mapping

if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
return propagated_component

# Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
# both exist
for parameter_key, parameter_value in current_parameters.items():
Expand Down Expand Up @@ -182,3 +197,30 @@ def propagate_types_and_parameters(
@staticmethod
def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
    """Tell whether the mapping declares itself as a JSON schema object.

    :param propagated_component: mapping whose "type" entry is inspected
    :return: True only when the "type" entry equals "object"; a missing entry yields False
    """
    declared_type = propagated_component.get("type")
    return declared_type == "object"

@staticmethod
def _is_property_chunking_component(propagated_component: Mapping[str, Any]) -> bool:
    """Report whether any direct value of the mapping is a ``QueryProperties`` component.

    Only the immediate values are inspected; nested mappings are not searched.

    :param propagated_component: mapping whose direct values are inspected
    :return: True if at least one value is a dict whose "type" entry equals "QueryProperties"
    """
    # any() stops at the first match instead of scanning every remaining field,
    # and only the values are needed, so the keys are not iterated.
    return any(
        isinstance(value, dict) and value.get("type") == "QueryProperties"
        for value in propagated_component.values()
    )

def _process_property_chunking_property(
    self,
    propagated_component: Dict[str, Any],
    parent_field_identifier: str,
    current_parameters: Mapping[str, Any],
    use_parent_parameters: Optional[bool] = None,
) -> Dict[str, Any]:
    """Run type/parameter propagation on every direct ``QueryProperties`` value of a mapping.

    Each direct value that is a dict whose "type" entry equals "QueryProperties" is passed to
    ``self.propagate_types_and_parameters`` and the returned object is stored back under the
    same key. All other entries are left untouched.

    :param propagated_component: mapping to update; it is modified in place
    :param parent_field_identifier: identifier forwarded unchanged to the propagation call
    :param current_parameters: parameters forwarded as the parent parameters of the propagation call
    :param use_parent_parameters: flag forwarded unchanged to the propagation call
    :return: the same ``propagated_component`` object that was passed in
    """
    for field_name, field_value in propagated_component.items():
        if not isinstance(field_value, dict):
            continue
        if field_value.get("type") != "QueryProperties":
            continue
        # Only existing keys are reassigned, so the mapping's size does not change while iterating.
        propagated_component[field_name] = self.propagate_types_and_parameters(
            parent_field_identifier,
            field_value,
            current_parameters,
            use_parent_parameters=use_parent_parameters,
        )

    return propagated_component
Original file line number Diff line number Diff line change
Expand Up @@ -460,3 +460,112 @@ def test_do_not_propagate_parameters_on_json_schema_object():
actual_component = transformer.propagate_types_and_parameters("", component, {})

assert actual_component == expected_component


def test_propagate_property_chunking():
    """Stream-level $parameters must reach a QueryProperties component nested in untyped request_parameters."""

    def propagated(fields):
        # Every typed component below the stream is expected to carry the inherited
        # "$parameters" plus the "entity" value applied from them.
        return {"$parameters": {"entity": "test_entity"}, "entity": "test_entity", **fields}

    manifest = {
        "type": "DeclarativeStream",
        "streams": [
            {
                "type": "DeclarativeStream",
                "retriever": {
                    "type": "SimpleRetriever",
                    "requester": {
                        "type": "HttpRequester",
                        "url_base": "https://test.com",
                        "request_parameters": {
                            "properties": {
                                "type": "QueryProperties",
                                "property_list": {
                                    "type": "PropertiesFromEndpoint",
                                    "property_field_path": ["name"],
                                    "retriever": {
                                        "type": "SimpleRetriever",
                                        "requester": {
                                            "type": "HttpRequester",
                                            "url_base": "https://test.com",
                                            "authenticator": {
                                                "$ref": "#/definitions/authenticator"
                                            },
                                            "path": "/properties/{{ parameters.entity }}/properties",
                                            "http_method": "GET",
                                            "request_headers": {"Content-Type": "application/json"},
                                        },
                                    },
                                },
                                "property_chunking": {
                                    "type": "PropertyChunking",
                                    "property_limit_type": "characters",
                                    "property_limit": 15000,
                                },
                            }
                        },
                    },
                },
                "$parameters": {"entity": "test_entity"},
            }
        ],
    }

    # The expected result is assembled from the innermost component outwards.
    expected_properties_requester = propagated(
        {
            "type": "HttpRequester",
            "url_base": "https://test.com",
            "authenticator": {"$ref": "#/definitions/authenticator"},
            "path": "/properties/{{ parameters.entity }}/properties",
            "http_method": "GET",
            "request_headers": {"Content-Type": "application/json"},
        }
    )
    expected_property_list = propagated(
        {
            "type": "PropertiesFromEndpoint",
            "property_field_path": ["name"],
            "retriever": propagated(
                {"type": "SimpleRetriever", "requester": expected_properties_requester}
            ),
        }
    )
    expected_property_chunking = propagated(
        {
            "type": "PropertyChunking",
            "property_limit_type": "characters",
            "property_limit": 15000,
        }
    )
    expected_query_properties = propagated(
        {
            "type": "QueryProperties",
            "property_list": expected_property_list,
            "property_chunking": expected_property_chunking,
        }
    )
    expected_requester = propagated(
        {
            "type": "HttpRequester",
            "url_base": "https://test.com",
            # request_parameters itself has no type, so it receives no parameters of its own.
            "request_parameters": {"properties": expected_query_properties},
        }
    )
    expected_stream = propagated(
        {
            "type": "DeclarativeStream",
            "retriever": propagated({"type": "SimpleRetriever", "requester": expected_requester}),
        }
    )
    # The root is processed with empty parent parameters and declares none itself.
    expected_manifest = {"type": "DeclarativeStream", "streams": [expected_stream]}

    actual_manifest = ManifestComponentTransformer().propagate_types_and_parameters(
        "", manifest, {}
    )

    assert actual_manifest == expected_manifest
Loading