File tree Expand file tree Collapse file tree 1 file changed +6
-4
lines changed
airbyte_cdk/sources/streams/concurrent Expand file tree Collapse file tree 1 file changed +6
-4
lines changed Original file line number Diff line number Diff line change 44
55import functools
66import logging
7- import os
87import threading
98from abc import ABC , abstractmethod
109from typing import (
1918 Union ,
2019)
2120
22- from airbyte_cdk .models import AirbyteLogMessage , AirbyteMessage , Level , Type
2321from airbyte_cdk .sources .connector_state_manager import ConnectorStateManager
2422from airbyte_cdk .sources .message import MessageRepository
2523from airbyte_cdk .sources .streams import NO_CURSOR_STATE_KEY
@@ -233,8 +231,12 @@ def _get_concurrent_state(
233231 def observe (self , record : Record ) -> None :
234232 # Because observe writes to the most_recent_cursor_value_per_partition mapping,
235233 # it is not thread-safe. However, this shouldn't lead to concurrency issues
236- # because observe() is only invoked on the main thread and the map is broken
237- # down by partition which should not have conflicting read/write.
234+ # because observe() is only invoked in two ways both of which aren't conflicting:
235+ # - ConcurrentReadProcessor.on_record(): Since records are observed on the main
236+ # thread and so there aren't concurrent operations. Partitions are also split by key.
237+ # - PartitionReader.process_partition(): Because the map is broken down according to
238+ # partition, concurrent threads processing only read/write from different keys which
239+ # avoids any conflicts.
238240 #
239241 # If we were to add thread safety, we should implement a lock per-partition
240242 # which is instantiated during stream_slices()
You can’t perform that action at this time.
0 commit comments