Skip to content

Commit c6b1ffe

Browse files
committed
fix typing, tests
1 parent 3bd4306 commit c6b1ffe

File tree

2 files changed

+17
-9
lines changed
  • airbyte_cdk/manifest_server/command_processor
  • unit_tests/manifest_server/command_processor

2 files changed

+17
-9
lines changed

airbyte_cdk/manifest_server/command_processor/utils.py

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -71,22 +71,26 @@ def build_source(
7171
) -> ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]]:
7272
# We enforce a concurrency level of 1 so that the stream is processed on a single thread
7373
# to retain ordering for the grouping of the builder message responses.
74-
if "concurrency_level" in manifest:
75-
manifest["concurrency_level"]["default_concurrency"] = 1
74+
manifest_no_concurrency = dict(manifest)
75+
if "concurrency_level" in manifest_no_concurrency:
76+
manifest_no_concurrency["concurrency_level"]["default_concurrency"] = 1
7677
else:
77-
manifest["concurrency_level"] = {"type": "ConcurrencyLevel", "default_concurrency": 1}
78+
manifest_no_concurrency["concurrency_level"] = {
79+
"type": "ConcurrencyLevel",
80+
"default_concurrency": 1,
81+
}
7882

7983
return ConcurrentDeclarativeSource(
8084
catalog=catalog,
8185
state=state,
82-
source_config=manifest,
86+
source_config=manifest_no_concurrency,
8387
config=config,
8488
normalize_manifest=should_normalize_manifest(manifest),
8589
migrate_manifest=should_migrate_manifest(manifest),
8690
emit_connector_builder_messages=True,
8791
limits=TestLimits(
88-
max_pages_per_slice=page_limit,
89-
max_slices=slice_limit,
90-
max_records=record_limit,
92+
max_pages_per_slice=page_limit or TestLimits.DEFAULT_MAX_PAGES_PER_SLICE,
93+
max_slices=slice_limit or TestLimits.DEFAULT_MAX_SLICES,
94+
max_records=record_limit or TestLimits.DEFAULT_MAX_RECORDS,
9195
),
9296
)

unit_tests/manifest_server/command_processor/test_utils.py

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ def test_build_catalog_creates_correct_structure(self):
3232
assert configured_stream.destination_sync_mode == DestinationSyncMode.overwrite
3333

3434
@patch("airbyte_cdk.manifest_server.command_processor.utils.ConcurrentDeclarativeSource")
35-
def test_build_source_creates_manifest_declarative_source(self, mock_source_class):
35+
def test_build_source_creates_source(self, mock_source_class):
3636
"""Test that build_source creates a ConcurrentDeclarativeSource with correct parameters."""
3737
# Setup mocks
3838
mock_source = Mock()
@@ -69,10 +69,14 @@ def test_build_source_creates_manifest_declarative_source(self, mock_source_clas
6969
result = build_source(manifest, catalog, config, state)
7070

7171
# Verify ConcurrentDeclarativeSource was created with correct parameters
72+
expected_source_config = {
73+
**manifest,
74+
"concurrency_level": {"type": "ConcurrencyLevel", "default_concurrency": 1},
75+
}
7276
mock_source_class.assert_called_once_with(
7377
catalog=catalog,
7478
state=state,
75-
source_config=manifest,
79+
source_config=expected_source_config,
7680
config=config,
7781
normalize_manifest=False, # Default when flag not set
7882
migrate_manifest=False, # Default when flag not set

0 commit comments

Comments
 (0)