
Commit c7c63a7

Author: maxime.c
Commit message: format
1 parent 3543837 commit c7c63a7


6 files changed: 100 additions and 22 deletions

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 5 additions & 2 deletions
@@ -81,7 +81,7 @@ def __init__(
         connector_state_converter: AbstractStreamStateConverter,
         cursor_field: CursorField,
         use_global_cursor: bool = False,
-        attempt_to_create_cursor_if_not_provided: bool = False
+        attempt_to_create_cursor_if_not_provided: bool = False,
     ) -> None:
         self._global_cursor: Optional[StreamState] = {}
         self._stream_name = stream_name
@@ -517,7 +517,10 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
                 "Invalid state as stream slices that are emitted should refer to an existing cursor"
             )
         partition_key = self._to_partition_key(record.associated_slice.partition)
-        if partition_key not in self._cursor_per_partition and not self._attempt_to_create_cursor_if_not_provided:
+        if (
+            partition_key not in self._cursor_per_partition
+            and not self._attempt_to_create_cursor_if_not_provided
+        ):
             raise ValueError(
                 "Invalid state as stream slices that are emitted should refer to an existing cursor"
             )
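
Note: aside from the added trailing comma, the only hunk here with real structure is the guard in _get_cursor, which raises only when the partition has no known cursor and on-the-fly creation is disabled. A minimal sketch of that behaviour, with a hypothetical PartitionStore standing in for ConcurrentPerPartitionCursor (not the airbyte_cdk API):

# Minimal sketch of the reformatted guard; PartitionStore and its fields are
# illustrative stand-ins, not airbyte_cdk classes.
from typing import Any, Dict


class PartitionStore:
    def __init__(self, attempt_to_create_cursor_if_not_provided: bool = False) -> None:
        self._cursor_per_partition: Dict[str, Any] = {}
        self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided

    def get_cursor(self, partition_key: str) -> Any:
        if (
            partition_key not in self._cursor_per_partition
            and not self._attempt_to_create_cursor_if_not_provided
        ):
            # Same failure mode as the CDK guard: an emitted slice must refer to a known cursor.
            raise ValueError(
                "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )
        # When creation is allowed, lazily register a fresh cursor for the partition.
        return self._cursor_per_partition.setdefault(partition_key, object())


store = PartitionStore(attempt_to_create_cursor_if_not_provided=True)
assert store.get_cursor("partition-1") is store.get_cursor("partition-1")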

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 6 additions & 3 deletions
@@ -1561,7 +1561,6 @@ def create_concurrent_cursor_from_perpartition_cursor(
         partition_router: PartitionRouter,
         stream_state_migrations: Optional[List[Any]] = None,
         attempt_to_create_cursor_if_not_provided: bool = False,
-
         **kwargs: Any,
     ) -> ConcurrentPerPartitionCursor:
         component_type = component_definition.get("type")
@@ -2188,7 +2187,9 @@ def _build_concurrent_cursor(
         # FIXME should this be in create_concurrent_cursor_from_perpartition_cursor
         if model.state_migrations:
             state_transformations = [
-                self._create_component_from_model(state_migration, config, declarative_stream=model)
+                self._create_component_from_model(
+                    state_migration, config, declarative_stream=model
+                )
                 for state_migration in model.state_migrations
             ]
         else:
@@ -2228,7 +2229,9 @@ def _build_concurrent_cursor(
                 attempt_to_create_cursor_if_not_provided=True,
             )
         else:
-            raise ValueError(f"Incremental sync of type {type(model.incremental_sync)} is not supported")
+            raise ValueError(
+                f"Incremental sync of type {type(model.incremental_sync)} is not supported"
+            )
         return None

     def _build_resumable_cursor(
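
The two wrapped statements in _build_concurrent_cursor follow one pattern: build a component per declared state migration (or an empty list), and fail loudly on incremental sync models the factory does not support. A short sketch of that pattern under assumed placeholder types (the callable below is not the real _create_component_from_model):

from typing import Any, Callable, List, Optional


def build_state_transformations(
    state_migrations: Optional[List[Any]],
    create_component: Callable[[Any], Any],
) -> List[Any]:
    # One component per declared migration, or an empty list when none are declared.
    if state_migrations:
        return [create_component(state_migration) for state_migration in state_migrations]
    return []


def reject_unsupported(incremental_sync: Any) -> None:
    # Unsupported incremental sync models are rejected rather than silently ignored.
    raise ValueError(f"Incremental sync of type {type(incremental_sync)} is not supported")


assert build_state_transformations(None, create_component=str) == []
assert build_state_transformations([1, 2], create_component=str) == ["1", "2"]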

airbyte_cdk/sources/file_based/stream/concurrent/cursor/file_based_concurrent_cursor.py

Lines changed: 0 additions & 1 deletion
@@ -28,7 +28,6 @@


 class FileBasedConcurrentCursor(AbstractConcurrentFileBasedCursor):
-
     CURSOR_FIELD = "_ab_source_file_last_modified"
     DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL = (
         DefaultFileBasedCursor.DEFAULT_DAYS_TO_SYNC_IF_HISTORY_IS_FULL

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 8 additions & 2 deletions
@@ -206,9 +206,15 @@ def _get_concurrent_state(
         if slices_from_partitioned_state:
             # We assume here that the slices have been already merged
             first_slice = slices_from_partitioned_state[0]
-            value_from_partitioned_state = first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY] if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice else first_slice[self._connector_state_converter.END_KEY]
+            value_from_partitioned_state = (
+                first_slice[self._connector_state_converter.MOST_RECENT_RECORD_KEY]
+                if self._connector_state_converter.MOST_RECENT_RECORD_KEY in first_slice
+                else first_slice[self._connector_state_converter.END_KEY]
+            )
             return (
-                value_from_partitioned_state or self._start or self._connector_state_converter.zero_value,
+                value_from_partitioned_state
+                or self._start
+                or self._connector_state_converter.zero_value,
                 partitioned_state,
             )
         return self._connector_state_converter.convert_from_sequential_state(
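
The wrapped expression encodes a fixed precedence for the initial concurrent-state value: the first slice's most recent record value if it was persisted, otherwise the slice's end boundary, and only then the configured start or the converter's zero value. A small sketch of that precedence, with hypothetical dict keys standing in for the state converter's constants:

from typing import Any, Mapping, Optional

# Hypothetical stand-ins for MOST_RECENT_RECORD_KEY, END_KEY and zero_value on the converter.
MOST_RECENT_RECORD_KEY = "most_recent_cursor_value"
END_KEY = "end"
ZERO_VALUE = 0


def initial_cursor_value(first_slice: Mapping[str, Any], start: Optional[Any]) -> Any:
    # Prefer the persisted most recent record value, else the slice's upper boundary.
    value_from_partitioned_state = (
        first_slice[MOST_RECENT_RECORD_KEY]
        if MOST_RECENT_RECORD_KEY in first_slice
        else first_slice[END_KEY]
    )
    # Falsy values (e.g. 0 or None) fall through to the stream start, then the zero value.
    return value_from_partitioned_state or start or ZERO_VALUE


assert initial_cursor_value({"end": 5}, start=None) == 5
assert initial_cursor_value({"most_recent_cursor_value": 3, "end": 5}, start=None) == 3
assert initial_cursor_value({"end": 0}, start=7) == 7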

unit_tests/sources/streams/concurrent/test_cursor.py

Lines changed: 81 additions & 13 deletions
@@ -1780,6 +1780,8 @@ def test_close_partition_with_slice_range_granularity_concurrent_cursor_from_dat


 _SHOULD_BE_SYNCED_START = 10
+
+
 @pytest.mark.parametrize(
     "record, should_be_synced",
     [
@@ -1853,8 +1855,19 @@ def test_given_state_when_should_be_synced_then_use_cursor_value_to_filter():
         _NO_LOOKBACK_WINDOW,
     )

-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: state_value - 1}, stream_name="test_stream")) == False
-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: state_value}, stream_name="test_stream")) == True
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: state_value - 1}, stream_name="test_stream")
+        )
+        == False
+    )
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: state_value}, stream_name="test_stream")
+        )
+        == True
+    )
+

 def test_given_partitioned_state_without_slices_nor_start_when_should_be_synced_then_use_zero_value_to_filter():
     cursor = ConcurrentCursor(
@@ -1874,8 +1887,14 @@ def test_given_partitioned_state_without_slices_nor_start_when_should_be_synced_
         _NO_LOOKBACK_WINDOW,
     )

-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: -1}, stream_name="test_stream")) == False
-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: 0}, stream_name="test_stream")) == True
+    assert (
+        cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: -1}, stream_name="test_stream"))
+        == False
+    )
+    assert (
+        cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: 0}, stream_name="test_stream"))
+        == True
+    )


 def test_given_partitioned_state_without_slices_but_start_when_should_be_synced_then_use_start_value_to_filter():
@@ -1896,8 +1915,20 @@ def test_given_partitioned_state_without_slices_but_start_when_should_be_synced_
         _NO_LOOKBACK_WINDOW,
     )

-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: _SHOULD_BE_SYNCED_START-1}, stream_name="test_stream")) == False
-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: _SHOULD_BE_SYNCED_START}, stream_name="test_stream")) == True
+    assert (
+        cursor.should_be_synced(
+            Record(
+                data={_A_CURSOR_FIELD_KEY: _SHOULD_BE_SYNCED_START - 1}, stream_name="test_stream"
+            )
+        )
+        == False
+    )
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: _SHOULD_BE_SYNCED_START}, stream_name="test_stream")
+        )
+        == True
+    )


 def test_given_partitioned_state_with_one_slice_and_most_recent_cursor_value_when_should_be_synced_then_use_most_recent_cursor_value_of_slice_to_filter():
@@ -1921,8 +1952,20 @@ def test_given_partitioned_state_with_one_slice_and_most_recent_cursor_value_whe
         _NO_LOOKBACK_WINDOW,
     )

-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: most_recent_cursor_value - 1}, stream_name="test_stream")) == False
-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: most_recent_cursor_value}, stream_name="test_stream")) == True
+    assert (
+        cursor.should_be_synced(
+            Record(
+                data={_A_CURSOR_FIELD_KEY: most_recent_cursor_value - 1}, stream_name="test_stream"
+            )
+        )
+        == False
+    )
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: most_recent_cursor_value}, stream_name="test_stream")
+        )
+        == True
+    )


 def test_given_partitioned_state_with_one_slice_without_most_recent_cursor_value_when_should_be_synced_then_use_upper_boundary_of_slice_to_filter():
@@ -1946,8 +1989,18 @@ def test_given_partitioned_state_with_one_slice_without_most_recent_cursor_value
         _NO_LOOKBACK_WINDOW,
     )

-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: slice_end - 1}, stream_name="test_stream")) == False
-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: slice_end}, stream_name="test_stream")) == True
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: slice_end - 1}, stream_name="test_stream")
+        )
+        == False
+    )
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: slice_end}, stream_name="test_stream")
+        )
+        == True
+    )


 def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then_use_upper_boundary_of_first_slice_to_filter():
@@ -1973,8 +2026,23 @@ def test_given_partitioned_state_with_multiple_slices_when_should_be_synced_then
         _NO_LOOKBACK_WINDOW,
     )

-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: first_slice_end - 1}, stream_name="test_stream")) == False
-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream")) == True
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: first_slice_end - 1}, stream_name="test_stream")
+        )
+        == False
+    )
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: first_slice_end}, stream_name="test_stream")
+        )
+        == True
+    )
     # even if this is within a boundary that has been synced, we don't take any chance and we sync it
     # anyway in most cases, it shouldn't be pulled because we query for specific slice boundaries to the API
-    assert cursor.should_be_synced(Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream")) == True
+    assert (
+        cursor.should_be_synced(
+            Record(data={_A_CURSOR_FIELD_KEY: second_slice_start}, stream_name="test_stream")
+        )
+        == True
+    )
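
Stripped of the wrapping, the assertions above all exercise one filtering rule: should_be_synced rejects a record only when its cursor value is strictly below the lower bound derived from state (most recent record value, slice end, start, or zero value), and accepts anything at or above that bound, even values that fall inside a later, already-synced slice. A compact restatement of that rule, independent of the CDK classes (the function below is illustrative, not the ConcurrentCursor API):

def should_be_synced(record_cursor_value: int, lower_bound: int) -> bool:
    # Records strictly below the bound were already emitted; anything else is
    # synced again rather than risking a missed boundary value.
    return record_cursor_value >= lower_bound


assert should_be_synced(9, lower_bound=10) is False
assert should_be_synced(10, lower_bound=10) is True
# A value inside a later, already-synced slice is still synced (see the test comment above).
assert should_be_synced(25, lower_bound=10) is True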

unit_tests/sources/streams/test_stream_read.py

Lines changed: 0 additions & 1 deletion
@@ -114,7 +114,6 @@ def read_records(


 class MockConcurrentCursor(Cursor):
-
     _state: MutableMapping[str, Any]
     _message_repository: MessageRepository

0 commit comments
