Skip to content

Commit 0a95ed3

Browse files
committed
Merge branch 'lazebnyi/add-lazy-read-to-simple-retriver' of github.com:airbytehq/airbyte-python-cdk into lazebnyi/add-lazy-read-to-simple-retriver
2 parents 0829662 + 3d31e27 commit 0a95ed3

File tree

10 files changed

+194
-28
lines changed

10 files changed

+194
-28
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,19 @@ definitions:
116116
type: array
117117
items:
118118
"$ref": "#/definitions/AddedFieldDefinition"
119+
condition:
120+
description: Fields will be added if expression is evaluated to True.
121+
type: string
122+
default: ""
123+
interpolation_context:
124+
- config
125+
- property
126+
- parameters
127+
examples:
128+
- "{{ property|string == '' }}"
129+
- "{{ property is integer }}"
130+
- "{{ property|length > 5 }}"
131+
- "{{ property == 'some_string_to_match' }}"
119132
$parameters:
120133
type: object
121134
additionalProperties: true
@@ -2265,6 +2278,10 @@ definitions:
22652278
title: Delete Origin Value
22662279
description: Whether to delete the origin value or keep it. Default is False.
22672280
type: boolean
2281+
replace_record:
2282+
title: Replace Origin Record
2283+
description: Whether to replace the origin record or not. Default is False.
2284+
type: boolean
22682285
$parameters:
22692286
type: object
22702287
additionalProperties: true
@@ -3000,6 +3017,10 @@ definitions:
30003017
- "$ref": "#/definitions/SchemaNormalization"
30013018
- "$ref": "#/definitions/CustomSchemaNormalization"
30023019
default: None
3020+
transform_before_filtering:
3021+
description: If true, transformation will be applied before record filtering.
3022+
type: boolean
3023+
default: false
30033024
$parameters:
30043025
type: object
30053026
additionalProperties: true

airbyte_cdk/sources/declarative/declarative_stream.py

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1515
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
1616
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
17+
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
1718
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1819
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
1920
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
@@ -76,11 +77,17 @@ def primary_key(self, value: str) -> None:
7677

7778
@property
7879
def exit_on_rate_limit(self) -> bool:
80+
if isinstance(self.retriever, AsyncRetriever):
81+
return self.retriever.exit_on_rate_limit
82+
7983
return self.retriever.requester.exit_on_rate_limit # type: ignore # abstract Retriever class has not requester attribute
8084

8185
@exit_on_rate_limit.setter
8286
def exit_on_rate_limit(self, value: bool) -> None:
83-
self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined]
87+
if isinstance(self.retriever, AsyncRetriever):
88+
self.retriever.exit_on_rate_limit = value
89+
else:
90+
self.retriever.requester.exit_on_rate_limit = value # type: ignore[attr-defined]
8491

8592
@property # type: ignore
8693
def name(self) -> str:

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -877,6 +877,11 @@ class DpathFlattenFields(BaseModel):
877877
description="Whether to delete the origin value or keep it. Default is False.",
878878
title="Delete Origin Value",
879879
)
880+
replace_record: Optional[bool] = Field(
881+
None,
882+
description="Whether to replace the origin record or not. Default is False.",
883+
title="Replace Origin Record",
884+
)
880885
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
881886

882887

@@ -1460,6 +1465,16 @@ class AddFields(BaseModel):
14601465
description="List of transformations (path and corresponding value) that will be added to the record.",
14611466
title="Fields",
14621467
)
1468+
condition: Optional[str] = Field(
1469+
"",
1470+
description="Fields will be added if expression is evaluated to True.",
1471+
examples=[
1472+
"{{ property|string == '' }}",
1473+
"{{ property is integer }}",
1474+
"{{ property|length > 5 }}",
1475+
"{{ property == 'some_string_to_match' }}",
1476+
],
1477+
)
14631478
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
14641479

14651480

@@ -1771,6 +1786,10 @@ class RecordSelector(BaseModel):
17711786
description="Responsible for normalization according to the schema.",
17721787
title="Schema Normalization",
17731788
)
1789+
transform_before_filtering: Optional[bool] = Field(
1790+
False,
1791+
description="If true, transformation will be applied before record filtering.",
1792+
)
17741793
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
17751794

17761795

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -713,7 +713,11 @@ def create_add_fields(self, model: AddFieldsModel, config: Config, **kwargs: Any
713713
)
714714
for added_field_definition_model in model.fields
715715
]
716-
return AddFields(fields=added_field_definitions, parameters=model.parameters or {})
716+
return AddFields(
717+
fields=added_field_definitions,
718+
condition=model.condition or "",
719+
parameters=model.parameters or {},
720+
)
717721

718722
def create_keys_to_lower_transformation(
719723
self, model: KeysToLowerModel, config: Config, **kwargs: Any
@@ -749,6 +753,7 @@ def create_dpath_flatten_fields(
749753
delete_origin_value=model.delete_origin_value
750754
if model.delete_origin_value is not None
751755
else False,
756+
replace_record=model.replace_record if model.replace_record is not None else False,
752757
parameters=model.parameters or {},
753758
)
754759

@@ -1903,6 +1908,10 @@ def _merge_stream_slicers(
19031908
) -> Optional[StreamSlicer]:
19041909
retriever_model = model.retriever
19051910

1911+
stream_slicer = self._build_stream_slicer_from_partition_router(
1912+
retriever_model, config, stream_name=model.name
1913+
)
1914+
19061915
if retriever_model.type == "AsyncRetriever":
19071916
is_not_datetime_cursor = (
19081917
model.incremental_sync.type != "DatetimeBasedCursor"
@@ -1922,13 +1931,11 @@ def _merge_stream_slicers(
19221931
"AsyncRetriever with cursor other than DatetimeBasedCursor is not supported yet."
19231932
)
19241933

1925-
if is_partition_router:
1934+
if is_partition_router and not stream_slicer:
19261935
# Note that this development is also done in parallel to the per partition development which once merged
19271936
# we could support here by calling create_concurrent_cursor_from_perpartition_cursor
19281937
raise ValueError("Per partition state is not supported yet for AsyncRetriever.")
19291938

1930-
stream_slicer = self._build_stream_slicer_from_partition_router(retriever_model, config, stream_name=model.name)
1931-
19321939
if model.incremental_sync:
19331940
return self._build_incremental_cursor(model, stream_slicer, config)
19341941

@@ -2607,7 +2614,9 @@ def create_record_selector(
26072614
else None
26082615
)
26092616

2610-
transform_before_filtering = False
2617+
assert model.transform_before_filtering is not None # for mypy
2618+
2619+
transform_before_filtering = model.transform_before_filtering
26112620
if client_side_incremental_sync:
26122621
record_filter = ClientSideIncrementalRecordFilterDecorator(
26132622
config=config,

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -320,20 +320,15 @@ def set_initial_state(self, stream_state: StreamState) -> None:
320320

321321
parent_state = stream_state.get("parent_state", {})
322322

323-
# If `parent_state` doesn't exist and at least one parent stream has an incremental dependency,
324-
# copy the child state to parent streams with incremental dependencies.
325-
incremental_dependency = any(
326-
[parent_config.incremental_dependency for parent_config in self.parent_stream_configs]
327-
)
328-
if not parent_state and not incremental_dependency:
329-
return
330-
331-
if not parent_state and incremental_dependency:
332-
# Migrate child state to parent state format
333-
parent_state = self._migrate_child_state_to_parent_state(stream_state)
334-
335323
# Set state for each parent stream with an incremental dependency
336324
for parent_config in self.parent_stream_configs:
325+
if (
326+
not parent_state.get(parent_config.stream.name, {})
327+
and parent_config.incremental_dependency
328+
):
329+
# Migrate child state to parent state format
330+
parent_state = self._migrate_child_state_to_parent_state(stream_state)
331+
337332
if parent_config.incremental_dependency:
338333
parent_config.stream.state = parent_state.get(parent_config.stream.name, {})
339334

airbyte_cdk/sources/declarative/retrievers/async_retriever.py

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,36 @@ class AsyncRetriever(Retriever):
3636
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3737
self._parameters = parameters
3838

39+
@property
40+
def exit_on_rate_limit(self) -> bool:
41+
"""
42+
Whether to exit on rate limit. This is a property of the job repository
43+
and not the stream slicer. The stream slicer is responsible for creating
44+
the jobs, but the job repository is responsible for managing the rate
45+
limits and other job-related properties.
46+
47+
Note:
48+
- If the `creation_requester` cannot place / create the job - it might be the case of the RateLimits
49+
- If the `creation_requester` can place / create the job - it means all other requesters should successfully manage
50+
to complete the results.
51+
"""
52+
job_orchestrator = self.stream_slicer._job_orchestrator
53+
if job_orchestrator is None:
54+
# Default value when orchestrator is not available
55+
return False
56+
return job_orchestrator._job_repository.creation_requester.exit_on_rate_limit # type: ignore
57+
58+
@exit_on_rate_limit.setter
59+
def exit_on_rate_limit(self, value: bool) -> None:
60+
"""
61+
Sets the `exit_on_rate_limit` property of the job repository > creation_requester,
62+
meaning that the Job cannot be placed / created if the rate limit is reached.
63+
Thus no further work on managing jobs is expected to be done.
64+
"""
65+
job_orchestrator = self.stream_slicer._job_orchestrator
66+
if job_orchestrator is not None:
67+
job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[attr-defined, assignment]
68+
3969
@property
4070
def state(self) -> StreamState:
4171
"""

airbyte_cdk/sources/declarative/transformations/add_fields.py

Lines changed: 10 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,13 @@
11
#
2-
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
33
#
44

55
from dataclasses import InitVar, dataclass, field
66
from typing import Any, Dict, List, Mapping, Optional, Type, Union
77

88
import dpath
99

10+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1011
from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
1112
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1213
from airbyte_cdk.sources.types import Config, FieldPointer, StreamSlice, StreamState
@@ -86,11 +87,16 @@ class AddFields(RecordTransformation):
8687

8788
fields: List[AddedFieldDefinition]
8889
parameters: InitVar[Mapping[str, Any]]
90+
condition: str = ""
8991
_parsed_fields: List[ParsedAddFieldDefinition] = field(
9092
init=False, repr=False, default_factory=list
9193
)
9294

9395
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
96+
self._filter_interpolator = InterpolatedBoolean(
97+
condition=self.condition, parameters=parameters
98+
)
99+
94100
for add_field in self.fields:
95101
if len(add_field.path) < 1:
96102
raise ValueError(
@@ -132,7 +138,9 @@ def transform(
132138
for parsed_field in self._parsed_fields:
133139
valid_types = (parsed_field.value_type,) if parsed_field.value_type else None
134140
value = parsed_field.value.eval(config, valid_types=valid_types, **kwargs)
135-
dpath.new(record, parsed_field.path, value)
141+
is_empty_condition = not self.condition
142+
if is_empty_condition or self._filter_interpolator.eval(config, value=value, **kwargs):
143+
dpath.new(record, parsed_field.path, value)
136144

137145
def __eq__(self, other: Any) -> bool:
138146
return bool(self.__dict__ == other.__dict__)

airbyte_cdk/sources/declarative/transformations/dpath_flatten_fields.py

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,15 @@ class DpathFlattenFields(RecordTransformation):
1515
1616
field_path: List[Union[InterpolatedString, str]] path to the field to flatten.
1717
delete_origin_value: bool = False whether to delete origin field or keep it. Default is False.
18+
replace_record: bool = False whether to replace origin record or not. Default is False.
1819
1920
"""
2021

2122
config: Config
2223
field_path: List[Union[InterpolatedString, str]]
2324
parameters: InitVar[Mapping[str, Any]]
2425
delete_origin_value: bool = False
26+
replace_record: bool = False
2527

2628
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
2729
self._field_path = [
@@ -48,8 +50,12 @@ def transform(
4850
extracted = dpath.get(record, path, default=[])
4951

5052
if isinstance(extracted, dict):
51-
conflicts = set(extracted.keys()) & set(record.keys())
52-
if not conflicts:
53-
if self.delete_origin_value:
54-
dpath.delete(record, path)
53+
if self.replace_record and extracted:
54+
dpath.delete(record, "**")
5555
record.update(extracted)
56+
else:
57+
conflicts = set(extracted.keys()) & set(record.keys())
58+
if not conflicts:
59+
if self.delete_origin_value:
60+
dpath.delete(record, path)
61+
record.update(extracted)

0 commit comments

Comments
 (0)