Skip to content

Commit 4648550

Browse files
authored
Merge branch 'main' into lazebnyi/update-config-components-resolver-to-support-list-of-stream-configs
2 parents 3008468 + 155cdc8 commit 4648550

26 files changed

+1829
-253
lines changed

.coveragerc

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,3 +9,13 @@ omit =
99

1010
# omit as unimplemented
1111
airbyte_cdk/base_python/cdk/streams/auth/jwt.py
12+
13+
# omit temporary files and test files
14+
/tmp/tmp*.py
15+
unit_tests/*
16+
17+
[paths]
18+
# Reconcile file paths
19+
source =
20+
./
21+
/tmp/

airbyte_cdk/cli/source_declarative_manifest/_run.py

Lines changed: 78 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,17 @@
1616

1717
from __future__ import annotations
1818

19+
import argparse
1920
import json
2021
import pkgutil
2122
import sys
2223
import traceback
23-
from collections.abc import Mapping
24+
from collections.abc import MutableMapping
2425
from pathlib import Path
2526
from typing import Any, cast
2627

2728
import orjson
29+
import yaml
2830

2931
from airbyte_cdk.entrypoint import AirbyteEntrypoint, launch
3032
from airbyte_cdk.models import (
@@ -54,8 +56,9 @@ class SourceLocalYaml(YamlDeclarativeSource):
5456
def __init__(
5557
self,
5658
catalog: ConfiguredAirbyteCatalog | None,
57-
config: Mapping[str, Any] | None,
59+
config: MutableMapping[str, Any] | None,
5860
state: TState,
61+
config_path: str | None = None,
5962
**kwargs: Any,
6063
) -> None:
6164
"""
@@ -74,6 +77,7 @@ def __init__(
7477
config=config,
7578
state=state, # type: ignore [arg-type]
7679
path_to_yaml="manifest.yaml",
80+
config_path=config_path,
7781
)
7882

7983

@@ -91,8 +95,14 @@ def handle_command(args: list[str]) -> None:
9195

9296
def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:
9397
try:
94-
config, catalog, state = _parse_inputs_into_config_catalog_state(args)
95-
return SourceLocalYaml(config=config, catalog=catalog, state=state)
98+
parsed_args = AirbyteEntrypoint.parse_args(args)
99+
config, catalog, state = _parse_inputs_into_config_catalog_state(parsed_args)
100+
return SourceLocalYaml(
101+
config=config,
102+
catalog=catalog,
103+
state=state,
104+
config_path=parsed_args.config if hasattr(parsed_args, "config") else None,
105+
)
96106
except Exception as error:
97107
print(
98108
orjson.dumps(
@@ -162,26 +172,46 @@ def create_declarative_source(
162172
connector builder.
163173
"""
164174
try:
165-
config: Mapping[str, Any] | None
175+
config: MutableMapping[str, Any] | None
166176
catalog: ConfiguredAirbyteCatalog | None
167177
state: list[AirbyteStateMessage]
168-
config, catalog, state = _parse_inputs_into_config_catalog_state(args)
169-
if config is None or "__injected_declarative_manifest" not in config:
178+
179+
parsed_args = AirbyteEntrypoint.parse_args(args)
180+
config, catalog, state = _parse_inputs_into_config_catalog_state(parsed_args)
181+
182+
if config is None:
170183
raise ValueError(
171184
"Invalid config: `__injected_declarative_manifest` should be provided at the root "
172-
f"of the config but config only has keys: {list(config.keys() if config else [])}"
185+
"of the config or using the --manifest-path argument."
186+
)
187+
188+
# If a manifest_path is provided in the args, inject it into the config
189+
if hasattr(parsed_args, "manifest_path") and parsed_args.manifest_path:
190+
injected_manifest = _parse_manifest_from_file(parsed_args.manifest_path)
191+
if injected_manifest:
192+
config["__injected_declarative_manifest"] = injected_manifest
193+
194+
if "__injected_declarative_manifest" not in config:
195+
raise ValueError(
196+
"Invalid config: `__injected_declarative_manifest` should be provided at the root "
197+
"of the config or using the --manifest-path argument. "
198+
f"Config only has keys: {list(config.keys() if config else [])}"
173199
)
174200
if not isinstance(config["__injected_declarative_manifest"], dict):
175201
raise ValueError(
176202
"Invalid config: `__injected_declarative_manifest` should be a dictionary, "
177203
f"but got type: {type(config['__injected_declarative_manifest'])}"
178204
)
179205

206+
if hasattr(parsed_args, "components_path") and parsed_args.components_path:
207+
_register_components_from_file(parsed_args.components_path)
208+
180209
return ConcurrentDeclarativeSource(
181210
config=config,
182211
catalog=catalog,
183212
state=state,
184213
source_config=cast(dict[str, Any], config["__injected_declarative_manifest"]),
214+
config_path=parsed_args.config if hasattr(parsed_args, "config") else None,
185215
)
186216
except Exception as error:
187217
print(
@@ -205,13 +235,12 @@ def create_declarative_source(
205235

206236

207237
def _parse_inputs_into_config_catalog_state(
208-
args: list[str],
238+
parsed_args: argparse.Namespace,
209239
) -> tuple[
210-
Mapping[str, Any] | None,
240+
MutableMapping[str, Any] | None,
211241
ConfiguredAirbyteCatalog | None,
212242
list[AirbyteStateMessage],
213243
]:
214-
parsed_args = AirbyteEntrypoint.parse_args(args)
215244
config = (
216245
ConcurrentDeclarativeSource.read_config(parsed_args.config)
217246
if hasattr(parsed_args, "config")
@@ -231,6 +260,44 @@ def _parse_inputs_into_config_catalog_state(
231260
return config, catalog, state
232261

233262

263+
def _parse_manifest_from_file(filepath: str) -> dict[str, Any] | None:
264+
"""Extract and parse a manifest file specified in the args."""
265+
try:
266+
with open(filepath, "r", encoding="utf-8") as manifest_file:
267+
manifest_content = yaml.safe_load(manifest_file)
268+
if manifest_content is None:
269+
raise ValueError(f"Manifest file at {filepath} is empty")
270+
if not isinstance(manifest_content, dict):
271+
raise ValueError(f"Manifest must be a dictionary, got {type(manifest_content)}")
272+
return manifest_content
273+
except Exception as error:
274+
raise ValueError(f"Failed to load manifest file from {filepath}: {error}")
275+
276+
277+
def _register_components_from_file(filepath: str) -> None:
    """Load a Python file and register it as the custom components module.

    The module is registered in ``sys.modules`` under both ``components`` and
    ``source_declarative_manifest.components``.

    Args:
        filepath: Path to the Python file containing the custom components.

    Raises:
        ImportError: If no import spec/loader can be built for ``filepath``.
        Exception: Whatever the components file raises while executing. In
            that case ``sys.modules`` is restored to its previous state so a
            half-initialised module is never left registered.
    """
    import importlib.util
    import sys

    components_path = Path(filepath)

    module_name = "components"
    sdm_module_name = "source_declarative_manifest.components"
    registered_names = (module_name, sdm_module_name)

    # Create module spec
    spec = importlib.util.spec_from_file_location(module_name, components_path)
    if spec is None or spec.loader is None:
        raise ImportError(f"Could not load module from {components_path}")

    # Remember what was registered before so a failed load can be rolled back.
    previous = {name: sys.modules[name] for name in registered_names if name in sys.modules}

    # Register the module before executing its code, to avoid issues with
    # dataclasses that look up the module while the file is being executed.
    module = importlib.util.module_from_spec(spec)
    for name in registered_names:
        sys.modules[name] = module

    try:
        spec.loader.exec_module(module)
    except BaseException:
        # Mirror the regular import machinery: a module whose body failed
        # must not remain importable.
        for name in registered_names:
            if name in previous:
                sys.modules[name] = previous[name]
            else:
                sys.modules.pop(name, None)
        raise
299+
300+
234301
def run() -> None:
    """Entry point: forward the command-line arguments (minus the program name) to ``handle_command``."""
    handle_command(sys.argv[1:])

airbyte_cdk/connector.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
import os
99
import pkgutil
1010
from abc import ABC, abstractmethod
11-
from typing import Any, Generic, Mapping, Optional, Protocol, TypeVar
11+
from typing import Any, Generic, Mapping, MutableMapping, Optional, Protocol, TypeVar
1212

1313
import yaml
1414

@@ -41,9 +41,9 @@ def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
4141
"""
4242

4343
@staticmethod
44-
def read_config(config_path: str) -> Mapping[str, Any]:
44+
def read_config(config_path: str) -> MutableMapping[str, Any]:
4545
config = BaseConnector._read_json_file(config_path)
46-
if isinstance(config, Mapping):
46+
if isinstance(config, MutableMapping):
4747
return config
4848
else:
4949
raise ValueError(

airbyte_cdk/entrypoint.py

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,18 @@ def parse_args(args: List[str]) -> argparse.Namespace:
8484
required_check_parser.add_argument(
8585
"--config", type=str, required=True, help="path to the json configuration file"
8686
)
87+
check_parser.add_argument(
88+
"--manifest-path",
89+
type=str,
90+
required=False,
91+
help="path to the YAML manifest file to inject into the config",
92+
)
93+
check_parser.add_argument(
94+
"--components-path",
95+
type=str,
96+
required=False,
97+
help="path to the custom components file, if it exists",
98+
)
8799

88100
# discover
89101
discover_parser = subparsers.add_parser(
@@ -95,6 +107,18 @@ def parse_args(args: List[str]) -> argparse.Namespace:
95107
required_discover_parser.add_argument(
96108
"--config", type=str, required=True, help="path to the json configuration file"
97109
)
110+
discover_parser.add_argument(
111+
"--manifest-path",
112+
type=str,
113+
required=False,
114+
help="path to the YAML manifest file to inject into the config",
115+
)
116+
discover_parser.add_argument(
117+
"--components-path",
118+
type=str,
119+
required=False,
120+
help="path to the custom components file, if it exists",
121+
)
98122

99123
# read
100124
read_parser = subparsers.add_parser(
@@ -114,6 +138,18 @@ def parse_args(args: List[str]) -> argparse.Namespace:
114138
required=True,
115139
help="path to the catalog used to determine which data to read",
116140
)
141+
read_parser.add_argument(
142+
"--manifest-path",
143+
type=str,
144+
required=False,
145+
help="path to the YAML manifest file to inject into the config",
146+
)
147+
read_parser.add_argument(
148+
"--components-path",
149+
type=str,
150+
required=False,
151+
help="path to the custom components file, if it exists",
152+
)
117153

118154
return main_parser.parse_args(args)
119155

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,7 @@ def __init__(
7474
debug: bool = False,
7575
emit_connector_builder_messages: bool = False,
7676
component_factory: Optional[ModelToComponentFactory] = None,
77+
config_path: Optional[str] = None,
7778
**kwargs: Any,
7879
) -> None:
7980
# todo: We could remove state from initialization. Now that streams are grouped during the read(), a source
@@ -96,6 +97,7 @@ def __init__(
9697
debug=debug,
9798
emit_connector_builder_messages=emit_connector_builder_messages,
9899
component_factory=component_factory,
100+
config_path=config_path,
99101
)
100102

101103
concurrency_level_from_manifest = self._source_config.get("concurrency_level")

0 commit comments

Comments
 (0)