@@ -157,7 +157,11 @@ def _refresh_stream_keys(
157157 stream_key_prefix = self .stream_key_prefix
158158
159159 try :
160- candidate_keys = self ._scan_candidate_stream_keys (stream_key_prefix )
160+ candidate_keys = self ._scan_candidate_stream_keys (
161+ stream_key_prefix = stream_key_prefix ,
162+ max_keys = max_keys ,
163+ time_limit_sec = time_limit_sec ,
164+ )
161165 last_entries_results = self ._pipeline_last_entries (candidate_keys )
162166 now_sec = time .time ()
163167 keys_to_delete = self ._collect_inactive_keys (
@@ -762,17 +766,45 @@ def __del__(self):
762766 def unfinished_tasks (self ) -> int :
763767 return self .qsize ()
764768
765- def _scan_candidate_stream_keys (self , stream_key_prefix : str ) -> list [str ]:
766- """Return stream keys matching the given prefix via SCAN, using precompiled regex when possible."""
769+ def _scan_candidate_stream_keys (
770+ self ,
771+ stream_key_prefix : str ,
772+ max_keys : int | None = None ,
773+ time_limit_sec : float | None = None ,
774+ count_hint : int = 200 ,
775+ ) -> list [str ]:
776+ """Return stream keys matching the given prefix via SCAN with optional limits.
777+
778+ Uses a cursor-based SCAN to collect keys matching the prefix, honoring
779+ optional `max_keys` and `time_limit_sec` constraints. Filters results
780+ with a precompiled regex when scanning the configured prefix.
781+ """
767782 redis_pattern = f"{ stream_key_prefix } :*"
768- raw_keys_iter = self ._redis_conn .scan_iter (match = redis_pattern )
769- raw_keys = list (raw_keys_iter )
770- # Use precompiled pattern when scanning the configured prefix; otherwise compile a per-call pattern
783+ collected = []
784+ cursor = 0
785+ start_ts = time .time () if time_limit_sec else None
786+ while True :
787+ if (
788+ start_ts is not None
789+ and time_limit_sec is not None
790+ and (time .time () - start_ts ) > time_limit_sec
791+ ):
792+ break
793+ cursor , keys = self ._redis_conn .scan (
794+ cursor = cursor , match = redis_pattern , count = count_hint
795+ )
796+ collected .extend (keys )
797+ if max_keys is not None and len (collected ) >= max_keys :
798+ break
799+ if cursor == 0 or cursor == "0" :
800+ break
801+
771802 if stream_key_prefix == self .stream_key_prefix :
772803 pattern = self .stream_prefix_regex_pattern
773804 else :
774- pattern = re .compile (f"^{ re .escape (stream_key_prefix )} :" )
775- return [key for key in raw_keys if pattern .match (key )]
805+ escaped_prefix = re .escape (stream_key_prefix )
806+ pattern = re .compile (f"^{ escaped_prefix } :" )
807+ return [key for key in collected if pattern .match (key )]
776808
777809 def _pipeline_last_entries (self , candidate_keys : list [str ]) -> list [list [tuple [str , dict ]]]:
778810 """Fetch last entries for keys using pipelined XREVRANGE COUNT 1."""
0 commit comments