Skip to content

Commit ee10645

Browse files
committed
Require timeout, use async udp operations (tokio).
1 parent 78f8264 commit ee10645

File tree

4 files changed

+131
-27
lines changed

4 files changed

+131
-27
lines changed

Cargo.toml

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ description = "Get server addresses from QuakeWorld master servers."
44
keywords = ["masters", "quake", "quakeworld", "servers"]
55
repository = "https://github.com/quakeworld/masterstat"
66
authors = ["Viktor Persson <viktor.persson@arcsin.se>"]
7-
version = "0.2.0"
7+
version = "0.3.0"
88
edition = "2021"
99
license = "MIT"
1010
include = [
@@ -21,8 +21,8 @@ include = [
2121
anyhow = "1.0.82"
2222
binrw = "0.14.1"
2323
futures = "0.3.30"
24-
tinyudp = "0.4.0"
25-
tokio = { version = "1.37.0", features = ["rt", "sync"] }
24+
thiserror = "2.0.11"
25+
tokio = { version = "1.37.0", features = ["macros", "net", "rt-multi-thread", "sync", "time"] }
2626

2727
[dev-dependencies]
2828
pretty_assertions = "1.4.0"

src/command.rs

Lines changed: 52 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ use binrw::BinRead;
77
use tokio::sync::Mutex;
88

99
use crate::server_address::{RawServerAddress, ServerAddress};
10+
use crate::tinyudp;
1011

1112
/// Get server addresses from a single master server
1213
///
@@ -15,28 +16,33 @@ use crate::server_address::{RawServerAddress, ServerAddress};
1516
/// ```
1617
/// use std::time::Duration;
1718
///
18-
/// let master = "master.quakeworld.nu:27000";
19-
/// let timeout = Some(Duration::from_secs(2));
20-
/// match masterstat::server_addresses(&master, timeout) {
21-
/// Ok(addresses) => { println!("found {} server addresses", addresses.len()) },
22-
/// Err(e) => { eprintln!("error: {}", e); }
19+
/// async fn test() {
20+
/// let master = "master.quakeworld.nu:27000";
21+
/// let timeout = Duration::from_secs(2);
22+
/// match masterstat::server_addresses(&master, timeout).await {
23+
/// Ok(result) => { println!("found {} server addresses", result.len()) },
24+
/// Err(e) => { eprintln!("error: {}", e); }
25+
/// }
2326
/// }
2427
/// ```
25-
pub fn server_addresses(
28+
pub async fn server_addresses(
2629
master_address: &str,
27-
timeout: Option<Duration>,
30+
timeout: Duration,
2831
) -> Result<Vec<ServerAddress>> {
29-
const MESSAGE: [u8; 3] = [99, 10, 0];
30-
let options = tinyudp::ReadOptions {
31-
timeout,
32-
buffer_size: 16 * 1024, // 16 kb
33-
};
34-
let response = tinyudp::send_and_read(master_address, &MESSAGE, &options)?;
35-
let server_addresses = parse_servers_response(&response)?;
36-
Ok(sorted_and_unique(&server_addresses))
32+
const STATUS_MSG: [u8; 3] = [99, 10, 0];
33+
let response = tinyudp::send_and_receive(
34+
master_address,
35+
&STATUS_MSG,
36+
tinyudp::Options {
37+
timeout,
38+
buffer_size: 64 * 1024, // 64 kb
39+
},
40+
)
41+
.await?;
42+
parse_servers_response(&response)
3743
}
3844

39-
/// Get server addresses from many master servers (async, in parallel)
45+
/// Get server addresses from many master servers (concurrently)
4046
///
4147
/// # Example
4248
///
@@ -45,21 +51,22 @@ pub fn server_addresses(
4551
///
4652
/// async fn test() {
4753
/// let masters = ["master.quakeworld.nu:27000", "master.quakeservers.net:27000"];
48-
/// let timeout = Some(Duration::from_secs(2));
49-
/// let server_addresses = masterstat::server_addresses_from_many(&masters, timeout).await;
54+
/// let timeout = Duration::from_secs(2);
55+
/// let result = masterstat::server_addresses_from_many(&masters, timeout).await;
56+
/// println!("found {} server addresses", result.len());
5057
/// }
5158
/// ```
5259
pub async fn server_addresses_from_many(
5360
master_addresses: &[impl AsRef<str>],
54-
timeout: Option<Duration>,
61+
timeout: Duration,
5562
) -> Vec<ServerAddress> {
5663
let mut task_handles = vec![];
5764
let result_mux = Arc::<Mutex<Vec<ServerAddress>>>::default();
5865

5966
for master_address in master_addresses.iter().map(|a| a.as_ref().to_string()) {
6067
let result_mux = result_mux.clone();
6168
let task = tokio::spawn(async move {
62-
if let Ok(servers) = server_addresses(&master_address, timeout) {
69+
if let Ok(servers) = server_addresses(&master_address, timeout).await {
6370
let mut result = result_mux.lock().await;
6471
result.extend(servers);
6572
}
@@ -99,12 +106,33 @@ fn sorted_and_unique(server_addresses: &[ServerAddress]) -> Vec<ServerAddress> {
99106

100107
#[cfg(test)]
101108
mod tests {
102-
use pretty_assertions::assert_eq;
103-
104109
use super::*;
110+
// use pretty_assertions::assert_eq;
111+
112+
#[tokio::test]
113+
async fn test_server_addresses() -> Result<()> {
114+
let master = "master.quakeservers.net:27000";
115+
let timeout = Duration::from_secs(1);
116+
let result = server_addresses(master, timeout).await?;
117+
assert!(!result.is_empty());
118+
Ok(())
119+
}
105120

106-
#[test]
107-
fn test_parse_servers_response() -> Result<()> {
121+
#[tokio::test]
122+
async fn test_server_addresses_from_many() -> Result<()> {
123+
let masters = [
124+
"master.quakeservers.net:27000",
125+
"master.quakeworld.nu:27000",
126+
"qwmaster.fodquake.net:27000",
127+
];
128+
let timeout = Duration::from_secs(1);
129+
let result = server_addresses_from_many(&masters, timeout).await;
130+
assert!(result.len() > 500);
131+
Ok(())
132+
}
133+
134+
#[tokio::test]
135+
async fn test_parse_servers_response() -> Result<()> {
108136
// invalid response header
109137
{
110138
let response = [0xff, 0xff];

src/lib.rs

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
55
mod command;
66
mod server_address;
7+
mod tinyudp;
78

89
pub use crate::command::server_addresses;
910
pub use crate::command::server_addresses_from_many;

src/tinyudp.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,75 @@
1+
use anyhow::Result;
2+
use std::net::{Ipv4Addr, SocketAddrV4};
3+
use std::time::Duration;
4+
use thiserror::Error;
5+
use tokio::net::{ToSocketAddrs, UdpSocket};
6+
7+
#[derive(Debug, Error)]
8+
pub enum TinyudpError {
9+
#[error("failed to bind socket: {0}")]
10+
BindFailed(#[source] std::io::Error),
11+
12+
#[error("failed to send message: {0}")]
13+
SendFailed(#[source] std::io::Error),
14+
15+
#[error("failed to receive message: {0}")]
16+
ReceiveFailed(#[source] std::io::Error),
17+
18+
#[error("timeout reached while waiting for response")]
19+
TimeoutReached,
20+
}
21+
22+
pub async fn send_and_receive(
23+
target: impl ToSocketAddrs,
24+
message: &[u8],
25+
options: Options,
26+
) -> Result<Vec<u8>, TinyudpError> {
27+
let socket = bind().await?;
28+
socket
29+
.send_to(message, target)
30+
.await
31+
.map_err(TinyudpError::SendFailed)?;
32+
33+
let mut buffer = vec![0; options.buffer_size];
34+
let (bytes_read, _) = tokio::select! {
35+
_ = tokio::time::sleep(options.timeout) => Err(TinyudpError::TimeoutReached),
36+
res = socket.recv_from(&mut buffer) => res.map_err(TinyudpError::ReceiveFailed),
37+
}?;
38+
39+
let response = buffer[..bytes_read].to_vec();
40+
Ok(response)
41+
}
42+
43+
async fn bind() -> Result<UdpSocket, TinyudpError> {
44+
let address = SocketAddrV4::new(Ipv4Addr::UNSPECIFIED, 0);
45+
UdpSocket::bind(address)
46+
.await
47+
.map_err(TinyudpError::BindFailed)
48+
}
49+
50+
#[derive(Debug)]
51+
pub struct Options {
52+
pub timeout: Duration,
53+
pub buffer_size: usize,
54+
}
55+
56+
#[cfg(test)]
57+
mod tests {
58+
use super::*;
59+
use anyhow::Result;
60+
61+
#[tokio::test]
62+
async fn test_send_and_receive() -> Result<()> {
63+
let response = send_and_receive(
64+
"quake.se:28501",
65+
b"\xff\xff\xff\xffstatus",
66+
Options {
67+
timeout: Duration::from_secs_f32(0.2),
68+
buffer_size: 32 * 1024,
69+
},
70+
)
71+
.await?;
72+
assert!(String::from_utf8_lossy(&response).contains("QUAKE.SE KTX"));
73+
Ok(())
74+
}
75+
}

0 commit comments

Comments
 (0)