Skip to content

Commit c1435f5

Browse files
committed
Make decorated instances pass isinstance checks as the wrapped object
1 parent ca07d1a commit c1435f5

File tree

2 files changed

+91
-3
lines changed

2 files changed

+91
-3
lines changed

airbyte_cdk/sources/streams/concurrent/partitions/stream_slicer.py

Lines changed: 21 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,30 @@
11
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
22

3-
from abc import ABC, abstractmethod
4-
from typing import Iterable
3+
from abc import ABC, ABCMeta, abstractmethod
4+
from typing import Any, Iterable
55

66
from airbyte_cdk.sources.types import StreamSlice
77

88

9-
class StreamSlicer(ABC):
9+
class StreamSlicerMeta(ABCMeta):
    """
    Metaclass that lets a wrapper pass ``isinstance`` checks as the StreamSlicer it wraps.

    This is necessary because StreamSlicerTestReadDecorator wraps a StreamSlicer and we want to be
    able to check if an instance is a given StreamSlicer type, even if it is wrapped in a
    StreamSlicerTestReadDecorator.

    For example in ConcurrentDeclarativeSource, we do things like:
        isinstance(declarative_stream.retriever.stream_slicer, (GlobalSubstreamCursor, PerPartitionWithGlobalCursor))

    An object is considered an instance of ``cls`` when either:
      * it is a genuine instance of ``cls`` (regular ABC semantics), or
      * it exposes a ``wrapped_slicer`` attribute whose value is an instance of ``cls``.
    """

    def __instancecheck__(cls, instance: Any) -> bool:
        """
        Return True if ``instance`` is, or wraps, an instance of ``cls``.

        :param instance: the object being tested by ``isinstance(instance, cls)``
        :return: True when ``instance`` itself matches ``cls`` or its ``wrapped_slicer`` does
        """
        # Honour real inheritance first: without this, a subclass of a wrapper would fail
        # isinstance() against its own wrapper base class whenever the wrapped slicer is of
        # another type.
        if super().__instancecheck__(instance):
            return True

        # Otherwise, a wrapper is shown as whatever it wraps. The nested isinstance() call goes
        # through this metaclass again, so stacked wrappers are unwrapped level by level.
        if hasattr(instance, "wrapped_slicer"):
            return isinstance(instance.wrapped_slicer, cls)

        return False
25+
26+
27+
class StreamSlicer(ABC, metaclass=StreamSlicerMeta):
1028
"""
1129
Slices the stream into chunks that can be fetched independently. Slices enable state checkpointing and data retrieval parallelization.
1230
"""
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from unittest.mock import Mock
6+
7+
from airbyte_cdk.sources.declarative.incremental import PerPartitionWithGlobalCursor
8+
from airbyte_cdk.sources.declarative.incremental.declarative_cursor import DeclarativeCursor
9+
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
10+
GlobalSubstreamCursor,
11+
)
12+
from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import (
13+
StreamSlice,
14+
)
15+
from airbyte_cdk.sources.declarative.partition_routers import AsyncJobPartitionRouter
16+
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
17+
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicerTestReadDecorator
18+
19+
CURSOR_SLICE_FIELD = "cursor slice field"
20+
21+
22+
class MockedCursorBuilder:
    """Fluent builder producing a ``Mock`` specced on ``DeclarativeCursor``."""

    def __init__(self) -> None:
        # Values handed back by the mocked cursor; both start out empty.
        self._stream_slices = []
        self._stream_state = {}

    def with_stream_slices(self, stream_slices) -> "MockedCursorBuilder":
        """Set the value the mocked cursor returns from ``stream_slices()``."""
        self._stream_slices = stream_slices
        return self

    def with_stream_state(self, stream_state) -> "MockedCursorBuilder":
        """Set the value the mocked cursor returns from ``get_stream_state()``."""
        self._stream_state = stream_state
        return self

    def build(self):
        """Create the mocked cursor wired with the configured return values."""
        mocked_cursor = Mock(spec=DeclarativeCursor)
        mocked_cursor.configure_mock(
            **{
                "get_stream_state.return_value": self._stream_state,
                "stream_slices.return_value": self._stream_slices,
            }
        )
        return mocked_cursor
40+
41+
42+
def mocked_partition_router():
    """Return a fresh ``Mock`` restricted to the ``PartitionRouter`` interface."""
    router = Mock(spec=PartitionRouter)
    return router
44+
45+
46+
def test_show_as_wrapped_instance():
    """
    A StreamSlicerTestReadDecorator must pass ``isinstance`` checks exactly like the slicer it wraps.
    """
    first_partition = {"first_partition_key": "first_partition_value"}

    # Keep a reference to the router mock: configuring a throwaway mock and then passing the
    # factory function itself to the cursor would leave the cursor without a usable router.
    partition_router = mocked_partition_router()
    partition_router.stream_slices.return_value = [
        StreamSlice(
            partition=first_partition, cursor_slice={}, extra_fields={"extra_field": "extra_value"}
        ),
    ]
    cursor = (
        MockedCursorBuilder()
        .with_stream_slices([{CURSOR_SLICE_FIELD: "first slice cursor value"}])
        .build()
    )

    global_cursor = GlobalSubstreamCursor(cursor, partition_router)
    wrapped_slicer = StreamSlicerTestReadDecorator(
        wrapped_slicer=global_cursor,
        maximum_number_of_slices=5,
    )

    # The decorator is reported as the type of the slicer it wraps...
    assert isinstance(wrapped_slicer, GlobalSubstreamCursor)
    assert not isinstance(wrapped_slicer, AsyncJobPartitionRouter)
    assert not isinstance(wrapped_slicer, PerPartitionWithGlobalCursor)

    # ...and the undecorated slicer gives the same answers.
    assert isinstance(global_cursor, GlobalSubstreamCursor)
    assert not isinstance(global_cursor, AsyncJobPartitionRouter)
    assert not isinstance(global_cursor, PerPartitionWithGlobalCursor)

0 commit comments

Comments
 (0)