Skip to content

Commit 814136b

Browse files
committed
remove more custom serpyco logic
1 parent bc1fb21 commit 814136b

File tree

3 files changed

+9
-95
lines changed

3 files changed

+9
-95
lines changed

airbyte_cdk/models/airbyte_protocol.py

Lines changed: 0 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -7,87 +7,3 @@
77
from typing import Annotated, Any, Dict, List, Mapping, Optional, Union
88

99
from airbyte_protocol_dataclasses.models import * # noqa: F403 # Allow '*'
10-
11-
if sys.platform == 'emscripten':
12-
from serpyco.metadata import Alias
13-
else:
14-
from serpyco_rs.metadata import Alias
15-
16-
# ruff: noqa: F405 # ignore fuzzy import issues with 'import *'
17-
18-
19-
@dataclass
20-
class AirbyteStateBlob:
21-
"""
22-
A dataclass that dynamically sets attributes based on provided keyword arguments and positional arguments.
23-
Used to "mimic" pydantic Basemodel with ConfigDict(extra='allow') option.
24-
25-
The `AirbyteStateBlob` class allows for flexible instantiation by accepting any number of keyword arguments
26-
and positional arguments. These are used to dynamically update the instance's attributes. This class is useful
27-
in scenarios where the attributes of an object are not known until runtime and need to be set dynamically.
28-
29-
Attributes:
30-
kwargs (InitVar[Mapping[str, Any]]): A dictionary of keyword arguments used to set attributes dynamically.
31-
32-
Methods:
33-
__init__(*args: Any, **kwargs: Any) -> None:
34-
Initializes the `AirbyteStateBlob` by setting attributes from the provided arguments.
35-
36-
__eq__(other: object) -> bool:
37-
Checks equality between two `AirbyteStateBlob` instances based on their internal dictionaries.
38-
Returns `False` if the other object is not an instance of `AirbyteStateBlob`.
39-
"""
40-
41-
kwargs: InitVar[Mapping[str, Any]]
42-
43-
def __init__(self, *args: Any, **kwargs: Any) -> None:
44-
# Set any attribute passed in through kwargs
45-
for arg in args:
46-
self.__dict__.update(arg)
47-
for key, value in kwargs.items():
48-
setattr(self, key, value)
49-
50-
def __eq__(self, other: object) -> bool:
51-
return (
52-
False
53-
if not isinstance(other, AirbyteStateBlob)
54-
else bool(self.__dict__ == other.__dict__)
55-
)
56-
57-
58-
# The following dataclasses have been redeclared to include the new version of AirbyteStateBlob
59-
@dataclass
60-
class AirbyteStreamState:
61-
stream_descriptor: StreamDescriptor # type: ignore [name-defined]
62-
stream_state: Optional[AirbyteStateBlob] = None
63-
64-
65-
@dataclass
66-
class AirbyteGlobalState:
67-
stream_states: List[AirbyteStreamState]
68-
shared_state: Optional[AirbyteStateBlob] = None
69-
70-
71-
@dataclass
72-
class AirbyteStateMessage:
73-
type: Optional[AirbyteStateType] = None # type: ignore [name-defined]
74-
stream: Optional[AirbyteStreamState] = None
75-
global_: Annotated[AirbyteGlobalState | None, Alias("global")] = (
76-
None # "global" is a reserved keyword in python ⇒ Alias is used for (de-)serialization
77-
)
78-
data: Optional[Dict[str, Any]] = None
79-
sourceStats: Optional[AirbyteStateStats] = None # type: ignore [name-defined]
80-
destinationStats: Optional[AirbyteStateStats] = None # type: ignore [name-defined]
81-
82-
83-
@dataclass
84-
class AirbyteMessage:
85-
type: Type # type: ignore [name-defined]
86-
log: Optional[AirbyteLogMessage] = None # type: ignore [name-defined]
87-
spec: Optional[ConnectorSpecification] = None # type: ignore [name-defined]
88-
connectionStatus: Optional[AirbyteConnectionStatus] = None # type: ignore [name-defined]
89-
catalog: Optional[AirbyteCatalog] = None # type: ignore [name-defined]
90-
record: Optional[AirbyteRecordMessage] = None # type: ignore [name-defined]
91-
state: Optional[AirbyteStateMessage] = None
92-
trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined]
93-
control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined]

airbyte_cdk/test/entrypoint_wrapper.py

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,11 +30,6 @@
3030
import orjson
3131
from pydantic import ValidationError as V2ValidationError
3232

33-
if sys.platform == 'emscripten':
34-
from serpyco import SchemaValidationError
35-
else:
36-
from serpyco_rs import SchemaValidationError
37-
3833
from airbyte_cdk.entrypoint import AirbyteEntrypoint
3934
from airbyte_cdk.exception_handler import assemble_uncaught_exception
4035
from airbyte_cdk.logger import AirbyteLogFormatter
@@ -55,6 +50,13 @@
5550
from airbyte_cdk.sources import Source
5651
from airbyte_cdk.test.models.scenario import ExpectedOutcome
5752

53+
JsonValidationErrors: tuple[type[Exception], ...] = (orjson.JSONDecodeError,)
54+
# Conditionally import and create a union type for exception handling
55+
if sys.platform != "emscripten":
56+
from serpyco_rs import SchemaValidationError
57+
58+
JsonValidationErrors = (orjson.JSONDecodeError, SchemaValidationError)
59+
5860

5961
class AirbyteEntrypointException(Exception):
6062
"""Exception raised for errors in the AirbyteEntrypoint execution.
@@ -123,7 +125,7 @@ def __init__(
123125
def _parse_message(message: str) -> AirbyteMessage:
124126
try:
125127
return AirbyteMessageSerializer.load(orjson.loads(message))
126-
except (orjson.JSONDecodeError, SchemaValidationError):
128+
except JsonValidationErrors:
127129
# The platform assumes that logs that are not of AirbyteMessage format are log messages
128130
return AirbyteMessage(
129131
type=Type.LOG, log=AirbyteLogMessage(level=Level.INFO, message=message)

unit_tests/sources/test_source.py

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,7 @@
1111

1212
import orjson
1313
import pytest
14-
15-
if sys.platform == 'emscripten':
16-
from serpyco import SchemaValidationError
17-
else:
18-
from serpyco_rs import SchemaValidationError
14+
from serpyco_rs import SchemaValidationError
1915

2016
from airbyte_cdk.models import (
2117
AirbyteGlobalState,

0 commit comments

Comments
 (0)