Skip to content

Commit b2a3353

Browse files
committed
refactor: expose more precise server health messages to callback receiver
1 parent f9e8a6a commit b2a3353

File tree

3 files changed

+43
-28
lines changed

3 files changed

+43
-28
lines changed

crates/base/src/commands.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::{
22
rt_worker::{worker_ctx::TerminationToken, worker_pool::WorkerPoolPolicy},
3-
server::{Server, ServerCodes, WorkerEntrypoints},
3+
server::{Server, ServerHealth, WorkerEntrypoints},
44
};
55
use anyhow::Error;
66
use tokio::sync::mpsc::Sender;
@@ -14,7 +14,7 @@ pub async fn start_server(
1414
user_worker_policy: Option<WorkerPoolPolicy>,
1515
import_map_path: Option<String>,
1616
no_module_cache: bool,
17-
callback_tx: Option<Sender<ServerCodes>>,
17+
callback_tx: Option<Sender<ServerHealth>>,
1818
entrypoints: WorkerEntrypoints,
1919
termination_token: Option<TerminationToken>,
2020
) -> Result<(), Error> {

crates/base/src/macros/test_macros.rs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,11 @@
11
#[macro_export]
22
macro_rules! integration_test {
33
($main_file:expr, $port:expr, $url:expr, ($($function:tt)+) $(, $termination_token: expr)?) => {
4-
let (tx, mut rx) = tokio::sync::mpsc::channel::<base::server::ServerCodes>(1);
4+
let (tx, mut rx) = tokio::sync::mpsc::channel::<base::server::ServerHealth>(1);
55

66
let signal = tokio::spawn(async move {
7-
while let Some(base::server::ServerCodes::Listening) = rx.recv().await {
8-
integration_test!(@req $port, $url, ($($function)+));
7+
while let Some(base::server::ServerHealth::Listening(event_rx)) = rx.recv().await {
8+
integration_test!(@req event_rx, $port, $url, ($($function)+));
99
}
1010
None
1111
});
@@ -47,11 +47,11 @@ macro_rules! integration_test {
4747
None
4848
};
4949

50-
(@req $port:expr, $url:expr, ($req:expr, $_:expr)) => {
51-
return $req($port, $url).await;
50+
(@req $event_rx:ident, $port:expr, $url:expr, ($req:expr, $_:expr)) => {
51+
return $req($port, $url, $event_rx).await;
5252
};
5353

54-
(@req $port:expr, $url:expr, $_:expr) => {
54+
(@req $_:ident, $port:expr, $url:expr, $__:expr) => {
5555
let req = reqwest::get(format!("http://localhost:{}/{}", $port, $url)).await;
5656
return Some(req);
5757
};

crates/base/src/server.rs

Lines changed: 35 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,12 @@ use tokio::sync::mpsc::Sender;
2323
use tokio::sync::{mpsc, oneshot, watch};
2424
use tokio_util::sync::CancellationToken;
2525

26-
pub enum ServerCodes {
27-
Listening,
26+
pub enum ServerEvent {
27+
ConnectionError(hyper::Error),
28+
}
29+
30+
pub enum ServerHealth {
31+
Listening(mpsc::UnboundedReceiver<ServerEvent>),
2832
Failure,
2933
}
3034

@@ -158,7 +162,7 @@ pub struct Server {
158162
ip: Ipv4Addr,
159163
port: u16,
160164
main_worker_req_tx: mpsc::UnboundedSender<WorkerRequestMsg>,
161-
callback_tx: Option<Sender<ServerCodes>>,
165+
callback_tx: Option<Sender<ServerHealth>>,
162166
termination_token: TerminationToken,
163167
}
164168

@@ -172,7 +176,7 @@ impl Server {
172176
maybe_user_worker_policy: Option<WorkerPoolPolicy>,
173177
import_map_path: Option<String>,
174178
no_module_cache: bool,
175-
callback_tx: Option<Sender<ServerCodes>>,
179+
callback_tx: Option<Sender<ServerHealth>>,
176180
entrypoints: WorkerEntrypoints,
177181
termination_token: Option<TerminationToken>,
178182
) -> Result<Self, Error> {
@@ -237,10 +241,14 @@ impl Server {
237241
let listener = TcpListener::bind(&addr).await?;
238242
let termination_token = self.termination_token.clone();
239243

244+
let mut can_receive_event = false;
245+
let (event_tx, event_rx) = mpsc::unbounded_channel();
246+
240247
debug!("edge-runtime is listening on {:?}", listener.local_addr()?);
241248

242249
if let Some(callback) = self.callback_tx.clone() {
243-
let _ = callback.send(ServerCodes::Listening).await;
250+
can_receive_event = true;
251+
let _ = callback.send(ServerHealth::Listening(event_rx)).await;
244252
}
245253

246254
loop {
@@ -250,21 +258,28 @@ impl Server {
250258
msg = listener.accept() => {
251259
match msg {
252260
Ok((conn, _)) => {
253-
tokio::task::spawn(async move {
254-
let (service, cancel) = WorkerService::new(main_worker_req_tx);
255-
let _guard = cancel.drop_guard();
256-
257-
let conn_fut = Http::new()
258-
.serve_connection(conn, service);
259-
260-
if let Err(e) = conn_fut.await {
261-
// Most common cause for these errors are
262-
// when the client closes the connection
263-
// before we could send a response
264-
if e.is_incomplete_message() {
265-
debug!("connection reset ({:?})", e);
266-
} else {
267-
error!("client connection error ({:?})", e);
261+
tokio::task::spawn({
262+
let event_tx = event_tx.clone();
263+
async move {
264+
let (service, cancel) = WorkerService::new(main_worker_req_tx);
265+
let _guard = cancel.drop_guard();
266+
267+
let conn_fut = Http::new()
268+
.serve_connection(conn, service);
269+
270+
if let Err(e) = conn_fut.await {
271+
// Most common cause for these errors are
272+
// when the client closes the connection
273+
// before we could send a response
274+
if e.is_incomplete_message() {
275+
debug!("connection reset ({:?})", e);
276+
} else {
277+
error!("client connection error ({:?})", e);
278+
}
279+
280+
if can_receive_event {
281+
let _ = event_tx.send(ServerEvent::ConnectionError(e));
282+
}
268283
}
269284
}
270285
});

0 commit comments

Comments
 (0)