Skip to content

Commit fd41f2b

Browse files
authored
feat: create an object store resolver (#510)
1 parent a60db78 commit fd41f2b

File tree

6 files changed

+103
-35
lines changed

6 files changed

+103
-35
lines changed

crates/core/Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ geoparquet-compression = [
3131
"parquet/lz4",
3232
"parquet/zstd",
3333
]
34-
object-store = ["dep:object_store"]
34+
object-store = ["dep:object_store", "dep:tokio"]
3535
object-store-aws = ["object-store", "object_store/aws"]
3636
object-store-azure = ["object-store", "object_store/azure"]
3737
object-store-gcp = ["object-store", "object_store/gcp"]

crates/core/src/error.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,7 +90,7 @@ pub enum Error {
9090

9191
/// [tokio::task::JoinError]
9292
#[error(transparent)]
93-
#[cfg(feature = "validate")]
93+
#[cfg(any(feature = "validate", feature = "object-store"))]
9494
TokioJoin(#[from] tokio::task::JoinError),
9595

9696
/// [std::num::TryFromIntError]

crates/core/src/lib.rs

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -169,13 +169,17 @@ mod item_collection;
169169
mod json;
170170
mod ndjson;
171171
mod node;
172+
#[cfg(feature = "object-store")]
173+
mod resolver;
172174
mod statistics;
173175
#[cfg(feature = "validate")]
174176
mod validate;
175177
mod value;
176178

177179
use std::fmt::Display;
178180

181+
#[cfg(feature = "object-store")]
182+
pub use resolver::Resolver;
179183
pub use stac_types::{mime, Fields, Href, Link, Links, Migrate, SelfHref, Version, STAC_VERSION};
180184
#[cfg(feature = "validate-blocking")]
181185
pub use validate::ValidateBlocking;
@@ -197,7 +201,7 @@ pub use {
197201
item_collection::ItemCollection,
198202
json::{FromJson, ToJson},
199203
ndjson::{FromNdjson, ToNdjson},
200-
node::Node,
204+
node::{Container, Node},
201205
statistics::Statistics,
202206
value::Value,
203207
};

crates/core/src/node.rs

Lines changed: 18 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -32,41 +32,26 @@ pub struct IntoValues {
3232
items: VecDeque<Item>,
3333
}
3434

35+
/// A resolver that uses object store.
3536
impl Node {
3637
/// Resolves all child and item links in this node.
3738
///
39+
/// This method uses [crate::Resolver] to resolve links.
40+
///
3841
/// # Examples
3942
///
4043
/// ```
4144
/// use stac::{Catalog, Node};
4245
///
4346
/// let mut node: Node = stac::read::<Catalog>("examples/catalog.json").unwrap().into();
44-
/// node.resolve().unwrap();
47+
/// # tokio_test::block_on(async {
48+
/// let node = node.resolve().await.unwrap();
49+
/// });
4550
/// ```
46-
pub fn resolve(&mut self) -> Result<()> {
47-
let links = std::mem::take(self.value.links_mut());
48-
let href = self.value.self_href().cloned();
49-
for mut link in links {
50-
if link.is_child() {
51-
if let Some(href) = &href {
52-
link.make_absolute(href)?;
53-
}
54-
// TODO enable object store
55-
tracing::debug!("resolving child: {}", link.href);
56-
let child: Container = crate::read::<Value>(link.href)?.try_into()?;
57-
self.children.push_back(child.into());
58-
} else if link.is_item() {
59-
if let Some(href) = &href {
60-
link.make_absolute(href)?;
61-
}
62-
tracing::debug!("resolving item: {}", link.href);
63-
let item = crate::read::<Item>(link.href)?;
64-
self.items.push_back(item);
65-
} else {
66-
self.value.links_mut().push(link);
67-
}
68-
}
69-
Ok(())
51+
#[cfg(feature = "object-store")]
52+
pub async fn resolve(self) -> Result<Node> {
53+
let resolver = crate::Resolver::default();
54+
resolver.resolve(self).await
7055
}
7156

7257
/// Creates a consuming iterator over this node and its children and items.
@@ -205,20 +190,23 @@ impl SelfHref for Container {
205190
#[cfg(test)]
206191
mod tests {
207192
use super::Node;
208-
use crate::{Catalog, Collection, Links};
193+
use crate::{Catalog, Collection};
209194

210195
#[test]
211196
fn into_node() {
212197
let _ = Node::from(Catalog::new("an-id", "a description"));
213198
let _ = Node::from(Collection::new("an-id", "a description"));
214199
}
215200

216-
#[test]
217-
fn resolve() {
218-
let mut node: Node = crate::read::<Catalog>("examples/catalog.json")
201+
#[tokio::test]
202+
#[cfg(feature = "object-store")]
203+
async fn resolve() {
204+
use crate::Links;
205+
206+
let node: Node = crate::read::<Catalog>("examples/catalog.json")
219207
.unwrap()
220208
.into();
221-
node.resolve().unwrap();
209+
let node = node.resolve().await.unwrap();
222210
assert_eq!(node.children.len(), 3);
223211
assert_eq!(node.items.len(), 1);
224212
assert_eq!(node.value.links().len(), 2);

crates/core/src/resolver.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,66 @@
1+
use crate::{Container, Links, Node, Result, SelfHref, Value};
2+
use std::{future::Future, pin::Pin};
3+
use tokio::task::JoinSet;
4+
use url::Url;
5+
6+
/// An object that uses object store to resolve links.
7+
#[derive(Debug, Default)]
8+
#[cfg(feature = "object-store")]
9+
pub struct Resolver {
10+
recursive: bool,
11+
use_items_endpoint: bool,
12+
}
13+
14+
impl Resolver {
15+
/// Resolves the links of a node.
16+
pub fn resolve(&self, mut node: Node) -> Pin<Box<impl Future<Output = Result<Node>> + '_>> {
17+
Box::pin(async {
18+
let links = std::mem::take(node.value.links_mut());
19+
let href = node.value.self_href().cloned();
20+
let mut join_set = JoinSet::new();
21+
for mut link in links {
22+
if link.is_child() {
23+
if let Some(href) = &href {
24+
link.make_absolute(href)?;
25+
}
26+
let _ = join_set
27+
.spawn(async move { (crate::io::get::<Value>(link.href).await, true) });
28+
} else if !self.use_items_endpoint && link.is_item() {
29+
if let Some(href) = &href {
30+
link.make_absolute(href)?;
31+
}
32+
let _ = join_set.spawn(async move { (crate::io::get(link.href).await, false) });
33+
} else if self.use_items_endpoint && link.rel == "items" {
34+
let mut url: Url = link.href.try_into()?;
35+
// TODO make this configurable
36+
let _ = url
37+
.query_pairs_mut()
38+
.append_pair("limit", "1")
39+
.append_pair("sortby", "-properties.datetime");
40+
let _ = join_set.spawn(async move { (crate::io::get(url).await, false) });
41+
} else {
42+
node.value.links_mut().push(link);
43+
}
44+
}
45+
while let Some(result) = join_set.join_next().await {
46+
let (result, is_child) = result?;
47+
let value = result?;
48+
if is_child {
49+
let child = Container::try_from(value)?.into();
50+
node.children.push_back(child);
51+
} else if let Value::ItemCollection(item_collection) = value {
52+
node.items.extend(item_collection.into_iter());
53+
} else {
54+
node.items.push_back(value.try_into()?);
55+
}
56+
}
57+
if self.recursive {
58+
let children = std::mem::take(&mut node.children);
59+
for child in children {
60+
node.children.push_back(self.resolve(child).await?);
61+
}
62+
}
63+
Ok(node)
64+
})
65+
}
66+
}

crates/types/src/href.rs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -214,10 +214,20 @@ impl From<PathBuf> for Href {
214214
}
215215
}
216216

217+
impl TryFrom<Href> for Url {
218+
type Error = Error;
219+
fn try_from(value: Href) -> Result<Self> {
220+
match value {
221+
Href::Url(url) => Ok(url),
222+
Href::String(s) => s.parse().map_err(Error::from),
223+
}
224+
}
225+
}
226+
217227
#[cfg(feature = "reqwest")]
218228
impl From<reqwest::Url> for Href {
219229
fn from(value: reqwest::Url) -> Self {
220-
Href::Url(url::Url::from(value))
230+
Href::Url(value)
221231
}
222232
}
223233

@@ -241,7 +251,7 @@ fn make_absolute(href: &str, base: &str) -> String {
241251
} else {
242252
let (base, _) = base.split_at(base.rfind('/').unwrap_or(0));
243253
if base.is_empty() {
244-
normalize_path(&href)
254+
normalize_path(href)
245255
} else {
246256
normalize_path(&format!("{}/{}", base, href))
247257
}

0 commit comments

Comments
 (0)