@@ -66,22 +66,30 @@ def build_source(
6666) -> ConcurrentDeclarativeSource [Optional [List [AirbyteStateMessage ]]]:
6767 # We enforce a concurrency level of 1 so that the stream is processed on a single thread
6868 # to retain ordering for the grouping of the builder message responses.
69- manifest_no_concurrency = copy .deepcopy (manifest )
70- if "concurrency_level" in manifest_no_concurrency :
71- manifest_no_concurrency ["concurrency_level" ]["default_concurrency" ] = 1
69+ definition = copy .deepcopy (manifest )
70+ if "concurrency_level" in definition :
71+ definition ["concurrency_level" ]["default_concurrency" ] = 1
7272 else :
73- manifest_no_concurrency ["concurrency_level" ] = {
73+ definition ["concurrency_level" ] = {
7474 "type" : "ConcurrencyLevel" ,
7575 "default_concurrency" : 1 ,
7676 }
7777
78+ should_normalize = should_normalize_manifest (manifest )
79+ if should_normalize :
80+ del definition [SHOULD_NORMALIZE_KEY ]
81+
82+ should_migrate = should_migrate_manifest (manifest )
83+ if should_migrate :
84+ del definition [SHOULD_MIGRATE_KEY ]
85+
7886 return ConcurrentDeclarativeSource (
7987 catalog = catalog ,
8088 state = state ,
81- source_config = manifest_no_concurrency ,
89+ source_config = definition ,
8290 config = config ,
83- normalize_manifest = should_normalize_manifest ( manifest ) ,
84- migrate_manifest = should_migrate_manifest ( manifest ) ,
91+ normalize_manifest = should_normalize ,
92+ migrate_manifest = should_migrate ,
8593 emit_connector_builder_messages = True ,
8694 limits = TestLimits (
8795 max_pages_per_slice = page_limit or TestLimits .DEFAULT_MAX_PAGES_PER_SLICE ,
0 commit comments