@maxi297
Contributor

@maxi297 maxi297 commented Aug 24, 2025

What

Essentially #723 minus the changes that are not yet released on main, so we can ship this patch before Monday if the impact turns out to be more widespread than we thought.

Summary by CodeRabbit

  • New Features

    • Added configurable test limits for records, pages per slice, slices, and streams with sensible defaults.
    • Connector Builder now builds sources via a streamlined manifest-based flow with cache/retry controls.
  • Bug Fixes

    • Enforced record limits during test reads to prevent overconsumption.
    • Improved reliability of concurrent reads with better error propagation and cleanup.
    • More accurate state updates during concurrent processing.
  • Refactor

    • Simplified source creation and concurrent processing internals for improved stability and maintainability.

@github-actions github-actions bot added the bug (Something isn't working) and security labels Aug 24, 2025
@github-actions

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This CDK Version

You can test this version of the CDK using the following:

# Run the CLI from this branch:
uvx 'git+https://github.com/airbytehq/airbyte-python-cdk.git@maxi297/fix-cursor-datetime-format-issue-if-urgency#egg=airbyte-python-cdk[dev]' --help

# Update a connector to use the CDK from this branch ref:
cd airbyte-integrations/connectors/source-example
poe use-cdk-branch maxi297/fix-cursor-datetime-format-issue-if-urgency


PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /autofix - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test - Runs connector tests with the updated CDK
  • /poe build - Regenerate git-committed build artifacts, such as the pydantic models which are generated from the manifest JSON schema in YAML.
  • /poe <command> - Runs any poe command in the CDK environment


@maxi297 maxi297 requested review from brianjlai and tolik0 August 24, 2025 14:17
@github-actions

PyTest Results (Fast)

3 694 tests  +2   3 683 ✅ +2   6m 41s ⏱️ +20s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit f750fdf. ± Comparison against base commit 2b07f93.

This pull request removes 4 and adds 6 tests. Note that renamed tests count towards both.
Removed:
unit_tests.sources.declarative.stream_slicers.test_declarative_partition_generator.StreamSlicerPartitionGeneratorTest ‑ test_max_records_reached_on_previous_partition
unit_tests.sources.declarative.stream_slicers.test_declarative_partition_generator.StreamSlicerPartitionGeneratorTest ‑ test_max_records_reached_stops_reading
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_close_slice_when_process_partition_then_queue_records_and_exception_and_sentinel
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_from_read_when_process_partition_then_queue_records_and_exception_and_sentinel
Added:
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_default_record_limit[test_create_request_no_record_limit]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_default_record_limit[test_create_request_no_record_limit_n_records_exceed_max]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_record_limit[test_create_request_record_limit_exceeds_max]
unit_tests.connector_builder.test_message_grouper ‑ test_get_grouped_messages_record_limit[test_create_request_with_record_limit]
unit_tests.sources.streams.concurrent.test_concurrent_read_processor.TestConcurrentReadProcessor ‑ test_given_exception_on_partition_complete_sentinel_then_yield_error_trace_message_and_stream_is_incomplete
unit_tests.sources.streams.concurrent.test_partition_reader.PartitionReaderTest ‑ test_given_exception_when_process_partition_then_queue_records_and_exception_and_sentinel

@github-actions

PyTest Results (Full)

3 697 tests  +2   3 686 ✅ +2   9m 35s ⏱️ -4s
    1 suites ±0      11 💤 ±0 
    1 files   ±0       0 ❌ ±0 

Results for commit f750fdf. ± Comparison against base commit 2b07f93.

This pull request removes 4 and adds 6 tests. Note that renamed tests count towards both.

@coderabbitai
Contributor

coderabbitai bot commented Aug 24, 2025

📝 Walkthrough


Refactors connector builder to construct ManifestDeclarativeSource using ModelToComponentFactory with explicit TestLimits defaults. Simplifies concurrent runtime: local queue in read path, revised partition processing, improved per-stream error handling, and cursor updates. Removes record/slice limit plumbing from declarative slicers/partitions. Adjusts HTTP client cache constructor. Updates unit tests accordingly.

Changes

Cohort / File(s) Summary
Connector Builder handler and entrypoint
airbyte_cdk/connector_builder/connector_builder_handler.py, airbyte_cdk/connector_builder/main.py
Adds local TestLimits with new defaults and keys; get_limits returns TestLimits. create_source now takes (config, limits) and returns ManifestDeclarativeSource wired via ModelToComponentFactory; removes catalog/state and concurrency injection. Main updated to new signature and minor formatting.
Test reader utilities
airbyte_cdk/connector_builder/test_reader/helpers.py, airbyte_cdk/connector_builder/test_reader/message_grouper.py
Drops state-blob conversion; handle_current_slice sets state to latest message dict. Message grouper loop now caps by record limit, stopping consumption after limit.
Concurrent streaming runtime
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py, airbyte_cdk/sources/concurrent_source/concurrent_source.py, airbyte_cdk/sources/streams/concurrent/partition_reader.py, airbyte_cdk/sources/streams/concurrent/partitions/types.py, airbyte_cdk/sources/message/concurrent_repository.py (deleted), airbyte_cdk/sources/streams/concurrent/cursor.py, airbyte_cdk/sources/utils/slice_logger.py
Localizes the queue to read() and removes it from the constructor. PartitionReader no longer requires a cursor or logger; on error it enqueues the exception followed by a failure sentinel. The processor calls cursor.observe for each record, handles partition completion with try/except/finally, emits traced errors, and closes the partition on success. Removes AirbyteMessage from QueueItem. Deletes ConcurrentMessageRepository. Removes cursor locking. Comment cleanup.
Declarative source and component factory
airbyte_cdk/sources/declarative/concurrent_declarative_source.py, airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
Simplifies ConcurrentDeclarativeSource: removes limits/migrate/normalize; accepts optional ModelToComponentFactory. Drops per-partition limits wiring. Removes MAX_SLICES const. Adds temporary retriever.cursor=None for SimpleRetriever path.
Declarative stream slicers and partitions
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py, airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py
Removes max_records_limit and slice_limit support and related decorators/counters. Constructor signatures simplified. Minor typing import expansion in test decorator module.
HTTP client
airbyte_cdk/sources/streams/http/http_client.py
Updates CachedLimiterSession instantiation to use sqlite_path positional argument.
Connector Builder tests
unit_tests/connector_builder/test_connector_builder_handler.py, unit_tests/connector_builder/test_message_grouper.py
Migrate to ManifestDeclarativeSource; adopt new TestLimits/defaults; add tests for record-limit behavior. Adjust create_source usage.
Concurrent runtime tests
unit_tests/sources/streams/concurrent/test_partition_reader.py, unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py, unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py, unit_tests/sources/streams/test_stream_read.py
Update to PartitionReader without cursor; assert partition close and traced errors; adjust ConcurrentSource init; drive state via on_record and cursor.close_partition.
Declarative tests
unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py, unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py, unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py, unit_tests/sources/declarative/test_concurrent_declarative_source.py, unit_tests/sources/declarative/retrievers/test_simple_retriever.py
Remove max_records_limit usage and related tests; switch ConcurrentDeclarativeSource to component_factory=ModelToComponentFactory(...); simplify assertions; minor import cleanup.
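The record cap described for the message grouper (stop consuming once the record limit is hit) can be illustrated with a small sketch. `take_records` and `counting_source` are hypothetical helpers, not CDK functions; the sketch assumes records arrive as a lazy iterator and treats reaching the limit exactly as "limit reached", which is an assumption rather than a documented CDK rule.

```python
from itertools import islice
from typing import Iterable, Iterator, List, Tuple, TypeVar

T = TypeVar("T")


def take_records(records: Iterable[T], max_records: int) -> Tuple[List[T], bool]:
    """Consume at most ``max_records`` items and report whether the cap was hit.

    islice() stops before asking the source for item ``max_records + 1``,
    so a lazy source is never advanced past the limit.
    """
    if max_records < 1:
        raise ValueError("max_records must be a positive integer")
    taken = list(islice(records, max_records))
    # Assumption for this sketch: hitting the cap exactly counts as "limit reached".
    return taken, len(taken) >= max_records


def counting_source(n: int, produced: List[int]) -> Iterator[int]:
    """Lazy source that appends every item to ``produced`` as it is generated."""
    for i in range(n):
        produced.append(i)
        yield i
```

For example, with `produced = []`, `take_records(counting_source(10, produced), 3)` returns `([0, 1, 2], True)` and leaves `produced` as `[0, 1, 2]`: `islice` never requests a fourth item, which is what prevents overconsumption.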

Sequence Diagram(s)

sequenceDiagram
  autonumber
  actor CLI as CLI
  participant Handler as connector_builder_handler
  participant Factory as ModelToComponentFactory
  participant Source as ManifestDeclarativeSource

  CLI->>Handler: get_limits(config)
  Handler-->>CLI: TestLimits
  CLI->>Handler: create_source(config, limits)
  Handler->>Factory: new Factory(disable_retries/cache, limits.max_pages_per_slice, limits.max_slices)
  Handler->>Source: new ManifestDeclarativeSource(manifest, component_factory=Factory)
  Source-->>CLI: source
sequenceDiagram
  autonumber
  participant CS as ConcurrentSource
  participant TP as ThreadPoolManager
  participant Q as Local Queue
  participant PR as PartitionReader
  participant CRP as ConcurrentReadProcessor
  participant Cur as Stream Cursor
  participant Repo as MessageRepository

  CS->>Q: create local Queue
  CS->>PR: PartitionReader(Q)
  CS->>CRP: start processor(PR, Q)
  loop partitions
    TP->>PR: process_partition(partition)
    PR->>Q: records from partition.read()
    alt success
      PR->>Q: PartitionCompleteSentinel(success)
    else error
      PR->>Q: StreamThreadException
      PR->>Q: PartitionCompleteSentinel(failure)
    end
  end
  CRP->>Q: consume items
  alt Record
    CRP->>Cur: observe(record)
    CRP->>Repo: emit(record/state/log)
  else PartitionCompleteSentinel
    alt success
      CRP->>Cur: close_partition(partition)
    else failure
      CRP->>Repo: emit AirbyteTracedException
    end
  end
  CRP-->>CS: done when all partitions complete and Q empty
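The queue protocol in the second diagram (records first, then exactly one completion sentinel per partition, with the exception enqueued before a failure sentinel) can be sketched with the standard library. `DoneMarker`, `WorkerError`, `process_partition`, `consume` and `run_demo` are simplified, hypothetical stand-ins rather than the CDK's PartitionReader and ConcurrentReadProcessor; for brevity the consumer records an error as soon as it dequeues it instead of emitting an AirbyteTracedException.

```python
import queue
import threading
from dataclasses import dataclass
from typing import Any, Callable, Iterable, List, Tuple


@dataclass
class DoneMarker:
    """Hypothetical partition-complete marker (not the CDK sentinel class)."""
    partition: str
    is_successful: bool


@dataclass
class WorkerError:
    """Hypothetical wrapper for an exception raised on a worker thread."""
    partition: str
    exception: Exception


def process_partition(partition: str, read: Callable[[str], Iterable[Any]], q: "queue.Queue[Any]") -> None:
    """Producer: enqueue records, then exactly one marker (error first on failure)."""
    try:
        for record in read(partition):
            q.put(record)
        q.put(DoneMarker(partition, is_successful=True))
    except Exception as exc:
        q.put(WorkerError(partition, exc))
        q.put(DoneMarker(partition, is_successful=False))


def consume(
    q: "queue.Queue[Any]",
    n_partitions: int,
    observe: Callable[[Any], None],
    close_partition: Callable[[str], None],
) -> Tuple[List[Any], List[str]]:
    """Consumer: observe each record, close partitions only on success, collect errors."""
    records: List[Any] = []
    errors: List[str] = []
    remaining = n_partitions
    while remaining:  # every producer enqueues one marker, so this terminates
        item = q.get()
        if isinstance(item, DoneMarker):
            remaining -= 1
            if item.is_successful:
                close_partition(item.partition)
        elif isinstance(item, WorkerError):
            errors.append(f"{item.partition}: {item.exception}")
        else:
            observe(item)
            records.append(item)
    return records, errors


def run_demo() -> Tuple[List[Any], List[str], List[str]]:
    def read(partition: str) -> Iterable[Any]:
        yield {"partition": partition}
        if partition == "bad":
            raise RuntimeError("boom")

    q: "queue.Queue[Any]" = queue.Queue()  # scoped to this run, like the read()-local queue
    partitions = ["a", "b", "bad"]
    workers = [threading.Thread(target=process_partition, args=(p, read, q)) for p in partitions]
    for worker in workers:
        worker.start()
    closed: List[str] = []
    records, errors = consume(q, len(partitions), lambda record: None, closed.append)
    for worker in workers:
        worker.join()
    return records, errors, sorted(closed)
```

Calling `run_demo()` should return three records, the single error `"bad: boom"`, and closed partitions `["a", "b"]`: the failing partition is never closed, matching the "close_partition on success only" branch of the diagram.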

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~75 minutes


Suggested labels

bug

Suggested reviewers

  • brianjlai

Would you like me to call out any risky interaction points explicitly in the concurrent processing path (e.g., lock removal in cursor, queue lifecycle), so reviewers can focus on them first, wdyt?

Contributor

@coderabbitai coderabbitai bot left a comment


Actionable comments posted: 3

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (3)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (1)

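270-276: isinstance with a PEP 604 union requires Python 3.10+

Using isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor) works on Python 3.10 and later, where ClassA | ClassB builds a types.UnionType that isinstance accepts. On 3.9 and earlier, evaluating ConcurrentCursor | ConcurrentPerPartitionCursor itself raises TypeError. If this code must run on interpreters older than 3.10, can we switch to a tuple, which works on every version, wdyt?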

-                        if not isinstance(cursor, ConcurrentCursor | ConcurrentPerPartitionCursor):
+                        if not isinstance(cursor, (ConcurrentCursor, ConcurrentPerPartitionCursor)):
                             # This should never happen since we instantiate ConcurrentCursor in
                             # model_to_component_factory.py
                             raise ValueError(
                                 f"Expected AsyncJobPartitionRouter stream_slicer to be of type ConcurrentCursor, but received{cursor.__class__}"
                             )
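A quick way to check the version behaviour: the stub classes below are hypothetical stand-ins for the two cursor types, and `is_supported_tuple` / `is_supported_union` are illustrative helpers, not CDK code.

```python
import sys

# True on interpreters where isinstance() accepts an `A | B` union object.
UNION_ISINSTANCE_SUPPORTED = sys.version_info >= (3, 10)


class ConcurrentCursorStub:
    """Hypothetical stand-in for ConcurrentCursor."""


class PerPartitionCursorStub:
    """Hypothetical stand-in for ConcurrentPerPartitionCursor."""


class OtherCursorStub:
    """Any other cursor type."""


def is_supported_tuple(cursor: object) -> bool:
    # A tuple of classes is accepted by isinstance on every Python 3 version.
    return isinstance(cursor, (ConcurrentCursorStub, PerPartitionCursorStub))


def is_supported_union(cursor: object) -> bool:
    # `A | B` on classes builds a types.UnionType (3.10+), which isinstance accepts.
    # On 3.9 and earlier, evaluating the `|` raises TypeError when this is called.
    return isinstance(cursor, ConcurrentCursorStub | PerPartitionCursorStub)
```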
unit_tests/connector_builder/test_connector_builder_handler.py (1)

1273-1275: Assertion inspects the page dict, not the records array

pages[0] is a dict with keys records, request, response; len(page_records) counts keys, not records. Would you fix this to check len(page_records["records"]), wdyt?

-            page_records = output_data["slices"][0]["pages"][0]
-            assert len(page_records) == len(MOCK_RESPONSE["result"])
+            page_records = output_data["slices"][0]["pages"][0]
+            assert len(page_records["records"]) == len(MOCK_RESPONSE["result"])
airbyte_cdk/connector_builder/connector_builder_handler.py (1)

152-170: Bug: full_resolve_manifest mutates the source’s manifest (needs deep copy)

Shallow-copying the dict and then editing streams in place mutates the source’s internal manifest across calls. Can we deep copy before editing, wdyt?

 def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
     try:
-        manifest = {**source.resolved_manifest}
+        manifest = copy.deepcopy(source.resolved_manifest)
         streams = manifest.get("streams", [])
         for stream in streams:
             stream["dynamic_stream_name"] = None
@@
         manifest["streams"] = streams
         return AirbyteMessage(
         return AirbyteMessage(

And add the missing import at the top of this module:

import copy

This will prevent state leakage between calls/tests. Wdyt?
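To see why the shallow copy leaks, here is a reduced sketch of the two variants. `resolve_shallow` and `resolve_deep` are hypothetical functions that mimic only the stream-editing loop of full_resolve_manifest, not its full behaviour.

```python
import copy
from typing import Any, Dict


def resolve_shallow(manifest: Dict[str, Any]) -> Dict[str, Any]:
    """Mimics the current code: only the outer dict is copied."""
    resolved = {**manifest}
    for stream in resolved.get("streams", []):
        # These stream dicts are the caller's objects, so the original changes too.
        stream["dynamic_stream_name"] = None
    return resolved


def resolve_deep(manifest: Dict[str, Any]) -> Dict[str, Any]:
    """Mimics the suggested fix: nested lists and dicts are copied as well."""
    resolved = copy.deepcopy(manifest)
    for stream in resolved.get("streams", []):
        stream["dynamic_stream_name"] = None
    return resolved
```

Given `original = {"streams": [{"name": "users", "dynamic_stream_name": "users_v2"}]}`, `resolve_deep(original)` leaves `original` untouched, while `resolve_shallow(original)` sets `original["streams"][0]["dynamic_stream_name"]` to `None`, because `{**manifest}` copies only the top-level dict.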

🧹 Nitpick comments (28)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (2)

7-7: Consider trimming unused typing imports to keep the diff lean (and avoid potential F401).

Mapping, Optional, and Union aren't referenced in this file. Would you like to drop them for now and reintroduce when needed, wdyt?

-from typing import Any, Iterable, Mapping, Optional, Union
+from typing import Any, Iterable

10-10: Drop unused StreamState import unless a state-aware signature change is imminent.

StreamState isn't used here, and the abstract StreamSlicer.stream_slices() signature (see partitions/stream_slicer.py) still takes no state. Shall we remove it to avoid linter noise, and add it back when the signature evolves, wdyt?

-from airbyte_cdk.sources.types import StreamSlice, StreamState
+from airbyte_cdk.sources.types import StreamSlice
airbyte_cdk/sources/streams/http/http_client.py (2)

155-157: Constructor alignment looks good; tiny readability tweak: use a keyword for sqlite_path?

Passing sqlite_path positionally matches the updated CachedLimiterSession signature and fixes the previous mismatch. For forward-compatibility and readability, would you switch to a keyword arg so this won’t silently break if the constructor reorders params later, wdyt?

-            return CachedLimiterSession(
-                sqlite_path, backend=backend, api_budget=self._api_budget, match_headers=True
-            )
+            return CachedLimiterSession(
+                sqlite_path=sqlite_path, backend=backend, api_budget=self._api_budget, match_headers=True
+            )

156-156: Gitleaks “Generic API Key” is a false positive here; consider allowlisting this pattern

Static analysis flagged Line 156, but sqlite_path is a SQLite URI/path (e.g., file::memory:?cache=shared), not a secret. Would you add an allowlist/ignore for this specific pattern or variable name in your Gitleaks config to reduce noise, wdyt?

airbyte_cdk/sources/streams/concurrent/cursor.py (1)

240-246: Lock removal in close_partition: can we confirm single-threaded call semantics or add a minimal safeguard?

By removing the lock here, concurrent calls to close_partition across threads could interleave updates to self._concurrent_state["slices"] and the subsequent _merge_partitions/_emit_state_message, potentially dropping or duplicating slices if multiple partitions finish at roughly the same time. If the runtime guarantees that close_partition is invoked serially per stream (e.g., reader-side serialization), could we document that contract on the class/method to make it explicit? Alternatively, would you consider a lightweight serialization (e.g., per-stream executor) or a very narrow critical section just around the state mutation, if that contract can’t be guaranteed, wdyt?

As a micro-improvement, we could drop the before/after len() comparison (a check-then-act sequence that another thread can interleave with once the lock is gone) and instead let _add_slice_to_state report whether it appended a slice:

-        slice_count_before = len(self._concurrent_state.get("slices", []))
-        self._add_slice_to_state(partition)
-        if slice_count_before < len(
-            self._concurrent_state["slices"]
-        ):  # only emit if at least one slice has been processed
+        did_append = self._add_slice_to_state(partition)  # return True if a slice was added
+        if did_append:  # only emit if at least one slice has been processed
             self._merge_partitions()
             self._emit_state_message()

I can follow up with a small refactor to make _add_slice_to_state return a bool and add a focused concurrency unit test that attempts two simultaneous close_partition calls to verify no slice loss—interested, wdyt?
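If that contract cannot be guaranteed, the narrow critical section combined with a bool-returning `_add_slice_to_state` could look roughly like this. `SliceStateSketch` is a hypothetical, heavily reduced model of the cursor (no merging, and a list append stands in for `_emit_state_message`); the rule that a `None` slice adds nothing is also an assumption made for the sketch.

```python
import threading
from typing import Any, Dict, List, Optional


class SliceStateSketch:
    """Hypothetical stand-in for a cursor's slice bookkeeping; not the CDK class."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._state: Dict[str, List[Any]] = {"slices": []}
        self.emitted: List[int] = []  # slice count captured at each "state emission"

    def _add_slice_to_state(self, partition_slice: Optional[Any]) -> bool:
        # Assumption for the sketch: a partition without a slice adds nothing.
        if partition_slice is None:
            return False
        self._state["slices"].append(partition_slice)
        return True

    def close_partition(self, partition_slice: Optional[Any]) -> None:
        # Narrow critical section: the mutation and the emission it triggers
        # happen under one lock, so concurrent callers cannot interleave them.
        with self._lock:
            if self._add_slice_to_state(partition_slice):
                self.emitted.append(len(self._state["slices"]))

    @property
    def slice_count(self) -> int:
        with self._lock:
            return len(self._state["slices"])
```

Holding the lock across the emission keeps emitted states in the same order as the mutations; the trade-off is that a slow emitter delays other threads closing partitions on the same stream.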

unit_tests/sources/streams/test_stream_read.py (2)

620-629: Limit on_record invocations to actual records to make the test more robust.

actual_records can, in other scenarios, include AirbyteMessage instances (e.g., LOG/STATE). Feeding those into Record(data=...) works today, but couples the test to stream_data_to_airbyte_message’s polymorphism. Would you filter to plain dict payloads (or only RECORD messages) before calling handler.on_record, to reduce brittleness, wdyt?

-    for record in actual_records:
-        list(
-            handler.on_record(
-                Record(
-                    data=record,
-                    stream_name="__mock_stream",
-                )
-            )
-        )
+    # Only pass raw record dicts to on_record to mirror production flow
+    for data in [r for r in actual_records if isinstance(r, dict)]:
+        for _ in handler.on_record(Record(data=data, stream_name="__mock_stream")):
+            pass

633-640: Nice: direct cursor emission to verify combined state.

Using cursor.close_partition(mock_partition) to flush the state queue keeps this test focused and avoids over-mocking the source. LGTM. If we later change emission timing, we might convert this to asserting against the cursor.state for even less coupling—worth considering in a follow-up, wdyt?

airbyte_cdk/connector_builder/test_reader/helpers.py (2)

8-8: Add typing.cast for stricter typing (optional).

Since latest_state_message is now a Dict and StreamReadSlices.state expects a list of mappings, adding cast helps mypy/pyright in strict mode. Would you import cast here to keep annotations tight, wdyt?

-from typing import Any, Dict, List, Mapping, Optional
+from typing import Any, Dict, List, Mapping, Optional, cast

469-469: State now carries raw dicts: confirm downstream consumers tolerate non-AirbyteStateMessage.

Switching to state=[latest_state_message] moves away from AirbyteStateBlob/AirbyteStateMessage. Can we confirm connector_builder consumers expect raw dicts here (e.g., serialization/UI)? If strict typing is enabled, we can also add a cast to make that intent explicit:

-        state=[latest_state_message] if latest_state_message else [],
+        state=[cast(Dict[str, Any], latest_state_message)] if latest_state_message else [],

Alternatively, we could loosen the parameter type to Mapping[str, Any] to avoid concrete Dict typing. Happy to patch either way, wdyt?

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)

101-105: Constructor simplification reads well; please reconcile docstring vs. implementation.

Passing a retriever instance directly is simpler. However, the class docstring above still talks about a “retriever_factory” per thread for thread-safety. Could we update the docstring to clarify that callers are expected to provide a per-thread retriever instance (and that this factory reuses it for all slices within that thread), wdyt?

If there’s a chance this factory is shared across threads, we may want to explicitly document that it’s not thread-safe, or accept a callable to create thread-local retrievers. I can draft that docstring and (optionally) a thread-local wrapper if helpful.
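One way to implement the thread-local option is a small wrapper around `threading.local`. `ThreadLocalProvider` is hypothetical (not a CDK class); it assumes the caller can supply a zero-argument callable that builds a fresh retriever.

```python
import threading
from typing import Callable, Generic, TypeVar

T = TypeVar("T")


class ThreadLocalProvider(Generic[T]):
    """Hypothetical wrapper: lazily builds one instance per thread from a factory."""

    def __init__(self, factory: Callable[[], T]) -> None:
        self._factory = factory
        self._local = threading.local()

    def get(self) -> T:
        try:
            # Already built for the current thread: reuse it.
            return self._local.instance  # type: ignore[no-any-return]
        except AttributeError:
            instance = self._factory()
            self._local.instance = instance
            return instance
```

A partition factory could then call `provider.get()` each time it creates a partition: calls from the same thread reuse one retriever, while each worker thread gets its own.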

unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)

36-41: DRY up factory creation in tests (optional).

The DeclarativePartitionFactory setup is repeated in multiple tests with identical arguments. Would you extract a small helper (e.g., _make_factory(retriever, message_repository)) to keep the tests concise and reduce future churn, wdyt?

+def _make_factory(retriever, message_repository):
+    return DeclarativePartitionFactory(
+        _STREAM_NAME,
+        _SCHEMA_LOADER,
+        retriever,
+        message_repository,
+    )
@@
-        partition_factory = DeclarativePartitionFactory(
-            _STREAM_NAME,
-            _SCHEMA_LOADER,
-            retriever,
-            message_repository,
-        )
+        partition_factory = _make_factory(retriever, message_repository)
@@
-        partition_factory = DeclarativePartitionFactory(
-            _STREAM_NAME,
-            _SCHEMA_LOADER,
-            retriever,
-            message_repository,
-        )
+        partition_factory = _make_factory(retriever, message_repository)
@@
-        partition_factory = DeclarativePartitionFactory(
-            _STREAM_NAME,
-            _SCHEMA_LOADER,
-            retriever,
-            message_repository,
-        )
+        partition_factory = _make_factory(retriever, message_repository)

Also, the first test asserts retriever reuse across slices. That matches the current implementation; do we also want a separate test to assert per-thread behavior (i.e., factory instances are not shared across threads), or is that covered elsewhere, wdyt?

Also applies to: 51-56, 70-74

unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1)

50-55: Positional ctor order follows ConcurrentSource.__init__—consider guarding empty streams, wdyt?

I double-checked the constructor in airbyte_cdk/sources/concurrent_source/concurrent_source.py—it’s defined as

def __init__(
    self,
    threadpool: ThreadPoolManager,
    logger: logging.Logger,
    slice_logger: SliceLogger = …,
    message_repository: MessageRepository = …,
    …
) -> None:
    …

and our call in
unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py:50–55

threadpool_manager = ThreadPoolManager(threadpool, streams[0].logger)
concurrent_source = ConcurrentSource(
    threadpool_manager, streams[0].logger, NeverLogSliceLogger(), self._message_repository
)

matches the first four positional parameters exactly.

Would you like to fail fast on an empty streams list (to avoid an IndexError) with a guard? For example:

   self._message_repository = InMemoryMessageRepository()
-  threadpool_manager = ThreadPoolManager(threadpool, streams[0].logger)
+  if not streams:
+      raise ValueError("StreamFacadeBuilder requires at least one stream.")
+  threadpool_manager = ThreadPoolManager(threadpool, streams[0].logger)
   concurrent_source = ConcurrentSource(
       threadpool_manager,
       streams[0].logger,
       NeverLogSliceLogger(),
       self._message_repository
   )

No other mismatches found—everything else looks solid. wdyt?

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)

2089-2094: Consider swapping in a NoopCursor or only nulling for concurrent pipelines, wdyt?

Verification shows that SimpleRetriever.cursor is actively used across its code paths—close_slice, observe, the state getter/setter, and even in DeclarativeStream.is_resumable/get_cursor. Zeroing it out unconditionally will lead to unexpected NoneType errors whenever someone invokes a standalone SimpleRetriever outside of a DefaultStream context.

What if we…

  • Swap in a no-op implementation to preserve the cursor interface when we hand it off?
  • Or at least narrow the nulling to only those cases where we know DefaultStream is taking ownership (e.g. when cursor is a ConcurrentCursor)?

Proposed diff:

 # FIXME to be removed once we migrate everything to DefaultStream
-if isinstance(retriever, SimpleRetriever):
-    retriever.cursor = None
+if isinstance(retriever, SimpleRetriever) and isinstance(cursor, ConcurrentCursor):
+    # DefaultStream owns the cursor; replace with a no-op to keep SimpleRetriever safe.
+    retriever.cursor = NoopCursor()

— wdyt?
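The null-object idea behind the `NoopCursor` suggestion, reduced to a sketch: `NoopCursorSketch` and `RetrieverSketch` are hypothetical classes with a deliberately tiny method set, so they illustrate the pattern without claiming to match the CDK's cursor interface or an existing `NoopCursor` class.

```python
from typing import Any, Dict, List, Optional


class NoopCursorSketch:
    """Null object: accepts every cursor call and keeps no state."""

    def observe(self, record: Any) -> None:
        pass

    def close_slice(self, stream_slice: Any) -> None:
        pass

    def get_stream_state(self) -> Dict[str, Any]:
        return {}


class RetrieverSketch:
    """Hypothetical retriever that calls its cursor unconditionally."""

    def __init__(self, cursor: Optional[Any]) -> None:
        self.cursor = cursor

    def read_slice(self, records: List[Any], stream_slice: Any) -> List[Any]:
        for record in records:
            self.cursor.observe(record)  # AttributeError if cursor is None
        self.cursor.close_slice(stream_slice)
        return records
```

With `RetrieverSketch(NoopCursorSketch())` the read completes, while `RetrieverSketch(None)` fails with `AttributeError` on the first `observe` call, which is the failure mode described above.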

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)

361-364: Good call disabling HttpClient cache in tests

Passing component_factory=ModelToComponentFactory(disable_cache=True) prevents cross-test contamination via cached requests. To keep test isolation consistent, should we also pass the same component_factory in test_dynamic_schema_loader_manifest_flow at Lines 277-279 so both flows run with identical cache semantics, wdyt?

Suggested patch:

-    source = ConcurrentDeclarativeSource(
-        source_config=_MANIFEST, config=_CONFIG, catalog=None, state=None
-    )
+    source = ConcurrentDeclarativeSource(
+        source_config=_MANIFEST,
+        config=_CONFIG,
+        catalog=None,
+        state=None,
+        component_factory=ModelToComponentFactory(disable_cache=True),
+    )

Would you like me to scan the suite for other ConcurrentDeclarativeSource instantiations and open a follow-up patch to standardize cache disabling across tests?

unit_tests/connector_builder/test_message_grouper.py (2)

310-376: Record-limit behavior tests are solid; minor cleanup to tighten assertions

The cases for “requested > max” and “requested <= max” read well. Two small nits:

  • The enumerate() index isn’t used.
  • Computing total_records can be more direct.

Proposed tidy-up (no behavior change):

-        single_slice = actual_response.slices[0]
-        total_records = 0
-        for i, actual_page in enumerate(single_slice.pages):
-            total_records += len(actual_page.records)
+        single_slice = actual_response.slices[0]
+        total_records = sum(len(p.records) for p in single_slice.pages)

Also, shall we rename n_records to available_records for clarity, wdyt?


378-429: Default-record-limit test covers the “no explicit limit” path; consider asserting limit-reached for symmetry

To mirror the explicit-limit test and strengthen coverage, can we assert test_read_limit_reached here as well?

-    assert total_records == min([max_record_limit, n_records])
+    assert total_records == min([max_record_limit, n_records])
+    assert (total_records >= max_record_limit) == actual_response.test_read_limit_reached

Also the sum() simplification applies here too:

-    total_records = 0
-    for i, actual_page in enumerate(single_slice.pages):
-        total_records += len(actual_page.records)
+    total_records = sum(len(p.records) for p in single_slice.pages)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)

163-164: Guard cursor.observe failures to avoid tearing down the read loop

If observe raises, the whole read path will error. Should we align this with close_partition by catching, flagging, and emitting a sanitized error trace while still emitting the record, wdyt?

Proposed patch:

-            self._record_counter[stream.name] += 1
-            stream.cursor.observe(record)
+            self._record_counter[stream.name] += 1
+            try:
+                stream.cursor.observe(record)
+            except Exception as exception:
+                self._flag_exception(stream.name, exception)
+                yield AirbyteTracedException.from_exception(
+                    exception, stream_descriptor=StreamDescriptor(name=stream.name)
+                ).as_sanitized_airbyte_message()

If you’re open to this, I can add a unit test that simulates observe() raising and asserts we emit an ERROR trace plus continue with the record.
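Here is a framework-free sketch of the proposed behavior: a failing observe() becomes an error message, and the record is still emitted. FlakyCursor, on_record, and the dict-shaped messages are hypothetical simplifications. The actual patch would use _flag_exception and AirbyteTracedException.from_exception(...).as_sanitized_airbyte_message().

```python
from typing import Any, Dict, Iterator, List


class FlakyCursor:
    """Hypothetical cursor whose observe() fails on one record id."""

    def __init__(self, fail_on: int) -> None:
        self.fail_on = fail_on
        self.observed: List[int] = []

    def observe(self, record: Dict[str, Any]) -> None:
        if record["id"] == self.fail_on:
            raise ValueError(f"unparseable cursor value for id={record['id']}")
        self.observed.append(record["id"])


def on_record(
    record: Dict[str, Any],
    cursor: FlakyCursor,
    flagged: Dict[str, Exception],
    stream_name: str,
) -> Iterator[Dict[str, Any]]:
    """Hypothetical on_record: observe() failures are reported, not raised."""
    try:
        cursor.observe(record)
    except Exception as exception:
        flagged[stream_name] = exception  # plays the role of _flag_exception
        yield {"type": "TRACE", "error": str(exception)}
    yield {"type": "RECORD", "data": record}  # the record is still emitted


cursor = FlakyCursor(fail_on=2)
flagged: Dict[str, Exception] = {}
messages = [
    message
    for record in ({"id": 1}, {"id": 2}, {"id": 3})
    for message in on_record(record, cursor, flagged, "users")
]
```

The read loop keeps going after the failure, and the flagged exception can still make is_done() report an error at the end.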

unit_tests/sources/streams/concurrent/test_partition_reader.py (2)

31-37: Assert sentinel success flag to make the test stricter

Since PartitionCompleteSentinel carries an is_successful flag, would we want to assert it here to catch regressions, wdyt?

-        while queue_item := self._queue.get():
-            if not isinstance(queue_item, PartitionCompleteSentinel):
+        while queue_item := self._queue.get():
+            if not isinstance(queue_item, PartitionCompleteSentinel):
                 pytest.fail("Only one PartitionCompleteSentinel is expected")
-            break
+            assert queue_item.is_successful is True
+            break

49-63: Strengthen the exception-path assertion to verify is_successful=False

Because PartitionCompleteSentinel.__eq__ only compares partition identity, the test won’t fail if the success flag regresses. Shall we explicitly assert the flag, wdyt?

         assert queue_content == _RECORDS + [
             StreamThreadException(exception, partition.stream_name()),
             PartitionCompleteSentinel(partition),
         ]
+        # Also verify the completion sentinel marks the partition as unsuccessful
+        assert isinstance(queue_content[-1], PartitionCompleteSentinel)
+        assert queue_content[-1].is_successful is False
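
To show why the list comparison alone is not enough, here is a sketch with a hypothetical sentinel whose __eq__ compares only the partition, matching the behavior described for PartitionCompleteSentinel.

```python
class PartitionCompleteSentinelSketch:
    """Hypothetical stand-in: equality ignores is_successful."""

    def __init__(self, partition: object, is_successful: bool = True) -> None:
        self.partition = partition
        self.is_successful = is_successful

    def __eq__(self, other: object) -> bool:
        if isinstance(other, PartitionCompleteSentinelSketch):
            return self.partition == other.partition
        return False


partition = "partition-1"
success = PartitionCompleteSentinelSketch(partition, is_successful=True)
failure = PartitionCompleteSentinelSketch(partition, is_successful=False)

# The equality check cannot tell these two apart...
assert success == failure
# ...so only an explicit flag assertion catches a regression
assert success.is_successful is not failure.is_successful
```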
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (1)

307-354: Great addition covering close_partition exceptions; consider asserting is_done() raises

Since on_partition_complete_sentinel now flags the exception, is_done() should raise AirbyteTracedException. Adding the assertion would close the loop for this scenario, wdyt?

         assert list(map(lambda message: message.trace.type, messages)) == [
             TraceType.ERROR,
             TraceType.STREAM_STATUS,
         ]
         assert messages[1] == expected_status_message
+        # With an error flagged, the processor should report failure on completion
+        with pytest.raises(AirbyteTracedException):
+            handler.is_done()
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)

27-35: Docstring is misleading (talks about “partitions generated” and has a small typo)

This reader processes records, not “generates partitions”, and there’s a grammar nit (“something when wrong”). Would you align the docstring wording to the new behavior, wdyt?

 def process_partition(self, partition: Partition) -> None:
-    """
-    Process a partition and put the records in the output queue.
-    When all the partitions are added to the queue, a sentinel is added to the queue to indicate that all the partitions have been generated.
-
-    If an exception is encountered, the exception will be caught and put in the queue. This is very important because if we don't, the
-    main thread will have no way to know that something when wrong and will wait until the timeout is reached
+    """
+    Process a partition and put each record in the output queue.
+    When the partition finishes, enqueue a completion sentinel for that partition.
+
+    If an exception is encountered, enqueue the exception, then a failure sentinel.
+    This is very important because otherwise the main thread has no way to know that
+    something went wrong and will wait until the timeout is reached.
     ...
     """

39-45: Include slice context in raised exceptions for better triage

Right now we only propagate the stream name. Would you consider attaching the partition slice (e.g., partition.to_slice()) to the StreamThreadException (or emitting a slice log message right before pushing the exception) so downstream logs can pinpoint the failing slice, wdyt?
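Here is a minimal sketch of the ordering the updated docstring describes: records, then on failure the exception and a failure sentinel. It also attaches the slice to the enqueued exception. SliceAwareStreamError, process_partition, and the tuple sentinel are all hypothetical; they are not the CDK's StreamThreadException or PartitionCompleteSentinel.

```python
from queue import Queue
from typing import Any, Dict, Iterable, List, Mapping


class SliceAwareStreamError(Exception):
    """Hypothetical exception carrying the failing slice next to the stream name."""

    def __init__(self, exception: Exception, stream_name: str, stream_slice: Mapping[str, Any]) -> None:
        super().__init__(f"{stream_name} failed on slice {dict(stream_slice)}: {exception}")
        self.exception = exception
        self.stream_name = stream_name
        self.stream_slice = stream_slice


def process_partition(
    queue: "Queue[Any]", stream_name: str, stream_slice: Mapping[str, Any], records: Iterable[Any]
) -> None:
    """Enqueue records, then a completion sentinel; on failure, enqueue the
    exception (with slice context) and a failure sentinel so the consumer
    never waits for a timeout."""
    try:
        for record in records:
            queue.put(record)
        queue.put(("COMPLETE", stream_name, True))
    except Exception as exception:
        queue.put(SliceAwareStreamError(exception, stream_name, stream_slice))
        queue.put(("COMPLETE", stream_name, False))


def failing_records() -> Iterable[Dict[str, int]]:
    yield {"id": 1}
    raise RuntimeError("HTTP 500")


q: "Queue[Any]" = Queue()
process_partition(q, "users", {"start": "2024-01-01"}, failing_records())
items: List[Any] = []
while not q.empty():
    items.append(q.get())
```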

airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)

142-144: Minor: set comprehension for readability

Would you prefer a set comprehension here to avoid the intermediate list, wdyt?

-            concurrent_stream_names = set(
-                [concurrent_stream.name for concurrent_stream in concurrent_streams]
-            )
+            concurrent_stream_names = {s.name for s in concurrent_streams}

381-393: Accessing a protected attribute (_partition_router)

We’re reading ._partition_router directly. If this is intentional and stable, all good. Otherwise, shall we surface an accessor on the slicer (or pass the router down alongside the slicer) to avoid relying on a private attribute, wdyt?

airbyte_cdk/sources/concurrent_source/concurrent_source.py (3)

100-105: Local bounded queue is a good move

Using a local Queue[QueueItem] with a maxsize creates backpressure and removes shared-state coupling from the instance. Nice improvement. If we see sources with very large records, we might want to make maxsize configurable later. Wdyt?
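Here is a minimal stdlib illustration of the backpressure that maxsize provides. The size of 2 is arbitrary and is not the value ConcurrentSource uses.

```python
from queue import Full, Queue

# Bounded queue: once `maxsize` items are waiting, producers must wait
bounded: "Queue[int]" = Queue(maxsize=2)
bounded.put(1)
bounded.put(2)

try:
    # put_nowait() surfaces the backpressure instead of blocking the thread
    bounded.put_nowait(3)
    accepted_third = True
except Full:
    accepted_third = False

# A blocking put() would instead wait until a consumer calls get()
bounded.get()
bounded.put_nowait(3)  # room is available again
```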


139-147: Queue get() can block indefinitely; use timeout to respect DEFAULT_TIMEOUT_SECONDS

queue.get() without a timeout can hang forever if producers stall unexpectedly. Since we have DEFAULT_TIMEOUT_SECONDS, would you use a timed get and break when is_done() holds, wdyt?

-        while airbyte_message_or_record_or_exception := queue.get():
-            yield from self._handle_item(
-                airbyte_message_or_record_or_exception,
-                concurrent_stream_processor,
-            )
-            if concurrent_stream_processor.is_done() and queue.empty():
-                # all partitions were generated and processed. we're done here
-                break
+        while True:
+            try:
+                item = queue.get(timeout=1)
+            except Exception as exc:
+                # Prefer specific Empty, see import note below.
+                if concurrent_stream_processor.is_done() and queue.empty():
+                    break
+                continue
+            else:
+                yield from self._handle_item(item, concurrent_stream_processor)
+                if concurrent_stream_processor.is_done() and queue.empty():
+                    break

Outside this hunk, we’d also need to import Empty and catch it specifically:

from queue import Queue, Empty
...
            except Empty:
                if concurrent_stream_processor.is_done() and queue.empty():
                    break
                continue

Alternatively, pass self._timeout_seconds to get(timeout=...) if you prefer longer sleeps. Wdyt?
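Here is a self-contained sketch of the timed get() loop with Empty caught specifically. The producer thread, poll_seconds, and the event-based is_done are assumptions for illustration; in ConcurrentSource the timeout could come from self._timeout_seconds.

```python
import threading
from queue import Empty, Queue
from typing import Callable, List


def consume(queue: "Queue[int]", is_done: Callable[[], bool], poll_seconds: float = 0.05) -> List[int]:
    """Drain `queue` with a timed get() so a stalled producer cannot block forever.

    `is_done` is a hypothetical stand-in for concurrent_stream_processor.is_done().
    """
    consumed: List[int] = []
    while True:
        try:
            item = queue.get(timeout=poll_seconds)
        except Empty:
            # Nothing arrived in time: stop only once all work is finished
            if is_done() and queue.empty():
                break
            continue
        consumed.append(item)
        if is_done() and queue.empty():
            break
    return consumed


q: "Queue[int]" = Queue()
producer_finished = threading.Event()


def produce() -> None:
    for i in range(5):
        q.put(i)
    producer_finished.set()  # signal "done" only after every item is queued


t = threading.Thread(target=produce)
t.start()
result = consume(q, producer_finished.is_set)
t.join()
```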


39-48: Timeout parameter is currently unused

timeout_seconds is accepted but not used anymore after switching to a local queue. Do you want to either wire it into _consume_from_queue (see previous comment) or drop it from the API to avoid confusion, wdyt?

airbyte_cdk/connector_builder/connector_builder_handler.py (1)

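48-56: Defaults override explicit zeros because of the or fallback

Using x = command_config.get(KEY) or DEFAULT means an explicit 0 falls back to the default. If 0 should be allowed (e.g., “no limit”), shall we switch to “if key in dict” semantics, wdyt? One side effect to keep in mind: with .get(KEY, DEFAULT), an explicit null (None) in the config is returned as None instead of the default.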

-    max_pages_per_slice = (
-        command_config.get(MAX_PAGES_PER_SLICE_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
-    )
-    max_slices = command_config.get(MAX_SLICES_KEY) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
-    max_records = command_config.get(MAX_RECORDS_KEY) or DEFAULT_MAXIMUM_RECORDS
-    max_streams = command_config.get(MAX_STREAMS_KEY) or DEFAULT_MAXIMUM_STREAMS
+    max_pages_per_slice = command_config.get(
+        MAX_PAGES_PER_SLICE_KEY, DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
+    )
+    max_slices = command_config.get(MAX_SLICES_KEY, DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
+    max_records = command_config.get(MAX_RECORDS_KEY, DEFAULT_MAXIMUM_RECORDS)
+    max_streams = command_config.get(MAX_STREAMS_KEY, DEFAULT_MAXIMUM_STREAMS)
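
Here is a small sketch comparing the three lookups. The key name "max_records" and the default of 100 are hypothetical; the real MAX_RECORDS_KEY and DEFAULT_MAXIMUM_RECORDS values may differ.

```python
from typing import Any, Mapping

DEFAULT_MAXIMUM_RECORDS = 100  # hypothetical value, for illustration only


def limit_with_or(config: Mapping[str, Any]) -> Any:
    # Any falsy value (0, None, "") falls back to the default
    return config.get("max_records") or DEFAULT_MAXIMUM_RECORDS


def limit_with_get_default(config: Mapping[str, Any]) -> Any:
    # Only a missing key falls back; an explicit 0 or None is returned as-is
    return config.get("max_records", DEFAULT_MAXIMUM_RECORDS)


def limit_keep_zero(config: Mapping[str, Any]) -> Any:
    # Keeps an explicit 0 but still defaults on a missing key or explicit None
    value = config.get("max_records")
    return DEFAULT_MAXIMUM_RECORDS if value is None else value
```

The last variant may be the safer choice if the config can carry JSON nulls.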
📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

💡 Knowledge Base configuration:

  • MCP integration is disabled by default for public repositories
  • Jira integration is disabled by default for public repositories
  • Linear integration is disabled by default for public repositories

You can enable these sources in your CodeRabbit configuration.

📥 Commits

Reviewing files that changed from the base of the PR and between 2b07f93 and f750fdf.

📒 Files selected for processing (27)
  • airbyte_cdk/connector_builder/connector_builder_handler.py (3 hunks)
  • airbyte_cdk/connector_builder/main.py (1 hunks)
  • airbyte_cdk/connector_builder/test_reader/helpers.py (2 hunks)
  • airbyte_cdk/connector_builder/test_reader/message_grouper.py (1 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (3 hunks)
  • airbyte_cdk/sources/concurrent_source/concurrent_source.py (5 hunks)
  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2 hunks)
  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2 hunks)
  • airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1 hunks)
  • airbyte_cdk/sources/message/concurrent_repository.py (0 hunks)
  • airbyte_cdk/sources/streams/concurrent/cursor.py (1 hunks)
  • airbyte_cdk/sources/streams/concurrent/partition_reader.py (2 hunks)
  • airbyte_cdk/sources/streams/concurrent/partitions/types.py (1 hunks)
  • airbyte_cdk/sources/streams/http/http_client.py (1 hunks)
  • airbyte_cdk/sources/utils/slice_logger.py (0 hunks)
  • unit_tests/connector_builder/test_connector_builder_handler.py (21 hunks)
  • unit_tests/connector_builder/test_message_grouper.py (1 hunks)
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py (0 hunks)
  • unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1 hunks)
  • unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (2 hunks)
  • unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (4 hunks)
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py (0 hunks)
  • unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1 hunks)
  • unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (5 hunks)
  • unit_tests/sources/streams/concurrent/test_partition_reader.py (3 hunks)
  • unit_tests/sources/streams/test_stream_read.py (2 hunks)
💤 Files with no reviewable changes (4)
  • airbyte_cdk/sources/message/concurrent_repository.py
  • unit_tests/sources/declarative/incremental/test_concurrent_perpartitioncursor.py
  • unit_tests/sources/declarative/test_concurrent_declarative_source.py
  • airbyte_cdk/sources/utils/slice_logger.py
🧰 Additional context used
🧠 Learnings (4)
📚 Learning: 2024-11-15T01:04:21.272Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/cli/source_declarative_manifest/_run.py:62-65
Timestamp: 2024-11-15T01:04:21.272Z
Learning: The files in `airbyte_cdk/cli/source_declarative_manifest/`, including `_run.py`, are imported from another repository, and changes to these files should be minimized or avoided when possible to maintain consistency.

Applied to files:

  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
  • unit_tests/connector_builder/test_connector_builder_handler.py
📚 Learning: 2025-01-14T00:20:32.310Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py:1093-1102
Timestamp: 2025-01-14T00:20:32.310Z
Learning: In the `airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py` file, the strict module name checks in `_get_class_from_fully_qualified_class_name` (requiring `module_name` to be "components" and `module_name_full` to be "source_declarative_manifest.components") are intentionally designed to provide early, clear feedback when class declarations won't be found later in execution. These restrictions may be loosened in the future if the requirements for class definition locations change.

Applied to files:

  • airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py
📚 Learning: 2024-11-18T23:40:06.391Z
Learnt from: ChristoGrab
PR: airbytehq/airbyte-python-cdk#58
File: airbyte_cdk/sources/declarative/yaml_declarative_source.py:0-0
Timestamp: 2024-11-18T23:40:06.391Z
Learning: When modifying the `YamlDeclarativeSource` class in `airbyte_cdk/sources/declarative/yaml_declarative_source.py`, avoid introducing breaking changes like altering method signatures within the scope of unrelated PRs. Such changes should be addressed separately to minimize impact on existing implementations.

Applied to files:

  • airbyte_cdk/sources/declarative/concurrent_declarative_source.py
📚 Learning: 2025-01-13T23:39:15.457Z
Learnt from: aaronsteers
PR: airbytehq/airbyte-python-cdk#174
File: unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py:21-29
Timestamp: 2025-01-13T23:39:15.457Z
Learning: The CustomPageIncrement class in unit_tests/source_declarative_manifest/resources/source_the_guardian_api/components.py is imported from another connector definition and should not be modified in this context.

Applied to files:

  • unit_tests/connector_builder/test_connector_builder_handler.py
🧬 Code graph analysis (21)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (2)
airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py (1)
  • StreamSlicer (27-39)
airbyte_cdk/sources/types.py (1)
  • StreamSlice (75-169)
unit_tests/sources/declarative/retrievers/test_simple_retriever.py (1)
airbyte_cdk/models/airbyte_protocol.py (1)
  • AirbyteMessage (79-88)
airbyte_cdk/sources/streams/concurrent/cursor.py (1)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
  • _emit_state_message (230-248)
unit_tests/sources/streams/concurrent/scenarios/stream_facade_builder.py (1)
unit_tests/sources/streams/concurrent/scenarios/thread_based_concurrent_stream_source_builder.py (2)
  • streams (65-79)
  • NeverLogSliceLogger (174-176)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (3)
airbyte_cdk/sources/types.py (1)
  • Record (21-72)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
  • Partition (11-48)
airbyte_cdk/sources/concurrent_source/partition_generation_completed_sentinel.py (1)
  • PartitionGenerationCompletedSentinel (9-24)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (2)
airbyte_cdk/sources/declarative/models/declarative_component_schema.py (1)
  • SimpleRetriever (2808-2862)
airbyte_cdk/sources/declarative/retrievers/simple_retriever.py (1)
  • SimpleRetriever (53-602)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
  • Partition (11-48)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (5)
airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
  • submit (45-46)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
  • process_partition (27-45)
airbyte_cdk/sources/streams/concurrent/partitions/partition.py (1)
  • stream_name (36-41)
airbyte_cdk/sources/streams/concurrent/cursor.py (6)
  • close_partition (63-67)
  • close_partition (114-115)
  • close_partition (239-247)
  • observe (56-60)
  • observe (111-112)
  • observe (224-234)
airbyte_cdk/utils/traced_exception.py (3)
  • AirbyteTracedException (25-145)
  • from_exception (103-121)
  • as_sanitized_airbyte_message (123-145)
unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • ModelToComponentFactory (635-4188)
unit_tests/sources/streams/test_stream_read.py (5)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (1)
  • on_record (136-165)
airbyte_cdk/sources/types.py (2)
  • Record (21-72)
  • data (35-36)
airbyte_cdk/sources/streams/concurrent/cursor.py (6)
  • close_partition (63-67)
  • close_partition (114-115)
  • close_partition (239-247)
  • state (53-53)
  • state (108-109)
  • state (178-181)
airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py (1)
  • close_partition (159-184)
airbyte_cdk/sources/message/repository.py (4)
  • consume_queue (59-60)
  • consume_queue (70-71)
  • consume_queue (93-95)
  • consume_queue (118-119)
unit_tests/connector_builder/test_message_grouper.py (3)
airbyte_cdk/connector_builder/test_reader/reader.py (2)
  • TestReader (39-463)
  • run_test_read (84-161)
unit_tests/connector_builder/utils.py (1)
  • create_configured_catalog (26-27)
airbyte_cdk/connector_builder/models.py (1)
  • StreamRead (57-64)
unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (2)
airbyte_cdk/sources/types.py (1)
  • StreamSlice (75-169)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (1)
  • DeclarativePartitionFactory (27-52)
unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (3)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • close_partition (63-67)
  • close_partition (114-115)
  • close_partition (239-247)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (4)
  • start_next_partition_generator (189-208)
  • on_partition (89-102)
  • on_partition_generation_completed (68-87)
  • on_partition_complete_sentinel (104-134)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
  • PartitionCompleteSentinel (14-30)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (4)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)
  • PartitionReader (14-45)
airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)
  • ConcurrentReadProcessor (30-255)
  • is_done (210-235)
airbyte_cdk/sources/streams/concurrent/partition_enqueuer.py (1)
  • PartitionEnqueuer (16-64)
airbyte_cdk/sources/concurrent_source/thread_pool_manager.py (1)
  • is_done (81-82)
airbyte_cdk/connector_builder/main.py (1)
airbyte_cdk/connector_builder/connector_builder_handler.py (1)
  • create_source (78-93)
unit_tests/sources/streams/concurrent/test_partition_reader.py (3)
airbyte_cdk/sources/streams/concurrent/partition_reader.py (2)
  • PartitionReader (14-45)
  • process_partition (27-45)
airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)
  • PartitionCompleteSentinel (14-30)
airbyte_cdk/sources/concurrent_source/concurrent_source.py (1)
  • read (94-124)
airbyte_cdk/sources/declarative/concurrent_declarative_source.py (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • ModelToComponentFactory (635-4188)
airbyte_cdk/sources/connector_state_manager.py (1)
  • ConnectorStateManager (32-161)
airbyte_cdk/connector_builder/test_reader/helpers.py (2)
airbyte_cdk/sources/streams/concurrent/cursor.py (3)
  • state (53-53)
  • state (108-109)
  • state (178-181)
unit_tests/sources/streams/test_stream_read.py (3)
  • state (88-89)
  • state (92-94)
  • state (125-126)
unit_tests/connector_builder/test_connector_builder_handler.py (3)
airbyte_cdk/connector_builder/connector_builder_handler.py (3)
  • TestLimits (39-45)
  • resolve_manifest (135-149)
  • create_source (78-93)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (2)
  • ManifestDeclarativeSource (98-616)
  • resolved_manifest (159-169)
airbyte_cdk/connector_builder/main.py (1)
  • handle_connector_builder_request (70-88)
airbyte_cdk/connector_builder/connector_builder_handler.py (2)
airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py (1)
  • ModelToComponentFactory (635-4188)
airbyte_cdk/sources/declarative/manifest_declarative_source.py (1)
  • ManifestDeclarativeSource (98-616)
airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py (2)
airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py (1)
  • StreamSlicer (27-39)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer.py (1)
  • StreamSlicer (15-25)
🪛 Gitleaks (8.27.2)
airbyte_cdk/sources/streams/http/http_client.py

156-156: Detected a Generic API Key, potentially exposing access to various services and sensitive operations.

(generic-api-key)

⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (9)
  • GitHub Check: Check: source-pokeapi
  • GitHub Check: Check: source-intercom
  • GitHub Check: Check: destination-motherduck
  • GitHub Check: Check: source-shopify
  • GitHub Check: Check: source-hardcoded-records
  • GitHub Check: SDM Docker Image Build
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
🔇 Additional comments (17)
airbyte_cdk/sources/declarative/stream_slicers/stream_slicer_test_read_decorator.py (1)

23-24: LGTM: clean, predictable capping of slices via islice.

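Returning an iterator here fits the Iterable[StreamSlice] contract and yields zero slices when maximum_number_of_slices is 0. A negative value would make islice raise ValueError rather than yield nothing, so no changes requested as long as the limit can never be negative.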
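
Here is a quick stdlib check of the islice edge cases mentioned above. The slice dicts are made up.

```python
from itertools import islice

slices = [{"start": "2024-01-01"}, {"start": "2024-02-01"}, {"start": "2024-03-01"}]

capped = list(islice(slices, 2))  # first two slices only
empty = list(islice(slices, 0))   # a limit of 0 yields nothing

try:
    list(islice(slices, -1))      # negative stop values are rejected
    negative_raises = False
except ValueError:
    negative_raises = True
```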

airbyte_cdk/sources/streams/http/http_client.py (1)

155-157: All CachedLimiterSession call sites updated — no cache_name references remain

I performed a global search across the codebase:

  • No instances of the old cache_name= keyword argument (0 matches).
  • Only one instantiation of CachedLimiterSession found—in http_client.py—now correctly using sqlite_path.
  • No other positional uses assuming the old parameter order.

Looks like everything’s been updated. wdyt?

unit_tests/sources/streams/test_stream_read.py (1)

592-595: Expectation consolidation looks good.

Replacing per-partition expected states with a combined state matches the new cursor behavior and simplifies the assertion logic. LGTM.

unit_tests/sources/declarative/stream_slicers/test_declarative_partition_generator.py (1)

16-16: Import tightening looks good.

Switching to import StreamSlice only is clear and keeps the test focused.

airbyte_cdk/sources/streams/concurrent/partitions/types.py (1)

36-38: Confirmed: QueueItem no longer depends on AirbyteMessage

I’ve verified that within airbyte_cdk/sources/streams/concurrent/partitions

  • there are no calls enqueuing AirbyteMessage,
  • no isinstance(..., AirbyteMessage) checks, and
  • no lingering QueueItem references to AirbyteMessage.

Optional tweak—would you consider using BaseException instead of Exception to preserve cancellation/error propagation semantics? wdyt?

- QueueItem = Union[
-     Record,
-     Partition,
-     PartitionCompleteSentinel,
-     PartitionGenerationCompletedSentinel,
-     Exception,
- ]
+ QueueItem = Union[
+     Record,
+     Partition,
+     PartitionCompleteSentinel,
+     PartitionGenerationCompletedSentinel,
+     BaseException,
+ ]
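
KeyboardInterrupt and SystemExit derive from BaseException but not from Exception. The following sketch shows that widening the QueueItem alias alone would not route them through the queue, because the producer's except clause decides what gets enqueued. run_worker is hypothetical and uses a list in place of the queue.

```python
from typing import Callable, List


def run_worker(task: Callable[[], None], forwarded: List[BaseException]) -> None:
    """Hypothetical producer: failures are forwarded to the consumer (here, a list)."""
    try:
        task()
    except Exception as exception:  # the common producer-side catch
        forwarded.append(exception)


def fail_with_value_error() -> None:
    raise ValueError("bad cursor value")


def fail_with_interrupt() -> None:
    raise KeyboardInterrupt


forwarded: List[BaseException] = []
run_worker(fail_with_value_error, forwarded)

try:
    run_worker(fail_with_interrupt, forwarded)
    interrupt_escaped = False
except KeyboardInterrupt:
    # Not caught by `except Exception`, so it never reaches the consumer
    interrupt_escaped = True
```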
airbyte_cdk/connector_builder/test_reader/message_grouper.py (1)

98-99: Consider draining trailing state/aux messages after hitting the record limit?

I ran the ripgrep search and didn’t find any existing tests covering the scenario where state or auxiliary messages are emitted after the Nth record is reached. Without draining those messages, latest_state_message and slice_auxiliary_requests may not reflect the final state for a slice when a record limit is hit.

• File airbyte_cdk/connector_builder/test_reader/message_grouper.py, lines 98–99
• No unit tests in the unit_tests/connector_builder suite assert behavior around state/aux messages post-record limit

Would you prefer to explicitly drain non-record messages until the next slice boundary, or is it acceptable to stop immediately once records_count < limit is false? wdyt?

If we decide to make the zero‐limit case explicit, we could also add:

 def get_message_groups(..., limit: int, ...):
+    if limit <= 0:
+        return
     records_count = 0
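
Here is a simplified sketch of the stop-at-limit behavior. It shows that stopping right after the Nth record leaves a following STATE message unread, which is the draining question above. limited_records and the dict-shaped messages are hypothetical and do not reflect the real get_message_groups signature.

```python
from typing import Any, Dict, Iterable, Iterator


def limited_records(messages: Iterable[Dict[str, Any]], limit: int) -> Iterator[Dict[str, Any]]:
    """Yield messages until `limit` records have been seen."""
    if limit <= 0:
        # Explicit zero-limit case: nothing is read at all
        return
    records_count = 0
    for message in messages:
        if message["type"] == "RECORD":
            records_count += 1
        yield message
        if records_count >= limit:
            # Stop immediately: trailing STATE messages are NOT drained
            return


sample_messages = [
    {"type": "RECORD", "id": 1},
    {"type": "RECORD", "id": 2},
    {"type": "STATE", "id": "s1"},
    {"type": "RECORD", "id": 3},
    {"type": "STATE", "id": "s2"},
]
```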
airbyte_cdk/connector_builder/main.py (1)

94-99: No stale create_source call sites detected—LGTM!

I ran a repo-wide rg -nP -C2 'create_source\s*\(' airbyte_cdk and only saw the new invocation in main.py and the definition in connector_builder_handler.py. Everything looks up-to-date, so we’re good to merge!

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py (1)

14-15: Switch to ModelToComponentFactory import looks good

This aligns tests with the factory-based construction path. No concerns.

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py (2)

102-103: Updated invocation matches new PartitionReader API

Submitting process_partition with only the partition argument is correct and matches the updated signature.


115-135: Robust partition completion handling with error capture

Catching close_partition exceptions, flagging per-stream, and emitting a sanitized trace is a solid improvement. The finally block ensuring partition cleanup and potential stream completion is also correct.

unit_tests/sources/streams/concurrent/test_partition_reader.py (1)

29-33: API usage updated correctly

Constructing PartitionReader with the queue and calling process_partition(partition) matches the production signature.

unit_tests/sources/streams/concurrent/test_concurrent_read_processor.py (2)

182-183: ThreadPool submission assertions updated correctly

Both call sites now expect submit(self._partition_reader.process_partition, partition). Looks good.

Also applies to: 207-208


256-257: Validating cursor.close_partition on completion is spot-on

These checks ensure partition cleanup occurs only on successful completion.

Also applies to: 305-306

airbyte_cdk/sources/streams/concurrent/partition_reader.py (1)

21-26: Constructor simplification looks good

Dropping the logger/cursor dependencies and accepting only the output queue makes this component easier to reason about and test. Call sites appear aligned in ConcurrentSource and ConcurrentReadProcessor. Nice cleanup, wdyt?

airbyte_cdk/sources/concurrent_source/concurrent_source.py (1)

148-165: Removal of AirbyteMessage as a queue item is consistent

Given QueueItem no longer includes AirbyteMessage, dropping that branch here looks correct and reduces ambiguity in the pipeline. LGTM, wdyt?

unit_tests/connector_builder/test_connector_builder_handler.py (1)

1028-1031: Updated create_source contract and instance type check look correct

Asserting ManifestDeclarativeSource here matches the new factory-based flow, and the limits are plumbed into the constructor as expected. LGTM, wdyt?

airbyte_cdk/connector_builder/connector_builder_handler.py (1)

78-93: Factory-based ManifestDeclarativeSource construction looks right

Passing limits into ModelToComponentFactory and disabling retries/cache for test-read is a nice, test-friendly setup. This also keeps concurrency concerns out of the builder path. LGTM, wdyt?

Comment on lines +1255 to 1260
     test_manifest = MANIFEST
     test_manifest["streams"][0]["$parameters"]["url_base"] = url_base
     config = {"__injected_declarative_manifest": test_manifest}
 
-    source = create_source(config=config, limits=limits, catalog=catalog, state=None)
+    source = create_source(config, limits)


🛠️ Refactor suggestion

⚠️ Potential issue

Bug: mutating the global MANIFEST across tests (deep copy needed)

test_manifest = MANIFEST followed by mutation alters the module-level constant and can cause order-dependent test failures. Shall we deep copy instead, wdyt?

-    test_manifest = MANIFEST
+    test_manifest = copy.deepcopy(MANIFEST)
     test_manifest["streams"][0]["$parameters"]["url_base"] = url_base
     config = {"__injected_declarative_manifest": test_manifest}

Note: ensure import copy is available at the top of the test module.
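
Here is a minimal sketch of the shared-state problem. The MANIFEST below is a toy dict, not the test module's real constant. Note that a shallow MANIFEST.copy() would not be enough, because the nested "streams" list would still be shared.

```python
import copy

MANIFEST = {"streams": [{"$parameters": {"url_base": "https://api.example.com"}}]}


def build_config_aliased(url_base: str) -> dict:
    test_manifest = MANIFEST  # alias: the mutation leaks into the module constant
    test_manifest["streams"][0]["$parameters"]["url_base"] = url_base
    return {"__injected_declarative_manifest": test_manifest}


def build_config_copied(url_base: str) -> dict:
    test_manifest = copy.deepcopy(MANIFEST)  # independent nested copy
    test_manifest["streams"][0]["$parameters"]["url_base"] = url_base
    return {"__injected_declarative_manifest": test_manifest}


build_config_copied("https://mock.test")
after_copy = MANIFEST["streams"][0]["$parameters"]["url_base"]

build_config_aliased("https://mock.test")
after_alias = MANIFEST["streams"][0]["$parameters"]["url_base"]
```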

📝 Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation. Thoroughly test & benchmark the code to ensure it meets the requirements.

Suggested change
-    test_manifest = MANIFEST
+    test_manifest = copy.deepcopy(MANIFEST)
     test_manifest["streams"][0]["$parameters"]["url_base"] = url_base
     config = {"__injected_declarative_manifest": test_manifest}
-    source = create_source(config=config, limits=limits, catalog=catalog, state=None)
     source = create_source(config, limits)
🤖 Prompt for AI Agents
In unit_tests/connector_builder/test_connector_builder_handler.py around lines
1255 to 1260, the test assigns test_manifest = MANIFEST and then mutates it,
which alters the module-level constant; replace that assignment with a deep copy
(e.g., test_manifest = copy.deepcopy(MANIFEST)) so changes do not affect other
tests, and ensure import copy is present at the top of the test module.

Comment on lines +1349 to 1356
     test_manifest = MANIFEST
     test_manifest["definitions"]["retriever"]["requester"]["authenticator"] = (
         oauth_authenticator_config
     )
     config = {"__injected_declarative_manifest": test_manifest}
 
-    source = create_source(config=config, limits=limits, catalog=catalog, state=None)
+    source = create_source(config, limits)


🛠️ Refactor suggestion

⚠️ Potential issue

Same deep-copy issue when overriding authenticator config

Here too, the test mutates MANIFEST in place. Can we deep copy to avoid shared-state flakiness, wdyt?

-    test_manifest = MANIFEST
+    test_manifest = copy.deepcopy(MANIFEST)
     test_manifest["definitions"]["retriever"]["requester"]["authenticator"] = (
         oauth_authenticator_config
     )
📝 Committable suggestion

Suggested change
-    test_manifest = MANIFEST
+    test_manifest = copy.deepcopy(MANIFEST)
     test_manifest["definitions"]["retriever"]["requester"]["authenticator"] = (
         oauth_authenticator_config
     )
     config = {"__injected_declarative_manifest": test_manifest}
-    source = create_source(config=config, limits=limits, catalog=catalog, state=None)
     source = create_source(config, limits)
🤖 Prompt for AI Agents
In unit_tests/connector_builder/test_connector_builder_handler.py around lines
1349 to 1356, the test mutates the global MANIFEST in place causing shared-state
flakiness; make a defensive deep copy of MANIFEST (e.g., via
copy.deepcopy(MANIFEST) or json-based deepcopy) into test_manifest before
overriding
test_manifest["definitions"]["retriever"]["requester"]["authenticator"], then
use that copied config so the global MANIFEST remains unchanged.

SyncMode,
Type,
)
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, Level, SyncMode, Type
Contributor

💡 Verification agent

🧩 Analysis chain

Import cleanup looks good and aligns with removing AirbyteRecordMessage from public imports.

The narrowed import list is accurate for this file (no direct AirbyteRecordMessage usage) and keeps the surface minimal. Nice touch. Would you mind double-checking that no other tests (or src files) still rely on from airbyte_cdk.models import AirbyteRecordMessage so we don’t hit import errors elsewhere, wdyt?


🏁 Script executed:

#!/usr/bin/env bash
set -euo pipefail

echo "Searching for deprecated public imports of AirbyteRecordMessage..."
# 1) Direct public import usage
rg -nP "from\s+airbyte_cdk\.models\s+import\s+.*\bAirbyteRecordMessage\b" -S

echo
echo "Scanning for star-import sites that might implicitly rely on AirbyteRecordMessage..."
# 2) Star imports from models and check if the same file references AirbyteRecordMessage
files_with_star_imports=$(rg -nPl "from\s+airbyte_cdk\.models\s+import\s+\*" -S || true)
if [[ -n "${files_with_star_imports}" ]]; then
  while IFS= read -r f; do
    echo "Checking $f ..."
    rg -n "AirbyteRecordMessage\b" "$f" || true
  done <<< "${files_with_star_imports}"
else
  echo "No star-import sites found."
fi

Length of output: 1500


Please remove all remaining AirbyteRecordMessage imports to avoid import errors

It looks like there are still several places in both tests and source files importing the now-removed AirbyteRecordMessage. Could you update or remove these so that we don’t hit breakage after the public-import cleanup? No star-import sites were found, so focusing on direct imports should cover everything. wdyt?

• unit_tests/utils/test_schema_inferrer.py (line 9)
• unit_tests/utils/test_datetime_format_inferrer.py (line 9)
• unit_tests/sources/declarative/partition_routers/test_substream_partition_router.py (line 11)
• unit_tests/destinations/vector_db_based/embedder_test.py (line 28)
• airbyte_cdk/utils/schema_inferrer.py (line 12)
• airbyte_cdk/utils/datetime_format_inferrer.py (line 7)
• airbyte_cdk/destinations/vector_db_based/utils.py (line 9)
• airbyte_cdk/destinations/vector_db_based/embedder.py (line 25)
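The rg pattern in the script above only matches imports written on a single line, so a parenthesized, multi-line import list would slip through. As a minimal sketch (not part of the review; find_imports and scan are hypothetical helper names), Python's standard ast module can re-check these sites after editing, because it sees the import statement regardless of line breaks. It does not catch attribute access such as `import airbyte_cdk.models as m` followed by `m.AirbyteRecordMessage`, and it silently skips files that fail to parse.

```python
import ast
from pathlib import Path

TARGET_MODULE = "airbyte_cdk.models"
TARGET_NAME = "AirbyteRecordMessage"


def find_imports(source: str, module: str = TARGET_MODULE, name: str = TARGET_NAME) -> list:
    """Return the starting line numbers of `from <module> import ... <name> ...`.

    Parsing into an AST handles parenthesized, multi-line import lists and
    `as` aliases; relative imports (module None or different) are ignored.
    """
    tree = ast.parse(source)
    hits = []
    for node in ast.walk(tree):
        if isinstance(node, ast.ImportFrom) and node.module == module:
            if any(alias.name == name for alias in node.names):
                hits.append(node.lineno)
    # ast.walk is breadth-first, so sort to report lines in file order.
    return sorted(hits)


def scan(root: str) -> dict:
    """Map each .py file under root to the lines where it imports the name."""
    results = {}
    for path in Path(root).rglob("*.py"):
        try:
            lines = find_imports(path.read_text(encoding="utf-8"))
        except (SyntaxError, UnicodeDecodeError):
            continue  # skip files that cannot be read or parsed
        if lines:
            results[str(path)] = lines
    return results


if __name__ == "__main__":
    import sys

    root = sys.argv[1] if len(sys.argv) > 1 else "."
    for file, lines in sorted(scan(root).items()):
        print(f"{file}: {lines}")
```

Run from the repository root (for example `python find_record_imports.py .`, where the script name is also hypothetical), it should list every file and line that still needs updating, including multi-line imports.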

🤖 Prompt for AI Agents
In unit_tests/sources/declarative/retrievers/test_simple_retriever.py around line 14, remove AirbyteRecordMessage from the import list and any direct references to AirbyteRecordMessage; replace constructions/usages with AirbyteMessage objects that have type=Type.RECORD (e.g., construct AirbyteMessage(..., type=Type.RECORD, record=...)) or adjust assertions to use AirbyteMessage fields accordingly so no direct AirbyteRecordMessage import remains.

Contributor

@pnilan pnilan left a comment

approving if needed

@maxi297
Contributor Author

maxi297 commented Aug 25, 2025

Closing because we will merge #723

@maxi297 maxi297 closed this Aug 25, 2025

Labels

bug Something isn't working security

3 participants