Skip to content

Commit cde7fe0

Browse files
committed
concurrent cdk
1 parent 5093a2e commit cde7fe0

File tree

5 files changed

+153
-73
lines changed

5 files changed

+153
-73
lines changed

airbyte_cdk/manifest_server/app.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
from ..manifest_server.routers import capabilities, health, manifest
44

55
app = FastAPI(
6-
title="Manifest Server Service",
6+
title="Manifest Server",
77
description="A service for running low-code Airbyte connectors",
88
version="0.1.0",
99
contact={

airbyte_cdk/manifest_server/command_processor/utils.py

Lines changed: 25 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
from typing import Any, Mapping, Optional
1+
from typing import Any, List, Mapping, Optional
2+
3+
from airbyte_protocol_dataclasses.models import AirbyteStateMessage
24

35
from airbyte_cdk.models import (
46
AirbyteStream,
@@ -7,6 +9,10 @@
79
DestinationSyncMode,
810
SyncMode,
911
)
12+
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
13+
ConcurrentDeclarativeSource,
14+
TestLimits,
15+
)
1016
from airbyte_cdk.sources.declarative.manifest_declarative_source import (
1117
ManifestDeclarativeSource,
1218
)
@@ -56,21 +62,31 @@ def should_normalize_manifest(manifest: Mapping[str, Any]) -> bool:
5662

5763
def build_source(
    manifest: Mapping[str, Any],
    catalog: Optional[ConfiguredAirbyteCatalog],
    config: Mapping[str, Any],
    state: Optional[List[AirbyteStateMessage]],
    record_limit: Optional[int] = None,
    page_limit: Optional[int] = None,
    slice_limit: Optional[int] = None,
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
    """Build a ConcurrentDeclarativeSource for the given manifest.

    The source is created with ``emit_connector_builder_messages=True`` and a
    ``default_concurrency`` of 1. The caller's ``manifest`` is never modified:
    the concurrency override is applied to a copy.

    Args:
        manifest: Declarative manifest describing the connector.
        catalog: Configured catalog handed to the source, or None.
        config: Connector configuration.
        state: State messages handed to the source, or None.
        record_limit: Forwarded as ``TestLimits.max_records``.
        page_limit: Forwarded as ``TestLimits.max_pages_per_slice``.
        slice_limit: Forwarded as ``TestLimits.max_slices``.

    Returns:
        The constructed ConcurrentDeclarativeSource.
    """
    # We enforce a concurrency level of 1 so that the stream is processed on a single thread
    # to retain ordering for the grouping of the builder message responses.
    # The override goes into a copy: `manifest` is typed as a read-only Mapping and
    # belongs to the caller, so writing into it would be a hidden side effect.
    source_config = dict(manifest)
    if "concurrency_level" in source_config:
        # Keep whatever else the manifest declared; only pin the default.
        source_config["concurrency_level"] = {
            **source_config["concurrency_level"],
            "default_concurrency": 1,
        }
    else:
        source_config["concurrency_level"] = {
            "type": "ConcurrencyLevel",
            "default_concurrency": 1,
        }

    # NOTE(review): limits left as None are forwarded as-is; confirm that TestLimits
    # accepts None rather than expecting its own defaults.
    limits = TestLimits(
        max_pages_per_slice=page_limit,
        max_slices=slice_limit,
        max_records=record_limit,
    )

    return ConcurrentDeclarativeSource(
        catalog=catalog,
        state=state,
        source_config=source_config,
        config=config,
        normalize_manifest=should_normalize_manifest(manifest),
        migrate_manifest=should_migrate_manifest(manifest),
        emit_connector_builder_messages=True,
        limits=limits,
    )

airbyte_cdk/manifest_server/routers/manifest.py

Lines changed: 25 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from typing import Any, Dict, List, Mapping, Optional
44

55
import jsonschema
6+
from airbyte_protocol_dataclasses.models import AirbyteStateMessage, ConfiguredAirbyteCatalog
67
from fastapi import APIRouter, Depends, HTTPException
78

89
from airbyte_cdk.manifest_server.api_models.manifest import (
@@ -12,7 +13,9 @@
1213
DiscoverResponse,
1314
)
1415
from airbyte_cdk.models import AirbyteStateMessageSerializer
15-
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
16+
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
17+
ConcurrentDeclarativeSource,
18+
)
1619
from airbyte_cdk.sources.declarative.parsers.custom_code_compiler import (
1720
INJECTED_COMPONENTS_PY,
1821
INJECTED_COMPONENTS_PY_CHECKSUMS,
@@ -34,12 +37,23 @@
3437
def safe_build_source(
    manifest_dict: Mapping[str, Any],
    config_dict: Mapping[str, Any],
    catalog: Optional[ConfiguredAirbyteCatalog] = None,
    state: Optional[List[AirbyteStateMessage]] = None,
    page_limit: Optional[int] = None,
    slice_limit: Optional[int] = None,
    record_limit: Optional[int] = None,
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
    """Wrapper around build_source that converts ValidationError to HTTPException.

    Args:
        manifest_dict: Declarative manifest passed to build_source.
        config_dict: Connector configuration passed to build_source.
        catalog: Optional configured catalog passed to build_source.
        state: Optional state messages passed to build_source.
        page_limit: Optional page limit passed to build_source.
        slice_limit: Optional slice limit passed to build_source.
        record_limit: Optional record limit passed to build_source.

    Returns:
        The source returned by build_source.

    Raises:
        HTTPException: With status 400 when build_source raises a jsonschema
            ValidationError; the original error is attached as the cause.
    """
    try:
        # The argument order here follows the order build_source is called with,
        # which differs from this function's own parameter order.
        return build_source(
            manifest_dict,
            catalog,
            config_dict,
            state,
            record_limit,
            page_limit,
            slice_limit,
        )
    except jsonschema.exceptions.ValidationError as e:
        # Chain explicitly so the validation error is recorded as the cause.
        raise HTTPException(status_code=400, detail=f"Invalid manifest: {e.message}") from e
4559

@@ -58,10 +72,16 @@ def test_read(request: StreamTestReadRequest) -> StreamRead:
5872
"""
5973
config_dict = request.config.model_dump()
6074

75+
catalog = build_catalog(request.stream_name)
6176
source = safe_build_source(
62-
request.manifest.model_dump(), config_dict, request.page_limit, request.slice_limit
77+
request.manifest.model_dump(),
78+
config_dict,
79+
catalog,
80+
request.state,
81+
request.page_limit,
82+
request.slice_limit,
83+
request.record_limit,
6384
)
64-
catalog = build_catalog(request.stream_name)
6585
state = [AirbyteStateMessageSerializer.load(state) for state in request.state]
6686

6787
if request.custom_components_code:

unit_tests/manifest_server/command_processor/test_utils.py

Lines changed: 21 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -31,15 +31,12 @@ def test_build_catalog_creates_correct_structure(self):
3131
assert configured_stream.sync_mode == SyncMode.incremental
3232
assert configured_stream.destination_sync_mode == DestinationSyncMode.overwrite
3333

34-
@patch("airbyte_cdk.manifest_server.command_processor.utils.ManifestDeclarativeSource")
35-
@patch("airbyte_cdk.manifest_server.command_processor.utils.ModelToComponentFactory")
34+
@patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource")
3635
def test_build_source_creates_manifest_declarative_source(
37-
self, mock_component_factory_class, mock_source_class
36+
self, mock_source_class
3837
):
39-
"""Test that build_source creates a ManifestDeclarativeSource with correct parameters."""
38+
"""Test that build_source creates a ConcurrentDeclarativeSource with correct parameters."""
4039
# Setup mocks
41-
mock_component_factory = Mock()
42-
mock_component_factory_class.return_value = mock_component_factory
4340
mock_source = Mock()
4441
mock_source_class.return_value = mock_source
4542

@@ -68,64 +65,57 @@ def test_build_source_creates_manifest_declarative_source(
6865
"timeout": 30,
6966
}
7067

71-
# Call build_source
72-
result = build_source(manifest, config)
68+
# Call build_source with additional parameters
69+
catalog = build_catalog("test_stream")
70+
state = []
71+
result = build_source(manifest, catalog, config, state)
7372

74-
# Verify ModelToComponentFactory was created with correct parameters
75-
mock_component_factory_class.assert_called_once_with(
76-
emit_connector_builder_messages=True,
77-
limit_pages_fetched_per_slice=None,
78-
limit_slices_fetched=None,
79-
disable_retries=True,
80-
disable_cache=True,
81-
)
82-
83-
# Verify ManifestDeclarativeSource was created with correct parameters
73+
# Verify ConcurrentDeclarativeSource was created with correct parameters
8474
mock_source_class.assert_called_once_with(
75+
catalog=catalog,
76+
state=state,
8577
source_config=manifest,
8678
config=config,
8779
normalize_manifest=False, # Default when flag not set
8880
migrate_manifest=False, # Default when flag not set
8981
emit_connector_builder_messages=True,
90-
component_factory=mock_component_factory,
82+
limits=mock_source_class.call_args[1]["limits"],
9183
)
9284

9385
assert result == mock_source
9486

95-
@patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource")
def test_build_source_with_normalize_flag(self, mock_source_class):
    """build_source passes normalize_manifest=True when the normalize flag is set."""
    mock_source_class.return_value = Mock()

    # Manifest carries the normalize flag only; the migrate flag is absent.
    flagged_manifest = {"streams": [{"name": "test_stream"}], SHOULD_NORMALIZE_KEY: True}
    connector_config = {"api_key": "test_key"}
    configured_catalog = build_catalog("test_stream")
    empty_state = []

    build_source(flagged_manifest, configured_catalog, connector_config, empty_state)

    # Inspect the keyword arguments the source class was constructed with.
    _, source_kwargs = mock_source_class.call_args
    assert source_kwargs["normalize_manifest"] is True
    assert source_kwargs["migrate_manifest"] is False
115106

116-
@patch("airbyte_cdk.manifest_server.command_processor.utils.ManifestDeclarativeSource")
117-
@patch("airbyte_cdk.manifest_server.command_processor.utils.ModelToComponentFactory")
118-
def test_build_source_with_migrate_flag(self, mock_component_factory_class, mock_source_class):
107+
@patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource")
108+
def test_build_source_with_migrate_flag(self, mock_source_class):
119109
"""Test build_source when migrate flag is set."""
120-
mock_component_factory = Mock()
121-
mock_component_factory_class.return_value = mock_component_factory
122110
mock_source = Mock()
123111
mock_source_class.return_value = mock_source
124112

125113
manifest = {"streams": [{"name": "test_stream"}], SHOULD_MIGRATE_KEY: True}
126114
config = {"api_key": "test_key"}
115+
catalog = build_catalog("test_stream")
116+
state = []
127117

128-
build_source(manifest, config)
118+
build_source(manifest, catalog, config, state)
129119

130120
# Verify migrate_manifest is True
131121
call_args = mock_source_class.call_args[1]

0 commit comments

Comments
 (0)