File tree Expand file tree Collapse file tree 1 file changed +11
-7
lines changed
Expand file tree Collapse file tree 1 file changed +11
-7
lines changed Original file line number Diff line number Diff line change @@ -488,15 +488,19 @@ async def _read_stream_iteration(self) -> None:
488488 if self .allow_pick_jobs :
489489 if self .job_counter < self .max_jobs :
490490 stream_msgs = await self ._get_idle_tasks (count )
491- count = count - len (stream_msgs )
491+ msgs_count = sum ([len (msgs ) for _ , msgs in stream_msgs ])
492+
493+ count -= msgs_count
492494
493495 if count > 0 :
494- stream_msgs = await self .pool .xreadgroup (
495- groupname = self .consumer_group_name ,
496- consumername = self .worker_id ,
497- streams = {self .queue_name + stream_key_suffix : '>' },
498- count = count ,
499- block = int (max (self .stream_block_s * 1000 , 1 )),
496+ stream_msgs .extend (
497+ await self .pool .xreadgroup (
498+ groupname = self .consumer_group_name ,
499+ consumername = self .worker_id ,
500+ streams = {self .queue_name + stream_key_suffix : '>' },
501+ count = count ,
502+ block = int (max (self .stream_block_s * 1000 , 1 )),
503+ )
500504 )
501505
502506 jobs = []
You can’t perform that action at this time.
0 commit comments