Skip to content

Commit 19cd2db

Browse files
author
Oleksandr Bazarnov
committed
Merge remote-tracking branch 'origin/main' into baz/cdk/add-deprecations-module
2 parents b2332da + 8ef954c commit 19cd2db

File tree

2 files changed

+72
-8
lines changed

2 files changed

+72
-8
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -2870,13 +2870,9 @@ def create_record_selector(
2870 2870
else None
2871 2871
)
2872 2872

2873-
if model.transform_before_filtering is None:
2874-
# default to False if not set
2875-
model.transform_before_filtering = False
2876-
2877-
assert model.transform_before_filtering is not None # for mypy
2878-
2879-
transform_before_filtering = model.transform_before_filtering
2873+
transform_before_filtering = (
2874+
False if model.transform_before_filtering is None else model.transform_before_filtering
2875+
)
2880 2876
if client_side_incremental_sync:
2881 2877
record_filter = ClientSideIncrementalRecordFilterDecorator(
2882 2878
config=config,
@@ -2886,7 +2882,11 @@ def create_record_selector(
2886 2882
else None,
2887 2883
**client_side_incremental_sync,
2888 2884
)
2889-
transform_before_filtering = True
2885+
transform_before_filtering = (
2886+
True
2887+
if model.transform_before_filtering is None
2888+
else model.transform_before_filtering
2889+
)
2890 2890

2891 2891
if model.schema_normalization is None:
2892 2892
# default to no schema normalization if not set

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 64 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -11,6 +11,7 @@
11 11

12 12
import freezegun
13 13
import isodate
14+
import pytest
14 15
from typing_extensions import deprecated
15 16

16 17
from airbyte_cdk.models import (
@@ -1876,6 +1877,69 @@ def test_stream_using_is_client_side_incremental_has_cursor_state():
1876 1877
assert client_side_incremental_cursor_state == expected_cursor_value
1877 1878

1878 1879

1880+
@pytest.mark.parametrize(
1881+
"expected_transform_before_filtering",
1882+
[
1883+
pytest.param(
1884+
True,
1885+
id="transform before filtering",
1886+
),
1887+
pytest.param(
1888+
False,
1889+
id="transform after filtering",
1890+
),
1891+
pytest.param(
1892+
None,
1893+
id="default transform before filtering",
1894+
),
1895+
],
1896+
)
1897+
def test_stream_using_is_client_side_incremental_has_transform_before_filtering_according_to_manifest(
1898+
expected_transform_before_filtering,
1899+
):
1900+
expected_cursor_value = "2024-07-01"
1901+
state = [
1902+
AirbyteStateMessage(
1903+
type=AirbyteStateType.STREAM,
1904+
stream=AirbyteStreamState(
1905+
stream_descriptor=StreamDescriptor(name="locations", namespace=None),
1906+
stream_state=AirbyteStateBlob(updated_at=expected_cursor_value),
1907+
),
1908+
)
1909+
]
1910+
1911+
manifest_with_stream_state_interpolation = copy.deepcopy(_MANIFEST)
1912+
1913+
# Enable semi-incremental on the locations stream
1914+
manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["incremental_sync"][
1915+
"is_client_side_incremental"
1916+
] = True
1917+
1918+
if expected_transform_before_filtering is not None:
1919+
manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["retriever"][
1920+
"record_selector"
1921+
]["transform_before_filtering"] = expected_transform_before_filtering
1922+
1923+
source = ConcurrentDeclarativeSource(
1924+
source_config=manifest_with_stream_state_interpolation,
1925+
config=_CONFIG,
1926+
catalog=_CATALOG,
1927+
state=state,
1928+
)
1929+
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
1930+
1931+
locations_stream = concurrent_streams[2]
1932+
assert isinstance(locations_stream, DefaultStream)
1933+
1934+
simple_retriever = locations_stream._stream_partition_generator._partition_factory._retriever
1935+
record_selector = simple_retriever.record_selector
1936+
1937+
if expected_transform_before_filtering is not None:
1938+
assert record_selector.transform_before_filtering == expected_transform_before_filtering
1939+
else:
1940+
assert record_selector.transform_before_filtering is True
1941+
1942+
1879 1943
def create_wrapped_stream(stream: DeclarativeStream) -> Stream:
1880 1944
slice_to_records_mapping = get_mocked_read_records_output(stream_name=stream.name)
1881 1945

0 commit comments

Comments
 (0)