Commit 2d4fdae

Merge branch 'main' into daryna/low-code/refactor-model-to-component-factory
2 parents b212ed9 + f976c72 commit 2d4fdae

33 files changed, +616 -494 lines

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 6 additions & 1 deletion
@@ -108,7 +108,12 @@ def read_stream(
     stream_name = configured_catalog.streams[0].stream.name
 
     stream_read = test_read_handler.run_test_read(
-        source, config, configured_catalog, state, limits.max_records
+        source,
+        config,
+        configured_catalog,
+        stream_name,
+        state,
+        limits.max_records,
     )
 
     return AirbyteMessage(

airbyte_cdk/connector_builder/test_reader/helpers.py

Lines changed: 31 additions & 0 deletions
@@ -269,6 +269,37 @@ def should_close_page_for_slice(at_least_one_page_in_group: bool, message: Airby
     return at_least_one_page_in_group and should_process_slice_descriptor(message)
 
 
+def is_page_http_request_for_different_stream(
+    json_message: Optional[Dict[str, Any]], stream_name: str
+) -> bool:
+    """
+    Determines whether a given JSON message represents a page HTTP request for a different stream.
+
+    This function checks if the provided JSON message is a page HTTP request, and if the stream name in the log is
+    different from the provided stream name.
+
+    This is needed because dynamic streams result in extra page HTTP requests for the dynamic streams that we want to ignore
+    when they do not match the stream that is being read.
+
+    Args:
+        json_message (Optional[Dict[str, Any]]): The JSON message to evaluate.
+        stream_name (str): The name of the stream to compare against.
+
+    Returns:
+        bool: True if the JSON message is a page HTTP request for a different stream, False otherwise.
+    """
+    if not json_message or not is_page_http_request(json_message):
+        return False
+
+    message_stream_name: str | None = (
+        json_message.get("airbyte_cdk", {}).get("stream", {}).get("name", None)
+    )
+    if message_stream_name is None:
+        return False
+
+    return message_stream_name != stream_name
+
+
 def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool:
     """
     Determines whether a given JSON message represents a page HTTP request.
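
For context on what the new helper inspects: it only reads the airbyte_cdk.stream.name path of the log message and compares it with the stream being tested. The snippet below is a self-contained sketch of that behaviour; _looks_like_page_request stands in for the CDK's is_page_http_request, and the sample message shape is illustrative rather than the exact CDK log format.

from typing import Any, Dict, Optional


def _looks_like_page_request(json_message: Dict[str, Any]) -> bool:
    # Stand-in for the CDK's is_page_http_request: for this sketch, any message
    # carrying an "http" payload counts as a page request.
    return "http" in json_message


def is_for_different_stream(json_message: Optional[Dict[str, Any]], stream_name: str) -> bool:
    # Mirrors the new helper: ignore non-page messages, then compare the stream
    # name recorded under airbyte_cdk.stream.name with the stream being read.
    if not json_message or not _looks_like_page_request(json_message):
        return False
    message_stream_name = json_message.get("airbyte_cdk", {}).get("stream", {}).get("name")
    if message_stream_name is None:
        return False
    return message_stream_name != stream_name


sample = {
    "http": {"url": "https://api.example.com/items?page=2"},
    "airbyte_cdk": {"stream": {"name": "dynamic_stream_1"}},
}
print(is_for_different_stream(sample, "items"))             # True  -> would be skipped
print(is_for_different_stream(sample, "dynamic_stream_1"))  # False -> kept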

airbyte_cdk/connector_builder/test_reader/message_grouper.py

Lines changed: 5 additions & 0 deletions
@@ -28,6 +28,7 @@
     is_async_auxiliary_request,
     is_config_update_message,
     is_log_message,
+    is_page_http_request_for_different_stream,
     is_record_message,
     is_state_message,
     is_trace_with_error,
@@ -44,6 +45,7 @@ def get_message_groups(
     schema_inferrer: SchemaInferrer,
     datetime_format_inferrer: DatetimeFormatInferrer,
     limit: int,
+    stream_name: str,
 ) -> MESSAGE_GROUPS:
     """
     Processes an iterator of AirbyteMessage objects to group and yield messages based on their type and sequence.
@@ -96,6 +98,9 @@ def get_message_groups(
     while records_count < limit and (message := next(messages, None)):
         json_message = airbyte_message_to_json(message)
 
+        if is_page_http_request_for_different_stream(json_message, stream_name):
+            continue
+
         if should_close_page(at_least_one_page_in_group, message, json_message):
             current_page_request, current_page_response = handle_current_page(
                 current_page_request,
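
The position of the new guard matters: page requests logged for other (e.g. dynamically generated) streams are dropped before they can open or close a page group. Below is a heavily simplified sketch of that loop shape; group_messages and the message dictionaries are illustrative stand-ins, not the CDK's get_message_groups.

from typing import Any, Dict, Iterator


def group_messages(messages: Iterator[Dict[str, Any]], stream_name: str, limit: int) -> Iterator[Dict[str, Any]]:
    # Sketch of the grouping loop: skip messages that belong to a different
    # stream before any grouping state is touched.
    records_count = 0
    while records_count < limit and (message := next(messages, None)):
        owner = message.get("airbyte_cdk", {}).get("stream", {}).get("name")
        if owner is not None and owner != stream_name:
            continue  # same effect as the is_page_http_request_for_different_stream guard
        if message.get("type") == "RECORD":
            records_count += 1
        yield message


msgs = iter([
    {"type": "LOG", "airbyte_cdk": {"stream": {"name": "other"}}},
    {"type": "RECORD", "airbyte_cdk": {"stream": {"name": "items"}}},
])
print(list(group_messages(msgs, "items", limit=10)))  # only the "items" record survives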

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 8 additions & 3 deletions
@@ -86,6 +86,7 @@ def run_test_read(
         source: DeclarativeSource,
         config: Mapping[str, Any],
         configured_catalog: ConfiguredAirbyteCatalog,
+        stream_name: str,
         state: List[AirbyteStateMessage],
         record_limit: Optional[int] = None,
     ) -> StreamRead:
@@ -112,14 +113,17 @@ def run_test_read(
 
         record_limit = self._check_record_limit(record_limit)
         # The connector builder currently only supports reading from a single stream at a time
-        stream = source.streams(config)[0]
+        streams = source.streams(config)
+        stream = next((stream for stream in streams if stream.name == stream_name), None)
 
         # get any deprecation warnings during the component creation
         deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
 
         schema_inferrer = SchemaInferrer(
-            self._pk_to_nested_and_composite_field(stream.primary_key),
-            self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
+            self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
+            self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
+            if stream
+            else None,
         )
         datetime_format_inferrer = DatetimeFormatInferrer()
 
@@ -128,6 +132,7 @@ def run_test_read(
             schema_inferrer,
             datetime_format_inferrer,
             record_limit,
+            stream_name,
         )
 
         slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(
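
Because the stream is now looked up by name, the lookup can return None, which is why the SchemaInferrer arguments gained the if stream else None guards. A small standalone illustration of the next(..., None) pattern, using a stand-in Stream dataclass rather than the CDK class:

from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Stream:  # stand-in for an airbyte_cdk stream object
    name: str
    primary_key: Optional[str] = None


streams: List[Stream] = [Stream("users", "id"), Stream("orders", "order_id")]

# Pick the stream matching the requested name; fall back to None instead of indexing [0].
stream = next((s for s in streams if s.name == "orders"), None)

# Downstream consumers must handle the None case, mirroring the `if stream else None` guards.
primary_key = stream.primary_key if stream else None
print(primary_key)  # order_id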

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 3 additions & 26 deletions
@@ -468,34 +468,11 @@ def _is_concurrent_cursor_incremental_without_partition_routing(
     def _get_retriever(
         declarative_stream: DeclarativeStream, stream_state: Mapping[str, Any]
     ) -> Retriever:
-        retriever = declarative_stream.retriever
-
-        # This is an optimization so that we don't invoke any cursor or state management flows within the
-        # low-code framework because state management is handled through the ConcurrentCursor.
-        if declarative_stream and isinstance(retriever, SimpleRetriever):
-            # Also a temporary hack. In the legacy Stream implementation, as part of the read,
-            # set_initial_state() is called to instantiate incoming state on the cursor. Although we no
-            # longer rely on the legacy low-code cursor for concurrent checkpointing, low-code components
-            # like StopConditionPaginationStrategyDecorator still rely on a DatetimeBasedCursor that is
-            # properly initialized with state.
-            if retriever.cursor:
-                retriever.cursor.set_initial_state(stream_state=stream_state)
-
-            # Similar to above, the ClientSideIncrementalRecordFilterDecorator cursor is a separate instance
-            # from the one initialized on the SimpleRetriever, so it also must also have state initialized
-            # for semi-incremental streams using is_client_side_incremental to filter properly
-            if isinstance(retriever.record_selector, RecordSelector) and isinstance(
-                retriever.record_selector.record_filter, ClientSideIncrementalRecordFilterDecorator
-            ):
-                retriever.record_selector.record_filter._cursor.set_initial_state(
-                    stream_state=stream_state
-                ) # type: ignore # After non-concurrent cursors are deprecated we can remove these cursor workarounds
-
+        if declarative_stream and isinstance(declarative_stream.retriever, SimpleRetriever):
             # We zero it out here, but since this is a cursor reference, the state is still properly
             # instantiated for the other components that reference it
-            retriever.cursor = None
-
-        return retriever
+            declarative_stream.retriever.cursor = None
+        return declarative_stream.retriever
 
     @staticmethod
     def _select_streams(
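
The surviving comment leans on Python reference semantics: rebinding retriever.cursor to None does not discard the cursor object, so any other component already holding a reference keeps its initialized state. A minimal illustration with hypothetical classes:

from typing import Optional


class Cursor:
    def __init__(self) -> None:
        self.state = {"updated_at": "2024-01-01"}


class Retriever:
    def __init__(self, cursor: Optional[Cursor]) -> None:
        self.cursor = cursor


cursor = Cursor()
retriever = Retriever(cursor)
stop_condition = cursor  # another component holding the same cursor object

retriever.cursor = None  # the retriever stops driving the cursor...

# ...but the object itself, and the state on it, stays intact for other referents.
print(stop_condition.state)  # {'updated_at': '2024-01-01'}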

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 0 additions & 2 deletions
@@ -1511,7 +1511,6 @@ definitions:
     incremental_sync:
       title: Incremental Sync
       description: Component used to fetch data incrementally based on a time field in the data.
-      linkable: true
       anyOf:
         - "$ref": "#/definitions/DatetimeBasedCursor"
         - "$ref": "#/definitions/IncrementingCountCursor"
@@ -3682,7 +3681,6 @@ definitions:
     partition_router:
       title: Partition Router
       description: Used to iteratively execute requests over a set of values, such as a parent stream's records or a list of constant values.
-      linkable: true
       anyOf:
         - "$ref": "#/definitions/SubstreamPartitionRouter"
         - "$ref": "#/definitions/ListPartitionRouter"

airbyte_cdk/sources/declarative/extractors/record_filter.py

Lines changed: 4 additions & 8 deletions
@@ -4,11 +4,6 @@
 from dataclasses import InitVar, dataclass
 from typing import Any, Callable, Iterable, Mapping, Optional, Union
 
-from airbyte_cdk.sources.declarative.incremental import (
-    DatetimeBasedCursor,
-    GlobalSubstreamCursor,
-    PerPartitionWithGlobalCursor,
-)
 from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
 from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
     RecordFilter as RecordFilterModel,
@@ -17,6 +12,7 @@
     AdditionalFlags,
     ComponentConstructor,
 )
+from airbyte_cdk.sources.streams.concurrent.cursor import Cursor
 from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
 
 
@@ -75,13 +71,13 @@ class ClientSideIncrementalRecordFilterDecorator(RecordFilter):
     """
     Applies a filter to a list of records to exclude those that are older than the stream_state/start_date.
 
-    :param DatetimeBasedCursor date_time_based_cursor: Cursor used to extract datetime values
+    :param Cursor cursor: Cursor used to filter out values
     :param PerPartitionCursor per_partition_cursor: Optional Cursor used for mapping cursor value in nested stream_state
     """
 
     def __init__(
         self,
-        cursor: Union[DatetimeBasedCursor, PerPartitionWithGlobalCursor, GlobalSubstreamCursor],
+        cursor: Union[Cursor],
         **kwargs: Any,
     ):
         super().__init__(**kwargs)
@@ -99,7 +95,7 @@ def filter_records(
             for record in records
             if self._cursor.should_be_synced(
                 # Record is created on the fly to align with cursors interface; stream name is ignored as we don't need it here
-                # Record stream name is empty cause it is not used durig the filtering
+                # Record stream name is empty because it is not used during the filtering
                 Record(data=record, associated_slice=stream_slice, stream_name="")
             )
         )
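
With the annotation narrowed to the concurrent Cursor interface, the decorator only needs should_be_synced from whatever cursor it receives. A minimal sketch of that contract, using a hypothetical ThresholdCursor rather than a CDK cursor and plain mappings instead of Record objects:

from dataclasses import dataclass
from typing import Any, Iterable, Mapping


@dataclass
class ThresholdCursor:
    # Hypothetical cursor: a record should be synced when its cursor value is new enough.
    cursor_field: str
    threshold: int

    def should_be_synced(self, record: Mapping[str, Any]) -> bool:
        return record.get(self.cursor_field, 0) >= self.threshold


def filter_records(records: Iterable[Mapping[str, Any]], cursor: ThresholdCursor) -> Iterable[Mapping[str, Any]]:
    # Same shape as the decorator's generator: keep only records the cursor accepts.
    return (record for record in records if cursor.should_be_synced(record))


records = [{"id": 1, "updated_at": 5}, {"id": 2, "updated_at": 12}]
print(list(filter_records(records, ThresholdCursor("updated_at", 10))))  # [{'id': 2, 'updated_at': 12}]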

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 22 additions & 3 deletions
@@ -81,6 +81,7 @@ def __init__(
         connector_state_converter: AbstractStreamStateConverter,
         cursor_field: CursorField,
         use_global_cursor: bool = False,
+        attempt_to_create_cursor_if_not_provided: bool = False,
     ) -> None:
         self._global_cursor: Optional[StreamState] = {}
         self._stream_name = stream_name
@@ -125,6 +126,9 @@ def __init__(
 
         self._set_initial_state(stream_state)
 
+        # FIXME this is a temporary field the time of the migration from declarative cursors to concurrent ones
+        self._attempt_to_create_cursor_if_not_provided = attempt_to_create_cursor_if_not_provided
+
     @property
     def cursor_field(self) -> CursorField:
         return self._cursor_field
@@ -512,13 +516,28 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
             raise ValueError(
                 "Invalid state as stream slices that are emitted should refer to an existing cursor"
             )
+
+        if self._use_global_cursor:
+            return self._create_cursor(
+                self._global_cursor,
+                self._lookback_window if self._global_cursor else 0,
+            )
+
         partition_key = self._to_partition_key(record.associated_slice.partition)
-        if partition_key not in self._cursor_per_partition:
+        if (
+            partition_key not in self._cursor_per_partition
+            and not self._attempt_to_create_cursor_if_not_provided
+        ):
             raise ValueError(
                 "Invalid state as stream slices that are emitted should refer to an existing cursor"
            )
-        cursor = self._cursor_per_partition[partition_key]
-        return cursor
+        elif partition_key not in self._cursor_per_partition:
+            return self._create_cursor(
+                self._global_cursor,
+                self._lookback_window if self._global_cursor else 0,
+            )
+        else:
+            return self._cursor_per_partition[partition_key]
 
     def limit_reached(self) -> bool:
         return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
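
The reworked _get_cursor can be read as a small decision table: global-cursor mode always derives a cursor from the global state; otherwise an unknown partition either raises (the previous strict behaviour) or, when attempt_to_create_cursor_if_not_provided is set, falls back to a cursor built from the global state. A standalone sketch of that branching, with stand-in names throughout:

from typing import Dict, Optional

StreamState = Dict[str, str]


def resolve_cursor(
    partition_key: str,
    cursor_per_partition: Dict[str, StreamState],
    global_cursor: Optional[StreamState],
    use_global_cursor: bool,
    attempt_to_create_cursor_if_not_provided: bool,
    lookback_window: int,
) -> StreamState:
    def create_from_global() -> StreamState:
        # Stand-in for self._create_cursor(self._global_cursor, lookback_window or 0)
        return dict(global_cursor or {}, lookback=str(lookback_window if global_cursor else 0))

    if use_global_cursor:
        return create_from_global()
    if partition_key not in cursor_per_partition and not attempt_to_create_cursor_if_not_provided:
        raise ValueError("Invalid state as stream slices that are emitted should refer to an existing cursor")
    if partition_key not in cursor_per_partition:
        return create_from_global()  # temporary migration behaviour enabled by the new flag
    return cursor_per_partition[partition_key]


print(resolve_cursor("p1", {}, {"updated_at": "2024-01-01"}, False, True, 3600))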

airbyte_cdk/sources/declarative/incremental/datetime_based_cursor.py

Lines changed: 0 additions & 11 deletions
@@ -429,17 +429,6 @@ def _send_log(self, level: Level, message: str) -> None:
             )
         )
 
-    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
-        cursor_field = self.cursor_field.eval(self.config) # type: ignore # cursor_field is converted to an InterpolatedString in __post_init__
-        first_cursor_value = first.get(cursor_field)
-        second_cursor_value = second.get(cursor_field)
-        if first_cursor_value and second_cursor_value:
-            return self.parse_date(first_cursor_value) >= self.parse_date(second_cursor_value)
-        elif first_cursor_value:
-            return True
-        else:
-            return False
-
     def set_runtime_lookback_window(self, lookback_window_in_seconds: int) -> None:
         """
         Updates the lookback window based on a given number of seconds if the new duration

airbyte_cdk/sources/declarative/incremental/global_substream_cursor.py

Lines changed: 0 additions & 6 deletions
@@ -338,12 +338,6 @@ def get_request_body_json(
     def should_be_synced(self, record: Record) -> bool:
         return self._stream_cursor.should_be_synced(self._convert_record_to_cursor_record(record))
 
-    def is_greater_than_or_equal(self, first: Record, second: Record) -> bool:
-        return self._stream_cursor.is_greater_than_or_equal(
-            self._convert_record_to_cursor_record(first),
-            self._convert_record_to_cursor_record(second),
-        )
-
     @staticmethod
     def _convert_record_to_cursor_record(record: Record) -> Record:
         return Record(
