Skip to content

Commit c62bc6c

Browse files
committed
Revert "v1.46.1 with kanal instead of flume"
This reverts commit d2c1b36.
1 parent d2c1b36 commit c62bc6c

File tree

4 files changed

+19
-45
lines changed

4 files changed

+19
-45
lines changed

Cargo.lock

Lines changed: 2 additions & 15 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,6 @@ ed25519-dalek = { version = "2.2.0", features = ["digest"] }
114114
enumset = "1.1.10"
115115
flate2 = "1.1.4"
116116
flume = { version = "0.11.1", default-features = false }
117-
kanal = "0.1.1"
118117
futures = { version = "0.3.31", default-features = false, features = ["std"] }
119118
globset = "0.4.16"
120119
hash32 = "1.0.0"
@@ -206,8 +205,7 @@ sysinfo = "0.35.1"
206205
tempfile = "3.23.0"
207206
thiserror = "2.0.17"
208207
tikv-jemallocator = "0.6.0"
209-
# tokio = { git = "https://github.com/dav1dde/tokio", rev = "b5d827e709e097fb5a9a391e2327880fce2a7759", default-features = false }
210-
tokio = { version = "=1.46.1", default-features = false }
208+
tokio = { git = "https://github.com/dav1dde/tokio", rev = "b5d827e709e097fb5a9a391e2327880fce2a7759", default-features = false }
211209
tokio-util = { version = "0.7.13", default-features = false }
212210
tower = { version = "0.5.2", default-features = false }
213211
tower-http = { version = "0.6.6", default-features = false }
@@ -226,4 +224,4 @@ zstd = { version = "0.13.3", features = ["experimental"] }
226224
# This prevents stack overflows in span description normalization.
227225
[patch.crates-io]
228226
sqlparser_derive = { git = "https://github.com/apache/datafusion-sqlparser-rs", rev = "ade40826563451cc14c130af9d689f4050dbdb15" }
229-
# tokio = { git = "https://github.com/dav1dde/tokio", rev = "b5d827e709e097fb5a9a391e2327880fce2a7759" }
227+
tokio = { git = "https://github.com/dav1dde/tokio", rev = "b5d827e709e097fb5a9a391e2327880fce2a7759" }

relay-threading/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ publish = false
1111

1212
[dependencies]
1313
flume = { workspace = true, features = ["async"] }
14-
kanal = { workspace = true }
1514
futures = { workspace = true }
1615
tokio = { workspace = true }
1716
pin-project-lite = { workspace = true }
@@ -25,4 +24,4 @@ tokio = { workspace = true, features = ["rt", "rt-multi-thread", "time", "sync",
2524

2625
[[bench]]
2726
name = "pool"
28-
harness = false
27+
harness = false

relay-threading/src/pool.rs

Lines changed: 14 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@ pub struct AsyncPool<F> {
2323
/// Name of the pool.
2424
name: &'static str,
2525
/// Transmission containing all tasks.
26-
tx: kanal::AsyncSender<F>,
26+
tx: flume::Sender<F>,
2727
/// The maximum number of tasks that are expected to run concurrently at any point in time.
2828
max_tasks: u64,
2929
/// Vector containing all the metrics collected individually in each thread.
@@ -59,34 +59,24 @@ where
5959
S: ThreadSpawn,
6060
{
6161
let pool_name = builder.pool_name.unwrap_or(DEFAULT_POOL_NAME);
62-
let (tx, rx) = kanal::bounded_async(builder.num_threads * 2);
62+
let (tx, rx) = flume::bounded(builder.num_threads * 2);
6363
let mut threads_metrics = Vec::with_capacity(builder.num_threads);
6464

6565
for thread_id in 0..builder.num_threads {
6666
let rx = rx.clone();
6767

6868
let thread_name: Option<String> = builder.thread_name.as_mut().map(|f| f(thread_id));
6969
let metrics = Arc::new(ThreadMetrics::default());
70-
71-
let task = {
72-
let metrics = Arc::clone(&metrics);
73-
let task_panic_handler = builder.task_panic_handler.clone();
74-
async move {
75-
let stream = rx.stream();
76-
77-
MonitoredFuture::wrap_with_metrics(
78-
Multiplexed::new(
79-
pool_name,
80-
builder.max_concurrency,
81-
stream,
82-
task_panic_handler,
83-
metrics.clone(),
84-
),
85-
metrics.raw_metrics.clone(),
86-
)
87-
.await
88-
}
89-
};
70+
let task = MonitoredFuture::wrap_with_metrics(
71+
Multiplexed::new(
72+
pool_name,
73+
builder.max_concurrency,
74+
rx.into_stream(),
75+
builder.task_panic_handler.clone(),
76+
metrics.clone(),
77+
),
78+
metrics.raw_metrics.clone(),
79+
);
9080

9181
let thread = Thread {
9282
id: thread_id,
@@ -120,7 +110,7 @@ where
120110
/// the pool panicked.
121111
pub fn spawn(&self, future: F) {
122112
assert!(
123-
self.tx.as_sync().send(future).is_ok(),
113+
self.tx.send(future).is_ok(),
124114
"failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)"
125115
);
126116
}
@@ -135,7 +125,7 @@ where
135125
/// the pool panicked.
136126
pub async fn spawn_async(&self, future: F) {
137127
assert!(
138-
self.tx.send(future).await.is_ok(),
128+
self.tx.send_async(future).await.is_ok(),
139129
"failed to schedule task: all worker threads have terminated (either none were spawned or all have panicked)"
140130
);
141131
}

0 commit comments

Comments
 (0)