@@ -86,6 +86,21 @@ class AirbyteMessage:
8686 control : Optional [AirbyteControlMessage ] = None # type: ignore [name-defined]
8787
8888
89+ # Add optimized serdes methods to the protocol data classes:
90+
91+ def _with_serdes (
92+ cls ,
93+ type_resolver : Callable [[type ], CustomType [Any , Any ] | None ] | None = None ,
94+ ) -> type :
95+ """Decorator to add SerDes (serialize/deserialize) methods to a dataclass."""
96+ cls ._serializer = Serializer (cls , omit_none = True , custom_type_resolver = type_resolver )
97+ cls .to_dict = lambda self : self ._serializer .dump (self )
98+ cls .to_json = lambda self : orjson .dumps (self ._serializer .dump (self )).decode ("utf-8" )
99+ cls .from_json = lambda self , string : self ._serializer .load (orjson .loads (string ))
100+ cls .from_dict = lambda self , dictionary : self ._serializer .load (dictionary )
101+ return cls
102+
103+
89104def _custom_state_resolver (t : type ) -> CustomType [AirbyteStateBlob , dict [str , Any ]] | None :
90105 class AirbyteStateBlobType (CustomType [AirbyteStateBlob , Dict [str , Any ]]):
91106 def serialize (self , value : AirbyteStateBlob ) -> Dict [str , Any ]:
@@ -101,18 +116,6 @@ def get_json_schema(self) -> Dict[str, Any]:
101116 return AirbyteStateBlobType () if t is AirbyteStateBlob else None
102117
103118
104- def _with_serdes (
105- cls ,
106- type_resolver : Callable [[type ], CustomType [Any , Any ] | None ] | None = None ,
107- ) -> type :
108- """Decorator to add SerDes (serialize/deserialize) methods to a dataclass."""
109- cls ._serializer = Serializer (cls , omit_none = True , custom_type_resolver = type_resolver )
110- cls .to_dict = lambda self : self ._serializer .dump (self )
111- cls .to_json = lambda self : orjson .dumps (self ._serializer .dump (self )).decode ("utf-8" )
112- cls .from_json = lambda self , string : self ._serializer .load (orjson .loads (string ))
113- cls .from_dict = lambda self , dictionary : self ._serializer .load (dictionary )
114- return cls
115-
116119# Add serdes capabilities to all data classes that need to serialize and deserialize:
117120AirbyteMessage = _with_serdes (AirbyteMessage , _custom_state_resolver )
118121AirbyteStateMessage = _with_serdes (AirbyteStateMessage , _custom_state_resolver )
0 commit comments