Skip to content

Commit 54bf364

Browse files
author
Oleksandr Bazarnov
committed
emit deprecation warnings in Connector Builder
1 parent 046c308 commit 54bf364

File tree

6 files changed

+124
-57
lines changed

6 files changed

+124
-57
lines changed

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 23 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,11 +110,16 @@ def run_test_read(
110110
record_limit = self._check_record_limit(record_limit)
111111
# The connector builder currently only supports reading from a single stream at a time
112112
stream = source.streams(config)[0]
113+
114+
# get any deprecation warnings during the component creation
115+
deprecation_warnings: List[AirbyteLogMessage] = source.deprecation_warnings()
116+
113117
schema_inferrer = SchemaInferrer(
114118
self._pk_to_nested_and_composite_field(stream.primary_key),
115119
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
116120
)
117121
datetime_format_inferrer = DatetimeFormatInferrer()
122+
118123
message_group = get_message_groups(
119124
self._read_stream(source, config, configured_catalog, state),
120125
schema_inferrer,
@@ -123,7 +128,7 @@ def run_test_read(
123128
)
124129

125130
slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
126-
message_group
131+
message_group, deprecation_warnings
127132
)
128133
schema, log_messages = self._get_infered_schema(
129134
configured_catalog, schema_inferrer, log_messages
@@ -236,7 +241,11 @@ def _check_record_limit(self, record_limit: Optional[int] = None) -> int:
236241

237242
return record_limit
238243

239-
def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES:
244+
def _categorise_groups(
245+
self,
246+
message_groups: MESSAGE_GROUPS,
247+
deprecation_warnings: Optional[List[Any]] = None,
248+
) -> GROUPED_MESSAGES:
240249
"""
241250
Categorizes a sequence of message groups into slices, log messages, auxiliary requests, and the latest configuration update.
242251
@@ -267,6 +276,7 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
267276
auxiliary_requests = []
268277
latest_config_update: Optional[AirbyteControlMessage] = None
269278

279+
# process the message groups first
270280
for message_group in message_groups:
271281
match message_group:
272282
case AirbyteLogMessage():
@@ -296,6 +306,17 @@ def _categorise_groups(self, message_groups: MESSAGE_GROUPS) -> GROUPED_MESSAGES
296306
case _:
297307
raise ValueError(f"Unknown message group type: {type(message_group)}")
298308

309+
# process deprecation warnings, if present
310+
if deprecation_warnings is not None:
311+
for deprecation in deprecation_warnings:
312+
match deprecation:
313+
case AirbyteLogMessage():
314+
log_messages.append(
315+
LogMessage(message=deprecation.message, level=deprecation.level.value)
316+
)
317+
case _:
318+
raise ValueError(f"Unknown message group type: {type(deprecation)}")
319+
299320
return slices, log_messages, auxiliary_requests, latest_config_update
300321

301322
def _get_infered_schema(

airbyte_cdk/sources/declarative/declarative_source.py

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,11 @@
44

55
import logging
66
from abc import abstractmethod
7-
from typing import Any, Mapping, Tuple
7+
from typing import Any, List, Mapping, Tuple
88

9+
from airbyte_cdk.models import (
10+
AirbyteLogMessage,
11+
)
912
from airbyte_cdk.sources.abstract_source import AbstractSource
1013
from airbyte_cdk.sources.declarative.checks.connection_checker import ConnectionChecker
1114

@@ -34,3 +37,9 @@ def check_connection(
3437
The error object will be cast to string to display the problem to the user.
3538
"""
3639
return self.connection_checker.check_connection(self, logger, config)
40+
41+
def deprecation_warnings(self) -> List[AirbyteLogMessage]:
    """
    Report the deprecation warnings known to this source.

    This implementation has nothing to report.

    Returns:
        List[AirbyteLogMessage]: always an empty list.
    """
    no_warnings: List[AirbyteLogMessage] = []
    return no_warnings

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 4 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717

1818
from airbyte_cdk.models import (
1919
AirbyteConnectionStatus,
20+
AirbyteLogMessage,
2021
AirbyteMessage,
2122
AirbyteStateMessage,
2223
ConfiguredAirbyteCatalog,
@@ -60,24 +61,6 @@
6061
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
6162

6263

63-
def _get_declarative_component_schema() -> Dict[str, Any]:
64-
try:
65-
raw_component_schema = pkgutil.get_data(
66-
"airbyte_cdk", "sources/declarative/declarative_component_schema.yaml"
67-
)
68-
if raw_component_schema is not None:
69-
declarative_component_schema = yaml.load(raw_component_schema, Loader=yaml.SafeLoader)
70-
return declarative_component_schema # type: ignore
71-
else:
72-
raise RuntimeError(
73-
"Failed to read manifest component json schema required for deduplication"
74-
)
75-
except FileNotFoundError as e:
76-
raise FileNotFoundError(
77-
f"Failed to read manifest component json schema required for deduplication: {e}"
78-
)
79-
80-
8164
class ManifestDeclarativeSource(DeclarativeSource):
8265
"""Declarative source defined by a manifest of low-code components that define source connector behavior"""
8366

@@ -100,7 +83,6 @@ def __init__(
10083
"""
10184
self.logger = logging.getLogger(f"airbyte.{self.name}")
10285

103-
self._declarative_component_schema = _get_declarative_component_schema()
10486
# For ease of use we don't require the type to be specified at the top level manifest, but it should be included during processing
10587
manifest = dict(source_config)
10688
if "type" not in manifest:
@@ -151,6 +133,9 @@ def dynamic_streams(self) -> List[Dict[str, Any]]:
151133
manifest=self._source_config, config=self._config, with_dynamic_stream_name=True
152134
)
153135

136+
def deprecation_warnings(self) -> List[AirbyteLogMessage]:
    """
    Return the deprecation warnings reported by the component constructor.

    Returns:
        List[AirbyteLogMessage]: the result of
        `self._constructor.get_model_deprecations()`, or an empty list when
        that result is falsy.
    """
    collected = self._constructor.get_model_deprecations()
    if collected:
        return collected
    return []
138+
154139
@property
155140
def connection_checker(self) -> ConnectionChecker:
156141
check = self._source_config["check"]

airbyte_cdk/sources/declarative/models/base_model_with_deprecations.py

Lines changed: 58 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,13 @@
44
# WHEN DEPRECATED FIELDS ARE ACCESSED
55

66
import warnings
7-
from typing import Any
7+
from typing import Any, List
88

99
from pydantic.v1 import BaseModel
1010

1111
from airbyte_cdk.models import (
1212
AirbyteLogMessage,
13-
AirbyteMessage,
1413
Level,
15-
Type,
1614
)
1715

1816
# format the warning message
@@ -23,48 +21,49 @@
2321
FIELDS_TAG = "__fields__"
2422
DEPRECATED = "deprecated"
2523
DEPRECATION_MESSAGE = "deprecation_message"
24+
DEPRECATION_LOGS_TAG = "_deprecation_logs"
2625

2726

2827
class BaseModelWithDeprecations(BaseModel):
2928
"""
3029
Pydantic BaseModel that warns when deprecated fields are accessed.
30+
The deprecation message is stored in the field's extra attributes.
31+
This class is used to create models that can have deprecated fields
32+
and show warnings when those fields are accessed or initialized.
33+
34+
The `_deprecation_logs` attribute is stored on the model itself.
35+
The collected deprecation warnings are further propagated to the Airbyte log messages,
36+
during the component creation process, in `model_to_component._collect_model_deprecations()`.
37+
38+
The component implementation is not responsible for handling the deprecation warnings,
39+
since the deprecation warnings are already handled in the model itself.
3140
"""
3241

33-
def _deprecated_warning(self, field_name: str, message: str) -> None:
42+
class Config:
3443
"""
35-
Show a warning message for deprecated fields (to stdout).
36-
Args:
37-
field_name (str): Name of the deprecated field.
38-
message (str): Warning message to be displayed.
44+
Allow extra fields in the model, in case the model would otherwise restrict them.
3945
"""
4046

41-
warnings.warn(
42-
f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}",
43-
DeprecationWarning,
44-
)
47+
extra = "allow"
4548

46-
# print(
47-
# AirbyteMessage(
48-
# type=Type.LOG,
49-
# log=AirbyteLogMessage(
50-
# level=Level.WARN,
51-
# message=f"Component type: `{self.__class__.__name__}`. Field '{field_name}' is deprecated. {message}",
52-
# ),
53-
# )
54-
# )
49+
_deprecation_logs: List[AirbyteLogMessage] = []
5550

5651
def __init__(self, **data: Any) -> None:
    """
    Record a deprecation warning for each deprecated field passed at construction.

    Every keyword argument whose name matches a declared model field is checked
    for the `deprecated` flag in that field's extra attributes. The arguments
    are then forwarded unchanged to the parent constructor.
    """
    declared_fields = self.__fields__

    for supplied_name in data:
        declared = declared_fields.get(supplied_name)
        if declared is None:
            # not a declared model field: there is no metadata to inspect
            continue

        extra_attrs = declared.field_info.extra
        if extra_attrs.get(DEPRECATED, False):
            self._deprecated_warning(supplied_name, extra_attrs.get(DEPRECATION_MESSAGE, ""))

    # Call the parent constructor
    super().__init__(**data)
def __getattribute__(self, name: str) -> Any:
    """
    Return attribute `name`, reporting a deprecation warning when `name` is a
    model field flagged as deprecated.

    Args:
        name (str): Name of the attribute being accessed.

    Returns:
        Any: the attribute value, exactly as the parent class resolves it.
    """
    value = super().__getattribute__(name)

    deprecation_message = None
    try:
        model_fields = super().__getattribute__(FIELDS_TAG)
        field = model_fields.get(name)
        if field is not None:
            # The deprecation metadata lives on `field.field_info.extra`
            # (the same place `__init__` reads it from), not on the field itself.
            extra_attrs = field.field_info.extra
            if extra_attrs.get(DEPRECATED, False):
                deprecation_message = extra_attrs.get(DEPRECATION_MESSAGE, "")
    except (AttributeError, KeyError):
        # Best effort: attribute access must keep working even when the field
        # metadata cannot be inspected.
        pass

    # Reported outside the `try` so that a failure while reporting is not
    # silently swallowed together with the metadata lookup errors.
    if deprecation_message is not None:
        self._deprecated_warning(name, deprecation_message)

    return value
91+
92+
def _deprecated_warning(self, field_name: str, message: str) -> None:
    """
    Report that a deprecated field was used.

    Emits a Python `DeprecationWarning` and records an equivalent WARN-level
    `AirbyteLogMessage` in `_deprecation_logs`.

    Args:
        field_name (str): Name of the deprecated field.
        message (str): Deprecation message appended to the warning text.
    """
    # Build the text once so the Python warning and the log entry cannot drift apart.
    warning_text = (
        f"Component type: `{self.__class__.__name__}`. "
        f"Field '{field_name}' is deprecated. {message}"
    )

    # Emit through Python's standard `warnings` machinery.
    warnings.warn(warning_text, DeprecationWarning)

    # Add the deprecation message to the Airbyte log messages;
    # these logs are displayed in the Connector Builder.
    log_entry = AirbyteLogMessage(level=Level.WARN, message=warning_text)

    # This method is called from both `__init__` and `__getattribute__`, so the
    # same entry can be produced many times; record each distinct entry once.
    # NOTE(review): `_deprecation_logs` is declared as a class-level list, so it
    # looks shared between model instances - confirm that is intended.
    if log_entry not in self._deprecation_logs:
        self._deprecation_logs.append(log_entry)

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from isodate import parse_duration
2828
from pydantic.v1 import BaseModel
2929

30-
from airbyte_cdk.models import FailureType, Level
30+
from airbyte_cdk.models import AirbyteLogMessage, FailureType, Level
3131
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
3232
from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
3333
from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
@@ -108,6 +108,10 @@
108108
CustomStateMigration,
109109
GzipDecoder,
110110
)
111+
from airbyte_cdk.sources.declarative.models.base_model_with_deprecations import (
112+
DEPRECATION_LOGS_TAG,
113+
BaseModelWithDeprecations,
114+
)
111115
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
112116
AddedFieldDefinition as AddedFieldDefinitionModel,
113117
)
@@ -583,6 +587,8 @@ def __init__(
583587
self._connector_state_manager = connector_state_manager or ConnectorStateManager()
584588
self._api_budget: Optional[Union[APIBudget, HttpAPIBudget]] = None
585589
self._job_tracker: JobTracker = JobTracker(max_concurrent_async_job_count or 1)
590+
# placeholder for deprecation warnings
591+
self._deprecation_logs: List[AirbyteLogMessage] = []
586592

587593
def _init_mappings(self) -> None:
588594
self.PYDANTIC_MODEL_TO_CONSTRUCTOR: Mapping[Type[BaseModel], Callable[..., Any]] = {
@@ -729,8 +735,26 @@ def _create_component_from_model(self, model: BaseModel, config: Config, **kwarg
729735
component_constructor = self.PYDANTIC_MODEL_TO_CONSTRUCTOR.get(model.__class__)
730736
if not component_constructor:
731737
raise ValueError(f"Could not find constructor for {model.__class__}")
738+
739+
# collect deprecation warnings for supported models.
740+
if isinstance(model, BaseModelWithDeprecations):
741+
self._collect_model_deprecations(model)
742+
732743
return component_constructor(model=model, config=config, **kwargs)
733744

745+
def get_model_deprecations(self) -> List[Any]:
    """
    Expose the deprecation warnings gathered while components were created.

    Returns:
        List[Any]: the `_deprecation_logs` list itself (not a copy).
    """
    collected_logs = self._deprecation_logs
    return collected_logs
750+
751+
def _collect_model_deprecations(self, model: BaseModelWithDeprecations) -> None:
    """
    Merge the deprecation logs carried by `model` into `self._deprecation_logs`.

    Entries already present in `self._deprecation_logs` are not added again.
    Models without a usable `_deprecation_logs` attribute are ignored.
    """
    if not hasattr(model, DEPRECATION_LOGS_TAG):
        return

    model_logs = model._deprecation_logs
    if model_logs is None:
        return

    for entry in model_logs:
        # avoid duplicates for deprecation logs observed.
        if entry in self._deprecation_logs:
            continue
        self._deprecation_logs.append(entry)
757+
734758
@staticmethod
735759
def create_added_field_definition(
736760
model: AddedFieldDefinitionModel, config: Config, **kwargs: Any

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77
import json
88
import logging
99
import os
10-
from typing import Literal
10+
from typing import List, Literal
1111
from unittest import mock
1212
from unittest.mock import MagicMock, patch
1313

@@ -817,6 +817,9 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
817817
connector_specification.connectionSpecification = {}
818818
return connector_specification
819819

820+
def deprecation_warnings(self) -> List[AirbyteLogMessage]:
    """Report no deprecation warnings: always returns an empty list."""
    return list()
822+
820823
@property
821824
def check_config_against_spec(self) -> Literal[False]:
822825
return False

0 commit comments

Comments
 (0)