
bootstrap new scheduler #131

Merged
aajtodd merged 3 commits into s3-tm-vnext from vnext-scheduler
Mar 5, 2026

Conversation

Contributor

@aajtodd commented Feb 27, 2026

vNext Scheduler

What this PR adds

A new scheduler module (scheduler/) and its design doc (docs/design/scheduler.md). The scheduler is the core coordination layer for the transfer manager redesign. It holds transfers, polls them for work, controls ordering and admission, and submits work for execution. Nothing in the existing upload/download paths changes -- the new scheduler sits alongside the existing runtime::scheduler and is not yet wired into any operations.

This is the first PR in a series that lands the redesign incrementally:

  1. This PR: Scheduler + design doc + mock tests
  2. vnext-upload: Upload state machine on new scheduler
  3. vnext-download: Download state machine + seq window backpressure
  4. vnext-integration: Wire into Handle, remove old code paths

Why

The current scheduler (runtime::scheduler) is a token-bucket concurrency limiter. It gates how many tasks can run concurrently, but has no opinion about which tasks run, in what order, or how to balance work across transfers. Concurrency is controlled via Tower middleware (ConcurrencyLimitLayer), and work is spawned eagerly into a JoinSet. Once spawned, we lose control over ordering, priority, and cancellation.

This works for the simple case (one transfer, fixed concurrency), but breaks down for:

  • Priority control: Mountpoint needs to promote prefetches to active reads. No mechanism exists today.
  • Multi-transfer fairness: 100 concurrent downloads with no fairness means some starve while others monopolize capacity.
  • Memory bounding: Out-of-order download completions buffer unboundedly. The scheduler needs to integrate with backpressure so transfers that are ahead of their consumer stop generating work.
  • Adaptive concurrency: Fixed concurrency doesn't adapt to shared NICs or changing network conditions.

CRT's scheduler (aws-c-s3) solves some of these -- it uses a state machine vtable pattern and runs scheduling on a dedicated event loop -- but has no priority system and uses fixed concurrency derived from a throughput target.

Design

The design doc (rendered) has the full design. tl;dr:

Transfers as state machines. Each transfer implements trait Transfer with poll_work() and execute(). The scheduler polls transfers for work when capacity is available. Transfers produce work lazily -- a 10,000-part upload generates one work item per poll. This naturally bounds memory and in-flight work without the scheduler knowing anything about the operation's internals.

CFS fair scheduling. Adapted from the Linux kernel's Completely Fair Scheduler. Each transfer accumulates virtual runtime as it generates work. The ready set is ordered by vruntime, so the scheduler always picks the transfer that has received the least scheduling share. Priority acts as a weight on accumulation rate -- higher priority means slower accumulation, more work before yielding. A priority-1 transfer still makes progress because its vruntime stays low while it waits.

Edge-triggered ready set. Transfers enter the ready set when enqueued or woken. They leave when poll_work() returns Pending or Done. The scheduler never polls a transfer that returned Pending until something explicitly wakes it. Scheduling cost scales with active transfers, not total transfer count.

Capacity gating via ConcurrencyController trait. The scheduler checks controller.target() before polling. A FixedConcurrency controller returns a constant. An adaptive controller (future PR) observes throughput and adjusts. The scheduler doesn't care which -- it just asks "do I have capacity?"

Follow-on work bypasses CFS. When a work item completes and produces a successor (e.g., disk read completes, now send over network), the successor goes directly to execution. Re-entering the ready set would mean completed disk reads sit in buffers waiting behind other transfers. CFS controls admission; successors complete admitted work.

Execution layer is an abstraction boundary. The scheduler generates work and receives completions. It doesn't know how work runs. The current implementation uses a worker pool with tokio::spawn on the multi-threaded runtime -- the closest analog to main's JoinSet-based execution. The boundary is deliberate: the scheduler's internals (CFS, ready set, capacity tracking) don't change if the execution layer moves to managed threads with current-thread runtimes, or something else entirely.

What's in the module

scheduler/
  mod.rs              - module entry, re-exports
  scheduler.rs        - Scheduler struct, generate_work loop, worker spawning
  context.rs          - TransferContext, StateMachineStatus, transfer lifecycle
  transfer.rs         - Transfer trait
  transfer/mock.rs    - MockTransfer for deterministic testing
  transfer/test_util.rs - test helpers
  descriptor.rs       - TransferDescriptor (scheduler's handle to a transfer + vruntime)
  ready_set.rs        - CFS-ordered ready set (crossbeam-skiplist)
  concurrency.rs      - ConcurrencyController trait + FixedConcurrency
  work/
    mod.rs            - work module entry
    item.rs           - WorkItem, WorkKind, WorkOutcome, PollWork, TransferId
    pool.rs           - WorkerPool (queue + notify + capacity tracking)
    queue.rs          - WorkQueue (VecDeque + per-transfer index for purge)

The Handle struct now carries both schedulers:

pub(crate) struct Handle {
    pub(crate) config: crate::Config,
    pub(crate) scheduler: crate::scheduler::Scheduler,       // new
    pub(crate) legacy_scheduler: crate::runtime::scheduler::Scheduler, // existing
}

Existing upload/download operations use legacy_scheduler. The new scheduler is available but not called by any operation yet.

Benchmarks

Benchmarked on c6in.16xlarge (64 vCPU, 100 Gbps NIC). Single 30 GiB download to RAM, fixed concurrency at 128.

| Configuration | Steady Gb/s | Pending/run ratio | vs Main |
| --- | --- | --- | --- |
| Main branch (adaptive ~136 concurrency) | 54 | n/a | baseline |
| New scheduler, seq window gap=16 | 3.7 | ~3840 | -93% |
| New scheduler, gap=128 | 32 | ~3840 | -40% |
| New scheduler, gap=256 | 49 | ~17 | -8% |
| New scheduler, gap=512 | 55 | ~2 | +2% |

The "pending/run ratio" is how many times poll_work() returned Pending for every time it returned Ready with actual work. A high ratio means churn: the transfer keeps getting polled, keeps saying "not ready yet", gets woken on the next completion, and the cycle repeats. A ratio near 1 means almost every poll produces work.

Resource Usage: Main vs New Scheduler, seq_gap=512

| Configuration | Peak RSS |
| --- | --- |
| Main branch | 11.8 GB |
| New scheduler, gap=512 | 4.6 GB |

The gap=16 result was the key finding: with a seq window too small relative to concurrency, transfers churn between Pending and Ready on every completion, and the scheduler spends all its time waking and re-polling instead of generating work. At gap=512 (4x concurrency), the churn disappears and throughput matches main while using 60% less memory.

With 128 concurrency and 8 MB parts, the theoretical minimum in-flight memory is ~1 GB (every body buffer full, nothing queued). The new scheduler's 4.6 GB reflects that plus sequencing buffers. Main's 11.8 GB reflects unbounded out-of-order buffering with no backpressure from the scheduler.

Scheduler overhead per work item is ~10us, under 1% of wall time at real throughput.

What's NOT in this PR

  • No changes to existing upload/download behavior
  • No adaptive concurrency (future: AdaptiveConcurrency controller)
  • No retry or hedging integration
  • No public API changes

@aajtodd requested a review from a team as a code owner February 27, 2026 20:43
Contributor

@landonxjames left a comment


Overall looks good to me; a couple of questions but no blockers.

self.pending.pop_front()
}

pub(super) fn mark_in_flight(&mut self) {
Contributor


Looks like mark_in_flight is always called when pop returns Some. Could it be eliminated in favor of pop doing the update so callers don't have to remember to manually update the in_flight count?

Contributor Author


I chose to keep them separate; it allows the execution layer to dequeue without marking in-flight (e.g., to batch).

Contributor

@ysaito1001 commented Mar 4, 2026

To aid code review (for me at least), here are Kiro-generated diagrams showing how the scheduler submodules interact (static view):
┌─────────────────────────────────────────────────────────────────────┐
│                          scheduler.rs                               │
│                    (Main Scheduler struct)                          │
│  • Owns WorkerPool, ReadySet, ConcurrencyController                 │
│  • Runs generate_work() loop                                        │
│  • Polls transfers for work when capacity available                 │
└────────┬──────────────────┬──────────────────┬───────────────┬──────┘
         │                  │                  │               │
         │ checks           │ polls            │ submits       │ tracks
         │ capacity         │ transfers        │ work          │ lifecycle
         ▼                  ▼                  ▼               ▼
┌─────────────────┐  ┌──────────────┐  ┌─────────────┐  ┌──────────────┐
│ concurrency.rs  │  │ ready_set.rs │  │   work/     │  │ context.rs   │
│                 │  │              │  │             │  │              │
│ • Controller    │  │ • CFS-ordered│  │ • WorkItem  │  │ • Transfer   │
│   trait         │  │   skiplist   │  │ • WorkQueue │  │   Context    │
│ • Fixed         │  │ • vruntime   │  │ • WorkerPool│  │ • Status     │
│   Concurrency   │  │   tracking   │  │             │  │ • Lifecycle  │
└─────────────────┘  └──────┬───────┘  └──────┬──────┘  └──────────────┘
                            │                 │
                            │ contains        │ executes
                            ▼                 ▼
                     ┌──────────────┐  ┌─────────────────┐
                     │descriptor.rs │  │   transfer/     │
                     │              │  │                 │
                     │ • Transfer   │  │ • Transfer      │
                     │   Descriptor │  │   trait         │
                     │ • vruntime   │  │ • poll_work()   │
                     │ • priority   │  │ • execute()     │
                     │ • work counts│  │ • MockTransfer  │
                     └──────────────┘  └─────────────────┘

Flow:

  1. Scheduler checks ConcurrencyController.target() for capacity
  2. If capacity available, pops transfer from ReadySet (lowest vruntime)
  3. Calls Transfer.poll_work() via TransferContext
  4. If Ready(work), submits WorkItem to WorkerPool
  5. WorkerPool queues work and spawns workers to execute
  6. On completion, follow-on work bypasses ReadySet, goes direct to pool
  7. Transfer re-enters ReadySet when woken or returns Pending
  8. TransferDescriptor tracks vruntime, priority, queued/executing counts

Single download transfer lifecycle (runtime view):

┌─────────────────────────────────────────────────────────────────┐
│ 1. TRANSFER ENQUEUED                                            │
└─────────────────────────────────────────────────────────────────┘
    User initiates download
         │
         ▼
    Transfer enters scheduler
         │
         ▼
    ┌──────────────────────────────────┐
    │ Wrapped in TransferDescriptor    │
    │ • vruntime (for CFS ordering)    │
    │ • priority                       │
    │ • queued/executing counts        │
    └──────────────┬───────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ Inserted into ReadySet           │
    │ (CFS-ordered via crossbeam-      │
    │  skiplist, sorted by vruntime)   │
    └──────────────┬───────────────────┘
                   │
┌─────────────────────────────────────────────────────────────────┐
│ 2. SCHEDULER MAIN LOOP (generate_work)                          │
└─────────────────────────────────────────────────────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ Check ConcurrencyController      │
    │ .target() - capacity available?  │
    └──────────────┬───────────────────┘
                   │ YES
                   ▼
    ┌──────────────────────────────────┐
    │ ReadySet.pop()                   │
    │ Returns: transfer with lowest    │
    │          vruntime (CFS fairness) │
    └──────────────┬───────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ Transfer.poll_work()             │
    │ (Transfer trait method)          │
    └──────────────┬───────────────────┘
                   │
┌─────────────────────────────────────────────────────────────────┐
│ 3. TRANSFER PRODUCES WORK (or doesn't)                          │
└─────────────────────────────────────────────────────────────────┘
                   │
         ┌─────────┴─────────┐
         │                   │
         ▼                   ▼
    Ready(WorkItem)      Pending
         │                   │
         │                   ▼
         │          ┌──────────────────────┐
         │          │ Transfer leaves      │
         │          │ ReadySet             │
         │          │ Waits for wake()     │
         │          └──────────────────────┘
         │
         ▼
    ┌──────────────────────────────────┐
    │ Update vruntime                  │
    │ (accumulates based on priority)  │
    └──────────────┬───────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ Submit WorkItem to WorkerPool    │
    └──────────────┬───────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ WorkQueue.push()                 │
    │ (VecDeque + per-transfer index)  │
    └──────────────┬───────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ Re-insert into ReadySet if       │
    │ poll_work() returned Ready       │
    │ (transfer may have more work)    │
    └──────────────────────────────────┘
                   │
┌─────────────────────────────────────────────────────────────────┐
│ 4. WORK EXECUTION                                               │
└─────────────────────────────────────────────────────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ Worker (tokio::spawn on multi-   │
    │ threaded runtime)                │
    └──────────────┬───────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ WorkQueue.pop()                  │
    └──────────────┬───────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ WorkQueue.mark_in_flight()       │
    └──────────────┬───────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ Transfer.execute(WorkItem)       │
    │ (Transfer trait method)          │
    └──────────────┬───────────────────┘
                   │
                   ▼
    ┌──────────────────────────────────┐
    │ Returns WorkOutcome              │
    │ • Success (optional follow-on)   │
    │ • Failure                        │
    └──────────────┬───────────────────┘
                   │
┌─────────────────────────────────────────────────────────────────┐
│ 5. COMPLETION HANDLING                                          │
└─────────────────────────────────────────────────────────────────┘
                   │
         ┌─────────┴─────────┐
         │                   │
         ▼                   ▼
    Success with         Success, no
    follow-on work       follow-on
         │                   │
         ▼                   │
    ┌──────────────────┐    │
    │ Follow-on work   │    │
    │ BYPASSES ReadySet│    │
    │ Goes directly to │    │
    │ WorkerPool       │    │
    └──────────┬───────┘    │
               │            │
               └────┬───────┘
                    │
                    ▼
         ┌──────────────────────┐
         │ Something calls      │
         │ wake() on transfer   │
         └──────────┬───────────┘
                    │
                    ▼
         ┌──────────────────────┐
         │ Transfer re-enters   │
         │ ReadySet             │
         └──────────────────────┘
                    │
┌─────────────────────────────────────────────────────────────────┐
│ 6. LOOP: Steps 2-5 repeat until transfer complete              │
└─────────────────────────────────────────────────────────────────┘
                    │
                    ▼
         ┌──────────────────────┐
         │ poll_work() returns  │
         │ Done                 │
         └──────────┬───────────┘
                    │
                    ▼
         ┌──────────────────────┐
         │ Transfer removed     │
         │ from system          │
         └──────────────────────┘

Critical flow points:

  1. Enqueue - Transfer enters scheduler, wrapped in TransferDescriptor, inserted into ReadySet
  2. Poll loop - Scheduler continuously polls ready transfers when capacity available (CFS ordering by vruntime)
  3. Work generation - Transfer produces work lazily via poll_work(), one item at a time
  4. Execution - Workers pop from WorkQueue and execute work asynchronously
  5. Follow-on bypass - Completed work's successors skip ReadySet, go directly to WorkerPool
  6. Loop - Steps 2-5 repeat, with wake() re-inserting transfers that returned Pending, until transfer returns Done and exits system

Contributor

@ysaito1001 left a comment


Thanks for the design write-up and the benchmarks! Solid foundation with good abstractions.

@aajtodd merged commit 0095480 into s3-tm-vnext Mar 5, 2026
11 of 12 checks passed
@aajtodd deleted the vnext-scheduler branch March 5, 2026 03:32