99import orjson
1010from pydantic import ValidationError
1111
12+ from airbyte_cdk .connector_builder .test_reader .helpers import airbyte_message_to_json
13+
1214from .airbyte_protocol import ( # type: ignore[attr-defined] # all classes are imported to airbyte_protocol via *
1315 AirbyteCatalog ,
1416 AirbyteMessage ,
3133
3234logger = logging .getLogger ("airbyte" )
3335
34-
35- class CustomSerializer :
36- """Custom serializer that mimics serpyco-rs Serializer API"""
37-
38- def __init__ (
39- self ,
40- model_class : Type [T ],
41- omit_none : bool = False ,
42- custom_type_resolver : Callable | None = None ,
43- ):
44- self .model_class = model_class
45- self .omit_none = omit_none
46- self .custom_type_resolver = custom_type_resolver
47-
48- def dump (self , obj : T ) -> Dict [str , Any ]:
49- """Convert dataclass to dictionary, omitting None values if configured"""
50- if hasattr (obj , "__dict__" ):
51- result = {}
52- for key , value in obj .__dict__ .items ():
53- if self .omit_none and value is None :
54- continue
55- # Handle custom types like AirbyteStateBlob
56- if self .custom_type_resolver and hasattr (value , "__class__" ):
57- custom_handler = self .custom_type_resolver (value .__class__ )
58- if custom_handler :
59- value = custom_handler .serialize (value )
60- # Recursively handle nested objects
61- if hasattr (value , "__dict__" ):
62- value = self ._serialize_nested (value )
63- elif isinstance (value , list ):
64- value = [
65- self ._serialize_nested (item ) if hasattr (item , "__dict__" ) else item
66- for item in value
67- ]
68- result [key ] = value
69- return result
70- return obj .__dict__ if hasattr (obj , "__dict__" ) else {}
71-
72- def load (self , data : Dict [str , Any ]) -> T :
73- """Convert dictionary to dataclass instance"""
74- # Handle custom types
75- return dacite .from_dict (data_class = self .model_class , data = data )
76-
77- def _serialize_nested (self , obj : Any ) -> Any :
78- """Helper to serialize nested objects"""
79- if hasattr (obj , "__dict__" ):
80- result = {}
81- for key , value in obj .__dict__ .items ():
82- if self .omit_none and value is None :
83- continue
84- result [key ] = value
85- return result
86- return obj
87-
88-
89- if USE_RUST_BACKEND :
90- from serpyco_rs import CustomType , Serializer # type: ignore[import]
91-
92- SERIALIZER = Serializer if USE_RUST_BACKEND else CustomSerializer
93-
9436# Making this a no-op for now:
95- custom_type_resolver = None
96-
97- # No idea why this is here. Commenting out for now.
98- # def custom_type_resolver(t: type) -> AirbyteStateBlobType | None:
99- # return AirbyteStateBlobType() if t is AirbyteStateBlob else None
100- #
101- # class AirbyteStateBlobType(CustomType[AirbyteStateBlob, Dict[str, Any]]):
102- # def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
103- # # cant use orjson.dumps() directly because private attributes are excluded, e.g. "__ab_full_refresh_sync_complete"
104- # return {k: v for k, v in value.__dict__.items()}
105-
106- # def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
107- # return AirbyteStateBlob(value)
108-
109- # def get_json_schema(self) -> Dict[str, Any]:
110- # return {"type": "object"}
111-
112- # Create serializer instances maintaining the same API
113- AirbyteStateMessageSerializer = SERIALIZER (
114- AirbyteStateMessage , omit_none = True , custom_type_resolver = custom_type_resolver
115- )
116- AirbyteMessageSerializer = SERIALIZER (
117- AirbyteMessage , omit_none = True , custom_type_resolver = custom_type_resolver
118- )
119- ConfiguredAirbyteCatalogSerializer = SERIALIZER (ConfiguredAirbyteCatalog , omit_none = True )
120- ConnectorSpecificationSerializer = SERIALIZER (ConnectorSpecification , omit_none = True )
121-
122-
123- def _custom_json_serializer (val : object ) -> str :
124- """Handle custom serialization needs for AirbyteMessage."""
125- if isinstance (val , Enum ):
126- return str (val .value )
127-
128- return str (val )
12937
13038
13139def ab_message_to_string (
@@ -140,28 +48,11 @@ def ab_message_to_string(
14048 Returns:
14149 str: JSON string representation of the AirbyteMessage.
14250 """
143- global _HAS_LOGGED_FOR_SERIALIZATION_ERROR
144- dict_obj = AirbyteMessageSerializer .dump (message )
145-
146- try :
147- return orjson .dumps (
148- dict_obj ,
149- default = _custom_json_serializer ,
150- ).decode ()
151- except Exception as exception :
152- if not _HAS_LOGGED_FOR_SERIALIZATION_ERROR :
153- logger .warning (
154- f"There was an error during the serialization of an AirbyteMessage: `{ exception } `. This might impact the sync performances."
155- )
156- _HAS_LOGGED_FOR_SERIALIZATION_ERROR = True
157- return json .dumps (
158- dict_obj ,
159- default = _custom_json_serializer ,
160- )
51+ return message .model_dump_json ()
16152
16253
16354def ab_message_from_string (
164- message_str : str ,
55+ message_json : str ,
16556) -> AirbyteMessage :
16657 """
16758 Convert a JSON string to an AirbyteMessage.
@@ -173,9 +64,118 @@ def ab_message_from_string(
17364 AirbyteMessage: The deserialized AirbyteMessage.
17465 """
17566 try :
176- message_dict = orjson .loads (message_str )
177- return AirbyteMessageSerializer .load (message_dict )
67+ return AirbyteMessage .model_validate_json (message_json )
17868 except ValidationError as e :
17969 raise ValueError (f"Invalid AirbyteMessage format: { e } " ) from e
18070 except orjson .JSONDecodeError as e :
18171 raise ValueError (f"Failed to decode JSON: { e } " ) from e
72+
73+
74+ def ab_connector_spec_from_string (
75+ spec_json : str ,
76+ ) -> ConnectorSpecification :
77+ """
78+ Convert a JSON string to a ConnectorSpecification.
79+
80+ Args:
81+ spec_str (str): The JSON string to convert.
82+
83+ Returns:
84+ ConnectorSpecification: The deserialized ConnectorSpecification.
85+ """
86+ try :
87+ return ConnectorSpecification .model_validate_json (spec_json )
88+ except ValidationError as e :
89+ raise ValueError (f"Invalid ConnectorSpecification format: { e } " ) from e
90+ except orjson .JSONDecodeError as e :
91+ raise ValueError (f"Failed to decode JSON: { e } " ) from e
92+
93+
94+ def ab_connector_spec_to_string (
95+ spec : ConnectorSpecification ,
96+ ) -> str :
97+ """
98+ Convert a ConnectorSpecification to a JSON string.
99+
100+ Args:
101+ spec (ConnectorSpecification): The ConnectorSpecification to convert.
102+
103+ Returns:
104+ str: JSON string representation of the ConnectorSpecification.
105+ """
106+ return spec .model_dump_json ()
107+
108+
109+ def ab_configured_catalog_to_string (
110+ catalog : ConfiguredAirbyteCatalog ,
111+ ) -> str :
112+ """
113+ Convert a ConfiguredAirbyteCatalog to a JSON string.
114+
115+ Args:
116+ catalog (ConfiguredAirbyteCatalog): The ConfiguredAirbyteCatalog to convert.
117+
118+ Returns:
119+ str: JSON string representation of the ConfiguredAirbyteCatalog.
120+ """
121+ return catalog .model_dump_json ()
122+
123+
124+ def ab_configured_catalog_from_string (
125+ catalog_json : str ,
126+ ) -> ConfiguredAirbyteCatalog :
127+ """
128+ Convert a JSON string to a ConfiguredAirbyteCatalog.
129+
130+ Args:
131+ catalog_json (str): The JSON string to convert.
132+
133+ Returns:
134+ ConfiguredAirbyteCatalog: The deserialized ConfiguredAirbyteCatalog.
135+ """
136+ try :
137+ return ConfiguredAirbyteCatalog .model_validate_json (catalog_json )
138+ except ValidationError as e :
139+ raise ValueError (f"Invalid ConfiguredAirbyteCatalog format: { e } " ) from e
140+ except orjson .JSONDecodeError as e :
141+ raise ValueError (f"Failed to decode JSON: { e } " ) from e
142+
143+
144+ def ab_state_message_from_string (
145+ state_json : str ,
146+ ) -> AirbyteStateMessage :
147+ """
148+ Convert a JSON string to an AirbyteStateMessage.
149+
150+ Args:
151+ state_json (str): The JSON string to convert.
152+
153+ Returns:
154+ AirbyteStateMessage: The deserialized AirbyteStateMessage.
155+ """
156+ try :
157+ return AirbyteStateMessage .model_validate_json (state_json )
158+ except ValidationError as e :
159+ raise ValueError (f"Invalid AirbyteStateMessage format: { e } " ) from e
160+ except orjson .JSONDecodeError as e :
161+ raise ValueError (f"Failed to decode JSON: { e } " ) from e
162+
163+
164+ def ab_state_blob_from_string (
165+ state_blob_json : str ,
166+ ) -> AirbyteStateBlob :
167+ """
168+ Convert a JSON string to an AirbyteStateBlob.
169+
170+ Args:
171+ state_blob_json (str): The JSON string to convert.
172+
173+ Returns:
174+ AirbyteStateBlob: The deserialized AirbyteStateBlob.
175+ """
176+ try :
177+ return AirbyteStateBlob .model_validate_json (state_blob_json )
178+ except ValidationError as e :
179+ raise ValueError (f"Invalid AirbyteStateBlob format: { e } " ) from e
180+ except orjson .JSONDecodeError as e :
181+ raise ValueError (f"Failed to decode JSON: { e } " ) from e
0 commit comments