Skip to content

Commit ef07959

Browse files
committed
remove stream_slices() from SimpleRetriever and AsyncRetriever
1 parent cb9beef commit ef07959

File tree

5 files changed

+9
-80
lines changed

5 files changed

+9
-80
lines changed

airbyte_cdk/legacy/sources/declarative/declarative_stream.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,8 @@
55
from dataclasses import InitVar, dataclass, field
66
from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union
77

8+
from typing_extensions import deprecated
9+
810
from airbyte_cdk.legacy.sources.declarative.incremental import (
911
GlobalSubstreamCursor,
1012
PerPartitionCursor,
@@ -13,7 +15,6 @@
1315
from airbyte_cdk.models import SyncMode
1416
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1517
from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration
16-
from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever
1718
from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
1819
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1920
from airbyte_cdk.sources.declarative.schema import DefaultSchemaLoader
@@ -28,6 +29,7 @@
2829
from airbyte_cdk.sources.types import Config, StreamSlice
2930

3031

32+
@deprecated("DeclarativeStream has been deprecated in favor of the concurrent DefaultStream")
3133
@dataclass
3234
class DeclarativeStream(Stream):
3335
"""
@@ -198,8 +200,6 @@ def state_checkpoint_interval(self) -> Optional[int]:
198200
return None
199201

200202
def get_cursor(self) -> Optional[Cursor]:
201-
if self.retriever and isinstance(self.retriever, SimpleRetriever):
202-
return self.retriever.cursor
203203
return None
204204

205205
def _get_checkpoint_reader(

airbyte_cdk/sources/declarative/retrievers/async_retriever.py

Lines changed: 2 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
)
1212
from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
1313
from airbyte_cdk.sources.streams.core import StreamData
14-
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
14+
from airbyte_cdk.sources.types import Config, StreamSlice
1515
from airbyte_cdk.sources.utils.slice_logger import AlwaysLogSliceLogger
1616

1717

@@ -59,30 +59,6 @@ def exit_on_rate_limit(self, value: bool) -> None:
5959
if job_orchestrator is not None:
6060
job_orchestrator._job_repository.creation_requester.exit_on_rate_limit = value # type: ignore[attr-defined, assignment]
6161

62-
@property
63-
def state(self) -> StreamState:
64-
"""
65-
As a first iteration for sendgrid, there is no state to be managed
66-
"""
67-
return {}
68-
69-
@state.setter
70-
def state(self, value: StreamState) -> None:
71-
"""
72-
As a first iteration for sendgrid, there is no state to be managed
73-
"""
74-
pass
75-
76-
def _get_stream_state(self) -> StreamState:
77-
"""
78-
Gets the current state of the stream.
79-
80-
Returns:
81-
StreamState: Mapping[str, Any]
82-
"""
83-
84-
return self.state
85-
8662
def _validate_and_get_stream_slice_jobs(
8763
self, stream_slice: Optional[StreamSlice] = None
8864
) -> Iterable[AsyncJob]:
@@ -101,9 +77,6 @@ def _validate_and_get_stream_slice_jobs(
10177
"""
10278
return stream_slice.extra_fields.get("jobs", []) if stream_slice else []
10379

104-
def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
105-
yield from self.stream_slicer.stream_slices()
106-
10780
def read_records(
10881
self,
10982
records_schema: Mapping[str, Any],
@@ -112,13 +85,12 @@ def read_records(
11285
# emit the slice_descriptor log message, for connector builder TestRead
11386
yield self.slice_logger.create_slice_log_message(stream_slice.cursor_slice) # type: ignore
11487

115-
stream_state: StreamState = self._get_stream_state()
11688
jobs: Iterable[AsyncJob] = self._validate_and_get_stream_slice_jobs(stream_slice)
11789
records: Iterable[Mapping[str, Any]] = self.stream_slicer.fetch_records(jobs)
11890

11991
yield from self.record_selector.filter_and_transform(
12092
all_data=records,
121-
stream_state=stream_state,
93+
stream_state={}, # stream_state as an interpolation context is deprecated
12294
records_schema=records_schema,
12395
stream_slice=stream_slice,
12496
)

airbyte_cdk/sources/declarative/retrievers/retriever.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,10 @@ def read_records(
3333
@abstractmethod
3434
@deprecated("Stream slicing is being moved to the stream level.")
3535
def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
36-
"""Returns the stream slices"""
36+
"""Does nothing as this method is deprecated, so underlying Retriever implementations
37+
do not need to implement this.
38+
"""
39+
yield from []
3740

3841
@property
3942
@deprecated("State management is being moved to the stream level.")

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 0 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -456,23 +456,6 @@ def read_records(
456456
)
457457
yield from self._read_pages(record_generator, _slice)
458458

459-
def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore
460-
"""
461-
Specifies the slices for this stream. See the stream slicing section of the docs for more information.
462-
463-
:param sync_mode:
464-
:param cursor_field:
465-
:return:
466-
"""
467-
return self.stream_slicer.stream_slices()
468-
469-
# todo: There are a number of things that can be cleaned up when we remove self.cursor and all the related
470-
# SimpleRetriever state management that is handled by the concurrent CDK Framework:
471-
# - DONE ModelToComponentFactory.create_datetime_based_cursor() should be removed since it does need to be instantiated
472-
# - DONE ModelToComponentFactory.create_incrementing_count_cursor() should be removed since it's a placeholder
473-
# - DONE test_simple_retriever.py: Remove all imports and usages of legacy cursor components
474-
# - DONE test_model_to_component_factory.py:test_datetime_based_cursor() test can be removed
475-
476459
def _parse_records(
477460
self,
478461
response: Optional[requests.Response],

unit_tests/sources/declarative/retrievers/test_simple_retriever.py

Lines changed: 0 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,6 @@
2121
from airbyte_cdk.sources.declarative.decoders import JsonDecoder
2222
from airbyte_cdk.sources.declarative.extractors import DpathExtractor, HttpSelector, RecordSelector
2323
from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter
24-
from airbyte_cdk.sources.declarative.partition_routers.substream_partition_router import (
25-
ParentStreamConfig,
26-
SubstreamPartitionRouter,
27-
)
2824
from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator, Paginator
2925
from airbyte_cdk.sources.declarative.requesters.paginators.strategies import (
3026
CursorPaginationStrategy,
@@ -42,7 +38,6 @@
4238
from airbyte_cdk.sources.declarative.requesters.requester import HttpMethod, Requester
4339
from airbyte_cdk.sources.declarative.retrievers.pagination_tracker import PaginationTracker
4440
from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever
45-
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
4641
from airbyte_cdk.sources.streams.http.pagination_reset_exception import (
4742
PaginationResetRequiredException,
4843
)
@@ -425,30 +420,6 @@ def test_path(test_name, requester_path, paginator_path, expected_path):
425420
assert actual_path == expected_path
426421

427422

428-
def test_limit_stream_slices():
429-
maximum_number_of_slices = 4
430-
stream_slicer = MagicMock()
431-
stream_slicer.stream_slices.return_value = _generate_slices(maximum_number_of_slices * 2)
432-
stream_slicer_wrapped = StreamSlicerTestReadDecorator(
433-
wrapped_slicer=stream_slicer,
434-
maximum_number_of_slices=maximum_number_of_slices,
435-
)
436-
retriever = SimpleRetriever(
437-
name="stream_name",
438-
primary_key=primary_key,
439-
requester=MagicMock(),
440-
paginator=MagicMock(),
441-
record_selector=MagicMock(),
442-
stream_slicer=stream_slicer_wrapped,
443-
parameters={},
444-
config={},
445-
)
446-
447-
truncated_slices = list(retriever.stream_slices())
448-
449-
assert truncated_slices == _generate_slices(maximum_number_of_slices)
450-
451-
452423
def test_given_stream_data_is_not_record_when_read_records_then_update_slice_with_optional_record():
453424
stream_data = [
454425
AirbyteMessage(

0 commit comments

Comments
 (0)