File tree Expand file tree Collapse file tree 1 file changed +7
-0
lines changed
airbyte_cdk/sources/streams/concurrent Expand file tree Collapse file tree 1 file changed +7
-0
lines changed Original file line number Diff line number Diff line change @@ -231,6 +231,13 @@ def _get_concurrent_state(
231231 )
232232
233233 def observe (self , record : Record ) -> None :
234+ # Because observe writes to the most_recent_cursor_value_per_partition mapping,
235+ # it is not thread-safe. However, this shouldn't lead to concurrency issues
236+ # because observe() is only invoked on the main thread and the map is broken
237+ # down by partition which should not have conflicting read/write.
238+ #
239+ # If we were to add thread safety, we should implement a lock per-partition
240+ # which is instantiated during stream_slices()
234241 most_recent_cursor_value = self ._most_recent_cursor_value_per_partition .get (
235242 record .associated_slice
236243 )
You can’t perform that action at this time.
0 commit comments