Skip to content

Commit 63c925c

Browse files
committed
async executor queue
1 parent a7095e1 commit 63c925c

File tree

4 files changed

+60
-22
lines changed

4 files changed

+60
-22
lines changed

CHANGELOG.md

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,10 @@ All notable changes to this project will be documented in this file.
44
The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
55
and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
66

7+
## [0.2.0] - 2025-09-05
8+
### Changed
9+
- Use async executor to allow for long running CPU profile and allocs to run concurrently (works with pyroscope for example)
10+
711
## [0.1.4] - 2025-08-20
812
### Fixed
913
- Don't wait for the heap lock if already taken

Cargo.lock

Lines changed: 25 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,7 @@ authors = ["killzoner"]
33
edition = "2024"
44
name = "pprof_hyper_server"
55
resolver = "3"
6-
version = "0.1.4"
6+
version = "0.2.0"
77

88
description = "A minimal pprof server implementation using hyper without runtime dependency"
99
documentation = "https://docs.rs/pprof_hyper_server"
@@ -14,11 +14,11 @@ repository = "https://github.com/killzoner/pprof-hyper-server"
1414
[dependencies]
1515
anyhow = { version = "1.0.99", default-features = false }
1616
async-channel = { version = "2.5.0", default-features = false }
17+
async-executor = { version = "1.13.3", default-features = false }
1718
async-io = { version = "2.5.0", default-features = false }
1819
form_urlencoded = { version = "1.2.2", default-features = false, features = [
1920
"alloc",
2021
] }
21-
futures-lite = { version = "2.6.1", default-features = false }
2222
http-body-util = { version = "0.1.3", default-features = false }
2323
hyper = { version = "1.7.0", default-features = false, features = [
2424
"server",

src/lib.rs

Lines changed: 29 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,8 @@
77

88
use anyhow::Result;
99
use async_channel::bounded;
10+
use async_executor::Executor;
1011
use async_io::Async;
11-
use futures_lite::future;
1212
use http_body_util::Full;
1313
use hyper::{
1414
Method, Request, Response, StatusCode, Uri,
@@ -102,7 +102,6 @@ impl Task<'_> {
102102
);
103103

104104
let blocklist = self.config.pprof_blocklist.unwrap_or(PPROF_BLOCKLIST);
105-
106105
let guard = ProfilerGuardBuilder::default()
107106
.frequency(profile_sampling)
108107
.blocklist(blocklist)
@@ -183,14 +182,29 @@ pub async fn serve<'a>(bind_address: SocketAddr, config: Config<'a>) -> Result<(
183182
let listener = Async::<TcpListener>::bind(bind_address)?;
184183
let (s, r) = bounded::<Task>(MAX_CONCURRENT_REQUESTS);
185184
let config = Arc::new(config);
185+
let ex = Arc::new(Executor::new());
186186

187-
loop {
188-
// stack max MAX_CONCURRENT_REQUESTS requests, prefering stacking than answering to them.
189-
// if we cannot stack anymore, drop the connection and other pending requests.
190-
// we don't need a multi threaded server to serve pprof server, but don't want it to be a source of DDOS.
191-
future::or(
192-
async {
193-
// Wait for a new client.
187+
ex.spawn({
188+
let ex = ex.clone();
189+
async move {
190+
loop {
191+
if let Ok(task) = r.recv().await {
192+
ex.spawn(async {
193+
task.handle_client().await.unwrap_or_default();
194+
})
195+
.detach();
196+
}
197+
}
198+
}
199+
})
200+
.detach();
201+
202+
ex.run({
203+
async move {
204+
// stack max MAX_CONCURRENT_REQUESTS requests
205+
// if we cannot add more tasks, drop the connection
206+
// we don't need a multi threaded server to serve pprof server, but don't want it to be a source of DOS.
207+
loop {
194208
let listener = listener.accept().await;
195209
if let Ok((client, _)) = listener {
196210
let task = Task {
@@ -201,13 +215,10 @@ pub async fn serve<'a>(bind_address: SocketAddr, config: Config<'a>) -> Result<(
201215
// we ignore the potential error as it would mean we should drop the connection if channel is full.
202216
let _ = s.try_send(task);
203217
}
204-
},
205-
async {
206-
if let Ok(task) = r.recv().await {
207-
task.handle_client().await.unwrap_or_default();
208-
}
209-
},
210-
)
211-
.await;
212-
}
218+
}
219+
}
220+
})
221+
.await;
222+
223+
Ok(())
213224
}

0 commit comments

Comments
 (0)