Skip to content

Commit acc1003

Browse files
feat(manifest): add max_concurrent_async_job_count as an option in manifest (#584)
1 parent 3469ace commit acc1003

File tree

2 files changed

+20
-0
lines changed

2 files changed

+20
-0
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -89,6 +89,7 @@ def __init__(
8989
emit_connector_builder_messages=emit_connector_builder_messages,
9090
disable_resumable_full_refresh=True,
9191
connector_state_manager=self._connector_state_manager,
92+
max_concurrent_async_job_count=source_config.get("max_concurrent_async_job_count"),
9293
)
9394

9495
super().__init__(

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
StreamDescriptor,
3131
SyncMode,
3232
)
33+
from airbyte_cdk.sources.declarative.async_job.job_tracker import ConcurrentJobLimitReached
3334
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
3435
ConcurrentDeclarativeSource,
3536
)
@@ -1839,6 +1840,24 @@ def test_async_incremental_stream_uses_concurrent_cursor_with_state():
18391840
assert async_job_partition_router.stream_slicer._concurrent_state == expected_state
18401841

18411842

1843+
def test_max_concurrent_async_job_count_is_passed_to_job_tracker():
1844+
limit = 5
1845+
manifest_with_max_concurrent_async_job_count = copy.deepcopy(_MANIFEST)
1846+
manifest_with_max_concurrent_async_job_count["max_concurrent_async_job_count"] = str(limit)
1847+
source = ConcurrentDeclarativeSource(
1848+
source_config=manifest_with_max_concurrent_async_job_count,
1849+
config=_CONFIG,
1850+
catalog=_CATALOG,
1851+
state={},
1852+
)
1853+
source_job_tracker = source._constructor._job_tracker
1854+
assert source_job_tracker._limit == limit
1855+
1856+
[source_job_tracker.try_to_get_intent() for i in range(limit)]
1857+
with pytest.raises(ConcurrentJobLimitReached):
1858+
source_job_tracker.try_to_get_intent()
1859+
1860+
18421861
def test_stream_using_is_client_side_incremental_has_cursor_state():
18431862
expected_cursor_value = "2024-07-01"
18441863
state = [

0 commit comments

Comments
 (0)