Skip to content

Commit d44aea8

Browse files
author
Baz
authored
fix: (CDK) (Connector Builder) - refactor the MessageGrouper > TestRead (#332)
1 parent 522caab commit d44aea8

File tree

11 files changed

+1348
-518
lines changed

11 files changed

+1348
-518
lines changed

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 16 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,11 @@
22
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
33
#
44

5-
import dataclasses
5+
6+
from dataclasses import asdict, dataclass, field
67
from typing import Any, List, Mapping
78

8-
from airbyte_cdk.connector_builder.message_grouper import MessageGrouper
9+
from airbyte_cdk.connector_builder.test_reader import TestReader
910
from airbyte_cdk.models import (
1011
AirbyteMessage,
1112
AirbyteRecordMessage,
@@ -32,11 +33,11 @@
3233
MAX_RECORDS_KEY = "max_records"
3334

3435

35-
@dataclasses.dataclass
36+
@dataclass
3637
class TestReadLimits:
37-
max_records: int = dataclasses.field(default=DEFAULT_MAXIMUM_RECORDS)
38-
max_pages_per_slice: int = dataclasses.field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
39-
max_slices: int = dataclasses.field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
38+
max_records: int = field(default=DEFAULT_MAXIMUM_RECORDS)
39+
max_pages_per_slice: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_PAGES_PER_SLICE)
40+
max_slices: int = field(default=DEFAULT_MAXIMUM_NUMBER_OF_SLICES)
4041

4142

4243
def get_limits(config: Mapping[str, Any]) -> TestReadLimits:
@@ -73,17 +74,20 @@ def read_stream(
7374
limits: TestReadLimits,
7475
) -> AirbyteMessage:
7576
try:
76-
handler = MessageGrouper(limits.max_pages_per_slice, limits.max_slices, limits.max_records)
77-
stream_name = configured_catalog.streams[
78-
0
79-
].stream.name # The connector builder only supports a single stream
80-
stream_read = handler.get_message_groups(
77+
test_read_handler = TestReader(
78+
limits.max_pages_per_slice, limits.max_slices, limits.max_records
79+
)
80+
# The connector builder only supports a single stream
81+
stream_name = configured_catalog.streams[0].stream.name
82+
83+
stream_read = test_read_handler.run_test_read(
8184
source, config, configured_catalog, state, limits.max_records
8285
)
86+
8387
return AirbyteMessage(
8488
type=MessageType.RECORD,
8589
record=AirbyteRecordMessage(
86-
data=dataclasses.asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
90+
data=asdict(stream_read), stream=stream_name, emitted_at=_emitted_at()
8791
),
8892
)
8993
except Exception as exc:

0 commit comments

Comments
 (0)