diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 8cdc69a37..8913a87fd 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -58,9 +58,6 @@ from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AlwaysAvailableAvailabilityStrategy, -) from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream @@ -368,7 +365,6 @@ def _group_streams( partition_generator=partition_generator, name=declarative_stream.name, json_schema=declarative_stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=get_primary_key_from_stream(declarative_stream.primary_key), cursor_field=cursor.cursor_field.cursor_field_key if hasattr(cursor, "cursor_field") @@ -408,7 +404,6 @@ def _group_streams( partition_generator=partition_generator, name=declarative_stream.name, json_schema=declarative_stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=get_primary_key_from_stream(declarative_stream.primary_key), cursor_field=None, logger=self.logger, @@ -464,7 +459,6 @@ def _group_streams( partition_generator=partition_generator, name=declarative_stream.name, json_schema=declarative_stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=get_primary_key_from_stream(declarative_stream.primary_key), 
cursor_field=perpartition_cursor.cursor_field.cursor_field_key, logger=self.logger, diff --git a/airbyte_cdk/sources/file_based/availability_strategy/__init__.py b/airbyte_cdk/sources/file_based/availability_strategy/__init__.py index 8134a89e0..ee3c802df 100644 --- a/airbyte_cdk/sources/file_based/availability_strategy/__init__.py +++ b/airbyte_cdk/sources/file_based/availability_strategy/__init__.py @@ -1,11 +1,7 @@ -from .abstract_file_based_availability_strategy import ( - AbstractFileBasedAvailabilityStrategy, - AbstractFileBasedAvailabilityStrategyWrapper, -) +from .abstract_file_based_availability_strategy import AbstractFileBasedAvailabilityStrategy from .default_file_based_availability_strategy import DefaultFileBasedAvailabilityStrategy __all__ = [ "AbstractFileBasedAvailabilityStrategy", - "AbstractFileBasedAvailabilityStrategyWrapper", "DefaultFileBasedAvailabilityStrategy", ] diff --git a/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py b/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py index 12e1740b6..c7ae6ff43 100644 --- a/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py +++ b/airbyte_cdk/sources/file_based/availability_strategy/abstract_file_based_availability_strategy.py @@ -10,12 +10,6 @@ from airbyte_cdk.sources import Source from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AbstractAvailabilityStrategy, - StreamAvailability, - StreamAvailable, - StreamUnavailable, -) from airbyte_cdk.sources.streams.core import Stream if TYPE_CHECKING: @@ -28,7 +22,7 @@ def check_availability( # type: ignore[override] # Signature doesn't match bas self, stream: Stream, logger: logging.Logger, - _: Optional[Source], + source: Optional[Source] = None, ) -> Tuple[bool, Optional[str]]: """ Perform a connection 
check for the stream. @@ -51,23 +45,3 @@ def check_availability_and_parsability( Returns (True, None) if successful, otherwise (False, error message). """ ... - - -class AbstractFileBasedAvailabilityStrategyWrapper(AbstractAvailabilityStrategy): - def __init__(self, stream: AbstractFileBasedStream) -> None: - self.stream = stream - - def check_availability(self, logger: logging.Logger) -> StreamAvailability: - is_available, reason = self.stream.availability_strategy.check_availability( - self.stream, logger, None - ) - if is_available: - return StreamAvailable() - return StreamUnavailable(reason or "") - - def check_availability_and_parsability( - self, logger: logging.Logger - ) -> Tuple[bool, Optional[str]]: - return self.stream.availability_strategy.check_availability_and_parsability( - self.stream, logger, None - ) diff --git a/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py b/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py index ef258b34d..e3fb0179e 100644 --- a/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py +++ b/airbyte_cdk/sources/file_based/stream/abstract_file_based_stream.py @@ -179,7 +179,6 @@ def record_passes_validation_policy(self, record: Mapping[str, Any]) -> bool: ) @cached_property - @deprecated("Deprecated as of CDK version 3.7.0.") def availability_strategy(self) -> AbstractFileBasedAvailabilityStrategy: return self._availability_strategy diff --git a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py index c36e5179d..fd8eef9b0 100644 --- a/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py +++ b/airbyte_cdk/sources/file_based/stream/concurrent/adapters.py @@ -21,7 +21,6 @@ from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.file_based.availability_strategy import ( AbstractFileBasedAvailabilityStrategy, - AbstractFileBasedAvailabilityStrategyWrapper, ) from 
airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser @@ -97,7 +96,6 @@ def create_from_stream( ), name=stream.name, json_schema=stream.get_json_schema(), - availability_strategy=AbstractFileBasedAvailabilityStrategyWrapper(stream), primary_key=pk, cursor_field=cursor_field, logger=logger, diff --git a/airbyte_cdk/sources/streams/availability_strategy.py b/airbyte_cdk/sources/streams/availability_strategy.py index 312ddae19..96a2c9bc9 100644 --- a/airbyte_cdk/sources/streams/availability_strategy.py +++ b/airbyte_cdk/sources/streams/availability_strategy.py @@ -14,6 +14,7 @@ from airbyte_cdk.sources import Source +# FIXME(review): unfinished note - state what must be fixed in AvailabilityStrategy (presumably follow-up cleanup after the concurrent availability-strategy removal; confirm with the author). class AvailabilityStrategy(ABC): """ Abstract base class for checking stream availability. diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py index 26e6f09d4..33e7c4d10 100644 --- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py @@ -9,7 +9,6 @@ from airbyte_cdk.models import AirbyteStream from airbyte_cdk.sources.source import ExperimentalClassWarning -from airbyte_cdk.sources.streams.concurrent.availability_strategy import StreamAvailability from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition @@ -64,12 +63,6 @@ def cursor_field(self) -> Optional[str]: :return: The name of the field used as a cursor. Nested cursor fields are not supported. 
""" - @abstractmethod - def check_availability(self) -> StreamAvailability: - """ - :return: The stream's availability - """ - @abstractmethod def get_json_schema(self) -> Mapping[str, Any]: """ diff --git a/airbyte_cdk/sources/streams/concurrent/adapters.py b/airbyte_cdk/sources/streams/concurrent/adapters.py index 7da594155..949f0545b 100644 --- a/airbyte_cdk/sources/streams/concurrent/adapters.py +++ b/airbyte_cdk/sources/streams/concurrent/adapters.py @@ -24,12 +24,7 @@ from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.source import ExperimentalClassWarning from airbyte_cdk.sources.streams import Stream -from airbyte_cdk.sources.streams.availability_strategy import AvailabilityStrategy from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AbstractAvailabilityStrategy, - AlwaysAvailableAvailabilityStrategy, -) from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage @@ -101,7 +96,6 @@ def create_from_stream( name=stream.name, namespace=stream.namespace, json_schema=stream.get_json_schema(), - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=pk, cursor_field=cursor_field, logger=logger, @@ -210,18 +204,6 @@ def get_json_schema(self) -> Mapping[str, Any]: def supports_incremental(self) -> bool: return self._legacy_stream.supports_incremental - def check_availability( - self, logger: logging.Logger, source: Optional["Source"] = None - ) -> Tuple[bool, Optional[str]]: - """ - Verifies the stream is available. 
Delegates to the underlying AbstractStream and ignores the parameters - :param logger: (ignored) - :param source: (ignored) - :return: - """ - availability = self._abstract_stream.check_availability() - return availability.is_available(), availability.message() - def as_airbyte_stream(self) -> AirbyteStream: return self._abstract_stream.as_airbyte_stream() @@ -370,28 +352,3 @@ def generate(self) -> Iterable[Partition]: self._cursor_field, self._state, ) - - -@deprecated( - "Availability strategy has been soft deprecated. Do not use. Class is subject to removal", - category=ExperimentalClassWarning, -) -class AvailabilityStrategyFacade(AvailabilityStrategy): - def __init__(self, abstract_availability_strategy: AbstractAvailabilityStrategy): - self._abstract_availability_strategy = abstract_availability_strategy - - def check_availability( - self, stream: Stream, logger: logging.Logger, source: Optional["Source"] = None - ) -> Tuple[bool, Optional[str]]: - """ - Checks stream availability. - - Important to note that the stream and source parameters are not used by the underlying AbstractAvailabilityStrategy. - - :param stream: (unused) - :param logger: logger object to use - :param source: (unused) - :return: A tuple of (boolean, str). If boolean is true, then the stream - """ - stream_availability = self._abstract_availability_strategy.check_availability(logger) - return stream_availability.is_available(), stream_availability.message() diff --git a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py b/airbyte_cdk/sources/streams/concurrent/availability_strategy.py deleted file mode 100644 index 118a7d0bb..000000000 --- a/airbyte_cdk/sources/streams/concurrent/availability_strategy.py +++ /dev/null @@ -1,94 +0,0 @@ -# -# Copyright (c) 2023 Airbyte, Inc., all rights reserved. 
-# - -import logging -from abc import ABC, abstractmethod -from typing import Optional - -from typing_extensions import deprecated - -from airbyte_cdk.sources.source import ExperimentalClassWarning - - -class StreamAvailability(ABC): - @abstractmethod - def is_available(self) -> bool: - """ - :return: True if the stream is available. False if the stream is not - """ - - @abstractmethod - def message(self) -> Optional[str]: - """ - :return: A message describing why the stream is not available. If the stream is available, this should return None. - """ - - -class StreamAvailable(StreamAvailability): - def is_available(self) -> bool: - return True - - def message(self) -> Optional[str]: - return None - - -class StreamUnavailable(StreamAvailability): - def __init__(self, message: str): - self._message = message - - def is_available(self) -> bool: - return False - - def message(self) -> Optional[str]: - return self._message - - -# Singleton instances of StreamAvailability to avoid the overhead of creating new dummy objects -STREAM_AVAILABLE = StreamAvailable() - - -@deprecated( - "This class is experimental. Use at your own risk.", - category=ExperimentalClassWarning, -) -class AbstractAvailabilityStrategy(ABC): - """ - AbstractAvailabilityStrategy is an experimental interface developed as part of the Concurrent CDK. - This interface is not yet stable and may change in the future. Use at your own risk. - - Why create a new interface instead of using the existing AvailabilityStrategy? - The existing AvailabilityStrategy is tightly coupled with Stream and Source, which yields to circular dependencies and makes it difficult to move away from the Stream interface to AbstractStream. - """ - - @abstractmethod - def check_availability(self, logger: logging.Logger) -> StreamAvailability: - """ - Checks stream availability. 
- - :param logger: logger object to use - :return: A StreamAvailability object describing the stream's availability - """ - - -@deprecated( - "This class is experimental. Use at your own risk.", - category=ExperimentalClassWarning, -) -class AlwaysAvailableAvailabilityStrategy(AbstractAvailabilityStrategy): - """ - An availability strategy that always indicates a stream is available. - - This strategy is used to avoid breaking changes and serves as a soft - deprecation of the availability strategy, allowing a smoother transition - without disrupting existing functionality. - """ - - def check_availability(self, logger: logging.Logger) -> StreamAvailability: - """ - Checks stream availability. - - :param logger: logger object to use - :return: A StreamAvailability object describing the stream's availability - """ - return StreamAvailable() diff --git a/airbyte_cdk/sources/streams/concurrent/default_stream.py b/airbyte_cdk/sources/streams/concurrent/default_stream.py index 54600d635..70ddd7d16 100644 --- a/airbyte_cdk/sources/streams/concurrent/default_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/default_stream.py @@ -8,10 +8,6 @@ from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AbstractAvailabilityStrategy, - StreamAvailability, -) from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator @@ -23,7 +19,6 @@ def __init__( partition_generator: PartitionGenerator, name: str, json_schema: Mapping[str, Any], - availability_strategy: AbstractAvailabilityStrategy, primary_key: List[str], cursor_field: Optional[str], logger: Logger, @@ -34,7 +29,6 @@ def __init__( self._stream_partition_generator = 
partition_generator self._name = name self._json_schema = json_schema - self._availability_strategy = availability_strategy self._primary_key = primary_key self._cursor_field = cursor_field self._logger = logger @@ -53,9 +47,6 @@ def name(self) -> str: def namespace(self) -> Optional[str]: return self._namespace - def check_availability(self) -> StreamAvailability: - return self._availability_strategy.check_availability(self._logger) - @property def cursor_field(self) -> Optional[str]: return self._cursor_field diff --git a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py index 185c5dceb..7db65b53d 100644 --- a/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py +++ b/unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_scenarios.py @@ -5,9 +5,6 @@ import logging from airbyte_cdk.sources.message import InMemoryMessageRepository -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - AlwaysAvailableAvailabilityStrategy, -) from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream from airbyte_cdk.sources.types import Record @@ -48,7 +45,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -84,7 +80,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -120,7 +115,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=["id"], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -171,7 +165,6 @@ "id": {"type": 
["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -222,7 +215,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -255,7 +247,6 @@ "id": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), @@ -397,7 +388,6 @@ "key": {"type": ["null", "string"]}, }, }, - availability_strategy=AlwaysAvailableAvailabilityStrategy(), primary_key=[], cursor_field=None, logger=logging.getLogger("test_logger"), diff --git a/unit_tests/sources/streams/concurrent/test_adapters.py b/unit_tests/sources/streams/concurrent/test_adapters.py index 66f48a9e0..68efbc941 100644 --- a/unit_tests/sources/streams/concurrent/test_adapters.py +++ b/unit_tests/sources/streams/concurrent/test_adapters.py @@ -18,16 +18,10 @@ from airbyte_cdk.models import Type as MessageType from airbyte_cdk.sources.message import InMemoryMessageRepository from airbyte_cdk.sources.streams.concurrent.adapters import ( - AvailabilityStrategyFacade, StreamFacade, StreamPartition, StreamPartitionGenerator, ) -from airbyte_cdk.sources.streams.concurrent.availability_strategy import ( - STREAM_AVAILABLE, - StreamAvailable, - StreamUnavailable, -) from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.streams.concurrent.exceptions import ExceptionWithDisplayMessage from airbyte_cdk.sources.streams.core import Stream @@ -42,28 +36,6 @@ _ANY_CURSOR = Mock(spec=Cursor) -@pytest.mark.parametrize( - "stream_availability, expected_available, expected_message", - [ - pytest.param(StreamAvailable(), True, None, id="test_stream_is_available"), - pytest.param(STREAM_AVAILABLE, True, None, 
id="test_stream_is_available_using_singleton"), - pytest.param(StreamUnavailable("message"), False, "message", id="test_stream_is_available"), - ], -) -def test_availability_strategy_facade(stream_availability, expected_available, expected_message): - strategy = Mock() - strategy.check_availability.return_value = stream_availability - facade = AvailabilityStrategyFacade(strategy) - - logger = Mock() - available, message = facade.check_availability(Mock(), logger, Mock()) - - assert available == expected_available - assert message == expected_message - - strategy.check_availability.assert_called_once_with(logger) - - @pytest.mark.parametrize( "sync_mode", [ @@ -319,15 +291,6 @@ def test_given_cursor_is_not_noop_when_supports_incremental_then_return_true(sel Mock(spec=logging.Logger), ).supports_incremental - def test_check_availability_is_delegated_to_wrapped_stream(self): - availability = StreamAvailable() - self._abstract_stream.check_availability.return_value = availability - assert self._facade.check_availability(Mock(), Mock()) == ( - availability.is_available(), - availability.message(), - ) - self._abstract_stream.check_availability.assert_called_once_with() - def test_full_refresh(self): expected_stream_data = [{"data": 1}, {"data": 2}] records = [Record(data, "stream") for data in expected_stream_data] diff --git a/unit_tests/sources/streams/concurrent/test_default_stream.py b/unit_tests/sources/streams/concurrent/test_default_stream.py index 2c9afe4da..7cfc3ac05 100644 --- a/unit_tests/sources/streams/concurrent/test_default_stream.py +++ b/unit_tests/sources/streams/concurrent/test_default_stream.py @@ -6,7 +6,6 @@ from airbyte_cdk.models import AirbyteStream, SyncMode from airbyte_cdk.sources.message import InMemoryMessageRepository -from airbyte_cdk.sources.streams.concurrent.availability_strategy import STREAM_AVAILABLE from airbyte_cdk.sources.streams.concurrent.cursor import Cursor, FinalStateCursor from 
airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream @@ -16,7 +15,6 @@ def setUp(self): self._partition_generator = Mock() self._name = "name" self._json_schema = {} - self._availability_strategy = Mock() self._primary_key = [] self._cursor_field = None self._logger = Mock() @@ -26,7 +24,6 @@ def setUp(self): self._partition_generator, self._name, self._json_schema, - self._availability_strategy, self._primary_key, self._cursor_field, self._logger, @@ -41,12 +38,6 @@ def test_get_json_schema(self): json_schema = self._stream.get_json_schema() assert json_schema == self._json_schema - def test_check_availability(self): - self._availability_strategy.check_availability.return_value = STREAM_AVAILABLE - availability = self._stream.check_availability() - assert availability == STREAM_AVAILABLE - self._availability_strategy.check_availability.assert_called_once_with(self._logger) - def test_check_for_error_raises_an_exception_if_any_of_the_futures_are_not_done(self): futures = [Mock() for _ in range(3)] for f in futures: @@ -93,7 +84,6 @@ def test_as_airbyte_stream_with_primary_key(self): self._partition_generator, self._name, json_schema, - self._availability_strategy, ["composite_key_1", "composite_key_2"], self._cursor_field, self._logger, @@ -131,7 +121,6 @@ def test_as_airbyte_stream_with_composite_primary_key(self): self._partition_generator, self._name, json_schema, - self._availability_strategy, ["id_a", "id_b"], self._cursor_field, self._logger, @@ -169,7 +158,6 @@ def test_as_airbyte_stream_with_a_cursor(self): self._partition_generator, self._name, json_schema, - self._availability_strategy, self._primary_key, "date", self._logger, @@ -200,7 +188,6 @@ def test_as_airbyte_stream_with_namespace(self): self._partition_generator, self._name, self._json_schema, - self._availability_strategy, self._primary_key, self._cursor_field, self._logger, @@ -231,7 +218,6 @@ def test_as_airbyte_stream_with_file_transfer_support(self): 
self._partition_generator, self._name, self._json_schema, - self._availability_strategy, self._primary_key, self._cursor_field, self._logger,