
Commit 78e0469

remove unneeded observe() and update comment
1 parent 3316766 commit 78e0469


2 files changed: +4 -8 lines changed


airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 0 additions & 1 deletion
@@ -154,7 +154,6 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
                     stream.as_airbyte_stream(), AirbyteStreamStatus.RUNNING
                 )
             self._record_counter[stream.name] += 1
-            stream.cursor.observe(record)
         yield message
         yield from self._message_repository.consume_queue()
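
The deleted call was the main-thread half of the observation path. Per the updated comment in cursor.py below, observe() is now invoked only on the worker thread inside PartitionReader.process_partition(), so observing again in on_record() would be redundant. The following is a minimal sketch of that flow; CursorStub, SimplePartition, and the queue wiring are illustrative stand-ins, not the CDK's actual classes.

# Minimal sketch, not the CDK's actual classes: the worker thread observes each
# record exactly once before handing it to the main thread for emission.
import threading
from queue import Queue
from typing import Any, Iterable, List


class CursorStub:
    """Illustrative cursor that just remembers what it observed."""

    def __init__(self) -> None:
        self.observed: List[Any] = []

    def observe(self, record: Any) -> None:
        self.observed.append(record)


class SimplePartition:
    """Illustrative partition wrapping a cursor and an iterable of records."""

    def __init__(self, cursor: CursorStub, records: Iterable[Any]) -> None:
        self.cursor = cursor
        self._records = list(records)

    def read(self) -> Iterable[Any]:
        return iter(self._records)


def process_partition(partition: SimplePartition, queue: "Queue[Any]") -> None:
    # Worker thread: observe here, once, then enqueue for the main thread.
    for record in partition.read():
        partition.cursor.observe(record)
        queue.put(record)


def on_record(record: Any) -> None:
    # Main thread: emit the record; no second observe() call is needed.
    print(record)


if __name__ == "__main__":
    queue: "Queue[Any]" = Queue()
    partition = SimplePartition(CursorStub(), [{"id": 1}, {"id": 2}])
    worker = threading.Thread(target=process_partition, args=(partition, queue))
    worker.start()
    worker.join()
    while not queue.empty():
        on_record(queue.get())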

airbyte_cdk/sources/streams/concurrent/cursor.py

Lines changed: 4 additions & 7 deletions
@@ -230,13 +230,10 @@ def _get_concurrent_state(
 
     def observe(self, record: Record) -> None:
         # Because observe writes to the most_recent_cursor_value_per_partition mapping,
-        # it is not thread-safe. However, this shouldn't lead to concurrency issues
-        # because observe() is only invoked in two ways both of which aren't conflicting:
-        # - ConcurrentReadProcessor.on_record(): Since records are observed on the main
-        #   thread and so there aren't concurrent operations. Partitions are also split by key.
-        # - PartitionReader.process_partition(): Because the map is broken down according to
-        #   partition, concurrent threads processing only read/write from different keys which
-        #   avoids any conflicts.
+        # it is not thread-safe. However, this shouldn't lead to concurrency issues because
+        # observe() is only invoked by PartitionReader.process_partition(). Since the map is
+        # broken down according to partition, concurrent threads processing only read/write
+        # from different keys which avoids any conflicts.
         #
         # If we were to add thread safety, we should implement a lock per-partition
         # which is instantiated during stream_slices()
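
The retained comment also notes how thread safety could be added if it ever became necessary: one lock per partition, instantiated while stream_slices() still runs single-threaded. A hedged sketch of that idea, where PerPartitionCursorSketch and its attribute names are illustrative and do not mirror the actual ConcurrentCursor API:

import threading
from typing import Any, Dict, Iterable, Optional


class PerPartitionCursorSketch:
    """Illustrative only; not the actual ConcurrentCursor implementation."""

    def __init__(self) -> None:
        self._most_recent_cursor_value_per_partition: Dict[str, Any] = {}
        self._partition_locks: Dict[str, threading.Lock] = {}

    def stream_slices(self, partition_keys: Iterable[str]) -> Iterable[str]:
        # Create one lock per partition up front, while still single-threaded,
        # so no locks need to be constructed while worker threads are running.
        for key in partition_keys:
            self._partition_locks[key] = threading.Lock()
            yield key

    def observe(self, partition_key: str, cursor_value: Any) -> None:
        # Each worker only touches its own partition key, so the lock is purely
        # defensive; today's code relies on keys never being shared instead.
        # Assumes cursor values are mutually comparable.
        with self._partition_locks[partition_key]:
            current: Optional[Any] = self._most_recent_cursor_value_per_partition.get(partition_key)
            if current is None or cursor_value > current:
                self._most_recent_cursor_value_per_partition[partition_key] = cursor_value

Calling stream_slices() before spawning workers guarantees every lock already exists by the time observe() runs on a worker thread.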
