Skip to content

Commit 47e1cc2

Browse files
committed
fix: add basic error handling to manifest-server calls
1 parent a1428bf commit 47e1cc2

File tree

2 files changed

+197
-39
lines changed

2 files changed

+197
-39
lines changed

airbyte_cdk/manifest_server/routers/manifest.py

Lines changed: 74 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,10 @@
55
import jsonschema
66
from airbyte_protocol_dataclasses.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
77
from fastapi import APIRouter, Depends, HTTPException
8+
from fastapi.responses import JSONResponse
89

910
from airbyte_cdk.models import AirbyteStateMessageSerializer
11+
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
1012
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
1113
ConcurrentDeclarativeSource,
1214
)
@@ -98,15 +100,21 @@ def test_read(request: StreamTestReadRequest) -> StreamReadResponse:
98100
)
99101

100102
runner = ManifestCommandProcessor(source)
101-
cdk_result = runner.test_read(
102-
config_dict,
103-
catalog,
104-
converted_state,
105-
request.record_limit,
106-
request.page_limit,
107-
request.slice_limit,
108-
)
109-
return StreamReadResponse.model_validate(asdict(cdk_result))
103+
try:
104+
cdk_result = runner.test_read(
105+
config_dict,
106+
catalog,
107+
converted_state,
108+
request.record_limit,
109+
request.page_limit,
110+
request.slice_limit,
111+
)
112+
return StreamReadResponse.model_validate(asdict(cdk_result))
113+
except Exception as exc:
114+
error = AirbyteTracedException.from_exception(
115+
exc, message=f"Error reading stream: {str(exc)}"
116+
)
117+
raise HTTPException(status_code=400, detail=error.message)
110118

111119

112120
@router.post("/check", operation_id="check")
@@ -119,10 +127,16 @@ def check(request: CheckRequest) -> CheckResponse:
119127
project_id=request.context.project_id,
120128
)
121129

122-
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
123-
runner = ManifestCommandProcessor(source)
124-
success, message = runner.check_connection(request.config.model_dump())
125-
return CheckResponse(success=success, message=message)
130+
try:
131+
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
132+
runner = ManifestCommandProcessor(source)
133+
success, message = runner.check_connection(request.config.model_dump())
134+
return CheckResponse(success=success, message=message)
135+
except Exception as exc:
136+
error = AirbyteTracedException.from_exception(
137+
exc, message=f"Error checking connection: {str(exc)}"
138+
)
139+
raise HTTPException(status_code=400, detail=error.message)
126140

127141

128142
@router.post("/discover", operation_id="discover")
@@ -135,12 +149,21 @@ def discover(request: DiscoverRequest) -> DiscoverResponse:
135149
project_id=request.context.project_id,
136150
)
137151

138-
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
139-
runner = ManifestCommandProcessor(source)
140-
catalog = runner.discover(request.config.model_dump())
141-
if catalog is None:
142-
raise HTTPException(status_code=422, detail="Connector did not return a discovered catalog")
143-
return DiscoverResponse(catalog=catalog)
152+
try:
153+
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
154+
runner = ManifestCommandProcessor(source)
155+
catalog = runner.discover(request.config.model_dump())
156+
if catalog is None:
157+
raise HTTPException(status_code=422, detail="Connector did not return a discovered catalog")
158+
return DiscoverResponse(catalog=catalog)
159+
except HTTPException:
160+
# Re-raise HTTPExceptions as-is (like the catalog None check above)
161+
raise
162+
except Exception as exc:
163+
error = AirbyteTracedException.from_exception(
164+
exc, message=f"Error discovering streams: {str(exc)}"
165+
)
166+
raise HTTPException(status_code=400, detail=error.message)
144167

145168

146169
@router.post("/resolve", operation_id="resolve")
@@ -153,8 +176,14 @@ def resolve(request: ResolveRequest) -> ManifestResponse:
153176
project_id=request.context.project_id,
154177
)
155178

156-
source = safe_build_source(request.manifest.model_dump(), {})
157-
return ManifestResponse(manifest=Manifest(**source.resolved_manifest))
179+
try:
180+
source = safe_build_source(request.manifest.model_dump(), {})
181+
return ManifestResponse(manifest=Manifest(**source.resolved_manifest))
182+
except Exception as exc:
183+
error = AirbyteTracedException.from_exception(
184+
exc, message=f"Error resolving manifest: {str(exc)}"
185+
)
186+
raise HTTPException(status_code=400, detail=error.message)
158187

159188

160189
@router.post("/full_resolve", operation_id="fullResolve")
@@ -171,21 +200,27 @@ def full_resolve(request: FullResolveRequest) -> ManifestResponse:
171200
project_id=request.context.project_id,
172201
)
173202

174-
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
175-
manifest = {**source.resolved_manifest}
176-
streams = manifest.get("streams", [])
177-
for stream in streams:
178-
stream["dynamic_stream_name"] = None
179-
180-
mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
181-
for stream in source.dynamic_streams:
182-
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])
183-
184-
if len(generated_streams) < request.stream_limit:
185-
generated_streams += [stream]
186-
187-
for generated_streams_list in mapped_streams.values():
188-
streams.extend(generated_streams_list)
189-
190-
manifest["streams"] = streams
191-
return ManifestResponse(manifest=Manifest(**manifest))
203+
try:
204+
source = safe_build_source(request.manifest.model_dump(), request.config.model_dump())
205+
manifest = {**source.resolved_manifest}
206+
streams = manifest.get("streams", [])
207+
for stream in streams:
208+
stream["dynamic_stream_name"] = None
209+
210+
mapped_streams: Dict[str, List[Dict[str, Any]]] = {}
211+
for stream in source.dynamic_streams:
212+
generated_streams = mapped_streams.setdefault(stream["dynamic_stream_name"], [])
213+
214+
if len(generated_streams) < request.stream_limit:
215+
generated_streams += [stream]
216+
217+
for generated_streams_list in mapped_streams.values():
218+
streams.extend(generated_streams_list)
219+
220+
manifest["streams"] = streams
221+
return ManifestResponse(manifest=Manifest(**manifest))
222+
except Exception as exc:
223+
error = AirbyteTracedException.from_exception(
224+
exc, message=f"Error full resolving manifest: {str(exc)}"
225+
)
226+
raise HTTPException(status_code=400, detail=error.message)

unit_tests/manifest_server/routers/test_manifest.py

Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -527,3 +527,126 @@ def test_discover_endpoint_missing_catalog(
527527
assert response.status_code == 422
528528
data = response.json()
529529
assert "Connector did not return a discovered catalog" in data["detail"]
530+
531+
# Test cases for error handling improvements
532+
@patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor")
533+
@patch("airbyte_cdk.manifest_server.routers.manifest.build_source")
534+
def test_test_read_cdk_error_handling(
535+
self, mock_build_source, mock_runner_class, sample_manifest, sample_config, mock_source
536+
):
537+
"""Test that CDK errors in test_read are properly caught and converted to HTTP 400."""
538+
request_data = {
539+
"manifest": sample_manifest,
540+
"config": sample_config,
541+
"stream_name": "products"
542+
}
543+
544+
mock_build_source.return_value = mock_source
545+
546+
mock_runner = Mock()
547+
# Simulate a CDK error (like datetime parsing error)
548+
mock_runner.test_read.side_effect = ValueError("time data '' does not match format '%Y-%m-%dT%H:%M:%SZ'")
549+
mock_runner_class.return_value = mock_runner
550+
551+
response = client.post("/v1/manifest/test_read", json=request_data)
552+
553+
assert response.status_code == 400
554+
data = response.json()
555+
assert "detail" in data
556+
assert "Error reading stream:" in data["detail"]
557+
assert "time data" in data["detail"]
558+
assert "does not match format" in data["detail"]
559+
560+
@patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor")
561+
@patch("airbyte_cdk.manifest_server.routers.manifest.build_source")
562+
def test_check_cdk_error_handling(
563+
self, mock_build_source, mock_runner_class, sample_manifest, sample_config, mock_source
564+
):
565+
"""Test that CDK errors in check are properly caught and converted to HTTP 400."""
566+
request_data = {
567+
"manifest": sample_manifest,
568+
"config": sample_config,
569+
}
570+
571+
mock_build_source.return_value = mock_source
572+
573+
mock_runner = Mock()
574+
# Simulate a CDK error (like connection error)
575+
mock_runner.check_connection.side_effect = ConnectionError("Failed to connect to API")
576+
mock_runner_class.return_value = mock_runner
577+
578+
response = client.post("/v1/manifest/check", json=request_data)
579+
580+
assert response.status_code == 400
581+
data = response.json()
582+
assert "detail" in data
583+
assert "Error checking connection:" in data["detail"]
584+
assert "Failed to connect to API" in data["detail"]
585+
586+
@patch("airbyte_cdk.manifest_server.routers.manifest.ManifestCommandProcessor")
587+
@patch("airbyte_cdk.manifest_server.routers.manifest.build_source")
588+
def test_discover_cdk_error_handling(
589+
self, mock_build_source, mock_runner_class, sample_manifest, sample_config, mock_source
590+
):
591+
"""Test that CDK errors in discover are properly caught and converted to HTTP 400."""
592+
request_data = {
593+
"manifest": sample_manifest,
594+
"config": sample_config,
595+
}
596+
597+
mock_build_source.return_value = mock_source
598+
599+
mock_runner = Mock()
600+
# Simulate a CDK error
601+
mock_runner.discover.side_effect = RuntimeError("Schema validation failed")
602+
mock_runner_class.return_value = mock_runner
603+
604+
response = client.post("/v1/manifest/discover", json=request_data)
605+
606+
assert response.status_code == 400
607+
data = response.json()
608+
assert "detail" in data
609+
assert "Error discovering streams:" in data["detail"]
610+
assert "Schema validation failed" in data["detail"]
611+
612+
@patch("airbyte_cdk.manifest_server.routers.manifest.build_source")
613+
def test_resolve_cdk_error_handling(
614+
self, mock_build_source, sample_manifest
615+
):
616+
"""Test that CDK errors in resolve are properly caught and converted to HTTP 400."""
617+
request_data = {
618+
"manifest": sample_manifest,
619+
}
620+
621+
# Simulate a CDK error during source building
622+
mock_build_source.side_effect = AttributeError("'NoneType' object has no attribute 'get'")
623+
624+
response = client.post("/v1/manifest/resolve", json=request_data)
625+
626+
assert response.status_code == 400
627+
data = response.json()
628+
assert "detail" in data
629+
assert "Error resolving manifest:" in data["detail"]
630+
assert "'NoneType' object has no attribute 'get'" in data["detail"]
631+
632+
@patch("airbyte_cdk.manifest_server.routers.manifest.build_source")
633+
def test_full_resolve_cdk_error_handling(
634+
self, mock_build_source, sample_manifest, sample_config
635+
):
636+
"""Test that CDK errors in full_resolve are properly caught and converted to HTTP 400."""
637+
request_data = {
638+
"manifest": sample_manifest,
639+
"config": sample_config,
640+
"stream_limit": 10
641+
}
642+
643+
# Simulate a CDK error during source building
644+
mock_build_source.side_effect = KeyError("Missing required field 'streams'")
645+
646+
response = client.post("/v1/manifest/full_resolve", json=request_data)
647+
648+
assert response.status_code == 400
649+
data = response.json()
650+
assert "detail" in data
651+
assert "Error full resolving manifest:" in data["detail"]
652+
assert "Missing required field 'streams'" in data["detail"]

0 commit comments

Comments
 (0)