Skip to content

Commit 0095480

Browse files
authored
bootstrap new scheduler (#131)
bootstrap new scheduler Add a scheduler module that coordinates transfer execution: work generation, prioritization, capacity gating, and submission. This sits alongside the existing runtime::scheduler and is not yet wired into any operations. Transfers implement a state machine trait (poll_work/execute). The scheduler polls them for work when capacity is available. Work is generated lazily, one item per poll, so memory and in-flight work are naturally bounded without the scheduler knowing operation internals. Ordering uses CFS (Completely Fair Scheduling) adapted from the Linux kernel. Each transfer accumulates virtual runtime weighted by priority. The ready set is a skiplist ordered by vruntime; the scheduler always picks the transfer with the least scheduling share. Priority-1 transfers still make progress, they just accumulate vruntime faster. The ready set is edge-triggered: transfers enter on enqueue or wake, leave on Pending or Done. Scheduling cost scales with active transfers, not total count. Capacity is gated by a ConcurrencyController trait. FixedConcurrency returns a constant target. The trait is the seam for adaptive concurrency (not in this change). Follow-on work (e.g. disk read completes, now send over network) bypasses the ready set and goes directly to execution. CFS controls admission; successors complete admitted work without re-competing. Benchmarked on c6in.16xlarge: matches main branch throughput (55 Gb/s) at 60% less memory (4.6 GB vs 11.8 GB peak RSS) with seq window backpressure bounding out-of-order buffering. Design: docs/design/scheduler.md
1 parent 0226736 commit 0095480

File tree

23 files changed

+2954
-12
lines changed

23 files changed

+2954
-12
lines changed

Cargo.lock

Lines changed: 11 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

aws-sdk-s3-transfer-manager/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ aws-types = "1.3.7"
2626
blocking = "1.6.1"
2727
bytes = "1"
2828
bytes-utils = "0.1.4"
29+
crossbeam-skiplist = "0.1"
2930
futures-util = "0.3.31"
3031
path-clean = "1.0.1"
3132
pin-project-lite = "0.2.16"

aws-sdk-s3-transfer-manager/src/client.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@
33
* SPDX-License-Identifier: Apache-2.0
44
*/
55

6-
use crate::runtime::scheduler::Scheduler;
6+
use crate::runtime::scheduler::Scheduler as LegacyScheduler;
77
use crate::types::{ConcurrencyMode, PartSize};
88
use crate::Config;
99
use crate::{metrics::unit::ByteUnit, DEFAULT_CONCURRENCY};
@@ -19,7 +19,8 @@ pub struct Client {
1919
#[derive(Debug)]
2020
pub(crate) struct Handle {
2121
pub(crate) config: crate::Config,
22-
pub(crate) scheduler: Scheduler,
22+
pub(crate) scheduler: crate::scheduler::Scheduler,
23+
pub(crate) legacy_scheduler: LegacyScheduler,
2324
}
2425

2526
impl Handle {
@@ -63,8 +64,19 @@ impl Handle {
6364
impl Client {
6465
/// Creates a new client from a transfer manager config.
6566
pub fn new(config: Config) -> Client {
66-
let scheduler = Scheduler::new(config.concurrency().clone());
67-
let handle = Arc::new(Handle { config, scheduler });
67+
use crate::scheduler::FixedConcurrency;
68+
let new_scheduler = crate::scheduler::Scheduler::with_controller(Arc::new(
69+
FixedConcurrency::new(match config.concurrency() {
70+
ConcurrencyMode::Explicit(c) => *c,
71+
_ => crate::DEFAULT_CONCURRENCY,
72+
}),
73+
));
74+
let legacy_scheduler = LegacyScheduler::new(config.concurrency().clone());
75+
let handle = Arc::new(Handle {
76+
config,
77+
scheduler: new_scheduler,
78+
legacy_scheduler,
79+
});
6880
Client { handle }
6981
}
7082

aws-sdk-s3-transfer-manager/src/lib.rs

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,9 @@ pub(crate) mod http;
9292
/// Internal runtime components
9393
pub(crate) mod runtime;
9494

95+
/// Scheduler for coordinating transfer execution
96+
pub(crate) mod scheduler;
97+
9598
/// Metrics
9699
pub mod metrics;
97100

aws-sdk-s3-transfer-manager/src/operation/download.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ async fn send_discovery(
127127
// acquire a permit for discovery
128128
let permit = ctx
129129
.handle
130-
.scheduler
130+
.legacy_scheduler
131131
.acquire_permit(PermitType::Network(NetworkPermitContext {
132132
payload_size_estimate: 0,
133133
bucket_type: ctx.bucket_type(),

aws-sdk-s3-transfer-manager/src/operation/download/service.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ pub(super) fn chunk_service(
154154
+ Clone
155155
+ Send {
156156
let svc = service_fn(download_chunk_handler);
157-
let concurrency_limit = ConcurrencyLimitLayer::new(ctx.handle.scheduler.clone());
157+
let concurrency_limit = ConcurrencyLimitLayer::new(ctx.handle.legacy_scheduler.clone());
158158

159159
ServiceBuilder::new()
160160
.layer(concurrency_limit)

aws-sdk-s3-transfer-manager/src/operation/upload.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -117,7 +117,7 @@ async fn put_object(
117117

118118
let _permit = ctx
119119
.handle
120-
.scheduler
120+
.legacy_scheduler
121121
.acquire_permit(PermitType::Network(NetworkPermitContext {
122122
payload_size_estimate: content_length as u64,
123123
bucket_type: ctx.state.bucket_type(),

aws-sdk-s3-transfer-manager/src/operation/upload/service.rs

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -109,7 +109,7 @@ pub(super) fn upload_part_service(
109109
+ Clone
110110
+ Send {
111111
let svc = service_fn(upload_part_handler);
112-
let concurrency_limit = ConcurrencyLimitLayer::new(ctx.handle.scheduler.clone());
112+
let concurrency_limit = ConcurrencyLimitLayer::new(ctx.handle.legacy_scheduler.clone());
113113

114114
let svc = ServiceBuilder::new()
115115
.layer(concurrency_limit)
@@ -239,7 +239,12 @@ mod tests {
239239
ctx: UploadContext {
240240
handle: Arc::new(Handle {
241241
config: Config::builder().client(s3_client).build(),
242-
scheduler: Scheduler::new(ConcurrencyMode::Explicit(1)),
242+
scheduler: crate::scheduler::Scheduler::with_controller(Arc::new(
243+
crate::scheduler::FixedConcurrency::new(1),
244+
)),
245+
legacy_scheduler: crate::runtime::scheduler::Scheduler::new(
246+
ConcurrencyMode::Explicit(1),
247+
),
243248
}),
244249
state: Arc::new(UploadState {
245250
request: Arc::new(

aws-sdk-s3-transfer-manager/src/operation/upload_objects/worker.rs

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -328,6 +328,7 @@ mod tests {
328328
use aws_sdk_s3::operation::put_object::PutObjectOutput;
329329
use aws_smithy_mocks::{mock, mock_client, RuleMode};
330330
use bytes::Bytes;
331+
use std::sync::Arc;
331332

332333
use crate::{
333334
client::Handle,
@@ -711,9 +712,18 @@ mod tests {
711712
let s3_client = mock_client!(aws_sdk_s3, RuleMode::MatchAny, &[put_object]);
712713
let config = crate::Config::builder().client(s3_client).build();
713714

714-
let scheduler = Scheduler::new(ConcurrencyMode::Explicit(DEFAULT_CONCURRENCY));
715-
716-
let handle = std::sync::Arc::new(Handle { config, scheduler });
715+
let legacy_scheduler = crate::runtime::scheduler::Scheduler::new(
716+
ConcurrencyMode::Explicit(DEFAULT_CONCURRENCY),
717+
);
718+
let scheduler = crate::scheduler::Scheduler::with_controller(Arc::new(
719+
crate::scheduler::FixedConcurrency::new(DEFAULT_CONCURRENCY),
720+
));
721+
722+
let handle = std::sync::Arc::new(Handle {
723+
config,
724+
scheduler,
725+
legacy_scheduler,
726+
});
717727
let input = UploadObjectsInputBuilder::default()
718728
.source("doesnotmatter")
719729
.bucket(bucket)
Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
* SPDX-License-Identifier: Apache-2.0
4+
*/
5+
6+
use std::fmt;
7+
8+
/// Controls how many work items can be in-flight concurrently.
9+
///
10+
/// The scheduler calls `target()` before generating work. Work is only
11+
/// generated when total in-flight + pending is below the target.
12+
///
13+
/// `FixedConcurrency` provides a constant target. Future implementations
14+
/// (e.g., `AdaptiveConcurrency`) can adjust the target dynamically based
15+
/// on observed throughput.
16+
pub(crate) trait ConcurrencyController: Send + Sync + fmt::Debug {
17+
/// Current concurrency target. May change between calls.
18+
fn target(&self) -> usize;
19+
}
20+
21+
/// Fixed concurrency target that never changes.
22+
#[derive(Debug)]
23+
pub(crate) struct FixedConcurrency(usize);
24+
25+
impl FixedConcurrency {
26+
pub(crate) fn new(target: usize) -> Self {
27+
assert!(target > 0, "concurrency target must be at least 1");
28+
Self(target)
29+
}
30+
}
31+
32+
impl ConcurrencyController for FixedConcurrency {
33+
fn target(&self) -> usize {
34+
self.0
35+
}
36+
}

0 commit comments

Comments
 (0)