
feat(cloud): Add CloudConnection methods: get_state_artifacts() and get_catalog_artifact() #906

Merged
Aaron ("AJ") Steers (aaronsteers) merged 6 commits into main from
devin/1765418767-cloud-connection-artifacts
Dec 11, 2025

Conversation

Aaron ("AJ") Steers (aaronsteers) (Member) commented Dec 11, 2025

Summary

Adds two new methods to CloudConnection for retrieving connection artifacts as JSON strings:

  • get_state_artifact_json() - Returns the persisted connection state via Config API /state/get
  • get_catalog_artifact_json() - Returns the configured catalog (syncCatalog) via Config API /web_backend/connections/get

These methods enable live testing workflows to fetch connection artifacts without requiring direct backend database access.

Updates since last revision

  • Local testing confirmed auth works - Both Config API endpoints successfully returned data with standard OAuth credentials
  • Test results:
    • State endpoint: Returned stateType, connectionId fields
    • Catalog endpoint: Returned full connection info with syncCatalog containing 39 streams

Review & Testing Checklist for Human

  • Test with a connection that has populated state - Local testing used a connection with stateType: not_set; verify behavior with actual incremental state
  • Verify syncCatalog structure matches expected format - Confirm the returned catalog is compatible with connector live test requirements
  • Consider adding integration tests - No unit/integration tests are included in this PR

Recommended test plan:

import airbyte as ab

workspace = ab.CloudWorkspace(
    workspace_id="your-workspace-id",
    client_id="your-client-id",
    client_secret="your-client-secret",
)
connection = workspace.get_connection("your-connection-id")

# Test both methods
print(connection.get_state_artifact_json())
print(connection.get_catalog_artifact_json())

Notes

Summary by CodeRabbit

  • New Features
    • Fetch a connection's persisted state via API.
    • Fetch a connection's configured catalog via API.
    • Cloud connection objects can now retrieve these artifacts for inspection or export as JSON.


…n to CloudConnection

Add two new methods to CloudConnection for retrieving connection artifacts:
- get_state_artifact_json(): Returns the persisted state as JSON string
- get_catalog_artifact_json(): Returns the configured catalog (syncCatalog) as JSON string

These methods use the Config API endpoints:
- /state/get for state retrieval
- /web_backend/connections/get for catalog retrieval

This enables live testing workflows to fetch connection artifacts without
requiring direct backend database access.

Co-Authored-By: AJ Steers <aj@airbyte.io>
devin-ai-integration bot (Contributor)

Original prompt from AJ Steers
Received message in Slack channel #ask-devin-ai:

@Devin - Let's get live test and regression tests working in the github workflow dispatch workflows.
Thread URL: https://airbytehq-team.slack.com/archives/C08BHPUMEPJ/p1765414077066439

devin-ai-integration bot (Contributor)

🤖 Devin AI Engineer

I'll be helping with this pull request! Here's what you should know:

✅ I will automatically:

  • Address comments on this PR. Add '(aside)' to your comment to have me ignore it.
  • Look at CI failures and help fix them

Note: I can only respond to comments from users who have write access to this repository.

⚙️ Control Options:

  • Disable automatic comment and CI monitoring

github-actions bot

👋 Greetings, Airbyte Team Member!

Here are some helpful tips and reminders for your convenience.

Testing This PyAirbyte Version

You can test this version of PyAirbyte using the following:

# Run PyAirbyte CLI from this branch:
uvx --from 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765418767-cloud-connection-artifacts' pyairbyte --help

# Install PyAirbyte from this branch for development:
pip install 'git+https://github.com/airbytehq/PyAirbyte.git@devin/1765418767-cloud-connection-artifacts'

Helpful Resources

PR Slash Commands

Airbyte Maintainers can execute the following slash commands on your PR:

  • /fix-pr - Fixes most formatting and linting issues
  • /poetry-lock - Updates poetry.lock file
  • /test-pr - Runs tests with the updated PyAirbyte

Community Support

Questions? Join the #pyairbyte channel in our Slack workspace.


Aaron ("AJ") Steers (aaronsteers) changed the title from "feat(cloud): Add get_state_artifact_json and get_catalog_artifact_json to CloudConnection" to "feat(cloud): Add CloudConnection methods: get_state_artifact_json and get_catalog_artifact_json" on Dec 11, 2025
Aaron ("AJ") Steers (aaronsteers) changed the title from "feat(cloud): Add CloudConnection methods: get_state_artifact_json and get_catalog_artifact_json" to "feat(cloud): Add CloudConnection methods: get_state_artifact_json() and get_catalog_artifact_json()" on Dec 11, 2025

coderabbitai bot commented Dec 11, 2025

Warning

Rate limit exceeded

devin-ai-integration[bot] has exceeded the limit for the number of commits or files that can be reviewed per hour. Please wait 17 minutes and 18 seconds before requesting another review.

⌛ How to resolve this issue?

After the wait time has elapsed, a review can be triggered using the @coderabbitai review command as a PR comment. Alternatively, push new commits to this PR.

We recommend that you space out your commits to avoid hitting the rate limit.

🚦 How do rate limits work?

CodeRabbit enforces hourly rate limits for each developer per organization.

Our paid plans have higher rate limits than the trial, open-source and free plans. In all cases, we re-allow further reviews after a brief timeout.

Please see our FAQ for further information.

📥 Commits

Reviewing files that changed from the base of the PR and between 26c6e58 and d2519b6.

📒 Files selected for processing (1)
  • airbyte/mcp/cloud_ops.py (2 hunks)
📝 Walkthrough

Walkthrough

Adds two Config API helpers in airbyte/_util/api_util.py to POST for a connection's state and catalog, and two wrapper methods on CloudConnection in airbyte/cloud/connections.py that call those helpers and return the extracted state (list) or catalog (dict).

Changes

  • API utility functions (airbyte/_util/api_util.py): Added get_connection_state(connection_id: str, *, api_root: str, client_id: SecretString, client_secret: SecretString) -> dict[str, Any], which POSTs {"connectionId": connection_id} to /state/get and returns the JSON, and get_connection_catalog(...) with the same signature, which POSTs {"connectionId": connection_id, "withRefreshedCatalog": False} to /web_backend/connections/get. Both delegate to _make_config_api_request.
  • CloudConnection methods (airbyte/cloud/connections.py): Added get_state_artifacts(self) -> list[dict[str, Any]] | None and get_catalog_artifact(self) -> dict[str, Any] | None, wrapper methods that call those helpers and return the extracted state (list) or catalog (dict).
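The two helpers summarized above can be sketched as follows. This is a minimal sketch assuming only the payload shapes described in this walkthrough: the real functions delegate to PyAirbyte's internal _make_config_api_request, so an injectable `post` callable stands in for it here and is not part of the actual API.

```python
from typing import Any, Callable

def get_connection_state(
    connection_id: str,
    *,
    post: Callable[[str, dict[str, Any]], dict[str, Any]],
) -> dict[str, Any]:
    """POST {"connectionId": ...} to /state/get and return the JSON body."""
    return post("/state/get", {"connectionId": connection_id})

def get_connection_catalog(
    connection_id: str,
    *,
    post: Callable[[str, dict[str, Any]], dict[str, Any]],
) -> dict[str, Any]:
    """POST to /web_backend/connections/get without a live schema refresh."""
    return post(
        "/web_backend/connections/get",
        {"connectionId": connection_id, "withRefreshedCatalog": False},
    )
```

Passing `withRefreshedCatalog: False` matters here: it returns the catalog as configured rather than triggering a fresh schema discovery against the source.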

Estimated code review effort

🎯 2 (Simple) | ⏱️ ~10 minutes

  • Review focus:
    • airbyte/_util/api_util.py: verify endpoint paths, request payloads, delegation to _make_config_api_request, and return typing.
    • airbyte/cloud/connections.py: ensure correct extraction/None-handling of state / streamState and syncCatalog, and typing import changes.
  • Question: should the CloudConnection wrappers validate or normalize the returned catalog/state shape before returning to callers, or is returning the raw dict/list acceptable? wdyt?

Pre-merge checks and finishing touches

❌ Failed checks (1 warning)
  • Title check (⚠️ Warning): The PR title references methods get_state_artifacts() and get_catalog_artifact(), but the PR objectives describe the methods as get_state_artifact_json() and get_catalog_artifact_json() with JSON string returns. Resolution: verify the actual method names in the implementation and update the PR title to match. The objectives mention JSON string returns, but the title and summary suggest dict returns—wdyt?
✅ Passed checks (2 passed)
  • Description Check (✅ Passed): Check skipped - CodeRabbit's high-level summary is enabled.
  • Docstring Coverage (✅ Passed): Docstring coverage is 100.00%, which is sufficient; the required threshold is 80.00%.


coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
airbyte/cloud/connections.py (2)

286-301: Clean implementation, but consider error context enrichment?

The method correctly delegates to the API utility function and formats the response as JSON. The pretty-printing with indent=2 is a nice touch for readability.

One small consideration: if the API call fails, the error won't include the connection_id in its context. Would it be worth wrapping this in a try/except to add connection-specific context, wdyt? For example:

 def get_state_artifact_json(self) -> str:
     """Get the connection state as a JSON string.

     Returns the persisted state for this connection, which can be used
     for incremental syncs or live testing.

     Returns:
         JSON string containing the connection state.
     """
-    state_response = api_util.get_connection_state(
-        connection_id=self.connection_id,
-        api_root=self.workspace.api_root,
-        client_id=self.workspace.client_id,
-        client_secret=self.workspace.client_secret,
-    )
-    return json.dumps(state_response, indent=2)
+    try:
+        state_response = api_util.get_connection_state(
+            connection_id=self.connection_id,
+            api_root=self.workspace.api_root,
+            client_id=self.workspace.client_id,
+            client_secret=self.workspace.client_secret,
+        )
+        return json.dumps(state_response, indent=2)
+    except AirbyteError as ex:
+        raise AirbyteError(
+            message=f"Failed to retrieve state for connection {self.connection_id}",
+            context={"connection_id": self.connection_id},
+        ) from ex

Not strictly necessary since the API already provides context, but it could help with debugging.


286-319: Consider adding tests for the new artifact methods?

The PR description mentions that unit/integration tests are not included. Since these methods interact with external APIs and parse responses, testing would help ensure:

  • Correct handling of successful responses
  • Graceful handling when syncCatalog is missing
  • Proper error propagation from the API layer

Would you like me to help generate test cases for these two methods? I can create test stubs that mock the api_util functions and verify the JSON serialization behavior.
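The test stub offered above might look roughly like this. This is a hedged sketch only: FakeConnection is a stand-in for CloudConnection so the example runs without PyAirbyte installed, and its method body mirrors the get_state_artifact_json implementation as reviewed in this comment, not the final merged API.

```python
import json
from unittest.mock import MagicMock

class FakeConnection:
    """Stand-in for CloudConnection, holding a mocked api_util layer."""

    def __init__(self, api) -> None:
        self._api = api
        self.connection_id = "conn-123"

    def get_state_artifact_json(self) -> str:
        # Mirrors the reviewed method: fetch state, serialize with indent=2.
        state = self._api.get_connection_state(connection_id=self.connection_id)
        return json.dumps(state, indent=2)

# Canned response stands in for the Config API /state/get payload.
mock_api = MagicMock()
mock_api.get_connection_state.return_value = {"stateType": "not_set"}
conn = FakeConnection(mock_api)

assert json.loads(conn.get_state_artifact_json()) == {"stateType": "not_set"}
mock_api.get_connection_state.assert_called_once_with(connection_id="conn-123")
```

A real test would patch airbyte._util.api_util with unittest.mock.patch instead of injecting the mock, but the assertions would be the same.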

📜 Review details

Configuration used: CodeRabbit UI

Review profile: CHILL

Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between a7c9ccf and e91481a.

📒 Files selected for processing (2)
  • airbyte/_util/api_util.py (1 hunks)
  • airbyte/cloud/connections.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
airbyte/_util/api_util.py (1)
airbyte/secrets/base.py (1)
  • SecretString (38-143)
airbyte/cloud/connections.py (1)
airbyte/_util/api_util.py (2)
  • get_connection_state (1780-1806)
  • get_connection_catalog (1809-1838)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (No Creds)
🔇 Additional comments (3)
airbyte/cloud/connections.py (2)

6-6: LGTM! Standard library import added correctly.

The json import is placed appropriately in the imports section.


303-319: Looks good! syncCatalog fallback is handled appropriately.

The use of .get("syncCatalog", {}) provides a sensible default when the field is missing. This aligns with the documented behavior from the Config API endpoint, which returns connection info including the syncCatalog field. The defensive approach matches the pattern used elsewhere in the codebase for optional dictionary fields.

Regarding error context enrichment—would adding logging or exception handling (e.g., if the API response is unexpectedly malformed) be helpful here, similar to other artifact retrieval patterns, wdyt?

airbyte/_util/api_util.py (1)

1780-1806: Clean implementation that follows established patterns.

The function correctly delegates to _make_config_api_request and the endpoint path and payload structure align with the PR description. The docstring is clear and complete.

One thought: Since the PR description mentions Config API endpoints previously returned 403 with standard OAuth credentials, would you want to add a quick integration test or manual verification that this specific endpoint works as expected with the current authentication setup? That way we can be confident it's compatible before merging. Wdyt?

github-actions bot

PyTest Results (Fast Tests Only, No Creds)

348 tests  ±0   348 ✅ ±0   5m 50s ⏱️ -5s
  1 suites ±0     0 💤 ±0 
  1 files   ±0     0 ❌ ±0 

Results for commit e91481a. ± Comparison against base commit a7c9ccf.

devin-ai-integration bot and others added 2 commits December 11, 2025 02:27
…eturn types

- Rename methods from get_*_artifact_json to get_*_artifact
- get_state_artifact: returns list[dict[str, Any]] | None
  - Returns streamState array directly (not the envelope)
  - Returns None when stateType is 'not_set'
- get_catalog_artifact: returns dict[str, Any]
  - Returns syncCatalog dict directly
- Remove json import (no longer needed)

Co-Authored-By: AJ Steers <aj@airbyte.io>
Pluralize method name since it returns a list of state objects.

Co-Authored-By: AJ Steers <aj@airbyte.io>
Aaron ("AJ") Steers (aaronsteers) changed the title from "feat(cloud): Add CloudConnection methods: get_state_artifact_json() and get_catalog_artifact_json()" to "feat(cloud): Add CloudConnection methods: get_state_artifacts() and get_catalog_artifact()" on Dec 11, 2025
…f {}

Return None when syncCatalog is not found in the response, instead of
returning an empty dict. This addresses bot feedback for consistency.

Co-Authored-By: AJ Steers <aj@airbyte.io>
coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
airbyte/cloud/connections.py (2)

283-305: Clarify behavior for non-stream state types and the JSON/string vs. Python-object contract

get_state_artifacts() looks clean and lines up with api_util.get_connection_state, but a couple of edge cases might be worth double‑checking:

  • Line [302]: we only treat stateType == "not_set" as meaning “no state” and return None. For other stateType values (e.g. legacy/global) the method will quietly return [] if streamState isn’t present, which might silently drop valid state. Would it be safer either to (a) branch on stateType and surface the relevant field, or (b) return the full state_response when stateType isn’t "stream", so callers can decide, wdyt?
  • Lines [294]-[305]: the docstring promises “persisted state for this connection”, but the implementation returns only streamState. If non-stream state types can occur for these Cloud connections, should we document that this is intentionally “per‑stream artifacts only”, or broaden the return shape, wdyt?
  • PR objectives describe methods named get_state_artifact_json() returning JSON strings; here we expose get_state_artifacts() returning Python objects. Is this divergence intentional so that higher‑level callers can json.dumps() as needed, or do you want the public API to stick with the original JSON-string contract and naming, wdyt?

Given this is new surface area, would a small unit/integration test around the stateType == "not_set" behavior and a “normal streamState present” case be worth adding to lock in expectations, wdyt?


306-323: Confirm semantics when syncCatalog is missing and alignment with the original JSON-centric API design

The basic flow in get_catalog_artifact() looks good and nicely reuses api_util.get_connection_catalog, but a couple of points might merit alignment/clarification:

  • Line [323]: returning {} when "syncCatalog" is absent makes the return type always dict[str, Any], but it doesn’t distinguish “no catalog configured” from “empty catalog”. Would it be clearer to return None for the “no catalog” case (or raise) and document that, or is {} the intended sentinel here, wdyt?
  • Similar to the state method, the PR objectives talk about a get_catalog_artifact_json() that returns a JSON string, whereas this method returns a Python dict. Is that an intentional API design change (prefer structured access; let callers serialize if needed), or should this be renamed/adjusted to match the JSON-string contract, wdyt?

If downstream tooling (e.g., your live testing workflows) expects a specific shape for syncCatalog, it might be worth capturing that expectation in a small test around this helper as well, wdyt?

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between e91481a and bdaaccd.

📒 Files selected for processing (1)
  • airbyte/cloud/connections.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/cloud/connections.py (1)
airbyte/_util/api_util.py (2)
  • get_connection_state (1780-1806)
  • get_connection_catalog (1809-1838)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (Fast)
🔇 Additional comments (1)
airbyte/cloud/connections.py (1)

6-6: Importing Any fits the new artifact-returning methods

Pulling in Any here to type the JSON-like artifact payloads from the Config API looks appropriate and consistent with the rest of the file; I don’t see any issues with this change.

coderabbitai bot left a comment

Actionable comments posted: 0

🧹 Nitpick comments (2)
airbyte/cloud/connections.py (2)

302-304: Consider the empty list vs None semantics.

When stateType is "not_set", the method returns None (line 303), but when streamState is missing from the response, it returns an empty list [] (line 304 default value). This could create semantic ambiguity: does [] mean "no streams" or "state exists but is empty"?

If you want strict None-only behavior for "no state," you might consider checking if streamState exists explicitly. But if empty list is intentional for missing keys, that's fine too—just worth clarifying in the docstring. Wdyt?

Example alternative if you want consistent None behavior:

     if state_response.get("stateType") == "not_set":
         return None
-    return state_response.get("streamState", [])
+    stream_state = state_response.get("streamState")
+    return stream_state if stream_state is not None else None

285-323: Optional: Consider naming consistency between the two methods.

One method is plural (get_state_artifacts()) while the other is singular (get_catalog_artifact()). This isn't wrong, but it might be clearer if they followed the same pattern—either both plural or both singular. For instance, get_state_artifacts() and get_catalog_artifacts(), or get_state_artifact() and get_catalog_artifact(). Just a thought—wdyt?

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between bdaaccd and 568483a.

📒 Files selected for processing (1)
  • airbyte/cloud/connections.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/cloud/connections.py (1)
airbyte/_util/api_util.py (2)
  • get_connection_state (1780-1806)
  • get_connection_catalog (1809-1838)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (No Creds)
🔇 Additional comments (3)
airbyte/cloud/connections.py (3)

6-6: LGTM! Import addition looks good.

The Any import is properly placed in the TYPE_CHECKING block and is necessary for the new method signatures below.


285-304: Heads up: Implementation differs from PR objectives.

The PR objectives mention a method named get_state_artifact_json() that returns a JSON string, but the implementation is get_state_artifacts() (plural, no _json suffix) returning a Python list. While the current implementation looks functionally correct, this discrepancy might confuse reviewers or users expecting the API described in the PR summary. Wdyt about updating the PR description to match the actual implementation?


306-323: Same naming discrepancy with PR objectives.

Similar to get_state_artifacts(), the PR objectives describe get_catalog_artifact_json() returning a JSON string, but the implementation is get_catalog_artifact() returning a Python dict. The implementation looks solid, but the PR description should be updated to reflect what was actually built. Wdyt?

…catalog

Adds a new MCP tool that retrieves connection artifacts (state or catalog)
from Airbyte Cloud connections.

- get_connection_artifact(connection_id, workspace_id, artifact_type)
- Returns state as list[dict] or catalog as dict
- Returns {"ERROR": "..."} if artifact not found

Co-Authored-By: AJ Steers <aj@airbyte.io>
coderabbitai bot left a comment

Actionable comments posted: 1

🧹 Nitpick comments (2)
airbyte/cloud/connections.py (2)

6-6: Import of Any looks appropriate; consider a lightweight alias for artifact shapes?

Using Any here makes sense given the flexible shapes coming back from the Config API. To make downstream usage a bit clearer, would it be worth introducing small type aliases (e.g., StateArtifact = dict[str, Any], CatalogArtifact = dict[str, Any]) and using those in the method signatures, instead of raw dict[str, Any] everywhere, wdyt?


306-324: Catalog helper looks good; confirm public contract (Python structures vs JSON) and consider minimal tests

The get_catalog_artifact implementation itself looks straightforward and consistent with how other helpers wrap api_util—it just returns connection_response.get("syncCatalog"), which aligns with the docstring (“Dictionary containing the configured catalog, or None if not found.”).

Two things you might want to double‑check:

  1. Both get_state_artifacts and get_catalog_artifact currently return Python lists/dicts, not JSON strings. Given the PR description mentions “JSON” artifacts, is the intent that callers work with native Python structures (and call json.dumps themselves if they truly need text), or should these helpers actually serialize to JSON before returning? If the native-structure contract is preferred (which seems cleaner from a Python SDK perspective), maybe we just ensure any external docs/examples are aligned, wdyt?

  2. Since these are new CloudConnection surface-area methods, would a tiny unit test that patches api_util.get_connection_state / get_connection_catalog to return canned dicts be worthwhile, just to lock in the behavior around None vs present values and avoid regressions later, wdyt?

📜 Review details


📥 Commits

Reviewing files that changed from the base of the PR and between 568483a and 26c6e58.

📒 Files selected for processing (1)
  • airbyte/cloud/connections.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
airbyte/cloud/connections.py (1)
airbyte/_util/api_util.py (2)
  • get_connection_state (1780-1806)
  • get_connection_catalog (1809-1838)
⏰ Context from checks skipped due to timeout of 90000ms. You can increase the timeout in your CodeRabbit configuration to a maximum of 15 minutes (900000ms). (6)
  • GitHub Check: Pytest (All, Python 3.10, Ubuntu)
  • GitHub Check: Pytest (All, Python 3.10, Windows)
  • GitHub Check: Pytest (All, Python 3.11, Windows)
  • GitHub Check: Pytest (No Creds)
  • GitHub Check: Pytest (All, Python 3.11, Ubuntu)
  • GitHub Check: Pytest (Fast)

Aaron ("AJ") Steers (aaronsteers) merged commit 441105e into main Dec 11, 2025
20 checks passed
Aaron ("AJ") Steers (aaronsteers) deleted the devin/1765418767-cloud-connection-artifacts branch December 11, 2025 02:44