Skip to content

Commit f8199e1

Browse files
committed
update per comments
1 parent a07aa39 commit f8199e1

File tree

1 file changed

+27
-28
lines changed

1 file changed

+27
-28
lines changed

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 27 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@
88
from copy import deepcopy
99
from importlib import metadata
1010
from types import ModuleType
11-
from typing import Any, Dict, Iterator, List, Mapping, Optional, Set
11+
from typing import Any, Dict, Iterator, List, Mapping, MutableMapping, Optional, Set
1212

1313
import orjson
1414
import yaml
@@ -145,33 +145,13 @@ def __init__(
145145
self._post_process_manifest()
146146

147147
self._config: Mapping[str, Any]
148-
self._spec_component: Optional[Spec] = None
149-
spec = self._source_config.get("spec")
150-
if spec:
151-
if "type" not in spec:
152-
spec["type"] = "Spec"
153-
self._spec_component = self._constructor.create_component(SpecModel, spec, dict())
154-
mutable_config = dict(config) if config else {}
155-
156-
if config_path:
157-
self._spec_component.migrate_config(mutable_config)
158-
try:
159-
if mutable_config != config:
160-
with open(config_path, "w") as f:
161-
json.dump(mutable_config, f)
162-
self.message_repository.emit_message(
163-
create_connector_config_control_message(mutable_config)
164-
)
165-
# We have no mechanism for consuming the queue, so we print the messages to stdout
166-
for message in self.message_repository.consume_queue():
167-
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
168-
except Exception as e:
169-
self.logger.error(f"Error migrating config: {str(e)}")
170-
mutable_config = dict(config) if config else {}
171-
self._spec_component.transform_config(mutable_config)
172-
self._config = mutable_config
173-
else:
174-
self._config = config or {}
148+
self._spec_component: Spec
149+
spec: Mapping[str, Any] = self._source_config["spec"]
150+
self._spec_component = self._constructor.create_component(SpecModel, spec, dict())
151+
mutable_config = dict(config) if config else {}
152+
self._migrate_config(config_path, mutable_config, config)
153+
self._spec_component.transform_config(mutable_config)
154+
self._config = mutable_config
175155

176156
@property
177157
def resolved_manifest(self) -> Mapping[str, Any]:
@@ -233,6 +213,25 @@ def _normalize_manifest(self) -> None:
233213
normalizer = ManifestNormalizer(self._source_config, self._declarative_component_schema)
234214
self._source_config = normalizer.normalize()
235215

216+
def _migrate_config(
217+
self,
218+
config_path: Optional[str],
219+
mutable_config: MutableMapping[str, Any],
220+
config: Optional[Mapping[str, Any]],
221+
) -> None:
222+
if config_path and config:
223+
self._spec_component.migrate_config(mutable_config)
224+
if mutable_config != config:
225+
if config_path:
226+
with open(config_path, "w") as f:
227+
json.dump(mutable_config, f)
228+
self.message_repository.emit_message(
229+
create_connector_config_control_message(mutable_config)
230+
)
231+
# We have no mechanism for consuming the queue, so we print the messages to stdout
232+
for message in self.message_repository.consume_queue():
233+
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
234+
236235
def _migrate_manifest(self) -> None:
237236
"""
238237
This method is used to migrate the manifest. It should be called after the manifest has been validated.

0 commit comments

Comments
 (0)