Skip to content

Commit d2c1b36

Browse files
committed
v1.46.1 with kanal instead of flume
1 parent e798267 commit d2c1b36

File tree

4 files changed

+45
-19
lines changed

4 files changed

+45
-19
lines changed

Cargo.lock

Lines changed: 15 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,7 @@ ed25519-dalek = { version = "2.2.0", features = ["digest"] }
114114
enumset = "1.1.10"
115115
flate2 = "1.1.4"
116116
flume = { version = "0.11.1", default-features = false }
117+
kanal = "0.1.1"
117118
futures = { version = "0.3.31", default-features = false, features = ["std"] }
118119
globset = "0.4.16"
119120
hash32 = "1.0.0"
@@ -205,7 +206,8 @@ sysinfo = "0.35.1"
205206
tempfile = "3.23.0"
206207
thiserror = "2.0.17"
207208
tikv-jemallocator = "0.6.0"
208-
tokio = { git = "https://github.com/dav1dde/tokio", rev = "b5d827e709e097fb5a9a391e2327880fce2a7759", default-features = false }
209+
# tokio = { git = "https://github.com/dav1dde/tokio", rev = "b5d827e709e097fb5a9a391e2327880fce2a7759", default-features = false }
210+
tokio = { version = "=1.46.1", default-features = false }
209211
tokio-util = { version = "0.7.13", default-features = false }
210212
tower = { version = "0.5.2", default-features = false }
211213
tower-http = { version = "0.6.6", default-features = false }
@@ -224,4 +226,4 @@ zstd = { version = "0.13.3", features = ["experimental"] }
224226
# This prevents stack overflows in span description normalization.
225227
[patch.crates-io]
226228
sqlparser_derive = { git = "https://github.com/apache/datafusion-sqlparser-rs", rev = "ade40826563451cc14c130af9d689f4050dbdb15" }
227-
tokio = { git = "https://github.com/dav1dde/tokio", rev = "b5d827e709e097fb5a9a391e2327880fce2a7759" }
229+
# tokio = { git = "https://github.com/dav1dde/tokio", rev = "b5d827e709e097fb5a9a391e2327880fce2a7759" }

relay-threading/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ publish = false
1111

1212
[dependencies]
1313
flume = { workspace = true, features = ["async"] }
14+
kanal = { workspace = true }
1415
futures = { workspace = true }
1516
tokio = { workspace = true }
1617
pin-project-lite = { workspace = true }
@@ -24,4 +25,4 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "sync",
2425

2526
[[bench]]
2627
name = "pool"
27-
harness = false
28+
harness = false

relay-threading/src/pool.rs

Lines changed: 24 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub struct AsyncPool<F> {
2323
/// Name of the pool.
2424
name: &'static str,
2525
/// Transmission containing all tasks.
26-
tx: flume::Sender<F>,
26+
tx: kanal::AsyncSender<F>,
2727
/// The maximum number of tasks that are expected to run concurrently at any point in time.
2828
max_tasks: u64,
2929
/// Vector containing all the metrics collected individually in each thread.
@@ -59,24 +59,34 @@ where
5959
S: ThreadSpawn,
6060
{
6161
let pool_name = builder.pool_name.unwrap_or(DEFAULT_POOL_NAME);
62-
let (tx, rx) = flume::bounded(builder.num_threads * 2);
62+
let (tx, rx) = kanal::bounded_async(builder.num_threads * 2);
6363
let mut threads_metrics = Vec::with_capacity(builder.num_threads);
6464

6565
for thread_id in 0..builder.num_threads {
6666
let rx = rx.clone();
6767

6868
let thread_name: Option<String> = builder.thread_name.as_mut().map(|f| f(thread_id));
6969
let metrics = Arc::new(ThreadMetrics::default());
70-
let task = MonitoredFuture::wrap_with_metrics(
71-
Multiplexed::new(
72-
pool_name,
73-
builder.max_concurrency,
74-
rx.into_stream(),
75-
builder.task_panic_handler.clone(),
76-
metrics.clone(),
77-
),
78-
metrics.raw_metrics.clone(),
79-
);
70+
71+
let task = {
72+
let metrics = Arc::clone(&metrics);
73+
let task_panic_handler = builder.task_panic_handler.clone();
74+
async move {
75+
let stream = rx.stream();
76+
77+
MonitoredFuture::wrap_with_metrics(
78+
Multiplexed::new(
79+
pool_name,
80+
builder.max_concurrency,
81+
stream,
82+
task_panic_handler,
83+
metrics.clone(),
84+
),
85+
metrics.raw_metrics.clone(),
86+
)
87+
.await
88+
}
89+
};
8090

8191
let thread = Thread {
8292
id: thread_id,
@@ -110,7 +120,7 @@ where
110120
/// the pool panicked.
111121
pub fn spawn(&self, future: F) {
112122
assert!(
113-
self.tx.send(future).is_ok(),
123+
self.tx.as_sync().send(future).is_ok(),
114124
"failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)"
115125
);
116126
}
@@ -125,7 +135,7 @@ where
125135
/// the pool panicked.
126136
pub async fn spawn_async(&self, future: F) {
127137
assert!(
128-
self.tx.send_async(future).await.is_ok(),
138+
self.tx.send(future).await.is_ok(),
129139
"failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)"
130140
);
131141
}

0 commit comments

Comments
 (0)