Skip to content

Commit b18e7c2

Browse files
committed
cherry-pick-me: replace serializer classes with helper functions
1 parent b924fb2 commit b18e7c2

File tree

21 files changed

+180
-164
lines changed

21 files changed

+180
-164
lines changed

airbyte_cdk/cli/source_declarative_manifest/_run.py

Lines changed: 26 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,13 +32,13 @@
3232
from airbyte_cdk.models import (
3333
AirbyteErrorTraceMessage,
3434
AirbyteMessage,
35-
AirbyteMessageSerializer,
3635
AirbyteStateMessage,
3736
AirbyteTraceMessage,
3837
ConfiguredAirbyteCatalog,
3938
ConnectorSpecificationSerializer,
4039
TraceType,
4140
Type,
41+
ab_message_to_string,
4242
)
4343
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
4444
ConcurrentDeclarativeSource,
@@ -105,21 +105,19 @@ def _get_local_yaml_source(args: list[str]) -> SourceLocalYaml:
105105
)
106106
except Exception as error:
107107
print(
108-
orjson.dumps(
109-
AirbyteMessageSerializer.dump(
110-
AirbyteMessage(
111-
type=Type.TRACE,
112-
trace=AirbyteTraceMessage(
113-
type=TraceType.ERROR,
114-
emitted_at=ab_datetime_now().to_epoch_millis(),
115-
error=AirbyteErrorTraceMessage(
116-
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
117-
stack_trace=traceback.format_exc(),
118-
),
108+
ab_message_to_string(
109+
AirbyteMessage(
110+
type=Type.TRACE,
111+
trace=AirbyteTraceMessage(
112+
type=TraceType.ERROR,
113+
emitted_at=ab_datetime_now().to_epoch_millis(),
114+
error=AirbyteErrorTraceMessage(
115+
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
116+
stack_trace=traceback.format_exc(),
119117
),
120-
)
121-
)
122-
).decode()
118+
),
119+
),
120+
)
123121
)
124122
raise error
125123

@@ -153,7 +151,7 @@ def handle_remote_manifest_command(args: list[str]) -> None:
153151
spec = ConnectorSpecificationSerializer.load(spec_obj)
154152

155153
message = AirbyteMessage(type=Type.SPEC, spec=spec)
156-
print(AirbyteEntrypoint.airbyte_message_to_string(message))
154+
print(ab_message_to_string(message))
157155
else:
158156
source = create_declarative_source(args)
159157
launch(
@@ -215,21 +213,19 @@ def create_declarative_source(
215213
)
216214
except Exception as error:
217215
print(
218-
orjson.dumps(
219-
AirbyteMessageSerializer.dump(
220-
AirbyteMessage(
221-
type=Type.TRACE,
222-
trace=AirbyteTraceMessage(
223-
type=TraceType.ERROR,
224-
emitted_at=ab_datetime_now().to_epoch_millis(),
225-
error=AirbyteErrorTraceMessage(
226-
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
227-
stack_trace=traceback.format_exc(),
228-
),
216+
ab_message_to_string(
217+
AirbyteMessage(
218+
type=Type.TRACE,
219+
trace=AirbyteTraceMessage(
220+
type=TraceType.ERROR,
221+
emitted_at=ab_datetime_now().to_epoch_millis(),
222+
error=AirbyteErrorTraceMessage(
223+
message=f"Error starting the sync. This could be due to an invalid configuration or catalog. Please contact Support for assistance. Error: {error}",
224+
stack_trace=traceback.format_exc(),
229225
),
230-
)
231-
)
232-
).decode()
226+
),
227+
),
228+
),
233229
)
234230
raise error
235231

airbyte_cdk/config_observation.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,9 +16,9 @@
1616
AirbyteControlConnectorConfigMessage,
1717
AirbyteControlMessage,
1818
AirbyteMessage,
19-
AirbyteMessageSerializer,
2019
OrchestratorType,
2120
Type,
21+
ab_message_to_string,
2222
)
2323

2424

@@ -92,7 +92,7 @@ def emit_configuration_as_airbyte_control_message(config: MutableMapping[str, An
9292
See the airbyte_cdk.sources.message package
9393
"""
9494
airbyte_message = create_connector_config_control_message(config)
95-
print(orjson.dumps(AirbyteMessageSerializer.dump(airbyte_message)).decode())
95+
print(ab_message_to_string(airbyte_message))
9696

9797

9898
def create_connector_config_control_message(config: MutableMapping[str, Any]) -> AirbyteMessage:

airbyte_cdk/connector_builder/main.py

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -20,10 +20,10 @@
2020
from airbyte_cdk.entrypoint import AirbyteEntrypoint
2121
from airbyte_cdk.models import (
2222
AirbyteMessage,
23-
AirbyteMessageSerializer,
2423
AirbyteStateMessage,
2524
ConfiguredAirbyteCatalog,
2625
ConfiguredAirbyteCatalogSerializer,
26+
ab_message_to_string,
2727
)
2828
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
2929
from airbyte_cdk.sources.source import Source
@@ -92,11 +92,9 @@ def handle_request(args: List[str]) -> str:
9292
command, config, catalog, state = get_config_and_catalog_from_args(args)
9393
limits = get_limits(config)
9494
source = create_source(config, limits)
95-
return orjson.dumps(
96-
AirbyteMessageSerializer.dump(
97-
handle_connector_builder_request(source, command, config, catalog, state, limits)
98-
)
99-
).decode() # type: ignore[no-any-return] # Serializer.dump() always returns AirbyteMessage
95+
return ab_message_to_string(
96+
handle_connector_builder_request(source, command, config, catalog, state, limits)
97+
)
10098

10199

102100
if __name__ == "__main__":
@@ -107,4 +105,4 @@ def handle_request(args: List[str]) -> str:
107105
exc, message=f"Error handling request: {str(exc)}"
108106
)
109107
m = error.as_airbyte_message()
110-
print(orjson.dumps(AirbyteMessageSerializer.dump(m)).decode())
108+
print(ab_message_to_string(m))

airbyte_cdk/destinations/destination.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,11 @@
1515
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
1616
from airbyte_cdk.models import (
1717
AirbyteMessage,
18-
AirbyteMessageSerializer,
1918
ConfiguredAirbyteCatalog,
2019
ConfiguredAirbyteCatalogSerializer,
2120
Type,
21+
ab_message_from_string,
22+
ab_message_to_string,
2223
)
2324
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
2425
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
@@ -46,7 +47,7 @@ def _parse_input_stream(self, input_stream: io.TextIOWrapper) -> Iterable[Airbyt
4647
"""Reads from stdin, converting to Airbyte messages"""
4748
for line in input_stream:
4849
try:
49-
yield AirbyteMessageSerializer.load(orjson.loads(line))
50+
yield ab_message_from_string(line)
5051
except orjson.JSONDecodeError:
5152
logger.info(
5253
f"ignoring input which can't be deserialized as Airbyte Message: {line}"
@@ -151,4 +152,4 @@ def run(self, args: List[str]) -> None:
151152
parsed_args = self.parse_args(args)
152153
output_messages = self.run_cmd(parsed_args)
153154
for message in output_messages:
154-
print(orjson.dumps(AirbyteMessageSerializer.dump(message)).decode())
155+
print(ab_message_to_string(message))

airbyte_cdk/entrypoint.py

Lines changed: 8 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,12 +26,12 @@
2626
from airbyte_cdk.models import (
2727
AirbyteConnectionStatus,
2828
AirbyteMessage,
29-
AirbyteMessageSerializer,
3029
AirbyteStateStats,
3130
ConnectorSpecification,
3231
FailureType,
3332
Status,
3433
Type,
34+
ab_message_to_string,
3535
)
3636
from airbyte_cdk.sources import Source
3737
from airbyte_cdk.sources.connector_state_manager import HashableStreamDescriptor
@@ -47,7 +47,6 @@
4747

4848
VALID_URL_SCHEMES = ["https"]
4949
CLOUD_DEPLOYMENT_MODE = "cloud"
50-
_HAS_LOGGED_FOR_SERIALIZATION_ERROR = False
5150

5251

5352
class AirbyteEntrypoint(object):
@@ -178,41 +177,41 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
178177
if cmd == "spec":
179178
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
180179
yield from [
181-
self.airbyte_message_to_string(queued_message)
180+
ab_message_to_string(queued_message)
182181
for queued_message in self._emit_queued_messages(self.source)
183182
]
184-
yield self.airbyte_message_to_string(message)
183+
yield ab_message_to_string(message)
185184
else:
186185
raw_config = self.source.read_config(parsed_args.config)
187186
config = self.source.configure(raw_config, temp_dir)
188187

189188
yield from [
190-
self.airbyte_message_to_string(queued_message)
189+
ab_message_to_string(queued_message)
191190
for queued_message in self._emit_queued_messages(self.source)
192191
]
193192
if cmd == "check":
194193
yield from map(
195-
AirbyteEntrypoint.airbyte_message_to_string,
194+
ab_message_to_string,
196195
self.check(source_spec, config),
197196
)
198197
elif cmd == "discover":
199198
yield from map(
200-
AirbyteEntrypoint.airbyte_message_to_string,
199+
ab_message_to_string,
201200
self.discover(source_spec, config),
202201
)
203202
elif cmd == "read":
204203
config_catalog = self.source.read_catalog(parsed_args.catalog)
205204
state = self.source.read_state(parsed_args.state)
206205

207206
yield from map(
208-
AirbyteEntrypoint.airbyte_message_to_string,
207+
ab_message_to_string,
209208
self.read(source_spec, config, config_catalog, state),
210209
)
211210
else:
212211
raise Exception("Unexpected command " + cmd)
213212
finally:
214213
yield from [
215-
self.airbyte_message_to_string(queued_message)
214+
ab_message_to_string(queued_message)
216215
for queued_message in self._emit_queued_messages(self.source)
217216
]
218217

@@ -327,20 +326,6 @@ def set_up_secret_filter(config: TConfig, connection_specification: Mapping[str,
327326
config_secrets = get_secrets(connection_specification, config)
328327
update_secrets(config_secrets)
329328

330-
@staticmethod
331-
def airbyte_message_to_string(airbyte_message: AirbyteMessage) -> str:
332-
global _HAS_LOGGED_FOR_SERIALIZATION_ERROR
333-
serialized_message = AirbyteMessageSerializer.dump(airbyte_message)
334-
try:
335-
return orjson.dumps(serialized_message).decode()
336-
except Exception as exception:
337-
if not _HAS_LOGGED_FOR_SERIALIZATION_ERROR:
338-
logger.warning(
339-
f"There was an error during the serialization of an AirbyteMessage: `{exception}`. This might impact the sync performances."
340-
)
341-
_HAS_LOGGED_FOR_SERIALIZATION_ERROR = True
342-
return json.dumps(serialized_message)
343-
344329
@classmethod
345330
def extract_state(cls, args: List[str]) -> Optional[Any]:
346331
parsed_args = cls.parse_args(args)

airbyte_cdk/logger.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,9 @@
1212
from airbyte_cdk.models import (
1313
AirbyteLogMessage,
1414
AirbyteMessage,
15-
AirbyteMessageSerializer,
1615
Level,
1716
Type,
17+
ab_message_to_string,
1818
)
1919
from airbyte_cdk.utils import PrintBuffer
2020
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
@@ -81,7 +81,7 @@ def format(self, record: logging.LogRecord) -> str:
8181
log_message = AirbyteMessage(
8282
type=Type.LOG, log=AirbyteLogMessage(level=airbyte_level, message=message)
8383
)
84-
return orjson.dumps(AirbyteMessageSerializer.dump(log_message)).decode()
84+
return ab_message_to_string(log_message)
8585

8686
@staticmethod
8787
def extract_extra_args_from_record(record: logging.LogRecord) -> Mapping[str, Any]:

airbyte_cdk/models/__init__.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,11 @@
5050
Type,
5151
)
5252
from .airbyte_protocol_serializers import (
53-
AirbyteMessageSerializer,
5453
AirbyteStateMessageSerializer,
55-
AirbyteStreamStateSerializer,
5654
ConfiguredAirbyteCatalogSerializer,
57-
ConfiguredAirbyteStreamSerializer,
5855
ConnectorSpecificationSerializer,
56+
ab_message_from_string,
57+
ab_message_to_string,
5958
)
6059
from .well_known_types import (
6160
BinaryData,

0 commit comments

Comments
 (0)