Skip to content

Commit cd6aa8f

Browse files
author
Michael Ingley
committed
Address review feedback on TSO waker PR
Signed-off-by: Michael Ingley <mingley@linkedin.com>
1 parent a66b5e6 commit cd6aa8f

File tree

3 files changed

+20
-2
lines changed

3 files changed

+20
-2
lines changed

benches/tso_waker_policy.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,9 @@ use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
77
use futures::task::AtomicWaker;
88

99
const MAX_PENDING_COUNT: usize = 1 << 16;
10+
// Every `FULL_EVERY` iterations, the synthetic queue is reset to "full".
1011
const FULL_EVERY: u64 = 1024;
12+
// Within each `FULL_EVERY` window, keep queue full for `FULL_WINDOW` iterations.
1113
const FULL_WINDOW: u64 = 16;
1214

1315
struct NoopWake;
@@ -30,6 +32,7 @@ fn response_policy_old(iterations: u64) -> Duration {
3032
}
3133
black_box(pending_len >= MAX_PENDING_COUNT);
3234
pending_len = pending_len.saturating_sub(1);
35+
// Old behavior: wake unconditionally for every response batch.
3336
atomic_waker.wake();
3437
}
3538
start.elapsed()
@@ -49,6 +52,7 @@ fn response_policy_new(iterations: u64) -> Duration {
4952
let was_full = pending_len >= MAX_PENDING_COUNT;
5053
pending_len = pending_len.saturating_sub(1);
5154
let should_wake = was_full && pending_len < MAX_PENDING_COUNT;
55+
// New behavior: wake only for full -> non-full transition.
5256
if black_box(should_wake) {
5357
atomic_waker.wake();
5458
}

doc/tso_waker_criterion.md

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,10 @@ Rust toolchain: 1.84.1
1111
Quantify the latency impact of reducing TSO stream wake/registration churn in
1212
`src/pd/timestamp.rs`.
1313

14+
These numbers are a point-in-time snapshot from this branch. Re-run the
15+
benchmark after meaningful changes to either `src/pd/timestamp.rs` or
16+
`benches/tso_waker_policy.rs`.
17+
1418
## Method
1519

1620
Benchmark framework:
@@ -34,6 +38,10 @@ The benchmark compares old vs new policies in two isolated hot paths:
3438
- `response/*`: wake policy when processing responses
3539
- `register/*`: self-waker registration policy in no-request branch
3640

41+
Note: the old/new response benchmarks intentionally do asymmetric work
42+
(always wake vs transition-only wake), so the speedup reflects the amortized
43+
benefit of skipping redundant wake calls under this simulation pattern.
44+
3745
## Results (Absolute Latency)
3846

3947
From Criterion output (`time` line):

src/pd/timestamp.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use futures::task::Context;
2222
use futures::task::Poll;
2323
use log::debug;
2424
use log::info;
25+
use log::warn;
2526
use pin_project::pin_project;
2627
use tokio::sync::mpsc;
2728
use tokio::sync::oneshot;
@@ -58,8 +59,13 @@ impl TimestampOracle {
5859
let pd_client = pd_client.clone();
5960
let (request_tx, request_rx) = mpsc::channel(MAX_BATCH_SIZE);
6061

61-
// Start a background thread to handle TSO requests and responses
62-
tokio::spawn(run_tso(cluster_id, pd_client, request_rx));
62+
// Start a background task to handle TSO requests and responses.
63+
// If it exits with an error, log it explicitly so root cause is preserved.
64+
tokio::spawn(async move {
65+
if let Err(err) = run_tso(cluster_id, pd_client, request_rx).await {
66+
warn!("TSO background task exited with error: {:?}", err);
67+
}
68+
});
6369

6470
Ok(TimestampOracle { request_tx })
6571
}

0 commit comments

Comments
 (0)