33#
44
55
6- from dataclasses import asdict , dataclass , field
7- from typing import Any , ClassVar , Dict , List , Mapping
6+ from dataclasses import asdict
7+ from typing import Any , Dict , List , Mapping , Optional
88
99from airbyte_cdk .connector_builder .test_reader import TestReader
1010from airbyte_cdk .models import (
1515 Type ,
1616)
1717from airbyte_cdk .models import Type as MessageType
18+ from airbyte_cdk .sources .declarative .concurrent_declarative_source import (
19+ ConcurrentDeclarativeSource ,
20+ TestLimits ,
21+ )
1822from airbyte_cdk .sources .declarative .declarative_source import DeclarativeSource
1923from airbyte_cdk .sources .declarative .manifest_declarative_source import ManifestDeclarativeSource
20- from airbyte_cdk .sources .declarative .parsers .model_to_component_factory import (
21- ModelToComponentFactory ,
22- )
2324from airbyte_cdk .utils .airbyte_secrets_utils import filter_secrets
2425from airbyte_cdk .utils .datetime_helpers import ab_datetime_now
2526from airbyte_cdk .utils .traced_exception import AirbyteTracedException
2627
27- DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE = 5
28- DEFAULT_MAXIMUM_NUMBER_OF_SLICES = 5
29- DEFAULT_MAXIMUM_RECORDS = 100
30- DEFAULT_MAXIMUM_STREAMS = 100
31-
3228MAX_PAGES_PER_SLICE_KEY = "max_pages_per_slice"
3329MAX_SLICES_KEY = "max_slices"
3430MAX_RECORDS_KEY = "max_records"
3531MAX_STREAMS_KEY = "max_streams"
3632
3733
38- @dataclass
39- class TestLimits :
40- __test__ : ClassVar [bool ] = False # Tell Pytest this is not a Pytest class, despite its name
41-
42- max_records : int = field (default = DEFAULT_MAXIMUM_RECORDS )
43- max_pages_per_slice : int = field (default = DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE )
44- max_slices : int = field (default = DEFAULT_MAXIMUM_NUMBER_OF_SLICES )
45- max_streams : int = field (default = DEFAULT_MAXIMUM_STREAMS )
46-
47-
4834def get_limits (config : Mapping [str , Any ]) -> TestLimits :
4935 command_config = config .get ("__test_read_config" , {})
50- max_pages_per_slice = (
51- command_config .get (MAX_PAGES_PER_SLICE_KEY ) or DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE
36+ return TestLimits (
37+ max_records = command_config .get (MAX_RECORDS_KEY , TestLimits .DEFAULT_MAX_RECORDS ),
38+ max_pages_per_slice = command_config .get (
39+ MAX_PAGES_PER_SLICE_KEY , TestLimits .DEFAULT_MAX_PAGES_PER_SLICE
40+ ),
41+ max_slices = command_config .get (MAX_SLICES_KEY , TestLimits .DEFAULT_MAX_SLICES ),
42+ max_streams = command_config .get (MAX_STREAMS_KEY , TestLimits .DEFAULT_MAX_STREAMS ),
5243 )
53- max_slices = command_config .get (MAX_SLICES_KEY ) or DEFAULT_MAXIMUM_NUMBER_OF_SLICES
54- max_records = command_config .get (MAX_RECORDS_KEY ) or DEFAULT_MAXIMUM_RECORDS
55- max_streams = command_config .get (MAX_STREAMS_KEY ) or DEFAULT_MAXIMUM_STREAMS
56- return TestLimits (max_records , max_pages_per_slice , max_slices , max_streams )
5744
5845
5946def should_migrate_manifest (config : Mapping [str , Any ]) -> bool :
@@ -75,21 +62,30 @@ def should_normalize_manifest(config: Mapping[str, Any]) -> bool:
7562 return config .get ("__should_normalize" , False )
7663
7764
78- def create_source (config : Mapping [str , Any ], limits : TestLimits ) -> ManifestDeclarativeSource :
65+ def create_source (
66+ config : Mapping [str , Any ],
67+ limits : TestLimits ,
68+ catalog : Optional [ConfiguredAirbyteCatalog ],
69+ state : Optional [List [AirbyteStateMessage ]],
70+ ) -> ConcurrentDeclarativeSource [Optional [List [AirbyteStateMessage ]]]:
7971 manifest = config ["__injected_declarative_manifest" ]
80- return ManifestDeclarativeSource (
72+
73+ # We enforce a concurrency level of 1 so that the stream is processed on a single thread
74+ # to retain ordering for the grouping of the builder message responses.
75+ if "concurrency_level" in manifest :
76+ manifest ["concurrency_level" ]["default_concurrency" ] = 1
77+ else :
78+ manifest ["concurrency_level" ] = {"type" : "ConcurrencyLevel" , "default_concurrency" : 1 }
79+
80+ return ConcurrentDeclarativeSource (
81+ catalog = catalog ,
8182 config = config ,
82- emit_connector_builder_messages = True ,
83+ state = state ,
8384 source_config = manifest ,
85+ emit_connector_builder_messages = True ,
8486 migrate_manifest = should_migrate_manifest (config ),
8587 normalize_manifest = should_normalize_manifest (config ),
86- component_factory = ModelToComponentFactory (
87- emit_connector_builder_messages = True ,
88- limit_pages_fetched_per_slice = limits .max_pages_per_slice ,
89- limit_slices_fetched = limits .max_slices ,
90- disable_retries = True ,
91- disable_cache = True ,
92- ),
88+ limits = limits ,
9389 )
9490
9591
0 commit comments