Skip to content

Commit 0006ceb

Browse files
authored
feat(property chunking): Allow PropertiesFromEndpoint to be defined on HttpRequester (#507)
1 parent 0a5dee2 commit 0006ceb

File tree

5 files changed

+219
-22
lines changed

5 files changed

+219
-22
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1464,7 +1464,7 @@ definitions:
14641464
properties:
14651465
type:
14661466
type: string
1467-
enum: [ FileUploader ]
1467+
enum: [FileUploader]
14681468
requester:
14691469
description: Requester component that describes how to prepare HTTP requests to send to the source API.
14701470
anyOf:
@@ -1978,6 +1978,10 @@ definitions:
19781978
- "$ref": "#/definitions/SelectiveAuthenticator"
19791979
- "$ref": "#/definitions/CustomAuthenticator"
19801980
- "$ref": "#/definitions/LegacySessionTokenAuthenticator"
1981+
fetch_properties_from_endpoint:
1982+
title: Fetch Properties from Endpoint
1983+
description: Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.
1984+
"$ref": "#/definitions/PropertiesFromEndpoint"
19811985
request_body_data:
19821986
title: Request Body Payload (Non-JSON)
19831987
description: Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.
@@ -2370,7 +2374,7 @@ definitions:
23702374
properties:
23712375
type:
23722376
type: string
2373-
enum: [ KeyTransformation ]
2377+
enum: [KeyTransformation]
23742378
prefix:
23752379
title: Key Prefix
23762380
description: Prefix to add for object keys. If not provided original keys remain unchanged.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -2252,6 +2254,11 @@ class HttpRequester(BaseModel):
22522254
description="Authentication method to use for requests sent to the API.",
22532255
title="Authenticator",
22542256
)
2257+
fetch_properties_from_endpoint: Optional[PropertiesFromEndpoint] = Field(
2258+
None,
2259+
description="Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.",
2260+
title="Fetch Properties from Endpoint",
2261+
)
22552262
request_body_data: Optional[Union[Dict[str, str], str]] = Field(
22562263
None,
22572264
description="Specifies how to populate the body of the request with a non-JSON payload. Plain text will be sent as is, whereas objects will be converted to a urlencoded form.",

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 40 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -2951,16 +2951,19 @@ def create_simple_retriever(
29512951

29522952
query_properties: Optional[QueryProperties] = None
29532953
query_properties_key: Optional[str] = None
2954-
if (
2955-
hasattr(model.requester, "request_parameters")
2956-
and model.requester.request_parameters
2957-
and isinstance(model.requester.request_parameters, Mapping)
2958-
):
2954+
if self._query_properties_in_request_parameters(model.requester):
2955+
# It is better to raise an explicit error if PropertiesFromEndpoint is defined in multiple
2956+
# places instead of silently defaulting to request_parameters, which isn't clearly documented
2957+
if (
2958+
hasattr(model.requester, "fetch_properties_from_endpoint")
2959+
and model.requester.fetch_properties_from_endpoint
2960+
):
2961+
raise ValueError(
2962+
f"PropertiesFromEndpoint should only be specified once per stream, but found in {model.requester.type}.fetch_properties_from_endpoint and {model.requester.type}.request_parameters"
2963+
)
2964+
29592965
query_properties_definitions = []
2960-
for key, request_parameter in model.requester.request_parameters.items():
2961-
# When translating JSON schema into Pydantic models, enforcing types for arrays containing both
2962-
# concrete string complex object definitions like QueryProperties would get resolved to Union[str, Any].
2963-
# This adds the extra validation that we couldn't get for free in Pydantic model generation
2966+
for key, request_parameter in model.requester.request_parameters.items(): # type: ignore # request_parameters is already validated to be a Mapping using _query_properties_in_request_parameters()
29642967
if isinstance(request_parameter, QueryPropertiesModel):
29652968
query_properties_key = key
29662969
query_properties_definitions.append(request_parameter)
@@ -2974,6 +2977,21 @@ def create_simple_retriever(
29742977
query_properties = self._create_component_from_model(
29752978
model=query_properties_definitions[0], config=config
29762979
)
2980+
elif (
2981+
hasattr(model.requester, "fetch_properties_from_endpoint")
2982+
and model.requester.fetch_properties_from_endpoint
2983+
):
2984+
query_properties_definition = QueryPropertiesModel(
2985+
type="QueryProperties",
2986+
property_list=model.requester.fetch_properties_from_endpoint,
2987+
always_include_properties=None,
2988+
property_chunking=None,
2989+
) # type: ignore # $parameters has a default value
2990+
2991+
query_properties = self.create_query_properties(
2992+
model=query_properties_definition,
2993+
config=config,
2994+
)
29772995

29782996
requester = self._create_component_from_model(
29792997
model=model.requester,
@@ -3093,6 +3111,19 @@ def create_simple_retriever(
30933111
parameters=model.parameters or {},
30943112
)
30953113

3114+
@staticmethod
def _query_properties_in_request_parameters(
    requester: Union[HttpRequesterModel, CustomRequesterModel],
) -> bool:
    """Report whether the requester's request_parameters hold a QueryProperties model.

    Returns False when the requester has no ``request_parameters`` attribute, when
    that attribute is empty or not a Mapping, or when none of its values is a
    ``QueryPropertiesModel`` instance. Returns True as soon as one such value is found.
    """
    # A requester without the attribute is treated the same as one with no parameters.
    candidate_parameters = getattr(requester, "request_parameters", None)
    if not candidate_parameters or not isinstance(candidate_parameters, Mapping):
        return False
    return any(
        isinstance(parameter_value, QueryPropertiesModel)
        for parameter_value in candidate_parameters.values()
    )
3126+
30963127
@staticmethod
30973128
def _remove_query_properties(
30983129
request_parameters: Mapping[str, Union[str, QueryPropertiesModel]],

airbyte_cdk/sources/declarative/requesters/query_properties/query_properties.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -46,13 +46,3 @@ def get_request_property_chunks(
4646
)
4747
else:
4848
yield list(fields)
49-
50-
# delete later, but leaving this to keep the discussion thread on the PR from getting hidden
51-
def has_multiple_chunks(self, stream_slice: Optional[StreamSlice]) -> bool:
52-
property_chunks = iter(self.get_request_property_chunks(stream_slice=stream_slice))
53-
try:
54-
next(property_chunks)
55-
next(property_chunks)
56-
return True
57-
except StopIteration:
58-
return False

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 166 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4119,7 +4119,7 @@ def test_simple_retriever_with_query_properties():
41194119
assert request_options_provider.request_parameters.get("nonary") == "{{config['nonary'] }}"
41204120

41214121

4122-
def test_simple_retriever_with_properties_from_endpoint():
4122+
def test_simple_retriever_with_request_parameters_properties_from_endpoint():
41234123
content = """
41244124
selector:
41254125
type: RecordSelector
@@ -4216,6 +4216,88 @@ def test_simple_retriever_with_properties_from_endpoint():
42164216
assert property_chunking.property_limit == 3
42174217

42184218

4219+
# Checks that a PropertiesFromEndpoint declared under the requester's
# `fetch_properties_from_endpoint` key is exposed as the stream retriever's
# `additional_query_properties`.
def test_simple_retriever_with_requester_properties_from_endpoint():
4220+
content = """
4221+
selector:
4222+
type: RecordSelector
4223+
extractor:
4224+
type: DpathExtractor
4225+
field_path: ["extractor_path"]
4226+
record_filter:
4227+
type: RecordFilter
4228+
condition: "{{ record['id'] > stream_state['id'] }}"
4229+
requester:
4230+
type: HttpRequester
4231+
name: "{{ parameters['name'] }}"
4232+
url_base: "https://api.hubapi.com"
4233+
http_method: "GET"
4234+
path: "adAnalytics"
4235+
fetch_properties_from_endpoint:
4236+
type: PropertiesFromEndpoint
4237+
property_field_path: [ "name" ]
4238+
retriever:
4239+
type: SimpleRetriever
4240+
requester:
4241+
type: HttpRequester
4242+
url_base: https://api.hubapi.com
4243+
path: "/properties/v2/dynamics/properties"
4244+
http_method: GET
4245+
record_selector:
4246+
type: RecordSelector
4247+
extractor:
4248+
type: DpathExtractor
4249+
field_path: []
4250+
dynamic_properties_stream:
4251+
type: DeclarativeStream
4252+
incremental_sync:
4253+
type: DatetimeBasedCursor
4254+
$parameters:
4255+
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
4256+
start_datetime: "{{ config['start_time'] }}"
4257+
cursor_field: "created"
4258+
retriever:
4259+
type: SimpleRetriever
4260+
name: "{{ parameters['name'] }}"
4261+
requester:
4262+
$ref: "#/requester"
4263+
record_selector:
4264+
$ref: "#/selector"
4265+
$parameters:
4266+
name: "dynamics"
4267+
"""
4268+
4269+
# Parse the manifest, resolve its references, and propagate types/parameters
# for the `dynamic_properties_stream` entry before building the component.
parsed_manifest = YamlDeclarativeSource._parse(content)
4270+
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
4271+
stream_manifest = transformer.propagate_types_and_parameters(
4272+
"", resolved_manifest["dynamic_properties_stream"], {}
4273+
)
4274+
4275+
stream = factory.create_component(
4276+
model_type=DeclarativeStreamModel, component_definition=stream_manifest, config=input_config
4277+
)
4278+
4279+
# The resulting QueryProperties carries no always-include list and no chunking configuration.
query_properties = stream.retriever.additional_query_properties
4280+
assert isinstance(query_properties, QueryProperties)
4281+
assert query_properties.always_include_properties is None
4282+
assert query_properties.property_chunking is None
4283+
4284+
# Its property list is the PropertiesFromEndpoint component declared in the manifest.
properties_from_endpoint = stream.retriever.additional_query_properties.property_list
4285+
assert isinstance(properties_from_endpoint, PropertiesFromEndpoint)
4286+
assert properties_from_endpoint.property_field_path == ["name"]
4287+
4288+
properties_from_endpoint_retriever = (
4289+
stream.retriever.additional_query_properties.property_list.retriever
4290+
)
4291+
assert isinstance(properties_from_endpoint_retriever, SimpleRetriever)
4292+
4293+
# The nested requester keeps the url_base and path given for the properties endpoint.
properties_from_endpoint_requester = (
4294+
stream.retriever.additional_query_properties.property_list.retriever.requester
4295+
)
4296+
assert isinstance(properties_from_endpoint_requester, HttpRequester)
4297+
assert properties_from_endpoint_requester.url_base == "https://api.hubapi.com"
4298+
assert properties_from_endpoint_requester.path == "/properties/v2/dynamics/properties"
4299+
4300+
42194301
def test_request_parameters_raise_error_if_not_of_type_query_properties():
42204302
content = """
42214303
selector:
@@ -4360,6 +4442,89 @@ def test_create_simple_retriever_raise_error_if_multiple_request_properties():
43604442
)
43614443

43624444

4445+
# Expects a ValueError when the same requester declares both
# `fetch_properties_from_endpoint` and a QueryProperties entry inside
# `request_parameters`.
def test_create_simple_retriever_raise_error_properties_from_endpoint_defined_multiple_times():
4446+
content = """
4447+
selector:
4448+
type: RecordSelector
4449+
extractor:
4450+
type: DpathExtractor
4451+
field_path: ["extractor_path"]
4452+
record_filter:
4453+
type: RecordFilter
4454+
condition: "{{ record['id'] > stream_state['id'] }}"
4455+
requester:
4456+
type: HttpRequester
4457+
name: "{{ parameters['name'] }}"
4458+
url_base: "https://api.linkedin.com/rest/"
4459+
http_method: "GET"
4460+
path: "adAnalytics"
4461+
fetch_properties_from_endpoint:
4462+
type: PropertiesFromEndpoint
4463+
property_field_path: [ "name" ]
4464+
retriever:
4465+
type: SimpleRetriever
4466+
requester:
4467+
type: HttpRequester
4468+
url_base: https://api.hubapi.com
4469+
path: "/properties/v2/dynamics/properties"
4470+
http_method: GET
4471+
record_selector:
4472+
type: RecordSelector
4473+
extractor:
4474+
type: DpathExtractor
4475+
field_path: []
4476+
request_parameters:
4477+
properties:
4478+
type: QueryProperties
4479+
property_list:
4480+
- first_name
4481+
- last_name
4482+
- status
4483+
- organization
4484+
- created_at
4485+
always_include_properties:
4486+
- id
4487+
property_chunking:
4488+
type: PropertyChunking
4489+
property_limit_type: property_count
4490+
property_limit: 3
4491+
record_merge_strategy:
4492+
type: GroupByKeyMergeStrategy
4493+
key: ["id"]
4494+
nonary: "{{config['nonary'] }}"
4495+
analytics_stream:
4496+
type: DeclarativeStream
4497+
incremental_sync:
4498+
type: DatetimeBasedCursor
4499+
$parameters:
4500+
datetime_format: "%Y-%m-%dT%H:%M:%S.%f%z"
4501+
start_datetime: "{{ config['start_time'] }}"
4502+
cursor_field: "created"
4503+
retriever:
4504+
type: SimpleRetriever
4505+
name: "{{ parameters['name'] }}"
4506+
requester:
4507+
$ref: "#/requester"
4508+
record_selector:
4509+
$ref: "#/selector"
4510+
$parameters:
4511+
name: "analytics"
4512+
"""
4513+
4514+
# Parse the manifest, resolve its references, and propagate types/parameters
# for the `analytics_stream` entry before attempting to build the component.
parsed_manifest = YamlDeclarativeSource._parse(content)
4515+
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
4516+
stream_manifest = transformer.propagate_types_and_parameters(
4517+
"", resolved_manifest["analytics_stream"], {}
4518+
)
4519+
4520+
# Building the stream must fail rather than pick one of the two definitions.
with pytest.raises(ValueError):
4521+
factory.create_component(
4522+
model_type=DeclarativeStreamModel,
4523+
component_definition=stream_manifest,
4524+
config=input_config,
4525+
)
4526+
4527+
43634528
def test_create_property_chunking_characters():
43644529
property_chunking_model = {
43654530
"type": "PropertyChunking",

0 commit comments

Comments
 (0)