
move uploads to new scheduler #132

Merged
aajtodd merged 5 commits into s3-tm-vnext from vnext-upload
Mar 11, 2026

Conversation

Contributor

@aajtodd aajtodd commented Mar 5, 2026

Summary

Replace the Tower service-based upload orchestration (ConcurrencyLimitLayer, service_fn, Buffer) with a scheduler-driven state machine. The upload transfer implements the Transfer trait from #131. The scheduler polls the transfer for work; the transfer produces work items lazily, one at a time.

Upload Lifecycle

Multipart upload

orchestrate() → UploadTransfer enqueued to scheduler

poll_work → CreateMPU           [Network]
poll_work → Pending              (waiting for upload_id)
  ... CreateMPU completes, wake ...
poll_work → UploadPart(1)       [DataIO]   ← disk read
poll_work → UploadPart(2)       [DataIO]   ← disk read
  ... DataIO completes → follow-on: UploadPart(1) [Network] ...
  ... all parts complete ...
poll_work → CompleteMPU         [Network]
poll_work → Done

Small file (below MPU threshold)

Skips multipart entirely — produces a single PutObject work item.

Design

Each UploadPart has two phases: a DataIO phase that reads the part from the PartReader, then a Network phase that sends it to S3. The Network phase is a follow-on work item that bypasses the ready set, so the buffer isn't held waiting behind other transfers.

Error handling follows the download transfer's contract:

  • fail() stores the original error via ctx.set_failed(error) and signals terminal
  • join() awaits completion_rx from TransferContext, then reads the error from ctx.take_error() on failure or the result from transfer.take_result() on success
  • No separate result channel — errors and results flow through TransferContext and the transfer struct respectively

What changed

operation/upload/transfer.rs: New. Upload state machine implementing the Transfer trait.
operation/upload/upload.rs: orchestrate() creates an UploadTransfer and enqueues it to the scheduler instead of spawning the Tower service pipeline.
operation/upload/handle.rs: Holds the UploadTransfer + completion_rx directly. join() follows download's pattern. Abort waits for the scheduler to go idle, then calls AbortMultipartUpload.
operation/upload/service.rs: Deleted. The Tower middleware chain is no longer needed.
scheduler/mod.rs: Re-exports StateMachineTerminalReceiver.

What did not change

  • Download still uses the old runtime::scheduler and service pattern
  • upload_objects / download_objects unchanged
  • middleware/, runtime/ kept for download and multi-object operations
  • No scheduler changes beyond the re-export
  • No public API changes

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@aajtodd aajtodd requested a review from a team as a code owner March 5, 2026 19:06
Contributor

@landonxjames landonxjames left a comment


Few questions about some of the logic. Nothing major or blocking.

return PollWork::Done;
}

let mut state = self.inner.state.lock().unwrap();
Contributor


Suggested change
let mut state = self.inner.state.lock().unwrap();
let mut state = self.inner.state.lock().expect("Failed to unlock transfer state");

Contributor


This suggestion applies to a few places, just putting it here so I don't spam it everywhere.

Contributor


More generally, this Mutex is going to be locked on every poll_work and execute_* call, which could cause a lot of contention. I wonder if something meant to be more performant, like parking_lot::Mutex, would make sense here?

Probably a premature optimization, but worth evaluating when you get to benchmarking this

distribute_work(&mut mpu_data, ctx)?;
Ok(UploadType::MultipartUpload(mpu_data))
}
// TODO: Relax this constraint - unknown content length implies MPU
Contributor


Suggested change
// TODO: Relax this constraint - unknown content length implies MPU
// TODO(vnext): Relax this constraint - unknown content length implies MPU

Contributor


Or #90

}))
}
Ok(None) => {
tracing::warn!("part_reader returned None for part {}", part_number);
Contributor


Think this should just be a trace log? If content_length is only an upper bound rather than exact (which is how I think size_hint works?), then this will happen sometimes and shouldn't be surfaced as something failing.


let (stream, content_length) = {
let mut state = self.inner.state.lock().unwrap();
match std::mem::replace(&mut *state, UploadState::Done) {
Contributor


Is replacing this with Done correct? If poll_work is called in the window where state is set to Done, won't it return PollWork::Done, causing the scheduler to remove the transfer entirely? That seems possible since you have to re-acquire the lock on state later in this function.

Contributor Author


Good catch, no this is wrong.

.clone()
.unwrap_or_default();
match abort_policy {
FailedMultipartUploadPolicy::Retain => Ok(AbortedUpload::default()),
Contributor


We are no longer checking the abort_policy; is that intended?

Contributor Author


Nope will fix


let resp = match req
.customize()
.disable_payload_signing()
Contributor


We disable this for MPU but not PutObject? That might make sense, but we'd probably want a comment explaining why.

Contributor Author


Just a miss when converting.

use crate::types::{ConcurrencyMode, PartSize};

#[tokio::test]
async fn test_basic_mpu() {
Contributor


There don't seem to be new versions of the happy-path test_basic_mpu and test_basic_upload_object tests?

@ysaito1001
Contributor

Again, to aid code review:

a runtime sequence diagram for MPU
                          MPU Upload Runtime Lifecycle
                          ===========================
 
 builders.rs              upload.rs                transfer.rs              scheduler/          handle.rs
 (initiate)            (Upload::orchestrate)       (UploadTransfer)         (Scheduler)         (UploadHandle)
     │                        │                         │                       │                    │
     │──initiate_with()──────>│                         │                       │                    │
     │                        │ resolve checksum_strategy│                       │                    │
     │                        │ take_body() from input   │                       │                    │
     │                        │ check size_hint upper    │                       │                    │
     │                        │ BucketType::from_bucket_name                     │                    │
     │                        │                         │                       │                    │
     │                        │ TransferContext::new(handle)                     │                    │
     │                        │──────────────────────────> new(ctx, bucket_type, │                    │
     │                        │                           input, stream)        │                    │
     │                        │                         │                       │                    │
     │                        │                         │  state = PendingInit  │                    │
     │                        │                         │  {stream,             │                    │
     │                        │                         │   content_length,     │                    │
     │                        │                         │   init_in_flight:false}│                    │
     │                        │                         │                       │                    │
     │                        │─────────────────────────────enqueue_transfer(───>│                    │
     │                        │                         │  Box<transfer.clone>)  │                    │
     │                        │                         │                       │                    │
     │                        │──────────────────────────────────────────────────────> new(completion_rx,
     │<───UploadHandle────────│                         │                       │        transfer)   │
     │                        │                         │                       │                    │
 
 
                    ┌─── Scheduler poll loop begins ───┐
                    │                                   │
                    ▼                                   │
 
 scheduler/                          transfer.rs
 (Scheduler)                         (UploadTransfer)
     │                                    │
     │──poll_work()──────────────────────>│
     │                                    │ state == PendingInit
     │                                    │ content_length >= mpu_threshold
     │                                    │ set init_in_flight = true
     │<──PollWork::Ready(CreateMPU, ──────│
     │                   Network)         │
     │                                    │
     │──execute(CreateMPU)───────────────>│
     │                                    │ s3_client.create_multipart_upload().send()
     │                                    │ take stream + content_length from PendingInit
     │                                    │ compute part_size, total_parts
     │                                    │ build PartReader
     │                                    │ state = Transferring {
     │                                    │   upload_id, part_reader,
     │                                    │   next_part: 1, total_parts,
     │                                    │   parts_in_flight: 0,
     │                                    │   completed_parts: [],
     │                                    │   response_builder }
     │                                    │ create_mpu_complete.notify_waiters()
     │                                    │ ctx.try_wake()
     │<──WorkOutcome::Success(None)───────│
     │                                    │
     │──poll_work()──────────────────────>│  (repeated for each part 1..total_parts)
     │                                    │ state == Transferring
     │                                    │ next_part++, parts_in_flight++
     │<──PollWork::Ready(UploadPart{N},───│
     │                   DataIO)          │
     │                                    │
     │──execute(UploadPart{N}, DataIO)───>│
     │                                    │ part_reader.next_part()  (disk read)
     │                                    │ store part_data in work item
     │<──WorkOutcome::Success(Some(───────│  ◄── follow-on work item
     │     WorkItem{UploadPart{N},        │
     │              Network}))            │
     │                                    │
     │──execute(UploadPart{N}, Network)──>│
     │                                    │ s3_client.upload_part()
     │                                    │   .upload_id(&upload_id)
     │                                    │   .part_number(N)
     │                                    │   .body(data)
     │                                    │   .disable_payload_signing()
     │                                    │   .send()
     │                                    │ build CompletedPart from response
     │                                    │ push to completed_parts
     │                                    │ parts_in_flight--
     │                                    │ if next_part > total_parts && parts_in_flight == 0:
     │                                    │   state = Completing {
     │                                    │     upload_id, part_reader,
     │                                    │     completed_parts, response_builder,
     │                                    │     complete_in_flight: false }
     │                                    │   ctx.try_wake()
     │<──WorkOutcome::Success(None)───────│
     │                                    │
     │──poll_work()──────────────────────>│
     │                                    │ state == Completing
     │                                    │ set complete_in_flight = true
     │<──PollWork::Ready(CompleteMPU, ────│
     │                   Network)         │
     │                                    │
     │──execute(CompleteMPU)─────────────>│
     │                                    │ sort completed_parts by part_number
     │                                    │ s3_client.complete_multipart_upload()
     │                                    │   .upload_id(&upload_id)
     │                                    │   .multipart_upload(parts)
     │                                    │   .send()
     │                                    │ build UploadOutput via response_builder
     │                                    │ store in self.result
     │                                    │ ctx.set_completed()
     │                                    │ ctx.signal_terminal()
     │<──WorkOutcome::Success(None)───────│
     │                                    │
     │──poll_work()──────────────────────>│
     │                                    │ state == Done
     │<──PollWork::Done───────────────────│

Contributor

@ysaito1001 ysaito1001 left a comment


Left nit comments. Looks good!

/// State machine for work progression
state: Mutex<UploadState>,
/// The original request (body taken for processing)
request: Arc<UploadInput>,
Contributor


Will this end up being a double Arc when seen from UploadTransfer?

}

/// Get the upload_id if MPU was started.
pub(crate) fn upload_id(&self) -> Option<String> {
Contributor


nit: do we always want to clone the returned String, as opposed to returning Option<&str>?

Contributor Author


It's not on a hot path, only for abort, I'm going to leave for now.

distribute_work(&mut mpu_data, ctx)?;
Ok(UploadType::MultipartUpload(mpu_data))
}
// TODO: Relax this constraint - unknown content length implies MPU
Contributor


Or #90

Comment on lines +199 to +202
return PollWork::Pending;
}
self.inner.ctx.set_pending();
return PollWork::Pending;
Contributor


Just to double check: once next_part exceeds total_parts, is returning PollWork::Pending here expected?

Contributor Author


No, in practice I think this state is never observed, because we transition to Completing before poll_work sees it. I'll update it to transition to Completing here as well, though.

@aajtodd aajtodd merged commit 6d0c017 into s3-tm-vnext Mar 11, 2026
10 of 11 checks passed
@aajtodd aajtodd deleted the vnext-upload branch March 11, 2026 17:55
