Skip to content

Commit cc8ec47

Browse files
artem1205 and maxi297 authored
fix: fallback to json if orjson cannot serialize value (#210)
Signed-off-by: Artem Inzhyyants <[email protected]>
Co-authored-by: maxi297 <[email protected]>
1 parent 3c76ef3 commit cc8ec47

File tree

2 files changed: 36 additions and 1 deletion

airbyte_cdk/entrypoint.py

Lines changed: 13 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -5,6 +5,7 @@
55
import argparse
66
import importlib
77
import ipaddress
8+
import json
89
import logging
910
import os.path
1011
import socket
@@ -46,6 +47,7 @@
4647

4748
VALID_URL_SCHEMES = ["https"]
4849
CLOUD_DEPLOYMENT_MODE = "cloud"
50+
_HAS_LOGGED_FOR_SERIALIZATION_ERROR = False
4951

5052

5153
class AirbyteEntrypoint(object):
@@ -291,7 +293,17 @@ def set_up_secret_filter(config: TConfig, connection_specification: Mapping[str,
291293

292294
@staticmethod
293295
def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str:
294-
return orjson.dumps(AirbyteMessageSerializer.dump(airbyte_message)).decode()
296+
global _HAS_LOGGED_FOR_SERIALIZATION_ERROR
297+
serialized_message = AirbyteMessageSerializer.dump(airbyte_message)
298+
try:
299+
return orjson.dumps(serialized_message).decode()
300+
except Exception as exception:
301+
if not _HAS_LOGGED_FOR_SERIALIZATION_ERROR:
302+
logger.warning(
303+
f"There was an error during the serialization of an AirbyteMessage: `{exception}`. This might impact the sync performances."
304+
)
305+
_HAS_LOGGED_FOR_SERIALIZATION_ERROR = True
306+
return json.dumps(serialized_message)
295307

296308
@classmethod
297309
def extract_state(cls, args: List[str]) -> Optional[Any]:

unit_tests/test_entrypoint.py

Lines changed: 23 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -768,3 +768,26 @@ def test_handle_record_counts(
768768
assert isinstance(
769769
actual_message.state.sourceStats.recordCount, float
770770
), "recordCount value should be expressed as a float"
771+
772+
773+
def test_given_serialization_error_using_orjson_then_fallback_on_json(
774+
entrypoint: AirbyteEntrypoint, mocker, spec_mock, config_mock
775+
):
776+
parsed_args = Namespace(
777+
command="read", config="config_path", state="statepath", catalog="catalogpath"
778+
)
779+
record = AirbyteMessage(
780+
record=AirbyteRecordMessage(
781+
stream="stream", data={"data": 7046723166326052303072}, emitted_at=1
782+
),
783+
type=Type.RECORD,
784+
)
785+
mocker.patch.object(MockSource, "read_state", return_value={})
786+
mocker.patch.object(MockSource, "read_catalog", return_value={})
787+
mocker.patch.object(MockSource, "read", return_value=[record, record])
788+
789+
messages = list(entrypoint.run(parsed_args))
790+
791+
# There will be multiple messages here because the fixture `entrypoint` sets a control message. We only care about records here
792+
record_messages = list(filter(lambda message: "RECORD" in message, messages))
793+
assert len(record_messages) == 2

0 commit comments

Comments
 (0)