Commit ef0f3f8: "pr feedback" (1 parent: ef07959)

File tree: 3 files changed (+138, -39 lines)

airbyte_cdk/sources/declarative/retrievers/retriever.py

Lines changed: 0 additions & 1 deletion

@@ -30,7 +30,6 @@ def read_records(
         :return: The records read from the API source
         """
 
-    @abstractmethod
     @deprecated("Stream slicing is being moved to the stream level.")
     def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
         """Does nothing as this method is deprecated, so underlying Retriever implementations

airbyte_cdk/sources/declarative/retrievers/simple_retriever.py

Lines changed: 3 additions & 0 deletions

@@ -530,6 +530,9 @@ def _read_pages(
 
             yield from []
         else:
+            # coderabbit detected an interesting bug/gap where if we were to not get a child_response, we
+            # might recurse forever. This might not be the case, but it is worth noting that this code path
+            # isn't comprehensively tested.
             yield from self._read_pages(records_generator_fn, stream_slice)
 
     def _paginate(
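
The new comment flags a possible unbounded recursion in _read_pages if a child response never materializes. One defensive pattern is an explicit depth cap; the sketch below is purely illustrative (the class, the cap, and _has_next_page are hypothetical, not the CDK's actual code):

from typing import Any, Callable, Iterable

_MAX_PAGINATION_DEPTH = 1_000  # hypothetical cap, not a CDK constant


class BoundedPager:
    """Toy stand-in for SimpleRetriever showing one way to bound the recursion."""

    def _read_pages(
        self,
        records_generator_fn: Callable[[Any], Iterable[Any]],
        stream_slice: Any,
        depth: int = 0,
    ) -> Iterable[Any]:
        if depth >= _MAX_PAGINATION_DEPTH:
            raise RuntimeError("pagination never yielded a child response; aborting")
        yield from records_generator_fn(stream_slice)
        if self._has_next_page(stream_slice):
            # recurse with an incremented depth instead of an unbounded self-call
            yield from self._read_pages(records_generator_fn, stream_slice, depth + 1)

    def _has_next_page(self, stream_slice: Any) -> bool:
        return False  # placeholder termination condition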

unit_tests/sources/declarative/async_job/test_integration.py

Lines changed: 135 additions & 38 deletions

@@ -2,18 +2,25 @@
 
 
 import logging
-from typing import Any, Iterable, List, Mapping, Optional, Set, Tuple
+from queue import Queue
+from typing import Any, Iterable, Iterator, List, Mapping, Optional, Set, Tuple
 from unittest import TestCase, mock
 
-from airbyte_cdk.legacy.sources.declarative.declarative_stream import DeclarativeStream
+from airbyte_protocol_dataclasses.models import (
+    AirbyteCatalog,
+    AirbyteMessage,
+    AirbyteStateMessage,
+    ConfiguredAirbyteCatalog,
+)
+
 from airbyte_cdk.models import ConnectorSpecification
-from airbyte_cdk.sources import AbstractSource
+from airbyte_cdk.sources.concurrent_source.concurrent_source import ConcurrentSource
 from airbyte_cdk.sources.declarative.async_job.job import AsyncJob
 from airbyte_cdk.sources.declarative.async_job.job_orchestrator import AsyncJobOrchestrator
 from airbyte_cdk.sources.declarative.async_job.job_tracker import JobTracker
 from airbyte_cdk.sources.declarative.async_job.repository import AsyncJobRepository
 from airbyte_cdk.sources.declarative.async_job.status import AsyncJobStatus
-from airbyte_cdk.sources.declarative.extractors.record_extractor import RecordExtractor
+from airbyte_cdk.sources.declarative.extractors.dpath_extractor import DpathExtractor
 from airbyte_cdk.sources.declarative.extractors.record_selector import RecordSelector
 from airbyte_cdk.sources.declarative.partition_routers import SinglePartitionRouter
 from airbyte_cdk.sources.declarative.partition_routers.async_job_partition_router import (
@@ -22,15 +29,23 @@
 from airbyte_cdk.sources.declarative.retrievers.async_retriever import AsyncRetriever
 from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader
 from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
+from airbyte_cdk.sources.declarative.stream_slicers.declarative_partition_generator import (
+    DeclarativePartitionFactory,
+    StreamSlicerPartitionGenerator,
+)
 from airbyte_cdk.sources.message import NoopMessageRepository
-from airbyte_cdk.sources.streams import Stream
+from airbyte_cdk.sources.source import Source
+from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
+from airbyte_cdk.sources.streams.concurrent.cursor import FinalStateCursor
+from airbyte_cdk.sources.streams.concurrent.default_stream import DefaultStream
+from airbyte_cdk.sources.streams.concurrent.partitions.types import QueueItem
 from airbyte_cdk.sources.types import StreamSlice
+from airbyte_cdk.sources.utils.slice_logger import DebugSliceLogger
 from airbyte_cdk.sources.utils.transform import TransformConfig, TypeTransformer
 from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
 from airbyte_cdk.test.entrypoint_wrapper import read
 
 _A_STREAM_NAME = "a_stream_name"
-_EXTRACTOR_NOT_USED: RecordExtractor = None  # type: ignore  # the extractor should not be used. If it is the case, there is an issue that needs fixing
 _NO_LIMIT = 10000
 
 
@@ -52,57 +67,139 @@ def delete(self, job: AsyncJob) -> None:
         pass
 
 
-class MockSource(AbstractSource):
+class MockSource(Source):
     def __init__(self, stream_slicer: Optional[StreamSlicer] = None) -> None:
         self._stream_slicer = SinglePartitionRouter({}) if stream_slicer is None else stream_slicer
+        queue: Queue[QueueItem] = Queue(maxsize=10_000)
         self._message_repository = NoopMessageRepository()
+        self._config = {}
+
+        self._concurrent_source = ConcurrentSource.create(
+            num_workers=1,
+            initial_number_of_partitions_to_generate=1,
+            logger=logging.getLogger("airbyte"),
+            slice_logger=DebugSliceLogger(),
+            queue=queue,
+            message_repository=self._message_repository,
+        )
 
-    def check_connection(
+    def check(
         self, logger: logging.Logger, config: Mapping[str, Any]
     ) -> Tuple[bool, Optional[Any]]:
         return True, None
 
     def spec(self, logger: logging.Logger) -> ConnectorSpecification:
         return ConnectorSpecification(connectionSpecification={})
 
-    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
-        noop_record_selector = RecordSelector(
-            extractor=_EXTRACTOR_NOT_USED,
+    def streams(self, config: Mapping[str, Any]) -> List[AbstractStream]:
+        # Build the partition router with the mock repository
+        partition_router = AsyncJobPartitionRouter(
+            stream_slicer=self._stream_slicer,
+            job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
+                MockAsyncJobRepository(),
+                stream_slices,
+                JobTracker(_NO_LIMIT),
+                self._message_repository,
+            ),
+            config={},
+            parameters={},
+        )
+
+        # Create the extractor that extracts records from responses
+        extractor = DpathExtractor(
+            field_path=[],
+            config={},
+            parameters={},
+        )
+
+        # Create the record selector with the extractor
+        record_selector = RecordSelector(
+            extractor=extractor,
             config={},
             parameters={},
             schema_normalization=TypeTransformer(TransformConfig.NoTransform),
+            name=_A_STREAM_NAME,
             record_filter=None,
             transformations=[],
         )
-        return [
-            DeclarativeStream(
-                retriever=AsyncRetriever(
-                    config={},
-                    parameters={},
-                    record_selector=noop_record_selector,
-                    stream_slicer=AsyncJobPartitionRouter(
-                        stream_slicer=self._stream_slicer,
-                        job_orchestrator_factory=lambda stream_slices: AsyncJobOrchestrator(
-                            MockAsyncJobRepository(),
-                            stream_slices,
-                            JobTracker(_NO_LIMIT),
-                            self._message_repository,
-                        ),
-                        config={},
-                        parameters={},
-                    ),
-                ),
-                config={},
-                parameters={},
-                name=_A_STREAM_NAME,
-                primary_key=["id"],
-                schema_loader=InlineSchemaLoader({}, {}),
-                # the interface mentions that this is Optional,
-                # but I get `'NoneType' object has no attribute 'eval'` by passing None
-                stream_cursor_field="",
-            )
+
+        # Build the retriever with the partition router
+        retriever = AsyncRetriever(
+            config={},
+            parameters={},
+            record_selector=record_selector,
+            stream_slicer=partition_router,
+        )
+
+        # Create schema loader
+        schema_loader = InlineSchemaLoader({}, {})
+
+        # Create partition factory that will create partitions from stream slices
+        partition_factory = DeclarativePartitionFactory(
+            stream_name=_A_STREAM_NAME,
+            schema_loader=schema_loader,
+            retriever=retriever,
+            message_repository=self._message_repository,
+            max_records_limit=None,
+        )
+
+        # Create partition generator that wraps the partition router
+        partition_generator = StreamSlicerPartitionGenerator(
+            partition_factory=partition_factory,
+            stream_slicer=partition_router,
+            slice_limit=None,
+            max_records_limit=None,
+        )
+
+        # Create cursor (using FinalStateCursor for full refresh)
+        cursor = FinalStateCursor(
+            stream_name=_A_STREAM_NAME,
+            stream_namespace=None,
+            message_repository=self._message_repository,
+        )
+
+        # Directly instantiate DefaultStream with all components
+        stream = DefaultStream(
+            partition_generator=partition_generator,
+            name=_A_STREAM_NAME,
+            json_schema={},
+            primary_key=["id"],
+            cursor_field=None,
+            logger=logging.getLogger("airbyte"),
+            cursor=cursor,
+        )
+
+        return [stream]
+
+    def read(
+        self,
+        logger: logging.Logger,
+        config: Mapping[str, Any],
+        catalog: ConfiguredAirbyteCatalog,
+        state: Optional[List[AirbyteStateMessage]] = None,
+    ) -> Iterator[AirbyteMessage]:
+        stream_name_to_instance = {s.name: s for s in self.streams(config=self._config)}
+        selected_concurrent_streams = [
+            stream_name_to_instance[configured_stream.stream.name]
+            for configured_stream in catalog.streams
+            if configured_stream.stream.name in stream_name_to_instance
         ]
 
+        # selected_concurrent_streams = self._select_streams(
+        #     streams=,  # type: ignore  # We are migrating away from the DeclarativeStream implementation and streams() only returns the concurrent-compatible AbstractStream. To preserve compatibility, we retain the existing method interface
+        #     configured_catalog=catalog,
+        # )
+
+        # It would appear that passing in an empty set of streams causes an infinite loop in ConcurrentReadProcessor.
+        # This is also evident in concurrent_source_adapter.py so I'll leave this out of scope to fix for now
+        if len(selected_concurrent_streams) > 0:
+            yield from self._concurrent_source.read(selected_concurrent_streams)
+
+    def discover(self, logger: logging.Logger, config: Mapping[str, Any]) -> AirbyteCatalog:
+        return AirbyteCatalog(
+            streams=[stream.as_airbyte_stream() for stream in self.streams(config=self._config)]
+        )
+
 
 class JobDeclarativeStreamTest(TestCase):
     _CONFIG: Mapping[str, Any] = {}
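
Since the rewritten MockSource implements check/discover/read directly against the concurrent stack, a test can drive it end to end through the entrypoint wrapper this file already imports; note that read() skips ConcurrentSource entirely for an empty stream selection, sidestepping the infinite-loop issue mentioned in the comment above. A minimal usage sketch (the catalog construction and assertion are illustrative, not taken from this diff):

from airbyte_cdk.models import SyncMode
from airbyte_cdk.test.catalog_builder import CatalogBuilder
from airbyte_cdk.test.entrypoint_wrapper import read

source = MockSource()
catalog = CatalogBuilder().with_stream(_A_STREAM_NAME, SyncMode.full_refresh).build()
output = read(source, {}, catalog)  # EntrypointOutput exposing .records and .errors
assert output.errors == []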
