
Commit cee6157

Author: maxime.c (committed)
[WIP still tests failing] Remove DeclarativeStream instantiation
1 parent 8566607 commit cee6157

File tree: 16 files changed, +889 additions, -1018 deletions


airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 28 additions & 3 deletions
@@ -11,6 +11,7 @@
 from datetime import timedelta
 from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional

+from airbyte_cdk.models import AirbyteStateMessage, AirbyteStateBlob, AirbyteStreamState, AirbyteStateType, StreamDescriptor
 from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
 from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
     Timer,
@@ -128,6 +129,7 @@ def __init__(

         # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
         self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
+        self._synced_some_data = False

     @property
     def cursor_field(self) -> CursorField:
@@ -168,8 +170,8 @@ def close_partition(self, partition: Partition) -> None:
         with self._lock:
             self._semaphore_per_partition[partition_key].acquire()
             if not self._use_global_cursor:
-                self._cursor_per_partition[partition_key].close_partition(partition=partition)
                 cursor = self._cursor_per_partition[partition_key]
+                cursor.close_partition(partition=partition)
                 if (
                     partition_key in self._partitions_done_generating_stream_slices
                     and self._semaphore_per_partition[partition_key]._value == 0
@@ -213,8 +215,10 @@ def ensure_at_least_one_state_emitted(self) -> None:
         if not any(
             semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
         ):
-            self._global_cursor = self._new_global_cursor
-            self._lookback_window = self._timer.finish()
+            if self._synced_some_data:
+                # we only update those if we actually synced some data
+                self._global_cursor = self._new_global_cursor
+                self._lookback_window = self._timer.finish()
             self._parent_state = self._partition_router.get_stream_state()
         self._emit_state_message(throttle=False)

@@ -458,6 +462,7 @@ def observe(self, record: Record) -> None:
         except ValueError:
             return

+        self._synced_some_data = True
         record_cursor = self._connector_state_converter.output_format(
             self._connector_state_converter.parse_value(record_cursor_value)
         )
@@ -541,3 +546,23 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:

     def limit_reached(self) -> bool:
         return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
+
+    @staticmethod
+    def get_parent_state(stream_state: Optional[StreamState], parent_stream_name: str) -> Optional[AirbyteStateMessage]:
+        return AirbyteStateMessage(
+            type=AirbyteStateType.STREAM,
+            stream=AirbyteStreamState(
+                stream_descriptor=StreamDescriptor(parent_stream_name, None),
+                stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name])
+            )
+        ) if stream_state and "parent_state" in stream_state else None
+
+    @staticmethod
+    def get_global_state(stream_state: Optional[StreamState], parent_stream_name: str) -> Optional[AirbyteStateMessage]:
+        return AirbyteStateMessage(
+            type=AirbyteStateType.STREAM,
+            stream=AirbyteStreamState(
+                stream_descriptor=StreamDescriptor(parent_stream_name, None),
+                stream_state=AirbyteStateBlob(stream_state["state"])
+            )
+        ) if stream_state and "state" in stream_state else None
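For orientation, here is a standalone sketch of the state message shape that get_parent_state builds. The stream_state dict and the parent stream name are invented for illustration, and the message is assembled directly from airbyte_cdk.models rather than through the cursor class, so it runs against a released CDK:

from airbyte_cdk.models import (
    AirbyteStateBlob,
    AirbyteStateMessage,
    AirbyteStateType,
    AirbyteStreamState,
    StreamDescriptor,
)

# Hypothetical per-partition cursor state: a global "state" plus one entry per parent stream.
stream_state = {
    "state": {"updated_at": "2024-01-01T00:00:00Z"},
    "parent_state": {"parent_stream": {"updated_at": "2023-12-31T00:00:00Z"}},
}
parent_stream_name = "parent_stream"  # invented name

# Mirrors get_parent_state: wrap the parent's sub-state in a STREAM state message.
parent_state_message = AirbyteStateMessage(
    type=AirbyteStateType.STREAM,
    stream=AirbyteStreamState(
        stream_descriptor=StreamDescriptor(name=parent_stream_name, namespace=None),
        stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name]),
    ),
)
print(parent_state_message.stream.stream_state.__dict__)  # {'updated_at': '2023-12-31T00:00:00Z'}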

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 100 additions & 131 deletions
Large diffs are not rendered by default.

airbyte_cdk/sources/declarative/partition_routers/substream_partition_router.py

Lines changed: 61 additions & 54 deletions
@@ -7,24 +7,39 @@
 import json
 import logging
 from dataclasses import InitVar, dataclass
-from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union
+from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union, TypeVar

 import dpath
 import requests

 from airbyte_cdk.models import AirbyteMessage
-from airbyte_cdk.models import Type as MessageType
 from airbyte_cdk.sources.declarative.interpolation.interpolated_string import InterpolatedString
 from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
 from airbyte_cdk.sources.declarative.requesters.request_option import (
     RequestOption,
     RequestOptionType,
 )
-from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
-from airbyte_cdk.utils import AirbyteTracedException
+from airbyte_cdk.sources.streams.concurrent.partitions.partition import Partition
+from airbyte_cdk.sources.types import Config, StreamSlice, StreamState

 if TYPE_CHECKING:
-    from airbyte_cdk.sources.declarative.declarative_stream import DeclarativeStream
+    from airbyte_cdk.sources.streams.concurrent.abstract_stream import AbstractStream
+
+
+def iterate_with_last_flag(generator: Iterable[Partition]) -> Iterable[tuple[Partition, bool]]:
+
+    iterator = iter(generator)
+
+    try:
+        current = next(iterator)
+    except StopIteration:
+        return  # Return an empty iterator
+
+    for next_item in iterator:
+        yield current, False
+        current = next_item
+
+    yield current, True


 @dataclass
@@ -40,7 +55,7 @@ class ParentStreamConfig:
         incremental_dependency (bool): Indicates if the parent stream should be read incrementally.
     """

-    stream: "DeclarativeStream"  # Parent streams must be DeclarativeStream because we can't know which part of the stream slice is a partition for regular Stream
+    stream: "AbstractStream"
     parent_key: Union[InterpolatedString, str]
     partition_field: Union[InterpolatedString, str]
     config: Config
@@ -176,59 +191,51 @@ def stream_slices(self) -> Iterable[StreamSlice]:
                        for field_path in parent_stream_config.extra_fields
                    ]

-                # read_stateless() assumes the parent is not concurrent. This is currently okay since the concurrent CDK does
-                # not support either substreams or RFR, but something that needs to be considered once we do
-                for parent_record in parent_stream.read_only_records():
-                    parent_partition = None
-                    # Skip non-records (eg AirbyteLogMessage)
-                    if isinstance(parent_record, AirbyteMessage):
-                        self.logger.warning(
-                            f"Parent stream {parent_stream.name} returns records of type AirbyteMessage. This SubstreamPartitionRouter is not able to checkpoint incremental parent state."
-                        )
-                        if parent_record.type == MessageType.RECORD:
-                            parent_record = parent_record.record.data  # type: ignore[union-attr, assignment]  # record is always a Record
-                        else:
-                            continue
-                    elif isinstance(parent_record, Record):
+                for partition, is_last_slice in iterate_with_last_flag(parent_stream.generate_partitions()):
+                    for parent_record, is_last_record_in_slice in iterate_with_last_flag(partition.read()):
+                        parent_stream.cursor.observe(parent_record)
                         parent_partition = (
                             parent_record.associated_slice.partition
                             if parent_record.associated_slice
                             else {}
                         )
-                        parent_record = parent_record.data
-                    elif not isinstance(parent_record, Mapping):
-                        # The parent_record should only take the form of a Record, AirbyteMessage, or Mapping. Anything else is invalid
-                        raise AirbyteTracedException(
-                            message=f"Parent stream returned records as invalid type {type(parent_record)}"
-                        )
-                    try:
-                        partition_value = dpath.get(
-                            parent_record,  # type: ignore [arg-type]
-                            parent_field,
+                        record_data = parent_record.data
+
+                        try:
+                            partition_value = dpath.get(
+                                record_data,  # type: ignore [arg-type]
+                                parent_field,
+                            )
+                        except KeyError:
+                            # FIXME a log here would go a long way for debugging
+                            continue
+
+                        # Add extra fields
+                        extracted_extra_fields = self._extract_extra_fields(record_data, extra_fields)
+
+                        if parent_stream_config.lazy_read_pointer:
+                            extracted_extra_fields = {
+                                "child_response": self._extract_child_response(
+                                    record_data,
+                                    parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
+                                ),
+                                **extracted_extra_fields,
+                            }
+
+                        if is_last_record_in_slice:
+                            parent_stream.cursor.close_partition(partition)
+
+                        yield StreamSlice(
+                            partition={
+                                partition_field: partition_value,
+                                "parent_slice": parent_partition or {},
+                            },
+                            cursor_slice={},
+                            extra_fields=extracted_extra_fields,
                         )
-                    except KeyError:
-                        continue
-
-                    # Add extra fields
-                    extracted_extra_fields = self._extract_extra_fields(parent_record, extra_fields)
-
-                    if parent_stream_config.lazy_read_pointer:
-                        extracted_extra_fields = {
-                            "child_response": self._extract_child_response(
-                                parent_record,
-                                parent_stream_config.lazy_read_pointer,  # type: ignore[arg-type]  # lazy_read_pointer type handeled in __post_init__ of parent_stream_config
-                            ),
-                            **extracted_extra_fields,
-                        }
-
-                    yield StreamSlice(
-                        partition={
-                            partition_field: partition_value,
-                            "parent_slice": parent_partition or {},
-                        },
-                        cursor_slice={},
-                        extra_fields=extracted_extra_fields,
-                    )
+
+                parent_stream.cursor.ensure_at_least_one_state_emitted()
+                yield from []

     def _extract_child_response(
         self, parent_record: Mapping[str, Any] | AirbyteMessage, pointer: List[InterpolatedString]
@@ -414,7 +421,7 @@ def get_stream_state(self) -> Optional[Mapping[str, StreamState]]:
         parent_state = {}
         for parent_config in self.parent_stream_configs:
             if parent_config.incremental_dependency:
-                parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.state)
+                parent_state[parent_config.stream.name] = copy.deepcopy(parent_config.stream.cursor.state)
         return parent_state

     @property
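For reference, a self-contained version of the last-flag iteration pattern the new stream_slices loop relies on. The Partition type hint is replaced by a TypeVar here so the sketch runs without airbyte_cdk; the behaviour is otherwise the same as the helper added above:

from typing import Iterable, Iterator, Tuple, TypeVar

T = TypeVar("T")

def iterate_with_last_flag(generator: Iterable[T]) -> Iterator[Tuple[T, bool]]:
    # Yield each item paired with a flag that is True only for the last item.
    iterator = iter(generator)
    try:
        current = next(iterator)
    except StopIteration:
        return  # empty input yields nothing
    for next_item in iterator:
        yield current, False
        current = next_item
    yield current, True

assert list(iterate_with_last_flag(["a", "b", "c"])) == [("a", False), ("b", False), ("c", True)]
assert list(iterate_with_last_flag([])) == []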
Lines changed: 79 additions & 0 deletions
@@ -0,0 +1,79 @@
+from typing import Optional, Mapping, Any, Union
+
+from airbyte_cdk.sources.declarative.partition_routers import PartitionRouter
+from airbyte_cdk.sources.declarative.requesters.request_options import RequestOptionsProvider
+from airbyte_cdk.sources.types import StreamSlice, StreamState
+
+
+class PerPartitionRequestOptionsProvider(RequestOptionsProvider):
+    def __init__(self, partition_router: PartitionRouter, cursor_provider: RequestOptionsProvider):
+        self._partition_router = partition_router
+        self._cursor_provider = cursor_provider
+
+    def get_request_params(
+        self,
+        *,
+        stream_state: Optional[StreamState] = None,
+        stream_slice: Optional[StreamSlice] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+    ) -> Mapping[str, Any]:
+        return self._partition_router.get_request_params(  # type: ignore # this always returns a mapping
+            stream_state=stream_state,
+            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
+            next_page_token=next_page_token,
+        ) | self._cursor_provider.get_request_params(
+            stream_state=stream_state,
+            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
+            next_page_token=next_page_token,
+        )
+
+    def get_request_headers(
+        self,
+        *,
+        stream_state: Optional[StreamState] = None,
+        stream_slice: Optional[StreamSlice] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+    ) -> Mapping[str, Any]:
+        return self._partition_router.get_request_headers(  # type: ignore # this always returns a mapping
+            stream_state=stream_state,
+            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
+            next_page_token=next_page_token,
+        ) | self._cursor_provider.get_request_headers(
+            stream_state=stream_state,
+            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
+            next_page_token=next_page_token,
+        )
+
+    def get_request_body_data(
+        self,
+        *,
+        stream_state: Optional[StreamState] = None,
+        stream_slice: Optional[StreamSlice] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+    ) -> Union[Mapping[str, Any], str]:
+        return self._partition_router.get_request_body_data(  # type: ignore # this always returns a mapping
+            stream_state=stream_state,
+            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
+            next_page_token=next_page_token,
+        ) | self._cursor_provider.get_request_body_data(
+            stream_state=stream_state,
+            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
+            next_page_token=next_page_token,
+        )
+
+    def get_request_body_json(
+        self,
+        *,
+        stream_state: Optional[StreamState] = None,
+        stream_slice: Optional[StreamSlice] = None,
+        next_page_token: Optional[Mapping[str, Any]] = None,
+    ) -> Mapping[str, Any]:
+        return self._partition_router.get_request_body_json(  # type: ignore # this always returns a mapping
+            stream_state=stream_state,
+            stream_slice=StreamSlice(partition=stream_slice.partition, cursor_slice={}),
+            next_page_token=next_page_token,
+        ) | self._cursor_provider.get_request_body_json(
+            stream_state=stream_state,
+            stream_slice=StreamSlice(partition={}, cursor_slice=stream_slice.cursor_slice),
+            next_page_token=next_page_token,
+        )
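A small sketch of the idea behind this provider: the incoming StreamSlice is split so the partition router only sees the partition part and the cursor provider only sees the cursor part, and the two mappings are merged with the dict union operator, so the cursor provider's values win on key collisions. The two providers below are stand-ins invented for illustration, not classes from this commit:

from airbyte_cdk.sources.types import StreamSlice

class FakePartitionRouter:
    # Stand-in for the PartitionRouter side of the merge.
    def get_request_params(self, *, stream_state=None, stream_slice=None, next_page_token=None):
        return {"account_id": stream_slice.partition["account_id"]}

class FakeCursorProvider:
    # Stand-in for the cursor's RequestOptionsProvider side of the merge.
    def get_request_params(self, *, stream_state=None, stream_slice=None, next_page_token=None):
        return {"updated_after": stream_slice.cursor_slice["start"]}

incoming = StreamSlice(partition={"account_id": "123"}, cursor_slice={"start": "2024-01-01"})

merged = FakePartitionRouter().get_request_params(
    stream_slice=StreamSlice(partition=incoming.partition, cursor_slice={})
) | FakeCursorProvider().get_request_params(
    stream_slice=StreamSlice(partition={}, cursor_slice=incoming.cursor_slice)
)
print(merged)  # {'account_id': '123', 'updated_after': '2024-01-01'}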

airbyte_cdk/sources/declarative/resolvers/http_components_resolver.py

Lines changed: 4 additions & 1 deletion
@@ -17,6 +17,7 @@
 )
 from airbyte_cdk.sources.declarative.retrievers.retriever import Retriever
 from airbyte_cdk.sources.source import ExperimentalClassWarning
+from airbyte_cdk.sources.streams.concurrent.partitions.stream_slicer import StreamSlicer
 from airbyte_cdk.sources.types import Config


@@ -28,12 +29,14 @@ class HttpComponentsResolver(ComponentsResolver):

     Attributes:
         retriever (Retriever): The retriever used to fetch data from an API.
+        stream_slicer (StreamSlicer): The how the data is sliced.
         config (Config): Configuration object for the resolver.
         components_mapping (List[ComponentMappingDefinition]): List of mappings to resolve.
         parameters (InitVar[Mapping[str, Any]]): Additional parameters for interpolation.
     """

     retriever: Retriever
+    stream_slicer: StreamSlicer
     config: Config
     components_mapping: List[ComponentMappingDefinition]
     parameters: InitVar[Mapping[str, Any]]
@@ -88,7 +91,7 @@ def resolve_components(
         """
         kwargs = {"stream_template_config": stream_template_config}

-        for stream_slice in self.retriever.stream_slices():
+        for stream_slice in self.stream_slicer.stream_slices():
             for components_values in self.retriever.read_records(
                 records_schema={}, stream_slice=stream_slice
             ):
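A stand-in illustration (not code from this commit) of the new division of labour in resolve_components: slices now come from a dedicated stream_slicer, while the retriever only reads records for each slice:

from typing import Any, Iterable, Mapping

class FakeStreamSlicer:
    def stream_slices(self) -> Iterable[Mapping[str, Any]]:
        yield {"page": 1}
        yield {"page": 2}

class FakeRetriever:
    def read_records(self, records_schema: Mapping[str, Any], stream_slice: Mapping[str, Any]) -> Iterable[Mapping[str, Any]]:
        yield {"slice": stream_slice, "component": "value"}

slicer, retriever = FakeStreamSlicer(), FakeRetriever()
for stream_slice in slicer.stream_slices():  # previously: retriever.stream_slices()
    for components_values in retriever.read_records(records_schema={}, stream_slice=stream_slice):
        print(components_values)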

airbyte_cdk/sources/declarative/retrievers/retriever.py

Lines changed: 4 additions & 0 deletions
@@ -4,6 +4,7 @@

 from abc import abstractmethod
 from typing import Any, Iterable, Mapping, Optional
+from typing_extensions import deprecated

 from airbyte_cdk.sources.declarative.incremental.per_partition_cursor import StreamSlice
 from airbyte_cdk.sources.streams.core import StreamData
@@ -30,11 +31,13 @@ def read_records(
         """

     @abstractmethod
+    @deprecated("Stream slicing is being moved to the stream level.")
     def stream_slices(self) -> Iterable[Optional[StreamSlice]]:
         """Returns the stream slices"""

     @property
     @abstractmethod
+    @deprecated("State management is being moved to the stream level.")
     def state(self) -> StreamState:
         """State getter, should return state in form that can serialized to a string and send to the output
         as a STATE AirbyteMessage.
@@ -50,5 +53,6 @@ def state(self) -> StreamState:

     @state.setter
     @abstractmethod
+    @deprecated("State management is being moved to the stream level.")
     def state(self, value: StreamState) -> None:
         """State setter, accept state serialized by state getter."""

airbyte_cdk/sources/message/repository.py

Lines changed: 15 additions & 0 deletions
@@ -95,6 +95,21 @@ def consume_queue(self) -> Iterable[AirbyteMessage]:
             yield self._message_queue.popleft()


+class StateFilteringMessageRepository(MessageRepository):
+    def __init__(self, decorated: MessageRepository) -> None:
+        self._decorated = decorated
+
+    def emit_message(self, message: AirbyteMessage) -> None:
+        if message.type != Type.STATE:
+            self._decorated.emit_message(message)
+
+    def log_message(self, level: Level, message_provider: Callable[[], LogMessage]) -> None:
+        self._decorated.log_message(level, message_provider)
+
+    def consume_queue(self) -> Iterable[AirbyteMessage]:
+        yield from self._decorated.consume_queue()
+
+
 class LogAppenderMessageRepositoryDecorator(MessageRepository):
     def __init__(
         self,
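A standalone sketch of the behaviour StateFilteringMessageRepository introduces: everything is delegated to the wrapped repository except STATE messages, which are silently dropped. The Message and repository types below are simplified stand-ins, not airbyte_cdk classes:

from dataclasses import dataclass
from enum import Enum
from typing import Iterable, List

class Type(Enum):
    RECORD = "RECORD"
    STATE = "STATE"

@dataclass
class Message:
    type: Type
    payload: str

class InMemoryRepo:
    def __init__(self) -> None:
        self._queue: List[Message] = []

    def emit_message(self, message: Message) -> None:
        self._queue.append(message)

    def consume_queue(self) -> Iterable[Message]:
        yield from self._queue

class StateFilteringRepo:
    # Same delegation pattern as the new StateFilteringMessageRepository.
    def __init__(self, decorated: InMemoryRepo) -> None:
        self._decorated = decorated

    def emit_message(self, message: Message) -> None:
        if message.type != Type.STATE:
            self._decorated.emit_message(message)

    def consume_queue(self) -> Iterable[Message]:
        yield from self._decorated.consume_queue()

repo = StateFilteringRepo(InMemoryRepo())
repo.emit_message(Message(Type.RECORD, "kept"))
repo.emit_message(Message(Type.STATE, "dropped"))
print([m.payload for m in repo.consume_queue()])  # ['kept']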
