Skip to content

Commit d5180f7

Browse files
committed
Resume transfers + switch to Redis streams + clean
1 parent 72eb99a commit d5180f7

File tree

9 files changed

+354
-261
lines changed

9 files changed

+354
-261
lines changed

lib/metadata.py

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
from starlette.datastructures import Headers
2-
from pydantic import BaseModel, Field, field_validator, ByteSize, StrictStr, ConfigDict, AliasChoices
2+
from pydantic import BaseModel, Field, field_validator, ByteSize, ConfigDict, AliasChoices
33
from typing import Optional, Self, Annotated
44

55

66
class FileMetadata(BaseModel):
7-
name: StrictStr = Field(description="File name", min_length=2, max_length=255)
7+
name: str = Field(description="File name", min_length=1, max_length=255)
88
size: ByteSize = Field(description="Size in bytes", gt=0)
9-
type: StrictStr = Field(description="MIME type", default='application/octet-stream')
9+
type: str = Field(description="MIME type", default='application/octet-stream')
1010

1111
model_config = ConfigDict(title="File transfer metadata", alias_generator=lambda s: f'file_{s}', populate_by_name=True, validate_by_name=True)
1212

@classmethod
def get_from_http_headers(cls, headers: Headers, filename: str) -> Self:
    """Build FileMetadata from HTTP request headers for an upload named `filename`.

    `size` comes from Content-Length ('0' when absent, which the size
    field's gt=0 constraint will reject). The Content-Type header is only
    forwarded when non-empty: passing an explicit `type=''` would override
    the field's declared default ('application/octet-stream') and leave it
    unreachable, which is the bug this version fixes.
    """
    values: dict = {
        'name': filename,
        'size': headers.get('content-length', '0'),
    }
    content_type = headers.get('content-type', '')
    if content_type:
        values['type'] = content_type
    # Omitting 'type' lets the model default apply for empty/missing headers.
    return cls(**values)
3434

3535
@classmethod

lib/store.py

Lines changed: 42 additions & 87 deletions
Original file line numberDiff line numberDiff line change
def __init__(self, transfer_id: str):
    """Bind this store to one transfer and precompute its Redis key names."""
    self.transfer_id = transfer_id
    self.redis = self.get_redis()

    # One key per concern: chunk stream, metadata blob, resume position,
    # and receiver byte-offset progress.
    self._k_stream, self._k_metadata, self._k_position, self._k_progress = (
        self.key(part) for part in ('stream', 'metadata', 'position', 'progress')
    )
2626

2727
@classmethod
2828
def get_redis(cls) -> redis.Redis:
def key(self, name: str) -> str:
    """Get the Redis key for this transfer with the provided name."""
    return ':'.join(('transfer', self.transfer_id, name))
3838

async def add_chunk(self, data: bytes) -> None:
    """Append one chunk to the transfer's Redis stream.

    No maxlen cap is applied; instead the stream key carries a 5-minute
    TTL so an abandoned transfer cannot leak memory in Redis.
    """
    await self.redis.xadd(self._k_stream, {'data': data})
    # Refresh the TTL on every write. Without this EXPIRE the stream key
    # would never expire (XADD sets no TTL), so orphaned transfers whose
    # cleanup() never runs would accumulate indefinitely.
    await self.redis.expire(self._k_stream, 300)
4043

async def stream_chunks(self, timeout_ms: int = 20000):
    """Yield raw chunk payloads from the transfer stream, resuming from
    the last persisted position.

    Raises:
        TimeoutError: when no new entry arrives within `timeout_ms`.
    """
    saved = await self.redis.get(self._k_position)
    last_id = saved.decode() if saved else '0'

    while True:
        batches = await self.redis.xread({self._k_stream: last_id}, block=timeout_ms)
        if not batches:
            raise TimeoutError("Stream read timeout")

        _stream_key, entries = batches[0]
        for entry_id, payload in entries:
            last_id = entry_id
            # NOTE(review): the position is persisted *before* the chunk is
            # yielded, so a consumer crash mid-yield skips that chunk on
            # resume — confirm at-most-once delivery is acceptable here.
            await self.redis.set(self._k_position, last_id, ex=300)
            yield payload[b'data']
5959

6060
## Event operations ##
6161

@@ -99,80 +99,35 @@ async def wait_for_event(self, event_name: str, timeout: float = 300.0) -> None:
9999
await pubsub.unsubscribe(event_key)
100100
await pubsub.aclose()
101101

async def set_metadata(self, metadata: str) -> None:
    """Store transfer metadata, claiming this transfer id.

    SET with NX+EX is a single atomic operation, so two concurrent
    senders cannot both claim the same transfer id.

    Raises:
        KeyError: if metadata was already set for this transfer.
    """
    created = await self.redis.set(self._k_metadata, metadata, nx=True, ex=300)
    if not created:
        raise KeyError("Transfer already exists")
112106

async def get_metadata(self) -> str | None:
    """Return the stored transfer metadata, or None when absent or expired."""
    value = await self.redis.get(self._k_metadata)
    return value
164110

165-
pattern = self.key('*')
166-
keys_to_delete = set()
async def save_progress(self, bytes_downloaded: int) -> None:
    """Persist the receiver's byte offset so a download can resume later."""
    offset = str(bytes_downloaded)
    await self.redis.set(self._k_progress, offset, ex=300)
167114

async def get_progress(self) -> int:
    """Return the saved byte offset, or 0 when no progress is recorded."""
    raw = await self.redis.get(self._k_progress)
    if not raw:
        return 0
    return int(raw)
async def cleanup(self) -> None:
    """Delete every Redis key belonging to this transfer.

    Keys are deleted batch-by-batch as SCAN yields them rather than
    accumulating the full key list in memory first — same end state,
    bounded memory even for transfers with many keys.
    """
    pattern = self.key('*')
    cursor = 0
    while True:
        cursor, batch = await self.redis.scan(cursor, match=pattern)
        if batch:
            await self.redis.delete(*batch)
        if cursor == 0:
            break

lib/transfer.py

Lines changed: 45 additions & 84 deletions
Original file line numberDiff line numberDiff line change
@@ -37,12 +37,14 @@ def __init__(self, uid: str, file: FileMetadata):
3737

3838
@classmethod
async def create(cls, uid: str, file: FileMetadata):
    """Register a new transfer for `uid` and persist its file metadata.

    Raises whatever `Store.set_metadata` raises when the id is taken.
    """
    instance = cls(uid, file)
    await instance.store.set_metadata(file.to_json())
    return instance
4344

4445
@classmethod
4546
async def get(cls, uid: str):
47+
"""Fetch a transfer from the store using the provided identifier."""
4648
store = Store(uid)
4749
metadata_json = await store.get_metadata()
4850
if not metadata_json:
@@ -58,122 +60,81 @@ def _format_uid(uid: str):
5860
def get_file_info(self):
    """Return the (name, size, type) triple of this transfer's file."""
    file = self.file
    return file.name, file.size, file.type
6062

async def notify_receiver_connected(self):
    """Signal the waiting sender that a receiver has attached to this transfer."""
    await self.store.set_event('receiver_connected')
6366

async def wait_for_receiver(self):
    """Block until the receiver signals that it has connected."""
    self.info("△ Waiting for receiver...")
    await self.store.wait_for_event('receiver_connected')
    self.debug("△ Receiver connected")
6772

68-
async def wait_for_client_connected(self):
69-
self.info(f"△ Waiting for client to connect...")
70-
await self.wait_for_event('client_connected')
71-
self.debug(f"△ Received client connected notification.")
72-
73-
async def is_receiver_connected(self) -> bool:
74-
return await self.store.is_receiver_connected()
75-
76-
async def set_receiver_connected(self) -> bool:
77-
return await self.store.set_receiver_connected()
78-
79-
async def is_interrupted(self) -> bool:
80-
return await self.store.is_interrupted()
81-
82-
async def set_interrupted(self):
83-
await self.store.set_interrupted()
84-
85-
async def is_completed(self) -> bool:
86-
return await self.store.is_completed()
87-
88-
async def set_completed(self):
89-
await self.store.set_completed()
90-
async def consume_upload(self, stream: AsyncIterator[bytes], on_error: Callable[[Exception | str], Awaitable[None]]) -> None:
    """Drain the sender's upload stream into the Redis chunk stream.

    On success a DONE_FLAG marker is appended; when the sender vanishes,
    or a propagating TransferError occurs, a DEAD_FLAG is appended so the
    receiver side can abort. Timeouts and transfer errors are reported
    through `on_error`.
    """
    self.bytes_uploaded = 0

    try:
        async for piece in stream:
            if not piece:
                break  # an empty chunk marks end-of-upload

            await self.store.add_chunk(piece)
            self.bytes_uploaded += len(piece)

        # Fewer bytes than the announced size means a truncated upload.
        if self.bytes_uploaded < self.file.size:
            raise TransferError("Incomplete upload", propagate=True)

        await self.store.add_chunk(self.DONE_FLAG)
        self.debug(f"△ Upload complete: {self.bytes_uploaded} bytes")

    except (ClientDisconnect, WebSocketDisconnect):
        self.error("△ Sender disconnected")
        await self.store.add_chunk(self.DEAD_FLAG)

    except TimeoutError:
        self.warning("△ Upload timeout")
        await on_error("Upload timeout")

    except TransferError as e:
        if e.propagate:
            await self.store.add_chunk(self.DEAD_FLAG)
        await on_error(e)
async def produce_download(self, on_error: Callable[[Exception | str], Awaitable[None]]) -> AsyncIterator[bytes]:
    """Yield file chunks to the receiver, resuming from saved progress.

    The byte offset is persisted after every chunk so an interrupted
    download can continue from the same position on reconnect. Errors
    (including a DEAD_FLAG from the sender side) are routed to `on_error`.
    """
    self.bytes_downloaded = await self.store.get_progress()

    if self.bytes_downloaded > 0:
        self.info(f"▼ Resuming from byte {self.bytes_downloaded}")

    try:
        async for data in self.store.stream_chunks():
            if data == self.DEAD_FLAG:
                raise TransferError("Sender disconnected")

            if data == self.DONE_FLAG:
                # NOTE(review): we stop on DONE even when fewer bytes than
                # file.size arrived (only the complete case is logged) —
                # confirm a silent short finish is intended.
                if self.bytes_downloaded >= self.file.size:
                    self.debug(f"▼ Download complete: {self.bytes_downloaded} bytes")
                break

            self.bytes_downloaded += len(data)
            await self.store.save_progress(self.bytes_downloaded)
            yield data

    except TransferError as e:
        await on_error(e)
    except Exception as e:
        self.error("▼ Download error", exc_info=True)
        await on_error(e)
158130

159131
async def cleanup(self):
    """Remove every Redis key that belongs to this transfer."""
    await self.store.cleanup()
166134

167135
async def finalize_download(self):
    """After the download ends: clean up when complete, otherwise keep
    all Redis state so the receiver can resume from the saved offset."""
    if self.bytes_downloaded >= self.file.size:
        await self.cleanup()
    else:
        self.info(f"▼ Download paused at {self.bytes_downloaded}/{self.file.size} bytes")

static/index.html

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,7 @@ <h2>Send a file</h2>
5454
<section class="section">
5555
<div class="code-section">
5656
<h3>Using cURL</h3>
57-
<p>You can use the <code class="inline-highlight">curl</code> command to transfer from your terminal. 100 MiB maximum.</p>
57+
<p>You can use the <code class="inline-highlight">curl</code> command to transfer from your terminal. 1 GiB maximum.</p>
5858

5959
<div class="code-block">
6060
<code><span class="code-comment"># Send</span>

0 commit comments

Comments
 (0)