Skip to content

Commit 4c19c87

Browse files
committed
update config migration/transformation/validation flow
1 parent 1fe042e commit 4c19c87

File tree

4 files changed

+30
-43
lines changed

4 files changed

+30
-43
lines changed

airbyte_cdk/entrypoint.py

Lines changed: 0 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -186,13 +186,6 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
186186
else:
187187
raw_config = self.source.read_config(parsed_args.config)
188188
config = self.source.configure(raw_config, temp_dir)
189-
mutable_config = dict(config)
190-
config_path = parsed_args.config if parsed_args.config else None
191-
if config_path:
192-
self.source.migrate_config(config_path, mutable_config)
193-
self.source.transform_config(mutable_config)
194-
config = mutable_config
195-
196189
yield from [
197190
self.airbyte_message_to_string(queued_message)
198191
for queued_message in self._emit_queued_messages(self.source)

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 27 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@
1010
from types import ModuleType
1111
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Set
1212

13+
import orjson
1314
import yaml
1415
from jsonschema.exceptions import ValidationError
1516
from jsonschema.validators import validate
1617
from packaging.version import InvalidVersion, Version
1718

19+
from airbyte_cdk.config_observation import create_connector_config_control_message
1820
from airbyte_cdk.connector_builder.models import (
1921
LogMessage as ConnectorBuilderLogMessage,
2022
)
@@ -29,6 +31,7 @@
2931
ConnectorSpecification,
3032
FailureType,
3133
)
34+
from airbyte_cdk.models.airbyte_protocol_serializers import AirbyteMessageSerializer
3235
from airbyte_cdk.sources.declarative.checks import COMPONENTS_CHECKER_TYPE_MAPPING
3336
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
3437
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
@@ -100,6 +103,7 @@ def __init__(
100103
component_factory: Optional[ModelToComponentFactory] = None,
101104
migrate_manifest: Optional[bool] = False,
102105
normalize_manifest: Optional[bool] = False,
106+
config_path: Optional[str] = None,
103107
) -> None:
104108
"""
105109
Args:
@@ -109,6 +113,7 @@ def __init__(
109113
emit_connector_builder_messages: True if messages should be emitted to the connector builder.
110114
component_factory: optional factory if ModelToComponentFactory's default behavior needs to be tweaked.
111115
normalize_manifest: Optional flag to indicate if the manifest should be normalized.
116+
config_path: Optional path to the config file.
112117
"""
113118
self.logger = logging.getLogger(f"airbyte.{self.name}")
114119
self._should_normalize = normalize_manifest
@@ -131,7 +136,6 @@ def __init__(
131136
self._slice_logger: SliceLogger = (
132137
AlwaysLogSliceLogger() if emit_connector_builder_messages else DebugSliceLogger()
133138
)
134-
self._config = config or {}
135139

136140
# resolve all components in the manifest
137141
self._source_config = self._pre_process_manifest(dict(source_config))
@@ -140,11 +144,33 @@ def __init__(
140144
# apply additional post-processing to the manifest
141145
self._post_process_manifest()
142146

147+
self._config: Mapping[str, Any]
143148
self._spec_component: Optional[Spec] = None
144149
if spec := self._source_config.get("spec"):
145150
if "type" not in spec:
146151
spec["type"] = "Spec"
147152
self._spec_component = self._constructor.create_component(SpecModel, spec, dict())
153+
mutable_config = dict(config) if config else {}
154+
155+
if config_path:
156+
self._spec_component.migrate_config(mutable_config)
157+
try:
158+
if mutable_config != config:
159+
with open(config_path, "w") as f:
160+
json.dump(mutable_config, f)
161+
self.message_repository.emit_message(
162+
create_connector_config_control_message(mutable_config)
163+
)
164+
# We have no mechanism for consuming the queue, so we print the messages to stdout
165+
for message in self.message_repository.consume_queue():
166+
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
167+
except Exception as e:
168+
self.logger.error(f"Error migrating config: {str(e)}")
169+
mutable_config = dict(config) if config else {}
170+
self._spec_component.transform_config(mutable_config)
171+
self._config = mutable_config
172+
else:
173+
self._config = config or {}
148174

149175
@property
150176
def resolved_manifest(self) -> Mapping[str, Any]:
@@ -296,12 +322,6 @@ def streams(self, config: Mapping[str, Any]) -> List[Stream]:
296322

297323
return source_streams
298324

299-
def migrate_config(self, config_path: Optional[str], config: MutableMapping[str, Any]) -> None:
300-
self._spec_component.migrate_config(config_path, config) if self._spec_component else None
301-
302-
def transform_config(self, config: MutableMapping[str, Any]) -> None:
303-
self._spec_component.transform_config(config) if self._spec_component else None
304-
305325
@staticmethod
306326
def _initialize_cache_for_parent_streams(
307327
stream_configs: List[Dict[str, Any]],

airbyte_cdk/sources/declarative/spec/spec.py

Lines changed: 3 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -69,30 +69,16 @@ def generate_spec(self) -> ConnectorSpecification:
6969
# We remap these keys to camel case because that's the existing format expected by the rest of the platform
7070
return ConnectorSpecificationSerializer.load(obj)
7171

72-
def migrate_config(self, config_path: Optional[str], config: Mapping[str, Any]) -> None:
72+
def migrate_config(self, config: MutableMapping[str, Any]) -> None:
7373
"""
74-
Apply all specified config transformations to the provided config and save the modified config to the given path and emit a control message.
74+
Apply all specified config transformations to the provided config and emit a control message.
7575
76-
:param config_path: The path to the config file
7776
:param config: The user-provided config to migrate
7877
"""
7978

80-
if not config_path:
81-
return
82-
83-
mutable_config = dict(config)
8479
for migration in self.config_migrations:
8580
for transformation in migration.transformations:
86-
transformation.transform(mutable_config)
87-
88-
if mutable_config != config:
89-
with open(config_path, "w") as f:
90-
json.dump(mutable_config, f)
91-
self.message_repository.emit_message(
92-
create_connector_config_control_message(mutable_config)
93-
)
94-
for message in self.message_repository.consume_queue():
95-
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
81+
transformation.transform(config)
9682

9783
def transform_config(self, config: MutableMapping[str, Any]) -> None:
9884
"""

airbyte_cdk/sources/source.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -93,15 +93,3 @@ def read_catalog(cls, catalog_path: str) -> ConfiguredAirbyteCatalog:
9393
def name(self) -> str:
9494
"""Source name"""
9595
return self.__class__.__name__
96-
97-
def migrate_config(self, config_path: Optional[str], config: MutableMapping[str, Any]) -> None:
98-
"""
99-
Optional method to migrate config.
100-
"""
101-
pass
102-
103-
def transform_config(self, config: MutableMapping[str, Any]) -> None:
104-
"""
105-
Optional method to transform config.
106-
"""
107-
pass

0 commit comments

Comments
 (0)