Commit f36c42d

docs(rust): update cloudfetch pipeline redesign spec with channel design details
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
1 parent 9239a4c commit f36c42d

2 files changed: +86 −285 lines

rust/spec/cloudfetch-pipeline-redesign.md

Lines changed: 86 additions & 9 deletions
@@ -1,3 +1,19 @@
+<!--
+Copyright (c) 2025 ADBC Drivers Contributors
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+
 # CloudFetch Pipeline Redesign: DashMap → Channel-Based Pipeline
 
 **Status:** Proposed
@@ -61,14 +77,22 @@ graph LR
 **Two channels replace the DashMap:**
 
 - **`download_channel`** — scheduler pushes `ChunkDownloadTask` items; download workers pull
-  from it. Unbounded (backpressure is applied via `result_channel`).
+  from it. Bounded to `max_chunks_in_memory` using `async-channel` (MPMC). The scheduler's
+  sequential loop means `download_channel` and `result_channel` are always filled in lockstep,
+  so the same bound caps both.
 - **`result_channel`** — scheduler pushes `ChunkHandle` items *in chunk-index order*, bounded
   to `max_chunks_in_memory`. The consumer reads from it in order and awaits each handle.
 
 Items are enqueued to `result_channel` by the scheduler before the download starts, preserving
 sequential ordering even when downloads complete out of order. This matches the C# pattern in
 `CloudFetchDownloader.cs` (result enqueued before download task is awaited).
 
+**Why `async-channel` for `download_channel`?** `tokio::sync::mpsc` is single-consumer —
+N workers cannot each call `.recv()` on the same receiver. `async-channel` is MPMC: each
+worker clones the receiver and blocks on `.recv()` independently. When a task arrives, exactly
+one worker wakes up. No mutex needed. Equivalent to C#'s `BlockingCollection` consumed via
+`GetConsumingEnumerable` with a `SemaphoreSlim` controlling parallelism.
+
 ---
 
 ## Key Types
@@ -125,9 +149,14 @@ sequenceDiagram
 ```
 
 - `fetch_links()` returns a **batch** of `CloudFetchLink` values (`Vec<CloudFetchLink>`), not a
-  single link. The scheduler iterates the batch and creates one oneshot pair per link.
-- `SeaChunkLinkFetcher` caches and prefetches links internally; the scheduler simply consumes
-  what `fetch_links()` returns, advancing `next_chunk_index` after each batch.
+  single link. The batch size is **server-determined** — one `fetch_links()` call maps to one
+  API call and returns however many links the server provides in that page.
+- Calling `fetch_links(next_chunk_index)` internally updates `SeaChunkLinkFetcher`'s
+  `current_consumer_index` and triggers a background prefetch task if the prefetch window
+  (`link_prefetch_window`, default 128 chunks) needs filling. The scheduler does not need to
+  signal prefetch explicitly — `fetch_links()` handles it.
+- `SeaChunkLinkFetcher` returns from its `DashMap` cache immediately if links are available;
+  falls back to a synchronous server fetch only on a cache miss (prefetch hasn't caught up).
 - Creates a `oneshot` channel pair per chunk.
 - Sends `ChunkDownloadTask` to `download_channel` and `ChunkHandle` to `result_channel`.
 - The bounded `result_channel` provides backpressure automatically — no manual
@@ -171,8 +200,16 @@ sequenceDiagram
     end
 ```
 
-The worker owns `ChunkDownloadTask` outright. URL refresh mutates a local `link` variable —
-no map lookup, no lock, no guard. Mirrors C#'s `DownloadFileAsync` directly.
+The worker owns `ChunkDownloadTask` outright. When `refetch_link()` returns a fresh
+`CloudFetchLink`, it is assigned to the worker's local `link` variable — no shared state is
+mutated, no lock needed, no map lookup. This is the key improvement over the DashMap approach
+where calling `refetch_link().await` while holding a `get_mut()` guard was impossible.
+Mirrors C#'s `DownloadFileAsync` directly.
+
+**Why retry logic lives in the worker, not a shared HTTP client layer:** the 401/403 path
+requires calling `refetch_link()` on `ChunkLinkFetcher`, which is CloudFetch-specific state
+unavailable to a generic HTTP client. A future shared HTTP client could own 5xx/network
+retries, but URL refresh must remain in the worker to access `ChunkLinkFetcher`.
 
 **Proactive expiry check** (C# parity — `IsExpiredOrExpiringSoon`): before the first HTTP
 request for each chunk, the worker checks `link.is_expired()` using the
@@ -232,6 +269,9 @@ pub struct StreamingCloudFetchProvider {
 
     // Cancellation
     cancel_token: CancellationToken,
+
+    // Worker task handles — awaited on drop for clean shutdown
+    worker_handles: JoinSet<()>,
 }
 ```
 
@@ -259,11 +299,17 @@ Both types are no longer needed and can be deleted. The equivalent state lives o
 | Component | Concurrency primitive | Reason |
 |---|---|---|
 | Scheduler | Single `tokio::spawn` task | Sequential chunk ordering required |
-| Download workers | N `tokio::spawn` tasks sharing `download_channel` | Parallel downloads |
+| Download workers | N `tokio::spawn` tasks, each holding a cloned `async_channel::Receiver` | All N workers block on `.recv()` simultaneously; exactly one wakes per task |
 | Consumer | Caller's task (no spawn) | Sequential result consumption |
-| `result_channel` | Bounded `mpsc` (capacity = `max_chunks_in_memory`) | Backpressure without manual counter |
+| `download_channel` | `async_channel::bounded(max_chunks_in_memory)` | MPMC required; bounded to match `result_channel` capacity |
+| `result_channel` | `tokio::sync::mpsc` bounded (capacity = `max_chunks_in_memory`) | Single consumer; backpressure without manual counter |
 | URL refresh in worker | Local variable mutation | No shared state — no lock needed |
 
+**Worker lifecycle:** `StreamingCloudFetchProvider` holds a `tokio::task::JoinSet<()>`
+containing the N worker task handles. On shutdown: `cancel_token.cancel()` → workers exit
+their recv loop → scheduler drops the `download_channel` sender → `JoinSet` is awaited on
+`StreamingCloudFetchProvider` drop to ensure clean teardown.
+
 **Thread safety:** No shared mutable state between components. Each `ChunkDownloadTask` is
 owned by exactly one worker at a time. Each `ChunkHandle` is owned by the consumer.
 
@@ -309,7 +355,7 @@ field is **removed**, and two defaults are **corrected** to match C#.
 
 | Field | Old use | New use | Default change? |
 |---|---|---|---|
-| `max_chunks_in_memory` | Manual `AtomicUsize` counter | `mpsc::channel(max_chunks_in_memory)` capacity bound | No |
+| `max_chunks_in_memory` | Manual `AtomicUsize` counter | `async_channel::bounded` + `mpsc::channel` capacity for both channels | **Yes: → 10** (C# equivalent: 200 MB ÷ 20 MB max chunk size) |
 | `max_retries` | Per-download retry limit | Unchanged | **Yes: 5 → 3** (align with C# `MaxRetries = 3`) |
 | `retry_delay` | Constant sleep | Linear backoff: `retry_delay * (attempt + 1)` — matches C# `RetryDelayMs * (retry + 1)` | **Yes: 1500ms → 500ms** (align with C# `RetryDelayMs = 500`) |
 | `link_prefetch_window` | Background prefetch ahead of consumer | Unchanged — owned by `SeaChunkLinkFetcher` | No |
@@ -337,6 +383,37 @@ address the root issue of co-locating three concerns in one structure.
 
 ---
 
+## Implementation Phases
+
+### Phase 1: Foundation — Config and Pipeline Types
+
+- Update `CloudFetchConfig`: add `max_refresh_retries`, `num_download_workers`,
+  `url_expiration_buffer_secs`; remove `chunk_ready_timeout`; correct defaults for
+  `max_chunks_in_memory`, `max_retries`, `retry_delay`
+- Remove legacy types `ChunkEntry` and `ChunkState` from `types.rs`
+- Define `ChunkDownloadTask` and `ChunkHandle` structs
+
+Corresponds to PECO-2927 and the type definitions from PECO-2928.
+
+### Phase 2: Core Pipeline — Scheduler, Workers, Consumer
+
+- Implement the Scheduler task (fetch links → create oneshot pairs → push to both channels)
+- Implement Download Workers (retry contract, proactive expiry check, refetch on 401/403/404)
+- Implement Consumer (`next_batch` reads from `result_channel`, awaits each `ChunkHandle`)
+
+Corresponds to PECO-2928, PECO-2929, PECO-2930.
+
+### Phase 3: Integration — Provider Rewrite and Tests
+
+- Refactor `StreamingCloudFetchProvider` struct: replace `DashMap` fields with channels,
+  wire up scheduler and workers in constructor, clean shutdown via `JoinSet` + `CancellationToken`
+- Unit tests (9 tests — see Test Strategy below)
+- Integration tests (3 tests — see Test Strategy below)
+
+Corresponds to PECO-2931, PECO-2932, PECO-2933.
+
+---
+
 ## Test Strategy
 
 ### Unit Tests
