Skip to content

Commit 9af489d

Browse files
committed
Update StateDelegatingRetriever
1 parent 8e7b2a3 commit 9af489d

File tree

8 files changed

+118
-162
lines changed

8 files changed

+118
-162
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,9 @@ def read(
162162
else:
163163
filtered_catalog = catalog
164164

165+
if not filtered_catalog.streams:
166+
return
167+
165168
yield from super().read(logger, config, filtered_catalog, state)
166169

167170
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
@@ -213,6 +216,12 @@ def _group_streams(
213216
declarative_stream.name
214217
].get("incremental_sync")
215218

219+
if name_to_stream_mapping[declarative_stream.name].get("retriever", {}).get("full_refresh_no_slice_in_params", False):
220+
incremental_sync_component_definition["step"] = None
221+
incremental_sync_component_definition["cursor_granularity"] = None
222+
incremental_sync_component_definition["start_time_option"] = None
223+
incremental_sync_component_definition["end_time_option"] = None
224+
216225
partition_router_component_definition = (
217226
name_to_stream_mapping[declarative_stream.name]
218227
.get("retriever", {})

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -3156,24 +3156,24 @@ definitions:
31563156
type: object
31573157
required:
31583158
- type
3159-
- incremental_data_retriever
3160-
- full_data_retriever
3159+
- full_refresh_retriever
3160+
- incremental_retriever
31613161
properties:
31623162
type:
31633163
type: string
31643164
enum: [ StateDelegatingRetriever ]
3165-
ignore_first_request_options_provider:
3166-
description: If set to true, slice request options will be ignored when sending requests.
3165+
full_refresh_no_slice_in_params:
3166+
description: If set to true, a single slice will be used and its request options will be ignored when sending requests.
31673167
type: boolean
31683168
default: false
3169-
incremental_data_retriever:
3169+
full_refresh_retriever:
31703170
title: Retriever
31713171
description: Component used to coordinate how records are extracted across stream slices and request pages.
31723172
anyOf:
31733173
- "$ref": "#/definitions/AsyncRetriever"
31743174
- "$ref": "#/definitions/CustomRetriever"
31753175
- "$ref": "#/definitions/SimpleRetriever"
3176-
full_data_retriever:
3176+
incremental_retriever:
31773177
title: Retriever
31783178
description: Component used to coordinate how records are extracted across stream slices and request pages.
31793179
anyOf:

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -162,7 +162,11 @@ def _initialize_cache_for_parent_streams(
162162
def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
163163
for parent_config in parent_configs:
164164
parent_streams.add(parent_config["stream"]["name"])
165-
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
165+
if parent_config["stream"]["retriever"]["type"] == "StateDelegatingRetriever":
166+
parent_config["stream"]["retriever"]["full_refresh_retriever"]["requester"]["use_cache"] = True
167+
parent_config["stream"]["retriever"]["incremental_retriever"]["requester"]["use_cache"] = True
168+
else:
169+
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
166170

167171
for stream_config in stream_configs:
168172
if stream_config.get("incremental_sync", {}).get("parent_stream"):
@@ -185,7 +189,11 @@ def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> No
185189

186190
for stream_config in stream_configs:
187191
if stream_config["name"] in parent_streams:
188-
stream_config["retriever"]["requester"]["use_cache"] = True
192+
if stream_config["retriever"]["type"] == "StateDelegatingRetriever":
193+
stream_config["retriever"]["full_refresh_retriever"]["requester"]["use_cache"] = True
194+
stream_config["retriever"]["incremental_retriever"]["requester"]["use_cache"] = True
195+
else:
196+
stream_config["retriever"]["requester"]["use_cache"] = True
189197

190198
return stream_configs
191199

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 72 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -609,7 +609,9 @@ class OAuthAuthenticator(BaseModel):
609609
scopes: Optional[List[str]] = Field(
610610
None,
611611
description="List of scopes that should be granted to the access token.",
612-
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
612+
examples=[
613+
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
614+
],
613615
title="Scopes",
614616
)
615617
token_expiry_date: Optional[str] = Field(
@@ -1078,24 +1080,28 @@ class OAuthConfigSpecification(BaseModel):
10781080
class Config:
10791081
extra = Extra.allow
10801082

1081-
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
1082-
None,
1083-
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1084-
examples=[
1085-
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1086-
{
1087-
"app_id": {
1088-
"type": "string",
1089-
"path_in_connector_config": ["info", "app_id"],
1090-
}
1091-
},
1092-
],
1093-
title="OAuth user input",
1083+
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
1084+
Field(
1085+
None,
1086+
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1087+
examples=[
1088+
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1089+
{
1090+
"app_id": {
1091+
"type": "string",
1092+
"path_in_connector_config": ["info", "app_id"],
1093+
}
1094+
},
1095+
],
1096+
title="OAuth user input",
1097+
)
10941098
)
1095-
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
1096-
None,
1097-
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1098-
title="DeclarativeOAuth Connector Specification",
1099+
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
1100+
Field(
1101+
None,
1102+
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1103+
title="DeclarativeOAuth Connector Specification",
1104+
)
10991105
)
11001106
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
11011107
None,
@@ -1113,7 +1119,9 @@ class Config:
11131119
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
11141120
None,
11151121
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
1116-
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
1122+
examples=[
1123+
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
1124+
],
11171125
title="OAuth input specification",
11181126
)
11191127
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1766,7 +1774,9 @@ class RecordSelector(BaseModel):
17661774
description="Responsible for filtering records to be emitted by the Source.",
17671775
title="Record Filter",
17681776
)
1769-
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
1777+
schema_normalization: Optional[
1778+
Union[SchemaNormalization, CustomSchemaNormalization]
1779+
] = Field(
17701780
SchemaNormalization.None_,
17711781
description="Responsible for normalization according to the schema.",
17721782
title="Schema Normalization",
@@ -1965,12 +1975,12 @@ class Config:
19651975
extra = Extra.allow
19661976

19671977
type: Literal["DeclarativeStream"]
1968-
retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever, StateDelegatingRetriever] = (
1969-
Field(
1970-
...,
1971-
description="Component used to coordinate how records are extracted across stream slices and request pages.",
1972-
title="Retriever",
1973-
)
1978+
retriever: Union[
1979+
AsyncRetriever, CustomRetriever, SimpleRetriever, StateDelegatingRetriever
1980+
] = Field(
1981+
...,
1982+
description="Component used to coordinate how records are extracted across stream slices and request pages.",
1983+
title="Retriever",
19741984
)
19751985
incremental_sync: Optional[
19761986
Union[CustomIncrementalSync, DatetimeBasedCursor, IncrementingCountCursor]
@@ -1979,7 +1989,9 @@ class Config:
19791989
description="Component used to fetch data incrementally based on a time field in the data.",
19801990
title="Incremental Sync",
19811991
)
1982-
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
1992+
name: Optional[str] = Field(
1993+
"", description="The stream name.", example=["Users"], title="Name"
1994+
)
19831995
primary_key: Optional[PrimaryKey] = Field(
19841996
"", description="The primary key of the stream.", title="Primary Key"
19851997
)
@@ -2232,19 +2244,23 @@ class ParentStreamConfig(BaseModel):
22322244

22332245
class StateDelegatingRetriever(BaseModel):
22342246
type: Literal["StateDelegatingRetriever"]
2235-
ignore_first_request_options_provider: Optional[bool] = Field(
2247+
full_refresh_no_slice_in_params: Optional[bool] = Field(
22362248
False,
2237-
description="If set to true, slice request options will be ignored when sending requests.",
2249+
description="If set to true, a single slice will be used and its request options will be ignored when sending requests.",
22382250
)
2239-
incremental_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
2240-
...,
2241-
description="Component used to coordinate how records are extracted across stream slices and request pages.",
2242-
title="Retriever",
2251+
full_refresh_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = (
2252+
Field(
2253+
...,
2254+
description="Component used to coordinate how records are extracted across stream slices and request pages.",
2255+
title="Retriever",
2256+
)
22432257
)
2244-
full_data_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
2245-
...,
2246-
description="Component used to coordinate how records are extracted across stream slices and request pages.",
2247-
title="Retriever",
2258+
incremental_retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = (
2259+
Field(
2260+
...,
2261+
description="Component used to coordinate how records are extracted across stream slices and request pages.",
2262+
title="Retriever",
2263+
)
22482264
)
22492265
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
22502266

@@ -2272,7 +2288,11 @@ class SimpleRetriever(BaseModel):
22722288
CustomPartitionRouter,
22732289
ListPartitionRouter,
22742290
SubstreamPartitionRouter,
2275-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2291+
List[
2292+
Union[
2293+
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
2294+
]
2295+
],
22762296
]
22772297
] = Field(
22782298
[],
@@ -2316,7 +2336,9 @@ class AsyncRetriever(BaseModel):
23162336
)
23172337
download_extractor: Optional[
23182338
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
2319-
] = Field(None, description="Responsible for fetching the records from provided urls.")
2339+
] = Field(
2340+
None, description="Responsible for fetching the records from provided urls."
2341+
)
23202342
creation_requester: Union[CustomRequester, HttpRequester] = Field(
23212343
...,
23222344
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -2350,7 +2372,11 @@ class AsyncRetriever(BaseModel):
23502372
CustomPartitionRouter,
23512373
ListPartitionRouter,
23522374
SubstreamPartitionRouter,
2353-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2375+
List[
2376+
Union[
2377+
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
2378+
]
2379+
],
23542380
]
23552381
] = Field(
23562382
[],
@@ -2418,10 +2444,12 @@ class DynamicDeclarativeStream(BaseModel):
24182444
stream_template: DeclarativeStream = Field(
24192445
..., description="Reference to the stream template.", title="Stream Template"
24202446
)
2421-
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
2422-
...,
2423-
description="Component resolve and populates stream templates with components values.",
2424-
title="Components Resolver",
2447+
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
2448+
Field(
2449+
...,
2450+
description="Component resolve and populates stream templates with components values.",
2451+
title="Components Resolver",
2452+
)
24252453
)
24262454

24272455

0 commit comments

Comments
 (0)