Skip to content

Commit 98ec089

Browse files
committed
s3tftpd-healthz: simplify after lifetime puzzle...
1 parent f711139 commit 98ec089

File tree

1 file changed

+38
-164
lines changed

1 file changed

+38
-164
lines changed

s3tftpd-healthz/src/main.rs

Lines changed: 38 additions & 164 deletions
Original file line numberDiff line numberDiff line change
@@ -1,189 +1,63 @@
11
#[tokio::main]
22
async fn main() {
33
tracing_subscriber::fmt::init();
4-
let app = axum::Router::new().route("/ping", axum::routing::get(ping));
4+
let app = axum::Router::new().route("/ping", axum::routing::get(get_ping));
55
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], 8080));
66
let listener = tokio::net::TcpListener::bind(&addr).await.unwrap();
77
axum::serve(listener, app).await.unwrap();
88
}
99

10-
async fn ping() -> (axum::http::StatusCode, &'static str) {
11-
match run_ping().await {
12-
Ok(_) => (axum::http::StatusCode::OK, "ok"),
10+
async fn get_ping() -> (axum::http::StatusCode, &'static str) {
11+
match tokio::time::timeout(std::time::Duration::from_secs(5), do_ping()).await {
12+
Ok(Ok(_)) => (axum::http::StatusCode::OK, "ok"),
1313
Err(e) => {
14+
tracing::error!(e=?e,"Timeout");
15+
(axum::http::StatusCode::GATEWAY_TIMEOUT, "timeout")
16+
}
17+
Ok(Err(e))=> {
1418
tracing::error!(e=?e,"Error");
1519
(axum::http::StatusCode::INTERNAL_SERVER_ERROR, "not ok")
1620
}
1721
}
1822
}
1923

20-
async fn run_ping() -> Result<(), anyhow::Error> {
21-
let data = start().await?;
22-
let s = String::from_utf8_lossy(&data);
23-
if !s.contains(r#"{"ok": true}"#) {
24-
tracing::error!(s=?s, "not ok");
25-
return Err(anyhow::anyhow!("not ok..."));
26-
}
27-
Ok(())
28-
}
29-
30-
#[derive(Debug)]
31-
struct State {
32-
data: Vec<u8>,
33-
sock: tokio::net::UdpSocket,
34-
tx: Option<Vec<u8>>,
35-
awaiting_data: ttftp::client::download::AwaitingData,
36-
addr: Option<std::net::SocketAddr>,
37-
}
38-
39-
#[derive(Debug)]
40-
enum Next {
41-
//AwaitingData(Option<Vec<u8>>, ttftp::client::download::AwaitingData),
42-
Intermediate(
43-
Option<Vec<u8>>,
44-
ttftp::client::download::AwaitingData,
45-
Vec<u8>,
46-
),
47-
Final(Option<Vec<u8>>, Vec<u8>),
48-
Retransmission(Option<Vec<u8>>, ttftp::client::download::AwaitingData),
49-
Error(Option<Vec<u8>>, anyhow::Error),
50-
}
51-
52-
fn process(rx: &[u8], next: ttftp::client::download::AwaitingData) -> Next {
24+
async fn do_ping() -> Result<(), anyhow::Error> {
5325
let mut tx = [0u8; ttftp::PACKET_SIZE];
54-
let (result, send) = next.process(rx, &mut tx);
55-
let tx_vec = if let Some(send) = send {
56-
Some(tx[..send].to_vec())
57-
} else {
58-
None
59-
};
60-
let retval = match result {
61-
Ok(ttftp::client::download::BlockReceived::Intermediate(awaiting_data, block)) => {
62-
Next::Intermediate(tx_vec, awaiting_data, block.to_vec())
63-
}
64-
Ok(ttftp::client::download::BlockReceived::Final(block)) => {
65-
Next::Final(tx_vec, block.to_vec())
66-
}
67-
Ok(ttftp::client::download::BlockReceived::Retransmission(state)) => {
68-
Next::Retransmission(tx_vec, state)
69-
}
70-
Err(e) => Next::Error(tx_vec, anyhow::anyhow!(e.to_string())),
71-
};
72-
tracing::trace!(next=?retval, "process");
73-
retval
74-
}
26+
let mut rx = [0u8; ttftp::PACKET_SIZE];
27+
let filename = c"ping";
28+
let (mut state, send) = ttftp::client::download::new(&mut tx, filename, ttftp::Mode::Octect)?;
7529

76-
async fn start() -> Result<Vec<u8>, anyhow::Error> {
77-
let mut tx = [0u8; ttftp::PACKET_SIZE];
78-
let filename = std::ffi::CStr::from_bytes_with_nul(b"ping\0").unwrap();
79-
let (awaiting_data, send) =
80-
ttftp::client::download::new(&mut tx, filename, ttftp::Mode::Octect)?;
30+
let mut data = vec![];
8131
let sock = tokio::net::UdpSocket::bind((std::net::Ipv4Addr::new(127, 0, 0, 1), 0)).await?;
82-
8332
sock.send_to(&tx[..send], (std::net::Ipv4Addr::new(127, 0, 0, 1), 69))
8433
.await?;
85-
86-
let data = perform(State {
87-
data: vec![],
88-
sock,
89-
tx: None,
90-
awaiting_data,
91-
addr: None,
92-
})
93-
.await?;
94-
95-
Ok(data)
96-
}
97-
98-
async fn perform(state: State) -> Result<Vec<u8>, anyhow::Error> {
99-
tracing::trace!(state=?state, "perform");
100-
let sock = state.sock;
101-
if let Some(pend_tx) = state.tx {
102-
tracing::trace!(pend_tx=?pend_tx, "send");
103-
sock.send_to(&pend_tx, state.addr.unwrap()).await?;
104-
}
105-
let mut data = state.data;
106-
let awaiting_data = state.awaiting_data;
107-
108-
tracing::trace!("recv...");
109-
let mut rx = [0u8; ttftp::PACKET_SIZE];
110-
let (received, addr) = sock.recv_from(&mut rx).await?;
111-
let addr = Some(addr);
112-
let mut retry = false;
113-
if state.addr.is_none() {
114-
if addr.unwrap().ip() != std::net::Ipv4Addr::new(127, 0, 0, 1) {
115-
tracing::warn!(addr=?addr, "Unexpected addr");
116-
retry = true;
117-
} else {
118-
}
119-
tracing::trace!(remote_addr=?addr, "addr");
120-
} else if state.addr != addr {
121-
tracing::warn!(addr=?addr, state_addr=?state.addr, "Unexpected addr");
122-
retry = true
123-
}
124-
if retry {
125-
return Box::pin(perform(State {
126-
data,
127-
sock,
128-
tx: None,
129-
awaiting_data,
130-
addr,
131-
}))
132-
.await;
133-
}
134-
tracing::trace!(rx=?rx[..received], "recv");
135-
let next = process(&rx[..received], awaiting_data);
136-
match next {
137-
Next::Intermediate(tx, awaiting_data, block) => {
138-
data.extend_from_slice(&block);
139-
return Box::pin(perform(State {
140-
data,
141-
sock,
142-
tx,
143-
awaiting_data,
144-
addr,
145-
}))
146-
.await;
147-
}
148-
Next::Final(tx, block) => {
149-
send_maybe(tx.as_deref(), addr.as_ref(), &sock).await?;
150-
data.extend_from_slice(&block);
151-
return Ok(data);
152-
}
153-
Next::Retransmission(tx, awaiting_data) => {
154-
return Box::pin(perform(State {
155-
data,
156-
sock,
157-
tx,
158-
awaiting_data,
159-
addr,
160-
}))
161-
.await
162-
}
163-
Next::Error(tx, e) => {
164-
send_maybe(tx.as_deref(), addr.as_ref(), &sock).await?;
165-
tracing::error!(e=?e, "Error");
166-
return Err(e.into());
167-
}
168-
}
169-
}
170-
171-
async fn send_maybe(
172-
buf: Option<&[u8]>,
173-
addr: Option<&std::net::SocketAddr>,
174-
sock: &tokio::net::UdpSocket,
175-
) -> Result<(), anyhow::Error> {
176-
match (buf, addr) {
177-
(Some(buf), Some(addr)) => {
178-
if let Err(e) = sock.send_to(buf, addr).await {
179-
tracing::warn!(e=?e, "Error sending (send_maybe)");
34+
let mut raddr = None;
35+
36+
loop {
37+
let (received, paddr) = sock.recv_from(&mut rx).await?;
38+
if raddr.is_none() {
39+
raddr = Some(paddr);
40+
} else if raddr != Some(paddr) {
41+
tracing::error!(remote=?raddr,received=?paddr,"received packet from unexpected address");
42+
continue;
43+
}
44+
let (result, send) = state.process(&rx[..received], &mut tx);
45+
if let Some(send) = send {
46+
sock.send_to(&tx[..send], raddr.unwrap()).await?;
47+
}
48+
state = match result {
49+
Ok(ttftp::client::download::BlockReceived::Intermediate(awaiting_data, block)) => {
50+
data.extend_from_slice(block);
51+
awaiting_data
18052
}
53+
Ok(ttftp::client::download::BlockReceived::Final(block)) => {
54+
data.extend_from_slice(block);
55+
break;
56+
}
57+
Ok(ttftp::client::download::BlockReceived::Retransmission(state)) => state,
58+
Err(e) => return Err(anyhow::anyhow!(e.to_string())),
18159
}
182-
(Some(_), None) => {
183-
tracing::warn!("missing addr for send_maybe");
184-
}
185-
(None, Some(_)) => {}
186-
(None, None) => {}
18760
}
61+
18862
Ok(())
18963
}

0 commit comments

Comments
 (0)