Skip to content

Commit cb9beef

Browse files
committed
Remove cursor and RFR from the simple retriever and streamline classes
1 parent 6504148 commit cb9beef

File tree

7 files changed

+62
-574
lines changed

7 files changed

+62
-574
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 1 addition & 85 deletions
Original file line numberDiff line numberDiff line change
@@ -34,10 +34,6 @@
3434
from airbyte_cdk.connector_builder.models import (
3535
LogMessage as ConnectorBuilderLogMessage,
3636
)
37-
from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream
38-
from airbyte_cdk.legacy.sources.declarative.incremental import (
39-
DatetimeBasedCursor,
40-
)
4137
from airbyte_cdk.models import (
4238
AirbyteStateBlob,
4339
AirbyteStateMessage,
@@ -735,7 +731,6 @@ def _init_mappings(self) -> None:
735731
CustomTransformationModel: self.create_custom_component,
736732
CustomValidationStrategyModel: self.create_custom_component,
737733
CustomConfigTransformationModel: self.create_custom_component,
738-
DatetimeBasedCursorModel: self.create_datetime_based_cursor,
739734
DeclarativeStreamModel: self.create_default_stream,
740735
DefaultErrorHandlerModel: self.create_default_error_handler,
741736
DefaultPaginatorModel: self.create_default_paginator,
@@ -758,7 +753,6 @@ def _init_mappings(self) -> None:
758753
FlattenFieldsModel: self.create_flatten_fields,
759754
DpathFlattenFieldsModel: self.create_dpath_flatten_fields,
760755
IterableDecoderModel: self.create_iterable_decoder,
761-
IncrementingCountCursorModel: self.create_incrementing_count_cursor,
762756
XmlDecoderModel: self.create_xml_decoder,
763757
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
764758
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
@@ -1926,64 +1920,6 @@ def _create_nested_component(
19261920
def _is_component(model_value: Any) -> bool:
19271921
return isinstance(model_value, dict) and model_value.get("type") is not None
19281922

1929-
def create_datetime_based_cursor(
1930-
self, model: DatetimeBasedCursorModel, config: Config, **kwargs: Any
1931-
) -> DatetimeBasedCursor:
1932-
start_datetime: Union[str, MinMaxDatetime] = (
1933-
model.start_datetime
1934-
if isinstance(model.start_datetime, str)
1935-
else self.create_min_max_datetime(model.start_datetime, config)
1936-
)
1937-
end_datetime: Union[str, MinMaxDatetime, None] = None
1938-
if model.is_data_feed and model.end_datetime:
1939-
raise ValueError("Data feed does not support end_datetime")
1940-
if model.is_data_feed and model.is_client_side_incremental:
1941-
raise ValueError(
1942-
"`Client side incremental` cannot be applied with `data feed`. Choose only 1 from them."
1943-
)
1944-
if model.end_datetime:
1945-
end_datetime = (
1946-
model.end_datetime
1947-
if isinstance(model.end_datetime, str)
1948-
else self.create_min_max_datetime(model.end_datetime, config)
1949-
)
1950-
1951-
end_time_option = (
1952-
self._create_component_from_model(
1953-
model.end_time_option, config, parameters=model.parameters or {}
1954-
)
1955-
if model.end_time_option
1956-
else None
1957-
)
1958-
start_time_option = (
1959-
self._create_component_from_model(
1960-
model.start_time_option, config, parameters=model.parameters or {}
1961-
)
1962-
if model.start_time_option
1963-
else None
1964-
)
1965-
1966-
return DatetimeBasedCursor(
1967-
cursor_field=model.cursor_field,
1968-
cursor_datetime_formats=model.cursor_datetime_formats
1969-
if model.cursor_datetime_formats
1970-
else [],
1971-
cursor_granularity=model.cursor_granularity,
1972-
datetime_format=model.datetime_format,
1973-
end_datetime=end_datetime,
1974-
start_datetime=start_datetime,
1975-
step=model.step,
1976-
end_time_option=end_time_option,
1977-
lookback_window=model.lookback_window,
1978-
start_time_option=start_time_option,
1979-
partition_field_end=model.partition_field_end,
1980-
partition_field_start=model.partition_field_start,
1981-
message_repository=self._message_repository,
1982-
is_compare_strictly=model.is_compare_strictly,
1983-
config=config,
1984-
parameters=model.parameters or {},
1985-
)
1986-
19871923
def create_default_stream(
19881924
self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any
19891925
) -> AbstractStream:
@@ -2647,24 +2583,6 @@ def create_gzip_decoder(
26472583
fallback_parser=gzip_parser.inner_parser,
26482584
)
26492585

2650-
# todo: This method should be removed once we deprecate the SimpleRetriever.cursor field and the various
2651-
# state methods
2652-
@staticmethod
2653-
def create_incrementing_count_cursor(
2654-
model: IncrementingCountCursorModel, config: Config, **kwargs: Any
2655-
) -> DatetimeBasedCursor:
2656-
# This should not actually get used anywhere at runtime, but needed to add this to pass checks since
2657-
# we still parse models into components. The issue is that there's no runtime implementation of a
2658-
# IncrementingCountCursor.
2659-
# A known and expected issue with this stub is running a check with the declared IncrementingCountCursor because it is run without ConcurrentCursor.
2660-
return DatetimeBasedCursor(
2661-
cursor_field=model.cursor_field,
2662-
datetime_format="%Y-%m-%d",
2663-
start_datetime="2024-12-12",
2664-
config=config,
2665-
parameters={},
2666-
)
2667-
26682586
@staticmethod
26692587
def create_iterable_decoder(
26702588
model: IterableDecoderModel, config: Config, **kwargs: Any
@@ -3392,7 +3310,6 @@ def _get_url(req: Requester) -> str:
33923310
record_selector=record_selector,
33933311
stream_slicer=_NO_STREAM_SLICING,
33943312
request_option_provider=request_options_provider,
3395-
cursor=None,
33963313
config=config,
33973314
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
33983315
parameters=model.parameters or {},
@@ -3413,7 +3330,6 @@ def _get_url(req: Requester) -> str:
34133330
record_selector=record_selector,
34143331
stream_slicer=_NO_STREAM_SLICING,
34153332
request_option_provider=request_options_provider,
3416-
cursor=None,
34173333
config=config,
34183334
ignore_stream_slicer_parameters_on_paginated_requests=ignore_stream_slicer_parameters_on_paginated_requests,
34193335
additional_query_properties=query_properties,
@@ -3506,7 +3422,7 @@ def create_state_delegating_stream(
35063422
config: Config,
35073423
has_parent_state: Optional[bool] = None,
35083424
**kwargs: Any,
3509-
) -> DeclarativeStream:
3425+
) -> DefaultStream:
35103426
if (
35113427
model.full_refresh_stream.name != model.name
35123428
or model.name != model.incremental_stream.name

airbyte_cdk/sources/declarative/retrievers/retriever.py

Lines changed: 9 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,23 +36,19 @@ def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
3636
"""Returns the stream slices"""
3737

3838
@property
39-
@abstractmethod
4039
@deprecated("State management is being moved to the stream level.")
4140
def state(self) -> StreamState:
42-
"""State getter, should return state in form that can serialized to a string and send to the output
43-
as a STATE AirbyteMessage.
44-
45-
A good example of a state is a cursor_value:
46-
{
47-
self.cursor_field: "cursor_value"
48-
}
49-
50-
State should try to be as small as possible but at the same time descriptive enough to restore
51-
syncing process from the point where it stopped.
5241
"""
42+
Does nothing as this method is deprecated, so underlying Retriever implementations
43+
do not need to implement this.
44+
"""
45+
return {}
5346

5447
@state.setter
55-
@abstractmethod
5648
@deprecated("State management is being moved to the stream level.")
5749
def state(self, value: StreamState) -> None:
58-
"""State setter, accept state serialized by state getter."""
50+
"""
51+
Does nothing as this method is deprecated, so underlying Retriever implementations
52+
do not need to implement this.
53+
"""
54+
pass

0 commit comments

Comments
 (0)