Skip to content
Merged
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ def __init__(
component_factory = ModelToComponentFactory(
emit_connector_builder_messages=emit_connector_builder_messages,
message_repository=ConcurrentMessageRepository(queue, message_repository),
configured_catalog=catalog,
connector_state_manager=self._connector_state_manager,
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
limit_pages_fetched_per_slice=limits.max_pages_per_slice if limits else None,
Expand Down
36 changes: 34 additions & 2 deletions airbyte_cdk/sources/declarative/declarative_component_schema.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -2520,6 +2520,34 @@ definitions:
type:
type: string
enum: [JsonlDecoder]
JsonSchemaPropertySelector:
title: Json Schema Property Selector
description: When configured, the JSON schema supplied in the catalog containing which columns are selected for the current stream will be used to reduce which query properties will be included in the outbound API request. This can improve the performance of API requests, especially for those requiring multiple requests to get a complete record.
type: object
required:
- type
properties:
type:
type: string
enum: [JsonSchemaPropertySelector]
transformations:
title: Transformations
description: A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.
linkable: true
type: array
items:
anyOf:
- "$ref": "#/definitions/AddFields"
- "$ref": "#/definitions/RemoveFields"
- "$ref": "#/definitions/KeysToLower"
- "$ref": "#/definitions/KeysToSnakeCase"
- "$ref": "#/definitions/FlattenFields"
- "$ref": "#/definitions/DpathFlattenFields"
- "$ref": "#/definitions/KeysReplace"
- "$ref": "#/definitions/CustomTransformation"
$parameters:
type: object
additionalProperties: true
KeysToLower:
title: Keys to Lower Case
description: A transformation that renames all keys to lower case.
Expand Down Expand Up @@ -3410,6 +3438,10 @@ definitions:
title: Property Chunking
description: Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.
"$ref": "#/definitions/PropertyChunking"
property_selector:
title: Property Selector
description: Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.
"$ref": "#/definitions/JsonSchemaPropertySelector"
$parameters:
type: object
additionalProperties: true
Expand Down Expand Up @@ -3746,7 +3778,7 @@ definitions:
properties:
type:
type: string
enum: [ PaginationReset ]
enum: [PaginationReset]
action:
type: string
enum:
Expand All @@ -3763,7 +3795,7 @@ definitions:
properties:
type:
type: string
enum: [ PaginationResetLimits ]
enum: [PaginationResetLimits]
number_of_records:
type: integer
GzipDecoder:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

# generated by datamodel-codegen:
# filename: declarative_component_schema.yaml

Expand Down Expand Up @@ -2029,6 +2031,29 @@ class SessionTokenRequestApiKeyAuthenticator(BaseModel):
)


class JsonSchemaPropertySelector(BaseModel):
type: Literal["JsonSchemaPropertySelector"]
transformations: Optional[
List[
Union[
AddFields,
RemoveFields,
KeysToLower,
KeysToSnakeCase,
FlattenFields,
DpathFlattenFields,
KeysReplace,
CustomTransformation,
]
]
] = Field(
None,
description="A list of transformations to be applied on the customer configured schema that will be used to filter out unselected fields when specifying query properties for API requests.",
title="Transformations",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


class ListPartitionRouter(BaseModel):
type: Literal["ListPartitionRouter"]
cursor_field: str = Field(
Expand Down Expand Up @@ -2799,6 +2824,11 @@ class QueryProperties(BaseModel):
description="Defines how query properties will be grouped into smaller sets for APIs with limitations on the number of properties fetched per API request.",
title="Property Chunking",
)
property_selector: Optional[JsonSchemaPropertySelector] = Field(
None,
description="Defines where to look for and which query properties that should be sent in outbound API requests. For example, you can specify that only the selected columns of a stream should be in the request.",
title="Property Selector",
)
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
get_type_hints,
)

from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream
from isodate import parse_duration
from pydantic.v1 import BaseModel
from requests import Response
Expand All @@ -42,6 +43,7 @@
AirbyteStateMessage,
AirbyteStateType,
AirbyteStreamState,
ConfiguredAirbyteCatalog,
FailureType,
Level,
StreamDescriptor,
Expand Down Expand Up @@ -314,6 +316,9 @@
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonlDecoder as JsonlDecoderModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JsonSchemaPropertySelector as JsonSchemaPropertySelectorModel,
)
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
JwtAuthenticator as JwtAuthenticatorModel,
)
Expand Down Expand Up @@ -501,6 +506,9 @@
from airbyte_cdk.sources.declarative.requesters.query_properties.property_chunking import (
PropertyLimitType,
)
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector import (
JsonSchemaPropertySelector,
)
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import (
GroupByKey,
)
Expand Down Expand Up @@ -668,6 +676,7 @@ def __init__(
message_repository: Optional[MessageRepository] = None,
connector_state_manager: Optional[ConnectorStateManager] = None,
max_concurrent_async_job_count: Optional[int] = None,
configured_catalog: Optional[ConfiguredAirbyteCatalog] = None,
):
self._init_mappings()
self._limit_pages_fetched_per_slice = limit_pages_fetched_per_slice
Expand All @@ -678,6 +687,9 @@ def __init__(
self._message_repository = message_repository or InMemoryMessageRepository(
self._evaluate_log_level(emit_connector_builder_messages)
)
self._stream_name_to_configured_stream = self._create_stream_name_to_configured_stream(
configured_catalog
)
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
Expand Down Expand Up @@ -734,6 +746,7 @@ def _init_mappings(self) -> None:
InlineSchemaLoaderModel: self.create_inline_schema_loader,
JsonDecoderModel: self.create_json_decoder,
JsonlDecoderModel: self.create_jsonl_decoder,
JsonSchemaPropertySelectorModel: self.create_json_schema_property_selector,
GzipDecoderModel: self.create_gzip_decoder,
KeysToLowerModel: self.create_keys_to_lower_transformation,
KeysToSnakeCaseModel: self.create_keys_to_snake_transformation,
Expand Down Expand Up @@ -796,6 +809,16 @@ def _init_mappings(self) -> None:
# Needed for the case where we need to perform a second parse on the fields of a custom component
self.TYPE_NAME_TO_MODEL = {cls.__name__: cls for cls in self.PYDANTIC_MODEL_TO_CONSTRUCTOR}

@staticmethod
def _create_stream_name_to_configured_stream(
configured_catalog: Optional[ConfiguredAirbyteCatalog],
) -> Mapping[str, ConfiguredAirbyteStream]:
return (
{stream.stream.name: stream for stream in configured_catalog.streams}
if configured_catalog
else {}
)

def create_component(
self,
model_type: Type[BaseModel],
Expand Down Expand Up @@ -2987,7 +3010,7 @@ def create_property_chunking(
)

def create_query_properties(
self, model: QueryPropertiesModel, config: Config, **kwargs: Any
self, model: QueryPropertiesModel, config: Config, *, stream_name: str, **kwargs: Any
) -> QueryProperties:
if isinstance(model.property_list, list):
property_list = model.property_list
Expand All @@ -3004,10 +3027,43 @@ def create_query_properties(
else None
)

property_selector = (
self._create_component_from_model(
model=model.property_selector, config=config, stream_name=stream_name, **kwargs
)
if model.property_selector
else None
)

return QueryProperties(
property_list=property_list,
always_include_properties=model.always_include_properties,
property_chunking=property_chunking,
property_selector=property_selector,
config=config,
parameters=model.parameters or {},
)

def create_json_schema_property_selector(
self,
model: JsonSchemaPropertySelectorModel,
config: Config,
*,
stream_name: str,
**kwargs: Any,
) -> JsonSchemaPropertySelector:
configured_stream = self._stream_name_to_configured_stream.get(stream_name)

transformations = []
if model.transformations:
for transformation_model in model.transformations:
transformations.append(
self._create_component_from_model(model=transformation_model, config=config)
)

return JsonSchemaPropertySelector(
configured_stream=configured_stream,
properties_transformations=transformations,
config=config,
parameters=model.parameters or {},
)
Expand Down Expand Up @@ -3235,7 +3291,7 @@ def _get_url(req: Requester) -> str:

if len(query_properties_definitions) == 1:
query_properties = self._create_component_from_model(
model=query_properties_definitions[0], config=config
model=query_properties_definitions[0], stream_name=name, config=config
)

# Removes QueryProperties components from the interpolated mappings because it has been designed
Expand All @@ -3261,11 +3317,13 @@ def _get_url(req: Requester) -> str:

query_properties = self.create_query_properties(
model=query_properties_definition,
stream_name=name,
config=config,
)
elif hasattr(model.requester, "query_properties") and model.requester.query_properties:
query_properties = self.create_query_properties(
model=model.requester.query_properties,
stream_name=name,
config=config,
)

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, Optional, Set

from airbyte_protocol_dataclasses.models import ConfiguredAirbyteStream

from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.types import Config


@dataclass
class JsonSchemaPropertySelector:
"""
A class that contains a list of transformations to apply to properties.
"""

configured_stream: ConfiguredAirbyteStream
config: Config
parameters: InitVar[Mapping[str, Any]]
properties_transformations: List[RecordTransformation] = field(default_factory=lambda: [])

def __post_init__(self, parameters: Mapping[str, Any]) -> None:
self._parameters = parameters

def select(self) -> Set[str]:
properties = set()
for schema_property in self.configured_stream.stream.json_schema.get(
"properties", {}
).keys():
if self.properties_transformations:
for transformation in self.properties_transformations:
transformation.transform(
schema_property, # type: ignore # record has type Mapping[str, Any], but Dict[str, Any] expected
config=self.config,
)
properties.add(schema_property)
return properties
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@

from dataclasses import InitVar, dataclass
from enum import Enum
from typing import Any, Iterable, List, Mapping, Optional
from typing import Any, Iterable, List, Mapping, Optional, Set

from airbyte_cdk.sources.declarative.requesters.query_properties.strategies import GroupByKey
from airbyte_cdk.sources.declarative.requesters.query_properties.strategies.merge_strategy import (
Expand Down Expand Up @@ -40,7 +40,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
)

def get_request_property_chunks(
self, property_fields: Iterable[str], always_include_properties: Optional[List[str]]
self,
property_fields: Iterable[str],
always_include_properties: Optional[List[str]],
configured_properties: Optional[Set[str]],
) -> Iterable[List[str]]:
if not self.property_limit:
single_property_chunk = list(property_fields)
Expand All @@ -53,6 +56,8 @@ def get_request_property_chunks(
for property_field in property_fields:
# If property_limit_type is not defined, we default to property_count which is just an incrementing count
# todo: Add ability to specify parameter delimiter representation and take into account in property_field_size
if configured_properties is not None and property_field not in configured_properties:
continue
property_field_size = (
len(property_field)
+ 3 # The +3 represents the extra characters for encoding the delimiter in between properties
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.

from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.json_schema_property_selector import (
JsonSchemaPropertySelector,
)
from airbyte_cdk.sources.declarative.requesters.query_properties.property_selector.property_selector import (
PropertySelector,
)

__all__ = ["JsonSchemaPropertySelector", "PropertySelector"]
Loading
Loading