Skip to content

Commit 134cf93

Browse files
authored
feat: storage abstraction (#749)
## Closes - #731 cc @kylebarron
1 parent d7b1254 commit 134cf93

File tree

10 files changed

+182
-329
lines changed

10 files changed

+182
-329
lines changed

crates/cli/src/lib.rs

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
use anyhow::{Error, Result, anyhow};
66
use clap::{Parser, Subcommand};
7-
use stac::{Collection, Item, Links, Migrate, geoparquet::Compression};
7+
use stac::{Collection, Href, Item, Links, Migrate, geoparquet::Compression};
88
use stac_api::{GetItems, GetSearch, Search};
99
use stac_io::{Format, Validate};
1010
use stac_server::Backend;
@@ -459,7 +459,8 @@ impl Rustac {
459459
let href = href.and_then(|s| if s == "-" { None } else { Some(s) });
460460
let format = self.input_format(href);
461461
if let Some(href) = href {
462-
let value: stac::Value = format.get_opts(href, self.opts()).await?;
462+
let (store, path) = stac_io::parse_href_opts(Href::from(href), self.opts())?;
463+
let value: stac::Value = store.get_format(path, format).await?;
463464
Ok(value)
464465
} else {
465466
let mut buf = Vec::new();
@@ -473,10 +474,10 @@ impl Rustac {
473474
let href = href.and_then(|s| if s == "-" { None } else { Some(s) });
474475
let format = self.output_format(href);
475476
if let Some(href) = href {
476-
let opts = self.opts();
477+
let (store, path) = stac_io::parse_href_opts(Href::from(href), self.opts())?;
477478
let _ = match value {
478-
Value::Json(json) => format.put_opts(href, json, opts).await?,
479-
Value::Stac(stac) => format.put_opts(href, stac, opts).await?,
479+
Value::Json(json) => store.put_format(path, json, format).await?,
480+
Value::Stac(stac) => store.put_format(path, stac, format).await?,
480481
};
481482
Ok(())
482483
} else {

crates/core/src/href.rs

Lines changed: 24 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,16 @@ impl TryFrom<Href> for Url {
178178
fn try_from(value: Href) -> Result<Self> {
179179
match value {
180180
Href::Url(url) => Ok(url),
181-
Href::String(s) => s.parse().map_err(Error::from),
181+
Href::String(mut s) => {
182+
if !s.starts_with("/") {
183+
s = std::env::current_dir()?
184+
.join(s)
185+
.to_string_lossy()
186+
.into_owned();
187+
}
188+
let url = Url::parse(&format!("file://{s}"))?;
189+
Ok(url)
190+
}
182191
}
183192
}
184193
}
@@ -322,3 +331,17 @@ fn make_relative(href: &str, base: &str) -> String {
322331

323332
relative
324333
}
334+
335+
#[cfg(test)]
336+
mod tests {
337+
use super::Href;
338+
use url::Url;
339+
340+
#[test]
341+
fn href_to_url() {
342+
let href = Href::from("examples/simple-item.json");
343+
let url: Url = href.try_into().unwrap();
344+
assert_eq!(url.scheme(), "file");
345+
assert!(url.path().ends_with("examples/simple-item.json"));
346+
}
347+
}

crates/io/src/error.rs

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,10 @@ pub enum Error {
8383
#[cfg(feature = "validate")]
8484
#[error(transparent)]
8585
JsonschemaValidation(#[from] jsonschema::ValidationError<'static>),
86+
87+
/// [url::ParseError]
88+
#[error(transparent)]
89+
UrlParse(#[from] url::ParseError),
8690
}
8791

8892
#[cfg(feature = "validate")]

crates/io/src/format.rs

Lines changed: 0 additions & 183 deletions
Original file line numberDiff line numberDiff line change
@@ -128,75 +128,6 @@ impl Format {
128128
Ok(value)
129129
}
130130

131-
/// Gets a STAC value from an object store with the provided options.
132-
///
133-
/// # Examples
134-
///
135-
/// ```no_run
136-
/// use stac::Catalog;
137-
/// use stac_io::Format;
138-
///
139-
/// #[cfg(feature = "store-aws")]
140-
/// {
141-
/// # tokio_test::block_on(async {
142-
/// let catalog: Catalog = stac_io::get_opts("s3://nz-elevation/catalog.json",
143-
/// [("skip_signature", "true"), ("region", "ap-southeast-2")],
144-
/// ).await.unwrap();
145-
/// # })
146-
/// }
147-
/// ```
148-
#[cfg(feature = "store")]
149-
pub async fn get_opts<T, I, K, V>(&self, href: impl Into<Href>, options: I) -> Result<T>
150-
where
151-
T: SelfHref + Readable,
152-
I: IntoIterator<Item = (K, V)>,
153-
K: AsRef<str>,
154-
V: Into<String>,
155-
{
156-
let href = href.into();
157-
match href.clone().into() {
158-
RealizedHref::Url(url) => {
159-
let (object_store, path) = parse_url_opts(&url, options)?;
160-
let mut value: T =
161-
self.get_store(object_store.into(), path)
162-
.await
163-
.map_err(|err| Error::Get {
164-
href,
165-
message: err.to_string(),
166-
})?;
167-
value.set_self_href(url);
168-
Ok(value)
169-
}
170-
RealizedHref::PathBuf(path) => {
171-
tracing::debug!(
172-
"getting {self} from {} with the standard library",
173-
path.display()
174-
);
175-
self.from_path(path).map_err(|err| Error::Get {
176-
href,
177-
message: err.to_string(),
178-
})
179-
}
180-
}
181-
}
182-
183-
/// Gets a STAC value from an object store.
184-
#[cfg(feature = "store")]
185-
pub async fn get_store<T>(
186-
&self,
187-
object_store: std::sync::Arc<dyn object_store::ObjectStore>,
188-
path: impl Into<object_store::path::Path>,
189-
) -> Result<T>
190-
where
191-
T: SelfHref + Readable,
192-
{
193-
let path = path.into();
194-
tracing::debug!("getting {self} from {path} with object store");
195-
let get_result = object_store.get(&path).await?;
196-
let value: T = self.from_bytes(get_result.bytes().await?)?;
197-
Ok(value)
198-
}
199-
200131
/// Writes a STAC value to the provided path.
201132
///
202133
/// # Examples
@@ -237,62 +168,6 @@ impl Format {
237168
Ok(value)
238169
}
239170

240-
/// Puts a STAC value to an object store with the provided options.
241-
///
242-
/// # Examples
243-
///
244-
/// ```no_run
245-
/// use stac::Item;
246-
/// use stac_io::Format;
247-
///
248-
/// let item = Item::new("an-id");
249-
/// #[cfg(feature = "store-aws")]
250-
/// {
251-
/// # tokio_test::block_on(async {
252-
/// Format::json().put_opts("s3://bucket/item.json", item, [("aws_access_key_id", "...")]).await.unwrap();
253-
/// # })
254-
/// }
255-
/// ```
256-
#[cfg(feature = "store")]
257-
pub async fn put_opts<T, I, K, V>(
258-
&self,
259-
href: impl ToString,
260-
value: T,
261-
options: I,
262-
) -> Result<Option<object_store::PutResult>>
263-
where
264-
T: Writeable,
265-
I: IntoIterator<Item = (K, V)>,
266-
K: AsRef<str>,
267-
V: Into<String>,
268-
{
269-
let href = href.to_string();
270-
if let Ok(url) = url::Url::parse(&href) {
271-
let (object_store, path) = parse_url_opts(&url, options)?;
272-
self.put_store(object_store.into(), path, value)
273-
.await
274-
.map(Some)
275-
} else {
276-
self.write(href, value).map(|_| None)
277-
}
278-
}
279-
280-
/// Puts a STAC value into an object store.
281-
#[cfg(feature = "store")]
282-
pub async fn put_store<T>(
283-
&self,
284-
object_store: std::sync::Arc<dyn object_store::ObjectStore>,
285-
path: impl Into<object_store::path::Path>,
286-
value: T,
287-
) -> Result<object_store::PutResult>
288-
where
289-
T: Writeable,
290-
{
291-
let bytes = self.into_vec(value)?;
292-
let put_result = object_store.put(&path.into(), bytes.into()).await?;
293-
Ok(put_result)
294-
}
295-
296171
/// Returns the default JSON format (compact).
297172
pub fn json() -> Format {
298173
Format::Json(false)
@@ -317,33 +192,6 @@ impl Format {
317192
}
318193
}
319194

320-
#[cfg(feature = "store")]
321-
fn parse_url_opts<I, K, V>(
322-
url: &url::Url,
323-
options: I,
324-
) -> Result<(Box<dyn object_store::ObjectStore>, object_store::path::Path)>
325-
where
326-
I: IntoIterator<Item = (K, V)>,
327-
K: AsRef<str>,
328-
V: Into<String>,
329-
{
330-
// It's technically inefficient to parse it twice, but we're doing this to
331-
// then do IO so who cares.
332-
#[cfg(feature = "store-aws")]
333-
if let Ok((object_store::ObjectStoreScheme::AmazonS3, path)) =
334-
object_store::ObjectStoreScheme::parse(url)
335-
{
336-
let mut builder = object_store::aws::AmazonS3Builder::from_env();
337-
for (key, value) in options {
338-
builder = builder.with_config(key.as_ref().parse()?, value);
339-
}
340-
return Ok((Box::new(builder.with_url(url.to_string()).build()?), path));
341-
}
342-
343-
let result = object_store::parse_url_opts(url, options)?;
344-
Ok(result)
345-
}
346-
347195
impl Default for Format {
348196
fn default() -> Self {
349197
Self::Json(false)
@@ -461,35 +309,4 @@ mod tests {
461309
);
462310
}
463311
}
464-
465-
#[tokio::test]
466-
#[cfg(feature = "store")]
467-
async fn prefix_store_read() {
468-
use stac::Item;
469-
use std::sync::Arc;
470-
471-
let object_store =
472-
object_store::local::LocalFileSystem::new_with_prefix("examples").unwrap();
473-
let _: Item = Format::json()
474-
.get_store(Arc::new(object_store), "simple-item.json")
475-
.await
476-
.unwrap();
477-
}
478-
479-
#[tokio::test]
480-
#[cfg(feature = "store")]
481-
async fn store_write() {
482-
use object_store::ObjectStore;
483-
use stac::Item;
484-
use std::sync::Arc;
485-
486-
let object_store = Arc::new(object_store::memory::InMemory::new());
487-
let item = Item::new("an-id");
488-
let _ = Format::json()
489-
.put_store(object_store.clone(), "item.json", item)
490-
.await
491-
.unwrap();
492-
let get_result = object_store.get(&"item.json".into()).await.unwrap();
493-
let _: Item = serde_json::from_slice(&get_result.bytes().await.unwrap()).unwrap();
494-
}
495312
}

crates/io/src/lib.rs

Lines changed: 1 addition & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,7 @@ mod write;
1515
#[cfg(feature = "geoparquet")]
1616
pub use geoparquet::{FromGeoparquetPath, IntoGeoparquetPath};
1717
#[cfg(feature = "store")]
18-
pub use store::{
19-
get::{get, get_opts},
20-
put::{put, put_opts},
21-
};
18+
pub use store::{StacStore, parse_href, parse_href_opts};
2219
#[cfg(feature = "validate")]
2320
pub use validate::{Validate, Validator};
2421
pub use {
@@ -134,18 +131,6 @@ mod tests {
134131
let _ = super::read::<ItemCollection>("data/extended-item.parquet").unwrap_err();
135132
}
136133

137-
#[tokio::test]
138-
#[cfg(all(feature = "store", not(target_os = "windows")))]
139-
async fn get() {
140-
let path = format!(
141-
"file://{}",
142-
std::fs::canonicalize("examples/simple-item.json")
143-
.unwrap()
144-
.to_string_lossy()
145-
);
146-
let _: Item = super::get(path).await.unwrap();
147-
}
148-
149134
#[test]
150135
fn write() {
151136
let tempdir = TempDir::new().unwrap();
@@ -154,18 +139,4 @@ mod tests {
154139
let item: Item = super::read(tempdir.path().join("item.json")).unwrap();
155140
assert_eq!(item.id, "an-id");
156141
}
157-
158-
#[tokio::test]
159-
#[cfg(feature = "store")]
160-
async fn put() {
161-
let tempdir = TempDir::new().unwrap();
162-
let path = format!(
163-
"file://{}",
164-
tempdir.path().join("item.json").to_string_lossy()
165-
);
166-
let item = Item::new("an-id");
167-
assert!(super::put(path, item).await.unwrap().is_some());
168-
let item: Item = crate::read(tempdir.path().join("item.json")).unwrap();
169-
assert_eq!(item.id, "an-id");
170-
}
171142
}

0 commit comments

Comments
 (0)