Skip to content

Commit 47c2734

Browse files
committed
batch submissions to the runtime optimistically
1 parent 3e6cdad commit 47c2734

File tree

5 files changed

+73
-30
lines changed

5 files changed

+73
-30
lines changed

aws-sdk-s3-transfer-manager/src/runtime/managed.rs

Lines changed: 22 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@ use tokio_util::sync::CancellationToken;
2424
use super::topology::Cpu;
2525
use super::Topology;
2626
use super::{ExecutionRuntime, RuntimeComponents, ScheduledWork};
27+
use crate::runtime::sync::SubmissionGuard;
2728
use crate::scheduler::Scheduler;
2829
use crate::transfer::{TransferId, WorkOutcome};
2930

@@ -314,26 +315,28 @@ impl ManagedThreadRuntimeBuilder {
314315
}
315316

316317
impl ExecutionRuntime for ManagedThreadRuntime {
317-
fn dispatch(&self, mut work: ScheduledWork) {
318-
let thread_id = self.router.select(&self.threads);
319-
let th = &self.threads[thread_id.0];
320-
th.in_flight.fetch_add(1, Ordering::Relaxed);
321-
322-
let scheduler = self.scheduler.clone();
323-
let in_flight = Arc::clone(&th.in_flight);
324-
325-
th.runtime_handle.spawn(async move {
326-
let result = execute_work(&mut work, &scheduler).await;
327-
in_flight.fetch_sub(1, Ordering::Relaxed);
328-
match result {
329-
ExecuteResult::Completed(outcome, elapsed) => {
330-
scheduler.on_completion(work, outcome, elapsed);
318+
fn dispatch(&self, batch: &mut SubmissionGuard<'_, ScheduledWork>) {
319+
for mut work in batch.drain() {
320+
let thread_id = self.router.select(&self.threads);
321+
let th = &self.threads[thread_id.0];
322+
th.in_flight.fetch_add(1, Ordering::Relaxed);
323+
324+
let scheduler = self.scheduler.clone();
325+
let in_flight = Arc::clone(&th.in_flight);
326+
327+
th.runtime_handle.spawn(async move {
328+
let result = execute_work(&mut work, &scheduler).await;
329+
in_flight.fetch_sub(1, Ordering::Relaxed);
330+
match result {
331+
ExecuteResult::Completed(outcome, elapsed) => {
332+
scheduler.on_completion(work, outcome, elapsed);
333+
}
334+
ExecuteResult::Panicked => {
335+
scheduler.on_panic(work);
336+
}
331337
}
332-
ExecuteResult::Panicked => {
333-
scheduler.on_panic(work);
334-
}
335-
}
336-
});
338+
});
339+
}
337340
}
338341

339342
fn shutdown(&self) {

aws-sdk-s3-transfer-manager/src/runtime/mod.rs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@ pub(crate) use managed::ManagedThreadRuntime;
1616
mod topology;
1717
pub(crate) use topology::{Cpu, NumaNode, Topology};
1818

19-
mod sync;
19+
pub(crate) mod sync;
2020

2121
use aws_smithy_runtime_api::client::http::SharedHttpClient;
2222

23+
use crate::runtime::sync::SubmissionGuard;
2324
use crate::scheduler::descriptor::TransferDescriptor;
2425
use crate::transfer::{IoRequest, TransferId};
2526

@@ -41,8 +42,8 @@ pub(crate) struct ScheduledWork {
4142
///
4243
/// The scheduler decides WHAT to run and WHEN. The runtime decides WHERE and HOW.
4344
pub(crate) trait ExecutionRuntime: Send + Sync + std::fmt::Debug {
44-
/// Dispatch an IO request for execution.
45-
fn dispatch(&self, work: ScheduledWork);
45+
/// Dispatch a batch of IO requests for execution.
46+
fn dispatch(&self, batch: &mut SubmissionGuard<'_, ScheduledWork>);
4647

4748
/// Shut down the runtime, draining in-flight work.
4849
fn shutdown(&self);

aws-sdk-s3-transfer-manager/src/runtime/sync/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,4 +6,4 @@
66
//! Synchronization primitives for the runtime.
77
88
mod submission;
9-
pub(crate) use submission::{SubmissionGuard, SubmissionQueue};
9+
pub(crate) use submission::{Submission, SubmissionGuard, SubmissionQueue};

aws-sdk-s3-transfer-manager/src/runtime/tokio_mt.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@ use futures_util::FutureExt;
1515
mod worker_pool;
1616

1717
use super::{ExecutionRuntime, RuntimeComponents, ScheduledWork};
18+
use crate::runtime::sync::SubmissionGuard;
1819
use crate::scheduler::Scheduler;
1920
use crate::transfer::{TransferId, WorkOutcome};
2021
use worker_pool::WorkerPool;
@@ -79,10 +80,12 @@ impl TokioMultiThreadRuntime {
7980
}
8081

8182
impl ExecutionRuntime for TokioMultiThreadRuntime {
82-
fn dispatch(&self, work: ScheduledWork) {
83+
fn dispatch(&self, batch: &mut SubmissionGuard<'_, ScheduledWork>) {
8384
self.ensure_workers_started();
8485
self.ensure_worker_capacity();
85-
self.pool.push(work);
86+
for work in batch.drain() {
87+
self.pool.push(work);
88+
}
8689
}
8790

8891
fn shutdown(&self) {

aws-sdk-s3-transfer-manager/src/scheduler/scheduler.rs

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ use crate::transfer::{BoxTransfer, IoRequest, PollWork, TransferId, WorkOutcome}
5656
use crate::metrics::{IOCounters, IoSample};
5757
use crate::runtime::ExecutionRuntime;
5858

59+
use crate::runtime::sync::{Submission, SubmissionQueue};
5960
use crate::runtime::ScheduledWork;
6061
use crate::scheduler::descriptor::TransferDescriptor;
6162
use crate::scheduler::ready_set::ReadySet;
@@ -66,6 +67,9 @@ use std::sync::Arc;
6667
use std::sync::{OnceLock, RwLock};
6768
use std::time::Duration;
6869

70+
/// Batch size for work generated and submitted in a single round
71+
const SUBMISSION_QUEUE_SIZE: usize = 64;
72+
6973
/// Event-driven scheduler for coordinating transfer work.
7074
///
7175
/// Clone is cheap (Arc).
@@ -79,6 +83,7 @@ struct SchedulerInner {
7983
io_counters: Arc<IOCounters>,
8084
runtime: OnceLock<Arc<dyn ExecutionRuntime>>,
8185
dispatched: AtomicUsize,
86+
submission_queue: SubmissionQueue<ScheduledWork>,
8287
}
8388

8489
impl std::fmt::Debug for Scheduler {
@@ -115,6 +120,7 @@ impl SchedulerBuilder {
115120
io_counters: self.io_counters,
116121
runtime: OnceLock::new(),
117122
dispatched: AtomicUsize::new(0),
123+
submission_queue: SubmissionQueue::new(SUBMISSION_QUEUE_SIZE),
118124
}));
119125
let runtime = runtime_factory(scheduler.clone());
120126
scheduler
@@ -301,7 +307,7 @@ impl Scheduler {
301307
item: IoRequest { kind, data },
302308
descriptor: desc.clone(),
303309
};
304-
self.dispatch_to_runtime(next);
310+
self.dispatch_single(next);
305311
}
306312

307313
// capacity has freed try to queue up more work
@@ -333,8 +339,20 @@ impl Scheduler {
333339
self.0.dispatched.load(Ordering::Relaxed) < self.0.controller.target()
334340
}
335341

342+
/// Flush the current submission and re-enter for the next batch.
343+
fn submit_and_reenter<'a>(
344+
&'a self,
345+
sub: Submission<'a, ScheduledWork>,
346+
) -> Submission<'a, ScheduledWork> {
347+
if let Some(mut guard) = sub.submit() {
348+
self.runtime().dispatch(&mut guard);
349+
}
350+
self.0.submission_queue.enter()
351+
}
352+
336353
/// Generate work from ready transfers and dispatch to runtime.
337354
fn generate_work(&self) {
355+
let mut sub = self.0.submission_queue.enter();
338356
while self.has_capacity() {
339357
let Some(desc) = self.0.ready_set.pop() else {
340358
break;
@@ -349,10 +367,16 @@ impl Scheduler {
349367
PollWork::Ready(item) => {
350368
desc.work_generated();
351369
self.0.ready_set.insert(desc.clone());
352-
self.dispatch_to_runtime(ScheduledWork {
370+
self.0.dispatched.fetch_add(1, Ordering::Relaxed);
371+
desc.work_queued();
372+
let work = ScheduledWork {
353373
item,
354374
descriptor: desc,
355-
});
375+
};
376+
if let Err(work) = sub.push(work) {
377+
sub = self.submit_and_reenter(sub);
378+
sub.push(work).expect("empty queue after flush");
379+
}
356380
}
357381
PollWork::Pending => {
358382
// re-added on wake as state machine progresses
@@ -363,12 +387,24 @@ impl Scheduler {
363387
}
364388
}
365389
}
390+
if let Some(mut guard) = sub.submit() {
391+
self.runtime().dispatch(&mut guard);
392+
}
366393
}
367394

368-
fn dispatch_to_runtime(&self, work: ScheduledWork) {
395+
fn dispatch_single(&self, work: ScheduledWork) {
369396
self.0.dispatched.fetch_add(1, Ordering::Relaxed);
370397
work.descriptor.work_queued();
371-
self.runtime().dispatch(work);
398+
let sub = self.0.submission_queue.enter();
399+
if let Err(work) = sub.push(work) {
400+
let sub = self.submit_and_reenter(sub);
401+
sub.push(work).expect("empty queue after flush");
402+
if let Some(mut guard) = sub.submit() {
403+
self.runtime().dispatch(&mut guard);
404+
}
405+
} else if let Some(mut guard) = sub.submit() {
406+
self.runtime().dispatch(&mut guard);
407+
}
372408
}
373409

374410
#[allow(dead_code)] // TODO: wire into Handle for graceful shutdown

0 commit comments

Comments
 (0)