Skip to content

Commit 2f31d73

Browse files
committed
Refactor lazy read
1 parent 9664679 commit 2f31d73

File tree

6 files changed

+185
-271
lines changed

6 files changed

+185
-271
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 9 additions & 9 deletions
Original file line number | Diff line number | Diff line change
@@ -2873,6 +2873,15 @@ definitions:
28732873
type:
28742874
type: string
28752875
enum: [ParentStreamConfig]
2876+
lazy_read_pointer:
2877+
title: Lazy Read Pointer
2878+
description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
2879+
type: array
2880+
default: [ ]
2881+
items:
2882+
- type: string
2883+
interpolation_context:
2884+
- config
28762885
parent_key:
28772886
title: Parent Key
28782887
description: The primary key of records from the parent stream that will be used during the retrieval of records for the current substream. This parent identifier field is typically a characteristic of the child records being extracted from the source API.
@@ -3242,15 +3251,6 @@ definitions:
32423251
- "$ref": "#/definitions/IterableDecoder"
32433252
- "$ref": "#/definitions/XmlDecoder"
32443253
- "$ref": "#/definitions/ZipfileDecoder"
3245-
lazy_read_pointer:
3246-
title: Lazy Read Pointer
3247-
description: If set, this will enable lazy reading, using the initial read of parent records to extract child records.
3248-
type: array
3249-
default: [ ]
3250-
items:
3251-
- type: string
3252-
interpolation_context:
3253-
- config
32543254
$parameters:
32553255
type: object
32563256
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 60 additions & 34 deletions
Original file line number | Diff line number | Diff line change
@@ -609,7 +609,9 @@ class OAuthAuthenticator(BaseModel):
609609
scopes: Optional[List[str]] = Field(
610610
None,
611611
description="List of scopes that should be granted to the access token.",
612-
examples=[["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]],
612+
examples=[
613+
["crm.list.read", "crm.objects.contacts.read", "crm.schema.contacts.read"]
614+
],
613615
title="Scopes",
614616
)
615617
token_expiry_date: Optional[str] = Field(
@@ -1078,24 +1080,28 @@ class OAuthConfigSpecification(BaseModel):
10781080
class Config:
10791081
extra = Extra.allow
10801082

1081-
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = Field(
1082-
None,
1083-
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1084-
examples=[
1085-
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1086-
{
1087-
"app_id": {
1088-
"type": "string",
1089-
"path_in_connector_config": ["info", "app_id"],
1090-
}
1091-
},
1092-
],
1093-
title="OAuth user input",
1083+
oauth_user_input_from_connector_config_specification: Optional[Dict[str, Any]] = (
1084+
Field(
1085+
None,
1086+
description="OAuth specific blob. This is a Json Schema used to validate Json configurations used as input to OAuth.\nMust be a valid non-nested JSON that refers to properties from ConnectorSpecification.connectionSpecification\nusing special annotation 'path_in_connector_config'.\nThese are input values the user is entering through the UI to authenticate to the connector, that might also shared\nas inputs for syncing data via the connector.\nExamples:\nif no connector values is shared during oauth flow, oauth_user_input_from_connector_config_specification=[]\nif connector values such as 'app_id' inside the top level are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['app_id']\n }\n }\nif connector values such as 'info.app_id' nested inside another object are used to generate the API url for the oauth flow,\n oauth_user_input_from_connector_config_specification={\n app_id: {\n type: string\n path_in_connector_config: ['info', 'app_id']\n }\n }",
1087+
examples=[
1088+
{"app_id": {"type": "string", "path_in_connector_config": ["app_id"]}},
1089+
{
1090+
"app_id": {
1091+
"type": "string",
1092+
"path_in_connector_config": ["info", "app_id"],
1093+
}
1094+
},
1095+
],
1096+
title="OAuth user input",
1097+
)
10941098
)
1095-
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = Field(
1096-
None,
1097-
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1098-
title="DeclarativeOAuth Connector Specification",
1099+
oauth_connector_input_specification: Optional[OauthConnectorInputSpecification] = (
1100+
Field(
1101+
None,
1102+
description='The DeclarativeOAuth specific blob.\nPertains to the fields defined by the connector relating to the OAuth flow.\n\nInterpolation capabilities:\n- The variables placeholders are declared as `{{my_var}}`.\n- The nested resolution variables like `{{ {{my_nested_var}} }}` is allowed as well.\n\n- The allowed interpolation context is:\n + base64Encoder - encode to `base64`, {{ {{my_var_a}}:{{my_var_b}} | base64Encoder }}\n + base64Decorer - decode from `base64` encoded string, {{ {{my_string_variable_or_string_value}} | base64Decoder }}\n + urlEncoder - encode the input string to URL-like format, {{ https://test.host.com/endpoint | urlEncoder}}\n + urlDecorer - decode the input url-encoded string into text format, {{ urlDecoder:https%3A%2F%2Fairbyte.io | urlDecoder}}\n + codeChallengeS256 - get the `codeChallenge` encoded value to provide additional data-provider specific authorisation values, {{ {{state_value}} | codeChallengeS256 }}\n\nExamples:\n - The TikTok Marketing DeclarativeOAuth spec:\n {\n "oauth_connector_input_specification": {\n "type": "object",\n "additionalProperties": false,\n "properties": {\n "consent_url": "https://ads.tiktok.com/marketing_api/auth?{{client_id_key}}={{client_id_value}}&{{redirect_uri_key}}={{ {{redirect_uri_value}} | urlEncoder}}&{{state_key}}={{state_value}}",\n "access_token_url": "https://business-api.tiktok.com/open_api/v1.3/oauth2/access_token/",\n "access_token_params": {\n "{{ auth_code_key }}": "{{ auth_code_value }}",\n "{{ client_id_key }}": "{{ client_id_value }}",\n "{{ client_secret_key }}": "{{ client_secret_value }}"\n },\n "access_token_headers": {\n "Content-Type": "application/json",\n "Accept": "application/json"\n },\n "extract_output": ["data.access_token"],\n "client_id_key": "app_id",\n "client_secret_key": "secret",\n "auth_code_key": "auth_code"\n }\n }\n }',
1103+
title="DeclarativeOAuth Connector Specification",
1104+
)
10991105
)
11001106
complete_oauth_output_specification: Optional[Dict[str, Any]] = Field(
11011107
None,
@@ -1113,7 +1119,9 @@ class Config:
11131119
complete_oauth_server_input_specification: Optional[Dict[str, Any]] = Field(
11141120
None,
11151121
description="OAuth specific blob. This is a Json Schema used to validate Json configurations persisted as Airbyte Server configurations.\nMust be a valid non-nested JSON describing additional fields configured by the Airbyte Instance or Workspace Admins to be used by the\nserver when completing an OAuth flow (typically exchanging an auth code for refresh token).\nExamples:\n complete_oauth_server_input_specification={\n client_id: {\n type: string\n },\n client_secret: {\n type: string\n }\n }",
1116-
examples=[{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}],
1122+
examples=[
1123+
{"client_id": {"type": "string"}, "client_secret": {"type": "string"}}
1124+
],
11171125
title="OAuth input specification",
11181126
)
11191127
complete_oauth_server_output_specification: Optional[Dict[str, Any]] = Field(
@@ -1766,7 +1774,9 @@ class RecordSelector(BaseModel):
17661774
description="Responsible for filtering records to be emitted by the Source.",
17671775
title="Record Filter",
17681776
)
1769-
schema_normalization: Optional[Union[SchemaNormalization, CustomSchemaNormalization]] = Field(
1777+
schema_normalization: Optional[
1778+
Union[SchemaNormalization, CustomSchemaNormalization]
1779+
] = Field(
17701780
SchemaNormalization.None_,
17711781
description="Responsible for normalization according to the schema.",
17721782
title="Schema Normalization",
@@ -1987,7 +1997,9 @@ class Config:
19871997
description="Component used to fetch data incrementally based on a time field in the data.",
19881998
title="Incremental Sync",
19891999
)
1990-
name: Optional[str] = Field("", description="The stream name.", example=["Users"], title="Name")
2000+
name: Optional[str] = Field(
2001+
"", description="The stream name.", example=["Users"], title="Name"
2002+
)
19912003
primary_key: Optional[PrimaryKey] = Field(
19922004
"", description="The primary key of the stream.", title="Primary Key"
19932005
)
@@ -2205,6 +2217,11 @@ class DynamicSchemaLoader(BaseModel):
22052217

22062218
class ParentStreamConfig(BaseModel):
22072219
type: Literal["ParentStreamConfig"]
2220+
lazy_read_pointer: Optional[List[str]] = Field(
2221+
[],
2222+
description="If set, this will enable lazy reading, using the initial read of parent records to extract child records.",
2223+
title="Lazy Read Pointer",
2224+
)
22082225
parent_key: str = Field(
22092226
...,
22102227
description="The primary key of records from the parent stream that will be used during the retrieval of records for the current substream. This parent identifier field is typically a characteristic of the child records being extracted from the source API.",
@@ -2240,7 +2257,9 @@ class ParentStreamConfig(BaseModel):
22402257

22412258
class StateDelegatingStream(BaseModel):
22422259
type: Literal["StateDelegatingStream"]
2243-
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
2260+
name: str = Field(
2261+
..., description="The stream name.", example=["Users"], title="Name"
2262+
)
22442263
full_refresh_stream: DeclarativeStream = Field(
22452264
...,
22462265
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
@@ -2277,7 +2296,11 @@ class SimpleRetriever(BaseModel):
22772296
CustomPartitionRouter,
22782297
ListPartitionRouter,
22792298
SubstreamPartitionRouter,
2280-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2299+
List[
2300+
Union[
2301+
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
2302+
]
2303+
],
22812304
]
22822305
] = Field(
22832306
[],
@@ -2300,11 +2323,6 @@ class SimpleRetriever(BaseModel):
23002323
description="Component decoding the response so records can be extracted.",
23012324
title="Decoder",
23022325
)
2303-
lazy_read_pointer: Optional[List[str]] = Field(
2304-
[],
2305-
description="If set, this will enable lazy reading, using the initial read of parent records to extract child records.",
2306-
title="Lazy Read Pointer",
2307-
)
23082326
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
23092327

23102328

@@ -2326,7 +2344,9 @@ class AsyncRetriever(BaseModel):
23262344
)
23272345
download_extractor: Optional[
23282346
Union[CustomRecordExtractor, DpathExtractor, ResponseToFileExtractor]
2329-
] = Field(None, description="Responsible for fetching the records from provided urls.")
2347+
] = Field(
2348+
None, description="Responsible for fetching the records from provided urls."
2349+
)
23302350
creation_requester: Union[CustomRequester, HttpRequester] = Field(
23312351
...,
23322352
description="Requester component that describes how to prepare HTTP requests to send to the source API to create the async server-side job.",
@@ -2360,7 +2380,11 @@ class AsyncRetriever(BaseModel):
23602380
CustomPartitionRouter,
23612381
ListPartitionRouter,
23622382
SubstreamPartitionRouter,
2363-
List[Union[CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter]],
2383+
List[
2384+
Union[
2385+
CustomPartitionRouter, ListPartitionRouter, SubstreamPartitionRouter
2386+
]
2387+
],
23642388
]
23652389
] = Field(
23662390
[],
@@ -2428,10 +2452,12 @@ class DynamicDeclarativeStream(BaseModel):
24282452
stream_template: DeclarativeStream = Field(
24292453
..., description="Reference to the stream template.", title="Stream Template"
24302454
)
2431-
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = Field(
2432-
...,
2433-
description="Component resolve and populates stream templates with components values.",
2434-
title="Components Resolver",
2455+
components_resolver: Union[HttpComponentsResolver, ConfigComponentsResolver] = (
2456+
Field(
2457+
...,
2458+
description="Component resolve and populates stream templates with components values.",
2459+
title="Components Resolver",
2460+
)
24352461
)
24362462

24372463

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 9 additions & 35 deletions
Original file line number | Diff line number | Diff line change
@@ -2528,6 +2528,10 @@ def create_parent_stream_config(
25282528
if model.request_option
25292529
else None
25302530
)
2531+
2532+
if "*" in model.lazy_read_pointer:
2533+
raise ValueError("The '*' wildcard in 'lazy_read_pointer' is not supported — only direct paths are allowed.")
2534+
25312535
return ParentStreamConfig(
25322536
parent_key=model.parent_key,
25332537
request_option=request_option,
@@ -2537,6 +2541,7 @@ def create_parent_stream_config(
25372541
incremental_dependency=model.incremental_dependency or False,
25382542
parameters=model.parameters or {},
25392543
extra_fields=model.extra_fields,
2544+
lazy_read_pointer=model.lazy_read_pointer
25402545
)
25412546

25422547
@staticmethod
@@ -2740,42 +2745,13 @@ def create_simple_retriever(
27402745
model.ignore_stream_slicer_parameters_on_paginated_requests or False
27412746
)
27422747

2743-
if model.lazy_read_pointer and not hasattr(model, "partition_router"):
2744-
raise ValueError(
2745-
"LazySimpleRetriever requires a 'partition_router' when 'lazy_read_pointer' is set. "
2746-
"Please either define 'partition_router' or remove 'lazy_read_pointer' from the model."
2747-
)
2748+
if hasattr(model, "partition_router") and model.partition_router and model.partition_router.type == "SubstreamPartitionRouter" and not bool(
2749+
self._connector_state_manager.get_stream_state(name, None)) and any(parent_stream_config.lazy_read_pointer for parent_stream_config in model.partition_router.parent_stream_configs):
27482750

2749-
if model.lazy_read_pointer and not bool(
2750-
self._connector_state_manager.get_stream_state(name, None)
2751-
):
2752-
if model.partition_router.type != "SubstreamPartitionRouter": # type: ignore[union-attr] # model.partition_router has BaseModel type
2751+
if incremental_sync.step or incremental_sync.cursor_granularity:
27532752
raise ValueError(
2754-
"LazySimpleRetriever only supports 'SubstreamPartitionRouterModel' as the 'partition_router' type. " # type: ignore[union-attr] # model.partition_router has BaseModel type
2755-
f"Found: '{model.partition_router.type}'."
2753+
f"Found more that one slice per parent. LazySimpleRetriever only supports single slice read for stream - {name}."
27562754
)
2757-
lazy_read_pointer = []
2758-
for i, path in enumerate(model.lazy_read_pointer):
2759-
if path == "*":
2760-
raise ValueError(
2761-
f"'lazy_read_pointer' support only direct pointing. Found: '* as a {i} element in the pointer.'"
2762-
)
2763-
2764-
lazy_read_pointer.append(InterpolatedString.create(path, parameters=model.parameters or {}))
2765-
2766-
lazy_read_pointer = [
2767-
InterpolatedString.create(path, parameters=model.parameters or {})
2768-
for path in model.lazy_read_pointer
2769-
]
2770-
partition_router = self._create_component_from_model(
2771-
model=model.partition_router, # type: ignore[arg-type] # model.partition_router has BaseModel type
2772-
config=config, # type: ignore[arg-type]
2773-
)
2774-
stream_slicer = (
2775-
self._create_component_from_model(model=incremental_sync, config=config)
2776-
if incremental_sync
2777-
else SinglePartitionRouter(parameters={})
2778-
)
27792755

27802756
return LazySimpleRetriever(
27812757
name=name,
@@ -2789,8 +2765,6 @@ def create_simple_retriever(
27892765
config=config,
27902766
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
27912767
parameters=model.parameters or {},
2792-
partition_router=partition_router,
2793-
lazy_read_pointer=lazy_read_pointer,
27942768
)
27952769

27962770
if self._limit_slices_fetched or self._emit_connector_builder_messages:

0 commit comments

Comments
 (0)