diff --git a/airbyte_cdk/__init__.py b/airbyte_cdk/__init__.py index 5268a9654..9cb83008e 100644 --- a/airbyte_cdk/__init__.py +++ b/airbyte_cdk/__init__.py @@ -65,6 +65,8 @@ from .connector import BaseConnector, Connector from .destinations import Destination from .entrypoint import AirbyteEntrypoint, launch +from .legacy.sources.declarative.declarative_stream import DeclarativeStream +from .legacy.sources.declarative.incremental import DatetimeBasedCursor from .logger import AirbyteLogFormatter, init_logger from .models import ( AdvancedAuth, @@ -99,13 +101,11 @@ BearerAuthenticator, ) from .sources.declarative.datetime.min_max_datetime import MinMaxDatetime -from .sources.declarative.declarative_stream import DeclarativeStream from .sources.declarative.decoders import Decoder, JsonDecoder from .sources.declarative.exceptions import ReadException from .sources.declarative.extractors import DpathExtractor, RecordSelector from .sources.declarative.extractors.record_extractor import RecordExtractor from .sources.declarative.extractors.record_filter import RecordFilter -from .sources.declarative.incremental import DatetimeBasedCursor from .sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString from .sources.declarative.migrations.legacy_to_per_partition_state_migration import ( LegacyToPerPartitionStateMigration, diff --git a/airbyte_cdk/sources/declarative/declarative_stream.py b/airbyte_cdk/legacy/sources/declarative/declarative_stream.py similarity index 99% rename from airbyte_cdk/sources/declarative/declarative_stream.py rename to airbyte_cdk/legacy/sources/declarative/declarative_stream.py index 0ae117459..89935fda8 100644 --- a/airbyte_cdk/sources/declarative/declarative_stream.py +++ b/airbyte_cdk/legacy/sources/declarative/declarative_stream.py @@ -5,12 +5,12 @@ from dataclasses import InitVar, dataclass, field from typing import Any, Iterable, List, Mapping, MutableMapping, Optional, Union -from airbyte_cdk.models import SyncMode -from airbyte_cdk.sources.declarative.incremental import ( +from airbyte_cdk.legacy.sources.declarative.incremental import ( GlobalSubstreamCursor, PerPartitionCursor, PerPartitionWithGlobalCursor, ) +from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration from airbyte_cdk.sources.declarative.retrievers import SimpleRetriever diff --git a/airbyte_cdk/legacy/sources/declarative/incremental/__init__.py b/airbyte_cdk/legacy/sources/declarative/incremental/__init__.py index 58b636bf9..1226441f7 100644 --- a/airbyte_cdk/legacy/sources/declarative/incremental/__init__.py +++ b/airbyte_cdk/legacy/sources/declarative/incremental/__init__.py @@ -1 +1,31 @@ # Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ +from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import ( + DatetimeBasedCursor, +) +from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor +from airbyte_cdk.legacy.sources.declarative.incremental.global_substream_cursor import ( + GlobalSubstreamCursor, +) +from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import ( + CursorFactory, + PerPartitionCursor, +) +from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_with_global import ( + PerPartitionWithGlobalCursor, +) +from airbyte_cdk.legacy.sources.declarative.incremental.resumable_full_refresh_cursor import ( + ChildPartitionResumableFullRefreshCursor, + ResumableFullRefreshCursor, +) + +__all__ = [ + "CursorFactory", + "DatetimeBasedCursor", + "DeclarativeCursor", + "GlobalSubstreamCursor", + "PerPartitionCursor", + "PerPartitionWithGlobalCursor", + "ResumableFullRefreshCursor", + "ChildPartitionResumableFullRefreshCursor", +] diff --git a/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py b/airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py similarity index 99% rename from airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py rename to airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py index 4eadf68e1..616a13d8c 100644 --- a/airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py +++ b/airbyte_cdk/legacy/sources/declarative/incremental/datetime_based_cursor.py @@ -9,10 +9,10 @@ from isodate import Duration, duration_isoformat, parse_duration +from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, Type from airbyte_cdk.sources.declarative.datetime.datetime_parser import DatetimeParser from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime -from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation from airbyte_cdk.sources.declarative.requesters.request_option import ( diff --git a/airbyte_cdk/sources/declarative/incremental/declarative_cursor.py b/airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py similarity index 100% rename from airbyte_cdk/sources/declarative/incremental/declarative_cursor.py rename to airbyte_cdk/legacy/sources/declarative/incremental/declarative_cursor.py diff --git a/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py b/airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py similarity index 98% rename from airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py rename to airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py index 21733f94d..9a7c40478 100644 --- a/airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py +++ b/airbyte_cdk/legacy/sources/declarative/incremental/global_substream_cursor.py @@ -6,8 +6,10 @@ import time from typing import Any, Callable, Iterable, Mapping, Optional, TypeVar, Union -from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor -from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor +from 
airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import ( + DatetimeBasedCursor, +) +from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.types import Record, StreamSlice, StreamState diff --git a/airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py b/airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py index 54060bd94..23746e808 100644 --- a/airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py +++ b/airbyte_cdk/legacy/sources/declarative/incremental/per_partition_cursor.py @@ -6,7 +6,7 @@ from collections import OrderedDict from typing import Any, Callable, Iterable, Mapping, Optional, Union -from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor +from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import ( PerPartitionKeySerializer, diff --git a/airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py b/airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py similarity index 96% rename from airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py rename to airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py index 9779b35cc..ae52a94ca 100644 --- a/airbyte_cdk/sources/declarative/incremental/per_partition_with_global.py +++ b/airbyte_cdk/legacy/sources/declarative/incremental/per_partition_with_global.py @@ -3,16 +3,18 @@ # from typing import Any, Iterable, Mapping, MutableMapping, Optional, Union -from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import ( - CursorFactory, - PerPartitionCursor, +from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import ( + DatetimeBasedCursor, ) -from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor -from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor -from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import ( +from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor +from airbyte_cdk.legacy.sources.declarative.incremental.global_substream_cursor import ( GlobalSubstreamCursor, iterate_with_last_flag_and_state, ) +from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import ( + CursorFactory, + PerPartitionCursor, +) from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.types import Record, StreamSlice, StreamState diff --git a/airbyte_cdk/sources/declarative/incremental/resumable_full_refresh_cursor.py b/airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py similarity index 98% rename from airbyte_cdk/sources/declarative/incremental/resumable_full_refresh_cursor.py rename to airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py index 49a53e185..dbce54427 100644 --- a/airbyte_cdk/sources/declarative/incremental/resumable_full_refresh_cursor.py +++ 
b/airbyte_cdk/legacy/sources/declarative/incremental/resumable_full_refresh_cursor.py @@ -3,7 +3,7 @@ from dataclasses import InitVar, dataclass from typing import Any, Iterable, Mapping, Optional -from airbyte_cdk.sources.declarative.incremental import DeclarativeCursor +from airbyte_cdk.legacy.sources.declarative.incremental import DeclarativeCursor from airbyte_cdk.sources.declarative.types import Record, StreamSlice, StreamState from airbyte_cdk.sources.streams.checkpoint.checkpoint_reader import FULL_REFRESH_COMPLETE_STATE diff --git a/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py b/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py index 876750e4a..66002f9f9 100644 --- a/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_dynamic_stream.py @@ -6,11 +6,11 @@ from dataclasses import InitVar, dataclass from typing import Any, List, Mapping, Tuple, Union -from airbyte_cdk.sources.abstract_source import AbstractSource +from airbyte_cdk.sources import Source from airbyte_cdk.sources.declarative.checks.check_stream import evaluate_availability from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker +from airbyte_cdk.sources.streams import Stream from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream -from airbyte_cdk.sources.streams.http.availability_strategy import HttpAvailabilityStrategy @dataclass @@ -33,7 +33,10 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None: self._parameters = parameters def check_connection( - self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] + self, + source: Source, + logger: logging.Logger, + config: Mapping[str, Any], ) -> Tuple[bool, Any]: streams: List[Union[Stream, AbstractStream]] = source.streams(config=config) # type: ignore # this is a migration step and we expect the declarative CDK to migrate off of ConnectionChecker diff --git a/airbyte_cdk/sources/declarative/checks/check_stream.py b/airbyte_cdk/sources/declarative/checks/check_stream.py index 73940d382..aa3298b53 100644 --- a/airbyte_cdk/sources/declarative/checks/check_stream.py +++ b/airbyte_cdk/sources/declarative/checks/check_stream.py @@ -7,7 +7,7 @@ from dataclasses import InitVar, dataclass from typing import Any, Dict, List, Mapping, Optional, Tuple, Union -from airbyte_cdk.sources.abstract_source import AbstractSource +from airbyte_cdk.sources import Source from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream from airbyte_cdk.sources.streams.core import Stream @@ -64,7 +64,10 @@ def _log_error(self, logger: logging.Logger, action: str, error: Exception) -> T return False, error_message def check_connection( - self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] + self, + source: Source, + logger: logging.Logger, + config: Mapping[str, Any], ) -> Tuple[bool, Any]: """Checks the connection to the source and its streams.""" try: @@ -118,7 +121,7 @@ def _check_stream_availability( def _check_dynamic_streams_availability( self, - source: AbstractSource, + source: Source, stream_name_to_stream: Dict[str, Union[Stream, AbstractStream]], logger: logging.Logger, ) -> Tuple[bool, Any]: diff --git a/airbyte_cdk/sources/declarative/checks/connection_checker.py b/airbyte_cdk/sources/declarative/checks/connection_checker.py index fd1d1bba2..76c4c26c9 100644 --- 
a/airbyte_cdk/sources/declarative/checks/connection_checker.py +++ b/airbyte_cdk/sources/declarative/checks/connection_checker.py @@ -6,7 +6,7 @@ from abc import ABC, abstractmethod from typing import Any, Mapping, Tuple -from airbyte_cdk import AbstractSource +from airbyte_cdk.sources import Source class ConnectionChecker(ABC): @@ -16,7 +16,10 @@ class ConnectionChecker(ABC): @abstractmethod def check_connection( - self, source: AbstractSource, logger: logging.Logger, config: Mapping[str, Any] + self, + source: Source, + logger: logging.Logger, + config: Mapping[str, Any], ) -> Tuple[bool, Any]: """ Tests if the input configuration can be used to successfully connect to the integration e.g: if a provided Stripe API token can be used to connect diff --git a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py index 87a9cc56d..56a59e072 100644 --- a/airbyte_cdk/sources/declarative/concurrent_declarative_source.py +++ b/airbyte_cdk/sources/declarative/concurrent_declarative_source.py @@ -11,15 +11,11 @@ Any, ClassVar, Dict, - Generic, Iterator, List, Mapping, - MutableMapping, Optional, Set, - Tuple, - Union, ) import orjson @@ -43,37 +39,22 @@ ConfiguredAirbyteCatalog, ConnectorSpecification, FailureType, + Status, ) from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer -from airbyte_cdk.sources.abstract_source import AbstractSource +from airbyte_cdk.sources import Source from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream -from airbyte_cdk.sources.declarative.incremental import ( - ConcurrentPerPartitionCursor, - GlobalSubstreamCursor, -) -from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor -from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import ( - PerPartitionWithGlobalCursor, -) from airbyte_cdk.sources.declarative.interpolation import InterpolatedBoolean -from airbyte_cdk.sources.declarative.models import FileUploader from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( ConcurrencyLevel as ConcurrencyLevelModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - DatetimeBasedCursor as DatetimeBasedCursorModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( DeclarativeStream as DeclarativeStreamModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - IncrementingCountCursor as IncrementingCountCursorModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( Spec as SpecModel, ) @@ -95,24 +76,12 @@ from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) -from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING -from airbyte_cdk.sources.declarative.retrievers import AsyncRetriever, Retriever, SimpleRetriever from 
airbyte_cdk.sources.declarative.spec.spec import Spec -from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import ( - DeclarativePartitionFactory, - StreamSlicerPartitionGenerator, -) from airbyte_cdk.sources.declarative.types import Config, ConnectionDefinition from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository -from airbyte_cdk.sources.message.repository import InMemoryMessageRepository, MessageRepository -from airbyte_cdk.sources.source import TState -from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream -from airbyte_cdk.sources.streams.concurrent.abstract_stream_facade import AbstractStreamFacade -from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, FinalStateCursor -from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream -from airbyte_cdk.sources.streams.concurrent.helpers import get_primary_key_from_stream from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem from airbyte_cdk.sources.utils.slice_logger import ( AlwaysLogSliceLogger, @@ -155,14 +124,7 @@ def _get_declarative_component_schema() -> Dict[str, Any]: ) -# todo: AbstractSource can be removed once we've completely moved off all legacy synchronous CDK code paths -# and replaced with implementing the source.py:Source class -# -# todo: The `ConcurrentDeclarativeSource.message_repository()` method can also be removed once AbstractSource -# is no longer inherited from since the only external dependency is from that class. -# -# todo: It is worth investigating removal of the Generic[TState] since it will always be Optional[List[AirbyteStateMessage]] -class ConcurrentDeclarativeSource(AbstractSource): +class ConcurrentDeclarativeSource(Source): # By default, we defer to a value of 2. A value lower than could cause a PartitionEnqueuer to be stuck in a state of deadlock # because it has hit the limit of futures but not partition reader is consuming them. _LOWEST_SAFE_CONCURRENCY_LEVEL = 2 @@ -396,17 +358,6 @@ def resolved_manifest(self) -> Mapping[str, Any]: """ return self._source_config - # TODO: Deprecate this class once ConcurrentDeclarativeSource no longer inherits AbstractSource - @property - def message_repository(self) -> MessageRepository: - return self._message_repository - - # TODO: Remove this. This property is necessary to safely migrate Stripe during the transition state. 
- @property - def is_partially_declarative(self) -> bool: - """This flag used to avoid unexpected AbstractStreamFacade processing as concurrent streams.""" - return False - def deprecation_warnings(self) -> List[ConnectorBuilderLogMessage]: return self._constructor.get_model_deprecations() @@ -417,48 +368,23 @@ def read( catalog: ConfiguredAirbyteCatalog, state: Optional[List[AirbyteStateMessage]] = None, ) -> Iterator[AirbyteMessage]: - concurrent_streams, _ = self._group_streams(config=config) - - # ConcurrentReadProcessor pops streams that are finished being read so before syncing, the names of - # the concurrent streams must be saved so that they can be removed from the catalog before starting - # synchronous streams - if len(concurrent_streams) > 0: - concurrent_stream_names = set( - [concurrent_stream.name for concurrent_stream in concurrent_streams] - ) - - selected_concurrent_streams = self._select_streams( - streams=concurrent_streams, configured_catalog=catalog - ) - # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor. - # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now - if selected_concurrent_streams: - yield from self._concurrent_source.read(selected_concurrent_streams) - - # Sync all streams that are not concurrent compatible. We filter out concurrent streams because the - # existing AbstractSource.read() implementation iterates over the catalog when syncing streams. Many - # of which were already synced using the Concurrent CDK - filtered_catalog = self._remove_concurrent_streams_from_catalog( - catalog=catalog, concurrent_stream_names=concurrent_stream_names - ) - else: - filtered_catalog = catalog - - # It is no need run read for synchronous streams if they are not exists. - if not filtered_catalog.streams: - return + selected_concurrent_streams = self._select_streams( + streams=self.streams(config=self._config), # type: ignore # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface + configured_catalog=catalog, + ) - yield from super().read(logger, config, filtered_catalog, state) + # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor. 
+ # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now + if len(selected_concurrent_streams) > 0: + yield from self._concurrent_source.read(selected_concurrent_streams) def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog: - concurrent_streams, synchronous_streams = self._group_streams(config=config) return AirbyteCatalog( - streams=[ - stream.as_airbyte_stream() for stream in concurrent_streams + synchronous_streams - ] + streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)] ) - def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStream]]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder + # todo: add PR comment about whether we can change the signature to List[AbstractStream] + def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]: # type: ignore # we are migrating away from the AbstractSource and are expecting that this will only be called by ConcurrentDeclarativeSource or the Connector Builder """ The `streams` method is used as part of the AbstractSource in the following cases: * ConcurrentDeclarativeSource.check -> ManifestDeclarativeSource.check -> AbstractSource.check -> DeclarativeSource.check_connection -> CheckStream.check_connection -> streams @@ -469,15 +395,13 @@ def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStrea """ if self._spec_component: - self._spec_component.validate_config(config) + self._spec_component.validate_config(self._config) - stream_configs = ( - self._stream_configs(self._source_config, config=config) + self.dynamic_streams - ) + stream_configs = self._stream_configs(self._source_config) + self.dynamic_streams api_budget_model = self._source_config.get("api_budget") if api_budget_model: - self._constructor.set_api_budget(api_budget_model, config) + self._constructor.set_api_budget(api_budget_model, self._config) source_streams = [ self._constructor.create_component( @@ -487,7 +411,7 @@ def streams(self, config: Mapping[str, Any]) -> List[Union[Stream, AbstractStrea else DeclarativeStreamModel ), stream_config, - config, + self._config, emit_connector_builder_messages=self._emit_connector_builder_messages, ) for stream_config in self._initialize_cache_for_parent_streams(deepcopy(stream_configs)) @@ -559,315 +483,36 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification: ) def check(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteConnectionStatus: - return super().check(logger, config) + check = self._source_config.get("check") + if not check: + raise ValueError(f"Missing 'check' component definition within the manifest.") - def check_connection( - self, logger: logging.Logger, config: Mapping[str, Any] - ) -> Tuple[bool, Any]: - """ - :param logger: The source logger - :param config: The user-provided configuration as specified by the source's spec. - This usually contains information required to check connection e.g. tokens, secrets and keys etc. - :return: A tuple of (boolean, error). If boolean is true, then the connection check is successful - and we can connect to the underlying data source using the provided configuration. - Otherwise, the input config cannot be used to connect to the underlying data source, - and the "error" object should describe what went wrong. 
- The error object will be cast to string to display the problem to the user. - """ - return self.connection_checker.check_connection(self, logger, config) - - @property - def connection_checker(self) -> ConnectionChecker: - check = self._source_config["check"] if "type" not in check: check["type"] = "CheckStream" - check_stream = self._constructor.create_component( + connection_checker = self._constructor.create_component( COMPONENTS_CHECKER_TYPE_MAPPING[check["type"]], check, dict(), emit_connector_builder_messages=self._emit_connector_builder_messages, ) - if isinstance(check_stream, ConnectionChecker): - return check_stream - else: + if not isinstance(connection_checker, ConnectionChecker): raise ValueError( - f"Expected to generate a ConnectionChecker component, but received {check_stream.__class__}" + f"Expected to generate a ConnectionChecker component, but received {connection_checker.__class__}" ) + check_succeeded, error = connection_checker.check_connection(self, logger, self._config) + if not check_succeeded: + return AirbyteConnectionStatus(status=Status.FAILED, message=repr(error)) + return AirbyteConnectionStatus(status=Status.SUCCEEDED) + @property def dynamic_streams(self) -> List[Dict[str, Any]]: return self._dynamic_stream_configs( manifest=self._source_config, - config=self._config, with_dynamic_stream_name=True, ) - def _group_streams( - self, config: Mapping[str, Any] - ) -> Tuple[List[AbstractStream], List[Stream]]: - concurrent_streams: List[AbstractStream] = [] - synchronous_streams: List[Stream] = [] - - # Combine streams and dynamic_streams. Note: both cannot be empty at the same time, - # and this is validated during the initialization of the source. - streams = self._stream_configs(self._source_config, config) + self._dynamic_stream_configs( - self._source_config, config - ) - - name_to_stream_mapping = {stream["name"]: stream for stream in streams} - - for declarative_stream in self.streams(config=config): - # Some low-code sources use a combination of DeclarativeStream and regular Python streams. 
We can't inspect - # these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible, - # so we need to treat them as synchronous - - if isinstance(declarative_stream, AbstractStream): - concurrent_streams.append(declarative_stream) - continue - - supports_file_transfer = ( - isinstance(declarative_stream, DeclarativeStream) - and "file_uploader" in name_to_stream_mapping[declarative_stream.name] - ) - - if ( - isinstance(declarative_stream, DeclarativeStream) - and name_to_stream_mapping[declarative_stream.name]["type"] - == "StateDelegatingStream" - ): - stream_state = self._connector_state_manager.get_stream_state( - stream_name=declarative_stream.name, namespace=declarative_stream.namespace - ) - - name_to_stream_mapping[declarative_stream.name] = ( - name_to_stream_mapping[declarative_stream.name]["incremental_stream"] - if stream_state - else name_to_stream_mapping[declarative_stream.name]["full_refresh_stream"] - ) - - if isinstance(declarative_stream, DeclarativeStream) and ( - name_to_stream_mapping[declarative_stream.name]["retriever"]["type"] - == "SimpleRetriever" - or name_to_stream_mapping[declarative_stream.name]["retriever"]["type"] - == "AsyncRetriever" - ): - incremental_sync_component_definition = name_to_stream_mapping[ - declarative_stream.name - ].get("incremental_sync") - - partition_router_component_definition = ( - name_to_stream_mapping[declarative_stream.name] - .get("retriever", {}) - .get("partition_router") - ) - is_without_partition_router_or_cursor = not bool( - incremental_sync_component_definition - ) and not bool(partition_router_component_definition) - - is_substream_without_incremental = ( - partition_router_component_definition - and not incremental_sync_component_definition - ) - - if self._is_concurrent_cursor_incremental_without_partition_routing( - declarative_stream, incremental_sync_component_definition - ): - stream_state = self._connector_state_manager.get_stream_state( - stream_name=declarative_stream.name, namespace=declarative_stream.namespace - ) - stream_state = self._migrate_state(declarative_stream, stream_state) - - retriever = self._get_retriever(declarative_stream, stream_state) - - if isinstance(declarative_stream.retriever, AsyncRetriever) and isinstance( - declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter - ): - cursor = declarative_stream.retriever.stream_slicer.stream_slicer - - if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor): - # This should never happen since we instantiate ConcurrentCursor in - # model_to_component_factory.py - raise ValueError( - f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received{cursor.__class__}" - ) - - partition_generator = StreamSlicerPartitionGenerator( - partition_factory=DeclarativePartitionFactory( - stream_name=declarative_stream.name, - schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish - retriever=retriever, - message_repository=self._message_repository, - max_records_limit=self._limits.max_records - if self._limits - else None, - ), - stream_slicer=declarative_stream.retriever.stream_slicer, - slice_limit=self._limits.max_slices - if self._limits - else None, # technically not needed because create_default_stream() -> create_simple_retriever() will apply the decorator. 
But for consistency and depending how we build create_default_stream, this may be needed later - ) - else: - if ( - incremental_sync_component_definition - and incremental_sync_component_definition.get("type") - == IncrementingCountCursorModel.__name__ - ): - cursor = self._constructor.create_concurrent_cursor_from_incrementing_count_cursor( - model_type=IncrementingCountCursorModel, - component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above - stream_name=declarative_stream.name, - stream_namespace=declarative_stream.namespace, - config=config or {}, - ) - else: - cursor = self._constructor.create_concurrent_cursor_from_datetime_based_cursor( - model_type=DatetimeBasedCursorModel, - component_definition=incremental_sync_component_definition, # type: ignore # Not None because of the if condition above - stream_name=declarative_stream.name, - stream_namespace=declarative_stream.namespace, - config=config or {}, - stream_state_migrations=declarative_stream.state_migrations, - ) - partition_generator = StreamSlicerPartitionGenerator( - partition_factory=DeclarativePartitionFactory( - stream_name=declarative_stream.name, - schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish - retriever=retriever, - message_repository=self._message_repository, - max_records_limit=self._limits.max_records - if self._limits - else None, - ), - stream_slicer=cursor, - slice_limit=self._limits.max_slices if self._limits else None, - ) - - concurrent_streams.append( - DefaultStream( - partition_generator=partition_generator, - name=declarative_stream.name, - json_schema=declarative_stream.get_json_schema(), - primary_key=get_primary_key_from_stream(declarative_stream.primary_key), - cursor_field=cursor.cursor_field.cursor_field_key - if hasattr(cursor, "cursor_field") - and hasattr( - cursor.cursor_field, "cursor_field_key" - ) # FIXME this will need to be updated once we do the per partition - else None, - logger=self.logger, - cursor=cursor, - supports_file_transfer=supports_file_transfer, - ) - ) - elif ( - is_substream_without_incremental or is_without_partition_router_or_cursor - ) and hasattr(declarative_stream.retriever, "stream_slicer"): - partition_generator = StreamSlicerPartitionGenerator( - DeclarativePartitionFactory( - stream_name=declarative_stream.name, - schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish - retriever=declarative_stream.retriever, - message_repository=self._message_repository, - max_records_limit=self._limits.max_records if self._limits else None, - ), - declarative_stream.retriever.stream_slicer, - slice_limit=self._limits.max_slices - if self._limits - else None, # technically not needed because create_default_stream() -> create_simple_retriever() will apply the decorator. 
But for consistency and depending how we build create_default_stream, this may be needed later - ) - - final_state_cursor = FinalStateCursor( - stream_name=declarative_stream.name, - stream_namespace=declarative_stream.namespace, - message_repository=self._message_repository, - ) - - concurrent_streams.append( - DefaultStream( - partition_generator=partition_generator, - name=declarative_stream.name, - json_schema=declarative_stream.get_json_schema(), - primary_key=get_primary_key_from_stream(declarative_stream.primary_key), - cursor_field=None, - logger=self.logger, - cursor=final_state_cursor, - supports_file_transfer=supports_file_transfer, - ) - ) - elif ( - incremental_sync_component_definition - and incremental_sync_component_definition.get("type", "") - == DatetimeBasedCursorModel.__name__ - and hasattr(declarative_stream.retriever, "stream_slicer") - and isinstance( - declarative_stream.retriever.stream_slicer, - (GlobalSubstreamCursor, PerPartitionWithGlobalCursor), - ) - ): - stream_state = self._connector_state_manager.get_stream_state( - stream_name=declarative_stream.name, namespace=declarative_stream.namespace - ) - stream_state = self._migrate_state(declarative_stream, stream_state) - - partition_router = declarative_stream.retriever.stream_slicer._partition_router - - perpartition_cursor = ( - self._constructor.create_concurrent_cursor_from_perpartition_cursor( - state_manager=self._connector_state_manager, - model_type=DatetimeBasedCursorModel, - component_definition=incremental_sync_component_definition, - stream_name=declarative_stream.name, - stream_namespace=declarative_stream.namespace, - config=config or {}, - stream_state=stream_state, - partition_router=partition_router, - ) - ) - - retriever = self._get_retriever(declarative_stream, stream_state) - - partition_generator = StreamSlicerPartitionGenerator( - DeclarativePartitionFactory( - stream_name=declarative_stream.name, - schema_loader=declarative_stream._schema_loader, # type: ignore # We are accessing the private property but the public one is optional and we will remove this code soonish - retriever=retriever, - message_repository=self._message_repository, - max_records_limit=self._limits.max_records if self._limits else None, - ), - perpartition_cursor, - slice_limit=self._limits.max_slices if self._limits else None, - ) - - concurrent_streams.append( - DefaultStream( - partition_generator=partition_generator, - name=declarative_stream.name, - json_schema=declarative_stream.get_json_schema(), - primary_key=get_primary_key_from_stream(declarative_stream.primary_key), - cursor_field=perpartition_cursor.cursor_field.cursor_field_key, - logger=self.logger, - cursor=perpartition_cursor, - supports_file_transfer=supports_file_transfer, - ) - ) - else: - synchronous_streams.append(declarative_stream) - # TODO: Remove this. This check is necessary to safely migrate Stripe during the transition state. - # Condition below needs to ensure that concurrent support is not lost for sources that already support - # it before migration, but now are only partially migrated to declarative implementation (e.g., Stripe). 
- elif ( - isinstance(declarative_stream, AbstractStreamFacade) - and self.is_partially_declarative - ): - concurrent_streams.append(declarative_stream.get_underlying_stream()) - else: - synchronous_streams.append(declarative_stream) - - return concurrent_streams, synchronous_streams - - def _stream_configs( - self, manifest: Mapping[str, Any], config: Mapping[str, Any] - ) -> List[Dict[str, Any]]: + def _stream_configs(self, manifest: Mapping[str, Any]) -> List[Dict[str, Any]]: # This has a warning flag for static, but after we finish part 4 we'll replace manifest with self._source_config stream_configs = [] for current_stream_config in manifest.get("streams", []): @@ -880,7 +525,7 @@ def _stream_configs( parameters={}, ) - if interpolated_boolean.eval(config=config): + if interpolated_boolean.eval(config=self._config): stream_configs.extend(current_stream_config.get("streams", [])) else: if "type" not in current_stream_config: @@ -891,7 +536,6 @@ def _stream_configs( def _dynamic_stream_configs( self, manifest: Mapping[str, Any], - config: Mapping[str, Any], with_dynamic_stream_name: Optional[bool] = None, ) -> List[Dict[str, Any]]: dynamic_stream_definitions: List[Dict[str, Any]] = manifest.get("dynamic_streams", []) @@ -926,14 +570,14 @@ def _dynamic_stream_configs( components_resolver = self._constructor.create_component( model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], component_definition=components_resolver_config, - config=config, + config=self._config, stream_name=dynamic_definition.get("name"), ) else: components_resolver = self._constructor.create_component( model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type], component_definition=components_resolver_config, - config=config, + config=self._config, ) stream_template_config = dynamic_definition["stream_template"] @@ -986,40 +630,6 @@ def _dynamic_stream_configs( return dynamic_stream_configs - def _is_concurrent_cursor_incremental_without_partition_routing( - self, - declarative_stream: DeclarativeStream, - incremental_sync_component_definition: Mapping[str, Any] | None, - ) -> bool: - return ( - incremental_sync_component_definition is not None - and bool(incremental_sync_component_definition) - and ( - incremental_sync_component_definition.get("type", "") - in (DatetimeBasedCursorModel.__name__, IncrementingCountCursorModel.__name__) - ) - and hasattr(declarative_stream.retriever, "stream_slicer") - and ( - isinstance(declarative_stream.retriever.stream_slicer, DatetimeBasedCursor) - # IncrementingCountCursorModel is hardcoded to be of type DatetimeBasedCursor - # add isintance check here if we want to create a Declarative IncrementingCountCursor - # or isinstance( - # declarative_stream.retriever.stream_slicer, IncrementingCountCursor - # ) - or isinstance(declarative_stream.retriever.stream_slicer, AsyncJobPartitionRouter) - ) - ) - - @staticmethod - def _get_retriever( - declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any] - ) -> Retriever: - if declarative_stream and isinstance(declarative_stream.retriever, SimpleRetriever): - # We zero it out here, but since this is a cursor reference, the state is still properly - # instantiated for the other components that reference it - declarative_stream.retriever.cursor = None - return declarative_stream.retriever - @staticmethod def _select_streams( streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog @@ -1032,27 +642,3 @@ def _select_streams( abstract_streams.append(stream_instance) return abstract_streams - - @staticmethod 
- def _remove_concurrent_streams_from_catalog( - catalog: ConfiguredAirbyteCatalog, - concurrent_stream_names: set[str], - ) -> ConfiguredAirbyteCatalog: - return ConfiguredAirbyteCatalog( - streams=[ - stream - for stream in catalog.streams - if stream.stream.name not in concurrent_stream_names - ] - ) - - @staticmethod - def _migrate_state( - declarative_stream: DeclarativeStream, stream_state: MutableMapping[str, Any] - ) -> MutableMapping[str, Any]: - for state_migration in declarative_stream.state_migrations: - if state_migration.should_migrate(stream_state): - # The state variable is expected to be mutable but the migrate method returns an immutable mapping. - stream_state = dict(state_migration.migrate(stream_state)) - - return stream_state diff --git a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml index e7f8d0793..888989b13 100644 --- a/airbyte_cdk/sources/declarative/declarative_component_schema.yaml +++ b/airbyte_cdk/sources/declarative/declarative_component_schema.yaml @@ -593,32 +593,6 @@ definitions: $parameters: type: object additionalProperties: true - CustomIncrementalSync: - title: Custom Incremental Sync - description: Incremental component whose behavior is derived from a custom code implementation of the connector. - type: object - additionalProperties: true - required: - - type - - class_name - - cursor_field - properties: - type: - type: string - enum: [CustomIncrementalSync] - class_name: - title: Class Name - description: Fully-qualified name of the class that will be implementing the custom incremental sync. The format is `source_..`. - type: string - additionalProperties: true - examples: - - "source_railz.components.MyCustomIncrementalSync" - cursor_field: - description: The location of the value on a record that will be used as a bookmark during sync. - type: string - $parameters: - type: object - additionalProperties: true CustomPaginationStrategy: title: Custom Pagination Strategy description: Pagination strategy component whose behavior is derived from a custom code implementation of the connector. @@ -1551,7 +1525,6 @@ definitions: anyOf: - "$ref": "#/definitions/DatetimeBasedCursor" - "$ref": "#/definitions/IncrementingCountCursor" - - "$ref": "#/definitions/CustomIncrementalSync" primary_key: title: Primary Key "$ref": "#/definitions/PrimaryKey" diff --git a/airbyte_cdk/sources/declarative/incremental/__init__.py b/airbyte_cdk/sources/declarative/incremental/__init__.py index 5b9bb5143..ddefe0f8b 100644 --- a/airbyte_cdk/sources/declarative/incremental/__init__.py +++ b/airbyte_cdk/sources/declarative/incremental/__init__.py @@ -2,36 +2,12 @@ # Copyright (c) 2022 Airbyte, Inc., all rights reserved. 
# -from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import ( - CursorFactory, - PerPartitionCursor, -) from airbyte_cdk.sources.declarative.incremental.concurrent_partition_cursor import ( ConcurrentCursorFactory, ConcurrentPerPartitionCursor, ) -from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor -from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor -from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import ( - GlobalSubstreamCursor, -) -from airbyte_cdk.sources.declarative.incremental.per_partition_with_global import ( - PerPartitionWithGlobalCursor, -) -from airbyte_cdk.sources.declarative.incremental.resumable_full_refresh_cursor import ( - ChildPartitionResumableFullRefreshCursor, - ResumableFullRefreshCursor, -) __all__ = [ - "CursorFactory", "ConcurrentCursorFactory", "ConcurrentPerPartitionCursor", - "DatetimeBasedCursor", - "DeclarativeCursor", - "GlobalSubstreamCursor", - "PerPartitionCursor", - "PerPartitionWithGlobalCursor", - "ResumableFullRefreshCursor", - "ChildPartitionResumableFullRefreshCursor", ] diff --git a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py index 41ee09e0b..75669d5e3 100644 --- a/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py +++ b/airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py @@ -9,7 +9,7 @@ from collections import OrderedDict from copy import deepcopy from datetime import timedelta -from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional +from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, TypeVar from airbyte_cdk.models import ( AirbyteStateBlob, @@ -19,10 +19,6 @@ StreamDescriptor, ) from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager -from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import ( - Timer, - iterate_with_last_flag_and_state, -) from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import ( @@ -38,6 +34,63 @@ logger = logging.getLogger("airbyte") +T = TypeVar("T") + + +def iterate_with_last_flag_and_state( + generator: Iterable[T], get_stream_state_func: Callable[[], Optional[Mapping[str, StreamState]]] +) -> Iterable[tuple[T, bool, Any]]: + """ + Iterates over the given generator, yielding tuples containing the element, a flag + indicating whether it's the last element in the generator, and the stream state + returned by `get_stream_state_func` at that point. + + Args: + generator: The iterable to iterate over. + get_stream_state_func: A zero-argument function that returns the current + stream state. + + Returns: + An iterator that yields tuples of the form (element, is_last, state). + """ + + iterator = iter(generator) + + try: + current = next(iterator) + state = get_stream_state_func() + except StopIteration: + return # Return an empty iterator + + for next_item in iterator: + yield current, False, state + current = next_item + state = get_stream_state_func() + + yield current, True, state + + +class Timer: + """ + A simple timer class that measures elapsed time in seconds using a high-resolution performance counter. + """
+ """ + + def __init__(self) -> None: + self._start: Optional[int] = None + + def start(self) -> None: + self._start = time.perf_counter_ns() + + def finish(self) -> int: + if self._start: + return ((time.perf_counter_ns() - self._start) / 1e9).__ceil__() + else: + raise RuntimeError("Global substream cursor timer not started") + + def is_running(self) -> bool: + return self._start is not None + + class ConcurrentCursorFactory: def __init__(self, create_function: Callable[..., ConcurrentCursor]): self._create_function = create_function diff --git a/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py b/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py index c8e93743d..022881bf5 100644 --- a/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py +++ b/airbyte_cdk/sources/declarative/migrations/legacy_to_per_partition_state_migration.py @@ -5,7 +5,6 @@ from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration from airbyte_cdk.sources.declarative.models import ( - CustomIncrementalSync, DatetimeBasedCursor, SubstreamPartitionRouter, ) @@ -36,7 +35,7 @@ class LegacyToPerPartitionStateMigration(StateMigration): def __init__( self, partition_router: SubstreamPartitionRouter, - cursor: CustomIncrementalSync | DatetimeBasedCursor, + cursor: DatetimeBasedCursor, config: Mapping[str, Any], parameters: Mapping[str, Any], ): diff --git a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py index fb004a65e..ccd2e9e8d 100644 --- a/airbyte_cdk/sources/declarative/models/declarative_component_schema.py +++ b/airbyte_cdk/sources/declarative/models/declarative_component_schema.py @@ -174,24 +174,6 @@ class Config: parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") -class CustomIncrementalSync(BaseModel): - class Config: - extra = Extra.allow - - type: Literal["CustomIncrementalSync"] - class_name: str = Field( - ..., - description="Fully-qualified name of the class that will be implementing the custom incremental sync. 
The format is `source_..`.", - examples=["source_railz.components.MyCustomIncrementalSync"], - title="Class Name", - ) - cursor_field: str = Field( - ..., - description="The location of the value on a record that will be used as a bookmark during sync.", - ) - parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters") - - class CustomPaginationStrategy(BaseModel): class Config: extra = Extra.allow @@ -2432,9 +2414,7 @@ class Config: description="Component used to coordinate how records are extracted across stream slices and request pages.", title="Retriever", ) - incremental_sync: Optional[ - Union[DatetimeBasedCursor, IncrementingCountCursor, CustomIncrementalSync] - ] = Field( + incremental_sync: Optional[Union[DatetimeBasedCursor, IncrementingCountCursor]] = Field( None, description="Component used to fetch data incrementally based on a time field in the data.", title="Incremental Sync", diff --git a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py index 8b1f45fcd..3ed86bf06 100644 --- a/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py +++ b/airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py @@ -19,11 +19,6 @@ "DatetimeBasedCursor.end_time_option": "RequestOption", "DatetimeBasedCursor.start_datetime": "MinMaxDatetime", "DatetimeBasedCursor.start_time_option": "RequestOption", - # CustomIncrementalSync - "CustomIncrementalSync.end_datetime": "MinMaxDatetime", - "CustomIncrementalSync.end_time_option": "RequestOption", - "CustomIncrementalSync.start_datetime": "MinMaxDatetime", - "CustomIncrementalSync.start_time_option": "RequestOption", # DeclarativeSource "DeclarativeSource.check": "CheckStream", "DeclarativeSource.spec": "Spec", diff --git a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py index 105f472de..4a73dced3 100644 --- a/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py +++ b/airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py @@ -33,6 +33,10 @@ from airbyte_cdk.connector_builder.models import ( LogMessage as ConnectorBuilderLogMessage, ) +from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream +from airbyte_cdk.legacy.sources.declarative.incremental import ( + DatetimeBasedCursor, +) from airbyte_cdk.models import ( AirbyteStateBlob, AirbyteStateMessage, @@ -75,7 +79,6 @@ ) from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders import ( Decoder, IterableDecoder, @@ -105,10 +108,6 @@ from airbyte_cdk.sources.declarative.incremental import ( ConcurrentCursorFactory, ConcurrentPerPartitionCursor, - CursorFactory, - DatetimeBasedCursor, - GlobalSubstreamCursor, - PerPartitionWithGlobalCursor, ) from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.interpolation.interpolated_mapping import InterpolatedMapping @@ -200,9 +199,6 @@ from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CustomErrorHandler as CustomErrorHandlerModel, ) -from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( - CustomIncrementalSync as 
CustomIncrementalSyncModel, -) from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( CustomPaginationStrategy as CustomPaginationStrategyModel, ) @@ -701,7 +697,6 @@ def _init_mappings(self) -> None: CustomBackoffStrategyModel: self.create_custom_component, CustomDecoderModel: self.create_custom_component, CustomErrorHandlerModel: self.create_custom_component, - CustomIncrementalSyncModel: self.create_custom_component, CustomRecordExtractorModel: self.create_custom_component, CustomRecordFilterModel: self.create_custom_component, CustomRequesterModel: self.create_custom_component, @@ -1977,7 +1972,7 @@ def create_datetime_based_cursor( def create_default_stream( self, model: DeclarativeStreamModel, config: Config, is_parent: bool = False, **kwargs: Any - ) -> Union[DeclarativeStream, AbstractStream]: + ) -> AbstractStream: primary_key = model.primary_key.__root__ if model.primary_key else None partition_router = self._build_stream_slicer_from_partition_router( @@ -2618,6 +2613,8 @@ def create_gzip_decoder( fallback_parser=gzip_parser.inner_parser, ) + # todo: This method should be removed once we deprecate the SimpleRetriever.cursor field and the various + # state methods @staticmethod def create_incrementing_count_cursor( model: IncrementingCountCursorModel, config: Config, **kwargs: Any @@ -3148,9 +3145,7 @@ def create_simple_retriever( transformations: List[RecordTransformation], file_uploader: Optional[DefaultFileUploader] = None, incremental_sync: Optional[ - Union[ - IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel - ] + Union[IncrementingCountCursorModel, DatetimeBasedCursorModel] ] = None, use_cache: Optional[bool] = None, log_formatter: Optional[Callable[[Response], Any]] = None, @@ -3789,7 +3784,6 @@ def _instantiate_parent_stream_state_manager( incremental_sync_model: Union[ DatetimeBasedCursorModel, IncrementingCountCursorModel, - CustomIncrementalSyncModel, ] = ( model.stream.incremental_sync # type: ignore # if we are there, it is because there is incremental_dependency and therefore there is an incremental_sync on the parent stream if isinstance(model.stream, DeclarativeStreamModel) diff --git a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py index 8e77c3409..2ca38494e 100644 --- a/airbyte_cdk/sources/declarative/requesters/http_job_repository.py +++ b/airbyte_cdk/sources/declarative/requesters/http_job_repository.py @@ -1,4 +1,5 @@ -# Copyright (c) 2024 Airbyte, Inc., all rights reserved. +# Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
+ import logging import uuid from dataclasses import dataclass, field @@ -8,9 +9,8 @@ import requests from requests import Response -from airbyte_cdk import AirbyteMessage from airbyte_cdk.logger import lazy_log -from airbyte_cdk.models import FailureType, Type +from airbyte_cdk.models import AirbyteMessage, FailureType, Type from airbyte_cdk.sources.declarative.async_job.job import AsyncJob from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus diff --git a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py index ea34bffa1..ed83279de 100644 --- a/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py +++ b/airbyte_cdk/sources/declarative/retrievers/simple_retriever.py @@ -23,10 +23,10 @@ import requests from typing_extensions import deprecated +from airbyte_cdk.legacy.sources.declarative.incremental import ResumableFullRefreshCursor +from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.models import AirbyteMessage from airbyte_cdk.sources.declarative.extractors.http_selector import HttpSelector -from airbyte_cdk.sources.declarative.incremental import ResumableFullRefreshCursor -from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.partition_routers.single_partition_router import ( SinglePartitionRouter, @@ -569,6 +569,12 @@ def stream_slices(self) -> Iterable[Optional[StreamSlice]]: # type: ignore """ return self.stream_slicer.stream_slices() + # todo: There are a number of things that can be cleaned up when we remove self.cursor and all the related + # SimpleRetriever state management that is handled by the concurrent CDK Framework: + # - ModelToComponentFactory.create_datetime_based_cursor() should be removed since it does not need to be instantiated + # - ModelToComponentFactory.create_incrementing_count_cursor() should be removed since it's a placeholder + # - test_simple_retriever.py: Remove all imports and usages of legacy cursor components + # - test_model_to_component_factory.py:test_datetime_based_cursor() test can be removed @property def state(self) -> Mapping[str, Any]: return self.cursor.get_stream_state() if self.cursor else {} diff --git a/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py index 8fe0bbffb..45766e70e 100644 --- a/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py +++ b/airbyte_cdk/sources/declarative/transformations/keys_replace_transformation.py @@ -5,7 +5,7 @@ from dataclasses import InitVar, dataclass from typing import Any, Dict, Mapping, Optional -from airbyte_cdk import InterpolatedString +from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.transformations import RecordTransformation from airbyte_cdk.sources.types import Config, StreamSlice, StreamState diff --git a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py index 53fa9450e..667d088ab 100644 --- a/airbyte_cdk/sources/streams/concurrent/abstract_stream.py +++ b/airbyte_cdk/sources/streams/concurrent/abstract_stream.py @@ -14,10 +14,6 @@ 
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition -@deprecated( - "This class is experimental. Use at your own risk.", - category=ExperimentalClassWarning, -) class AbstractStream(ABC): """ AbstractStream is an experimental interface for streams developed as part of the Concurrent CDK. diff --git a/unit_tests/connector_builder/test_connector_builder_handler.py b/unit_tests/connector_builder/test_connector_builder_handler.py index 847665301..6f0394407 100644 --- a/unit_tests/connector_builder/test_connector_builder_handler.py +++ b/unit_tests/connector_builder/test_connector_builder_handler.py @@ -7,7 +7,7 @@ import json import logging import os -from typing import List, Literal, Union +from typing import List, Literal from unittest import mock from unittest.mock import MagicMock, patch @@ -56,7 +56,6 @@ ConcurrentDeclarativeSource, TestLimits, ) -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.retrievers.simple_retriever import SimpleRetriever from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream @@ -440,12 +439,8 @@ } -def get_retriever(stream: Union[DeclarativeStream, DefaultStream]): - return ( - stream.retriever - if isinstance(stream, DeclarativeStream) - else stream._stream_partition_generator._partition_factory._retriever - ) +def get_retriever(stream: DefaultStream): + return stream._stream_partition_generator._partition_factory._retriever @pytest.fixture @@ -985,12 +980,6 @@ def create_mock_retriever(name, url_base, path): return http_stream -def create_mock_declarative_stream(http_stream): - declarative_stream = mock.Mock(spec=DeclarativeStream, autospec=True) - declarative_stream.retriever = http_stream - return declarative_stream - - @pytest.mark.parametrize( "test_name, config, expected_max_records, expected_max_slices, expected_max_pages_per_slice", [ diff --git a/unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py b/unit_tests/legacy/sources/declarative/incremental/test_datetime_based_cursor.py similarity index 99% rename from unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py rename to unit_tests/legacy/sources/declarative/incremental/test_datetime_based_cursor.py index b4f990ee7..800f10377 100644 --- a/unit_tests/sources/declarative/incremental/test_datetime_based_cursor.py +++ b/unit_tests/legacy/sources/declarative/incremental/test_datetime_based_cursor.py @@ -7,8 +7,8 @@ import pytest +from airbyte_cdk.legacy.sources.declarative.incremental import DatetimeBasedCursor from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime -from airbyte_cdk.sources.declarative.incremental import DatetimeBasedCursor from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.requesters.request_option import ( RequestOption, diff --git a/unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py b/unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py index 4e6bf38fb..d602a7729 100644 --- a/unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py +++ b/unit_tests/legacy/sources/declarative/incremental/test_per_partition_cursor.py @@ -7,15 +7,15 @@ import pytest +from airbyte_cdk.legacy.sources.declarative.incremental.declarative_cursor import DeclarativeCursor +from 
airbyte_cdk.legacy.sources.declarative.incremental.global_substream_cursor import ( + GlobalSubstreamCursor, +) from airbyte_cdk.legacy.sources.declarative.incremental.per_partition_cursor import ( PerPartitionCursor, PerPartitionKeySerializer, StreamSlice, ) -from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor -from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import ( - GlobalSubstreamCursor, -) from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter from airbyte_cdk.sources.types import Record diff --git a/unit_tests/sources/declarative/incremental/test_resumable_full_refresh_cursor.py b/unit_tests/legacy/sources/declarative/incremental/test_resumable_full_refresh_cursor.py similarity index 96% rename from unit_tests/sources/declarative/incremental/test_resumable_full_refresh_cursor.py rename to unit_tests/legacy/sources/declarative/incremental/test_resumable_full_refresh_cursor.py index 8d7bbc669..367613ccf 100644 --- a/unit_tests/sources/declarative/incremental/test_resumable_full_refresh_cursor.py +++ b/unit_tests/legacy/sources/declarative/incremental/test_resumable_full_refresh_cursor.py @@ -2,7 +2,7 @@ import pytest -from airbyte_cdk.sources.declarative.incremental import ( +from airbyte_cdk.legacy.sources.declarative.incremental import ( ChildPartitionResumableFullRefreshCursor, ResumableFullRefreshCursor, ) diff --git a/unit_tests/sources/declarative/test_declarative_stream.py b/unit_tests/legacy/sources/declarative/test_declarative_stream.py similarity index 98% rename from unit_tests/sources/declarative/test_declarative_stream.py rename to unit_tests/legacy/sources/declarative/test_declarative_stream.py index 747b73840..03b65289e 100644 --- a/unit_tests/sources/declarative/test_declarative_stream.py +++ b/unit_tests/legacy/sources/declarative/test_declarative_stream.py @@ -6,6 +6,7 @@ import pytest +from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.models import ( AirbyteLogMessage, AirbyteMessage, @@ -15,7 +16,6 @@ TraceType, Type, ) -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.types import StreamSlice SLICE_NOT_CONSIDERED_FOR_EQUALITY = {} diff --git a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py index b7d8ab8f3..87e8574bb 100644 --- a/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py +++ b/unit_tests/legacy/sources/declarative/test_manifest_declarative_source.py @@ -34,7 +34,6 @@ from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) diff --git a/unit_tests/sources/declarative/async_job/test_integration.py b/unit_tests/sources/declarative/async_job/test_integration.py index a0b6195b7..77c942ab6 100644 --- a/unit_tests/sources/declarative/async_job/test_integration.py +++ b/unit_tests/sources/declarative/async_job/test_integration.py @@ -5,14 +5,9 @@ from typing import Any, Iterable, List, Mapping, Optional, Set, Tuple from unittest import TestCase, mock -from airbyte_cdk import ( - AbstractSource, - DeclarativeStream, - SinglePartitionRouter, - Stream, - StreamSlice, -) +from 
airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.models import ConnectorSpecification +from airbyte_cdk.sources import AbstractSource from airbyte_cdk.sources.declarative.async_job.job import AsyncJob from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker @@ -20,6 +15,7 @@ from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector +from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import ( AsyncJobPartitionRouter, ) @@ -27,6 +23,8 @@ from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer from airbyte_cdk.sources.message import NoopMessageRepository +from airbyte_cdk.sources.streams import Stream +from airbyte_cdk.sources.types import StreamSlice from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder from airbyte_cdk.test.entrypoint_wrapper import read diff --git a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py index d99f8502f..dd953ca56 100644 --- a/unit_tests/sources/declarative/async_job/test_job_orchestrator.py +++ b/unit_tests/sources/declarative/async_job/test_job_orchestrator.py @@ -10,7 +10,6 @@ import pytest -from airbyte_cdk import AirbyteTracedException, StreamSlice from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.async_job.job import AsyncJob, AsyncJobStatus from airbyte_cdk.sources.declarative.async_job.job_orchestrator import ( @@ -21,6 +20,8 @@ from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams.http.http_client import MessageRepresentationAirbyteTracedErrors +from airbyte_cdk.sources.types import StreamSlice +from airbyte_cdk.utils import AirbyteTracedException _ANY_STREAM_SLICE = Mock() _A_STREAM_SLICE = Mock() diff --git a/unit_tests/sources/declarative/checks/test_check_dynamic_stream.py b/unit_tests/sources/declarative/checks/test_check_dynamic_stream.py index b26c64bcc..6a41d945c 100644 --- a/unit_tests/sources/declarative/checks/test_check_dynamic_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_dynamic_stream.py @@ -8,6 +8,7 @@ import pytest +from airbyte_cdk.models import Status from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) @@ -109,23 +110,23 @@ [ pytest.param( 404, - False, + Status.FAILED, True, ["Not found. The requested resource was not found on the server."], id="test_stream_unavailable_unhandled_error", ), pytest.param( 403, - False, + Status.FAILED, True, ["Forbidden. 
You don't have permission to access this resource."], id="test_stream_unavailable_handled_error", ), - pytest.param(200, True, True, [], id="test_stream_available"), - pytest.param(200, True, False, [], id="test_stream_available"), + pytest.param(200, Status.SUCCEEDED, True, [], id="test_stream_available"), + pytest.param(200, Status.SUCCEEDED, False, [], id="test_stream_available"), pytest.param( 401, - False, + Status.FAILED, True, ["Unauthorized. Please ensure you are authenticated correctly."], id="test_stream_unauthorized_error", @@ -160,10 +161,10 @@ def test_check_dynamic_stream( state=None, ) - stream_is_available, reason = source.check_connection(logger, _CONFIG) + connection_status = source.check(logger, _CONFIG) http_mocker.assert_number_of_calls(item_request, item_request_count) - assert stream_is_available == available_expectation + assert connection_status.status == available_expectation for message in expected_messages: - assert message in reason + assert message in connection_status.message diff --git a/unit_tests/sources/declarative/checks/test_check_stream.py b/unit_tests/sources/declarative/checks/test_check_stream.py index 21f036440..317abb6c9 100644 --- a/unit_tests/sources/declarative/checks/test_check_stream.py +++ b/unit_tests/sources/declarative/checks/test_check_stream.py @@ -12,6 +12,7 @@ import requests from jsonschema.exceptions import ValidationError +from airbyte_cdk.models import Status from airbyte_cdk.sources.declarative.checks.check_stream import CheckStream from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, @@ -368,7 +369,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp [ pytest.param( {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, - True, + Status.SUCCEEDED, False, 200, [{"id": 1, "name": "static_1"}, {"id": 2, "name": "static_2"}], @@ -389,7 +390,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ], } }, - True, + Status.SUCCEEDED, False, 200, [], @@ -410,7 +411,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ], } }, - True, + Status.SUCCEEDED, False, 200, [], @@ -434,7 +435,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ], } }, - True, + Status.SUCCEEDED, False, 200, [], @@ -459,7 +460,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ], } }, - True, + Status.SUCCEEDED, False, 200, [], @@ -479,7 +480,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ], } }, - True, + Status.SUCCEEDED, False, 200, [], @@ -488,7 +489,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ), pytest.param( {"check": {"type": "CheckStream", "stream_names": ["non_existent_stream"]}}, - False, + Status.FAILED, True, 200, [], @@ -508,7 +509,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ], } }, - False, + Status.FAILED, False, 200, [], @@ -517,7 +518,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ), pytest.param( {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, - False, + Status.FAILED, False, 404, ["Not found. 
The requested resource was not found on the server."], @@ -526,7 +527,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ), pytest.param( {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, - False, + Status.FAILED, False, 403, ["Forbidden. You don't have permission to access this resource."], @@ -535,7 +536,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ), pytest.param( {"check": {"type": "CheckStream", "stream_names": ["static_stream"]}}, - False, + Status.FAILED, False, 401, ["Unauthorized. Please ensure you are authenticated correctly."], @@ -559,7 +560,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ], } }, - False, + Status.FAILED, False, 404, ["Not found. The requested resource was not found on the server."], @@ -583,7 +584,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ], } }, - False, + Status.FAILED, False, 403, ["Forbidden. You don't have permission to access this resource."], @@ -607,7 +608,7 @@ def parse_response(self, response: requests.Response, **kwargs) -> Iterable[Mapp ], } }, - False, + Status.FAILED, False, 401, ["Unauthorized. Please ensure you are authenticated correctly."], @@ -654,11 +655,11 @@ def test_check_stream1( ) if expectation: with pytest.raises(ValueError): - source.check_connection(logger, _CONFIG) + source.check(logger, _CONFIG) else: - stream_is_available, reason = source.check_connection(logger, _CONFIG) + connection_status = source.check(logger, _CONFIG) http_mocker.assert_number_of_calls(item_request_2, request_count) - assert stream_is_available == expected_result + assert connection_status.status == expected_result def test_check_stream_missing_fields(): @@ -690,4 +691,4 @@ def test_check_stream_only_type_provided(): state=None, ) with pytest.raises(ValueError): - source.check_connection(logger, _CONFIG) + source.check(logger, _CONFIG) diff --git a/unit_tests/sources/declarative/custom_state_migration.py b/unit_tests/sources/declarative/custom_state_migration.py index 86ca4a5c4..ac7b7e04b 100644 --- a/unit_tests/sources/declarative/custom_state_migration.py +++ b/unit_tests/sources/declarative/custom_state_migration.py @@ -4,17 +4,19 @@ from typing import Any, Mapping -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.migrations.state_migration import StateMigration +from airbyte_cdk.sources.declarative.models.declarative_component_schema import ( + DeclarativeStream as DeclarativeStreamModel, +) from airbyte_cdk.sources.types import Config class CustomStateMigration(StateMigration): - declarative_stream: DeclarativeStream + declarative_stream: DeclarativeStreamModel config: Config - def __init__(self, declarative_stream: DeclarativeStream, config: Config): + def __init__(self, declarative_stream: DeclarativeStreamModel, config: Config): self._config = config self.declarative_stream = declarative_stream self._cursor = declarative_stream.incremental_sync diff --git a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py index 2960c5802..6a9d7317a 100644 --- a/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py +++ b/unit_tests/sources/declarative/decoders/test_decoders_memory_usage.py @@ -8,12 +8,12 @@ import pytest 
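The check-related test changes above replace the removed two-tuple check_connection() result with the protocol-level check() result. A small sketch of the object those assertions now run against, assuming AirbyteConnectionStatus is exported from airbyte_cdk.models like the other protocol models used here.

from airbyte_cdk.models import AirbyteConnectionStatus, Status

# check() returns a protocol object rather than a (bool, reason) tuple, so the
# tests assert on .status and .message instead of unpacking a tuple.
succeeded = AirbyteConnectionStatus(status=Status.SUCCEEDED)
failed = AirbyteConnectionStatus(
    status=Status.FAILED,
    message="Forbidden. You don't have permission to access this resource.",
)

assert succeeded.status == Status.SUCCEEDED
assert failed.status == Status.FAILED
assert "Forbidden" in (failed.message or "")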
import requests -from airbyte_cdk import YamlDeclarativeSource from airbyte_cdk.models import SyncMode from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( ModelToComponentFactory, ) +from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource @pytest.mark.slow diff --git a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py index fa216685a..05e586592 100644 --- a/unit_tests/sources/declarative/extractors/test_dpath_extractor.py +++ b/unit_tests/sources/declarative/extractors/test_dpath_extractor.py @@ -8,8 +8,7 @@ import pytest import requests -from airbyte_cdk import Decoder -from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder +from airbyte_cdk.sources.declarative.decoders import CompositeRawDecoder, Decoder from airbyte_cdk.sources.declarative.decoders.composite_raw_decoder import JsonLineParser from airbyte_cdk.sources.declarative.decoders.json_decoder import ( IterableDecoder, diff --git a/unit_tests/sources/declarative/interpolation/test_jinja.py b/unit_tests/sources/declarative/interpolation/test_jinja.py index 00040ed62..330c938b0 100644 --- a/unit_tests/sources/declarative/interpolation/test_jinja.py +++ b/unit_tests/sources/declarative/interpolation/test_jinja.py @@ -8,8 +8,8 @@ from freezegun import freeze_time from jinja2.exceptions import TemplateSyntaxError -from airbyte_cdk import StreamSlice from airbyte_cdk.sources.declarative.interpolation.jinja import JinjaInterpolation +from airbyte_cdk.sources.types import StreamSlice from airbyte_cdk.utils import AirbyteTracedException interpolation = JinjaInterpolation() diff --git a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py index 3b2eaf03b..5798132df 100644 --- a/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py +++ b/unit_tests/sources/declarative/parsers/test_model_to_component_factory.py @@ -14,7 +14,8 @@ from freezegun.api import FakeDatetime from pydantic.v1 import ValidationError -from airbyte_cdk import AirbyteTracedException +from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream +from airbyte_cdk.legacy.sources.declarative.incremental import DatetimeBasedCursor from airbyte_cdk.models import ( AirbyteStateBlob, AirbyteStateMessage, @@ -37,21 +38,13 @@ from airbyte_cdk.sources.declarative.checks import CheckStream from airbyte_cdk.sources.declarative.concurrency_level import ConcurrencyLevel from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.decoders import JsonDecoder, PaginationDecoderDecorator from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordFilter, RecordSelector from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor from airbyte_cdk.sources.declarative.extractors.record_filter import ( ClientSideIncrementalRecordFilterDecorator, ) -from airbyte_cdk.sources.declarative.incremental import ( - ConcurrentPerPartitionCursor, - CursorFactory, - DatetimeBasedCursor, - PerPartitionCursor, - PerPartitionWithGlobalCursor, - ResumableFullRefreshCursor, -) +from airbyte_cdk.sources.declarative.incremental 
import ConcurrentPerPartitionCursor from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import AsyncRetriever as AsyncRetrieverModel from airbyte_cdk.sources.declarative.models import CheckStream as CheckStreamModel @@ -181,6 +174,7 @@ SingleUseRefreshTokenOauth2Authenticator, ) from airbyte_cdk.sources.types import StreamSlice +from airbyte_cdk.utils import AirbyteTracedException from airbyte_cdk.utils.datetime_helpers import AirbyteDateTime, ab_datetime_now, ab_datetime_parse from unit_tests.sources.declarative.parsers.testing_components import ( TestingCustomSubstreamPartitionRouter, @@ -745,6 +739,7 @@ def test_create_substream_partition_router(): assert partition_router.parent_stream_configs[1].request_option is None +# todo: delete this test once we deprecate SimpleRetriever.cursor and SimpleRetriever.state methods def test_datetime_based_cursor(): content = """ incremental: diff --git a/unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py b/unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py index 232a77e12..5b4ba4d5f 100644 --- a/unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py +++ b/unit_tests/sources/declarative/partition_routers/test_cartesian_product_partition_router.py @@ -5,8 +5,10 @@ import pytest as pytest +from airbyte_cdk.legacy.sources.declarative.incremental.datetime_based_cursor import ( + DatetimeBasedCursor, +) from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime -from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString from airbyte_cdk.sources.declarative.partition_routers import ( CartesianProductStreamSlicer, @@ -20,6 +22,8 @@ from airbyte_cdk.sources.types import StreamSlice +# todo: All these tests rely on stream_slicers that are of the deprecated legacy class DatetimeBasedCursor. These +# should really be ConcurrentCursor, but this fix is a bit tedious and the behavior is covered in other parts of the code @pytest.mark.parametrize( "test_name, stream_slicers, expected_slices", [ diff --git a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py index ba670f507..d37766d12 100644 --- a/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py +++ b/unit_tests/sources/declarative/requesters/error_handlers/backoff_strategies/test_wait_time_from_header.py @@ -7,11 +7,11 @@ import pytest from requests import Response -from airbyte_cdk import AirbyteTracedException from airbyte_cdk.models import FailureType from airbyte_cdk.sources.declarative.requesters.error_handlers.backoff_strategies.wait_time_from_header_backoff_strategy import ( WaitTimeFromHeaderBackoffStrategy, ) +from airbyte_cdk.utils import AirbyteTracedException SOME_BACKOFF_TIME = 60 _A_RETRY_HEADER = "retry-header" diff --git a/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py b/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py index 5561f92ab..b89baf443 100644 --- a/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py +++ 
b/unit_tests/sources/declarative/requesters/paginators/test_stop_condition.py @@ -6,7 +6,6 @@ from pytest import fixture -from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor from airbyte_cdk.sources.declarative.requesters.paginators.strategies.pagination_strategy import ( PaginationStrategy, ) @@ -15,6 +14,7 @@ PaginationStopCondition, StopConditionPaginationStrategyDecorator, ) +from airbyte_cdk.sources.streams.concurrent.cursor import Cursor from airbyte_cdk.sources.types import Record ANY_RECORD = Mock() @@ -24,7 +24,7 @@ @fixture def mocked_cursor(): - return Mock(spec=DeclarativeCursor) + return Mock(spec=Cursor) @fixture diff --git a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py index d39e84e4d..9150ed43e 100644 --- a/unit_tests/sources/declarative/retrievers/test_simple_retriever.py +++ b/unit_tests/sources/declarative/retrievers/test_simple_retriever.py @@ -10,11 +10,14 @@ import pytest import requests -from airbyte_cdk import YamlDeclarativeSource +from airbyte_cdk.legacy.sources.declarative.incremental import ( + DatetimeBasedCursor, + DeclarativeCursor, + ResumableFullRefreshCursor, +) from airbyte_cdk.models import ( AirbyteLogMessage, AirbyteMessage, - AirbyteRecordMessage, Level, SyncMode, Type, @@ -22,15 +25,6 @@ from airbyte_cdk.sources.declarative.auth.declarative_authenticator import NoAuth from airbyte_cdk.sources.declarative.decoders import JsonDecoder from airbyte_cdk.sources.declarative.extractors import DpathExtractor, RecordSelector -from airbyte_cdk.sources.declarative.incremental import ( - DatetimeBasedCursor, - DeclarativeCursor, - ResumableFullRefreshCursor, -) -from airbyte_cdk.sources.declarative.models import DeclarativeStream as DeclarativeStreamModel -from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import ( - ModelToComponentFactory, -) from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter from airbyte_cdk.sources.declarative.requesters.paginators import DefaultPaginator from airbyte_cdk.sources.declarative.requesters.paginators.strategies import ( diff --git a/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py b/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py index 811f66e5e..9cabae283 100644 --- a/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py +++ b/unit_tests/sources/declarative/stream_slicers/test_stream_slicer_read_decorator.py @@ -1,22 +1,20 @@ # # Copyright (c) 2025 Airbyte, Inc., all rights reserved. 
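In test_stop_condition.py above, the mocked cursor's spec switches from the legacy DeclarativeCursor to the concurrent Cursor interface. A short, generic illustration of why the spec class matters; the two cursor classes below are hypothetical stand-ins, not the CDK's.

from unittest.mock import Mock


class LegacyStyleCursor:
    def get_stream_state(self): ...


class ConcurrentStyleCursor:
    def observe(self, record): ...


# Mock(spec=X) only allows attributes that exist on X, so swapping the spec
# changes which calls the test will tolerate.
legacy_mock = Mock(spec=LegacyStyleCursor)
legacy_mock.get_stream_state()  # allowed: defined on the spec class

concurrent_mock = Mock(spec=ConcurrentStyleCursor)
try:
    concurrent_mock.get_stream_state()  # not on the spec -> AttributeError
except AttributeError as error:
    print(error)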
# - +from datetime import timedelta from unittest.mock import Mock +import pytest + +from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager from airbyte_cdk.sources.declarative.async_job.job_orchestrator import ( AsyncJobOrchestrator, ) from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker -from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime from airbyte_cdk.sources.declarative.incremental import ( - CursorFactory, - DatetimeBasedCursor, - GlobalSubstreamCursor, - PerPartitionWithGlobalCursor, + ConcurrentCursorFactory, + ConcurrentPerPartitionCursor, ) -from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor -from airbyte_cdk.sources.declarative.interpolation import InterpolatedString from airbyte_cdk.sources.declarative.models import ( CustomRetriever, DeclarativeStream, @@ -34,8 +32,13 @@ StreamSlicer, StreamSlicerTestReadDecorator, ) -from airbyte_cdk.sources.message import NoopMessageRepository +from airbyte_cdk.sources.message import MessageRepository, NoopMessageRepository +from airbyte_cdk.sources.streams.concurrent.cursor import ConcurrentCursor, Cursor, CursorField +from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( + CustomFormatConcurrentStreamStateConverter, +) from airbyte_cdk.sources.types import StreamSlice +from airbyte_cdk.utils.datetime_helpers import ab_datetime_parse from unit_tests.sources.declarative.async_job.test_integration import MockAsyncJobRepository CURSOR_SLICE_FIELD = "cursor slice field" @@ -57,7 +60,7 @@ def with_stream_state(self, stream_state): return self def build(self): - cursor = Mock(spec=DeclarativeCursor) + cursor = Mock(spec=Cursor) cursor.get_stream_state.return_value = self._stream_state cursor.stream_slices.return_value = self._stream_slices return cursor @@ -67,20 +70,31 @@ def mocked_partition_router(): return Mock(spec=PartitionRouter) -def date_time_based_cursor_factory() -> DatetimeBasedCursor: - return DatetimeBasedCursor( - start_datetime=MinMaxDatetime( - datetime="2021-01-01", datetime_format=DATE_FORMAT, parameters={} - ), - end_datetime=MinMaxDatetime( - datetime="2021-01-05", datetime_format=DATE_FORMAT, parameters={} - ), - step="P10Y", - cursor_field=InterpolatedString.create("created_at", parameters={}), +def mocked_message_repository() -> MessageRepository: + return Mock(spec=MessageRepository) + + +def mocked_connector_state_manager() -> ConnectorStateManager: + return Mock(spec=ConnectorStateManager) + + +def concurrent_cursor_factory() -> ConcurrentCursor: + state_converter = CustomFormatConcurrentStreamStateConverter( datetime_format=DATE_FORMAT, - cursor_granularity="P1D", - config={}, - parameters={}, + is_sequential_state=False, + ) + + return ConcurrentCursor( + stream_name="test", + stream_namespace="", + stream_state={}, + message_repository=mocked_message_repository(), + connector_state_manager=mocked_connector_state_manager(), + connector_state_converter=state_converter, + cursor_field=CursorField("created_at"), + slice_boundary_fields=None, + start=ab_datetime_parse("2021-01-01"), + end_provider=state_converter.get_end_provider(), ) @@ -102,45 +116,7 @@ def create_substream_partition_router(): ) -def test_isinstance_global_cursor(): - first_partition = {"first_partition_key": "first_partition_value"} - partition_router = mocked_partition_router() - partition_router.stream_slices.return_value = [ - StreamSlice( - partition=first_partition, 
cursor_slice={}, extra_fields={"extra_field": "extra_value"} - ), - ] - cursor = ( - MockedCursorBuilder() - .with_stream_slices([{CURSOR_SLICE_FIELD: "first slice cursor value"}]) - .build() - ) - - global_cursor = GlobalSubstreamCursor(cursor, partition_router) - wrapped_slicer = StreamSlicerTestReadDecorator( - wrapped_slicer=global_cursor, - maximum_number_of_slices=5, - ) - assert isinstance(wrapped_slicer, GlobalSubstreamCursor) - assert isinstance(wrapped_slicer.wrapped_slicer, GlobalSubstreamCursor) - assert isinstance(wrapped_slicer, StreamSlicerTestReadDecorator) - - assert not isinstance(wrapped_slicer.wrapped_slicer, StreamSlicerTestReadDecorator) - assert not isinstance(wrapped_slicer, AsyncJobPartitionRouter) - assert not isinstance(wrapped_slicer.wrapped_slicer, AsyncJobPartitionRouter) - assert not isinstance(wrapped_slicer, PerPartitionWithGlobalCursor) - assert not isinstance(wrapped_slicer.wrapped_slicer, PerPartitionWithGlobalCursor) - assert not isinstance(wrapped_slicer, SubstreamPartitionRouter) - assert not isinstance(wrapped_slicer.wrapped_slicer, SubstreamPartitionRouter) - - assert isinstance(global_cursor, GlobalSubstreamCursor) - assert not isinstance(global_cursor, StreamSlicerTestReadDecorator) - assert not isinstance(global_cursor, AsyncJobPartitionRouter) - assert not isinstance(global_cursor, PerPartitionWithGlobalCursor) - assert not isinstance(global_cursor, SubstreamPartitionRouter) - - -def test_isinstance_global_cursor_aysnc_job_partition_router(): +def test_isinstance_global_cursor_async_job_partition_router(): async_job_partition_router = AsyncJobPartitionRouter( stream_slicer=SinglePartitionRouter(parameters={}), job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator( @@ -162,17 +138,14 @@ def test_isinstance_global_cursor_aysnc_job_partition_router(): assert isinstance(wrapped_slicer, StreamSlicerTestReadDecorator) assert not isinstance(wrapped_slicer.wrapped_slicer, StreamSlicerTestReadDecorator) - assert not isinstance(wrapped_slicer, GlobalSubstreamCursor) - assert not isinstance(wrapped_slicer.wrapped_slicer, GlobalSubstreamCursor) - assert not isinstance(wrapped_slicer, PerPartitionWithGlobalCursor) - assert not isinstance(wrapped_slicer.wrapped_slicer, PerPartitionWithGlobalCursor) + assert not isinstance(wrapped_slicer, ConcurrentPerPartitionCursor) + assert not isinstance(wrapped_slicer.wrapped_slicer, ConcurrentPerPartitionCursor) assert not isinstance(wrapped_slicer, SubstreamPartitionRouter) assert not isinstance(wrapped_slicer.wrapped_slicer, SubstreamPartitionRouter) assert isinstance(async_job_partition_router, AsyncJobPartitionRouter) assert not isinstance(async_job_partition_router, StreamSlicerTestReadDecorator) - assert not isinstance(async_job_partition_router, GlobalSubstreamCursor) - assert not isinstance(async_job_partition_router, PerPartitionWithGlobalCursor) + assert not isinstance(async_job_partition_router, ConcurrentPerPartitionCursor) assert not isinstance(async_job_partition_router, SubstreamPartitionRouter) @@ -189,29 +162,45 @@ def test_isinstance_substream_partition_router(): assert isinstance(wrapped_slicer, StreamSlicerTestReadDecorator) assert not isinstance(wrapped_slicer.wrapped_slicer, StreamSlicerTestReadDecorator) - assert not isinstance(wrapped_slicer, GlobalSubstreamCursor) - assert not isinstance(wrapped_slicer.wrapped_slicer, GlobalSubstreamCursor) + assert not isinstance(wrapped_slicer, ConcurrentPerPartitionCursor) + assert not isinstance(wrapped_slicer.wrapped_slicer, 
ConcurrentPerPartitionCursor) assert not isinstance(wrapped_slicer, AsyncJobPartitionRouter) assert not isinstance(wrapped_slicer.wrapped_slicer, AsyncJobPartitionRouter) - assert not isinstance(wrapped_slicer, PerPartitionWithGlobalCursor) - assert not isinstance(wrapped_slicer.wrapped_slicer, PerPartitionWithGlobalCursor) assert isinstance(partition_router, SubstreamPartitionRouter) assert not isinstance(partition_router, StreamSlicerTestReadDecorator) - assert not isinstance(partition_router, GlobalSubstreamCursor) + assert not isinstance(partition_router, ConcurrentPerPartitionCursor) assert not isinstance(partition_router, AsyncJobPartitionRouter) - assert not isinstance(partition_router, PerPartitionWithGlobalCursor) -def test_isinstance_perpartition_with_global_cursor(): +@pytest.mark.parametrize( + "use_global_cursor", + [ + pytest.param(True, id="test_with_global_cursor"), + pytest.param(False, id="test_with_no_global_cursor"), + ], +) +def test_isinstance_concurrent_per_partition_cursor(use_global_cursor): partition_router = create_substream_partition_router() - date_time_based_cursor = date_time_based_cursor_factory() + cursor_factory = ConcurrentCursorFactory(concurrent_cursor_factory) + connector_state_converter = CustomFormatConcurrentStreamStateConverter( + datetime_format="%Y-%m-%dT%H:%M:%SZ", + input_datetime_formats=["%Y-%m-%dT%H:%M:%SZ"], + is_sequential_state=True, + cursor_granularity=timedelta(0), + ) - cursor_factory = CursorFactory(date_time_based_cursor_factory) - substream_cursor = PerPartitionWithGlobalCursor( + substream_cursor = ConcurrentPerPartitionCursor( cursor_factory=cursor_factory, partition_router=partition_router, - stream_cursor=date_time_based_cursor, + stream_name="test", + stream_namespace="", + stream_state={}, + message_repository=mocked_message_repository(), + connector_state_manager=mocked_connector_state_manager(), + connector_state_converter=connector_state_converter, + cursor_field=CursorField(cursor_field_key="updated_at"), + use_global_cursor=use_global_cursor, ) wrapped_slicer = StreamSlicerTestReadDecorator( @@ -219,33 +208,30 @@ def test_isinstance_perpartition_with_global_cursor(): maximum_number_of_slices=5, ) - assert isinstance(wrapped_slicer, PerPartitionWithGlobalCursor) - assert isinstance(wrapped_slicer.wrapped_slicer, PerPartitionWithGlobalCursor) + assert isinstance(wrapped_slicer, ConcurrentPerPartitionCursor) + assert isinstance(wrapped_slicer.wrapped_slicer, ConcurrentPerPartitionCursor) assert isinstance(wrapped_slicer, StreamSlicerTestReadDecorator) assert not isinstance(wrapped_slicer.wrapped_slicer, StreamSlicerTestReadDecorator) - assert not isinstance(wrapped_slicer, GlobalSubstreamCursor) - assert not isinstance(wrapped_slicer.wrapped_slicer, GlobalSubstreamCursor) assert not isinstance(wrapped_slicer, AsyncJobPartitionRouter) assert not isinstance(wrapped_slicer.wrapped_slicer, AsyncJobPartitionRouter) assert not isinstance(wrapped_slicer, SubstreamPartitionRouter) assert not isinstance(wrapped_slicer.wrapped_slicer, SubstreamPartitionRouter) - assert wrapped_slicer._per_partition_cursor._cursor_factory == cursor_factory + assert wrapped_slicer._cursor_factory == cursor_factory assert wrapped_slicer._partition_router == partition_router - assert wrapped_slicer._global_cursor._stream_cursor == date_time_based_cursor + assert wrapped_slicer._use_global_cursor == use_global_cursor - assert isinstance(substream_cursor, PerPartitionWithGlobalCursor) + assert isinstance(substream_cursor, ConcurrentPerPartitionCursor) 
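The decorator assertions in these tests expect the wrapped slicer to satisfy isinstance checks for both StreamSlicerTestReadDecorator and the wrapped cursor type. A generic illustration of how a delegating wrapper can behave that way in Python, relying on the __class__ fallback that isinstance performs; this is not claimed to be the CDK's actual implementation.

class WrappedSlicer:
    def stream_slices(self):
        return iter([{}])


class TransparentDecorator:
    def __init__(self, wrapped):
        self._wrapped = wrapped

    @property
    def __class__(self):
        # report the wrapped object's class so isinstance() sees through the wrapper
        return type(self._wrapped)

    def __getattr__(self, name):
        # delegate everything not defined on the wrapper itself
        return getattr(self._wrapped, name)


decorator = TransparentDecorator(WrappedSlicer())
assert isinstance(decorator, WrappedSlicer)         # passes via the __class__ fallback
assert isinstance(decorator, TransparentDecorator)  # passes via type(decorator)
assert list(decorator.stream_slices()) == [{}]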
assert not isinstance(substream_cursor, StreamSlicerTestReadDecorator) - assert not isinstance(substream_cursor, GlobalSubstreamCursor) assert not isinstance(substream_cursor, AsyncJobPartitionRouter) assert not isinstance(substream_cursor, SubstreamPartitionRouter) - assert substream_cursor._per_partition_cursor._cursor_factory == cursor_factory + assert substream_cursor._cursor_factory == cursor_factory assert substream_cursor._partition_router == partition_router - assert substream_cursor._global_cursor._stream_cursor == date_time_based_cursor + assert wrapped_slicer._use_global_cursor == use_global_cursor - assert substream_cursor._get_active_cursor() == wrapped_slicer._get_active_cursor() + assert substream_cursor._use_global_cursor == wrapped_slicer._use_global_cursor def test_slice_limiting_functionality(): diff --git a/unit_tests/sources/declarative/test_concurrent_declarative_source.py b/unit_tests/sources/declarative/test_concurrent_declarative_source.py index 72152a167..1d62ba33f 100644 --- a/unit_tests/sources/declarative/test_concurrent_declarative_source.py +++ b/unit_tests/sources/declarative/test_concurrent_declarative_source.py @@ -23,6 +23,7 @@ from typing_extensions import deprecated import unit_tests.sources.declarative.external_component # Needed for dynamic imports to work +from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.models import ( AirbyteLogMessage, AirbyteMessage, @@ -46,7 +47,6 @@ from airbyte_cdk.sources.declarative.concurrent_declarative_source import ( ConcurrentDeclarativeSource, ) -from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream from airbyte_cdk.sources.declarative.extractors.record_filter import ( ClientSideIncrementalRecordFilterDecorator, ) @@ -685,86 +685,6 @@ def get_cursor(self) -> Optional[Cursor]: return self._declarative_stream.get_cursor() -def test_group_streams(): - """ - Tests the grouping of low-code streams into ones that can be processed concurrently vs ones that must be processed concurrently - """ - - catalog = ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=AirbyteStream( - name="party_members", - json_schema={}, - supported_sync_modes=[SyncMode.incremental], - ), - sync_mode=SyncMode.full_refresh, - destination_sync_mode=DestinationSyncMode.append, - ), - ConfiguredAirbyteStream( - stream=AirbyteStream( - name="palaces", json_schema={}, supported_sync_modes=[SyncMode.full_refresh] - ), - sync_mode=SyncMode.full_refresh, - destination_sync_mode=DestinationSyncMode.append, - ), - ConfiguredAirbyteStream( - stream=AirbyteStream( - name="locations", json_schema={}, supported_sync_modes=[SyncMode.incremental] - ), - sync_mode=SyncMode.full_refresh, - destination_sync_mode=DestinationSyncMode.append, - ), - ConfiguredAirbyteStream( - stream=AirbyteStream( - name="party_members_skills", - json_schema={}, - supported_sync_modes=[SyncMode.full_refresh], - ), - sync_mode=SyncMode.full_refresh, - destination_sync_mode=DestinationSyncMode.append, - ), - ] - ) - - state = [] - - source = ConcurrentDeclarativeSource( - source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=state - ) - concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) - - # 1 full refresh stream, 3 incremental streams, 1 substream w/o incremental, 1 list based substream w/o incremental - # 1 async job stream, 1 substream w/ incremental - assert len(concurrent_streams) == 8 - ( - concurrent_stream_0, - concurrent_stream_1, - 
concurrent_stream_2, - concurrent_stream_3, - concurrent_stream_4, - concurrent_stream_5, - concurrent_stream_6, - concurrent_stream_7, - ) = concurrent_streams - assert isinstance(concurrent_stream_0, DefaultStream) - assert concurrent_stream_0.name == "party_members" - assert isinstance(concurrent_stream_1, DefaultStream) - assert concurrent_stream_1.name == "palaces" - assert isinstance(concurrent_stream_2, DefaultStream) - assert concurrent_stream_2.name == "locations" - assert isinstance(concurrent_stream_3, DefaultStream) - assert concurrent_stream_3.name == "party_members_skills" - assert isinstance(concurrent_stream_4, DefaultStream) - assert concurrent_stream_4.name == "arcana_personas" - assert isinstance(concurrent_stream_5, DefaultStream) - assert concurrent_stream_5.name == "palace_enemies" - assert isinstance(concurrent_stream_6, DefaultStream) - assert concurrent_stream_6.name == "async_job_stream" - assert isinstance(concurrent_stream_7, DefaultStream) - assert concurrent_stream_7.name == "incremental_counting_stream" - - @freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) def test_create_concurrent_cursor(): """ @@ -792,9 +712,9 @@ def test_create_concurrent_cursor(): source = ConcurrentDeclarativeSource( source_config=_MANIFEST, config=_CONFIG, catalog=_CATALOG, state=state ) - concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) + streams = source.streams(config=_CONFIG) - party_members_stream = concurrent_streams[0] + party_members_stream = streams[0] assert isinstance(party_members_stream, DefaultStream) party_members_cursor = party_members_stream.cursor @@ -810,7 +730,7 @@ def test_create_concurrent_cursor(): assert party_members_cursor._lookback_window == timedelta(days=5) assert party_members_cursor._cursor_granularity == timedelta(days=1) - locations_stream = concurrent_streams[2] + locations_stream = streams[2] assert isinstance(locations_stream, DefaultStream) locations_cursor = locations_stream.cursor @@ -835,7 +755,7 @@ def test_create_concurrent_cursor(): "state_type": "date-range", } - incremental_counting_stream = concurrent_streams[7] + incremental_counting_stream = streams[7] assert isinstance(incremental_counting_stream, DefaultStream) incremental_counting_cursor = incremental_counting_stream.cursor @@ -1470,11 +1390,9 @@ def test_concurrent_declarative_source_runs_state_migrations_provided_in_manifes source = ConcurrentDeclarativeSource( source_config=manifest, config=_CONFIG, catalog=_CATALOG, state=state ) - concurrent_streams, synchronous_streams = source._group_streams(_CONFIG) - assert concurrent_streams[0].cursor.state.get("state") != state_blob.__dict__, ( - "State was not migrated." - ) - assert concurrent_streams[0].cursor.state.get("states") == [ + streams = source.streams(_CONFIG) + assert streams[0].cursor.state.get("state") != state_blob.__dict__, "State was not migrated." 
+ assert streams[0].cursor.state.get("states") == [ {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_1"}}, {"cursor": {"updated_at": "2024-08-21"}, "partition": {"type": "type_2"}}, ], "State was migrated, but actual state don't match expected" @@ -1689,145 +1607,6 @@ def test_concurrency_level_initial_number_partitions_to_generate_is_always_one_o assert source._concurrent_source._initial_number_partitions_to_generate == 1 -def test_given_partition_routing_and_incremental_sync_then_stream_is_concurrent(): - manifest = { - "version": "5.0.0", - "definitions": { - "selector": { - "type": "RecordSelector", - "extractor": {"type": "DpathExtractor", "field_path": []}, - }, - "requester": { - "type": "HttpRequester", - "url_base": "https://persona.metaverse.com", - "http_method": "GET", - "authenticator": { - "type": "BasicHttpAuthenticator", - "username": "{{ config['api_key'] }}", - "password": "{{ config['secret_key'] }}", - }, - "error_handler": { - "type": "DefaultErrorHandler", - "response_filters": [ - { - "http_codes": [403], - "action": "FAIL", - "failure_type": "config_error", - "error_message": "Access denied due to lack of permission or invalid API/Secret key or wrong data region.", - }, - { - "http_codes": [404], - "action": "IGNORE", - "error_message": "No data available for the time range requested.", - }, - ], - }, - }, - "retriever": { - "type": "SimpleRetriever", - "record_selector": {"$ref": "#/definitions/selector"}, - "paginator": {"type": "NoPagination"}, - "requester": {"$ref": "#/definitions/requester"}, - }, - "incremental_cursor": { - "type": "DatetimeBasedCursor", - "start_datetime": { - "datetime": "{{ format_datetime(config['start_date'], '%Y-%m-%d') }}" - }, - "end_datetime": {"datetime": "{{ now_utc().strftime('%Y-%m-%d') }}"}, - "datetime_format": "%Y-%m-%d", - "cursor_datetime_formats": ["%Y-%m-%d", "%Y-%m-%dT%H:%M:%S"], - "cursor_granularity": "P1D", - "step": "P15D", - "cursor_field": "updated_at", - "lookback_window": "P5D", - "start_time_option": { - "type": "RequestOption", - "field_name": "start", - "inject_into": "request_parameter", - }, - "end_time_option": { - "type": "RequestOption", - "field_name": "end", - "inject_into": "request_parameter", - }, - }, - "base_stream": {"retriever": {"$ref": "#/definitions/retriever"}}, - "base_incremental_stream": { - "retriever": { - "$ref": "#/definitions/retriever", - "requester": {"$ref": "#/definitions/requester"}, - }, - "incremental_sync": {"$ref": "#/definitions/incremental_cursor"}, - }, - "incremental_party_members_skills_stream": { - "$ref": "#/definitions/base_incremental_stream", - "retriever": { - "$ref": "#/definitions/base_incremental_stream/retriever", - "partition_router": { - "type": "ListPartitionRouter", - "cursor_field": "party_member_id", - "values": ["party_member1", "party_member2"], - }, - }, - "$parameters": { - "name": "incremental_party_members_skills", - "primary_key": "id", - "path": "/party_members/{{stream_slice.party_member_id}}/skills", - }, - "schema_loader": { - "type": "InlineSchemaLoader", - "schema": { - "$schema": "https://json-schema.org/draft-07/schema#", - "type": "object", - "properties": { - "id": { - "description": "The identifier", - "type": ["null", "string"], - }, - "name": { - "description": "The name of the party member", - "type": ["null", "string"], - }, - }, - }, - }, - }, - }, - "streams": ["#/definitions/incremental_party_members_skills_stream"], - "check": {"stream_names": ["incremental_party_members_skills"]}, - "concurrency_level": { - 
"type": "ConcurrencyLevel", - "default_concurrency": "{{ config['num_workers'] or 10 }}", - "max_concurrency": 25, - }, - } - - catalog = ConfiguredAirbyteCatalog( - streams=[ - ConfiguredAirbyteStream( - stream=AirbyteStream( - name="incremental_party_members_skills", - json_schema={}, - supported_sync_modes=[SyncMode.full_refresh], - ), - sync_mode=SyncMode.incremental, - destination_sync_mode=DestinationSyncMode.append, - ) - ] - ) - - state = [] - - source = ConcurrentDeclarativeSource( - source_config=manifest, config=_CONFIG, catalog=catalog, state=state - ) - concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) - - assert len(concurrent_streams) == 1 - assert len(synchronous_streams) == 0 - - def test_async_incremental_stream_uses_concurrent_cursor_with_state(): state = [ AirbyteStateMessage( @@ -1855,8 +1634,8 @@ def test_async_incremental_stream_uses_concurrent_cursor_with_state(): "state_type": "date-range", } - concurrent_streams, _ = source._group_streams(config=_CONFIG) - async_job_stream = concurrent_streams[6] + streams = source.streams(config=_CONFIG) + async_job_stream = streams[6] assert isinstance(async_job_stream, DefaultStream) cursor = async_job_stream._cursor assert isinstance(cursor, ConcurrentCursor) @@ -1912,9 +1691,9 @@ def test_stream_using_is_client_side_incremental_has_cursor_state(): catalog=_CATALOG, state=state, ) - concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) + streams = source.streams(config=_CONFIG) - locations_stream = concurrent_streams[2] + locations_stream = streams[2] assert isinstance(locations_stream, DefaultStream) simple_retriever = locations_stream._stream_partition_generator._partition_factory._retriever @@ -1972,9 +1751,9 @@ def test_stream_using_is_client_side_incremental_has_transform_before_filtering_ catalog=_CATALOG, state=state, ) - concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG) + streams = source.streams(config=_CONFIG) - locations_stream = concurrent_streams[2] + locations_stream = streams[2] assert isinstance(locations_stream, DefaultStream) simple_retriever = locations_stream._stream_partition_generator._partition_factory._retriever @@ -2312,8 +2091,14 @@ def test_valid_manifest(self): "url_base": "https://api.sendgrid.com", }, "schema_loader": { - "name": "{{ parameters.stream_name }}", - "file_path": "./source_sendgrid/schemas/{{ parameters.name }}.yaml", + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "string"}, + }, + "type": "object", + }, }, "retriever": { "paginator": { @@ -2337,7 +2122,6 @@ def test_valid_manifest(self): "type": "BearerAuthenticator", "api_token": "{{ config.apikey }}", }, - "request_parameters": {"page_size": "{{ 10 }}"}, }, "record_selector": {"extractor": {"field_path": ["result"]}}, }, @@ -2350,8 +2134,14 @@ def test_valid_manifest(self): "url_base": "https://api.sendgrid.com", }, "schema_loader": { - "name": "{{ parameters.stream_name }}", - "file_path": "./source_sendgrid/schemas/{{ parameters.name }}.yaml", + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "string"}, + }, + "type": "object", + }, }, "retriever": { "paginator": { @@ -2390,8 +2180,10 @@ def test_valid_manifest(self): source_config=manifest, config={}, catalog=create_catalog("lists"), state=None ) - check_stream = source.connection_checker - check_stream.check_connection(source, 
logging.getLogger(""), {}) + pages = [_create_page({"records": [{"id": 0}], "_metadata": {}})] + with patch.object(SimpleRetriever, "_fetch_next_page", side_effect=pages): + connection_status = source.check(logging.getLogger(""), {}) + assert connection_status.status == Status.SUCCEEDED streams = source.streams({}) assert len(streams) == 2 @@ -3017,8 +2809,17 @@ def test_conditional_streams_manifest(self, is_sandbox, expected_stream_count): "url_base": "https://api.yasogamihighschool.com", }, "schema_loader": { - "name": "{{ parameters.stream_name }}", - "file_path": "./source_yasogami_high_school/schemas/{{ parameters.name }}.yaml", + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "string"}, + "first_name": {"type": "string"}, + "last_name": {"type": "string"}, + "grade": {"type": "number"}, + }, + "type": "object", + }, }, "retriever": { "paginator": { @@ -3042,7 +2843,6 @@ def test_conditional_streams_manifest(self, is_sandbox, expected_stream_count): "type": "BearerAuthenticator", "api_token": "{{ config.apikey }}", }, - "request_parameters": {"page_size": "{{ 10 }}"}, }, "record_selector": {"extractor": {"field_path": ["result"]}}, }, @@ -3059,8 +2859,16 @@ def test_conditional_streams_manifest(self, is_sandbox, expected_stream_count): "url_base": "https://api.yasogamihighschool.com", }, "schema_loader": { - "name": "{{ parameters.stream_name }}", - "file_path": "./source_yasogami_high_school/schemas/{{ parameters.name }}.yaml", + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "string"}, + "floor": {"type": "number"}, + "room_number": {"type": "number"}, + }, + "type": "object", + }, }, "retriever": { "paginator": { @@ -3084,7 +2892,6 @@ def test_conditional_streams_manifest(self, is_sandbox, expected_stream_count): "type": "BearerAuthenticator", "api_token": "{{ config.apikey }}", }, - "request_parameters": {"page_size": "{{ 10 }}"}, }, "record_selector": {"extractor": {"field_path": ["result"]}}, }, @@ -3097,8 +2904,16 @@ def test_conditional_streams_manifest(self, is_sandbox, expected_stream_count): "url_base": "https://api.yasogamihighschool.com", }, "schema_loader": { - "name": "{{ parameters.stream_name }}", - "file_path": "./source_yasogami_high_school/schemas/{{ parameters.name }}.yaml", + "type": "InlineSchemaLoader", + "schema": { + "$schema": "http://json-schema.org/schema#", + "properties": { + "id": {"type": "string"}, + "name": {"type": "string"}, + "category": {"type": "string"}, + }, + "type": "object", + }, }, "retriever": { "paginator": { @@ -3122,7 +2937,6 @@ def test_conditional_streams_manifest(self, is_sandbox, expected_stream_count): "type": "BearerAuthenticator", "api_token": "{{ config.apikey }}", }, - "request_parameters": {"page_size": "{{ 10 }}"}, }, "record_selector": {"extractor": {"field_path": ["result"]}}, }, @@ -3145,8 +2959,17 @@ def test_conditional_streams_manifest(self, is_sandbox, expected_stream_count): source_config=manifest, config=config, catalog=catalog, state=None ) - check_stream = source.connection_checker - check_stream.check_connection(source, logging.getLogger(""), config=config) + pages = [ + _create_page( + { + "students": [{"id": 0, "first_name": "yu", "last_name": "narukami"}], + "_metadata": {}, + } + ) + ] + with patch.object(SimpleRetriever, "_fetch_next_page", side_effect=pages): + connection_status = source.check(logging.getLogger(""), config=config) + assert 
connection_status.status == Status.SUCCEEDED actual_streams = source.streams(config=config) assert len(actual_streams) == expected_stream_count diff --git a/unit_tests/sources/streams/concurrent/test_cursor.py b/unit_tests/sources/streams/concurrent/test_cursor.py index 326deab49..34c92800d 100644 --- a/unit_tests/sources/streams/concurrent/test_cursor.py +++ b/unit_tests/sources/streams/concurrent/test_cursor.py @@ -1,20 +1,17 @@ # # Copyright (c) 2023 Airbyte, Inc., all rights reserved. # + from copy import deepcopy from datetime import datetime, timedelta, timezone -from functools import partial from typing import Any, Mapping, Optional from unittest import TestCase from unittest.mock import Mock import freezegun import pytest -from isodate import parse_duration from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager -from airbyte_cdk.sources.declarative.datetime.min_max_datetime import MinMaxDatetime -from airbyte_cdk.sources.declarative.incremental.datetime_based_cursor import DatetimeBasedCursor from airbyte_cdk.sources.message import MessageRepository from airbyte_cdk.sources.streams.concurrent.clamping import ( ClampingEndProvider, @@ -35,7 +32,6 @@ from airbyte_cdk.sources.streams.concurrent.state_converters.datetime_stream_state_converter import ( CustomFormatConcurrentStreamStateConverter, EpochValueConcurrentStreamStateConverter, - IsoMillisConcurrentStreamStateConverter, ) from airbyte_cdk.sources.types import Record, StreamSlice @@ -1124,661 +1120,6 @@ def test_given_weekly_clamp_and_granularity_when_stream_slices_then_slice_per_we ] -@freezegun.freeze_time(time_to_freeze=datetime(2024, 4, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) -@pytest.mark.parametrize( - "start_datetime,end_datetime,step,cursor_field,lookback_window,state,expected_slices", - [ - pytest.param( - "{{ config.start_time }}", - "{{ config.end_time or now_utc() }}", - "P10D", - "updated_at", - "P5D", - {}, - [ - { - "start": "2024-01-01T00:00:00.000Z", - "end": "2024-01-10T23:59:59.000Z", - }, - { - "start": "2024-01-11T00:00:00.000Z", - "end": "2024-01-20T23:59:59.000Z", - }, - { - "start": "2024-01-21T00:00:00.000Z", - "end": "2024-01-30T23:59:59.000Z", - }, - { - "start": "2024-01-31T00:00:00.000Z", - "end": "2024-02-09T23:59:59.000Z", - }, - { - "start": "2024-02-10T00:00:00.000Z", - "end": "2024-02-19T23:59:59.000Z", - }, - { - "start": "2024-02-20T00:00:00.000Z", - "end": "2024-03-01T00:00:00.000Z", - }, - ], - id="test_datetime_based_cursor_all_fields", - ), - pytest.param( - "{{ config.start_time }}", - "{{ config.end_time or '2024-01-01T00:00:00.000000+0000' }}", - "P10D", - "updated_at", - "P5D", - { - "slices": [ - { - "start": "2024-01-01T00:00:00.000000+0000", - "end": "2024-02-10T00:00:00.000000+0000", - } - ], - "state_type": "date-range", - }, - [ - { - "start": "2024-02-05T00:00:00.000Z", - "end": "2024-02-14T23:59:59.000Z", - }, - { - "start": "2024-02-15T00:00:00.000Z", - "end": "2024-02-24T23:59:59.000Z", - }, - { - "start": "2024-02-25T00:00:00.000Z", - "end": "2024-03-01T00:00:00.000Z", - }, - ], - id="test_datetime_based_cursor_with_state", - ), - pytest.param( - "{{ config.start_time }}", - "{{ config.missing or now_utc().strftime('%Y-%m-%dT%H:%M:%S.%fZ') }}", - "P20D", - "updated_at", - "P1D", - { - "slices": [ - { - "start": "2024-01-01T00:00:00.000000+0000", - "end": "2024-01-21T00:00:00.000000+0000", - } - ], - "state_type": "date-range", - }, - [ - { - "start": "2024-01-20T00:00:00.000Z", - "end": "2024-02-08T23:59:59.000Z", - }, - { - "start": 
"2024-02-09T00:00:00.000Z", - "end": "2024-02-28T23:59:59.000Z", - }, - { - "start": "2024-02-29T00:00:00.000Z", - "end": "2024-03-19T23:59:59.000Z", - }, - { - "start": "2024-03-20T00:00:00.000Z", - "end": "2024-04-01T00:00:00.000Z", - }, - ], - id="test_datetime_based_cursor_with_state_and_end_date", - ), - pytest.param( - "{{ config.start_time }}", - "{{ config.end_time }}", - "P1M", - "updated_at", - "P5D", - {}, - [ - { - "start": "2024-01-01T00:00:00.000Z", - "end": "2024-01-31T23:59:59.000Z", - }, - { - "start": "2024-02-01T00:00:00.000Z", - "end": "2024-03-01T00:00:00.000Z", - }, - ], - id="test_datetime_based_cursor_using_large_step_duration", - ), - ], -) -def test_generate_slices_concurrent_cursor_from_datetime_based_cursor( - start_datetime, - end_datetime, - step, - cursor_field, - lookback_window, - state, - expected_slices, -): - message_repository = Mock(spec=MessageRepository) - state_manager = Mock(spec=ConnectorStateManager) - - config = { - "start_time": "2024-01-01T00:00:00.000000+0000", - "end_time": "2024-03-01T00:00:00.000000+0000", - } - - datetime_based_cursor = DatetimeBasedCursor( - start_datetime=MinMaxDatetime(datetime=start_datetime, parameters={}), - end_datetime=MinMaxDatetime(datetime=end_datetime, parameters={}), - step=step, - cursor_field=cursor_field, - partition_field_start="start", - partition_field_end="end", - datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", - cursor_granularity="PT1S", - lookback_window=lookback_window, - is_compare_strictly=True, - config=config, - parameters={}, - ) - - # I don't love that we're back to this inching close to interpolation at parse time instead of runtime - # We also might need to add a wrapped class that exposes these fields publicly or live with ugly private access - interpolated_state_date = datetime_based_cursor._start_datetime - start_date = interpolated_state_date.get_datetime(config=config) - - interpolated_end_date = datetime_based_cursor._end_datetime - interpolated_end_date_provider = partial(interpolated_end_date.get_datetime, config) - - interpolated_cursor_field = datetime_based_cursor.cursor_field - cursor_field = CursorField(cursor_field_key=interpolated_cursor_field.eval(config=config)) - - lower_slice_boundary = datetime_based_cursor._partition_field_start.eval(config=config) - upper_slice_boundary = datetime_based_cursor._partition_field_end.eval(config=config) - slice_boundary_fields = (lower_slice_boundary, upper_slice_boundary) - - # DatetimeBasedCursor returns an isodate.Duration if step uses month or year precision. This still works in our - # code, but mypy may complain when we actually implement this in the concurrent low-code source. To fix this, we - # may need to convert a Duration to timedelta by multiplying month by 30 (but could lose precision). 
- step_length = datetime_based_cursor._step - - lookback_window = ( - parse_duration(datetime_based_cursor.lookback_window) - if datetime_based_cursor.lookback_window - else None - ) - - cursor_granularity = ( - parse_duration(datetime_based_cursor.cursor_granularity) - if datetime_based_cursor.cursor_granularity - else None - ) - - cursor = ConcurrentCursor( - stream_name=_A_STREAM_NAME, - stream_namespace=_A_STREAM_NAMESPACE, - stream_state=state, - message_repository=message_repository, - connector_state_manager=state_manager, - connector_state_converter=IsoMillisConcurrentStreamStateConverter(is_sequential_state=True), - cursor_field=cursor_field, - slice_boundary_fields=slice_boundary_fields, - start=start_date, - end_provider=interpolated_end_date_provider, - lookback_window=lookback_window, - slice_range=step_length, - cursor_granularity=cursor_granularity, - ) - - actual_slices = list(cursor.stream_slices()) - assert actual_slices == expected_slices - - -@freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) -def test_observe_concurrent_cursor_from_datetime_based_cursor(): - message_repository = Mock(spec=MessageRepository) - state_manager = Mock(spec=ConnectorStateManager) - - config = {"start_time": "2024-08-01T00:00:00.000000+0000", "dynamic_cursor_key": "updated_at"} - - datetime_based_cursor = DatetimeBasedCursor( - start_datetime=MinMaxDatetime(datetime="{{ config.start_time }}", parameters={}), - cursor_field="{{ config.dynamic_cursor_key }}", - datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", - config=config, - parameters={}, - ) - - interpolated_state_date = datetime_based_cursor._start_datetime - start_date = interpolated_state_date.get_datetime(config=config) - - interpolated_cursor_field = datetime_based_cursor.cursor_field - cursor_field = CursorField(cursor_field_key=interpolated_cursor_field.eval(config=config)) - - step_length = datetime_based_cursor._step - - concurrent_cursor = ConcurrentCursor( - stream_name="gods", - stream_namespace=_A_STREAM_NAMESPACE, - stream_state={}, - message_repository=message_repository, - connector_state_manager=state_manager, - connector_state_converter=IsoMillisConcurrentStreamStateConverter(is_sequential_state=True), - cursor_field=cursor_field, - slice_boundary_fields=None, - start=start_date, - end_provider=IsoMillisConcurrentStreamStateConverter.get_end_provider(), - slice_range=step_length, - ) - - partition = _partition( - StreamSlice( - partition={ - _LOWER_SLICE_BOUNDARY_FIELD: "2024-08-01T00:00:00.000000+0000", - _UPPER_SLICE_BOUNDARY_FIELD: "2024-09-01T00:00:00.000000+0000", - }, - cursor_slice={}, - ), - _stream_name="gods", - ) - - record_1 = Record( - associated_slice=partition.to_slice(), - data={ - "id": "999", - "updated_at": "2024-08-23T00:00:00.000000+0000", - "name": "kratos", - "mythology": "greek", - }, - stream_name="gods", - ) - record_2 = Record( - associated_slice=partition.to_slice(), - data={ - "id": "1000", - "updated_at": "2024-08-22T00:00:00.000000+0000", - "name": "odin", - "mythology": "norse", - }, - stream_name="gods", - ) - record_3 = Record( - associated_slice=partition.to_slice(), - data={ - "id": "500", - "updated_at": "2024-08-24T00:00:00.000000+0000", - "name": "freya", - "mythology": "norse", - }, - stream_name="gods", - ) - - concurrent_cursor.observe(record_1) - actual_most_recent_record = concurrent_cursor._most_recent_cursor_value_per_partition[ - partition.to_slice() - ] - assert actual_most_recent_record == concurrent_cursor._extract_cursor_value(record_1) 
- - concurrent_cursor.observe(record_2) - actual_most_recent_record = concurrent_cursor._most_recent_cursor_value_per_partition[ - partition.to_slice() - ] - assert actual_most_recent_record == concurrent_cursor._extract_cursor_value(record_1) - - concurrent_cursor.observe(record_3) - actual_most_recent_record = concurrent_cursor._most_recent_cursor_value_per_partition[ - partition.to_slice() - ] - assert actual_most_recent_record == concurrent_cursor._extract_cursor_value(record_3) - - -@freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) -def test_close_partition_concurrent_cursor_from_datetime_based_cursor(): - message_repository = Mock(spec=MessageRepository) - state_manager = Mock(spec=ConnectorStateManager) - - config = {"start_time": "2024-08-01T00:00:00.000000+0000", "dynamic_cursor_key": "updated_at"} - - datetime_based_cursor = DatetimeBasedCursor( - start_datetime=MinMaxDatetime(datetime="{{ config.start_time }}", parameters={}), - cursor_field="{{ config.dynamic_cursor_key }}", - datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", - config=config, - parameters={}, - ) - - interpolated_state_date = datetime_based_cursor._start_datetime - start_date = interpolated_state_date.get_datetime(config=config) - - interpolated_cursor_field = datetime_based_cursor.cursor_field - cursor_field = CursorField(cursor_field_key=interpolated_cursor_field.eval(config=config)) - - step_length = datetime_based_cursor._step - - concurrent_cursor = ConcurrentCursor( - stream_name="gods", - stream_namespace=_A_STREAM_NAMESPACE, - stream_state={}, - message_repository=message_repository, - connector_state_manager=state_manager, - connector_state_converter=IsoMillisConcurrentStreamStateConverter( - is_sequential_state=False - ), - cursor_field=cursor_field, - slice_boundary_fields=None, - start=start_date, - end_provider=IsoMillisConcurrentStreamStateConverter.get_end_provider(), - slice_range=step_length, - ) - - partition = _partition( - StreamSlice( - partition={ - _LOWER_SLICE_BOUNDARY_FIELD: "2024-08-01T00:00:00.000000+0000", - _UPPER_SLICE_BOUNDARY_FIELD: "2024-09-01T00:00:00.000000+0000", - }, - cursor_slice={}, - ), - _stream_name="gods", - ) - - record_1 = Record( - associated_slice=partition.to_slice(), - data={ - "id": "999", - "updated_at": "2024-08-23T00:00:00.000000+0000", - "name": "kratos", - "mythology": "greek", - }, - stream_name="gods", - ) - concurrent_cursor.observe(record_1) - - concurrent_cursor.close_partition(partition) - - message_repository.emit_message.assert_called_once_with( - state_manager.create_state_message.return_value - ) - state_manager.update_state_for_stream.assert_called_once_with( - "gods", - _A_STREAM_NAMESPACE, - { - "slices": [ - { - "end": "2024-08-23T00:00:00.000Z", - "start": "2024-08-01T00:00:00.000Z", - "most_recent_cursor_value": "2024-08-23T00:00:00.000Z", - } - ], - "state_type": "date-range", - }, - ) - - -@freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) -def test_close_partition_with_slice_range_concurrent_cursor_from_datetime_based_cursor(): - message_repository = Mock(spec=MessageRepository) - state_manager = Mock(spec=ConnectorStateManager) - - config = {"start_time": "2024-07-01T00:00:00.000000+0000", "dynamic_cursor_key": "updated_at"} - - datetime_based_cursor = DatetimeBasedCursor( - start_datetime=MinMaxDatetime(datetime="{{ config.start_time }}", parameters={}), - cursor_field="{{ config.dynamic_cursor_key }}", - datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", - 
step="P15D", - cursor_granularity="P1D", - config=config, - parameters={}, - ) - - interpolated_state_date = datetime_based_cursor._start_datetime - start_date = interpolated_state_date.get_datetime(config=config) - - interpolated_cursor_field = datetime_based_cursor.cursor_field - cursor_field = CursorField(cursor_field_key=interpolated_cursor_field.eval(config=config)) - - lower_slice_boundary = datetime_based_cursor._partition_field_start.eval(config=config) - upper_slice_boundary = datetime_based_cursor._partition_field_end.eval(config=config) - slice_boundary_fields = (lower_slice_boundary, upper_slice_boundary) - - step_length = datetime_based_cursor._step - - concurrent_cursor = ConcurrentCursor( - stream_name="gods", - stream_namespace=_A_STREAM_NAMESPACE, - stream_state={}, - message_repository=message_repository, - connector_state_manager=state_manager, - connector_state_converter=IsoMillisConcurrentStreamStateConverter( - is_sequential_state=False, cursor_granularity=None - ), - cursor_field=cursor_field, - slice_boundary_fields=slice_boundary_fields, - start=start_date, - slice_range=step_length, - cursor_granularity=None, - end_provider=IsoMillisConcurrentStreamStateConverter.get_end_provider(), - ) - - partition_0 = _partition( - StreamSlice( - partition={ - "start_time": "2024-07-01T00:00:00.000000+0000", - "end_time": "2024-07-16T00:00:00.000000+0000", - }, - cursor_slice={}, - ), - _stream_name="gods", - ) - partition_3 = _partition( - StreamSlice( - partition={ - "start_time": "2024-08-15T00:00:00.000000+0000", - "end_time": "2024-08-30T00:00:00.000000+0000", - }, - cursor_slice={}, - ), - _stream_name="gods", - ) - record_1 = Record( - associated_slice=partition_0.to_slice(), - data={ - "id": "1000", - "updated_at": "2024-07-05T00:00:00.000000+0000", - "name": "loki", - "mythology": "norse", - }, - stream_name="gods", - ) - record_2 = Record( - associated_slice=partition_3.to_slice(), - data={ - "id": "999", - "updated_at": "2024-08-20T00:00:00.000000+0000", - "name": "kratos", - "mythology": "greek", - }, - stream_name="gods", - ) - - concurrent_cursor.observe(record_1) - concurrent_cursor.close_partition(partition_0) - concurrent_cursor.observe(record_2) - concurrent_cursor.close_partition(partition_3) - - message_repository.emit_message.assert_called_with( - state_manager.create_state_message.return_value - ) - assert message_repository.emit_message.call_count == 2 - state_manager.update_state_for_stream.assert_called_with( - "gods", - _A_STREAM_NAMESPACE, - { - "slices": [ - { - "start": "2024-07-01T00:00:00.000Z", - "end": "2024-07-16T00:00:00.000Z", - "most_recent_cursor_value": "2024-07-05T00:00:00.000Z", - }, - { - "start": "2024-08-15T00:00:00.000Z", - "end": "2024-08-30T00:00:00.000Z", - "most_recent_cursor_value": "2024-08-20T00:00:00.000Z", - }, - ], - "state_type": "date-range", - }, - ) - assert state_manager.update_state_for_stream.call_count == 2 - - -@freezegun.freeze_time(time_to_freeze=datetime(2024, 9, 1, 0, 0, 0, 0, tzinfo=timezone.utc)) -def test_close_partition_with_slice_range_granularity_concurrent_cursor_from_datetime_based_cursor(): - message_repository = Mock(spec=MessageRepository) - state_manager = Mock(spec=ConnectorStateManager) - - config = {"start_time": "2024-07-01T00:00:00.000000+0000", "dynamic_cursor_key": "updated_at"} - - datetime_based_cursor = DatetimeBasedCursor( - start_datetime=MinMaxDatetime(datetime="{{ config.start_time }}", parameters={}), - cursor_field="{{ config.dynamic_cursor_key }}", - 
datetime_format="%Y-%m-%dT%H:%M:%S.%f%z", - step="P15D", - cursor_granularity="P1D", - config=config, - parameters={}, - ) - - interpolated_state_date = datetime_based_cursor._start_datetime - start_date = interpolated_state_date.get_datetime(config=config) - - interpolated_cursor_field = datetime_based_cursor.cursor_field - cursor_field = CursorField(cursor_field_key=interpolated_cursor_field.eval(config=config)) - - lower_slice_boundary = datetime_based_cursor._partition_field_start.eval(config=config) - upper_slice_boundary = datetime_based_cursor._partition_field_end.eval(config=config) - slice_boundary_fields = (lower_slice_boundary, upper_slice_boundary) - - step_length = datetime_based_cursor._step - - cursor_granularity = ( - parse_duration(datetime_based_cursor.cursor_granularity) - if datetime_based_cursor.cursor_granularity - else None - ) - - concurrent_cursor = ConcurrentCursor( - stream_name="gods", - stream_namespace=_A_STREAM_NAMESPACE, - stream_state={}, - message_repository=message_repository, - connector_state_manager=state_manager, - connector_state_converter=IsoMillisConcurrentStreamStateConverter( - is_sequential_state=False, cursor_granularity=cursor_granularity - ), - cursor_field=cursor_field, - slice_boundary_fields=slice_boundary_fields, - start=start_date, - slice_range=step_length, - cursor_granularity=cursor_granularity, - end_provider=IsoMillisConcurrentStreamStateConverter.get_end_provider(), - ) - - partition_0 = _partition( - StreamSlice( - partition={ - "start_time": "2024-07-01T00:00:00.000000+0000", - "end_time": "2024-07-15T00:00:00.000000+0000", - }, - cursor_slice={}, - ), - _stream_name="gods", - ) - partition_1 = _partition( - StreamSlice( - partition={ - "start_time": "2024-07-16T00:00:00.000000+0000", - "end_time": "2024-07-31T00:00:00.000000+0000", - }, - cursor_slice={}, - ), - _stream_name="gods", - ) - partition_3 = _partition( - StreamSlice( - partition={ - "start_time": "2024-08-15T00:00:00.000000+0000", - "end_time": "2024-08-29T00:00:00.000000+0000", - }, - cursor_slice={}, - ), - _stream_name="gods", - ) - record_1 = Record( - associated_slice=partition_0.to_slice(), - data={ - "id": "1000", - "updated_at": "2024-07-05T00:00:00.000000+0000", - "name": "loki", - "mythology": "norse", - }, - stream_name="gods", - ) - record_2 = Record( - associated_slice=partition_1.to_slice(), - data={ - "id": "2000", - "updated_at": "2024-07-25T00:00:00.000000+0000", - "name": "freya", - "mythology": "norse", - }, - stream_name="gods", - ) - record_3 = Record( - associated_slice=partition_3.to_slice(), - data={ - "id": "999", - "updated_at": "2024-08-20T00:00:00.000000+0000", - "name": "kratos", - "mythology": "greek", - }, - stream_name="gods", - ) - - concurrent_cursor.observe(record_1) - concurrent_cursor.close_partition(partition_0) - concurrent_cursor.observe(record_2) - concurrent_cursor.close_partition(partition_1) - concurrent_cursor.observe(record_3) - concurrent_cursor.close_partition(partition_3) - - message_repository.emit_message.assert_called_with( - state_manager.create_state_message.return_value - ) - assert message_repository.emit_message.call_count == 3 - state_manager.update_state_for_stream.assert_called_with( - "gods", - _A_STREAM_NAMESPACE, - { - "slices": [ - { - "start": "2024-07-01T00:00:00.000Z", - "end": "2024-07-31T00:00:00.000Z", - "most_recent_cursor_value": "2024-07-25T00:00:00.000Z", - }, - { - "start": "2024-08-15T00:00:00.000Z", - "end": "2024-08-29T00:00:00.000Z", - "most_recent_cursor_value": 
"2024-08-20T00:00:00.000Z", - }, - ], - "state_type": "date-range", - }, - ) - assert state_manager.update_state_for_stream.call_count == 3 - - _SHOULD_BE_SYNCED_START = 10 diff --git a/unit_tests/sources/streams/concurrent/test_partition_reader.py b/unit_tests/sources/streams/concurrent/test_partition_reader.py index a41750772..ac34a7062 100644 --- a/unit_tests/sources/streams/concurrent/test_partition_reader.py +++ b/unit_tests/sources/streams/concurrent/test_partition_reader.py @@ -7,8 +7,8 @@ import pytest -from airbyte_cdk import InMemoryMessageRepository from airbyte_cdk.sources.concurrent_source.stream_thread_exception import StreamThreadException +from airbyte_cdk.sources.message.repository import InMemoryMessageRepository from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor from airbyte_cdk.sources.streams.concurrent.partition_reader import PartitionReader from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition