@@ -23,6 +23,7 @@ def __init__(self, transfer_id: str):
2323 self ._k_metadata = self .key ('metadata' )
2424 self ._k_position = self .key ('position' )
2525 self ._k_progress = self .key ('progress' )
26+ self ._k_receiver_active = self .key ('receiver_active' )
2627
2728 @classmethod
2829 def get_redis (cls ) -> redis .Redis :
@@ -38,16 +39,15 @@ def key(self, name: str) -> str:
3839
3940 async def add_chunk (self , data : bytes ) -> None :
4041 """Add chunk to stream."""
41- # No maxlen limit - streams auto-expire after 5 minutes
4242 await self .redis .xadd (self ._k_stream , {'data' : data })
4343
44- async def stream_chunks (self , timeout_ms : int = 20000 ):
44+ async def stream_chunks (self , read_timeout : float = 20.0 ):
4545 """Stream chunks from last position."""
4646 position = await self .redis .get (self ._k_position )
4747 last_id = position .decode () if position else '0'
4848
4949 while True :
50- result = await self .redis .xread ({self ._k_stream : last_id }, block = timeout_ms )
50+ result = await self .redis .xread ({self ._k_stream : last_id }, block = int ( read_timeout * 1000 ) )
5151 if not result :
5252 raise TimeoutError ("Stream read timeout" )
5353
@@ -119,11 +119,11 @@ async def get_progress(self) -> int:
119119
120120 async def set_receiver_active (self ) -> None :
121121 """Mark receiver as actively downloading with TTL."""
122- await self .redis .set (self .key ( 'receiver_active' ) , '1' , ex = 5 )
122+ await self .redis .set (self ._k_receiver_active , '1' , ex = 5 )
123123
124124 async def is_receiver_active (self ) -> bool :
125125 """Check if receiver is actively downloading."""
126- return bool (await self .redis .exists (self .key ( 'receiver_active' ) ))
126+ return bool (await self .redis .exists (self ._k_receiver_active ))
127127
128128 async def cleanup (self ) -> None :
129129 """Delete all transfer data."""
0 commit comments