Skip to content

Commit abf317a

Browse files
fix: Pass explicit SyncMode.full_refresh when force_full_refresh=True (#899)
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent a2ea38f commit abf317a

File tree

2 files changed

+30
-4
lines changed

2 files changed

+30
-4
lines changed

airbyte/destinations/base.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -167,6 +167,7 @@ def write( # noqa: PLR0912, PLR0915 # Too many arguments/statements
167167
catalog_provider = CatalogProvider(
168168
configured_catalog=source.get_configured_catalog(
169169
streams=streams,
170+
force_full_refresh=force_full_refresh,
170171
)
171172
)
172173
elif read_result:

airbyte/sources/base.py

Lines changed: 29 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -420,13 +420,18 @@ def configured_catalog(self) -> ConfiguredAirbyteCatalog:
420420
def get_configured_catalog(
421421
self,
422422
streams: Literal["*"] | list[str] | None = None,
423+
*,
424+
force_full_refresh: bool = False,
423425
) -> ConfiguredAirbyteCatalog:
424426
"""Get a configured catalog for the given streams.
425427
426428
If no streams are provided, the selected streams will be used. If no streams are selected,
427429
all available streams will be used.
428430
429431
If '*' is provided, all available streams will be used.
432+
433+
If force_full_refresh is True, streams will be configured with full_refresh sync mode
434+
when supported by the stream. Otherwise, incremental sync mode is used when supported.
430435
"""
431436
selected_streams: list[str] = []
432437
if streams is None:
@@ -441,12 +446,27 @@ def get_configured_catalog(
441446
input_value=streams,
442447
)
443448

449+
def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
450+
"""Determine the sync mode for a stream based on force_full_refresh and support."""
451+
# Use getattr to handle mocks or streams without supported_sync_modes attribute
452+
supported_modes = getattr(stream, "supported_sync_modes", None)
453+
454+
if force_full_refresh:
455+
# When force_full_refresh is True, prefer full_refresh if supported
456+
if supported_modes and SyncMode.full_refresh in supported_modes:
457+
return SyncMode.full_refresh
458+
# Fall back to incremental if full_refresh is not supported
459+
return SyncMode.incremental
460+
461+
# Default behavior: preserve previous semantics (always incremental)
462+
return SyncMode.incremental
463+
444464
return ConfiguredAirbyteCatalog(
445465
streams=[
446466
ConfiguredAirbyteStream(
447467
stream=stream,
448468
destination_sync_mode=DestinationSyncMode.overwrite,
449-
sync_mode=SyncMode.incremental,
469+
sync_mode=_get_sync_mode(stream),
450470
primary_key=(
451471
[self._primary_key_overrides[stream.name.lower()]]
452472
if stream.name.lower() in self._primary_key_overrides
@@ -726,7 +746,10 @@ def _get_airbyte_message_iterator(
726746
"""Get an AirbyteMessageIterator for this source."""
727747
return AirbyteMessageIterator(
728748
self._read_with_catalog(
729-
catalog=self.get_configured_catalog(streams=streams),
749+
catalog=self.get_configured_catalog(
750+
streams=streams,
751+
force_full_refresh=force_full_refresh,
752+
),
730753
state=state_provider if not force_full_refresh else None,
731754
progress_tracker=progress_tracker,
732755
)
@@ -868,7 +891,9 @@ def read(
868891
try:
869892
result = self._read_to_cache(
870893
cache=cache,
871-
catalog_provider=CatalogProvider(self.configured_catalog),
894+
catalog_provider=CatalogProvider(
895+
self.get_configured_catalog(force_full_refresh=force_full_refresh)
896+
),
872897
stream_names=self._selected_stream_names,
873898
state_provider=state_provider,
874899
state_writer=state_writer,
@@ -907,7 +932,7 @@ def _read_to_cache( # noqa: PLR0913 # Too many arguments
907932
if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
908933
warnings.warn(
909934
message=(
910-
"Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
935+
"Using `REPLACE` strategy without also setting `force_full_refresh=True` "
911936
"could result in data loss. "
912937
"To silence this warning, use the following: "
913938
'warnings.filterwarnings("ignore", '

0 commit comments

Comments
 (0)