@@ -149,9 +149,11 @@ def close_partition(self, partition: Partition) -> None:
149149 partition_key in self ._finished_partitions
150150 and self ._semaphore_per_partition [partition_key ]._value == 0
151151 ):
152- if self ._new_global_cursor is None or self ._extract_cursor_value_from_state (
153- self ._new_global_cursor
154- ) < self ._extract_cursor_value_from_state (cursor .state ):
152+ if (
153+ self ._new_global_cursor is None
154+ or self ._new_global_cursor [self .cursor_field .cursor_field_key ]
155+ < cursor .state [self .cursor_field .cursor_field_key ]
156+ ):
155157 self ._new_global_cursor = copy .deepcopy (cursor .state )
156158 if not self ._use_global_cursor :
157159 self ._emit_state_message ()
@@ -304,8 +306,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
304306 ):
305307 # We assume that `stream_state` is in a global format that can be applied to all partitions.
306308 # Example: {"global_state_format_key": "global_state_format_value"}
307- self ._global_cursor = deepcopy (stream_state )
308- self ._new_global_cursor = deepcopy (stream_state )
309+ self ._set_global_state (stream_state )
309310
310311 else :
311312 self ._use_global_cursor = stream_state .get ("use_global_cursor" , False )
@@ -322,8 +323,7 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
322323
323324 # set default state for missing partitions if it is per partition with fallback to global
324325 if self ._GLOBAL_STATE_KEY in stream_state :
325- self ._global_cursor = deepcopy (stream_state [self ._GLOBAL_STATE_KEY ])
326- self ._new_global_cursor = deepcopy (stream_state [self ._GLOBAL_STATE_KEY ])
326+ self ._set_global_state (stream_state [self ._GLOBAL_STATE_KEY ])
327327
328328 # Set initial parent state
329329 if stream_state .get ("parent_state" ):
@@ -332,6 +332,27 @@ def _set_initial_state(self, stream_state: StreamState) -> None:
332332 # Set parent state for partition routers based on parent streams
333333 self ._partition_router .set_initial_state (stream_state )
334334
335+ def _set_global_state (self , stream_state : Mapping [str , Any ]) -> None :
336+ """
337+ Initializes the global cursor state from the provided stream state.
338+
339+ If the cursor field key is present in the stream state, its value is parsed,
340+ formatted, and stored as the global cursor. This ensures consistency in state
341+ representation across partitions.
342+ """
343+ if self .cursor_field .cursor_field_key in stream_state :
344+ global_state_value = stream_state [self .cursor_field .cursor_field_key ]
345+ final_format_global_state_value = self ._connector_state_converter .output_format (
346+ self ._connector_state_converter .parse_value (global_state_value )
347+ )
348+
349+ fixed_global_state = {
350+ self .cursor_field .cursor_field_key : final_format_global_state_value
351+ }
352+
353+ self ._global_cursor = deepcopy (fixed_global_state )
354+ self ._new_global_cursor = deepcopy (fixed_global_state )
355+
335356 def observe (self , record : Record ) -> None :
336357 if not self ._use_global_cursor and self .limit_reached ():
337358 self ._use_global_cursor = True
0 commit comments