From f9c090d85a0b0b149332d34f70e9ab8b52b1735c Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 9 Dec 2025 02:37:57 +0000 Subject: [PATCH 1/3] fix: Honor force_full_refresh in configured catalog sync mode When force_full_refresh=True, the configured catalog now uses SyncMode.full_refresh for streams that support it, instead of always using SyncMode.incremental. This fixes issue #772 where the Postgres source would default to incremental sync even when force_full_refresh=True was specified. The fix updates get_configured_catalog() to accept a force_full_refresh parameter and properly set the sync mode based on: - If force_full_refresh=True: prefer full_refresh if supported - If force_full_refresh=False: prefer incremental if supported Fixes #772 Co-Authored-By: AJ Steers --- airbyte/destinations/base.py | 1 + airbyte/sources/base.py | 29 ++++++++++++++++++++++++++--- 2 files changed, 27 insertions(+), 3 deletions(-) diff --git a/airbyte/destinations/base.py b/airbyte/destinations/base.py index b1482bc0d..776932597 100644 --- a/airbyte/destinations/base.py +++ b/airbyte/destinations/base.py @@ -167,6 +167,7 @@ def write( # noqa: PLR0912, PLR0915 # Too many arguments/statements catalog_provider = CatalogProvider( configured_catalog=source.get_configured_catalog( streams=streams, + force_full_refresh=force_full_refresh, ) ) elif read_result: diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index 7fd5093c5..8084635db 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -420,6 +420,8 @@ def configured_catalog(self) -> ConfiguredAirbyteCatalog: def get_configured_catalog( self, streams: Literal["*"] | list[str] | None = None, + *, + force_full_refresh: bool = False, ) -> ConfiguredAirbyteCatalog: """Get a configured catalog for the given streams. @@ -427,6 +429,9 @@ def get_configured_catalog( all available streams will be used. 
If '*' is provided, all available streams will be used. + + If force_full_refresh is True, streams will be configured with full_refresh sync mode + when supported by the stream. Otherwise, incremental sync mode is used when supported. """ selected_streams: list[str] = [] if streams is None: @@ -441,12 +446,25 @@ def get_configured_catalog( input_value=streams, ) + def _get_sync_mode(stream: AirbyteStream) -> SyncMode: + """Determine the sync mode for a stream based on force_full_refresh and support.""" + if force_full_refresh: + # When force_full_refresh is True, prefer full_refresh if supported + if SyncMode.full_refresh in stream.supported_sync_modes: + return SyncMode.full_refresh + # Fall back to incremental if full_refresh is not supported + return SyncMode.incremental + # Default behavior: prefer incremental if supported + if SyncMode.incremental in stream.supported_sync_modes: + return SyncMode.incremental + return SyncMode.full_refresh + return ConfiguredAirbyteCatalog( streams=[ ConfiguredAirbyteStream( stream=stream, destination_sync_mode=DestinationSyncMode.overwrite, - sync_mode=SyncMode.incremental, + sync_mode=_get_sync_mode(stream), primary_key=( [self._primary_key_overrides[stream.name.lower()]] if stream.name.lower() in self._primary_key_overrides @@ -726,7 +744,10 @@ def _get_airbyte_message_iterator( """Get an AirbyteMessageIterator for this source.""" return AirbyteMessageIterator( self._read_with_catalog( - catalog=self.get_configured_catalog(streams=streams), + catalog=self.get_configured_catalog( + streams=streams, + force_full_refresh=force_full_refresh, + ), state=state_provider if not force_full_refresh else None, progress_tracker=progress_tracker, ) @@ -868,7 +889,9 @@ def read( try: result = self._read_to_cache( cache=cache, - catalog_provider=CatalogProvider(self.configured_catalog), + catalog_provider=CatalogProvider( + self.get_configured_catalog(force_full_refresh=force_full_refresh) + ), stream_names=self._selected_stream_names, 
state_provider=state_provider, state_writer=state_writer, From 9abd32a1147ca1296164a011b02ef0371b5d99a2 Mon Sep 17 00:00:00 2001 From: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 9 Dec 2025 02:42:23 +0000 Subject: [PATCH 2/3] fix: Use getattr to handle streams without supported_sync_modes Make _get_sync_mode defensive by using getattr to handle mock objects or older stream objects that don't have the supported_sync_modes attribute. This preserves backward compatibility with existing tests while still fixing the force_full_refresh behavior for real streams. Note that this also changes the non-forced path: when force_full_refresh is False, the sync mode is again always SyncMode.incremental (the behavior before this patch series), rather than falling back to full_refresh for streams that do not list incremental in supported_sync_modes. Co-Authored-By: AJ Steers --- airbyte/sources/base.py | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index 8084635db..da64ced59 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -448,16 +448,18 @@ def get_configured_catalog( def _get_sync_mode(stream: AirbyteStream) -> SyncMode: """Determine the sync mode for a stream based on force_full_refresh and support.""" + # Use getattr to handle mocks or streams without supported_sync_modes attribute + supported_modes = getattr(stream, "supported_sync_modes", None) + if force_full_refresh: # When force_full_refresh is True, prefer full_refresh if supported - if SyncMode.full_refresh in stream.supported_sync_modes: + if supported_modes and SyncMode.full_refresh in supported_modes: return SyncMode.full_refresh # Fall back to incremental if full_refresh is not supported return SyncMode.incremental - # Default behavior: prefer incremental if supported - if SyncMode.incremental in stream.supported_sync_modes: - return SyncMode.incremental - return SyncMode.full_refresh + + # Default behavior: preserve previous semantics (always incremental) + return SyncMode.incremental return ConfiguredAirbyteCatalog( streams=[ From 475ffa31df89a547a3ce6a05650eaa8f9f93e8e2 Mon Sep 17 00:00:00 2001 From: Devin AI 
<158243242+devin-ai-integration[bot]@users.noreply.github.com> Date: Tue, 9 Dec 2025 03:56:39 +0000 Subject: [PATCH 3/3] fix: Correct parameter name in warning message (full_refresh_mode -> force_full_refresh) Co-Authored-By: AJ Steers --- airbyte/sources/base.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/airbyte/sources/base.py b/airbyte/sources/base.py index da64ced59..7286804b1 100644 --- a/airbyte/sources/base.py +++ b/airbyte/sources/base.py @@ -932,7 +932,7 @@ def _read_to_cache( # noqa: PLR0913 # Too many arguments if write_strategy == WriteStrategy.REPLACE and not force_full_refresh: warnings.warn( message=( - "Using `REPLACE` strategy without also setting `full_refresh_mode=True` " + "Using `REPLACE` strategy without also setting `force_full_refresh=True` " "could result in data loss. " "To silence this warning, use the following: " 'warnings.filterwarnings("ignore", '