Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
111 changes: 87 additions & 24 deletions airbyte/cloud/workspaces.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from typing import TYPE_CHECKING, Any, Literal

import yaml
from airbyte_api.errors import SDKError

from airbyte import exceptions as exc
from airbyte._util import api_util, text_util
Expand All @@ -55,6 +56,26 @@
from airbyte.secrets.base import SecretString


# Error patterns that indicate corrupted/malformed resources in the workspace
# These should be handled gracefully rather than failing the entire list operation
_CORRUPTED_RESOURCE_ERROR_PATTERNS = [
"Secret reference",
"does not exist but is referenced in the config",
]


def _is_corrupted_resource_error(error: SDKError) -> bool:
"""Check if an SDKError indicates a corrupted resource in the workspace.

Some resources in a workspace may have corrupted configurations (e.g., references
to deleted secrets). When listing resources, these corrupted items can cause the
entire API call to fail. This function identifies such errors so they can be
handled gracefully.
"""
error_body = str(error.body) if error.body else ""
return all(pattern in error_body for pattern in _CORRUPTED_RESOURCE_ERROR_PATTERNS)


if TYPE_CHECKING:
from collections.abc import Callable

Expand Down Expand Up @@ -454,16 +475,30 @@ def list_connections(
) -> list[CloudConnection]:
"""List connections by name in the workspace.

If the workspace contains corrupted resources (e.g., references to deleted secrets),
this method will log a warning and return an empty list rather than raising an error.

TODO: Add pagination support
"""
connections = api_util.list_connections(
api_root=self.api_root,
workspace_id=self.workspace_id,
name=name,
name_filter=name_filter,
client_id=self.client_id,
client_secret=self.client_secret,
)
try:
connections = api_util.list_connections(
api_root=self.api_root,
workspace_id=self.workspace_id,
name=name,
name_filter=name_filter,
client_id=self.client_id,
client_secret=self.client_secret,
)
except SDKError as ex:
Comment on lines +483 to +492

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wrong. Needs to be handled on the inner loop so all valid objects are still returned. It is not sufficient to explain why we are failing. We need to not fail. Try again from scratch.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right - returning an empty list isn't acceptable. I investigated the API layer and found the constraint:

The SDKError is thrown by airbyte_api.destinations.list_destinations before we receive any item data. The API fails the entire request when it tries to serialize a destination with a corrupted secret reference.

Options I see:

  1. Per-page handling with binary search fallback: Move error handling into api_util.list_destinations at the pagination level. When a page fails with the secret error, use binary search on the offset range to isolate the corrupted item(s) and return all valid items from that page. This gives true "inner loop" semantics.

  2. Alternative endpoint fallback: If there's a Config API endpoint that can list destination IDs without resolving secret-backed configs, we could use that as a fallback to enumerate IDs, then call get_destination per-item with inner-loop error handling.

  3. Backend fix: Address this in the backend so the list API doesn't fail entirely when one item is corrupted.

Which approach would you prefer? Option 1 is the most self-contained client-side fix but adds complexity. Happy to implement whichever you think is best.

if _is_corrupted_resource_error(ex):
print(
f"Warning: Failed to list connections in workspace {self.workspace_id} "
"due to a corrupted resource. Returning empty list. "
"Please clean up corrupted resources in the workspace."
)
return []
raise

return [
CloudConnection._from_connection_response( # noqa: SLF001 (non-public API)
workspace=self,
Expand All @@ -481,16 +516,30 @@ def list_sources(
) -> list[CloudSource]:
"""List all sources in the workspace.

If the workspace contains corrupted resources (e.g., references to deleted secrets),
this method will log a warning and return an empty list rather than raising an error.

TODO: Add pagination support
"""
sources = api_util.list_sources(
api_root=self.api_root,
workspace_id=self.workspace_id,
name=name,
name_filter=name_filter,
client_id=self.client_id,
client_secret=self.client_secret,
)
try:
sources = api_util.list_sources(
api_root=self.api_root,
workspace_id=self.workspace_id,
name=name,
name_filter=name_filter,
client_id=self.client_id,
client_secret=self.client_secret,
)
except SDKError as ex:
if _is_corrupted_resource_error(ex):
print(
f"Warning: Failed to list sources in workspace {self.workspace_id} "
"due to a corrupted resource. Returning empty list. "
"Please clean up corrupted resources in the workspace."
)
return []
raise

return [
CloudSource._from_source_response( # noqa: SLF001 (non-public API)
workspace=self,
Expand All @@ -508,16 +557,30 @@ def list_destinations(
) -> list[CloudDestination]:
"""List all destinations in the workspace.

If the workspace contains corrupted resources (e.g., references to deleted secrets),
this method will log a warning and return an empty list rather than raising an error.

TODO: Add pagination support
"""
destinations = api_util.list_destinations(
api_root=self.api_root,
workspace_id=self.workspace_id,
name=name,
name_filter=name_filter,
client_id=self.client_id,
client_secret=self.client_secret,
)
try:
destinations = api_util.list_destinations(
api_root=self.api_root,
workspace_id=self.workspace_id,
name=name,
name_filter=name_filter,
client_id=self.client_id,
client_secret=self.client_secret,
)
except SDKError as ex:
if _is_corrupted_resource_error(ex):
print(
f"Warning: Failed to list destinations in workspace {self.workspace_id} "
"due to a corrupted resource. Returning empty list. "
"Please clean up corrupted resources in the workspace."
)
return []
raise

return [
CloudDestination._from_destination_response( # noqa: SLF001 (non-public API)
workspace=self,
Expand Down
154 changes: 154 additions & 0 deletions tests/unit_tests/test_cloud_workspace_error_handling.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
"""Unit tests for CloudWorkspace error handling in list operations."""

from __future__ import annotations

from unittest.mock import MagicMock, patch

import pytest

from airbyte_api.errors import SDKError

from airbyte.cloud.workspaces import _is_corrupted_resource_error


@pytest.mark.parametrize(
"error_body,expected",
[
pytest.param(
'{"message":"Secret reference 0c646a6d-8cfa-4c97-9615-86d8ad3bbaf8 '
'does not exist but is referenced in the config"}',
True,
id="secret_reference_error",
),
pytest.param(
'{"message":"Internal server error"}',
False,
id="unrelated_500_error",
),
pytest.param(
'{"message":"Secret reference abc123"}',
False,
id="partial_match_secret_only",
),
pytest.param(
'{"message":"does not exist but is referenced in the config"}',
False,
id="partial_match_config_only",
),
pytest.param(
"",
False,
id="empty_body",
),
pytest.param(
None,
False,
id="none_body",
),
],
)
def test_is_corrupted_resource_error(error_body: str | None, expected: bool) -> None:
"""Test that _is_corrupted_resource_error correctly identifies corrupted resource errors."""
error = MagicMock(spec=SDKError)
error.body = error_body
assert _is_corrupted_resource_error(error) is expected


@pytest.mark.parametrize(
"list_method,api_mock_path",
[
pytest.param(
"list_destinations",
"airbyte.cloud.workspaces.api_util.list_destinations",
id="list_destinations",
),
pytest.param(
"list_sources",
"airbyte.cloud.workspaces.api_util.list_sources",
id="list_sources",
),
pytest.param(
"list_connections",
"airbyte.cloud.workspaces.api_util.list_connections",
id="list_connections",
),
],
)
def test_list_operations_return_empty_on_corrupted_resource(
list_method: str, api_mock_path: str
) -> None:
"""List operations should return empty list when corrupted resource error occurs."""
corrupted_error = SDKError(
message="API error occurred",
status_code=500,
body=(
'{"message":"Secret reference 0c646a6d-8cfa-4c97-9615-86d8ad3bbaf8 '
'does not exist but is referenced in the config"}'
),
raw_response=MagicMock(),
)

with patch(api_mock_path) as mock_list:
mock_list.side_effect = corrupted_error

from airbyte.cloud.workspaces import CloudWorkspace

with patch.object(CloudWorkspace, "__post_init__"):
workspace = CloudWorkspace(
workspace_id="test-workspace-id",
client_id="test-client-id",
client_secret="test-client-secret",
)

method = getattr(workspace, list_method)
result = method()
assert result == []


@pytest.mark.parametrize(
"list_method,api_mock_path",
[
pytest.param(
"list_destinations",
"airbyte.cloud.workspaces.api_util.list_destinations",
id="list_destinations",
),
pytest.param(
"list_sources",
"airbyte.cloud.workspaces.api_util.list_sources",
id="list_sources",
),
pytest.param(
"list_connections",
"airbyte.cloud.workspaces.api_util.list_connections",
id="list_connections",
),
],
)
def test_list_operations_raise_on_other_errors(
list_method: str, api_mock_path: str
) -> None:
"""List operations should raise for non-corrupted-resource errors."""
other_error = SDKError(
message="API error occurred",
status_code=500,
body='{"message":"Internal server error"}',
raw_response=MagicMock(),
)

with patch(api_mock_path) as mock_list:
mock_list.side_effect = other_error

from airbyte.cloud.workspaces import CloudWorkspace

with patch.object(CloudWorkspace, "__post_init__"):
workspace = CloudWorkspace(
workspace_id="test-workspace-id",
client_id="test-client-id",
client_secret="test-client-secret",
)

method = getattr(workspace, list_method)
with pytest.raises(SDKError):
method()