|
1 | 1 | # Copyright (c) 2024 Airbyte, Inc., all rights reserved. |
import dataclasses
import sys
import types
from enum import Enum
from typing import (
    Any,
    Callable,
    Dict,
    Optional,
    Type,
    TypeVar,
    Union,
    get_args,
    get_origin,
    get_type_hints,
)

import orjson
from pydantic import ValidationError
4 | 7 |
|
5 | 8 | from .airbyte_protocol import ( # type: ignore[attr-defined] # all classes are imported to airbyte_protocol via * |
6 | 9 | AirbyteCatalog, |
|
# True everywhere except when ``sys.platform`` reports "emscripten".
# NOTE(review): the string below still mentions a pure-Python serpyco backend,
# but this module no longer imports ``serpyco``; the non-Rust branch now picks
# ``CustomSerializer`` (see the ``SERIALIZER`` assignment) — confirm and reword.
USE_RUST_BACKEND = sys.platform != "emscripten"
"""When run in WASM, use the pure Python backend for serpyco."""
19 | 22 |
|
20 | | -if USE_RUST_BACKEND: |
21 | | - from serpyco_rs import CustomType, Serializer |
22 | | -else: |
23 | | - from serpyco import CustomType, Serializer |
24 | 23 |
|
T = TypeVar("T")


class CustomSerializer:
    """Pure-Python serializer that mimics the serpyco-rs ``Serializer`` API.

    Only the subset of the API used by this module is implemented: the
    constructor signature plus ``dump`` (object -> dict) and ``load``
    (dict -> object). No JSON-schema validation is performed.
    """

    # ``X | Y`` annotations report ``types.UnionType`` as their origin
    # (Python 3.10+), while ``Optional[X]`` / ``Union[X, Y]`` report ``Union``.
    _UNION_ORIGINS = (Union, getattr(types, "UnionType", Union))

    def __init__(
        self,
        model_class: Type[T],
        omit_none: bool = False,
        custom_type_resolver: Optional[Callable[[type], Any]] = None,
    ):
        """Configure the serializer.

        Args:
            model_class: Class instantiated by ``load``.
            omit_none: When true, attributes whose value is ``None`` are left
                out of dumped objects.
            custom_type_resolver: Optional callable that receives a type and
                returns either ``None`` or a handler exposing
                ``serialize(value)`` and ``deserialize(value)``.
        """
        self.model_class = model_class
        self.omit_none = omit_none
        self.custom_type_resolver = custom_type_resolver
        # Resolving type hints is comparatively slow, so do it once per class.
        self._hints_cache: Dict[type, Dict[str, Any]] = {}

    def dump(self, obj: T) -> Dict[str, Any]:
        """Convert ``obj`` into a plain dictionary.

        Attributes are read from ``obj.__dict__`` and serialized recursively:
        nested objects, lists, dict values and enums are all converted.

        Returns:
            A new dictionary, or an empty one when ``obj`` has no ``__dict__``.
        """
        if not hasattr(obj, "__dict__"):
            return {}
        return self._dump_fields(obj)

    def load(self, data: Dict[str, Any]) -> T:
        """Build a ``model_class`` instance from ``data``.

        When ``model_class`` is a dataclass, its field annotations are used to
        rebuild nested dataclasses, lists/dicts of them, enums and values
        handled by ``custom_type_resolver``. Any other class is simply called
        with ``data`` as keyword arguments.

        Raises:
            TypeError: If ``data`` contains keys the constructor rejects.
            ValueError: If a value is not a valid member of an enum field.
        """
        model = self.model_class
        if not dataclasses.is_dataclass(model):
            return model(**data)
        return self._build_dataclass(model, data)

    def _serialize_nested(self, obj: Any) -> Any:
        """Recursively convert ``obj`` into JSON-friendly builtins."""
        if obj is None:
            return None
        if self.custom_type_resolver is not None:
            handler = self.custom_type_resolver(type(obj))
            if handler:
                # Keep processing the handler's output below so that anything
                # it returns is converted as well.
                obj = handler.serialize(obj)
        # Enum members carry a ``__dict__``; check them before the generic
        # object branch so they are emitted as their value.
        if isinstance(obj, Enum):
            return obj.value
        if isinstance(obj, (list, tuple)):
            return [self._serialize_nested(item) for item in obj]
        if isinstance(obj, dict):
            # Plain dicts are data, not models: ``omit_none`` does not apply.
            return {key: self._serialize_nested(item) for key, item in obj.items()}
        if hasattr(obj, "__dict__"):
            return self._dump_fields(obj)
        return obj

    def _dump_fields(self, obj: Any) -> Dict[str, Any]:
        """Serialize the instance attributes of ``obj``, honouring ``omit_none``."""
        result: Dict[str, Any] = {}
        for key, value in vars(obj).items():
            if value is None and self.omit_none:
                continue
            result[key] = self._serialize_nested(value)
        return result

    def _field_types(self, cls: type) -> Dict[str, Any]:
        """Return the resolved annotations of ``cls`` (cached per class)."""
        if cls not in self._hints_cache:
            try:
                self._hints_cache[cls] = get_type_hints(cls)
            except (NameError, TypeError):
                # Unresolvable forward references: fall back to raw values.
                self._hints_cache[cls] = {}
        return self._hints_cache[cls]

    def _build_dataclass(self, cls: type, data: Dict[str, Any]) -> Any:
        """Instantiate dataclass ``cls`` from ``data``, converting each field."""
        hints = self._field_types(cls)
        kwargs = {
            name: self._load_value(hints.get(name, Any), value)
            for name, value in data.items()
        }
        return cls(**kwargs)

    def _load_value(self, annotation: Any, value: Any) -> Any:
        """Convert ``value`` to the shape described by ``annotation``."""
        if value is None:
            return None

        origin = get_origin(annotation)
        if origin in self._UNION_ORIGINS:
            members = [arg for arg in get_args(annotation) if arg is not type(None)]
            # Only ``Optional[X]``-style unions are unambiguous; leave the rest.
            if len(members) == 1:
                return self._load_value(members[0], value)
            return value
        if origin is list and isinstance(value, list):
            args = get_args(annotation)
            item_type = args[0] if args else Any
            return [self._load_value(item_type, item) for item in value]
        if origin is dict and isinstance(value, dict):
            args = get_args(annotation)
            if len(args) == 2:
                return {key: self._load_value(args[1], item) for key, item in value.items()}
            return value

        # ``origin is None`` rules out parameterized generics such as
        # ``list[int]``, which can pass ``isinstance(..., type)`` on some
        # Python versions but are not valid ``issubclass`` arguments.
        if origin is None and isinstance(annotation, type):
            if self.custom_type_resolver is not None:
                handler = self.custom_type_resolver(annotation)
                if handler:
                    return handler.deserialize(value)
            if issubclass(annotation, Enum):
                return annotation(value)
            if dataclasses.is_dataclass(annotation) and isinstance(value, dict):
                return self._build_dataclass(annotation, value)
        return value
| 78 | + |
| 79 | + |
if USE_RUST_BACKEND:
    from serpyco_rs import CustomType, Serializer  # type: ignore[import]

    SERIALIZER = Serializer
    _StateBlobTypeBase = CustomType[AirbyteStateBlob, Dict[str, Any]]
else:
    # ``serpyco_rs`` is not imported on this branch, so the handler below
    # cannot derive from its ``CustomType``; a plain object exposing the same
    # methods is enough for ``CustomSerializer``.
    SERIALIZER = CustomSerializer
    _StateBlobTypeBase = object


class AirbyteStateBlobType(_StateBlobTypeBase):  # type: ignore[misc,valid-type]
    """Custom (de)serialization handler for ``AirbyteStateBlob``.

    The blob's attributes are copied straight from ``__dict__`` so that private
    attributes such as "__ab_full_refresh_sync_complete" are kept; going
    through ``orjson.dumps()`` directly would exclude them.
    """

    def serialize(self, value: AirbyteStateBlob) -> Dict[str, Any]:
        """Return a shallow copy of the blob's attributes as a dictionary."""
        return dict(value.__dict__)

    def deserialize(self, value: Dict[str, Any]) -> AirbyteStateBlob:
        """Wrap a raw state dictionary in an ``AirbyteStateBlob``."""
        return AirbyteStateBlob(value)

    def get_json_schema(self) -> Dict[str, Any]:
        """Describe the blob as a free-form JSON object."""
        return {"type": "object"}


def custom_type_resolver(t: type) -> AirbyteStateBlobType | None:
    """Return the ``AirbyteStateBlob`` handler for that exact type, else ``None``.

    NOTE(review): presumably the serpyco-rs ``Serializer`` cannot describe
    ``AirbyteStateBlob`` without this resolver — confirm before disabling it.
    """
    return AirbyteStateBlobType() if t is AirbyteStateBlob else None


# Create serializer instances maintaining the same API on both backends.
AirbyteCatalogSerializer = SERIALIZER(AirbyteCatalog, omit_none=True)
AirbyteStreamSerializer = SERIALIZER(AirbyteStream, omit_none=True)
AirbyteStreamStateSerializer = SERIALIZER(
    AirbyteStreamState, omit_none=True, custom_type_resolver=custom_type_resolver
)
AirbyteStateMessageSerializer = SERIALIZER(
    AirbyteStateMessage, omit_none=True, custom_type_resolver=custom_type_resolver
)
AirbyteMessageSerializer = SERIALIZER(
    AirbyteMessage, omit_none=True, custom_type_resolver=custom_type_resolver
)
ConfiguredAirbyteCatalogSerializer = SERIALIZER(ConfiguredAirbyteCatalog, omit_none=True)
ConfiguredAirbyteStreamSerializer = SERIALIZER(ConfiguredAirbyteStream, omit_none=True)
ConnectorSpecificationSerializer = SERIALIZER(ConnectorSpecification, omit_none=True)
0 commit comments