Skip to content

Commit 4720990

Browse files
author
maxi297
committed
[WIP] destination discover PoC
1 parent ce2a7bb commit 4720990

File tree

2 files changed

+86
-54
lines changed

2 files changed

+86
-54
lines changed

airbyte_cdk/destinations/destination.py

Lines changed: 69 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from airbyte_cdk.connector import Connector
1515
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
1616
from airbyte_cdk.models import (
17+
AirbyteCatalog,
1718
AirbyteMessage,
1819
AirbyteMessageSerializer,
1920
ConfiguredAirbyteCatalog,
@@ -26,8 +27,70 @@
2627
logger = logging.getLogger("airbyte")
2728

2829

30+
def parse_args(args: List[str]) -> argparse.Namespace:
    """
    Parse the command-line arguments of a destination connector.

    Every command accepts the optional ``--debug`` flag. ``check`` and ``discover``
    additionally require ``--config``; ``write`` requires both ``--config`` and
    ``--catalog``.

    :param args: commandline arguments, e.g. ``["check", "--config", "config.json"]``
    :return: the parsed namespace; its ``command`` attribute holds the selected command
    :raises Exception: if no command was entered
    """
    config_help = "path to the json configuration file"

    # Single source of truth for the supported commands:
    # (command name, help text, [(required flag, flag help), ...]).
    # Both the subparsers and the sanity check below are derived from this table,
    # so adding a command only requires adding one entry here.
    commands = [
        ("spec", "outputs the json configuration specification", []),
        ("check", "checks the config can be used to connect", [("--config", config_help)]),
        (
            "discover",
            "discover the objects available in the destination",
            [("--config", config_help)],
        ),
        (
            "write",
            "Writes data to the destination",
            [
                ("--config", "path to the JSON configuration file"),
                ("--catalog", "path to the configured catalog JSON file"),
            ],
        ),
    ]

    # Options shared by every command live on a help-less parent parser.
    parent_parser = argparse.ArgumentParser(add_help=False)
    parent_parser.add_argument(
        "--debug", action="store_true", help="enables detailed debug logs related to the sync"
    )
    main_parser = argparse.ArgumentParser()
    subparsers = main_parser.add_subparsers(title="commands", dest="command")

    for name, help_text, required_flags in commands:
        command_parser = subparsers.add_parser(name, help=help_text, parents=[parent_parser])
        if required_flags:
            required_group = command_parser.add_argument_group("required named arguments")
            for flag, flag_help in required_flags:
                required_group.add_argument(flag, type=str, required=True, help=flag_help)

    parsed_args = main_parser.parse_args(args)
    cmd = parsed_args.command
    if not cmd:
        raise Exception("No command entered. ")
    if cmd not in [name for name, _, _ in commands]:
        # This is technically dead code since parse_args() would fail if this was the case
        # But it's non-obvious enough to warrant placing it here anyways
        raise Exception(f"Unknown command entered: {cmd}")

    return parsed_args
86+
87+
2988
class Destination(Connector, ABC):
30-
VALID_CMDS = {"spec", "check", "write"}
89+
VALID_CMDS = {"spec", "check", "discover", "write"}
90+
91+
def discover(self) -> AirbyteCatalog:
    """
    Return the catalog of objects available in the destination.

    Implement this in a destination to define what objects it exposes.
    The base implementation does not provide a catalog.

    :raises NotImplementedError: always, in this base implementation
    """
    message = "Discover method is not implemented"
    raise NotImplementedError(message)
3194

3295
@abstractmethod
3396
def write(
@@ -68,52 +131,9 @@ def _run_write(
68131
)
69132
logger.info("Writing complete.")
70133

71-
def parse_args(self, args: List[str]) -> argparse.Namespace:
72-
"""
73-
:param args: commandline arguments
74-
:return:
75-
"""
76-
77-
parent_parser = argparse.ArgumentParser(add_help=False)
78-
main_parser = argparse.ArgumentParser()
79-
subparsers = main_parser.add_subparsers(title="commands", dest="command")
80-
81-
# spec
82-
subparsers.add_parser(
83-
"spec", help="outputs the json configuration specification", parents=[parent_parser]
84-
)
85-
86-
# check
87-
check_parser = subparsers.add_parser(
88-
"check", help="checks the config can be used to connect", parents=[parent_parser]
89-
)
90-
required_check_parser = check_parser.add_argument_group("required named arguments")
91-
required_check_parser.add_argument(
92-
"--config", type=str, required=True, help="path to the json configuration file"
93-
)
94-
95-
# write
96-
write_parser = subparsers.add_parser(
97-
"write", help="Writes data to the destination", parents=[parent_parser]
98-
)
99-
write_required = write_parser.add_argument_group("required named arguments")
100-
write_required.add_argument(
101-
"--config", type=str, required=True, help="path to the JSON configuration file"
102-
)
103-
write_required.add_argument(
104-
"--catalog", type=str, required=True, help="path to the configured catalog JSON file"
105-
)
106-
107-
parsed_args = main_parser.parse_args(args)
108-
cmd = parsed_args.command
109-
if not cmd:
110-
raise Exception("No command entered. ")
111-
elif cmd not in ["spec", "check", "write"]:
112-
# This is technically dead code since parse_args() would fail if this was the case
113-
# But it's non-obvious enough to warrant placing it here anyways
114-
raise Exception(f"Unknown command entered: {cmd}")
115-
116-
return parsed_args
134+
@staticmethod
def parse_args(args: List[str]) -> argparse.Namespace:
    """
    Parse command-line arguments by delegating to the module-level ``parse_args``.

    :param args: commandline arguments
    :return: the namespace produced by the module-level ``parse_args``
    """
    # Inside the method body the bare name resolves to the module-level function,
    # not to this static method, so this call does not recurse.
    namespace = parse_args(args)
    return namespace
117137

118138
def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
119139
cmd = parsed_args.command
@@ -137,6 +157,8 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
137157

138158
if cmd == "check":
139159
yield self._run_check(config=config)
160+
elif cmd == "discover":
161+
yield AirbyteMessage(type=Type.CATALOG, catalog=self.discover())
140162
elif cmd == "write":
141163
# Wrap in UTF-8 to override any other input encodings
142164
wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")

airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py

Lines changed: 17 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,10 @@
11
#
22
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
33
#
4-
5-
4+
from abc import ABC, abstractmethod
65
from copy import deepcopy
76
from dataclasses import InitVar, dataclass, field
8-
from typing import Any, List, Mapping, MutableMapping, Optional, Union
7+
from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Union
98

109
import dpath
1110
from typing_extensions import deprecated
@@ -114,6 +113,16 @@ def _update_pointer(
114113
)
115114

116115

116+
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AdditionalPropertyFieldsInferer(ABC):
    """
    Interface for deriving extra schema fields from a property definition.

    Subclasses must implement ``infer``.
    """

    @abstractmethod
    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
        """
        Infer additional property fields from the given property definition.

        :param property_definition: the property definition to inspect
        :return: a mapping of the additional fields inferred for that property
        """
124+
125+
117126
@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
118127
@dataclass
119128
class DynamicSchemaLoader(SchemaLoader):
@@ -126,6 +135,7 @@ class DynamicSchemaLoader(SchemaLoader):
126135
parameters: InitVar[Mapping[str, Any]]
127136
schema_type_identifier: SchemaTypeIdentifier
128137
schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
138+
additional_property_fields_inferrer: Optional[AdditionalPropertyFieldsInferer] = None
129139

130140
def get_json_schema(self) -> Mapping[str, Any]:
131141
"""
@@ -149,9 +159,11 @@ def get_json_schema(self) -> Mapping[str, Any]:
149159
property_definition,
150160
self.schema_type_identifier.type_pointer,
151161
)
162+
163+
value.update(self.additional_property_fields_inferrer.infer(property_definition) if self.additional_property_fields_inferrer else {})
152164
properties[key] = value
153165

154-
transformed_properties = self._transform(properties, {})
166+
transformed_properties = self._transform(properties)
155167

156168
return {
157169
"$schema": "https://json-schema.org/draft-07/schema#",
@@ -163,8 +175,6 @@ def get_json_schema(self) -> Mapping[str, Any]:
163175
def _transform(
164176
self,
165177
properties: Mapping[str, Any],
166-
stream_state: StreamState,
167-
stream_slice: Optional[StreamSlice] = None,
168178
) -> Mapping[str, Any]:
169179
for transformation in self.schema_transformations:
170180
transformation.transform(
@@ -190,7 +200,7 @@ def _get_type(
190200
self,
191201
raw_schema: MutableMapping[str, Any],
192202
field_type_path: Optional[List[Union[InterpolatedString, str]]],
193-
) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
203+
) -> Dict[str, Any]:
194204
"""
195205
Determines the JSON Schema type for a field, supporting nullable and combined types.
196206
"""

0 commit comments

Comments
 (0)