Skip to content

Commit 51808c1

Browse files
committed
Add schema filter
1 parent fa8d54d commit 51808c1

File tree

5 files changed

+94
-46
lines changed

5 files changed

+94
-46
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2318,6 +2318,12 @@ definitions:
23182318
- "$ref": "#/definitions/AsyncRetriever"
23192319
- "$ref": "#/definitions/CustomRetriever"
23202320
- "$ref": "#/definitions/SimpleRetriever"
2321+
schema_filter:
2322+
title: Schema Filter
2323+
description: placeholder
2324+
anyOf:
2325+
- "$ref": "#/definitions/RecordFilter"
2326+
- "$ref": "#/definitions/CustomRecordFilter"
23212327
schema_transformations:
23222328
title: Schema Transformations
23232329
description: A list of transformations to be applied to the schema.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 63 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2-
31
# generated by datamodel-codegen:
42
# filename: declarative_component_schema.yaml
53

@@ -620,7 +618,9 @@ class OAuthAuthenticator(BaseModel):
620618
scopes: Optional[List[str]] = Field(
621619
None,
622620
description="List of scopes that should be granted to the access token.",
623-
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
621+
examples=[
622+
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
623+
],
624624
title="Scopes",
625625
)
626626
token_expiry_date: Optional[str] = Field(
@@ -1126,24 +1126,28 @@ class OAuthConfigSpecification(BaseModel):
11261126
class Config:
11271127
extra = Extra.allow
11281128

1129-
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
1130-
None,
1131-
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1132-
examples=[
1133-
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1134-
{
1135-
"app_id": {
1136-
"type": "string",
1137-
"path_in_connector_config": ["info", "app_id"],
1138-
}
1139-
},
1140-
],
1141-
title="OAuth user input",
1129+
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
1130+
Field(
1131+
None,
1132+
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1133+
examples=[
1134+
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1135+
{
1136+
"app_id": {
1137+
"type": "string",
1138+
"path_in_connector_config": ["info", "app_id"],
1139+
}
1140+
},
1141+
],
1142+
title="OAuth user input",
1143+
)
11421144
)
1143-
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
1144-
None,
1145-
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1146-
title="DeclarativeOAuth Connector Specification",
1145+
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
1146+
Field(
1147+
None,
1148+
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1149+
title="DeclarativeOAuth Connector Specification",
1150+
)
11471151
)
11481152
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
11491153
None,
@@ -1161,7 +1165,9 @@ class Config:
11611165
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
11621166
None,
11631167
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
1164-
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
1168+
examples=[
1169+
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
1170+
],
11651171
title="OAuth input specification",
11661172
)
11671173
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1887,7 +1893,9 @@ class RecordSelector(BaseModel):
18871893
description="Responsible for filtering records to be emitted by the Source.",
18881894
title="Record Filter",
18891895
)
1890-
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
1896+
schema_normalization: Optional[
1897+
Union[SchemaNormalization, CustomSchemaNormalization]
1898+
] = Field(
18911899
None,
18921900
description="Responsible for normalization according to the schema.",
18931901
title="Schema Normalization",
@@ -2133,7 +2141,9 @@ class Config:
21332141
extra = Extra.allow
21342142

21352143
type: Literal["DeclarativeStream"]
2136-
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
2144+
name: Optional[str] = Field(
2145+
"", description="The stream name.", example=["Users"], title="Name"
2146+
)
21372147
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
21382148
...,
21392149
description="Component used to coordinate how records are extracted across stream slices and request pages.",
@@ -2164,7 +2174,7 @@ class Config:
21642174
]
21652175
] = Field(
21662176
None,
2167-
description="Component used to retrieve the schema for the current stream.",
2177+
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
21682178
title="Schema Loader",
21692179
)
21702180
transformations: Optional[
@@ -2311,18 +2321,20 @@ class HttpRequester(BaseModelWithDeprecations):
23112321
description="Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.",
23122322
title="Fetch Properties from Endpoint",
23132323
)
2314-
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field(
2315-
None,
2316-
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
2317-
examples=[
2318-
{"unit": "day"},
2319-
{
2320-
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
2321-
},
2322-
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
2323-
{"sort_by[asc]": "updated_at"},
2324-
],
2325-
title="Query Parameters",
2324+
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = (
2325+
Field(
2326+
None,
2327+
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
2328+
examples=[
2329+
{"unit": "day"},
2330+
{
2331+
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
2332+
},
2333+
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
2334+
{"sort_by[asc]": "updated_at"},
2335+
],
2336+
title="Query Parameters",
2337+
)
23262338
)
23272339
request_headers: Optional[Union[Dict[str, str], str]] = Field(
23282340
None,
@@ -2414,6 +2426,9 @@ class DynamicSchemaLoader(BaseModel):
24142426
description="Component used to coordinate how records are extracted across stream slices and request pages.",
24152427
title="Retriever",
24162428
)
2429+
schema_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
2430+
None, description="placeholder", title="Schema Filter"
2431+
)
24172432
schema_transformations: Optional[
24182433
List[
24192434
Union[
@@ -2512,7 +2527,9 @@ class QueryProperties(BaseModel):
25122527

25132528
class StateDelegatingStream(BaseModel):
25142529
type: Literal["StateDelegatingStream"]
2515-
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
2530+
name: str = Field(
2531+
..., description="The stream name.", example=["Users"], title="Name"
2532+
)
25162533
full_refresh_stream: DeclarativeStream = Field(
25172534
...,
25182535
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
@@ -2601,7 +2618,9 @@ class AsyncRetriever(BaseModel):
26012618
)
26022619
download_extractor: Optional[
26032620
Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor]
2604-
] = Field(None, description="Responsible for fetching the records from provided urls.")
2621+
] = Field(
2622+
None, description="Responsible for fetching the records from provided urls."
2623+
)
26052624
creation_requester: Union[HttpRequester, CustomRequester] = Field(
26062625
...,
26072626
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -2741,10 +2760,12 @@ class DynamicDeclarativeStream(BaseModel):
27412760
stream_template: DeclarativeStream = Field(
27422761
..., description="Reference to the stream template.", title="Stream Template"
27432762
)
2744-
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
2745-
...,
2746-
description="Component resolve and populates stream templates with components values.",
2747-
title="Components Resolver",
2763+
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
2764+
Field(
2765+
...,
2766+
description="Component resolve and populates stream templates with components values.",
2767+
title="Components Resolver",
2768+
)
27482769
)
27492770

27502771

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2432,10 +2432,14 @@ def create_dynamic_schema_loader(
24322432
schema_type_identifier = self._create_component_from_model(
24332433
model.schema_type_identifier, config=config, parameters=model.parameters or {}
24342434
)
2435+
schema_filter = self._create_component_from_model(
2436+
model.schema_filter, config=config, parameters=model.parameters or {}
2437+
)
24352438
return DynamicSchemaLoader(
24362439
retriever=retriever,
24372440
config=config,
24382441
schema_transformations=schema_transformations,
2442+
schema_filter=schema_filter,
24392443
schema_type_identifier=schema_type_identifier,
24402444
parameters=model.parameters or {},
24412445
)

airbyte_cdk/sources/declarative/resolvers/components_resolver.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ class ComponentMappingDefinition:
2222
value: Union["InterpolatedString", str]
2323
value_type: Optional[Type[Any]]
2424
parameters: InitVar[Mapping[str, Any]]
25+
create_or_update: Optional[bool] = False
2526

2627

2728
@dataclass(frozen=True)
@@ -34,6 +35,7 @@ class ResolvedComponentMappingDefinition:
3435
value: "InterpolatedString"
3536
value_type: Optional[Type[Any]]
3637
parameters: InitVar[Mapping[str, Any]]
38+
create_or_update: Optional[bool] = False
3739

3840

3941
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

Lines changed: 19 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1515
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1616
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
17+
from airbyte_cdk.sources.declarative.extractors.record_filter import RecordFilter
1718
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1819
from airbyte_cdk.sources.source import ExperimentalClassWarning
1920
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
@@ -126,6 +127,7 @@ class DynamicSchemaLoader(SchemaLoader):
126127
parameters: InitVar[Mapping[str, Any]]
127128
schema_type_identifier: SchemaTypeIdentifier
128129
schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
130+
schema_filter: Optional[RecordFilter] = None
129131

130132
def get_json_schema(self) -> Mapping[str, Any]:
131133
"""
@@ -151,20 +153,18 @@ def get_json_schema(self) -> Mapping[str, Any]:
151153
)
152154
properties[key] = value
153155

154-
transformed_properties = self._transform(properties, {})
156+
filtered_transformed_properties = self._transform(self._filter(properties))
155157

156158
return {
157159
"$schema": "https://json-schema.org/draft-07/schema#",
158160
"type": "object",
159161
"additionalProperties": True,
160-
"properties": transformed_properties,
162+
"properties": filtered_transformed_properties,
161163
}
162164

163165
def _transform(
164166
self,
165167
properties: Mapping[str, Any],
166-
stream_state: StreamState,
167-
stream_slice: Optional[StreamSlice] = None,
168168
) -> Mapping[str, Any]:
169169
for transformation in self.schema_transformations:
170170
transformation.transform(
@@ -173,6 +173,21 @@ def _transform(
173173
)
174174
return properties
175175

176+
def _filter(
177+
self,
178+
properties: Mapping[str, Any],
179+
) -> Mapping[str, Any]:
180+
if not self.schema_filter:
181+
return properties
182+
183+
filtered_properties = {}
184+
for item in self.schema_filter.filter_records(
185+
({k: v} for k, v in properties.items()),
186+
{},
187+
):
188+
filtered_properties.update(item)
189+
return filtered_properties
190+
176191
def _get_key(
177192
self,
178193
raw_schema: MutableMapping[str, Any],

0 commit comments

Comments
 (0)