fix: Pass explicit SyncMode.full_refresh when force_full_refresh=True #899
When force_full_refresh=True, the configured catalog now uses SyncMode.full_refresh for streams that support it, instead of always using SyncMode.incremental. This fixes issue #772 where the Postgres source would default to incremental sync even when force_full_refresh=True was specified.

The fix updates get_configured_catalog() to accept a force_full_refresh parameter and properly set the sync mode based on:
- If force_full_refresh=True: prefer full_refresh if supported
- If force_full_refresh=False: prefer incremental if supported

Fixes #772

Co-Authored-By: AJ Steers <aj@airbyte.io>
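The two selection rules in the commit message above can be sketched as a small standalone function. This is a minimal illustration, not the code in `airbyte/sources/base.py`: the `SyncMode` enum is a local stand-in for the Airbyte protocol enum, `choose_sync_mode` is a hypothetical name, and the fallback to incremental when the preferred mode is not advertised is an assumption based on the historical default.

```python
from enum import Enum
from typing import Sequence


class SyncMode(Enum):
    """Local stand-in for the Airbyte protocol's SyncMode enum (assumption)."""

    full_refresh = "full_refresh"
    incremental = "incremental"


def choose_sync_mode(
    supported_modes: Sequence[SyncMode],
    *,
    force_full_refresh: bool = False,
) -> SyncMode:
    """Pick a sync mode: prefer full_refresh when forced, else incremental."""
    preferred = SyncMode.full_refresh if force_full_refresh else SyncMode.incremental
    if preferred in supported_modes:
        return preferred
    # Assumed fallback: keep the historical default of incremental.
    return SyncMode.incremental


both = [SyncMode.full_refresh, SyncMode.incremental]
assert choose_sync_mode(both, force_full_refresh=True) is SyncMode.full_refresh
assert choose_sync_mode(both) is SyncMode.incremental
```

With this shape, a stream that only advertises incremental still resolves to incremental even when a full refresh is forced.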
👋 Greetings, Airbyte Team Member!
Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version
You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765247774-fix-force-full-refresh' pyairbyte --help
# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765247774-fix-force-full-refresh'

Community Support
Questions? Join the #pyairbyte channel in our Slack workspace.
Make _get_sync_mode defensive by using getattr to handle mock objects or older stream objects that don't have the supported_sync_modes attribute. This preserves backward compatibility with existing tests while still fixing the force_full_refresh behavior for real streams. Co-Authored-By: AJ Steers <aj@airbyte.io>
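The defensive lookup described in this commit can be sketched as follows. This is an illustration under stated assumptions: `supported_modes_of` is a hypothetical helper name, `SyncMode` is a local stand-in enum, and `SimpleNamespace` objects stand in for stream objects with and without the attribute (they are not the mocks used in the PyAirbyte test suite).

```python
from enum import Enum
from types import SimpleNamespace


class SyncMode(Enum):
    """Local stand-in for the Airbyte protocol's SyncMode enum (assumption)."""

    full_refresh = "full_refresh"
    incremental = "incremental"


def supported_modes_of(stream: object) -> list:
    """Return the stream's advertised sync modes, or [] if none are available."""
    # getattr with a default avoids AttributeError on objects lacking the attribute;
    # `or []` also covers an attribute that is present but set to None.
    return getattr(stream, "supported_sync_modes", None) or []


real_like = SimpleNamespace(name="users", supported_sync_modes=[SyncMode.full_refresh])
legacy_like = SimpleNamespace(name="orders")  # no supported_sync_modes attribute

assert supported_modes_of(real_like) == [SyncMode.full_refresh]
assert supported_modes_of(legacy_like) == []
```

Normalizing to an empty list lets later membership checks (`SyncMode.full_refresh in modes`) run without a separate `None` guard.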
Actionable comments posted: 1
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
airbyte/sources/base.py (1)
537-547: `get_records` docstring talks about full refresh, but catalog is configured as incremental

The `get_records` docstring says:
“Generate a configured catalog that syncs the given stream in `full_refresh` mode”

But the implementation calls:
    configured_catalog = self.get_configured_catalog(streams=[stream])
and, with the current `_get_sync_mode`, that will always set `sync_mode=SyncMode.incremental` when `force_full_refresh` isn’t passed.

Given the new `force_full_refresh` support, would it be clearer to either:
- Align behavior with the docs by forcing full refresh here:
    - configured_catalog = self.get_configured_catalog(streams=[stream])
    + configured_catalog = self.get_configured_catalog(
    +     streams=[stream],
    +     force_full_refresh=True,
    + )
- Or, if you prefer incremental semantics for `get_records`, update the docstring to no longer promise full refresh?

Curious which behavior you’d like to standardize on here, wdyt?
🧹 Nitpick comments (1)
airbyte/sources/base.py (1)
420-435: Sync mode selection vs `supported_sync_modes` and docstring seem slightly misaligned

The new `force_full_refresh` plumbing and `_get_sync_mode` helper look like they’ll fix the Postgres case where full refresh was requested but the catalog still said incremental. One thing that stood out though:
- The docstring says: “If `force_full_refresh` is True, streams will be configured with `full_refresh` sync mode when supported by the stream. Otherwise, incremental sync mode is used when supported.”
- In `_get_sync_mode`, the non‑forced branch always returns `SyncMode.incremental`, even if the stream only advertises `full_refresh` (or has no `supported_sync_modes`), so we’re not actually “using incremental when supported”; we’re just always using incremental.

This preserves previous behavior, but it means:
- Streams that only support `full_refresh` (or don’t list modes) will still be configured as incremental.
- The docstring oversells what the default branch does.

Would you consider either:
- Making the default path respect `supported_sync_modes` (while keeping a final incremental fallback for full backward compatibility), e.g.:
    - def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
    -     """Determine the sync mode for a stream based on force_full_refresh and support."""
    -     # Use getattr to handle mocks or streams without supported_sync_modes attribute
    -     supported_modes = getattr(stream, "supported_sync_modes", None)
    -
    -     if force_full_refresh:
    -         # When force_full_refresh is True, prefer full_refresh if supported
    -         if supported_modes and SyncMode.full_refresh in supported_modes:
    -             return SyncMode.full_refresh
    -         # Fall back to incremental if full_refresh is not supported
    -         return SyncMode.incremental
    -
    -     # Default behavior: preserve previous semantics (always incremental)
    -     return SyncMode.incremental
    + def _get_sync_mode(stream: AirbyteStream) -> SyncMode:
    +     """Determine the sync mode for a stream based on force_full_refresh and support."""
    +     # Use getattr to handle mocks or streams without supported_sync_modes attribute
    +     supported_modes = getattr(stream, "supported_sync_modes", None) or []
    +
    +     if force_full_refresh:
    +         # When force_full_refresh is True, prefer full_refresh if supported
    +         if SyncMode.full_refresh in supported_modes:
    +             return SyncMode.full_refresh
    +         if SyncMode.incremental in supported_modes:
    +             return SyncMode.incremental
    +
    +         # If the stream doesn’t advertise modes, keep historical behaviour.
    +         return SyncMode.incremental
    +
    +     # Default behaviour when not forcing full refresh:
    +     if SyncMode.incremental in supported_modes:
    +         return SyncMode.incremental
    +     if SyncMode.full_refresh in supported_modes:
    +         return SyncMode.full_refresh
    +
    +     # Fallback for streams that don’t advertise modes (keeps old semantics).
    +     return SyncMode.incremental
- Or, if you explicitly want to avoid changing default semantics in this PR, tightening the docstring/comment to say that the non‑forced branch always uses incremental (regardless of `supported_sync_modes`) so readers aren’t surprised?

Wdyt?
Also applies to: 449-462, 469-488
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte/sources/base.py (4 hunks)
🧰 Additional context used
🧠 Learnings (1)
📚 Learning: 2024-10-08T15:34:31.026Z
Learnt from: Suraj-Vishwakarma70
Repo: airbytehq/PyAirbyte PR: 285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-10-08T15:34:31.026Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.
Applied to files:
airbyte/sources/base.py
🧬 Code graph analysis (1)
airbyte/sources/base.py (2)
airbyte/strategies.py (1)
destination_sync_mode (80-92)
airbyte/shared/catalog_providers.py (1)
CatalogProvider (31-226)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (No Creds)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte/sources/base.py (1)
738-755: Propagation of `force_full_refresh` into iterator and state handling looks solid

Wiring `force_full_refresh` through to `get_configured_catalog(...)` and explicitly passing `state=None` when it’s True matches the intended semantics (connector sees `full_refresh` where supported and no state). This should address the Postgres “incremental without cursor” case for both multi‑stream and single‑stream reads from this iterator.

I don’t see any issues in this block; keeping the flag keyword‑only also preserves backwards compatibility for existing callers. 👍 Wdyt?
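The state handling described in this comment can be sketched in isolation. This is a simplified illustration, not the iterator code under review: `state_for_read` is a hypothetical helper, and the list-of-dicts state shape is an assumption chosen only to keep the example self-contained.

```python
from typing import Any, Optional


def state_for_read(
    saved_state: Optional[list[dict[str, Any]]],
    *,
    force_full_refresh: bool = False,
) -> Optional[list[dict[str, Any]]]:
    """Return the state to hand to the connector for this read."""
    if force_full_refresh:
        # A forced full refresh starts from scratch, so no prior state is sent.
        return None
    return saved_state


saved = [{"stream": "users", "cursor": "2024-01-01"}]
assert state_for_read(saved) == saved
assert state_for_read(saved, force_full_refresh=True) is None
```

Because the flag is keyword-only, a positional call such as `state_for_read(saved, True)` raises `TypeError`, so existing positional call sites cannot silently change meaning.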
…force_full_refresh) Co-Authored-By: AJ Steers <aj@airbyte.io>
Actionable comments posted: 1
📜 Review details
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte/destinations/base.py (1 hunks)
airbyte/sources/base.py (5 hunks)
🚧 Files skipped from review as they are similar to previous changes (1)
- airbyte/destinations/base.py
🧰 Additional context used
🧠 Learnings (3)
📚 Learning: 2024-07-09T06:42:41.304Z
Learnt from: Suraj-Vishwakarma70
Repo: airbytehq/PyAirbyte PR: 285
File: airbyte/sources/base.py:0-0
Timestamp: 2024-07-09T06:42:41.304Z
Learning: Ensure consistent naming for attributes in the `Source` class in `airbyte/sources/base.py`, such as renaming `_to_be_selected_stream` to `_to_be_selected_streams`.
Applied to files:
airbyte/sources/base.py
📚 Learning: 2024-10-11T22:05:15.550Z
Learnt from: aaronsteers
Repo: airbytehq/PyAirbyte PR: 411
File: airbyte/cli.py:26-26
Timestamp: 2024-10-11T22:05:15.550Z
Learning: In the PyAirbyte project, when reviewing Python code and encountering a TODO comment without an issue link, I should post a friendly reminder to resolve it before merging, instead of suggesting to add an issue link.
Applied to files:
airbyte/sources/base.py
📚 Learning: 2024-08-31T00:58:32.484Z
Learnt from: aaronsteers
Repo: airbytehq/PyAirbyte PR: 347
File: tests/integration_tests/fixtures/registry.json:47-47
Timestamp: 2024-08-31T00:58:32.484Z
Learning: When reviewing changes in test fixtures, especially renaming, consider that they might be due to fixing copy-paste errors and may not impact core codepaths.
Applied to files:
airbyte/sources/base.py
🧬 Code graph analysis (1)
airbyte/sources/base.py (5)
airbyte/cloud/sync_results.py (1)
streams (473-481)
airbyte/caches/base.py (1)
streams (326-334)
airbyte/results.py (1)
streams (90-95)
airbyte/strategies.py (1)
destination_sync_mode (80-92)
airbyte/shared/catalog_providers.py (1)
CatalogProvider (31-226)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
- GitHub Check: Pytest (All, Python 3.10, Windows)
- GitHub Check: Pytest (All, Python 3.11, Windows)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
- GitHub Check: Pytest (No Creds)
🔇 Additional comments (3)
airbyte/sources/base.py (3)
423-424: Nice addition! The parameter and documentation look good.

The keyword-only `force_full_refresh` parameter with a default value preserves backward compatibility while enabling the new behavior. The docstring clearly explains when full_refresh vs incremental mode will be used.

Also applies to: 432-434

749-752: Correct propagation of the parameter!

The `force_full_refresh` parameter is properly passed to `get_configured_catalog()` here, ensuring the catalog configuration respects the refresh mode.

894-896: Good! The catalog provider now respects force_full_refresh.

This ensures that when `read()` is called with `force_full_refresh=True`, the catalog passed to the cache reflects the correct sync mode.
abf317a
into
main
Summary
Fixes #772 - When `force_full_refresh=True` was specified, the configured catalog was still being built with `sync_mode=SyncMode.incremental`, causing connectors like Postgres to expect cursor fields even during full refresh operations.

The root cause was that `get_configured_catalog()` always hardcoded `sync_mode=SyncMode.incremental` regardless of the `force_full_refresh` parameter. The parameter only affected whether state was passed to the connector, not the actual sync mode in the catalog.

Historic Context
Traditionally, "`full_refresh`" sync mode would sometimes not generate incremental `STATE` artifacts as output, and the workaround was to implement "full refresh" by simply not sending any `STATE` artifacts as input. I've researched our latest Python and Kotlin CDKs, and our newer CDK implementations appear to properly generate `STATE` artifacts in both incremental and full-refresh sync modes.

Changes
- Add a `force_full_refresh` parameter to `get_configured_catalog()`
- Update callers (`read()`, `_get_airbyte_message_iterator()`, `Destination.write()`) to pass the parameter

Review & Testing Checklist for Human
- When `force_full_refresh=True`, streams should use `full_refresh` if supported by the stream, otherwise fall back to `incremental`. Confirm this matches expected Airbyte protocol behavior.
- Test `force_full_refresh=True` against a connector that supports both sync modes (e.g., Postgres) and verify it no longer requests cursor fields.
- Check for other callers of `get_configured_catalog()` that should also pass `force_full_refresh` (grep shows `caches/base.py:409` also calls it but doesn't have access to `force_full_refresh` context).
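For the first checklist item, a reviewer might want to see which streams would still fall back to incremental under a forced full refresh. The sketch below is one hypothetical way to list them: `streams_without_full_refresh` and the stream names are invented for illustration, and `SyncMode` is a local stand-in rather than the Airbyte protocol enum.

```python
from enum import Enum
from typing import Mapping, Sequence


class SyncMode(Enum):
    """Local stand-in for the Airbyte protocol's SyncMode enum (assumption)."""

    full_refresh = "full_refresh"
    incremental = "incremental"


def streams_without_full_refresh(
    supported_by_stream: Mapping[str, Sequence[SyncMode]],
) -> list[str]:
    """List streams that do not advertise full_refresh, sorted by name."""
    return sorted(
        name
        for name, modes in supported_by_stream.items()
        if SyncMode.full_refresh not in modes
    )


# Hypothetical streams and their advertised sync modes.
advertised = {
    "users": [SyncMode.full_refresh, SyncMode.incremental],
    "events": [SyncMode.incremental],
}
assert streams_without_full_refresh(advertised) == ["events"]
```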
Important
Auto-merge enabled.
This PR is set to merge automatically when all requirements are met.
Note
Auto-merge may have been disabled. Please check the PR status to confirm.