2020import pkgutil
2121import sys
2222import traceback
23- from collections .abc import Mapping
23+ from collections .abc import Mapping , MutableMapping
2424from pathlib import Path
2525from typing import Any , cast
2626
2727import orjson
28+ import yaml
2829
2930from airbyte_cdk .entrypoint import AirbyteEntrypoint , launch
3031from airbyte_cdk .models import (
@@ -54,7 +55,7 @@ class SourceLocalYaml(YamlDeclarativeSource):
5455 def __init__ (
5556 self ,
5657 catalog : ConfiguredAirbyteCatalog | None ,
57- config : Mapping [str , Any ] | None ,
58+ config : MutableMapping [str , Any ] | None ,
5859 state : TState ,
5960 ** kwargs : Any ,
6061 ) -> None :
@@ -162,14 +163,27 @@ def create_declarative_source(
162163 connector builder.
163164 """
164165 try :
165- config : Mapping [str , Any ] | None
166+ config : MutableMapping [str , Any ] | None
166167 catalog : ConfiguredAirbyteCatalog | None
167168 state : list [AirbyteStateMessage ]
168169 config , catalog , state = _parse_inputs_into_config_catalog_state (args )
169- if config is None or "__injected_declarative_manifest" not in config :
170+
171+ if config is None :
170172 raise ValueError (
171173 "Invalid config: `__injected_declarative_manifest` should be provided at the root "
172- f"of the config but config only has keys: { list (config .keys () if config else [])} "
174+ "of the config or using the --manifest-path argument."
175+ )
176+
177+ # If a manifest_path is provided in the args, inject it into the config
178+ injected_manifest = _parse_manifest_from_args (args )
179+ if injected_manifest :
180+ config ["__injected_declarative_manifest" ] = injected_manifest
181+
182+ if "__injected_declarative_manifest" not in config :
183+ raise ValueError (
184+ "Invalid config: `__injected_declarative_manifest` should be provided at the root "
185+ "of the config or using the --manifest-path argument. "
186+ f"Config only has keys: { list (config .keys () if config else [])} "
173187 )
174188 if not isinstance (config ["__injected_declarative_manifest" ], dict ):
175189 raise ValueError (
@@ -207,7 +221,7 @@ def create_declarative_source(
207221def _parse_inputs_into_config_catalog_state (
208222 args : list [str ],
209223) -> tuple [
210- Mapping [str , Any ] | None ,
224+ MutableMapping [str , Any ] | None ,
211225 ConfiguredAirbyteCatalog | None ,
212226 list [AirbyteStateMessage ],
213227]:
@@ -231,6 +245,27 @@ def _parse_inputs_into_config_catalog_state(
231245 return config , catalog , state
232246
233247
248+ def _parse_manifest_from_args (args : list [str ]) -> dict [str , Any ] | None :
249+ """Extracts and parse the manifest file if specified in the args."""
250+ parsed_args = AirbyteEntrypoint .parse_args (args )
251+
252+ # Safely check if manifest_path is provided in the args
253+ if hasattr (parsed_args , "manifest_path" ) and parsed_args .manifest_path :
254+ try :
255+ # Read the manifest file
256+ with open (parsed_args .manifest_path , "r" ) as manifest_file :
257+ manifest_content = yaml .safe_load (manifest_file )
258+ if not isinstance (manifest_content , dict ):
259+ raise ValueError (f"Manifest must be a dictionary, got { type (manifest_content )} " )
260+ return manifest_content
261+ except Exception as error :
262+ raise ValueError (
263+ f"Failed to load manifest file from { parsed_args .manifest_path } : { error } "
264+ )
265+
266+ return None
267+
268+
234269def run () -> None :
235270 args : list [str ] = sys .argv [1 :]
236271 handle_command (args )
0 commit comments