diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index 8593dac29..ce391e109 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -167,6 +167,7 @@ def write( # noqa: PLR0912, PLR0915 # Too many arguments/statements catalog_provider = CatalogProvider( configured_catalog=source.get_configured_catalog( streams=streams, + force_full_refresh=force_full_refresh, ) ) elif read_result: diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index 9dabd07ca..0a07950a6 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -420,6 +420,8 @@ def configured_catalog(self) -> ConfiguredAirbyteCatalog: def get_configured_catalog( self, streams: Literal["*"] | list[str] | None = None, + *, + force_full_refresh: bool = False, ) -> ConfiguredAirbyteCatalog: """Get a configured catalog for the given streams. @@ -427,6 +429,9 @@ def get_configured_catalog( all available streams will be used. If '*' is provided, all available streams will be used. + + If force_full_refresh is True, streams will be configured with full_refresh sync mode + when supported by the stream. Otherwise, incremental sync mode is always used. 
""" selected_streams: list[str] = [] if streams is None: @@ -441,12 +446,27 @@ def get_configured_catalog( input_value=streams, ) + def _get_sync_mode(stream: AirbyteStream) -> SyncMode: + """Determine the sync mode for a stream based on force_full_refresh and support.""" + # Use getattr to handle mocks or streams without supported_sync_modes attribute + supported_modes = getattr(stream, "supported_sync_modes", None) + + if force_full_refresh: + # When force_full_refresh is True, prefer full_refresh if supported + if supported_modes and SyncMode.full_refresh in supported_modes: + return SyncMode.full_refresh + # Fall back to incremental if full_refresh is not supported + return SyncMode.incremental + + # Default behavior: preserve previous semantics (always incremental) + return SyncMode.incremental + return ConfiguredAirbyteCatalog( streams=[ ConfiguredAirbyteStream( stream=stream, destination_sync_mode=DestinationSyncMode.overwrite, - sync_mode=SyncMode.incremental, + sync_mode=_get_sync_mode(stream), primary_key=( [self._primary_key_overrides[stream.name.lower()]] if stream.name.lower() in self._primary_key_overrides @@ -726,7 +746,10 @@ def _get_airbyte_message_iterator( """Get an AirbyteMessageIterator for this source.""" return AirbyteMessageIterator( self._read_with_catalog( - catalog=self.get_configured_catalog(streams=streams), + catalog=self.get_configured_catalog( + streams=streams, + force_full_refresh=force_full_refresh, + ), state=state_provider if not force_full_refresh else None, progress_tracker=progress_tracker, ) @@ -868,7 +891,9 @@ def read( try: result = self._read_to_cache( cache=cache, - catalog_provider=CatalogProvider(self.configured_catalog), + catalog_provider=CatalogProvider( + self.get_configured_catalog(force_full_refresh=force_full_refresh) + ), stream_names=self._selected_stream_names, state_provider=state_provider, state_writer=state_writer, @@ -907,7 +932,7 @@ def _read_to_cache( # noqa: PLR0913 # Too many arguments if 
write_strategy == WriteStrategy.REPLACE and not force_full_refresh: warnings.warn( message=( - "Using `REPLACE` strategy without also setting `full_refresh_mode=True` " + "Using `REPLACE` strategy without also setting `force_full_refresh=True` " "could result in data loss. " "To silence this warning, use the following: " 'warnings.filterwarnings("ignore", '