Skip to content

Commit 9a162b2

Browse files
committed
feat: manifest runner service
1 parent 51cfea5 commit 9a162b2

File tree

14 files changed

+481
-0
lines changed

14 files changed

+481
-0
lines changed

airbyte_cdk/manifest_runner/__init__.py

Whitespace-only changes.
Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
"""
2+
API Models for the Manifest Runner Service.
3+
4+
This package contains all Pydantic models used for API requests and responses.
5+
"""
6+
7+
from .dicts import ConnectorConfig, Manifest
8+
from .manifest import (
9+
FullResolveRequest,
10+
ManifestResponse,
11+
ResolveRequest,
12+
StreamTestReadRequest,
13+
)
14+
from .stream import (
15+
AuxiliaryRequest,
16+
HttpRequest,
17+
HttpResponse,
18+
LogMessage,
19+
StreamRead,
20+
StreamReadPages,
21+
StreamReadSlices,
22+
)
23+
24+
__all__ = [
25+
# Typed Dicts
26+
"ConnectorConfig",
27+
"Manifest",
28+
# Manifest request/response models
29+
"FullResolveRequest",
30+
"ManifestResponse",
31+
"StreamTestReadRequest",
32+
"ResolveRequest",
33+
# Stream models
34+
"AuxiliaryRequest",
35+
"HttpRequest",
36+
"HttpResponse",
37+
"LogMessage",
38+
"StreamRead",
39+
"StreamReadPages",
40+
"StreamReadSlices",
41+
]
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
"""
2+
Common API models shared across different endpoints.
3+
"""
4+
5+
from pydantic import BaseModel, ConfigDict
6+
7+
8+
class Manifest(BaseModel):
9+
"""Base manifest model. Allows client generation to replace with proper JsonNode types."""
10+
11+
model_config = ConfigDict(extra="allow")
12+
13+
14+
class ConnectorConfig(BaseModel):
15+
"""Base connector configuration model. Allows client generation to replace with proper JsonNode types."""
16+
17+
model_config = ConfigDict(extra="allow")
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
"""
2+
Manifest-related API models.
3+
4+
These models define the request and response structures for manifest operations
5+
like reading, resolving, and full resolution.
6+
"""
7+
8+
from typing import Any, List, Optional
9+
10+
from pydantic import BaseModel, Field
11+
12+
from .dicts import ConnectorConfig, Manifest
13+
14+
15+
class StreamTestReadRequest(BaseModel):
16+
"""Request to test read from a specific stream."""
17+
18+
manifest: Manifest
19+
config: ConnectorConfig
20+
stream_name: str
21+
state: List[Any] = []
22+
custom_components_code: Optional[str] = None
23+
record_limit: int = Field(default=100, ge=1, le=5000)
24+
page_limit: int = Field(default=5, ge=1, le=20)
25+
slice_limit: int = Field(default=5, ge=1, le=20)
26+
27+
28+
class ResolveRequest(BaseModel):
29+
"""Request to resolve a manifest."""
30+
31+
manifest: Manifest
32+
33+
34+
class ManifestResponse(BaseModel):
35+
"""Response containing a manifest."""
36+
37+
manifest: Manifest
38+
39+
40+
class FullResolveRequest(BaseModel):
41+
"""Request to fully resolve a manifest."""
42+
43+
manifest: Manifest
44+
config: ConnectorConfig
45+
stream_limit: int = Field(default=100, ge=1, le=100)
Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
"""
2+
Stream-related API models.
3+
4+
These models define the structure for stream reading operations and responses.
5+
They accurately reflect the runtime types returned by the CDK, particularly
6+
fixing type mismatches like slice_descriptor being a string rather than an object.
7+
"""
8+
9+
from typing import Any, Dict, List, Optional
10+
11+
from pydantic import BaseModel
12+
13+
14+
class HttpRequest(BaseModel):
15+
"""HTTP request details."""
16+
17+
url: str
18+
headers: Optional[Dict[str, Any]]
19+
http_method: str
20+
body: Optional[str] = None
21+
22+
23+
class HttpResponse(BaseModel):
24+
"""HTTP response details."""
25+
26+
status: int
27+
body: Optional[str] = None
28+
headers: Optional[Dict[str, Any]] = None
29+
30+
31+
class LogMessage(BaseModel):
32+
"""Log message from stream processing."""
33+
34+
message: str
35+
level: str
36+
internal_message: Optional[str] = None
37+
stacktrace: Optional[str] = None
38+
39+
40+
class AuxiliaryRequest(BaseModel):
41+
"""Auxiliary HTTP request made during stream processing."""
42+
43+
title: str
44+
type: str
45+
description: str
46+
request: HttpRequest
47+
response: HttpResponse
48+
49+
50+
class StreamReadPages(BaseModel):
51+
"""Pages of data read from a stream slice."""
52+
53+
records: List[object]
54+
request: Optional[HttpRequest] = None
55+
response: Optional[HttpResponse] = None
56+
57+
58+
class StreamReadSlices(BaseModel):
59+
"""Slices of data read from a stream."""
60+
61+
pages: List[StreamReadPages]
62+
slice_descriptor: Optional[str] # This is actually a string at runtime, not Dict[str, Any]
63+
state: Optional[List[Dict[str, Any]]] = None
64+
auxiliary_requests: Optional[List[AuxiliaryRequest]] = None
65+
66+
67+
class StreamRead(BaseModel):
68+
"""Complete stream read response with properly typed fields."""
69+
70+
logs: List[LogMessage]
71+
slices: List[StreamReadSlices]
72+
test_read_limit_reached: bool
73+
auxiliary_requests: List[AuxiliaryRequest]
74+
inferred_schema: Optional[Dict[str, Any]]
75+
inferred_datetime_formats: Optional[Dict[str, str]]
76+
latest_config_update: Optional[Dict[str, Any]]

airbyte_cdk/manifest_runner/app.py

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
from fastapi import FastAPI
2+
3+
from ..manifest_runner.routers import capabilities, health, manifest
4+
5+
app = FastAPI(
6+
title="Manifest Runner Service",
7+
description="A service for running low-code Airbyte connectors",
8+
version="0.1.0",
9+
contact={
10+
"name": "Airbyte",
11+
"url": "https://airbyte.com",
12+
},
13+
)
14+
15+
app.include_router(health.router)
16+
app.include_router(capabilities.router)
17+
app.include_router(manifest.router, prefix="/v1")
Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
"""Main entry point for the Airbyte CDK Manifest Runner server."""
3+
4+
import sys
5+
6+
import uvicorn
7+
8+
9+
def check_dependencies() -> bool:
10+
"""Check if all required dependencies are available."""
11+
try:
12+
import fastapi
13+
import uvicorn
14+
15+
return True
16+
except ImportError:
17+
return False
18+
19+
20+
def run_server(
21+
host: str = "127.0.0.1", port: int = 8000, reload: bool = False, log_level: str = "info"
22+
) -> None:
23+
"""Run the FastAPI server."""
24+
if not check_dependencies():
25+
print("❌ Manifest runner dependencies not found. Please install with:")
26+
print(" pip install airbyte-cdk[manifest-runner]")
27+
print(" # or")
28+
print(" poetry install --extras manifest-runner")
29+
sys.exit(1)
30+
31+
print(f"🚀 Starting Airbyte CDK Manifest Runner on {host}:{port}")
32+
33+
uvicorn.run(
34+
"airbyte_cdk.manifest_runner.app:app",
35+
host=host,
36+
port=port,
37+
reload=reload,
38+
log_level=log_level,
39+
)
40+
41+
42+
if __name__ == "__main__":
43+
run_server()

airbyte_cdk/manifest_runner/manifest_runner/__init__.py

Whitespace-only changes.
Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
from typing import Any, List, Mapping
2+
3+
from airbyte_cdk.connector_builder.models import StreamRead
4+
from airbyte_cdk.connector_builder.test_reader import TestReader
5+
from airbyte_cdk.models.airbyte_protocol import (
6+
AirbyteStateMessage,
7+
ConfiguredAirbyteCatalog,
8+
)
9+
from airbyte_cdk.sources.declarative.manifest_declarative_source import (
10+
ManifestDeclarativeSource,
11+
)
12+
13+
14+
class ManifestRunner:
15+
_source: ManifestDeclarativeSource
16+
17+
def __init__(self, source: ManifestDeclarativeSource) -> None:
18+
self._source = source
19+
20+
def test_read(
21+
self,
22+
config: Mapping[str, Any],
23+
catalog: ConfiguredAirbyteCatalog,
24+
state: List[AirbyteStateMessage],
25+
record_limit: int,
26+
page_limit: int,
27+
slice_limit: int,
28+
) -> StreamRead:
29+
"""
30+
Test the read method of the source.
31+
"""
32+
33+
test_read_handler = TestReader(
34+
max_pages_per_slice=page_limit,
35+
max_slices=slice_limit,
36+
max_record_limit=record_limit,
37+
)
38+
39+
stream_read = test_read_handler.run_test_read(
40+
source=self._source,
41+
config=config,
42+
configured_catalog=catalog,
43+
state=state,
44+
)
45+
46+
return stream_read
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
from typing import Any, Mapping
2+
3+
from airbyte_cdk.models.airbyte_protocol import (
4+
AirbyteStream,
5+
ConfiguredAirbyteCatalog,
6+
ConfiguredAirbyteStream,
7+
DestinationSyncMode,
8+
SyncMode,
9+
)
10+
from airbyte_cdk.sources.declarative.manifest_declarative_source import (
11+
ManifestDeclarativeSource,
12+
)
13+
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
14+
ModelToComponentFactory,
15+
)
16+
17+
SHOULD_NORMALIZE_KEY = "__should_normalize"
18+
SHOULD_MIGRATE_KEY = "__should_migrate"
19+
20+
21+
def build_catalog(stream_name: str) -> ConfiguredAirbyteCatalog:
22+
return ConfiguredAirbyteCatalog(
23+
streams=[
24+
ConfiguredAirbyteStream(
25+
stream=AirbyteStream(
26+
name=stream_name,
27+
json_schema={},
28+
supported_sync_modes=[SyncMode.full_refresh, SyncMode.incremental],
29+
),
30+
sync_mode=SyncMode.incremental,
31+
destination_sync_mode=DestinationSyncMode.overwrite,
32+
)
33+
]
34+
)
35+
36+
37+
def should_migrate_manifest(manifest: Mapping[str, Any]) -> bool:
38+
"""
39+
Determines whether the manifest should be migrated,
40+
based on the presence of the "__should_migrate" key.
41+
42+
This flag is set by the UI.
43+
"""
44+
return manifest.get(SHOULD_MIGRATE_KEY, False)
45+
46+
47+
def should_normalize_manifest(manifest: Mapping[str, Any]) -> bool:
48+
"""
49+
Determines whether the manifest should be normalized,
50+
based on the presence of the "__should_normalize" key.
51+
52+
This flag is set by the UI.
53+
"""
54+
return manifest.get(SHOULD_NORMALIZE_KEY, False)
55+
56+
57+
def build_source(
58+
manifest: Mapping[str, Any], config: Mapping[str, Any]
59+
) -> ManifestDeclarativeSource:
60+
return ManifestDeclarativeSource(
61+
source_config=manifest,
62+
config=config,
63+
normalize_manifest=should_normalize_manifest(manifest),
64+
migrate_manifest=should_migrate_manifest(manifest),
65+
emit_connector_builder_messages=True,
66+
component_factory=ModelToComponentFactory(
67+
emit_connector_builder_messages=True,
68+
limit_pages_fetched_per_slice=None, # TODO
69+
limit_slices_fetched=None, # TODO
70+
disable_retries=True,
71+
disable_cache=True,
72+
),
73+
)

0 commit comments

Comments
 (0)