Skip to content

Commit 737b22c

Browse files
committed
some pr feedback to rework slice limiting, adding comments, and a few other tweaks
1 parent a9d8140 commit 737b22c

File tree

6 files changed

+40
-39
lines changed

6 files changed

+40
-39
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,8 @@ def __init__(
128128
disable_cache=True if limits else False,
129129
)
130130

131+
self._limits = limits
132+
131133
super().__init__(
132134
source_config=source_config,
133135
config=config,
@@ -324,6 +326,9 @@ def _group_streams(
324326
self.message_repository,
325327
),
326328
stream_slicer=declarative_stream.retriever.stream_slicer,
329+
slice_limit=self._limits.max_slices
330+
if self._limits
331+
else None, # technically not needed because create_declarative_stream() -> create_simple_retriever() will apply the decorator. But for consistency, and depending on how we build create_default_stream, this may be needed later
327332
)
328333
else:
329334
if (
@@ -355,6 +360,7 @@ def _group_streams(
355360
self.message_repository,
356361
),
357362
stream_slicer=cursor,
363+
slice_limit=self._limits.max_slices if self._limits else None,
358364
)
359365

360366
concurrent_streams.append(
@@ -386,6 +392,9 @@ def _group_streams(
386392
self.message_repository,
387393
),
388394
declarative_stream.retriever.stream_slicer,
395+
slice_limit=self._limits.max_slices
396+
if self._limits
397+
else None, # technically not needed because create_declarative_stream() -> create_simple_retriever() will apply the decorator. But for consistency, and depending on how we build create_default_stream, this may be needed later
389398
)
390399

391400
final_state_cursor = FinalStateCursor(
@@ -447,6 +456,7 @@ def _group_streams(
447456
self.message_repository,
448457
),
449458
perpartition_cursor,
459+
slice_limit=self._limits.max_slices if self._limits else None,
450460
)
451461

452462
concurrent_streams.append(

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1453,7 +1453,7 @@ def create_concurrent_cursor_from_datetime_based_cursor(
14531453
f"Invalid clamping target {evaluated_target}, expected DAY, WEEK, MONTH"
14541454
)
14551455

1456-
concurrent_cursor = ConcurrentCursor(
1456+
return ConcurrentCursor(
14571457
stream_name=stream_name,
14581458
stream_namespace=stream_namespace,
14591459
stream_state=stream_state,
@@ -1470,17 +1470,6 @@ def create_concurrent_cursor_from_datetime_based_cursor(
14701470
clamping_strategy=clamping_strategy,
14711471
)
14721472

1473-
if self._should_limit_slices_fetched():
1474-
return cast( # type: ignore # For a test_read, this will return a StreamSlicer that wraps a cursor and should still work. Changing the signature creates even more type problems
1475-
StreamSlicer,
1476-
StreamSlicerTestReadDecorator(
1477-
wrapped_slicer=concurrent_cursor,
1478-
maximum_number_of_slices=self._limit_slices_fetched or 5,
1479-
),
1480-
)
1481-
else:
1482-
return concurrent_cursor
1483-
14841473
def create_concurrent_cursor_from_incrementing_count_cursor(
14851474
self,
14861475
model_type: Type[BaseModel],
@@ -1534,7 +1523,7 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
15341523
is_sequential_state=True, # ConcurrentPerPartitionCursor only works with sequential state
15351524
)
15361525

1537-
concurrent_cursor = ConcurrentCursor(
1526+
return ConcurrentCursor(
15381527
stream_name=stream_name,
15391528
stream_namespace=stream_namespace,
15401529
stream_state=stream_state,
@@ -1547,17 +1536,6 @@ def create_concurrent_cursor_from_incrementing_count_cursor(
15471536
end_provider=connector_state_converter.get_end_provider(), # type: ignore # Having issues w/ inspection for GapType and CursorValueType as shown in existing tests. Confirmed functionality is working in practice
15481537
)
15491538

1550-
if self._should_limit_slices_fetched():
1551-
return cast( # type: ignore # For a test_read, this will return a StreamSlicer that wraps a cursor and should still work. Changing the signature creates even more type problems
1552-
StreamSlicer,
1553-
StreamSlicerTestReadDecorator(
1554-
wrapped_slicer=concurrent_cursor,
1555-
maximum_number_of_slices=self._limit_slices_fetched or MAX_SLICES,
1556-
),
1557-
)
1558-
else:
1559-
return concurrent_cursor
1560-
15611539
def _assemble_weekday(self, weekday: str) -> Weekday:
15621540
match weekday:
15631541
case "MONDAY":

airbyte_cdk/sources/declarative/stream_slicers/declarative_partition_generator.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
1-
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
22

3-
from typing import Any, Iterable, Mapping, Optional
3+
from typing import Any, Iterable, Mapping, Optional, cast
44

55
from airbyte_cdk.sources.declarative.retrievers import Retriever
6+
from airbyte_cdk.sources.declarative.stream_slicers.stream_slicer_test_read_decorator import (
7+
StreamSlicerTestReadDecorator,
8+
)
69
from airbyte_cdk.sources.message import MessageRepository
710
from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
811
from airbyte_cdk.sources.streams.concurrent.partitions.partition_generator import PartitionGenerator
@@ -83,10 +86,23 @@ def __hash__(self) -> int:
8386

8487
class StreamSlicerPartitionGenerator(PartitionGenerator):
8588
def __init__(
86-
self, partition_factory: DeclarativePartitionFactory, stream_slicer: StreamSlicer
89+
self,
90+
partition_factory: DeclarativePartitionFactory,
91+
stream_slicer: StreamSlicer,
92+
slice_limit: Optional[int] = None,
8793
) -> None:
8894
self._partition_factory = partition_factory
89-
self._stream_slicer = stream_slicer
95+
96+
if slice_limit:
97+
self._stream_slicer = cast(
98+
StreamSlicer,
99+
StreamSlicerTestReadDecorator(
100+
wrapped_slicer=stream_slicer,
101+
maximum_number_of_slices=slice_limit,
102+
),
103+
)
104+
else:
105+
self._stream_slicer = stream_slicer
90106

91107
def generate(self) -> Iterable[Partition]:
92108
for stream_slice in self._stream_slicer.stream_slices():

airbyte_cdk/sources/streams/concurrent/partition_reader.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,10 @@
1515
from airbyte_cdk.sources.utils.slice_logger import SliceLogger
1616

1717

18+
# Now that the connector builder workflow has moved to the concurrent CDK, which requires correct
19+
# ordering of log messages onto the main write thread via the ConcurrentMessageRepository, this
20+
# separate flow and class (originally used to log slices onto this partition's message_repository)
21+
# should be replaced by emitting messages directly onto the repository instead of an intermediary.
1822
class PartitionLogger:
1923
"""
2024
Helper class that provides a mechanism for passing a log message onto the current

airbyte_cdk/sources/utils/slice_logger.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@
1111
from airbyte_cdk.models import Type as MessageType
1212

1313

14+
# Once everything runs on the concurrent CDK and we've cleaned up the legacy flows, we should try to remove
15+
# this class and write messages directly to the message_repository instead of through the logger, because in
16+
# cases like the connector builder where the ordering of messages is important, using the logger can cause
17+
# messages to be grouped out of order. Alas, work for a different day.
1418
class SliceLogger(ABC):
1519
"""
1620
SliceLogger is an interface that allows us to log slices of data in a uniform way.

unit_tests/sources/declarative/schema/test_dynamic_schema_loader.py

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,15 @@
44

55
import json
66
from copy import deepcopy
7-
from queue import Queue
87
from unittest.mock import MagicMock
98

109
import pytest
11-
from airbyte_protocol_dataclasses.models import Level
1210

1311
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
1412
ConcurrentDeclarativeSource,
1513
TestLimits,
1614
)
17-
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
18-
ModelToComponentFactory,
19-
)
2015
from airbyte_cdk.sources.declarative.schema import DynamicSchemaLoader, SchemaTypeIdentifier
21-
from airbyte_cdk.sources.message.concurrent_repository import ConcurrentMessageRepository
22-
from airbyte_cdk.sources.message.repository import InMemoryMessageRepository
23-
from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
2416
from airbyte_cdk.test.mock_http import HttpMocker, HttpRequest, HttpResponse
2517

2618
_CONFIG = {
@@ -360,9 +352,6 @@ def test_dynamic_schema_loader_with_type_conditions():
360352
},
361353
}
362354

363-
# queue: Queue[QueueItem] = Queue(maxsize=10_000)
364-
# message_repository = InMemoryMessageRepository(Level.INFO)
365-
366355
source = ConcurrentDeclarativeSource(
367356
source_config=_MANIFEST_WITH_TYPE_CONDITIONS,
368357
config=_CONFIG,

0 commit comments

Comments
 (0)