Skip to content

Commit aa74041

Browse files
author
maxime.c
committed
test_concurrent_perpartitioncursor.py are somewhat passing, there that at least
1 parent 0f17729 commit aa74041

File tree

8 files changed

+310
-181
lines changed

8 files changed

+310
-181
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -377,7 +377,7 @@ def _group_streams(
377377
and hasattr(declarative_stream.retriever, "stream_slicer")
378378
and isinstance(
379379
declarative_stream.retriever.stream_slicer,
380-
(GlobalSubstreamCursor, PerPartitionWithGlobalCursor),
380+
ConcurrentPerPartitionCursor,
381381
)
382382
):
383383
stream_state = self._connector_state_manager.get_stream_state(
@@ -435,6 +435,8 @@ def _group_streams(
435435
and self.is_partially_declarative
436436
):
437437
concurrent_streams.append(declarative_stream.get_underlying_stream())
438+
elif isinstance(declarative_stream, DefaultStream): # FIXME added temporarily until the ConcurrentDeclarativeSource is cleaned up
439+
concurrent_streams.append(declarative_stream)
438440
else:
439441
synchronous_streams.append(declarative_stream)
440442

airbyte_cdk/sources/declarative/incremental/concurrent_partition_cursor.py

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@
99
from collections import OrderedDict
1010
from copy import deepcopy
1111
from datetime import timedelta
12-
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional
12+
from typing import Any, Callable, Iterable, List, Mapping, MutableMapping, Optional, Union
1313

14+
from airbyte_cdk.models import AirbyteStateMessage, AirbyteStateBlob, AirbyteStreamState, AirbyteStateType, StreamDescriptor
1415
from airbyte_cdk.sources.connector_state_manager import ConnectorStateManager
1516
from airbyte_cdk.sources.declarative.incremental.global_substream_cursor import (
1617
Timer,
1718
iterate_with_last_flag_and_state,
1819
)
20+
# It is interesting that this file depends on the declarative stuff. If we ever think that per partition cursors will ever be needed outside the declarative package, we would need to add an interface here to ensure that we avoid circular dependencies
1921
from airbyte_cdk.sources.declarative.partition_routers.partition_router import PartitionRouter
2022
from airbyte_cdk.sources.message import MessageRepository
2123
from airbyte_cdk.sources.streams.checkpoint.per_partition_key_serializer import (
@@ -124,6 +126,7 @@ def __init__(
124126
self._timer = Timer()
125127

126128
self._set_initial_state(stream_state)
129+
self._synced_some_data = False
127130

128131
@property
129132
def cursor_field(self) -> CursorField:
@@ -154,6 +157,7 @@ def state(self) -> MutableMapping[str, Any]:
154157

155158
def close_partition(self, partition: Partition) -> None:
156159
# Attempt to retrieve the stream slice
160+
logger.warning(f"GODO: stream {self._stream_name} closing partition {partition.to_slice()}")
157161
stream_slice: Optional[StreamSlice] = partition.to_slice() # type: ignore[assignment]
158162

159163
# Ensure stream_slice is not None
@@ -209,8 +213,10 @@ def ensure_at_least_one_state_emitted(self) -> None:
209213
if not any(
210214
semaphore_item[1]._value for semaphore_item in self._semaphore_per_partition.items()
211215
):
212-
self._global_cursor = self._new_global_cursor
213-
self._lookback_window = self._timer.finish()
216+
if self._synced_some_data:
217+
# we only update those if we actually synced some data
218+
self._global_cursor = self._new_global_cursor
219+
self._lookback_window = self._timer.finish()
214220
self._parent_state = self._partition_router.get_stream_state()
215221
self._emit_state_message(throttle=False)
216222

@@ -454,6 +460,7 @@ def observe(self, record: Record) -> None:
454460
except ValueError:
455461
return
456462

463+
self._synced_some_data = True
457464
record_cursor = self._connector_state_converter.output_format(
458465
self._connector_state_converter.parse_value(record_cursor_value)
459466
)
@@ -522,3 +529,23 @@ def _get_cursor(self, record: Record) -> ConcurrentCursor:
522529

523530
def limit_reached(self) -> bool:
524531
return self._number_of_partitions > self.SWITCH_TO_GLOBAL_LIMIT
532+
533+
@staticmethod
534+
def get_parent_state(stream_state: Optional[StreamState], parent_stream_name: str) -> Optional[AirbyteStateMessage]:
535+
return AirbyteStateMessage(
536+
type=AirbyteStateType.STREAM,
537+
stream=AirbyteStreamState(
538+
stream_descriptor=StreamDescriptor(parent_stream_name, None),
539+
stream_state=AirbyteStateBlob(stream_state["parent_state"][parent_stream_name])
540+
)
541+
) if stream_state and "parent_state" in stream_state else None
542+
543+
@staticmethod
544+
def get_global_state(stream_state: Optional[StreamState], parent_stream_name: str) -> Optional[AirbyteStateMessage]:
545+
return AirbyteStateMessage(
546+
type=AirbyteStateType.STREAM,
547+
stream=AirbyteStreamState(
548+
stream_descriptor=StreamDescriptor(parent_stream_name, None),
549+
stream_state=AirbyteStateBlob(stream_state["state"])
550+
)
551+
) if stream_state and "state" in stream_state else None

0 commit comments

Comments
 (0)