diff --git a/Cargo.toml b/Cargo.toml index 6a12a7b1..530d2a32 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,9 @@ arrow-json = "55.0.0" arrow-schema = "55.0.0" assert-json-diff = "2.0" assert_cmd = "2.0" +async-recursion = "1.1.1" async-stream = "0.3.6" +async-trait = "0.1.89" axum = "0.8.1" bb8 = "0.9.0" bb8-postgres = "0.9.0" @@ -63,7 +65,7 @@ geojson = "0.24.1" getrandom = { version = "0.3.3", features = ["wasm_js"] } http = "1.1" indexmap = { version = "2.10.0", features = ["serde"] } -jsonschema = { version = "0.33.0", default-features = false } +jsonschema = { version = "0.33.0", default-features = false, features = ["resolve-async"] } libduckdb-sys = "1.3.0" log = "0.4.25" mime = "0.3.17" @@ -75,6 +77,7 @@ quote = "1.0" reqwest = { version = "0.12.8", default-features = false, features = [ "rustls-tls", ] } +referencing = "0.33.0" rstest = "0.26.1" rustls = { version = "0.23.22", default-features = false } serde = "1.0" diff --git a/crates/cli/src/lib.rs b/crates/cli/src/lib.rs index 0120d566..605f3844 100644 --- a/crates/cli/src/lib.rs +++ b/crates/cli/src/lib.rs @@ -20,7 +20,7 @@ use std::{ io::Write, str::FromStr, }; -use tokio::{io::AsyncReadExt, net::TcpListener, runtime::Handle, task::JoinSet}; +use tokio::{io::AsyncReadExt, net::TcpListener, task::JoinSet}; use tracing::metadata::Level; use tracing_indicatif::IndicatifLayer; use tracing_subscriber::{ @@ -492,9 +492,7 @@ impl Rustac { } Command::Validate { ref infile } => { let value = self.get(infile.as_deref()).await?; - let result = Handle::current() - .spawn_blocking(move || value.validate()) - .await?; + let result = value.validate().await; if let Err(error) = result { if let stac_validate::Error::Validation(errors) = error { if let Some(format) = self.output_format { diff --git a/crates/core/tests/examples.rs b/crates/core/tests/examples.rs index d73c0f6d..8e3edac6 100644 --- a/crates/core/tests/examples.rs +++ b/crates/core/tests/examples.rs @@ -4,13 +4,15 @@ use stac_validate::Validate; use std::path::PathBuf; #[rstest] -fn v1_0_0(#[files("../../spec-examples/v1.0.0/**/*.json")] path: PathBuf) { +#[tokio::test] +async fn v1_0_0(#[files("../../spec-examples/v1.0.0/**/*.json")] path: PathBuf) { let value: Value = stac::read(path.to_str().unwrap()).unwrap(); - value.validate().unwrap(); + value.validate().await.unwrap(); } #[rstest] -fn v1_1_0(#[files("../../spec-examples/v1.1.0/**/*.json")] path: PathBuf) { +#[tokio::test] +async fn v1_1_0(#[files("../../spec-examples/v1.1.0/**/*.json")] path: PathBuf) { let value: Value = stac::read(path.to_str().unwrap()).unwrap(); - value.validate().unwrap(); + value.validate().await.unwrap(); } diff --git a/crates/core/tests/migrate.rs b/crates/core/tests/migrate.rs index 1825098e..5b5f39c4 100644 --- a/crates/core/tests/migrate.rs +++ b/crates/core/tests/migrate.rs @@ -4,8 +4,9 @@ use stac_validate::Validate; use std::path::PathBuf; #[rstest] -fn v1_0_0_to_v1_1_0(#[files("../../spec-examples/v1.0.0/**/*.json")] path: PathBuf) { +#[tokio::test] +async fn v1_0_0_to_v1_1_0(#[files("../../spec-examples/v1.0.0/**/*.json")] path: PathBuf) { let value: Value = stac::read(path.to_str().unwrap()).unwrap(); let value = value.migrate(&Version::v1_1_0).unwrap(); - value.validate().unwrap(); + value.validate().await.unwrap(); } diff --git a/crates/duckdb/Cargo.toml b/crates/duckdb/Cargo.toml index faf3d20b..5587be89 100644 --- a/crates/duckdb/Cargo.toml +++ b/crates/duckdb/Cargo.toml @@ -34,3 +34,4 @@ thiserror.workspace = true geo.workspace = true rstest.workspace = true stac-validate = { path = "../validate" } +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/crates/duckdb/src/client.rs b/crates/duckdb/src/client.rs index 2634e34f..e65249d7 100644 --- a/crates/duckdb/src/client.rs +++ b/crates/duckdb/src/client.rs @@ -464,12 +464,13 @@ mod tests { } #[rstest] - fn search(client: Client) { + #[tokio::test] + async fn search(client: Client) { let item_collection = client .search("data/100-sentinel-2-items.parquet", Search::default()) .unwrap(); assert_eq!(item_collection.items.len(), 100); - item_collection.items[0].validate().unwrap(); + item_collection.items[0].validate().await.unwrap(); } #[rstest] diff --git a/crates/validate/Cargo.toml b/crates/validate/Cargo.toml index 7b1d1d53..efd149d8 100644 --- a/crates/validate/Cargo.toml +++ b/crates/validate/Cargo.toml @@ -19,7 +19,10 @@ serde.workspace = true serde_json.workspace = true stac.workspace = true thiserror.workspace = true +async-trait.workspace = true +referencing.workspace = true +async-recursion.workspace = true [dev-dependencies] stac-io.workspace = true -tokio = { workspace = true, features = ["macros"] } +tokio = { workspace = true, features = ["rt-multi-thread", "macros"] } diff --git a/crates/validate/src/lib.rs b/crates/validate/src/lib.rs index 1af12c62..7542680f 100644 --- a/crates/validate/src/lib.rs +++ b/crates/validate/src/lib.rs @@ -8,19 +8,26 @@ //! use stac::Item; //! use stac_validate::Validate; //! -//! Item::new("an-id").validate().unwrap(); +//! #[tokio::main] +//! async fn main() { +//! Item::new("an-id").validate().await.unwrap(); +//! } //! ``` //! //! All fetched schemas are cached, so if you're you're doing multiple //! validations, you should re-use the same [Validator]: //! //! ``` -//! # use stac::Item; +//! use stac::Item; //! use stac_validate::Validator; -//! let mut items: Vec<_> = (0..10).map(|n| Item::new(format!("item-{}", n))).collect(); -//! let mut validator = Validator::new().unwrap(); -//! for item in items { -//! validator.validate(&item).unwrap(); +//! +//! #[tokio::main] +//! async fn main() { +//! let mut items: Vec<_> = (0..10).map(|n| Item::new(format!("item-{}", n))).collect(); +//! let mut validator = Validator::new().await.unwrap(); +//! for item in items { +//! validator.validate(&item).await.unwrap(); +//! } //! } //! ``` //! @@ -31,6 +38,7 @@ use serde::Serialize; mod error; mod validator; +use async_trait::async_trait; pub use {error::Error, validator::Validator}; @@ -38,6 +46,7 @@ pub use {error::Error, validator::Validator}; pub type Result = std::result::Result; /// Validate any serializable object with [json-schema](https://json-schema.org/) +#[async_trait] pub trait Validate: Serialize + Sized { /// Validates this object. /// @@ -53,16 +62,19 @@ pub trait Validate: Serialize + Sized { /// use stac::Item; /// use stac_validate::Validate; /// - /// let mut item = Item::new("an-id"); - /// item.validate().unwrap(); + /// #[tokio::main] + /// async fn main() { + /// let mut item = Item::new("an-id"); + /// item.validate().await.unwrap(); + /// } /// ``` - fn validate(&self) -> Result<()> { - let mut validator = Validator::new()?; - validator.validate(self) + async fn validate(&self) -> Result<()> { + let mut validator = Validator::new().await?; + validator.validate(self).await } } -impl Validate for T {} +impl Validate for T {} /// Returns a string suitable for use as a HTTP user agent. pub fn user_agent() -> &'static str { diff --git a/crates/validate/src/validator.rs b/crates/validate/src/validator.rs index 225b3387..e8b15498 100644 --- a/crates/validate/src/validator.rs +++ b/crates/validate/src/validator.rs @@ -1,19 +1,21 @@ use crate::{Error, Result}; +use async_recursion::async_recursion; +use async_trait::async_trait; use fluent_uri::Uri; -use jsonschema::{Resource, Retrieve, ValidationOptions, Validator as JsonschemaValidator}; -use reqwest::blocking::Client; +use jsonschema::{AsyncRetrieve, Resource, ValidationOptions, Validator as JsonschemaValidator}; +use reqwest::Client; use serde::Serialize; use serde_json::{Map, Value}; use stac::{Type, Version}; use std::collections::HashMap; +use std::sync::Arc; const SCHEMA_BASE: &str = "https://schemas.stacspec.org"; /// A structure for validating STAC. -#[derive(Debug)] pub struct Validator { validators: HashMap, JsonschemaValidator>, - validation_options: ValidationOptions, + validation_options: ValidationOptions>, } #[derive(Debug)] @@ -27,17 +29,20 @@ impl Validator { /// ``` /// use stac_validate::Validator; /// - /// let validator = Validator::new().unwrap(); + /// #[tokio::main] + /// async fn main() { + /// let validator = Validator::new().await.unwrap(); + /// } /// ``` - pub fn new() -> Result { - let validation_options = jsonschema::options(); + pub async fn new() -> Result { + let validation_options = jsonschema::async_options(); let validation_options = validation_options .with_resources(prebuild_resources().into_iter()) .with_retriever(Retriever( Client::builder().user_agent(crate::user_agent()).build()?, )); Ok(Validator { - validators: prebuild_validators(&validation_options), + validators: prebuild_validators(&validation_options).await, validation_options, }) } @@ -48,37 +53,41 @@ impl Validator { /// /// ``` /// use stac::Item; - /// use stac_validate::Validator; + /// use stac_validate::Validate; /// - /// let item = Item::new("an-id"); - /// let mut validator = Validator::new().unwrap(); - /// validator.validate(&item).unwrap(); + /// #[tokio::main] + /// async fn main() { + /// let mut item = Item::new("an-id"); + /// item.validate().await.unwrap(); + /// } /// ``` - pub fn validate(&mut self, value: &T) -> Result<()> + pub async fn validate(&mut self, value: &T) -> Result<()> where T: Serialize, { let value = serde_json::to_value(value)?; - let _ = self.validate_value(value)?; + let _ = self.validate_value(value).await?; Ok(()) } /// If you have a [serde_json::Value], you can skip a deserialization step by using this method. - pub fn validate_value(&mut self, value: Value) -> Result { + #[async_recursion] + pub async fn validate_value(&mut self, value: Value) -> Result { if let Value::Object(object) = value { - self.validate_object(object).map(Value::Object) + self.validate_object(object).await.map(Value::Object) } else if let Value::Array(array) = value { - self.validate_array(array).map(Value::Array) + self.validate_array(array).await.map(Value::Array) } else { Err(Error::ScalarJson(value)) } } - fn validate_array(&mut self, array: Vec) -> Result> { + #[async_recursion] + async fn validate_array(&mut self, array: Vec) -> Result> { let mut errors = Vec::new(); let mut new_array = Vec::with_capacity(array.len()); for value in array { - match self.validate_value(value) { + match self.validate_value(value).await { Ok(value) => new_array.push(value), Err(error) => { if let Error::Validation(e) = error { @@ -96,12 +105,16 @@ impl Validator { } } - fn validate_object(&mut self, mut object: Map) -> Result> { + #[async_recursion] + async fn validate_object( + &mut self, + mut object: Map, + ) -> Result> { let r#type = if let Some(r#type) = object.get("type").and_then(|v| v.as_str()) { let r#type: Type = r#type.parse()?; if r#type == Type::ItemCollection { if let Some(features) = object.remove("features") { - let features = self.validate_value(features)?; + let features = self.validate_value(features).await?; let _ = object.insert("features".to_string(), features); } return Ok(object); @@ -110,7 +123,7 @@ impl Validator { } else { match object.remove("collections") { Some(collections) => { - let collections = self.validate_value(collections)?; + let collections = self.validate_value(collections).await?; let _ = object.insert("collections".to_string(), collections); return Ok(object); } @@ -129,7 +142,7 @@ impl Validator { .ok_or(stac::Error::MissingField("stac_version"))?; let uri = build_uri(r#type, &version); - let validator = self.validator(uri)?; + let validator = self.validator(uri).await?; let value = Value::Object(object); let errors: Vec<_> = validator.iter_errors(&value).collect(); let object = if errors.is_empty() { @@ -145,10 +158,13 @@ impl Validator { )); }; - self.validate_extensions(object) + self.validate_extensions(object).await } - fn validate_extensions(&mut self, object: Map) -> Result> { + async fn validate_extensions( + &mut self, + object: Map, + ) -> Result> { match object .get("stac_extensions") .and_then(|value| value.as_array()) @@ -165,7 +181,7 @@ impl Validator { } }) .collect::, _>>()?; - self.ensure_validators(&uris)?; + self.ensure_validators(&uris).await?; let mut errors = Vec::new(); let value = Value::Object(object); @@ -192,24 +208,27 @@ impl Validator { } } - fn validator(&mut self, uri: Uri) -> Result<&JsonschemaValidator> { - self.ensure_validator(&uri)?; + async fn validator(&mut self, uri: Uri) -> Result<&JsonschemaValidator> { + self.ensure_validator(&uri).await?; Ok(self.validator_opt(&uri).unwrap()) } - fn ensure_validators(&mut self, uris: &[Uri]) -> Result<()> { + async fn ensure_validators(&mut self, uris: &[Uri]) -> Result<()> { for uri in uris { - self.ensure_validator(uri)?; + self.ensure_validator(uri).await?; } Ok(()) } - fn ensure_validator(&mut self, uri: &Uri) -> Result<()> { + async fn ensure_validator(&mut self, uri: &Uri) -> Result<()> { if !self.validators.contains_key(uri) { - let response = reqwest::blocking::get(uri.as_str())?.error_for_status()?; + let client = reqwest::Client::new(); + let response = client.get(uri.as_str()).send().await?.error_for_status()?; + let json_data = response.json().await?; let validator = self .validation_options - .build(&response.json()?) + .build(&json_data) + .await .map_err(Box::new)?; let _ = self.validators.insert(uri.clone(), validator); } @@ -221,13 +240,14 @@ impl Validator { } } -impl Retrieve for Retriever { - fn retrieve( +#[async_trait] +impl AsyncRetrieve for Retriever { + async fn retrieve( &self, uri: &Uri, ) -> std::result::Result> { - let response = self.0.get(uri.as_str()).send()?.error_for_status()?; - let value = response.json()?; + let response = self.0.get(uri.as_str()).send().await?.error_for_status()?; + let value = response.json().await?; Ok(value) } } @@ -243,8 +263,8 @@ fn build_uri(r#type: Type, version: &Version) -> Uri { .unwrap() } -fn prebuild_validators( - validation_options: &ValidationOptions, +async fn prebuild_validators( + validation_options: &ValidationOptions>, ) -> HashMap, JsonschemaValidator> { use Type::*; use Version::*; @@ -255,7 +275,7 @@ fn prebuild_validators( ($t:expr_2021, $v:expr_2021, $path:expr_2021, $schemas:expr_2021) => { let url = build_uri($t, &$v); let value = serde_json::from_str(include_str!($path)).unwrap(); - let validator = validation_options.build(&value).unwrap(); + let validator = validation_options.build(&value).await.unwrap(); let _ = schemas.insert(url, validator); }; } @@ -381,35 +401,34 @@ mod tests { use serde_json::json; use stac::{Collection, Item}; - #[test] - fn validate_simple_item() { + #[tokio::test] + async fn validate_simple_item() { let item: Item = stac_io::read("examples/simple-item.json").unwrap(); - item.validate().unwrap(); + item.validate().await.unwrap(); } #[tokio::test] - #[ignore = "can't validate in a tokio runtime yet: https://github.com/Stranger6667/jsonschema/issues/385"] async fn validate_inside_tokio_runtime() { let item: Item = stac_io::read("examples/extended-item.json").unwrap(); - item.validate().unwrap(); + item.validate().await.unwrap(); } - #[test] - fn validate_array() { + #[tokio::test] + async fn validate_array() { let items: Vec<_> = (0..100) .map(|i| Item::new(format!("item-{i}"))) .map(|i| serde_json::to_value(i).unwrap()) .collect(); - let mut validator = Validator::new().unwrap(); - validator.validate(&items).unwrap(); + let mut validator = Validator::new().await.unwrap(); + validator.validate(&items).await.unwrap(); } - #[test] - fn validate_collections() { + #[tokio::test] + async fn validate_collections() { let collection: Collection = stac_io::read("examples/collection.json").unwrap(); let collections = json!({ "collections": [collection] }); - collections.validate().unwrap(); + collections.validate().await.unwrap(); } }