feat: support CustomRetriever with partition router #753
👋 Greetings, Airbyte Team Member! Here are some helpful tips and reminders for your convenience.
Testing This CDK Version
You can test this version of the CDK using the following:
```bash
# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/support_custom_retriever_with_partition_router#egg=airbyte-python-cdk[dev]' --help
# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/support_custom_retriever_with_partition_router
```
Helpful Resources
PR Slash Commands
Airbyte Maintainers can execute the following slash commands on your PR:
- /autofix
📝 Walkthrough
Pre-merge checks (2 passed, 1 warning)
❌ Failed checks (1 warning)
✅ Passed checks (2 passed)
Actionable comments posted: 0
🧹 Nitpick comments (5)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
2903-2910: stream_name kw-only param not forwarded: intentional?
create_parent_stream_config accepts stream_name but doesn't forward it to the nested stream creation. It's probably fine because create_default_stream passes its own model.name to slicer construction, but if you intended the child's name to flow further, should we explicitly forward stream_name here too, or is the current behavior desired? wdyt?
3738-3745: Minor: avoid duplicate get_stream_state calls
You already fetched child_state just above; has_parent_state could reuse it to avoid a second lookup:
```diff
-        has_parent_state = bool(
-            self._connector_state_manager.get_stream_state(stream_name, None)
-            if model.incremental_dependency
-            else False
-        )
+        has_parent_state = bool(child_state) if model.incremental_dependency else False
```
Tiny, but trims one map lookup. wdyt?
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (3)
995-1061: Strengthen the assertion by validating parent_stream_configs too
You already assert the correct slicer type. Adding a couple of sanity checks on the constructed parent stream helps prove end-to-end wiring of the nested components, especially for the CustomRetriever path. Wdyt?
```diff
@@
     assert isinstance(stream, DefaultStream)
     assert isinstance(stream._stream_partition_generator._stream_slicer, SubstreamPartitionRouter)
+
+    # Extra validation: the router actually contains a parent stream config
+    partition_router = stream._stream_partition_generator._stream_slicer
+    parent_stream_configs = partition_router.parent_stream_configs
+    assert len(parent_stream_configs) == 1
+    assert isinstance(parent_stream_configs[0].stream, DefaultStream)
```
1111-1114: Make the exception assertion less brittle
Asserting the full error string is fragile across Python versions/implementations. Matching the relevant fragment keeps the intent while improving stability. Wdyt?
```diff
-    with pytest.raises(AttributeError) as e:
-        list(stream.generate_partitions())
-    assert e.value.args[0] == "'RecordSelector' object has no attribute 'stream_slices'"
+    with pytest.raises(AttributeError, match="has no attribute 'stream_slices'"):
+        list(stream.generate_partitions())
```
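To see why `match` is less brittle, here is a minimal self-contained sketch. `RecordSelector` and `generate_partitions` below are hypothetical stand-ins, not the CDK classes; the point is only that pytest applies `match` with `re.search` against `str()` of the raised exception, so a stable fragment of the message is enough.

```python
import pytest


class RecordSelector:
    """Hypothetical stand-in: like the object in the test, it has no `stream_slices`."""


def generate_partitions(router):
    # Hypothetical stand-in for DefaultStream.generate_partitions(): calling the
    # missing method raises AttributeError.
    return list(router.stream_slices())


def test_missing_stream_slices_is_reported():
    # pytest applies `match` with re.search() to str(exception), so only the
    # stable fragment must be present, not the full interpreter-specific message.
    with pytest.raises(AttributeError, match="has no attribute 'stream_slices'"):
        generate_partitions(RecordSelector())
```

If the interpreter ever rewords the prefix (for example, the quoted class name), the fragment-based assertion keeps passing while still pinning down the missing attribute.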
1186-1189: Prefer public API in assertions for consistency and resilience
Elsewhere we assert on `stream.cursor` for incremental+partitioned streams. Using the public attribute here avoids depending on private internals and aligns with nearby tests. Wdyt?
```diff
-    assert isinstance(
-        stream._stream_partition_generator._stream_slicer, ConcurrentPerPartitionCursor
-    )
+    assert isinstance(stream.cursor, ConcurrentPerPartitionCursor)
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7 hunks)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1 hunk)
🧰 Additional context used
🧬 Code graph analysis (2)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (4)
- airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1): SubstreamPartitionRouter (107-403)
- airbyte_cdk/sources/declarative/yaml_declarative_source.py (2): YamlDeclarativeSource (17-69), _parse (62-69)
- airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1): create_component (789-822)
- airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1): ConcurrentPerPartitionCursor (106-646)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (18): Config (136-137), Config (150-151), Config (164-165), Config (178-179), Config (192-193), Config (206-207), Config (220-221), Config (234-235), Config (248-249), Config (262-263), Config (276-277), Config (292-293), Config (306-307), Config (320-321), Config (354-355), Config (378-379), SubstreamPartitionRouter (2949-2956), GroupingPartitionRouter (2959-2979)
- airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2): SubstreamPartitionRouter (107-403), get_stream_state (376-399)
- airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (2): get_stream_state (143-145), GroupingPartitionRouter (13-145)
- airbyte_cdk/sources/connector_state_manager.py (1): get_stream_state (53-67)
🪛 GitHub Actions: Linters
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
[error] 2182-2182: Command 'poetry run mypy --config-file mypy.ini airbyte_cdk' failed: Returning Any from function declared to return "PartitionRouter" [no-any-return].
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (Fast)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Manifest Server Docker Image Build
🔇 Additional comments (7)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7)
1979-1982: Forwarding kwargs into slicer builder looks good
This will let downstream constructors receive extra context.
2152-2162: Signature change (+kwargs) is fine, but mind typing at returns
Adding kwargs here is reasonable for threading context through nested custom components. See my other comment on ensuring a typed return in the dict branch to satisfy mypy, wdyt?
2163-2167: CustomRetriever support: nice addition
Recognizing CustomRetrieverModel in the retriever-type check is exactly what's needed for this feature.
3710-3716: Keyword-only stream_name for SubstreamPartitionRouter: good
This aligns with how nested components receive the stream identifier.
3721-3723: Correctly threading stream_name into parent configs
Passing stream_name down ensures substreams can resolve state against the right child stream. LGTM.
4135-4146: GroupingPartitionRouter: stream_name propagation is correct
Forwarding stream_name to the underlying router is consistent with the other constructors.
2179-2188: Guard missing $parameters & tighten return type
Could you default "$parameters", normalize `stream_name` to str, and cast the nested component to `PartitionRouter`? For example:
```diff
-        elif isinstance(stream_slicer_model, dict):
-            stream_slicer_model["$parameters"]["stream_name"] = stream_name
-            return self._create_nested_component(
-                model,
-                "partition_router",
-                stream_slicer_model,
-                config,
-                **kwargs,
-            )
+        elif isinstance(stream_slicer_model, dict):
+            params = stream_slicer_model.setdefault("$parameters", {})
+            params["stream_name"] = stream_name or ""
+            return cast(
+                PartitionRouter,
+                self._create_nested_component(
+                    model,
+                    "partition_router",
+                    stream_slicer_model,
+                    config,
+                    stream_name=stream_name or "",
+                    **kwargs,
+                ),
+            )
```
I couldn't run mypy here; please verify this fixes the `no-any-return` error and that CI passes. wdyt?
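A minimal, self-contained sketch of the pattern suggested above, assuming simplified stand-ins: `PartitionRouter`, `StubRouter`, `create_nested_component`, and `build_router` are hypothetical and only mimic the shape of the factory code, not its real signatures.

```python
from typing import Any, Dict, Mapping, Optional, cast


class PartitionRouter:
    """Hypothetical stand-in for the CDK's PartitionRouter base class."""


class StubRouter(PartitionRouter):
    """Hypothetical router that just records the parameters it was built with."""

    def __init__(self, parameters: Mapping[str, Any]) -> None:
        self.parameters = parameters


def create_nested_component(definition: Dict[str, Any]) -> Any:
    # Hypothetical stand-in for `_create_nested_component`; returning Any is what
    # makes mypy report no-any-return in a function typed to return PartitionRouter.
    return StubRouter(definition["$parameters"])


def build_router(definition: Dict[str, Any], stream_name: Optional[str]) -> PartitionRouter:
    # setdefault() creates "$parameters" when the manifest omits it (no KeyError).
    # Note that it writes into the caller's dict.
    params = definition.setdefault("$parameters", {})
    params["stream_name"] = stream_name or ""
    # cast() only informs the type checker; it performs no runtime check.
    return cast(PartitionRouter, create_nested_component(definition))
```

`cast` silences `no-any-return` at zero runtime cost, but a component that is not actually a router would still only fail later, when `stream_slices` is called on it.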
Actionable comments posted: 1
🧹 Nitpick comments (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
2549-2553: Forward stream_name into DynamicSchemaLoader's partition router for consistency
Passing the name helps custom/dict routers that depend on it. Shall we thread it here too, wdyt?
```diff
-            partition_router=self._build_stream_slicer_from_partition_router(
-                model.retriever, config
-            ),
+            partition_router=self._build_stream_slicer_from_partition_router(
+                model.retriever, config, stream_name=name
+            ),
```
3892-3894: Also pass stream_name to HTTP components resolver slicers
This keeps behavior uniform for CustomRetriever-backed routers. Worth aligning, wdyt?
```diff
-            stream_slicer=self._build_stream_slicer_from_partition_router(model.retriever, config),
+            stream_slicer=self._build_stream_slicer_from_partition_router(
+                model.retriever, config, stream_name=stream_name
+            ),
@@
-            stream_slicer=self._build_stream_slicer_from_partition_router(model.retriever, config),
+            stream_slicer=self._build_stream_slicer_from_partition_router(
+                model.retriever, config, stream_name=stream_name
+            ),
```
Also applies to: 3912-3915
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)
- airbyte_cdk/sources/declarative/partition_routers/partition_router.py (2): PartitionRouter (14-40), get_stream_state (22-40)
- airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (1): get_stream_state (376-399)
- airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (1): get_stream_state (143-145)
- airbyte_cdk/sources/connector_state_manager.py (1): get_stream_state (53-67)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-shopify
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (Fast)
- GitHub Check: Manifest Server Docker Image Build
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6)
1978-1983: LGTM: forwarding stream_name/kwargs into partition-router construction is the right move
This should unblock CustomRetriever partitioned state scenarios. Nice.
2153-2166: LGTM: broadened retriever typing and kwargs passthrough
Recognizing CustomRetrieverModel and accepting kwargs at this layer looks good and consistent with downstream factory signatures.
2903-2911: LGTM: keyword-only stream_name for ParentStreamConfig
Clearer API; reduces accidental argument-ordering mistakes.
3710-3726: LGTM: keyword-only stream_name on SubstreamPartitionRouter and propagation
The explicit pass-through to the wrapper looks correct.
3740-3745: LGTM: derive has_parent_state directly from explicit stream_name
This removes the kwargs coupling and matches the intent.
4135-4147: LGTM: propagate stream_name into GroupingPartitionRouter's underlying router
Good consistency win. The guardrails on request options are also a nice touch.
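For readers less familiar with the keyword-only `stream_name` change, here is a sketch of how a bare `*` in a Python signature behaves. The function below is a hypothetical, simplified module-level stand-in, not the factory's real method, and the stream names are illustrative only.

```python
from typing import Any, Dict, Mapping


def create_parent_stream_config(
    model: Mapping[str, Any],
    config: Mapping[str, Any],
    *,
    stream_name: str,
) -> Dict[str, Any]:
    # Hypothetical, simplified stand-in for the factory method: parameters after
    # the bare `*` can only be passed by keyword, so a caller cannot silently
    # shift a positional argument into `stream_name`.
    return {"parent_stream": model["stream"], "child_stream": stream_name}
```

Calling `create_parent_stream_config({"stream": "campaigns"}, {}, stream_name="ad_group_criterion")` works, while passing `"ad_group_criterion"` positionally (or omitting it) raises `TypeError`, which is the "fewer ordering mistakes" benefit noted above.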
brianjlai left a comment
🚢
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Outdated
Actionable comments posted: 0
♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
2179-2195: Avoid mutating the manifest dict and pass stream_name explicitly to the nested constructor
Two small hardening tweaks here:
- Don't mutate the incoming dict; copy it before injecting $parameters.
- Pass stream_name explicitly to _create_nested_component; relying solely on $parameters makes None a silent footgun and complicates kw-only propagation, wdyt?
Non-breaking patch:
```diff
-        elif isinstance(stream_slicer_model, dict):
-            # partition router comes from CustomRetrieverModel therefore has not been parsed as a model
-            params = stream_slicer_model.get("$parameters")
-            if not isinstance(params, dict):
-                params = {}
-                stream_slicer_model["$parameters"] = params
-
-            if stream_name is not None:
-                params["stream_name"] = stream_name
-
-            return self._create_nested_component(  # type: ignore[no-any-return] # There is no guarantee that this will return a stream slicer. If not, we expect an AttributeError during the call to `stream_slices`
-                model,
-                "partition_router",
-                stream_slicer_model,
-                config,
-                **kwargs,
-            )
+        elif isinstance(stream_slicer_model, dict):
+            # partition router comes from CustomRetrieverModel and has not been parsed as a model
+            router_def = dict(stream_slicer_model)  # shallow copy to avoid mutating the manifest
+            params = dict(router_def.get("$parameters") or {})
+            router_def["$parameters"] = params
+            if stream_name is not None:
+                params["stream_name"] = stream_name
+            return self._create_nested_component(  # type: ignore[no-any-return]
+                model,
+                "partition_router",
+                router_def,
+                config,
+                stream_name=stream_name or "",
+                **kwargs,
+            )
```
Optional follow-up (future-facing): would you consider asserting the returned component is a PartitionRouter and raising a clearer ValueError if not (instead of surfacing AttributeError later)? We can gate it behind an env/flag to avoid breaking current tests, wdyt?
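A rough sketch of both ideas from this comment (copying instead of mutating, and the optional `PartitionRouter` guard), using hypothetical helpers `with_stream_name` and `ensure_partition_router` and a stand-in `PartitionRouter` class rather than the CDK's own:

```python
from typing import Any, Dict, Mapping, Optional


class PartitionRouter:
    """Hypothetical stand-in for the CDK's PartitionRouter base class."""


def with_stream_name(router_def: Mapping[str, Any], stream_name: Optional[str]) -> Dict[str, Any]:
    # Copy the top-level dict and "$parameters" so the caller's manifest is untouched.
    # This is a shallow copy: other nested values are still shared with the manifest.
    copied = dict(router_def)
    params = dict(copied.get("$parameters") or {})
    if stream_name is not None:
        params["stream_name"] = stream_name
    copied["$parameters"] = params
    return copied


def ensure_partition_router(component: Any) -> PartitionRouter:
    # Fail at build time with a clear message instead of an AttributeError
    # later, when `stream_slices` is called on a non-router component.
    if not isinstance(component, PartitionRouter):
        raise ValueError(
            f"partition_router must build a PartitionRouter, got {type(component).__name__}"
        )
    return component
```

Copying only the two levels that are written to keeps the cost negligible; a `copy.deepcopy` would also work but is unnecessary unless deeper values get modified.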
🧹 Nitpick comments (3)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (3)
2180-2181: Tiny nit in the comment
Maybe "and has not been parsed as a model" reads a bit smoother than "therefore has not been parsed as a model", wdyt?
4158-4171: Suggest including offending parent info in GroupingPartitionRouter error
Including the names of the parent streams (for SubstreamPartitionRouter) or marking ListPartitionRouter in the raised ValueError can speed up debugging. If you pursue this, you’ll need to update the existing test matchers for the error message (e.g. in test_model_to_component_factory.py:4232). wdyt?
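One possible shape for the richer error, assuming (as the guardrails on request options mentioned in an earlier review suggest) that the check rejects request options set on parent streams. The dataclasses, the `check_no_request_options` helper, and the message wording are hypothetical, not the CDK's actual implementation:

```python
from dataclasses import dataclass, field
from typing import List, Optional


@dataclass
class ParentStreamConfig:
    # Hypothetical, simplified stand-in: only the fields this check needs.
    stream_name: str
    request_option: Optional[str] = None


@dataclass
class SubstreamRouter:
    # Hypothetical stand-in for a SubstreamPartitionRouter's parent configs.
    parent_stream_configs: List[ParentStreamConfig] = field(default_factory=list)


def check_no_request_options(router: SubstreamRouter) -> None:
    # Collect every offending parent so the error names them all at once.
    offending = [
        cfg.stream_name for cfg in router.parent_stream_configs if cfg.request_option is not None
    ]
    if offending:
        raise ValueError(
            "Request options are not supported with GroupingPartitionRouter "
            f"(set on parent streams: {', '.join(offending)})"
        )
```

Listing all offenders in one message avoids a fix-one, rerun, fix-the-next loop when several parents are misconfigured.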
4142-4148: Propagate kw-only `stream_name` to all `_build_stream_slicer_from_partition_router` calls
- In create_dynamic_schema_loader, add `stream_name=name` to both `partition_router=self._build_stream_slicer_from_partition_router(...)` invocations.
- In create_http_components_resolver, add `stream_name=stream_name or "__http_components_resolver"` to both `stream_slicer=self._build_stream_slicer_from_partition_router(...)` invocations.

Wdyt?
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (7 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)
- airbyte_cdk/sources/declarative/partition_routers/partition_router.py (2): PartitionRouter (14-40), get_stream_state (22-40)
- airbyte_cdk/sources/declarative/models/declarative_component_schema.py (18): Config (136-137), Config (150-151), Config (164-165), Config (178-179), Config (192-193), Config (206-207), Config (220-221), Config (234-235), Config (248-249), Config (262-263), Config (276-277), Config (292-293), Config (306-307), Config (320-321), Config (354-355), Config (378-379), SubstreamPartitionRouter (2949-2956), GroupingPartitionRouter (2959-2979)
- airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py (2): SubstreamPartitionRouter (107-403), get_stream_state (376-399)
- airbyte_cdk/sources/declarative/partition_routers/grouping_partition_router.py (2): get_stream_state (143-145), GroupingPartitionRouter (13-145)
- airbyte_cdk/sources/connector_state_manager.py (1): get_stream_state (53-67)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (12)
- GitHub Check: Check: source-pokeapi
- GitHub Check: Check: destination-motherduck
- GitHub Check: Check: source-hardcoded-records
- GitHub Check: Check: source-intercom
- GitHub Check: Check: source-shopify
- GitHub Check: SDM Docker Image Build
- GitHub Check: Pytest (All, Python 3.13, Ubuntu)
- GitHub Check: Manifest Server Docker Image Build
- GitHub Check: Pytest (All, Python 3.12, Ubuntu)
- GitHub Check: Pytest (All, Python 3.11, Ubuntu)
- GitHub Check: Pytest (All, Python 3.10, Ubuntu)
- GitHub Check: Pytest (Fast)
🔇 Additional comments (6)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (6)
1978-1983: Forwarding kwargs into slicer-builder looks good
Propagating kwargs into _build_stream_slicer_from_partition_router keeps context flowing; nice.
2154-2166: Adding CustomRetrieverModel to the retriever-type check: good call
This aligns the factory with custom retrievers carrying partition routers.
3716-3723: Kw-only stream_name for SubstreamPartitionRouter: LGTM
This makes per-partition state derivation explicit.
3729-3730: Passing stream_name through the wrapper: LGTM
This ensures parent configs are created with the correct child context.
3748-3750: has_parent_state derivation is correct
Using the explicit stream_name avoids accidental dependency on kwargs plumbing.
2909-2911: Signature change safe: no external calls detected
Ripgrep across the repo found only the method definition and its wrapper invocation in model_to_component_factory.py (line 3776), with no direct calls elsewhere, so requiring `stream_name` won't break external callers, wdyt?
What
With the deprecation of DeclarativeStream, it is no longer possible to override cursor logic in the Retriever like this. As a result, a connector that wants per-partition state with a custom retriever currently cannot have it.
How
Check whether the key `partition_router` is present in the CustomRetriever definition and, if so, generate the nested components.
This has been tested on the `ad_group_criterion` stream of source-google-ads by removing the `__post_init__` on the CustomRetrievers and by fixing this line, where `associated_slice` needed to be defined; otherwise the per-partition cursor would fail here. The latter seems to be a breaking change, but it seems fair to me: without stream slice information, the per-partition cursor can't work well.
Summary by CodeRabbit
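The "How" step boils down to a presence check on the custom retriever's raw definition. A minimal sketch, assuming a hypothetical `CustomRetrieverDefinition` whose unparsed manifest keys live in `extra`, and a caller-supplied `build_component` callback standing in for the factory's nested-component creation (neither is the CDK's real API):

```python
from dataclasses import dataclass, field
from typing import Any, Callable, Dict, Optional


@dataclass
class CustomRetrieverDefinition:
    """Hypothetical stand-in: a custom retriever whose extra manifest keys stay raw dicts."""

    class_name: str
    extra: Dict[str, Any] = field(default_factory=dict)


def maybe_build_partition_router(
    retriever: CustomRetrieverDefinition,
    build_component: Callable[[Dict[str, Any]], Any],
) -> Optional[Any]:
    # Only build nested components when the manifest declares a partition router.
    router_def = retriever.extra.get("partition_router")
    if router_def is None:
        return None  # no partition router: the stream is not partitioned
    if not isinstance(router_def, dict):
        raise ValueError("partition_router on a custom retriever must be a mapping")
    return build_component(router_def)
```

Returning `None` keeps custom retrievers without a `partition_router` key on their existing, unpartitioned path, so only manifests that opt in are affected.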
New Features
Refactor
Tests