@@ -59,8 +59,8 @@ class ConcurrentPerPartitionCursor(Cursor):
5959 CurrentPerPartitionCursor expects the state of the ConcurrentCursor to follow the format {cursor_field: cursor_value}.
6060 """
6161
62- DEFAULT_MAX_PARTITIONS_NUMBER = 25_000
63- SWITCH_TO_GLOBAL_LIMIT = 10_000
62+ DEFAULT_MAX_PARTITIONS_NUMBER = 200
63+ SWITCH_TO_GLOBAL_LIMIT = 100
6464 _NO_STATE : Mapping [str , Any ] = {}
6565 _NO_CURSOR_STATE : Mapping [str , Any ] = {}
6666 _GLOBAL_STATE_KEY = "state"
@@ -145,19 +145,19 @@ def close_partition(self, partition: Partition) -> None:
145145 raise ValueError ("stream_slice cannot be None" )
146146
147147 partition_key = self ._to_partition_key (stream_slice .partition )
148- if not self ._use_global_cursor :
149- self ._cursor_per_partition [partition_key ].close_partition (partition = partition )
150148 with self ._lock :
151- self ._semaphore_per_partition [partition_key ].acquire ()
152- cursor = self ._cursor_per_partition [partition_key ]
153- if (
154- partition_key in self ._finished_partitions
155- and self ._semaphore_per_partition [partition_key ]._value == 0
156- ):
157- self ._update_global_cursor (cursor .state [self .cursor_field .cursor_field_key ])
158149 if not self ._use_global_cursor :
150+ self ._cursor_per_partition [partition_key ].close_partition (partition = partition )
151+ cursor = self ._cursor_per_partition [partition_key ]
152+ if (
153+ partition_key in self ._finished_partitions
154+ and self ._semaphore_per_partition [partition_key ]._value == 0
155+ ):
156+ self ._update_global_cursor (cursor .state [self .cursor_field .cursor_field_key ])
159157 self ._emit_state_message ()
160158
159+ self ._semaphore_per_partition [partition_key ].acquire ()
160+
161161 def ensure_at_least_one_state_emitted (self ) -> None :
162162 """
163163 The platform expect to have at least one state message on successful syncs. Hence, whatever happens, we expect this method to be
@@ -246,6 +246,13 @@ def _ensure_partition_limit(self) -> None:
246246 - Logs a warning each time a partition is removed, indicating whether it was finished
247247 or removed due to being the oldest.
248248 """
249+ if not self ._use_global_cursor and self .limit_reached ():
250+ logger .info (
251+ f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of { self .SWITCH_TO_GLOBAL_LIMIT } . "
252+ f"Switching to global cursor for { self ._stream_name } ."
253+ )
254+ self ._use_global_cursor = True
255+
249256 with self ._lock :
250257 self ._number_of_partitions += 1
251258 while len (self ._cursor_per_partition ) > self .DEFAULT_MAX_PARTITIONS_NUMBER - 1 :
@@ -368,14 +375,6 @@ def _set_global_state(self, stream_state: Mapping[str, Any]) -> None:
368375 self ._new_global_cursor = deepcopy (fixed_global_state )
369376
370377 def observe (self , record : Record ) -> None :
371- # ToDo: check number of partitions
372- if not self ._use_global_cursor and self .limit_reached ():
373- logger .info (
374- f"Exceeded the 'SWITCH_TO_GLOBAL_LIMIT' of { self .SWITCH_TO_GLOBAL_LIMIT } . "
375- f"Switching to global cursor for { self ._stream_name } ."
376- )
377- self ._use_global_cursor = True
378-
379378 if not record .associated_slice :
380379 raise ValueError (
381380 "Invalid state as stream slices that are emitted should refer to an existing cursor"
0 commit comments