Skip to content

Commit 23f5efb

Browse files
authored
Merge branch 'main' into lazebnyi/add-condition-to-add-fields
2 parents 7ef4579 + 591a079 commit 23f5efb

31 files changed

+1123
-245
lines changed
Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,38 @@
1+
name: Dependency Analysis
2+
3+
on:
4+
push:
5+
branches:
6+
- main
7+
paths:
8+
- "airbyte_cdk/**"
9+
- "poetry.lock"
10+
- "pyproject.toml"
11+
pull_request:
12+
paths:
13+
- "airbyte_cdk/**"
14+
- "poetry.lock"
15+
- "pyproject.toml"
16+
17+
jobs:
18+
dependency-analysis:
19+
name: Dependency Analysis with Deptry
20+
runs-on: ubuntu-24.04
21+
steps:
22+
- name: Checkout code
23+
uses: actions/checkout@v4
24+
- name: Set up Python
25+
uses: actions/setup-python@v5
26+
with:
27+
python-version: '3.10'
28+
- name: Set up Poetry
29+
uses: Gr1N/setup-poetry@v9
30+
with:
31+
poetry-version: "2.0.1"
32+
- name: Install dependencies
33+
run: poetry install --all-extras
34+
35+
# Job-specific step(s):
36+
- name: Run Deptry
37+
run: |
38+
poetry run deptry .

airbyte_cdk/sources/declarative/async_job/job_tracker.py

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,11 @@ class ConcurrentJobLimitReached(Exception):
1717
class JobTracker:
1818
def __init__(self, limit: int):
1919
self._jobs: Set[str] = set()
20-
self._limit = limit
20+
if limit < 1:
21+
LOGGER.warning(
22+
f"The `max_concurrent_async_job_count` property is less than 1: {limit}. Setting to 1. Please update the source manifest to set a valid value."
23+
)
24+
self._limit = 1 if limit < 1 else limit
2125
self._lock = threading.Lock()
2226

2327
def try_to_get_intent(self) -> str:

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -162,6 +162,10 @@ def read(
162162
else:
163163
filtered_catalog = catalog
164164

165+
# There is no need to run read for synchronous streams if none exist.
166+
if not filtered_catalog.streams:
167+
return
168+
165169
yield from super().read(logger, config, filtered_catalog, state)
166170

167171
def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
@@ -201,6 +205,18 @@ def _group_streams(
201205
# Some low-code sources use a combination of DeclarativeStream and regular Python streams. We can't inspect
202206
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
203207
# so we need to treat them as synchronous
208+
209+
if name_to_stream_mapping[declarative_stream.name]["type"] == "StateDelegatingStream":
210+
stream_state = self._connector_state_manager.get_stream_state(
211+
stream_name=declarative_stream.name, namespace=declarative_stream.namespace
212+
)
213+
214+
name_to_stream_mapping[declarative_stream.name] = (
215+
name_to_stream_mapping[declarative_stream.name]["incremental_stream"]
216+
if stream_state
217+
else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"]
218+
)
219+
204220
if isinstance(declarative_stream, DeclarativeStream) and (
205221
name_to_stream_mapping[declarative_stream.name]["retriever"]["type"]
206222
== "SimpleRetriever"

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 44 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,9 @@ properties:
2424
streams:
2525
type: array
2626
items:
27-
"$ref": "#/definitions/DeclarativeStream"
27+
anyOf:
28+
- "$ref": "#/definitions/DeclarativeStream"
29+
- "$ref": "#/definitions/StateDelegatingStream"
2830
dynamic_streams:
2931
type: array
3032
items:
@@ -42,6 +44,10 @@ properties:
4244
"$ref": "#/definitions/ConcurrencyLevel"
4345
api_budget:
4446
"$ref": "#/definitions/HTTPAPIBudget"
47+
max_concurrent_async_job_count:
48+
title: Maximum Concurrent Asynchronous Jobs
49+
description: Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.
50+
type: integer
4551
metadata:
4652
type: object
4753
description: For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.
@@ -2272,6 +2278,10 @@ definitions:
22722278
title: Delete Origin Value
22732279
description: Whether to delete the origin value or keep it. Default is False.
22742280
type: boolean
2281+
replace_record:
2282+
title: Replace Origin Record
2283+
description: Whether to replace the origin record or not. Default is False.
2284+
type: boolean
22752285
$parameters:
22762286
type: object
22772287
additionalProperties: true
@@ -2890,7 +2900,9 @@ definitions:
28902900
stream:
28912901
title: Parent Stream
28922902
description: Reference to the parent stream.
2893-
"$ref": "#/definitions/DeclarativeStream"
2903+
anyOf:
2904+
- "$ref": "#/definitions/DeclarativeStream"
2905+
- "$ref": "#/definitions/StateDelegatingStream"
28942906
partition_field:
28952907
title: Current Parent Key Value Identifier
28962908
description: While iterating over parent records during a sync, the parent_key value can be referenced by using this field.
@@ -3163,6 +3175,36 @@ definitions:
31633175
$parameters:
31643176
type: object
31653177
additionalProperties: true
3178+
StateDelegatingStream:
3179+
description: (This component is experimental. Use at your own risk.) Orchestrate the retriever's usage based on the state value.
3180+
type: object
3181+
required:
3182+
- type
3183+
- name
3184+
- full_refresh_stream
3185+
- incremental_stream
3186+
properties:
3187+
type:
3188+
type: string
3189+
enum: [ StateDelegatingStream ]
3190+
name:
3191+
title: Name
3192+
description: The stream name.
3193+
type: string
3194+
default: ""
3195+
example:
3196+
- "Users"
3197+
full_refresh_stream:
3198+
title: Retriever
3199+
description: Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.
3200+
"$ref": "#/definitions/DeclarativeStream"
3201+
incremental_stream:
3202+
title: Retriever
3203+
description: Component used to coordinate how records are extracted across stream slices and request pages when the state is provided.
3204+
"$ref": "#/definitions/DeclarativeStream"
3205+
$parameters:
3206+
type: object
3207+
additionalProperties: true
31663208
SimpleRetriever:
31673209
description: Retrieves records by synchronously sending requests to fetch records. The retriever acts as an orchestrator between the requester, the record selector, the paginator, and the partition router.
31683210
type: object

airbyte_cdk/sources/declarative/declarative_stream.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1515
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
1616
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
17+
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
1718
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1819
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
1920
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
@@ -76,11 +77,17 @@ def primary_key(self, value: str) -> None:
7677

7778
@property
7879
def exit_on_rate_limit(self) -> bool:
80+
if isinstance(self.retriever, AsyncRetriever):
81+
return self.retriever.exit_on_rate_limit
82+
7983
return self.retriever.requester.exit_on_rate_limit # type: ignore # abstract Retriever class has not requester attribute
8084

8185
@exit_on_rate_limit.setter
8286
def exit_on_rate_limit(self, value: bool) -> None:
83-
self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined]
87+
if isinstance(self.retriever, AsyncRetriever):
88+
self.retriever.exit_on_rate_limit = value
89+
else:
90+
self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined]
8491

8592
@property # type: ignore
8693
def name(self) -> str:

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 28 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
DeclarativeStream as DeclarativeStreamModel,
3131
)
3232
from airbyte_cdk.sources.declarative.models.declarative_component_schema import Spec as SpecModel
33+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
34+
StateDelegatingStream as StateDelegatingStreamModel,
35+
)
3336
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
3437
get_registered_components_module,
3538
)
@@ -93,7 +96,10 @@ def __init__(
9396
self._constructor = (
9497
component_factory
9598
if component_factory
96-
else ModelToComponentFactory(emit_connector_builder_messages)
99+
else ModelToComponentFactory(
100+
emit_connector_builder_messages,
101+
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
102+
)
97103
)
98104
self._message_repository = self._constructor.get_message_repository()
99105
self._slice_logger: SliceLogger = (
@@ -143,7 +149,9 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
143149

144150
source_streams = [
145151
self._constructor.create_component(
146-
DeclarativeStreamModel,
152+
StateDelegatingStreamModel
153+
if stream_config.get("type") == StateDelegatingStreamModel.__name__
154+
else DeclarativeStreamModel,
147155
stream_config,
148156
config,
149157
emit_connector_builder_messages=self._emit_connector_builder_messages,
@@ -162,7 +170,15 @@ def _initialize_cache_for_parent_streams(
162170
def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> None:
163171
for parent_config in parent_configs:
164172
parent_streams.add(parent_config["stream"]["name"])
165-
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
173+
if parent_config["stream"]["type"] == "StateDelegatingStream":
174+
parent_config["stream"]["full_refresh_stream"]["retriever"]["requester"][
175+
"use_cache"
176+
] = True
177+
parent_config["stream"]["incremental_stream"]["retriever"]["requester"][
178+
"use_cache"
179+
] = True
180+
else:
181+
parent_config["stream"]["retriever"]["requester"]["use_cache"] = True
166182

167183
for stream_config in stream_configs:
168184
if stream_config.get("incremental_sync", {}).get("parent_stream"):
@@ -185,7 +201,15 @@ def update_with_cache_parent_configs(parent_configs: list[dict[str, Any]]) -> No
185201

186202
for stream_config in stream_configs:
187203
if stream_config["name"] in parent_streams:
188-
stream_config["retriever"]["requester"]["use_cache"] = True
204+
if stream_config["type"] == "StateDelegatingStream":
205+
stream_config["full_refresh_stream"]["retriever"]["requester"]["use_cache"] = (
206+
True
207+
)
208+
stream_config["incremental_stream"]["retriever"]["requester"]["use_cache"] = (
209+
True
210+
)
211+
else:
212+
stream_config["retriever"]["requester"]["use_cache"] = True
189213

190214
return stream_configs
191215

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 35 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,11 @@ class DpathFlattenFields(BaseModel):
877877
description="Whether to delete the origin value or keep it. Default is False.",
878878
title="Delete Origin Value",
879879
)
880+
replace_record: Optional[bool] = Field(
881+
None,
882+
description="Whether to replace the origin record or not. Default is False.",
883+
title="Replace Origin Record",
884+
)
880885
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
881886

882887

@@ -1870,7 +1875,7 @@ class Config:
18701875

18711876
type: Literal["DeclarativeSource"]
18721877
check: Union[CheckStream, CheckDynamicStream]
1873-
streams: List[DeclarativeStream]
1878+
streams: List[Union[DeclarativeStream, StateDelegatingStream]]
18741879
dynamic_streams: Optional[List[DynamicDeclarativeStream]] = None
18751880
version: str = Field(
18761881
...,
@@ -1881,6 +1886,11 @@ class Config:
18811886
spec: Optional[Spec] = None
18821887
concurrency_level: Optional[ConcurrencyLevel] = None
18831888
api_budget: Optional[HTTPAPIBudget] = None
1889+
max_concurrent_async_job_count: Optional[int] = Field(
1890+
None,
1891+
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
1892+
title="Maximum Concurrent Asynchronous Jobs",
1893+
)
18841894
metadata: Optional[Dict[str, Any]] = Field(
18851895
None,
18861896
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
@@ -1897,7 +1907,7 @@ class Config:
18971907

18981908
type: Literal["DeclarativeSource"]
18991909
check: Union[CheckStream, CheckDynamicStream]
1900-
streams: Optional[List[DeclarativeStream]] = None
1910+
streams: Optional[List[Union[DeclarativeStream, StateDelegatingStream]]] = None
19011911
dynamic_streams: List[DynamicDeclarativeStream]
19021912
version: str = Field(
19031913
...,
@@ -1908,6 +1918,11 @@ class Config:
19081918
spec: Optional[Spec] = None
19091919
concurrency_level: Optional[ConcurrencyLevel] = None
19101920
api_budget: Optional[HTTPAPIBudget] = None
1921+
max_concurrent_async_job_count: Optional[int] = Field(
1922+
None,
1923+
description="Maximum number of concurrent asynchronous jobs to run. This property is only relevant for sources/streams that support asynchronous job execution through the AsyncRetriever (e.g. a report-based stream that initiates a job, polls the job status, and then fetches the job results). This is often set by the API's maximum number of concurrent jobs on the account level. Refer to the API's documentation for this information.",
1924+
title="Maximum Concurrent Asynchronous Jobs",
1925+
)
19111926
metadata: Optional[Dict[str, Any]] = Field(
19121927
None,
19131928
description="For internal Airbyte use only - DO NOT modify manually. Used by consumers of declarative manifests for storing related metadata.",
@@ -2211,7 +2226,7 @@ class ParentStreamConfig(BaseModel):
22112226
examples=["id", "{{ config['parent_record_id'] }}"],
22122227
title="Parent Key",
22132228
)
2214-
stream: DeclarativeStream = Field(
2229+
stream: Union[DeclarativeStream, StateDelegatingStream] = Field(
22152230
..., description="Reference to the parent stream.", title="Parent Stream"
22162231
)
22172232
partition_field: str = Field(
@@ -2238,6 +2253,22 @@ class ParentStreamConfig(BaseModel):
22382253
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
22392254

22402255

2256+
class StateDelegatingStream(BaseModel):
2257+
type: Literal["StateDelegatingStream"]
2258+
name: str = Field(..., description="The stream name.", example=["Users"], title="Name")
2259+
full_refresh_stream: DeclarativeStream = Field(
2260+
...,
2261+
description="Component used to coordinate how records are extracted across stream slices and request pages when the state is empty or not provided.",
2262+
title="Retriever",
2263+
)
2264+
incremental_stream: DeclarativeStream = Field(
2265+
...,
2266+
description="Component used to coordinate how records are extracted across stream slices and request pages when the state provided.",
2267+
title="Retriever",
2268+
)
2269+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2270+
2271+
22412272
class SimpleRetriever(BaseModel):
22422273
type: Literal["SimpleRetriever"]
22432274
record_selector: RecordSelector = Field(
@@ -2423,5 +2454,6 @@ class DynamicDeclarativeStream(BaseModel):
24232454
DeclarativeStream.update_forward_refs()
24242455
SessionTokenAuthenticator.update_forward_refs()
24252456
DynamicSchemaLoader.update_forward_refs()
2457+
ParentStreamConfig.update_forward_refs()
24262458
SimpleRetriever.update_forward_refs()
24272459
AsyncRetriever.update_forward_refs()

0 commit comments

Comments
 (0)