Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions airbyte/destinations/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,7 @@ def write( # noqa: PLR0912, PLR0915 # Too many arguments/statements
catalog_provider = CatalogProvider(
configured_catalog=source.get_configured_catalog(
streams=streams,
force_full_refresh=force_full_refresh,
)
)
elif read_result:
Expand Down
31 changes: 28 additions & 3 deletions airbyte/sources/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -420,13 +420,18 @@ def configured_catalog(self) -> ConfiguredAirbyteCatalog:
def get_configured_catalog(
self,
streams: Literal["*"] | list[str] | None = None,
*,
force_full_refresh: bool = False,
) -> ConfiguredAirbyteCatalog:
"""Get a configured catalog for the given streams.

If no streams are provided, the selected streams will be used. If no streams are selected,
all available streams will be used.

If '*' is provided, all available streams will be used.

If force_full_refresh is True, each stream is configured with full_refresh sync mode when
the stream supports it, falling back to incremental sync mode when it does not. If
force_full_refresh is False, incremental sync mode is always used.
"""
selected_streams: list[str] = []
if streams is None:
Expand All @@ -441,12 +446,27 @@ def get_configured_catalog(
input_value=streams,
)

def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
    """Pick the sync mode to configure for ``stream``.

    Full refresh is selected only when the enclosing ``force_full_refresh`` flag is set
    and the stream lists ``SyncMode.full_refresh`` among its supported sync modes.
    Every other case resolves to incremental, which preserves the previous semantics.
    """
    # getattr with a default tolerates mocks or streams lacking supported_sync_modes.
    modes = getattr(stream, "supported_sync_modes", None)
    use_full_refresh = bool(
        force_full_refresh and modes and SyncMode.full_refresh in modes
    )
    return SyncMode.full_refresh if use_full_refresh else SyncMode.incremental

return ConfiguredAirbyteCatalog(
streams=[
ConfiguredAirbyteStream(
stream=stream,
destination_sync_mode=DestinationSyncMode.overwrite,
sync_mode=SyncMode.incremental,
sync_mode=_get_sync_mode(stream),
primary_key=(
[self._primary_key_overrides[stream.name.lower()]]
if stream.name.lower() in self._primary_key_overrides
Expand Down Expand Up @@ -726,7 +746,10 @@ def _get_airbyte_message_iterator(
"""Get an AirbyteMessageIterator for this source."""
return AirbyteMessageIterator(
self._read_with_catalog(
catalog=self.get_configured_catalog(streams=streams),
catalog=self.get_configured_catalog(
streams=streams,
force_full_refresh=force_full_refresh,
),
state=state_provider if not force_full_refresh else None,
progress_tracker=progress_tracker,
)
Expand Down Expand Up @@ -868,7 +891,9 @@ def read(
try:
result = self._read_to_cache(
cache=cache,
catalog_provider=CatalogProvider(self.configured_catalog),
catalog_provider=CatalogProvider(
self.get_configured_catalog(force_full_refresh=force_full_refresh)
),
stream_names=self._selected_stream_names,
state_provider=state_provider,
state_writer=state_writer,
Expand Down