Skip to content

Commit 57a4a7f

Browse files
committed
clean up models
1 parent 66b00c5 commit 57a4a7f

File tree

4 files changed

+65
-117
lines changed

4 files changed

+65
-117
lines changed

airbyte_cdk/__init__.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@
5252
create_connector_config_control_message,
5353
emit_configuration_as_airbyte_control_message,
5454
)
55-
from .connector import BaseConnector, Connector
55+
from .connector import BaseConnector
5656
from .destinations import Destination
5757
from .entrypoint import AirbyteEntrypoint, launch
5858
from .logger import AirbyteLogFormatter, init_logger

airbyte_cdk/connector.py

Lines changed: 3 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,8 @@
1414
import yaml
1515
from typing_extensions import Self
1616

17-
from airbyte_cdk.models import (
18-
AirbyteConnectionStatus,
19-
ConnectorSpecification,
20-
ConnectorSpecificationSerializer,
21-
)
22-
from airbyte_cdk.models.airbyte_protocol import AirbyteMessage, Type
17+
from airbyte_cdk.models import AirbyteConnectionStatus
18+
from airbyte_cdk.models.airbyte_protocol import AirbyteMessage, ConnectorSpecification, Type
2319
from airbyte_cdk.sources.message.repository import MessageRepository, PassthroughMessageRepository
2420
from airbyte_cdk.utils.cli_arg_parse import ConnectorCLIArgs, parse_cli_args
2521

@@ -86,7 +82,7 @@ def spec(self, logger: logging.Logger) -> ConnectorSpecification:
8682
else:
8783
raise FileNotFoundError("Unable to find spec.yaml or spec.json in the package.")
8884

89-
return ConnectorSpecificationSerializer.load(spec_obj)
85+
return ConnectorSpecification.from_dict(spec_obj)
9086

9187
@abstractmethod
9288
def check(self, logger: logging.Logger, config: TConfig) -> AirbyteConnectionStatus:
Lines changed: 40 additions & 69 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,5 @@
1-
#
2-
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
3-
#
4-
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
from collections.abc import Callable
53
from dataclasses import InitVar, dataclass
64
from functools import cached_property
75
from typing import Annotated, Any, Dict, List, Mapping, Optional, Union
@@ -11,8 +9,6 @@
119
from serpyco_rs import CustomType, Serializer
1210
from serpyco_rs.metadata import Alias
1311

14-
# ruff: noqa: F405 # ignore fuzzy import issues with 'import *'
15-
1612

1713
@dataclass
1814
class AirbyteStateBlob:
@@ -53,19 +49,18 @@ def __eq__(self, other: object) -> bool:
5349
)
5450

5551

56-
class AirbyteStateBlobType(CustomType[AirbyteStateBlob, dict[str, Any]]):
57-
def serialize(self, value: AirbyteStateBlob) -> dict[str, Any]:
58-
# cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete"
59-
return {k: v for k, v in value.__dict__.items()}
60-
61-
def deserialize(self, value: dict[str, Any]) -> AirbyteStateBlob:
62-
return AirbyteStateBlob(value)
52+
def _custom_state_resolver(t: type) -> CustomType[AirbyteStateBlob, dict[str, Any]] | None:
53+
class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]):
54+
def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
55+
# cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete"
56+
return {k: v for k, v in value.__dict__.items()}
6357

64-
def get_json_schema(self) -> dict[str, Any]:
65-
return {"type": "object"}
58+
def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
59+
return AirbyteStateBlob(value)
6660

61+
def get_json_schema(self) -> Dict[str, Any]:
62+
return {"type": "object"}
6763

68-
def custom_type_resolver(t: type) -> CustomType[AirbyteStateBlob, dict[str, Any]] | None:
6964
return AirbyteStateBlobType() if t is AirbyteStateBlob else None
7065

7166

@@ -93,33 +88,6 @@ class AirbyteStateMessage:
9388
sourceStats: Optional[AirbyteStateStats] = None # type: ignore [name-defined]
9489
destinationStats: Optional[AirbyteStateStats] = None # type: ignore [name-defined]
9590

96-
def to_dict(self) -> dict:
97-
return self._serializer.dump(self)
98-
99-
def to_string(self) -> str:
100-
return orjson.dumps(self.to_dict()).decode("utf-8")
101-
102-
def from_string(self, string: str, /) -> "AirbyteMessage":
103-
"""Deserialize a string into an AirbyteMessage object."""
104-
return self._serializer.load(orjson.loads(string))
105-
106-
def from_dict(self, dictionary: dict, /) -> "AirbyteMessage":
107-
"""Deserialize a dictionary into an AirbyteMessage object."""
108-
return self._serializer.load(dictionary)
109-
110-
@cached_property
111-
@classmethod
112-
def _serializer(cls) -> Serializer:
113-
"""
114-
Returns a serializer for the AirbyteMessage class.
115-
The serializer is cached for performance reasons.
116-
"""
117-
return Serializer(
118-
AirbyteStateMessage,
119-
omit_none=True,
120-
custom_type_resolver=custom_type_resolver,
121-
)
122-
12391

12492
@dataclass
12593
class AirbyteMessage:
@@ -133,29 +101,32 @@ class AirbyteMessage:
133101
trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined]
134102
control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined]
135103

136-
def to_dict(self) -> dict:
137-
return self._serializer.dump(self)
138-
139-
def to_string(self) -> str:
140-
return orjson.dumps(self.to_dict()).decode("utf-8")
141-
142-
def from_string(self, string: str, /) -> "AirbyteMessage":
143-
"""Deserialize a string into an AirbyteMessage object."""
144-
return self._serializer.load(orjson.loads(string))
145-
146-
def from_dict(self, dictionary: dict, /) -> "AirbyteMessage":
147-
"""Deserialize a dictionary into an AirbyteMessage object."""
148-
return self._serializer.load(dictionary)
149-
150-
@cached_property
151-
@classmethod
152-
def _serializer(cls) -> Serializer:
153-
"""
154-
Returns a serializer for the AirbyteMessage class.
155-
The serializer is cached for performance reasons.
156-
"""
157-
return Serializer(
158-
AirbyteMessage,
159-
omit_none=True,
160-
custom_type_resolver=custom_type_resolver,
161-
)
104+
@property
105+
def _serializer(self):
106+
raise NotImplementedError
107+
108+
@_serializer.setter
109+
def _serializer(self, value):
110+
raise NotImplementedError
111+
112+
113+
def _with_serdes(
114+
cls,
115+
type_resolver: Callable[[type], CustomType[Any, Any] | None] | None = None,
116+
) -> type:
117+
"""Decorator to add SerDes (serialize/deserialize) methods to a dataclass."""
118+
cls._serializer = Serializer(cls, omit_none=True, custom_type_resolver=type_resolver)
119+
cls.to_dict = lambda self: self._serializer.dump(self)
120+
cls.to_json = lambda self: orjson.dumps(self._serializer.dump(self)).decode("utf-8")
121+
cls.from_json = lambda self, string: self._serializer.load(orjson.loads(string))
122+
cls.from_dict = lambda self, dictionary: self._serializer.load(dictionary)
123+
return cls
124+
125+
# Add serdes capabilities to all data classes that need to serialize and deserialize:
126+
AirbyteMessage = _with_serdes(AirbyteMessage, _custom_state_resolver)
127+
AirbyteStateMessage = _with_serdes(AirbyteStateMessage, _custom_state_resolver)
128+
AirbyteStreamState = _with_serdes(AirbyteStreamState, _custom_state_resolver)
129+
# These don't need the custom resolver:
130+
ConnectorSpecification = _with_serdes(ConnectorSpecification)
131+
ConfiguredAirbyteCatalog = _with_serdes(ConfiguredAirbyteCatalog)
132+
AirbyteStream = _with_serdes(AirbyteStream)
Lines changed: 21 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -1,41 +1,22 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2-
from typing import Any, Dict
3-
4-
from serpyco_rs import CustomType, Serializer
5-
6-
from .airbyte_protocol import ( # type: ignore[attr-defined] # all classes are imported to airbyte_protocol via *
7-
AirbyteMessage,
8-
AirbyteStateBlob,
9-
AirbyteStateMessage,
10-
AirbyteStreamState,
11-
ConfiguredAirbyteCatalog,
12-
ConfiguredAirbyteStream,
13-
ConnectorSpecification,
14-
)
15-
16-
17-
class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]):
18-
def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
19-
# cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete"
20-
return {k: v for k, v in value.__dict__.items()}
21-
22-
def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
23-
return AirbyteStateBlob(value)
24-
25-
def get_json_schema(self) -> Dict[str, Any]:
26-
return {"type": "object"}
27-
28-
29-
def custom_type_resolver(t: type) -> CustomType[AirbyteStateBlob, Dict[str, Any]] | None:
30-
return AirbyteStateBlobType() if t is AirbyteStateBlob else None
31-
32-
33-
AirbyteStreamStateSerializer = Serializer(
34-
AirbyteStreamState, omit_none=True, custom_type_resolver=custom_type_resolver
35-
)
36-
ConfiguredAirbyteCatalogSerializer = Serializer(ConfiguredAirbyteCatalog, omit_none=True)
37-
ConfiguredAirbyteStreamSerializer = Serializer(ConfiguredAirbyteStream, omit_none=True)
38-
ConnectorSpecificationSerializer = Serializer(ConnectorSpecification, omit_none=True)
39-
40-
AirbyteStateMessageSerializer = AirbyteStateMessage._serializer
41-
AirbyteMessageSerializer = AirbyteMessage._serializer
2+
"""This module is deprecated and exists for legacy compatibility.
3+
4+
Instead of importing from this module, callers should import from
5+
`airbyte_cdk.models.airbyte_protocol` directly.
6+
7+
The dedicated SerDes classes are _also_ deprecated. Instead, use these methods:
8+
- `from_dict()`
9+
- `from_json()`
10+
- `to_dict()`
11+
- `to_json()`
12+
"""
13+
14+
from airbyte_cdk.models.airbyte_protocol import * # type: ignore[attr-defined]
15+
16+
# Deprecated. Declared here for legacy compatibility:
17+
AirbyteStateMessageSerializer = AirbyteStateMessage._serializer # type: ignore
18+
AirbyteMessageSerializer = AirbyteMessage._serializer # type: ignore
19+
ConnectorSpecificationSerializer = ConnectorSpecification._serializer # type: ignore
20+
ConfiguredAirbyteCatalogSerializer = ConfiguredAirbyteCatalog._serializer # type: ignore
21+
ConfiguredAirbyteStreamSerializer = ConfiguredAirbyteStream._serializer # type: ignore
22+
AirbyteStreamStateSerializer = AirbyteStreamState._serializer # type: ignore

0 commit comments

Comments
 (0)