diff --git a/Cargo.toml b/Cargo.toml index 0a4baf0..d5b4b2e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -2,7 +2,10 @@ name = "ntrip-client" version = "0.0.1" license = "MPL-2.0" -authors = ["Guillaume W. Bres "] +authors = [ + "Guillaume W. Bres ", + "Ryan Kurte " +] description = "NTRIP client" homepage = "https://github.com/rtk-rs" repository = "https://github.com/rtk-rs/ntrip-client" @@ -19,14 +22,36 @@ all-features = true rustdoc-args = ["--cfg", "docrs", "--generate-link-to-definition"] [features] -default = [] +default = [ "log", "clap", "serde", "anyhow" ] # Unlock client logs -log = ["dep:log"] +log = ["dep:tracing", "dep:tracing-subscriber"] +clap = ["dep:clap"] +serde = ["dep:serde", "rtcm-rs/serde", "geoutils/serde"] [dependencies] thiserror = "2" base64 = "0.22" rtcm-rs = "0.11" -log = { version = "0.4", optional = true } +futures = "0.3" tokio = { version = "1.45.0", features = ["full"] } +strum = { version = "0.27.2", features = ["derive"] } +reqwest = { version = "0.12", features = ["rustls-tls"] } +http = "1.3.1" +tokio-rustls = "0.26.2" +rustls = "0.23.31" +webpki-roots = "1.0.2" +geoutils = "0.5.1" +isocountry = "0.3.2" + +tracing = { version = "0.1.41", optional = true, features = ["log"] } +tracing-subscriber = { version = "0.3.17", optional = true, features = ["fmt", "env-filter"] } +clap = { version = "4.5", optional = true, features = ["derive", "env"] } +serde = { version = "1.0", optional = true, features = ["derive"] } +anyhow = { version = "1.0.0", optional = true } + +[[bin]] +name = "ntrip-cli" +path = "src/bin/ntrip-cli.rs" +required-features = ["clap", "log", "anyhow"] + diff --git a/README.md b/README.md index efdaa29..561d273 100644 --- a/README.md +++ b/README.md @@ -23,16 +23,46 @@ Getting started ntrip-client = "0.0.1" ``` +See [src/bin/ntrip-cli.rs](src/bin/ntrip-cli.rs) for a complete example. + ```rust -let mut client = NTRIPClient::new("caster.centipede.fr", 2101, "ENSMM") - .with_credentials("centipede", "centipede"); - -// deploy using 'tokio' framework -client.run() - .await - .unwrap_or_else(|e| { - panic!("Failed to deploy NTRIP client: {}", e); - }); +// Configure server +let ntrip_config = "centipede".parse::(); +let ntrip_creds = NtripCredentials{ + user: "centipede".to_string(), + pass: "centipede".to_string(), +} + +// Setup client +let mut client = NtripClient::new(ntrip_config, ntrip_creds).await.unwrap(); + +// List mounts +let server_info = client.list_mounts().await.unwrap(); +for m in server_info.mounts { + println!("{} - {}", m.name, m.details); +} + +// Subscribe to a mount +let (exit_tx, exit_rx) = tokio::sync::broadcast(1); +let handle = client.mount("VALDM", exit_tx.clone()); + +loop { + select!{ + m = client.next() => match m { + Some(m) => { + info!("Received RTCM message: {:?}", m); + }, + None => { + error!("NTRIP client stream ended"); + break; + } + }, + _ = exit_rx.recv() => { + info!("Exiting on signal"); + break; + } + } +} ``` Licensing diff --git a/src/bin/ntrip-cli.rs b/src/bin/ntrip-cli.rs new file mode 100644 index 0000000..c62a545 --- /dev/null +++ b/src/bin/ntrip-cli.rs @@ -0,0 +1,149 @@ +use clap::Parser; +use futures::StreamExt; +use geoutils::Location; +use ntrip_client::{ + config::{NtripConfig, NtripCredentials}, + NtripClient, +}; +use tokio::select; +use tracing::{debug, error, info, level_filters::LevelFilter}; +use tracing_subscriber::{fmt::Subscriber as FmtSubscriber, EnvFilter}; + +/// NTRIP command line tool +#[derive(Clone, PartialEq, Debug, Parser)] +struct Args { + #[clap()] + /// NTRIP server identifier or URI ("rtk2go", "linz" etc., or "[ntrip|http|https]://host:port") + pub ntrip_host: NtripConfig, + + #[clap(flatten)] + pub ntrip_creds: NtripCredentials, + + #[clap(subcommand)] + pub command: Commands, + + #[clap(long, default_value = "info")] + /// Set log level + pub log_level: LevelFilter, +} + +#[derive(Clone, PartialEq, Debug, Parser)] +pub enum Commands { + /// List mount points on an NTRIP server + List, + /// Find the nearest mount point to a specified location + FindNearest { + #[clap()] + lat: f64, + #[clap()] + lon: f64, + }, + /// Subscribe to a specified mount point and print received RTCM messages + Subscribe { + #[clap()] + mount: String, + }, +} + +#[tokio::main] +async fn main() -> Result<(), anyhow::Error> { + // Parse command line arguments + let args = Args::parse(); + + // Setup logging + let filter = EnvFilter::from_default_env().add_directive(args.log_level.into()); + let _ = FmtSubscriber::builder() + .compact() + .without_time() + .with_max_level(args.log_level) + .with_env_filter(filter) + .try_init(); + + info!("Start NTRIP/RTMP tool"); + + debug!("Args {args:?}"); + + // Setup interrupt / exit handler + let (exit_tx, mut exit_rx) = tokio::sync::broadcast::channel(1); + let e = exit_tx.clone(); + tokio::task::spawn(async move { + tokio::signal::ctrl_c().await.unwrap(); + debug!("Received Ctrl-C, shutting down..."); + e.send(()).unwrap(); + }); + + let mut client = NtripClient::new(args.ntrip_host.clone(), args.ntrip_creds.clone()).await?; + + match args.command { + Commands::List => { + // List available NTRIP mounts using SNIP + info!("Listing NTRIP mounts"); + + let info = client.list_mounts().await.unwrap(); + + for s in info.services { + info!( + "{} - {} ({:.3}, {:.3})", + s.name, + s.details, + s.location.latitude(), + s.location.longitude() + ); + } + }, + Commands::FindNearest { lat, lon } => { + // Find the nearest NTRIP mount to the specified location + info!("Finding nearest NTRIP mount to ({}, {})", lat, lon); + + let info = client.list_mounts().await.unwrap(); + + let target_location = Location::new(lat, lon); + + match info.find_nearest(&target_location) { + Some((s, d)) => { + info!( + "Nearest mount: {} - {} ({:.3}, {:.3}), {:.3} km away", + s.name, + s.details, + s.location.latitude(), + s.location.longitude(), + d / 1000.0 + ); + }, + None => { + info!("No mounts found"); + }, + } + }, + Commands::Subscribe { mount } => { + // Subscribe to the specified NTRIP mount + debug!("Connecting to NTRIP server"); + + // Setup the NTRIP client + let mut client = client.mount(mount, exit_tx.clone()).await?; + + // Process incoming RTCM messages + loop { + select! { + m = client.next() => match m { + Some(m) => { + info!("Received RTCM message: {:?}", m); + }, + None => { + error!("NTRIP client stream ended"); + break; + } + }, + _ = exit_rx.recv() => { + info!("Exiting on signal"); + break; + } + } + } + }, + } + + debug!("Exiting"); + + Ok(()) +} diff --git a/src/client.rs b/src/client.rs new file mode 100644 index 0000000..9955a41 --- /dev/null +++ b/src/client.rs @@ -0,0 +1,405 @@ +//! NTRIP Client implementation + +use std::sync::Arc; + +use base64::{engine::general_purpose, Engine as _}; +use futures::Stream; +use http::{ + header::{InvalidHeaderValue, ToStrError, USER_AGENT}, + HeaderMap, HeaderValue, Method, +}; +use rtcm_rs::{Message, MessageFrame}; +use rustls::pki_types::{InvalidDnsNameError, ServerName}; +use tokio::{ + io::{AsyncRead, AsyncReadExt as _, AsyncWrite, AsyncWriteExt}, + net::TcpStream, + select, + sync::{ + broadcast::Sender as BroadcastSender, + mpsc::{unbounded_channel, UnboundedReceiver}, + }, + task::JoinHandle, +}; +use tokio_rustls::TlsConnector; +use tracing::{debug, error, trace, warn}; + +use crate::{ + config::{NtripConfig, NtripCredentials}, + snip::ServerInfo, +}; + +/// NTRIP Client, used to connect to an NTRIP (RTCM) service +pub struct NtripClient { + config: NtripConfig, + creds: NtripCredentials, +} + +/// NTRIP Mount handle, used to stream RTCM messages from an NTRIP service +pub struct NtripHandle { + _rx_handle: tokio::task::JoinHandle<()>, + ntrip_rx: UnboundedReceiver, +} + +#[derive(Debug, thiserror::Error)] +pub enum NtripClientError { + #[error("Io error: {0}")] + Io(#[from] std::io::Error), + + #[error("Reqwest error: {0}")] + Reqwest(#[from] reqwest::Error), + + #[error("Invalid header value {0}")] + InvalidHeaderValue(#[from] InvalidHeaderValue), + + #[error("Invalid DNS name {0}")] + InvalidDnsName(#[from] InvalidDnsNameError), + + #[error("Header ToStrError error {0}")] + ToStrError(#[from] ToStrError), + + #[error("Response error")] + ResponseError(String), +} + +impl NtripClient { + pub async fn new( + config: NtripConfig, + creds: NtripCredentials, + ) -> Result { + Ok(NtripClient { config, creds }) + } + + /// List available mounts on the NTRIP server + pub async fn list_mounts(&mut self) -> Result { + let client = reqwest::Client::builder() + .http1_ignore_invalid_headers_in_responses(true) + .http09_responses() + .user_agent(format!( + "NTRIP {}/{}", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_VERSION") + )) + .build()?; + + // TODO: auth etc. + let proto = if self.config.use_tls { "https" } else { "http" }; + + let req = client + .request( + Method::GET, + format!("{}://{}:{}", proto, self.config.host, self.config.port), + ) + .header("Ntrip-Version", "NTRIP/2.0") + .build()?; + + let res = client.execute(req).await?; + + debug!("Fetched NTRIP response: {:?}", res.status()); + + let body = res.text().await?; + + let lines = body.lines().collect::>(); + + let snip_info = ServerInfo::parse(lines.iter().cloned()); + + Ok(snip_info) + } + + pub async fn mount( + &mut self, + mount: impl ToString, + exit_tx: BroadcastSender<()>, + ) -> Result { + debug!( + "Connecting to NTRIP server {}/{}", + self.config.url(), + mount.to_string() + ); + + let sock = TcpStream::connect(&self.config.url()).await?; + + let (rx_handle, ntrip_rx) = match self.config.use_tls { + true => { + debug!("Using TLS connection"); + + let mut root_cert_store = rustls::RootCertStore::empty(); + root_cert_store.extend(webpki_roots::TLS_SERVER_ROOTS.iter().cloned()); + + let tls_config = rustls::ClientConfig::builder() + .with_root_certificates(root_cert_store) + .with_no_client_auth(); + let connector = TlsConnector::from(Arc::new(tls_config)); + let dnsname = ServerName::try_from(self.config.host.clone())?; + + let tls_sock = connector.connect(dnsname, sock).await?; + + Self::handle_connection( + &self.config, + &self.creds, + &mount.to_string(), + exit_tx.clone(), + tls_sock, + ) + .await? + }, + false => { + debug!("Using plain TCP connection"); + + Self::handle_connection( + &self.config, + &self.creds, + &mount.to_string(), + exit_tx.clone(), + sock, + ) + .await? + }, + }; + + Ok(NtripHandle { + _rx_handle: rx_handle, + ntrip_rx, + }) + } + + pub async fn handle_connection( + config: &NtripConfig, + creds: &NtripCredentials, + mount: &str, + exit_tx: BroadcastSender<()>, + mut sock: impl AsyncRead + AsyncWrite + Unpin + Send + 'static, + ) -> Result<(JoinHandle<()>, UnboundedReceiver), NtripClientError> { + // Setup HTTP headers + let mut headers = HeaderMap::new(); + headers.append( + USER_AGENT, + HeaderValue::from_str(&format!( + "NTRIP {}/{}", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_VERSION") + ))?, + ); + + headers.append("Ntrip-Version", HeaderValue::from_static("NTRIP/2.0")); + headers.append("Accept", HeaderValue::from_static("*/*")); + headers.append("Connection", HeaderValue::from_static("close")); + + // If we have credentials, add the Authorization header + if !creds.user.is_empty() { + let auth = general_purpose::STANDARD.encode(format!("{}:{}", creds.user, creds.pass)); + headers.append( + "Authorization", + HeaderValue::from_str(&format!("Basic {}", auth))?, + ); + } + + debug!("Headers: {:#?}", headers); + + // Write HTTP request + debug!("Write HTTP request"); + sock.write_all(format!("GET /{} HTTP/1.0\r\n", mount).as_bytes()) + .await?; + sock.write_all(format!("Host: {}\r\n", config.url()).as_bytes()) + .await?; + + // Write HTTP headers + debug!("Writing headers"); + for h in headers.iter() { + sock.write_all(format!("{}: {}\r\n", h.0.as_str(), h.1.to_str()?).as_bytes()) + .await?; + } + + sock.write_all(b"\r\n").await?; + sock.flush().await?; + + debug!("Reading response"); + let mut buff = Vec::with_capacity(1024); + + // Perform a first read to get the response status + let n = sock.read_buf(&mut buff).await?; + debug!("Read {} bytes, current buffer {} bytes", n, buff.len()); + + // Parse out response status + let r = String::from_utf8_lossy(&buff[..n]); + match r.lines().next() { + Some(status) if status.contains("200 OK") => { + debug!("Got 200 OK response"); + }, + Some(status) => { + error!("NTRIP server returned error: {}", status); + return Err(NtripClientError::ResponseError(status.to_string())); + }, + None => { + error!("NTRIP server returned empty response"); + return Err(NtripClientError::ResponseError("empty response".into())); + }, + } + + // Flush buffer until the first RTCM message (0xd3) + if let Some(i) = buff.iter().enumerate().find(|(_i, b)| **b == 0xd3) { + debug!( + "Trimming buffer to next potential frame start at index {}", + i.0 + ); + let _ = buff.drain(..i.0); + } + + // Spawn a task to handle incoming NTRIP data + + let (ntrip_tx, ntrip_rx) = unbounded_channel(); + let mut exit_rx = exit_tx.subscribe(); + let rx_handle = tokio::task::spawn(async move { + // Track parse errors so we can drop data (or abort) if needed + let mut error_count = 0; + + 'listener: loop { + select! { + n = sock.read_buf(&mut buff) => match n { + Ok(n) => { + debug!("Read {} bytes, current buffer {} bytes", n, buff.len()); + trace!("Appended {:02x?}", &buff[buff.len()-n..][..n]); + + // Handle zero length read (connection closed) + if n == 0 { + warn!("Zero length response"); + break 'listener; + } + + // Trim any non-message data from the start of the buffer + if buff[0] != 0xd3 { + if let Some(i) = buff.iter().enumerate().find(|(_i, b)| **b == 0xd3) { + warn!("Trimming buffer to next potential frame start at index {}", i.0); + buff.drain(..i.0); + + assert_eq!(buff[0], 0xd3); + } + } + + // While we have enough data for a header, + // parse out RTCM messages + while buff.len() > 6 { + // Attempt to parse frames + match MessageFrame::new(&buff[..]) { + Ok(f) => { + // Parse out message from frame + let m = f.get_message(); + + debug!("Parsed RTCM message: {:?} (consumed {} bytes)", m, f.frame_len()); + + // Emit message + ntrip_tx.send(m).unwrap(); + + // Remove parsed data from the buffer + let _ = buff.drain(..f.frame_len()); + + // Reset error counter + error_count = 0; + }, + Err(e) => { + warn!("RTCM parse error: {} (count: {})", e, error_count); + + // Update error counter + error_count += 1; + + // If we keep getting errors, abort the connection + if error_count >= 5 { + error!("Too many parse errors, closing connection"); + break 'listener; + } + + break; + } + } + } + }, + Err(e) => { + error!("socket read error: {}", e); + break; + }, + }, + _ = exit_rx.recv() => { + error!("Exiting NTRIP read loop on signal"); + break; + } + } + } + + warn!("NTRIP read loop exiting"); + + if !buff.is_empty() { + warn!("Dropping {} bytes of unparsed data", buff.len()); + + if let Ok(s) = String::from_utf8(buff) { + debug!("Unparsed data:\r\n{}", s); + } + } + }); + + Ok((rx_handle, ntrip_rx)) + } +} + +/// [Stream] NTRIP [Message]'s from an [NtripHandle] +impl Stream for NtripHandle { + type Item = Message; + + fn poll_next( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.ntrip_rx.poll_recv(cx) + } +} + +#[cfg(test)] +mod tests { + use std::env; + + use futures::StreamExt; + use tracing::debug; + + use super::*; + use crate::config::NtripCredentials; + + fn setup_logging() { + let _ = tracing_subscriber::FmtSubscriber::builder() + .compact() + .without_time() + .with_max_level(tracing::level_filters::LevelFilter::DEBUG) + .try_init(); + } + + #[tokio::test] + #[ignore = "Requires NTRIP config from the environment"] + async fn test_ntrip_client() { + setup_logging(); + + debug!("Connecting to NTRIP server"); + + let (exit_tx, _exit_rx) = tokio::sync::broadcast::channel(1); + + let mount = env::var("NTRIP_MOUNT").unwrap_or("ARGOACU".to_string()); + let config = env::var("NTRIP_HOST") + .unwrap_or("rtk2go".to_string()) + .parse::() + .unwrap(); + let creds = NtripCredentials { + user: env::var("NTRIP_USER").unwrap_or("user".into()), + pass: env::var("NTRIP_PASS").unwrap_or("pass".into()), + }; + + let mut client = NtripClient::new(config, creds).await.unwrap(); + + let mut h = client + .mount(mount.to_string(), exit_tx.clone()) + .await + .unwrap(); + + for _i in 0..10 { + let m = h.next().await.unwrap(); + debug!("Got RTCM message: {:?}", m); + } + + let _ = exit_tx.send(()); + } +} diff --git a/src/config.rs b/src/config.rs new file mode 100644 index 0000000..48b3954 --- /dev/null +++ b/src/config.rs @@ -0,0 +1,184 @@ +//! NTRIP client configuration objects + +use std::str::FromStr; + +use strum::{Display, EnumString, VariantNames}; + +use crate::NtripClientError; + +/// NTRIP (Networked Transport of RTCM via Internet Protocol) configuration +#[derive(Clone, PartialEq, Debug)] +#[cfg_attr(feature = "clap", derive(clap::Parser))] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct NtripConfig { + /// Host name or IP address of the NTRIP server + #[cfg_attr( + feature = "clap", + clap(long = "ntrip-host", env = "NTRIP_HOST", default_value = "rtk2go.com") + )] + pub host: String, + + /// Port number of the NTRIP server + #[cfg_attr( + feature = "clap", + clap(long = "ntrip-port", env = "NTRIP_PORT", default_value_t = 2101) + )] + pub port: u16, + + /// Use TLS / SSL for the NTRIP connection + #[cfg_attr( + feature = "clap", + clap(long = "ntrip-use-tls", env = "NTRIP_USE_TLS", default_value_t = false) + )] + pub use_tls: bool, +} + +/// Credentials for an NTRIP (RTCM) service +#[derive(Clone, PartialEq, Debug)] +#[cfg_attr(feature = "clap", derive(clap::Parser))] +pub struct NtripCredentials { + /// Username for the NTRIP service + #[cfg_attr(feature = "clap", clap(long = "ntrip-user", env = "NTRIP_USER"))] + pub user: String, + + /// Password for the NTRIP service + #[cfg_attr( + feature = "clap", + clap(long = "ntrip-pass", env = "NTRIP_PASS", default_value = "") + )] + pub pass: String, +} + +impl NtripConfig { + /// Generate a connection URL ("host:port") from the NtripConfig + pub fn url(&self) -> String { + format!("{}:{}", self.host, self.port) + } +} + +/// Common RTCM data providers +#[derive(Clone, PartialEq, Debug, EnumString, Display, VariantNames)] +pub enum RtcmProvider { + /// Land Information New Zealand + /// + /// Note: requires credentials + #[strum(serialize = "linz")] + Linz, + /// RTK2GO.com free service + #[strum(serialize = "rtk2go")] + Rtk2Go, + /// Positioning Australia + /// + /// Note: requires credentials and TLS + #[strum(serialize = "posau")] + PosAu, + /// Centipede FR + #[strum(serialize = "centipede")] + Centipede, +} + +impl RtcmProvider { + /// Fetch the hostname for the provider + pub fn host(&self) -> &str { + match self { + RtcmProvider::Linz => "positionz-rt.linz.govt.nz", + RtcmProvider::Rtk2Go => "rtk2go.com", + RtcmProvider::PosAu => "ntrip.data.gnss.ga.gov.au", + RtcmProvider::Centipede => "caster.centipede.fr", + } + } + + /// Fetch the TCP port for the provider + pub fn port(&self) -> u16 { + match self { + RtcmProvider::Linz => 2101, + RtcmProvider::Rtk2Go => 2101, + RtcmProvider::PosAu => 443, + RtcmProvider::Centipede => 2101, + } + } + + /// Does the provider require TLS / SSL? + pub fn use_tls(&self) -> bool { + match self { + RtcmProvider::Linz => false, + RtcmProvider::Rtk2Go => false, + RtcmProvider::PosAu => true, + RtcmProvider::Centipede => false, + } + } +} + +/// Parse an [NtripConfig] from a URL string +/// +/// For example: +/// ``` +/// # use ntrip_client::config::NtripConfig; +/// +/// let cfg = "ntrip://rtk2go.com:2101".parse::().unwrap(); +/// +/// assert_eq!(cfg.host, "rtk2go.com"); +/// assert_eq!(cfg.port, 2101); +/// assert_eq!(cfg.use_tls, false); +/// ``` +/// +/// This also matches on [RtcmProvider]'s for convenience. +/// ``` +/// # use ntrip_client::config::NtripConfig; +/// +/// let cfg = "linz".parse::().unwrap(); +/// +/// assert_eq!(cfg.host, "positionz-rt.linz.govt.nz"); +/// assert_eq!(cfg.port, 2101); +/// assert_eq!(cfg.use_tls, false); +/// ``` +impl FromStr for NtripConfig { + type Err = NtripClientError; + + /// Parse an [NtripConfig] from a URL string + fn from_str(s: &str) -> Result { + // Match on known providers + if let Ok(provider) = RtcmProvider::from_str(s) { + return Ok(NtripConfig { + host: provider.host().to_string(), + port: provider.port(), + use_tls: provider.use_tls(), + }); + } + + // Strip protocol if present + let proto = if s.starts_with("http://") { + "http" + } else if s.starts_with("https://") { + "https" + } else if s.starts_with("ntrip://") { + "ntrip" + } else { + "unknown" + }; + let s = s.trim_start_matches(&format!("{proto}://")); + + // Split host and port + let parts: Vec<&str> = s.split(':').collect(); + if parts.is_empty() { + return Err(NtripClientError::InvalidUrl); + } + let host = parts[0].to_string(); + + // Parse port or use default + let port = if parts.len() > 1 { + parts[1] + .parse::() + .map_err(|_| NtripClientError::InvalidPort)? + } else if proto == "https" { + 443 + } else { + 2101 + }; + Ok(NtripConfig { + host, + port, + use_tls: port == 443, + }) + } +} diff --git a/src/credentials.rs b/src/credentials.rs deleted file mode 100644 index 479797a..0000000 --- a/src/credentials.rs +++ /dev/null @@ -1,25 +0,0 @@ -//! Client credentials -use base64::{engine::general_purpose, Engine}; - -#[cfg(doc)] -use crate::NTRIPClient; - -/// [NTRIPCredentials] optionally used by [NTRIPClient]s -#[derive(Clone, Default, PartialEq)] -pub struct NTRIPCredentials { - user: String, - password: String, -} - -impl NTRIPCredentials { - pub fn new(user: &str, password: &str) -> Self { - Self { - user: user.to_string(), - password: password.to_string(), - } - } - - pub fn encode(&self) -> String { - general_purpose::STANDARD.encode(format!("{}:{}", self.user, self.password)) - } -} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..1af7744 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,30 @@ +use reqwest::header::{InvalidHeaderValue, ToStrError}; +use rustls::pki_types::InvalidDnsNameError; + +/// NTRIP client error types +#[derive(Debug, thiserror::Error)] +pub enum NtripClientError { + #[error("Io error: {0}")] + Io(#[from] std::io::Error), + + #[error("Reqwest error: {0}")] + Reqwest(#[from] reqwest::Error), + + #[error("Invalid header value {0}")] + InvalidHeaderValue(#[from] InvalidHeaderValue), + + #[error("Invalid DNS name {0}")] + InvalidDnsName(#[from] InvalidDnsNameError), + + #[error("Header ToStrError error {0}")] + ToStrError(#[from] ToStrError), + + #[error("Response error")] + ResponseError(String), + + #[error("Invalid URL")] + InvalidUrl, + + #[error("Invalid port number")] + InvalidPort, +} diff --git a/src/lib.rs b/src/lib.rs index 0df7a9c..97c8a8d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,208 +1,13 @@ -use thiserror::Error; +//! NTRIP Client library +//! +//! Provides an async NTRIP client for listing mounts and connecting to RTCM services -mod credentials; -use credentials::NTRIPCredentials; +pub mod config; -// use std::io::{Read, Write}; +pub mod snip; -// use crate::coordinate::Coordinate; -// use crate::tcp_handler::TcpHandler; -// use base64::Engine as _; -// use std::io::{Read, Write}; -// use std::time::{Duration, Instant}; +mod error; +pub use error::NtripClientError; -use tokio::{ - io::{AsyncReadExt, AsyncWriteExt}, - net::TcpStream, -}; - -use rtcm_rs::next_msg_frame; - -#[derive(Debug, Error)] -pub enum NTRIPClientError { - #[error("I/O error: {0}")] - IoError(#[from] std::io::Error), - - #[error("failed to connect to server")] - Connection, - - #[error("failed to send data to server")] - Send, - - #[error("invalid response from server")] - BadResponse, -} - -#[cfg(feature = "log")] -use log::{error, info}; - -/// [NTRIPClient] allows to connect to a remote NTRIP server (v1 and v2 both supported), -/// to receiver RTCM messages. [NTRIPClient] supports both V1 and V2 NTRIP. -#[derive(Clone)] -pub struct NTRIPClient { - /// Host (url) - host: String, - - /// Network port - port: u16, - - /// Name of the mountpoint - mountpoint: String, - - /// Optional [NTRIPCredentials] - credentials: Option, -} - -impl NTRIPClient { - const GET_ICY_RESPONSE: &str = "ICY 200 OK\r\n"; - const GET_HTTPOK_RESPONSE: &str = "HTTP/1.1 200 OK\r\n"; - - /// Creates a new [NTRIPClient] - /// ## Input - /// - host: url - /// - port: network port - /// - mountpoint: remote NTRIP "mountpoint" - pub fn new(host: &str, port: u16, mountpoint: &str) -> Self { - Self { - port, - credentials: None, - host: host.to_string(), - mountpoint: mountpoint.to_string(), - } - } - - /// Update [NTRIPClient] with desired credentials - pub fn with_credentials(&self, user: &str, password: &str) -> Self { - let mut s = self.clone(); - s.credentials = Some(NTRIPCredentials::new(user, password)); - s - } - - /// Define [NTRIPClient] credentials, with mutable access. - pub fn set_credentials(&mut self, user: &str, password: &str) { - self.credentials = Some(NTRIPCredentials::new(user, password)); - } - - /// Deploy this [NTRIPClient] using tokio framework. - pub async fn run(&mut self) -> Result<(), NTRIPClientError> { - let mut ptr = 0; - - let mut buffer = [0u8; 1024]; - - let get_icy_response_len: usize = Self::GET_ICY_RESPONSE.len(); - let get_httpok_response_len = Self::GET_HTTPOK_RESPONSE.len(); - - let pkg_version = env!("CARGO_PKG_VERSION"); - - #[cfg(feature = "log")] - let mut stream = TcpStream::connect((self.host.as_str(), self.port)) - .await - .map_err(|e| { - error!("connection failed with: {}", e); - NTRIPClientError::Connection - })?; - - #[cfg(not(feature = "log"))] - let mut stream = TcpStream::connect((self.host.as_str(), self.port)) - .await - .map_err(|_| NTRIPClientError::Connection)?; - - // initial $GET request - let mut request = format!( - "GET /{} HTTP/1.0\r\n - Host: {}\r\nNtrip-version: Ntrip/2.0\r\n - User-Agent: rtk-rs/ntrip-client v{}\r\n - Connection: close\r\n - Accept: */*\r\n", - self.mountpoint, self.host, pkg_version, - ); - - if let Some(creds) = &self.credentials { - request.push_str(&format!("Authorization: Basic{}\r\n", &creds.encode())); - } - - #[cfg(feature = "log")] - stream.write_all(request.as_bytes()).await.map_err(|e| { - #[cfg(feature = "log")] - error!("write error: {}", e); - NTRIPClientError::Send - })?; - - #[cfg(not(feature = "log"))] - stream - .write_all(request.as_bytes()) - .await - .map_err(|_| NTRIPClientError::Send)?; - - // response verification - loop { - let size = stream.read(&mut buffer[ptr..]).await?; - if size == 0 { - break; - } - ptr += size; - } - - if ptr < get_icy_response_len && ptr < get_httpok_response_len { - #[cfg(feature = "log")] - error!("invalid server response"); - return Err(NTRIPClientError::BadResponse); - } - - let response = String::from_utf8_lossy(&buffer[..ptr]); - - if !response.starts_with(Self::GET_ICY_RESPONSE) { - if !response.starts_with(Self::GET_HTTPOK_RESPONSE) { - // #[cfg(feature = "log")] - println!("invalid response from server: \"{}\"", response); - return Err(NTRIPClientError::BadResponse); - } - } - - #[cfg(feature = "log")] - info!( - "rtk-rs/ntrip-client v{} - connected to {}", - pkg_version, self.host - ); - - loop { - ptr = 0; - let size = stream.read(&mut buffer[ptr..]).await?; - - if size == 0 { - #[cfg(feature = "log")] - error!("{} - connectoion closed", self.host); - return Ok(()); - } - - loop { - let (consumed, msg) = next_msg_frame(&buffer[ptr..]); - - if consumed == 0 { - break; - } - - ptr += consumed; - - if let Some(msg) = msg { - println!("Found {:?}", msg.get_message()); - } - } - } - } -} - -// #[cfg(test)] -// mod test { -// use crate::NTRIPClient; -// -// #[tokio::test] -// async fn test_simple_connection() { -// let mut client = NTRIPClient::new("caster.centipede.fr", 2101, "ENSMM") -// .with_credentials("centipede", "centipede"); -// -// client.run().await.unwrap_or_else(|e| { -// panic!("run() failed with {}", e); -// }); -// } -// } +mod client; +pub use client::NtripClient; diff --git a/src/snip.rs b/src/snip.rs new file mode 100644 index 0000000..d66cb6c --- /dev/null +++ b/src/snip.rs @@ -0,0 +1,323 @@ +use std::str::FromStr; + +use geoutils::Location; +use isocountry::CountryCode; +use strum::{Display, EnumString, VariantNames}; +use tracing::debug; + +/// Information about an NTRIP / SNIP server and its mounts +#[derive(Clone, PartialEq, Debug)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct ServerInfo { + pub server: Option, + // TODO: parse this out to a date? + pub date: Option, + pub content_type: Option, + pub content_length: Option, + + pub services: Vec, +} + +/// Information about a specific NTRIP mount point +#[derive(Clone, PartialEq, Debug)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub struct MountInfo { + pub name: String, + pub details: String, + pub protocol: Protocol, + pub messages: Vec, + pub constellations: Vec, + pub network: Network, + pub country: Option, + pub location: Location, +} + +/// NTRIP protocol types +#[derive(Clone, PartialEq, Debug, EnumString, Display, VariantNames)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum Protocol { + #[strum(serialize = "RTCM 3")] + Rtcm3, + #[strum(serialize = "RTCM 3.0")] + Rtcm3_0, + #[strum(serialize = "RTCM 3.2")] + Rtcm3_2, + #[strum(serialize = "RTCM 3.3")] + Rtcm3_3, + #[strum(serialize = "RAW")] + Raw, + #[strum(serialize = "CMRx")] + CMRx, + #[strum(serialize = "UNKNOWN")] + Unknown, +} + +/// NTRIP network types +#[derive(Clone, PartialEq, Debug, EnumString, Display, VariantNames)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum Network { + #[strum(serialize = "SNIP")] + Snip, + #[strum(serialize = "UNKNOWN")] + Unknown, +} + +/// GNSS Constellation types +#[derive(Clone, PartialEq, Debug, EnumString, Display, VariantNames)] +#[cfg_attr(feature = "serde", derive(serde::Serialize, serde::Deserialize))] +pub enum Constellation { + #[strum(serialize = "GPS")] + Gps, + #[strum(serialize = "GLO")] + Glonass, + #[strum(serialize = "GAL")] + Galileo, + #[strum(serialize = "BDS")] + BeiDou, + #[strum(serialize = "UNKNOWN")] + Unknown, +} + +impl ServerInfo { + /// Parse SNIP server info from an iterator of lines + pub fn parse<'a>(lines: impl Iterator) -> Self { + let mut server = None; + let mut date = None; + let mut content_type = None; + let mut content_length = None; + let mut services = Vec::new(); + + for line in lines { + if line.starts_with("Server: ") { + server = Some(line.trim_start_matches("Server: ").to_string()); + } else if line.starts_with("Date: ") { + date = Some(line.trim_start_matches("Date: ").to_string()); + } else if line.starts_with("Content-Type: ") { + content_type = Some(line.trim_start_matches("Content-Type: ").to_string()); + } else if line.starts_with("Content-Length: ") { + content_length = + Some(line.trim_start_matches("Content-Length: ").parse().ok()).flatten(); + } else if line.starts_with("STR;") { + match MountInfo::parse(line) { + Some(info) => { + services.push(info); + }, + None => { + debug!("Failed to parse STR line: {}", line); + }, + } + } + } + + ServerInfo { + server, + date, + content_type, + content_length, + services, + } + } + + /// Find the nearest mount point to a given location + pub fn find_nearest(&self, location: &Location) -> Option<(&MountInfo, f64)> { + // If they're more than 100km away, we don't want to know + let mut min_distance = 100_000f64; + let mut min_entry = None; + + for (i, s) in self.services.iter().enumerate() { + if let Ok(d) = s.location.distance_to(location) { + debug!("Distance to {}: {:.3} km", s.name, d); + if d.meters() < min_distance { + min_distance = d.meters(); + min_entry = Some(i); + } + } + } + + min_entry.map(|i| (&self.services[i], min_distance)) + } +} + +impl MountInfo { + pub fn parse(info: &str) -> Option { + let parts: Vec<&str> = info.split(';').collect(); + if parts.len() < 2 { + return None; + } + + if parts[0] != "STR" { + return None; + } + + let name = parts[1].to_string(); + let details = parts[2].trim().to_string(); + let protocol = parts + .get(3) + .and_then(|s| Protocol::from_str(s).ok()) + .unwrap_or(Protocol::Raw); + + let messages = match parts.get(4) { + Some(msgs) => msgs.split(",").map(|m| m.trim().to_string()).collect(), + None => vec![], + }; + + // What is part 5? + + // Part 6: constellations + let constellations = match parts.get(6) { + Some(c) => c + .split('+') + .map(|s| { + Constellation::from_str(s) + .ok() + .unwrap_or(Constellation::Unknown) + }) + .collect::>(), + None => vec![], + }; + + // Part 7: network + let network = parts + .get(7) + .and_then(|s| Network::from_str(s).ok()) + .unwrap_or(Network::Unknown); + + // Part 8: country + let country = parts + .get(8) + .and_then(|s| CountryCode::for_alpha3(s).ok()); + + // Parts 9-11: lat, lon, (alt?) + let location = Location::new( + parts.get(9).and_then(|s| s.parse().ok()).unwrap_or(0.0), + parts.get(10).and_then(|s| s.parse().ok()).unwrap_or(0.0), + ); + + // TODO: the rest of the fields + + Some(MountInfo { + name, + details, + protocol, + messages, + constellations, + network, + country, + location, + }) + } +} + +#[cfg(test)] +mod tests { + use http::Method; + use tracing::{debug, info, trace}; + + use super::*; + + fn setup_logging() { + let _ = tracing_subscriber::FmtSubscriber::builder() + .compact() + .without_time() + .with_max_level(tracing::level_filters::LevelFilter::DEBUG) + .try_init(); + } + + #[test] + fn test_parse_server_info() { + setup_logging(); + + let info = "STR;VargaRTKhr;Is near: Zagreb, Zagreb;RTCM 3.2;1006(1),1033(1),1074(1),1084(1),1094(1),1124(1),1230(1);;GPS+GLO+GAL+BDS;SNIP;HRV;46.44;16.50;1;0;sNTRIP;none;B;N;0;\n"; + + let server_info = MountInfo::parse(info).unwrap(); + + assert_eq!(server_info.name, "VargaRTKhr"); + assert_eq!(server_info.details, "Is near: Zagreb, Zagreb"); + assert_eq!(server_info.protocol, Protocol::Rtcm3_2); + assert_eq!( + server_info.messages, + vec!["1006(1)", "1033(1)", "1074(1)", "1084(1)", "1094(1)", "1124(1)", "1230(1)"] + ); + assert_eq!( + server_info.constellations, + vec![ + Constellation::Gps, + Constellation::Glonass, + Constellation::Galileo, + Constellation::BeiDou + ] + ); + assert_eq!(server_info.network, Network::Snip); + assert_eq!( + server_info.country, + Some(CountryCode::for_alpha3("HRV").unwrap()) + ); + assert!((server_info.location.latitude() - 46.44).abs() < 0.001); + assert!((server_info.location.longitude() - 16.50).abs() < 0.001); + } + + #[test] + fn test_parse_snip_info() { + setup_logging(); + + let snip_response = " + SOURCETABLE 200 OK\n + Server: NTRIP SNIP/2.0\n + Date: Wed, 26 Jun 2024 12:00:00 GMT\n + Content-Type: text/plain; charset=utf-8\n + Content-Length: 1234\n + STR;warrakam;Is near: Sydney, New South Wales;RTCM 3;1004(1), 1005(10), 1008(10), 1012(1), 1019(2), 1020(2), 1033(10), 1042(2), 1046(2), 1077(1), 1087(1), 1097(1), 1127(1), 1230(30);2;;SNIP;AUS;-36.37;144.46;1;0;SNIP;none;B;N;11740;\n + STR;VargaRTKhr;Is near: Zagreb, Zagreb;RTCM 3.2;1006(1),1033(1),1074(1),1084(1),1094(1),1124(1),1230(1);;GPS+GLO+GAL+BDS;SNIP;HRV;46.44;16.50;1;0;sNTRIP;none;B;N;0;\n + "; + + let lines = snip_response + .lines() + .map(|l| l.trim()) + .collect::>(); + + debug!("Lines: {:?}", &lines[..10]); + + let snip_info = ServerInfo::parse(lines.iter().cloned()); + + debug!("SNIP Info: {:#?}", snip_info); + } + + #[tokio::test] + #[ignore = "Requires network access"] + async fn test_ntrip_rtk2go() { + setup_logging(); + + let client = reqwest::Client::builder() + .http1_ignore_invalid_headers_in_responses(true) + .http09_responses() + .user_agent(format!( + "NTRIP {}/{}", + env!("CARGO_PKG_NAME"), + env!("CARGO_PKG_VERSION") + )) + .build() + .unwrap(); + + let req = client + .request(Method::GET, "http://rtk2go.com:2101") + .header("Ntrip-Version", "Ntrip/2.0") + .build() + .unwrap(); + + let res = client.execute(req).await.expect("Fetch failed"); + + info!("Fetched NTRIP response: {:?}", res.status()); + + assert!(res.status().is_success()); + + let body = res.text().await.unwrap(); + + let lines = body.lines().collect::>(); + + trace!("Lines: {:?}", &lines[..10]); + + let snip_info = ServerInfo::parse(lines.iter().cloned()); + + trace!("SNIP Info: {:#?}", snip_info); + } +}