Skip to content

Commit 09cb8b8

Browse files
committed
allow for specifying PropertiesFromEndpoint from the HttpRequester to allow for interpolation on other request options
1 parent 3f2ba17 commit 09cb8b8

File tree

5 files changed

+113
-23
lines changed

5 files changed

+113
-23
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1459,7 +1459,7 @@ definitions:
14591459
properties:
14601460
type:
14611461
type: string
1462-
enum: [ FileUploader ]
1462+
enum: [FileUploader]
14631463
requester:
14641464
description: Requester component that describes how to prepare HTTP requests to send to the source API.
14651465
anyOf:
@@ -1960,6 +1960,10 @@ definitions:
19601960
- "$ref": "#/definitions/DefaultErrorHandler"
19611961
- "$ref": "#/definitions/CustomErrorHandler"
19621962
- "$ref": "#/definitions/CompositeErrorHandler"
1963+
fetch_properties_from_endpoint:
1964+
title: Fetch Properties from Endpoint
1965+
description: Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.
1966+
"$ref": "#/definitions/PropertiesFromEndpoint"
19631967
http_method:
19641968
title: HTTP Method
19651969
description: The HTTP method used to fetch data from the source (can be GET or POST).
@@ -2351,7 +2355,7 @@ definitions:
23512355
properties:
23522356
type:
23532357
type: string
2354-
enum: [ KeyTransformation ]
2358+
enum: [KeyTransformation]
23552359
prefix:
23562360
title: Key Prefix
23572361
description: Prefix to add for object keys. If not provided original keys remain unchanged.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 12 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -880,20 +880,17 @@ class FlattenFields(BaseModel):
880880

881881

882882
class KeyTransformation(BaseModel):
883-
prefix: Optional[Union[str, None]] = Field(
883+
type: Literal["KeyTransformation"]
884+
prefix: Optional[str] = Field(
884885
None,
885886
description="Prefix to add for object keys. If not provided original keys remain unchanged.",
886-
examples=[
887-
"flattened_",
888-
],
887+
examples=["flattened_"],
889888
title="Key Prefix",
890889
)
891-
suffix: Optional[Union[str, None]] = Field(
890+
suffix: Optional[str] = Field(
892891
None,
893892
description="Suffix to add for object keys. If not provided original keys remain unchanged.",
894-
examples=[
895-
"_flattened",
896-
],
893+
examples=["_flattened"],
897894
title="Key Suffix",
898895
)
899896

@@ -916,7 +913,7 @@ class DpathFlattenFields(BaseModel):
916913
description="Whether to replace the origin record or not. Default is False.",
917914
title="Replace Origin Record",
918915
)
919-
key_transformation: Optional[Union[KeyTransformation, None]] = Field(
916+
key_transformation: Optional[KeyTransformation] = Field(
920917
None,
921918
description="Transformation for object keys. If not provided, original key will be used.",
922919
title="Key transformation",
@@ -2088,7 +2085,6 @@ class FileUploader(BaseModel):
20882085
"{{ record.id }}_{{ record.file_name }}/",
20892086
],
20902087
)
2091-
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20922088

20932089

20942090
class DeclarativeStream(BaseModel):
@@ -2249,6 +2245,11 @@ class HttpRequester(BaseModel):
22492245
description="Error handler component that defines how to handle errors.",
22502246
title="Error Handler",
22512247
)
2248+
fetch_properties_from_endpoint: Optional[PropertiesFromEndpoint] = Field(
2249+
None,
2250+
description="Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.",
2251+
title="Fetch Properties from Endpoint",
2252+
)
22522253
http_method: Optional[HttpMethod] = Field(
22532254
HttpMethod.GET,
22542255
description="The HTTP method used to fetch data from the source (can be GET or POST).",
@@ -2650,6 +2651,7 @@ class DynamicDeclarativeStream(BaseModel):
26502651
FileUploader.update_forward_refs()
26512652
DeclarativeStream.update_forward_refs()
26522653
SessionTokenAuthenticator.update_forward_refs()
2654+
HttpRequester.update_forward_refs()
26532655
DynamicSchemaLoader.update_forward_refs()
26542656
ParentStreamConfig.update_forward_refs()
26552657
PropertiesFromEndpoint.update_forward_refs()

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2967,6 +2967,18 @@ def create_simple_retriever(
29672967
model.requester.request_parameters = self._remove_query_properties(
29682968
model.requester.request_parameters
29692969
)
2970+
elif model.requester.fetch_properties_from_endpoint:
2971+
query_properties_definition = QueryPropertiesModel(
2972+
type="QueryProperties",
2973+
property_list=model.requester.fetch_properties_from_endpoint,
2974+
always_include_properties=None,
2975+
property_chunking=None,
2976+
) # type: ignore # $parameters has a default value
2977+
2978+
query_properties = self.create_query_properties(
2979+
model=query_properties_definition,
2980+
config=config,
2981+
)
29702982

29712983
requester = self._create_component_from_model(
29722984
model=model.requester,

airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,3 @@ def get_request_property_chunks(
4646
)
4747
else:
4848
yield list(fields)
49-
50-
# delete later, but leaving this to keep the discussion thread on the PR from getting hidden
51-
def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool:
52-
property_chunks = iter(self.get_request_property_chunks(stream_slice=stream_slice))
53-
try:
54-
next(property_chunks)
55-
next(property_chunks)
56-
return True
57-
except StopIteration:
58-
return False

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 83 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4119,7 +4119,7 @@ def test_simple_retriever_with_query_properties():
41194119
assert request_options_provider.request_parameters.get("nonary") == "{{config['nonary'] }}"
41204120

41214121

4122-
def test_simple_retriever_with_properties_from_endpoint():
4122+
def test_simple_retriever_with_request_parameters_properties_from_endpoint():
41234123
content = """
41244124
selector:
41254125
type: RecordSelector
@@ -4216,6 +4216,88 @@ def test_simple_retriever_with_properties_from_endpoint():
42164216
assert property_chunking.property_limit == 3
42174217

42184218

4219+
def test_simple_retriever_with_requester_properties_from_endpoint():
4220+
content = """
4221+
selector:
4222+
type: RecordSelector
4223+
extractor:
4224+
type: DpathExtractor
4225+
field_path: ["extractor_path"]
4226+
record_filter:
4227+
type: RecordFilter
4228+
condition: "{{ record['id'] > stream_state['id'] }}"
4229+
requester:
4230+
type: HttpRequester
4231+
name: "{{ parameters['name'] }}"
4232+
url_base: "https://api.hubapi.com"
4233+
http_method: "GET"
4234+
path: "adAnalytics"
4235+
fetch_properties_from_endpoint:
4236+
type: PropertiesFromEndpoint
4237+
property_field_path: [ "name" ]
4238+
retriever:
4239+
type: SimpleRetriever
4240+
requester:
4241+
type: HttpRequester
4242+
url_base: https://api.hubapi.com
4243+
path: "/properties/v2/dynamics/properties"
4244+
http_method: GET
4245+
record_selector:
4246+
type: RecordSelector
4247+
extractor:
4248+
type: DpathExtractor
4249+
field_path: []
4250+
dynamic_properties_stream:
4251+
type: DeclarativeStream
4252+
incremental_sync:
4253+
type: DatetimeBasedCursor
4254+
$parameters:
4255+
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
4256+
start_datetime: "{{ config['start_time'] }}"
4257+
cursor_field: "created"
4258+
retriever:
4259+
type: SimpleRetriever
4260+
name: "{{ parameters['name'] }}"
4261+
requester:
4262+
$ref: "#/requester"
4263+
record_selector:
4264+
$ref: "#/selector"
4265+
$parameters:
4266+
name: "dynamics"
4267+
"""
4268+
4269+
parsed_manifest = YamlDeclarativeSource._parse(content)
4270+
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
4271+
stream_manifest = transformer.propagate_types_and_parameters(
4272+
"", resolved_manifest["dynamic_properties_stream"], {}
4273+
)
4274+
4275+
stream = factory.create_component(
4276+
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config
4277+
)
4278+
4279+
query_properties = stream.retriever.additional_query_properties
4280+
assert isinstance(query_properties, QueryProperties)
4281+
assert query_properties.always_include_properties is None
4282+
assert query_properties.property_chunking is None
4283+
4284+
properties_from_endpoint = stream.retriever.additional_query_properties.property_list
4285+
assert isinstance(properties_from_endpoint, PropertiesFromEndpoint)
4286+
assert properties_from_endpoint.property_field_path == ["name"]
4287+
4288+
properties_from_endpoint_retriever = (
4289+
stream.retriever.additional_query_properties.property_list.retriever
4290+
)
4291+
assert isinstance(properties_from_endpoint_retriever, SimpleRetriever)
4292+
4293+
properties_from_endpoint_requester = (
4294+
stream.retriever.additional_query_properties.property_list.retriever.requester
4295+
)
4296+
assert isinstance(properties_from_endpoint_requester, HttpRequester)
4297+
assert properties_from_endpoint_requester.url_base == "https://api.hubapi.com"
4298+
assert properties_from_endpoint_requester.path == "/properties/v2/dynamics/properties"
4299+
4300+
42194301
def test_request_parameters_raise_error_if_not_of_type_query_properties():
42204302
content = """
42214303
selector:

0 commit comments

Comments
 (0)