Skip to content

Commit 1a41386

Browse files
committed
Update ConfigComponentsResolver to supprt list of stream_config
1 parent fa8d54d commit 1a41386

File tree

7 files changed

+171
-70
lines changed

7 files changed

+171
-70
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2318,6 +2318,12 @@ definitions:
23182318
- "$ref": "#/definitions/AsyncRetriever"
23192319
- "$ref": "#/definitions/CustomRetriever"
23202320
- "$ref": "#/definitions/SimpleRetriever"
2321+
schema_filter:
2322+
title: Schema Filter
2323+
description: placeholder
2324+
anyOf:
2325+
- "$ref": "#/definitions/RecordFilter"
2326+
- "$ref": "#/definitions/CustomRecordFilter"
23212327
schema_transformations:
23222328
title: Schema Transformations
23232329
description: A list of transformations to be applied to the schema.
@@ -3994,6 +4000,9 @@ definitions:
39944000
title: Value Type
39954001
description: The expected data type of the value. If omitted, the type will be inferred from the value provided.
39964002
"$ref": "#/definitions/ValueType"
4003+
create_or_update:
4004+
type: boolean
4005+
default: false
39974006
$parameters:
39984007
type: object
39994008
additionalProperties: true
@@ -4045,6 +4054,10 @@ definitions:
40454054
- ["data"]
40464055
- ["data", "streams"]
40474056
- ["data", "{{ parameters.name }}"]
4057+
default_values:
4058+
title: Default Values
4059+
description: placeholder
4060+
type: array
40484061
$parameters:
40494062
type: object
40504063
additionalProperties: true
@@ -4056,7 +4069,11 @@ definitions:
40564069
type: string
40574070
enum: [ConfigComponentsResolver]
40584071
stream_config:
4059-
"$ref": "#/definitions/StreamConfig"
4072+
anyOf:
4073+
- type: array
4074+
items:
4075+
"$ref": "#/definitions/StreamConfig"
4076+
- "$ref": "#/definitions/StreamConfig"
40604077
components_mapping:
40614078
type: array
40624079
items:

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -262,9 +262,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
262262
}
263263
)
264264

265-
stream_configs = self._stream_configs(self._source_config) + self._dynamic_stream_configs(
266-
self._source_config, config
267-
)
265+
stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams
268266

269267
api_budget_model = self._source_config.get("api_budget")
270268
if api_budget_model:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 72 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,3 @@
1-
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2-
31
# generated by datamodel-codegen:
42
# filename: declarative_component_schema.yaml
53

@@ -620,7 +618,9 @@ class OAuthAuthenticator(BaseModel):
620618
scopes: Optional[List[str]] = Field(
621619
None,
622620
description="List of scopes that should be granted to the access token.",
623-
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
621+
examples=[
622+
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
623+
],
624624
title="Scopes",
625625
)
626626
token_expiry_date: Optional[str] = Field(
@@ -1126,24 +1126,28 @@ class OAuthConfigSpecification(BaseModel):
11261126
class Config:
11271127
extra = Extra.allow
11281128

1129-
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
1130-
None,
1131-
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1132-
examples=[
1133-
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1134-
{
1135-
"app_id": {
1136-
"type": "string",
1137-
"path_in_connector_config": ["info", "app_id"],
1138-
}
1139-
},
1140-
],
1141-
title="OAuth user input",
1129+
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
1130+
Field(
1131+
None,
1132+
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1133+
examples=[
1134+
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1135+
{
1136+
"app_id": {
1137+
"type": "string",
1138+
"path_in_connector_config": ["info", "app_id"],
1139+
}
1140+
},
1141+
],
1142+
title="OAuth user input",
1143+
)
11421144
)
1143-
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
1144-
None,
1145-
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1146-
title="DeclarativeOAuth Connector Specification",
1145+
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
1146+
Field(
1147+
None,
1148+
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1149+
title="DeclarativeOAuth Connector Specification",
1150+
)
11471151
)
11481152
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
11491153
None,
@@ -1161,7 +1165,9 @@ class Config:
11611165
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
11621166
None,
11631167
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
1164-
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
1168+
examples=[
1169+
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
1170+
],
11651171
title="OAuth input specification",
11661172
)
11671173
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1480,6 +1486,7 @@ class ComponentMappingDefinition(BaseModel):
14801486
description="The expected data type of the value. If omitted, the type will be inferred from the value provided.",
14811487
title="Value Type",
14821488
)
1489+
create_or_update: Optional[bool] = False
14831490
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
14841491

14851492

@@ -1491,12 +1498,15 @@ class StreamConfig(BaseModel):
14911498
examples=[["data"], ["data", "streams"], ["data", "{{ parameters.name }}"]],
14921499
title="Configs Pointer",
14931500
)
1501+
default_values: Optional[List] = Field(
1502+
None, description="placeholder", title="Default Values"
1503+
)
14941504
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
14951505

14961506

14971507
class ConfigComponentsResolver(BaseModel):
14981508
type: Literal["ConfigComponentsResolver"]
1499-
stream_config: StreamConfig
1509+
stream_config: Union[List[StreamConfig], StreamConfig]
15001510
components_mapping: List[ComponentMappingDefinition]
15011511
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
15021512

@@ -1881,13 +1891,17 @@ class ListPartitionRouter(BaseModel):
18811891

18821892
class RecordSelector(BaseModel):
18831893
type: Literal["RecordSelector"]
1884-
extractor: Union[DpathExtractor, CustomRecordExtractor]
1894+
extractor: Union[
1895+
DpathExtractor, CustomRecordExtractor
1896+
]
18851897
record_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
18861898
None,
18871899
description="Responsible for filtering records to be emitted by the Source.",
18881900
title="Record Filter",
18891901
)
1890-
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
1902+
schema_normalization: Optional[
1903+
Union[SchemaNormalization, CustomSchemaNormalization]
1904+
] = Field(
18911905
None,
18921906
description="Responsible for normalization according to the schema.",
18931907
title="Schema Normalization",
@@ -2133,7 +2147,9 @@ class Config:
21332147
extra = Extra.allow
21342148

21352149
type: Literal["DeclarativeStream"]
2136-
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
2150+
name: Optional[str] = Field(
2151+
"", description="The stream name.", example=["Users"], title="Name"
2152+
)
21372153
retriever: Union[SimpleRetriever, AsyncRetriever, CustomRetriever] = Field(
21382154
...,
21392155
description="Component used to coordinate how records are extracted across stream slices and request pages.",
@@ -2164,7 +2180,7 @@ class Config:
21642180
]
21652181
] = Field(
21662182
None,
2167-
description="Component used to retrieve the schema for the current stream.",
2183+
description="One or many schema loaders can be used to retrieve the schema for the current stream. When multiple schema loaders are defined, schema properties will be merged together. Schema loaders defined first taking precedence in the event of a conflict.",
21682184
title="Schema Loader",
21692185
)
21702186
transformations: Optional[
@@ -2311,18 +2327,20 @@ class HttpRequester(BaseModelWithDeprecations):
23112327
description="Allows for retrieving a dynamic set of properties from an API endpoint which can be injected into outbound request using the stream_partition.extra_fields.",
23122328
title="Fetch Properties from Endpoint",
23132329
)
2314-
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = Field(
2315-
None,
2316-
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
2317-
examples=[
2318-
{"unit": "day"},
2319-
{
2320-
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
2321-
},
2322-
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
2323-
{"sort_by[asc]": "updated_at"},
2324-
],
2325-
title="Query Parameters",
2330+
request_parameters: Optional[Union[Dict[str, Union[str, QueryProperties]], str]] = (
2331+
Field(
2332+
None,
2333+
description="Specifies the query parameters that should be set on an outgoing HTTP request given the inputs.",
2334+
examples=[
2335+
{"unit": "day"},
2336+
{
2337+
"query": 'last_event_time BETWEEN TIMESTAMP "{{ stream_interval.start_time }}" AND TIMESTAMP "{{ stream_interval.end_time }}"'
2338+
},
2339+
{"searchIn": "{{ ','.join(config.get('search_in', [])) }}"},
2340+
{"sort_by[asc]": "updated_at"},
2341+
],
2342+
title="Query Parameters",
2343+
)
23262344
)
23272345
request_headers: Optional[Union[Dict[str, str], str]] = Field(
23282346
None,
@@ -2414,6 +2432,9 @@ class DynamicSchemaLoader(BaseModel):
24142432
description="Component used to coordinate how records are extracted across stream slices and request pages.",
24152433
title="Retriever",
24162434
)
2435+
schema_filter: Optional[Union[RecordFilter, CustomRecordFilter]] = Field(
2436+
None, description="placeholder", title="Schema Filter"
2437+
)
24172438
schema_transformations: Optional[
24182439
List[
24192440
Union[
@@ -2512,7 +2533,9 @@ class QueryProperties(BaseModel):
25122533

25132534
class StateDelegatingStream(BaseModel):
25142535
type: Literal["StateDelegatingStream"]
2515-
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
2536+
name: str = Field(
2537+
..., description="The stream name.", example=["Users"], title="Name"
2538+
)
25162539
full_refresh_stream: DeclarativeStream = Field(
25172540
...,
25182541
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
@@ -2601,7 +2624,9 @@ class AsyncRetriever(BaseModel):
26012624
)
26022625
download_extractor: Optional[
26032626
Union[DpathExtractor, CustomRecordExtractor, ResponseToFileExtractor]
2604-
] = Field(None, description="Responsible for fetching the records from provided urls.")
2627+
] = Field(
2628+
None, description="Responsible for fetching the records from provided urls."
2629+
)
26052630
creation_requester: Union[HttpRequester, CustomRequester] = Field(
26062631
...,
26072632
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -2741,14 +2766,17 @@ class DynamicDeclarativeStream(BaseModel):
27412766
stream_template: DeclarativeStream = Field(
27422767
..., description="Reference to the stream template.", title="Stream Template"
27432768
)
2744-
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
2745-
...,
2746-
description="Component resolve and populates stream templates with components values.",
2747-
title="Components Resolver",
2769+
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
2770+
Field(
2771+
...,
2772+
description="Component resolve and populates stream templates with components values.",
2773+
title="Components Resolver",
2774+
)
27482775
)
27492776

27502777

27512778
ComplexFieldType.update_forward_refs()
2779+
CombinedExtractor.update_forward_refs()
27522780
GzipDecoder.update_forward_refs()
27532781
CompositeErrorHandler.update_forward_refs()
27542782
DeclarativeSource1.update_forward_refs()

0 commit comments

Comments
 (0)