Skip to content

feat(low-code CDK): Add ConditionalStreams component to low-code framework#592

Merged
Brian Lai (brianjlai) merged 3 commits intomainfrom
brian/conditional_streams
Jun 12, 2025
Merged

feat(low-code CDK): Add ConditionalStreams component to low-code framework#592
Brian Lai (brianjlai) merged 3 commits intomainfrom
brian/conditional_streams

Conversation

@brianjlai
Copy link
Contributor

@brianjlai Brian Lai (brianjlai) commented Jun 10, 2025

Resolves https://github.com/airbytehq/airbyte-internal-issues/issues/13285

What

source-tiktok-marketing has a special piece of required functionality where only a smaller subset of streams are exposed to customers when they are using a sandbox account.

There is currently no way to do this in the CDK or using custom components, so this change allows for certain streams to be emitted based on a condition of the incoming config.

How

I introduced a new component called ConditionalStreams which is comprised of a condition and a set of streams that will be available to users if the condition is met. And now the original streams field at the top level of the manifest takes in a list of streams or ConditionalStreams.

Summary by CodeRabbit

  • New Features

    • Added support for conditional stream groups, allowing streams to be included or excluded based on configuration values. This enables dynamic stream selection in declarative source configurations.
  • Tests

    • Introduced new tests to validate correct behavior and error handling for conditional stream groups.
  • Chores

    • Updated type checking configuration to improve compatibility with certain libraries.

@github-actions github-actions bot added the enhancement New feature or request label Jun 10, 2025
@github-actions
Copy link

github-actions bot commented Jun 10, 2025

PyTest Results (Fast)

3 660 tests  +4   3 650 ✅ +4   5m 44s ⏱️ -8s
    1 suites ±0      10 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit f76db7b. ± Comparison against base commit e44362a.

♻️ This comment has been updated with latest results.

@github-actions
Copy link

github-actions bot commented Jun 10, 2025

PyTest Results (Full)

3 663 tests  +4   3 653 ✅ +4   17m 13s ⏱️ -6s
    1 suites ±0      10 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit f76db7b. ± Comparison against base commit e44362a.

♻️ This comment has been updated with latest results.

@coderabbitai
Copy link
Contributor

coderabbitai bot commented Jun 10, 2025

📝 Walkthrough

Walkthrough

This update introduces a new ConditionalStreams schema and model to enable conditional inclusion of stream groups in declarative source manifests. The manifest parsing and stream configuration logic are updated to evaluate these conditions at runtime. Tests are added to verify correct behavior and validation. Minor type hinting and dependency updates are also included.

Changes

File(s) Change Summary
.pre-commit-config.yaml Updated mypy hook to install types-requests and types-PyYAML for improved type checking.
airbyte_cdk/sources/declarative/declarative_component_schema.yaml Added ConditionalStreams schema definition; updated DeclarativeSource to allow ConditionalStreams in its streams array.
airbyte_cdk/sources/declarative/models/declarative_component_schema.py Added ConditionalStreams Pydantic model; updated DeclarativeSource1 and DeclarativeSource2 to support ConditionalStreams in the streams field.
airbyte_cdk/sources/declarative/manifest_declarative_source.py Updated stream config logic to handle ConditionalStreams, evaluating their conditions and including nested streams accordingly; updated method signatures to pass config.
airbyte_cdk/sources/declarative/concurrent_declarative_source.py Updated _group_streams to call _stream_configs with both manifest and config arguments for consistency with new signature.
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py Added explicit str type hints to local variables in a closure; no logic changes.
unit_tests/sources/declarative/test_manifest_declarative_source.py Added tests for ConditionalStreams inclusion/exclusion and validation errors when required fields are missing.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant ManifestDeclarativeSource
    participant ConditionalStreams
    participant DeclarativeStream

    User->>ManifestDeclarativeSource: streams(config)
    ManifestDeclarativeSource->>ManifestDeclarativeSource: _stream_configs(manifest, config)
    alt Stream is ConditionalStreams
        ManifestDeclarativeSource->>ConditionalStreams: Evaluate condition with config
        alt Condition is True
            ConditionalStreams->>ManifestDeclarativeSource: Return nested streams
        else Condition is False
            ConditionalStreams->>ManifestDeclarativeSource: Return empty
        end
    else Stream is DeclarativeStream
        ManifestDeclarativeSource->>DeclarativeStream: Add to stream list
    end
    ManifestDeclarativeSource-->>User: List of enabled streams
Loading

Suggested reviewers

  • maxi297
  • darynaishchenko

Would you like to add more sample manifests demonstrating ConditionalStreams usage for documentation, wdyt?


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 762f23c and f76db7b.

📒 Files selected for processing (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
  • unit_tests/sources/declarative/test_manifest_declarative_source.py (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • unit_tests/sources/declarative/test_manifest_declarative_source.py
⏰ Context from checks skipped due to timeout of 90000ms (9)
  • GitHub Check: Check: 'source-pokeapi' (skip=false)
  • GitHub Check: Check: 'source-hardcoded-records' (skip=false)
  • GitHub Check: Check: 'source-amplitude' (skip=false)
  • GitHub Check: Check: 'source-shopify' (skip=false)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Analyze (python)
✨ Finishing Touches
  • 📝 Generate Docstrings

Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out.

❤️ Share
🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

‼️ IMPORTANT
Auto-reply has been disabled for this repository in the CodeRabbit settings. The CodeRabbit bot will not respond to your replies unless it is explicitly tagged.

  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai explain this code block.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and explain its main purpose.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Support

Need help? Create a ticket on our support page for assistance with any issues or questions.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai generate sequence diagram to generate a sequence diagram of the changes in this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 2

🔭 Outside diff range comments (1)
unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

4806-4911: ⚠️ Potential issue

Broken YAML → the test will explode before it even runs

The YAML embedded in test_conditional_streams is not valid:

  1. Several nested keys are indented at the same level as their parent (type, page_size_option, record_selector, …), so the parser will raise a ScannerError.
  2. The retriever block is missing a required type: SimpleRetriever.
  3. requester is missing its own type: HttpRequester.
  4. record_selector/extractor are mis-nested under requester.

Because _parse() uses yaml.safe_load, the test will fail long before the CDK logic is exercised.

Could we patch the snippet like this, wdyt?

-  retriever:
-    paginator:
-    type: "DefaultPaginator"
+  retriever:
+    type: SimpleRetriever
+    paginator:
+      type: "DefaultPaginator"
       page_size_option:
         type: RequestOption
         inject_into: request_parameter
         field_name: page_size
@@
-    requester:
-      url_base: "https://testing.com"
-      path: "/api/v1/lists"
+    requester:
+      type: HttpRequester
+      url_base: "https://testing.com"
+      path: "/api/v1/lists"
@@
-      record_selector:
-      extractor:
-        field_path: ["result"]
+    record_selector:
+      type: RecordSelector
+      extractor:
+        type: DpathExtractor
+        field_path: ["result"]

The same fixes apply to the conditions_stream definition further down. Once the YAML parses, the rest of the assertions should execute as intended.

♻️ Duplicate comments (1)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2206-2208: Same parity question for DeclarativeSource2

Same comment as above: aligning allowed inner types would keep the two “streams” shapes interchangeable no matter the nesting depth, wdyt?

🧹 Nitpick comments (10)
.pre-commit-config.yaml (1)

77-77: Great addition of type stub packages!

Adding types-requests and types-PyYAML will improve mypy coverage for those libraries. Could we alphabetize the list ("types-PyYAML", "types-requests") for consistency as we grow this list? wdyt?

airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

428-457: Align sub-schema streams items with root streams anyOf?
Currently, ConditionalStreams.streams.items only references DeclarativeStream. Should we expand this to an anyOf matching the root streams (e.g., including StateDelegatingStream or even nested ConditionalStreams) to maintain consistency? wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (4)

159-161: Minor ordering nit — keep import list alphabetised?

The bulk of this file keeps the from … import blocks alphabetised. Placing ConditionalStreams after ConcurrencyLevel breaks that visual order slightly. Would you consider moving it to preserve the alphabetical grouping, or is there a reason to keep it here, wdyt?


662-663: Mapping entry position

Same nitpick as above – inserting the new entry right after ConcurrencyLevelModel works, but it slightly disturbs the (mostly) alphabetic order of the long mapping. Re-ordering keeps future merges cleaner. Up to you if you find that worthwhile, wdyt?


1626-1641: Consider propagating $parameters to nested streams

create_conditional_streams is straightforward – it avoids instantiating nested streams when the condition is false. One thought: when the condition is true you delegate to _create_component_from_model(stream, …, **kwargs), but you don’t forward model.parameters. If a parent ConditionalStreams block is meant to share parameters with all its child streams, passing them might be helpful:

-                self._create_component_from_model(stream, config=config, **kwargs)
+                self._create_component_from_model(
+                    stream,
+                    config=config,
+                    parameters={**(model.parameters or {}), **kwargs.get("parameters", {})},
+                    **{k: v for k, v in kwargs.items() if k != "parameters"},
+                )

That keeps child-stream behaviour consistent with other composite components (e.g., AddFields). What do you think?


3173-3182: Type annotations could be Optional[str]

model.requester.url or url_base can be None, so annotating _url / _url_base as plain str will still silence mypy but doesn’t convey potential None. Would switching to Optional[str] (or str | None in 3.10+) be clearer?

-            _url: str = (
+            _url: Optional[str] = (
...
-            _url_base: str = (
+            _url_base: Optional[str] = (

(Mypy won’t complain because you immediately coalesce, but the annotation better matches reality.)

unit_tests/sources/declarative/test_manifest_declarative_source.py (3)

1149-1274: Could we factor the gigantic manifest into a small helper or reuse the existing _declarative_stream fixture to stay DRY?

The in-line manifest repeats >200 lines of boiler-plate that already live in helpers above. Extracting only the few differences (students vs classrooms/clubs, plus the ConditionalStreams wrapper) into a local fixture or helper would dramatically shrink the test and make the intent pop out, while reducing maintenance burden when the default stream template changes – wdyt?


1278-1282: Are the repeated “module is in sys.modules” assertions still useful here?

These four asserts were valuable in an earlier test to prove the monkey-patching worked, but they don’t add new coverage in this scenario and make the test longer. Removing them would keep the focus on the conditional-stream logic – what do you think?


1283-1300: Shall we assert absence of conditional streams when is_sandbox is False?

For the False case we only check the list length. Adding

assert all(s.name not in {"classrooms", "clubs"} for s in actual_streams)

would protect against an ordering bug accidentally giving length 1 but still returning a wrong stream, strengthening the test with minimal noise – wdyt?

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2287-2301: Consider supporting nested ConditionalStreams or StateDelegatingStream

Nice addition! Two small follow-ups you may want to ponder:

  1. Nested conditionals – allowing streams: List[Union[ConditionalStreams, DeclarativeStream, StateDelegatingStream]] would let authors build more complex tree-like conditions without extra boilerplate.

  2. streams description says “during an operation”, maybe change to “during a sync” for consistency with other docs.

Would expanding the union and tweaking the phrasing make sense here, wdyt?

🧰 Tools
🪛 Pylint (3.3.7)

[refactor] 2287-2287: Too few public methods (0/2)

(R0903)

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between e44362a and 5d20ab9.

📒 Files selected for processing (7)
  • .pre-commit-config.yaml (1 hunks)
  • airbyte_cdk/sources/declarative/declarative_component_schema.yaml (2 hunks)
  • airbyte_cdk/sources/declarative/manifest_declarative_source.py (4 hunks)
  • airbyte_cdk/sources/declarative/models/declarative_component_schema.py (5 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5 hunks)
  • unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (2 hunks)
  • unit_tests/sources/declarative/test_manifest_declarative_source.py (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (5)
airbyte_cdk/sources/declarative/interpolation/interpolated_boolean.py (2)
  • InterpolatedBoolean (29-66)
  • eval (45-66)
airbyte_cdk/sources/declarative/interpolation/interpolated_string.py (1)
  • eval (35-55)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (18)
  • ConditionalStreams (2287-2300)
  • Config (136-137)
  • Config (150-151)
  • Config (164-165)
  • Config (178-179)
  • Config (196-197)
  • Config (210-211)
  • Config (224-225)
  • Config (238-239)
  • Config (252-253)
  • Config (266-267)
  • Config (280-281)
  • Config (294-295)
  • Config (310-311)
  • Config (324-325)
  • Config (338-339)
  • Config (372-373)
  • DeclarativeStream (2328-2397)
airbyte_cdk/sources/declarative/declarative_stream.py (1)
  • DeclarativeStream (32-241)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
  • streams (295-343)
🪛 Pylint (3.3.7)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py

[refactor] 2287-2287: Too few public methods (0/2)

(R0903)

🔇 Additional comments (5)
airbyte_cdk/sources/declarative/declarative_component_schema.yaml (1)

27-31: Great addition of ConditionalStreams support in root streams
The anyOf for top-level streams now includes the new ConditionalStreams schema, enabling dynamic stream grouping based on config, wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

108-108: Import looks good

Nice addition – the factory now has direct access to InterpolatedBoolean.

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py (1)

62-62: Import looks fine 👍

The extra import of ConditionalStreams is in-line with the new feature and keeps the import grouping intact; nothing to change here.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)

2959-2960: update_forward_refs order looks safe

ConditionalStreams.update_forward_refs() is invoked after all model classes are defined, so forward references resolve fine. Looks good to me.

airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)

386-406: Good handling of ConditionalStreamsModel!

The implementation correctly extracts nested streams from ConditionalStreamsModel and applies cache configuration to them. The use of an empty list as default for stream_config.get("streams") or [] is a nice defensive programming touch.

Copy link
Contributor

@pnilan Patrick Nilan (pnilan) left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lgtm

@brianjlai Brian Lai (brianjlai) merged commit 5797a2f into main Jun 12, 2025
30 checks passed
@brianjlai Brian Lai (brianjlai) deleted the brian/conditional_streams branch June 12, 2025 21:16
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants