Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte/destinations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def write( # noqa: PLR0912, PLR0915 # Too many arguments/statements
catalog_provider = CatalogProvider(
configured_catalog=source.get_configured_catalog(
streams=streams,
force_full_refresh=force_full_refresh,
)
)
elif read_result:
Expand Down
33 changes: 29 additions & 4 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,18 @@ def configured_catalog(self) -> ConfiguredAirbyteCatalog:
def get_configured_catalog(
self,
streams: Literal["*"] | list[str] | None = None,
*,
force_full_refresh: bool = False,
) -> ConfiguredAirbyteCatalog:
"""Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected,
all available streams will be used.

If '*' is provided, all available streams will be used.

If force_full_refresh is True, streams will be configured with full_refresh sync mode
when supported by the stream. Otherwise, incremental sync mode is used when supported.
"""
selected_streams: list[str] = []
if streams is None:
Expand All @@ -441,12 +446,27 @@ def get_configured_catalog(
input_value=streams,
)

def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
"""Determine the sync mode for a stream based on force_full_refresh and support."""
# Use getattr to handle mocks or streams without supported_sync_modes attribute
supported_modes = getattr(stream, "supported_sync_modes", None)

if force_full_refresh:
# When force_full_refresh is True, prefer full_refresh if supported
if supported_modes and SyncMode.full_refresh in supported_modes:
return SyncMode.full_refresh
# Fall back to incremental if full_refresh is not supported
return SyncMode.incremental

# Default behavior: preserve previous semantics (always incremental)
return SyncMode.incremental

return ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=stream,
destination_sync_mode=DestinationSyncMode.overwrite,
sync_mode=SyncMode.incremental,
sync_mode=_get_sync_mode(stream),
primary_key=(
[self._primary_key_overrides[stream.name.lower()]]
if stream.name.lower() in self._primary_key_overrides
Expand Down Expand Up @@ -726,7 +746,10 @@ def _get_airbyte_message_iterator(
"""Get an AirbyteMessageIterator for this source."""
return AirbyteMessageIterator(
self._read_with_catalog(
catalog=self.get_configured_catalog(streams=streams),
catalog=self.get_configured_catalog(
streams=streams,
force_full_refresh=force_full_refresh,
),
state=state_provider if not force_full_refresh else None,
progress_tracker=progress_tracker,
)
Expand Down Expand Up @@ -868,7 +891,9 @@ def read(
try:
result = self._read_to_cache(
cache=cache,
catalog_provider=CatalogProvider(self.configured_catalog),
catalog_provider=CatalogProvider(
self.get_configured_catalog(force_full_refresh=force_full_refresh)
),
stream_names=self._selected_stream_names,
state_provider=state_provider,
state_writer=state_writer,
Expand Down Expand Up @@ -907,7 +932,7 @@ def _read_to_cache( # noqa: PLR0913 # Too many arguments
if write_strategy == WriteStrategy.REPLACE and not force_full_refresh:
warnings.warn(
message=(
"Using `REPLACE` strategy without also setting `full_refresh_mode=True` "
"Using `REPLACE` strategy without also setting `force_full_refresh=True` "
"could result in data loss. "
"To silence this warning, use the following: "
'warnings.filterwarnings("ignore", '
Expand Down
Loading