refactor(rust): update CloudFetchConfig, remove legacy types, add pipeline types #283
eric-wang-1990 wants to merge 3 commits into main
Conversation
Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
…eline types

Update CloudFetchConfig to align with C# reference driver defaults:
- Add max_refresh_retries (default 3), num_download_workers (default 3),
  url_expiration_buffer_secs (default 60)
- Correct max_retries (5→3) and retry_delay (1500ms→500ms)
- Remove chunk_ready_timeout field

Remove public ChunkEntry/ChunkState types and the LINK_EXPIRY_BUFFER_SECS/
DEFAULT_CHUNK_READY_TIMEOUT_SECS constants. Legacy copies of ChunkEntry/
ChunkState are kept private in streaming_provider.rs until the pipeline rewrite.

Update CloudFetchLink::is_expired() to accept a buffer_secs parameter
instead of using a hardcoded constant.

Introduce pipeline_types.rs with ChunkDownloadTask, ChunkHandle, and a
create_chunk_pair() helper for the channel-based pipeline architecture.

PECO-2927, types portion of PECO-2928

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
Pull request overview
Refactors Rust CloudFetch configuration/types to align defaults with the C# reference driver, removes legacy public chunk state types, and introduces foundational pipeline structs for an upcoming channel-based CloudFetch architecture.
Changes:
- Update `CloudFetchConfig` defaults/fields (add refresh retry + worker count + URL expiry buffer; remove chunk-ready timeout) and adjust `CloudFetchLink::is_expired` to accept a buffer parameter.
- Remove public `ChunkEntry`/`ChunkState` exports and keep legacy equivalents private within the legacy DashMap-based provider.
- Add `pipeline_types.rs` (task/handle + helper) and export these types from the CloudFetch reader module.
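The task/handle pair introduced by `pipeline_types.rs` could be sketched roughly as follows. This is a hypothetical illustration, not the actual layout of the file: the field names, the `Vec<u8>` payload, and the use of std's `mpsc` in place of a tokio oneshot channel are all assumptions.

```rust
use std::sync::mpsc;

// Hypothetical sketch: the task side carries what a download worker needs,
// and the handle side lets the consumer receive the result for that chunk.
struct ChunkDownloadTask {
    chunk_index: u64,
    result_tx: mpsc::Sender<Vec<u8>>, // worker sends downloaded bytes here
}

struct ChunkHandle {
    chunk_index: u64,
    result_rx: mpsc::Receiver<Vec<u8>>, // consumer receives bytes here
}

/// Create the linked task/handle pair for one chunk.
fn create_chunk_pair(chunk_index: u64) -> (ChunkDownloadTask, ChunkHandle) {
    let (result_tx, result_rx) = mpsc::channel();
    (
        ChunkDownloadTask { chunk_index, result_tx },
        ChunkHandle { chunk_index, result_rx },
    )
}

fn main() {
    let (task, handle) = create_chunk_pair(7);
    assert_eq!(task.chunk_index, handle.chunk_index);
    task.result_tx.send(vec![1, 2, 3]).unwrap(); // worker side completes the chunk
    assert_eq!(handle.result_rx.recv().unwrap(), vec![1, 2, 3]);
}
```

The point of the pairing is that the scheduler can hand tasks to workers and handles to the consumer in the same chunk order, so results are consumed strictly by index.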
Reviewed changes
Copilot reviewed 8 out of 8 changed files in this pull request and generated 4 comments.
| File | Description |
|---|---|
| rust/src/types/mod.rs | Stops re-exporting legacy chunk state types from types. |
| rust/src/types/cloudfetch.rs | Updates CloudFetch config fields/defaults; changes link expiry API; removes legacy public chunk types. |
| rust/src/reader/cloudfetch/streaming_provider.rs | Introduces module-local legacy chunk types; threads configurable URL expiry buffer into legacy expiry checks; replaces removed timeout config with a local constant. |
| rust/src/reader/cloudfetch/pipeline_types.rs | Adds task/handle pipeline types and helper for upcoming channel-based architecture (with unit tests). |
| rust/src/reader/cloudfetch/mod.rs | Wires in and re-exports pipeline types from the CloudFetch reader module. |
| rust/src/database.rs | Adds new CloudFetch option keys to mutate the new config fields (removes the old chunk-ready-timeout setter). |
| rust/spec/sprint-plan-cloudfetch-redesign.md | Adds sprint plan documentation for the CloudFetch pipeline redesign effort. |
| rust/spec/cloudfetch-pipeline-redesign.md | Adds a detailed design spec for the DashMap → channel-based pipeline rewrite. |
Comments suppressed due to low confidence (1)
rust/src/database.rs:247
- The option value is parsed as an i64 and cast directly to u32. Negative values will wrap to a large u32, making the expiration buffer effectively decades and forcing constant link refreshes/behavior changes. Validate non-negative input (and ideally cap to a reasonable max) before assigning url_expiration_buffer_secs.
```rust
"databricks.cloudfetch.url_expiration_buffer_secs" => {
    if let Some(v) = Self::parse_int_option(&value) {
        self.cloudfetch_config.url_expiration_buffer_secs = v as u32;
        Ok(())
    } else {
        Err(DatabricksErrorHelper::set_invalid_option(&key, &value).to_adbc())
    }
}
```
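A minimal sketch of the validation this comment asks for, assuming a hypothetical `parse_u32_option` helper (the real code goes through `Self::parse_int_option` and the match arm above): `u32::try_from` fails for negative and overflowing values instead of wrapping the way `as u32` does.

```rust
/// Hypothetical helper: parse an option string into a u32, rejecting
/// negatives and out-of-range values instead of wrapping on cast.
fn parse_u32_option(value: &str) -> Option<u32> {
    let v: i64 = value.trim().parse().ok()?;
    u32::try_from(v).ok() // None for v < 0 or v > u32::MAX
}

fn main() {
    assert_eq!(parse_u32_option("60"), Some(60)); // default buffer value
    assert_eq!(parse_u32_option("-1"), None);     // `-1 as u32` would wrap to 4294967295
    assert_eq!(parse_u32_option("4294967296"), None); // u32::MAX + 1
}
```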
```rust
/// Entry for a chunk in the chunks map (legacy, module-local).
#[derive(Clone)]
struct ChunkEntry {
    /// Link for this chunk.
    link: Option<CloudFetchLink>,
    /// Current state of this chunk.
    state: ChunkState,
    /// Parsed record batches (populated when state is Downloaded).
    batches: Option<Vec<RecordBatch>>,
}
```
ChunkEntry derives Clone but can hold a `Vec<RecordBatch>` (potentially very large). Since this type is module-local and there are no in-file clones, consider removing the Clone derive (or storing batches behind Arc) to prevent accidental large data clones during future maintenance.
Good catch. ChunkEntry and ChunkState are deleted entirely in the new design (spec section "ChunkEntry / ChunkState (types.rs)") so this becomes a non-issue. Will remove the Clone derive when that cleanup happens in the pipeline rewrite PR.
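For illustration, here is a minimal sketch of the reviewer's `Arc` suggestion, with `Vec<u8>` standing in for `Vec<RecordBatch>` and a pared-down struct (both assumptions for the example): cloning the entry then only bumps a refcount rather than deep-copying the batch data.

```rust
use std::sync::Arc;

// Sketch: batch data behind Arc makes Clone cheap (pointer copy +
// refcount bump) instead of a deep copy of the whole buffer.
#[derive(Clone)]
struct ChunkEntryArc {
    batches: Option<Arc<Vec<u8>>>, // Vec<u8> stands in for Vec<RecordBatch>
}

fn main() {
    let entry = ChunkEntryArc { batches: Some(Arc::new(vec![0u8; 1024])) };
    let copy = entry.clone();
    // Both entries share the same allocation: refcount is 2, no data copied.
    assert_eq!(Arc::strong_count(entry.batches.as_ref().unwrap()), 2);
    assert_eq!(copy.batches.as_ref().unwrap().len(), 1024);
}
```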
This comment was generated with GitHub MCP.
```rust
self.cloudfetch_config.max_refresh_retries = v as u32;
Ok(())
```
The option value is parsed as an i64 (parse_int_option) and then cast directly to u32. Negative values would wrap to a very large u32 and could lead to extreme retry/refresh behavior. Validate that the parsed value is non-negative (and ideally within a reasonable upper bound) before assigning to max_refresh_retries; otherwise return set_invalid_option.
Suggested change:

```rust
if v >= 0 && v <= u32::MAX as i64 {
    self.cloudfetch_config.max_refresh_retries = v as u32;
    Ok(())
} else {
    Err(DatabricksErrorHelper::set_invalid_option(&key, &value).to_adbc())
}
```
Valid. The i64 -> u32 cast for max_refresh_retries should validate that the parsed value is non-negative before truncating. Will add a bounds check in database.rs (reject negative values with an invalid_argument error) in a follow-up.
```rust
"databricks.cloudfetch.num_download_workers" => {
    if let Some(v) = Self::parse_int_option(&value) {
        self.cloudfetch_config.num_download_workers = v as usize;
        Ok(())
    } else {
        Err(DatabricksErrorHelper::set_invalid_option(&key, &value).to_adbc())
    }
}
```
The option value is parsed as an i64 and cast directly to usize. If a user supplies a negative value, it will wrap to a huge usize and may attempt to spawn an enormous number of download workers (once the pipeline uses this field), potentially causing OOM or denial of service. Reject negative values and consider enforcing a minimum of 1 and a sensible maximum.
Same issue as max_refresh_retries. Will add a non-negative bounds check for num_download_workers (i64 -> usize) and url_expiration_buffer_secs (i64 -> u32) in database.rs as part of the same follow-up.
```rust
pub max_retries: u32,
/// Delay between retry attempts (used with linear backoff: retry_delay * (attempt + 1)).
pub retry_delay: Duration,
```
CloudFetchConfig.retry_delay is now documented as being used with linear backoff (retry_delay * (attempt + 1)), but the current legacy DashMap-based provider still sleeps a constant retry_delay on each retry. Either update the legacy retry loop to apply the documented linear backoff, or soften/clarify this field doc to match the behavior until the pipeline rewrite lands.
Good catch on the doc/behavior mismatch. The current StreamingCloudFetchProvider uses a constant sleep (legacy behavior). The linear backoff (retry_delay * (attempt + 1)) is the target behavior for the new channel-based pipeline in the next PR. Will update the doc comment to distinguish: legacy provider keeps constant delay; new pipeline uses linear backoff.
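The linear backoff described here reduces to a one-line delay computation. A minimal sketch, assuming the 500 ms default `retry_delay` from this PR (the function name is illustrative, not the actual pipeline code):

```rust
use std::time::Duration;

// Target (pipeline) behavior: linear backoff, mirroring C#'s
// Task.Delay(_retryDelayMs * (retry + 1)). Not constant, not exponential.
fn backoff_delay(retry_delay: Duration, attempt: u32) -> Duration {
    retry_delay * (attempt + 1)
}

fn main() {
    let base = Duration::from_millis(500); // new default retry_delay
    assert_eq!(backoff_delay(base, 0), Duration::from_millis(500));
    assert_eq!(backoff_delay(base, 1), Duration::from_millis(1000));
    assert_eq!(backoff_delay(base, 2), Duration::from_millis(1500));
}
```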
> - `fetch_links()` returns a **batch** of `CloudFetchLink` values (`Vec<CloudFetchLink>`), not a single link.
How is this batch size decided? ChunkLinkFetcher prefetches links, so do we simply get all prefetched links at a point in time? Does it signal to the chunk link fetcher that these links have been consumed, triggering the next prefetch?
Good question: the spec was unclear here. Updated the spec to clarify:

- **Batch size is server-determined.** One `fetch_links()` call maps to one API call and returns however many links the server provides in that page. The client has no control over batch size.
- **Prefetch signaling is implicit.** Calling `fetch_links(next_chunk_index)` internally updates `SeaChunkLinkFetcher`'s `current_consumer_index` and calls `maybe_trigger_prefetch()`. If `next_server_fetch_index <= consumer + link_prefetch_window` (128), a background `tokio::spawn` task fetches the next page and caches links in the DashMap. The scheduler does not signal prefetch explicitly; it is triggered as a side effect of `fetch_links()`.
- **Cache-first.** `fetch_links()` returns from the DashMap cache immediately if links are available, and falls back to a synchronous server fetch only on a cache miss (when prefetch hasn't caught up yet).
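A single-threaded sketch of the cache-first behavior described above, with `HashMap` standing in for the DashMap, `String` for `CloudFetchLink`, and a closure for the synchronous server fetch. All names and the remove-on-read semantics here are illustrative, not the real `SeaChunkLinkFetcher` API.

```rust
use std::collections::HashMap;

// Sketch: serve from the prefetch cache when possible, fall back to a
// synchronous server fetch only on a cache miss.
fn fetch_links(
    cache: &mut HashMap<u64, Vec<String>>,
    chunk_index: u64,
    server_fetch: impl Fn(u64) -> Vec<String>,
) -> Vec<String> {
    if let Some(links) = cache.remove(&chunk_index) {
        return links; // cache hit: prefetch already populated this page
    }
    server_fetch(chunk_index) // cache miss: prefetch hasn't caught up yet
}

fn main() {
    let mut cache = HashMap::new();
    cache.insert(0u64, vec!["link-0".to_string()]);
    // Hit: served from cache, the server closure is never called.
    assert_eq!(fetch_links(&mut cache, 0, |_| unreachable!()), vec!["link-0"]);
    // Miss: falls back to the synchronous server fetch.
    assert_eq!(fetch_links(&mut cache, 1, |i| vec![format!("link-{i}")]), vec!["link-1"]);
}
```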
> **Retry sleep** matches C#'s `Task.Delay(_retryDelayMs * (retry + 1))`: linear backoff (`retry_delay * (attempt + 1)`), not constant and not exponential.
>
> **Retry contract:**
We would ideally like to build a common retry interface at the HTTP client layer for all operations in the driver. It seems like we introduce some CloudFetch specificity here; is that necessary?
Good point on the interface boundary. The 401/403/404 retry path requires calling refetch_link() on ChunkLinkFetcher, which holds CloudFetch-specific state (cached links, prefetch window, server API client) that a generic HTTP client layer cannot access. Moving URL refresh into the HTTP client layer would couple it to CloudFetch semantics via a callback or trait object.
The split we would eventually want: a shared HTTP client owns 5xx/network retries (generic), while URL refresh stays in the download worker to access ChunkLinkFetcher (CloudFetch-specific). The spec now calls this out explicitly in the worker section. This mirrors the C# design where DownloadFileAsync owns the combined retry + refresh loop directly. Happy to revisit the interface boundary when we implement the HTTP client layer.
jadewang-db left a comment:

Is this a design-only review, or does it already include all the implementations?
> ### Download Worker (replaces `download_chunk_with_retry`)
>
> Each worker is a long-lived `tokio::spawn` task that loops over `download_channel`.
Will this scenario happen? With 4 chunks and 3 downloaders, suppose the 3 downloaders are somehow downloading chunks 2, 3, and 4, and we are running out of memory. Are we then in a deadlock situation, because chunk 1 is blocked and unable to start downloading while the consumer is still waiting to consume chunk 1?
There was a problem hiding this comment.
This scenario cannot occur by construction. download_channel is a FIFO queue so chunk 1 is always dequeued before chunk 2, before chunk 3, etc. A worker cannot skip to chunk 2 while chunk 1 is still in the queue.
The real concern is memory pressure: if chunks 2/3/4 finish downloading while the consumer is slow reading chunk 1, their results sit in fired oneshot senders. This is bounded by max_chunks_in_memory (the channel capacities), so at most that many completed results accumulate in memory. Updated the spec to clarify this.
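The FIFO and bounded-memory argument can be illustrated with std's `sync_channel` standing in for a tokio bounded channel (the capacity of 2, standing in for `max_chunks_in_memory`, and the `u64` payload are illustrative):

```rust
use std::sync::mpsc;
use std::thread;

// Sketch: the channel capacity bounds how many completed results can
// queue up ahead of a slow consumer, and receive order always matches
// send order, so chunk 1 can never be skipped.
fn main() {
    let (tx, rx) = mpsc::sync_channel::<u64>(2); // capacity = max results in flight
    let producer = thread::spawn(move || {
        for chunk_index in 1..=4 {
            tx.send(chunk_index).unwrap(); // blocks once 2 results are queued
        }
    });
    let received: Vec<u64> = rx.iter().collect(); // consumer drains in order
    producer.join().unwrap();
    assert_eq!(received, vec![1, 2, 3, 4]); // strict FIFO: chunk 1 comes first
}
```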
```
W->>W: proactive expiry check (link.is_expired())
opt link expired or expiring within url_expiration_buffer_secs
    W->>LF: refetch_link(chunk_index)
    LF-->>W: fresh CloudFetchLink
```
Is this an in-place update?
No. refetch_link() returns a fresh CloudFetchLink value that is assigned to the worker's local link variable. No shared state is mutated, no lock acquired. This is one of the key improvements over the DashMap design, where calling refetch_link().await while holding a get_mut() guard was impossible. Added a note to the spec to make this explicit.
Replying to @jadewang-db's review question ("is this a design only review or already include all the implementations?"): This PR includes both. The config type changes land here first so the pipeline rewrite PR can build on a stable config foundation.
…ign details Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
## Summary

- Add CloudFetch pipeline redesign spec with channel-based architecture design
- Add sprint plan with task breakdown and DoD checklist
- Update spec with channel design details (bounded result channel, unbounded download channel)

This is a design-only PR with no code changes.

## Stacked PRs

- [**stack/task-0-cloudfetch-design**](this PR) — design docs
- [stack/task-1-config-types-and-pipeline-types](#283) — impl: config + pipeline types
- [stack/task-2-scheduler-workers-consumer-rewrite](#284) — impl: scheduler, workers, consumer

🤖 Generated with [Claude Code](https://claude.com/claude-code)

---------

Co-authored-by: Claude Sonnet 4.6 <noreply@anthropic.com>