Skip to content

Commit 5f9efc5

Browse files
committed
refactor: connectors can launch themselves; deprecate: AirbyteEntrypoint and launch()
1 parent 4b73b46 commit 5f9efc5

File tree

14 files changed

+385
-239
lines changed

14 files changed

+385
-239
lines changed

airbyte_cdk/cli/source_declarative_manifest/_run.py

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,6 +45,7 @@
4545
)
4646
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
4747
from airbyte_cdk.sources.source import TState
48+
from airbyte_cdk.utils.cli_arg_parse import parse_cli_args
4849
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
4950

5051

@@ -93,7 +94,7 @@ def handle_command(args: list[str]) -> None:
9394

9495
def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:
9596
try:
96-
parsed_args = AirbyteEntrypoint.parse_args(args)
97+
parsed_args = parse_cli_args(args)
9798
config, catalog, state = _parse_inputs_into_config_catalog_state(parsed_args)
9899
return SourceLocalYaml(config=config, catalog=catalog, state=state)
99100
except Exception as error:
@@ -119,10 +120,7 @@ def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:
119120

120121
def handle_local_manifest_command(args: list[str]) -> None:
121122
source = _get_local_yaml_source(args)
122-
launch(
123-
source=source,
124-
args=args,
125-
)
123+
source.launch_with_cli_args(args)
126124

127125

128126
def handle_remote_manifest_command(args: list[str]) -> None:
@@ -149,10 +147,7 @@ def handle_remote_manifest_command(args: list[str]) -> None:
149147
print(AirbyteEntrypoint.airbyte_message_to_string(message))
150148
else:
151149
source = create_declarative_source(args)
152-
launch(
153-
source=source,
154-
args=args,
155-
)
150+
source.launch_with_cli_args(args=args)
156151

157152

158153
def create_declarative_source(
@@ -169,7 +164,7 @@ def create_declarative_source(
169164
catalog: ConfiguredAirbyteCatalog | None
170165
state: list[AirbyteStateMessage]
171166

172-
parsed_args = AirbyteEntrypoint.parse_args(args)
167+
parsed_args = parse_cli_args(args)
173168
config, catalog, state = _parse_inputs_into_config_catalog_state(parsed_args)
174169

175170
if config is None:

airbyte_cdk/connector.py

Lines changed: 85 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,10 @@
88
import os
99
import pkgutil
1010
from abc import ABC, abstractmethod
11-
from typing import Any, Generic, Mapping, MutableMapping, Optional, Protocol, TypeVar
11+
from email import message
12+
from io import StringIO
13+
from pathlib import Path
14+
from typing import Any, Generic, Mapping, Optional, Self, TypeVar
1215

1316
import yaml
1417

@@ -17,15 +20,22 @@
1720
ConnectorSpecification,
1821
ConnectorSpecificationSerializer,
1922
)
23+
from airbyte_cdk.models.airbyte_protocol import AirbyteMessage, Type
24+
from airbyte_cdk.sources.message.repository import MessageRepository, PassthroughMessageRepository
25+
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
26+
from airbyte_cdk.utils.cli_arg_parse import ConnectorCLIArgs, parse_cli_args
2027

2128

22-
def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
29+
def _load_optional_package_file(package: str, filename: str) -> Optional[bytes]:
2330
"""Gets a resource from a package, returning None if it does not exist"""
2431
try:
2532
return pkgutil.get_data(package, filename)
2633
except FileNotFoundError:
2734
return None
2835

36+
def _write_config(config: Mapping[str, Any], config_path: str) -> None:
37+
Path(config_path).write_text(json.dumps(config))
38+
2939

3040
TConfig = TypeVar("TConfig", bound=Mapping[str, Any])
3141

@@ -35,37 +45,19 @@ class BaseConnector(ABC, Generic[TConfig]):
3545
check_config_against_spec: bool = True
3646

3747
@classmethod
@abstractmethod
def to_typed_config(
    cls,
    config: Mapping[str, Any],
) -> TConfig:
    """Return a typed config object from a config dictionary.

    This is an abstract classmethod; subclasses provide the conversion.

    ``@classmethod`` must be the outermost decorator. Applying
    ``@abstractmethod`` on top of an already-built ``classmethod`` object
    fails at class-definition time, because the ``__isabstractmethod__``
    attribute of ``classmethod`` objects is not writable.

    Args:
        config: The connector configuration as a plain mapping.

    Returns:
        The configuration as the connector's ``TConfig`` type.
    """
    ...
55+
56+
@classmethod
def configure(cls, config: Mapping[str, Any], temp_dir: str) -> TConfig:
    """Write ``config`` to ``config.json`` inside ``temp_dir`` and return its typed form.

    Args:
        config: The connector configuration as a plain mapping.
        temp_dir: Directory in which ``config.json`` is written.

    Returns:
        The result of ``cls.to_typed_config(config)``.
    """
    _write_config(config, os.path.join(temp_dir, "config.json"))
    typed_config = cls.to_typed_config(config)
    return typed_config
6961

7062
def spec(self, logger: logging.Logger) -> ConnectorSpecification:
7163
"""
@@ -75,8 +67,8 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
7567

7668
package = self.__class__.__module__.split(".")[0]
7769

78-
yaml_spec = load_optional_package_file(package, "spec.yaml")
79-
json_spec = load_optional_package_file(package, "spec.json")
70+
yaml_spec = _load_optional_package_file(package, "spec.yaml")
71+
json_spec = _load_optional_package_file(package, "spec.json")
8072

8173
if yaml_spec and json_spec:
8274
raise RuntimeError(
@@ -104,20 +96,61 @@ def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionSta
10496
to the Stripe API.
10597
"""
10698

107-
108-
class _WriteConfigProtocol(Protocol):
109-
@staticmethod
110-
def write_config(config: Mapping[str, Any], config_path: str) -> None: ...
111-
112-
113-
class DefaultConnectorMixin:
114-
# can be overridden to change an input config
115-
def configure(
116-
self: _WriteConfigProtocol, config: Mapping[str, Any], temp_dir: str
117-
) -> Mapping[str, Any]:
118-
config_path = os.path.join(temp_dir, "config.json")
119-
self.write_config(config, config_path)
120-
return config
121-
122-
123-
class Connector(DefaultConnectorMixin, BaseConnector[Mapping[str, Any]], ABC): ...
99+
@classmethod
@abstractmethod
def create_with_cli_args(
    cls,
    cli_args: ConnectorCLIArgs,
) -> Self:
    """Return an instance of the connector, using the provided CLI args.

    This is an abstract classmethod; subclasses provide the construction.

    ``@classmethod`` must be the outermost decorator. Applying
    ``@abstractmethod`` on top of an already-built ``classmethod`` object
    fails at class-definition time, because the ``__isabstractmethod__``
    attribute of ``classmethod`` objects is not writable.

    Args:
        cli_args: The parsed command-line arguments for this invocation.

    Returns:
        A connector instance of the class the method was called on.
    """
    ...
107+
108+
@classmethod
def launch_with_cli_args(
    cls,
    args: list[str],
    *,
    logger: logging.Logger | None = None,
    message_repository: MessageRepository | None = None,
    # TODO: Add support for inputs:
    # stdin: StringIO | MessageRepository | None = None,
) -> None:
    """Parse ``args`` and run the requested connector command.

    Args:
        args: Raw command-line arguments (command name followed by options).
        logger: Logger to use. Defaults to ``airbyte.<connector class name>``.
        message_repository: Sink for emitted messages. Defaults to a new
            ``PassthroughMessageRepository``.

    Raises:
        ValueError: If the parsed command is not one of ``check``, ``spec``,
            ``discover``, ``read`` or ``write``.
    """
    # `cls` is already the connector class; `type(cls)` would be its
    # metaclass and would give every connector the same logger name.
    logger = logger or logging.getLogger(f"airbyte.{cls.__name__}")
    message_repository = message_repository or PassthroughMessageRepository()

    # Only offer the optional commands that this connector class defines.
    parsed_cli_args: ConnectorCLIArgs = parse_cli_args(
        args,
        with_read=bool(getattr(cls, "read", False)),
        with_write=bool(getattr(cls, "write", False)),
        with_discover=bool(getattr(cls, "discover", False)),
    )
    logger.info(f"Launching connector with args: {parsed_cli_args}")
    verb = parsed_cli_args.command

    if verb == "check":
        config = cls.to_typed_config(parsed_cli_args.get_config_dict())
        connector = cls.create_with_cli_args(parsed_cli_args)
        # Emit the outcome; otherwise the `check` command produces no output.
        status = connector.check(logger, config)
        message_repository.emit_message(
            AirbyteMessage(
                type=Type.CONNECTION_STATUS,
                connectionStatus=status,
            )
        )
    elif verb == "spec":
        connector = cls()
        message_repository.emit_message(
            AirbyteMessage(
                type=Type.SPEC,
                spec=connector.spec(logger),
            )
        )
    elif verb == "discover":
        # NOTE(review): this branch prints the connector spec rather than a
        # discovered catalog - it looks like a placeholder; confirm intent.
        connector = cls()
        spec = connector.spec(logger)
        print(json.dumps(spec.to_dict(), indent=2))
    elif verb == "read":
        # TODO: not implemented yet; the command is currently a no-op.
        pass
    elif verb == "write":
        # TODO: not implemented yet; the command is currently a no-op.
        pass
    else:
        raise ValueError(f"Unknown command: {verb}")

airbyte_cdk/connector_builder/main.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
)
2828
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
2929
from airbyte_cdk.sources.source import Source
30+
from airbyte_cdk.utils.cli_arg_parse import parse_cli_args
3031
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
3132

3233

@@ -35,7 +36,7 @@ def get_config_and_catalog_from_args(
3536
) -> Tuple[str, Mapping[str, Any], Optional[ConfiguredAirbyteCatalog], Any]:
3637
# TODO: Add functionality for the `debug` logger.
3738
# Currently, no `debug`-level logs are displayed while reading a stream for a connector created through `connector-builder`.
38-
parsed_args = AirbyteEntrypoint.parse_args(args)
39+
parsed_args = parse_cli_args(args)
3940
config_path, catalog_path, state_path = (
4041
parsed_args.config,
4142
parsed_args.catalog,

airbyte_cdk/destinations/destination.py

Lines changed: 14 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import logging
88
import sys
99
from abc import ABC, abstractmethod
10+
from multiprocessing import Value
1011
from typing import Any, Iterable, List, Mapping
1112

1213
import orjson
@@ -21,6 +22,7 @@
2122
Type,
2223
)
2324
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
25+
from airbyte_cdk.utils.cli_arg_parse import ConnectorCLIArgs, parse_cli_args
2426
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
2527

2628
logger = logging.getLogger("airbyte")
@@ -68,54 +70,10 @@ def _run_write(
6870
)
6971
logger.info("Writing complete.")
7072

71-
def parse_args(self, args: List[str]) -> argparse.Namespace:
72-
"""
73-
:param args: commandline arguments
74-
:return:
75-
"""
76-
77-
parent_parser = argparse.ArgumentParser(add_help=False)
78-
main_parser = argparse.ArgumentParser()
79-
subparsers = main_parser.add_subparsers(title="commands", dest="command")
80-
81-
# spec
82-
subparsers.add_parser(
83-
"spec", help="outputs the json configuration specification", parents=[parent_parser]
84-
)
85-
86-
# check
87-
check_parser = subparsers.add_parser(
88-
"check", help="checks the config can be used to connect", parents=[parent_parser]
89-
)
90-
required_check_parser = check_parser.add_argument_group("required named arguments")
91-
required_check_parser.add_argument(
92-
"--config", type=str, required=True, help="path to the json configuration file"
93-
)
94-
95-
# write
96-
write_parser = subparsers.add_parser(
97-
"write", help="Writes data to the destination", parents=[parent_parser]
98-
)
99-
write_required = write_parser.add_argument_group("required named arguments")
100-
write_required.add_argument(
101-
"--config", type=str, required=True, help="path to the JSON configuration file"
102-
)
103-
write_required.add_argument(
104-
"--catalog", type=str, required=True, help="path to the configured catalog JSON file"
105-
)
106-
107-
parsed_args = main_parser.parse_args(args)
108-
cmd = parsed_args.command
109-
if not cmd:
110-
raise Exception("No command entered. ")
111-
elif cmd not in ["spec", "check", "write"]:
112-
# This is technically dead code since parse_args() would fail if this was the case
113-
# But it's non-obvious enough to warrant placing it here anyways
114-
raise Exception(f"Unknown command entered: {cmd}")
115-
116-
return parsed_args
117-
118-
def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
73+
def run_cmd(
74+
self,
75+
parsed_args: ConnectorCLIArgs,
76+
) -> Iterable[AirbyteMessage]:
11977
cmd = parsed_args.command
12078
if cmd not in self.VALID_CMDS:
12179
raise Exception(f"Unrecognized command: {cmd}")
@@ -138,6 +96,9 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
13896
if cmd == "check":
13997
yield self._run_check(config=config)
14098
elif cmd == "write":
99+
if not parsed_args.catalog:
100+
raise ValueError("Catalog path is required for write command.")
101+
141102
# Wrap in UTF-8 to override any other input encodings
142103
wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
143104
yield from self._run_write(
@@ -148,7 +109,11 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
148109

149110
def run(self, args: List[str]) -> None:
150111
init_uncaught_exception_handler(logger)
151-
parsed_args = self.parse_args(args)
112+
parsed_args: ConnectorCLIArgs = parse_cli_args(
113+
args,
114+
with_write=True,
115+
with_read=False,
116+
)
152117
output_messages = self.run_cmd(parsed_args)
153118
for message in output_messages:
154119
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())

0 commit comments

Comments
 (0)