Skip to content

Commit 08bdb78

Browse files
committed
feat(core, cli): move i/o to core
1 parent 9b0307b commit 08bdb78

File tree

19 files changed

+706
-438
lines changed

19 files changed

+706
-438
lines changed

.github/workflows/core.yml

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -39,14 +39,12 @@ jobs:
3939
run: cargo check -F geoarrow
4040
- name: Check w/ geoparquet
4141
run: cargo check -F geoparquet
42-
- name: Check w/ geoparquet-compression
43-
run: cargo check -F geoparquet-compression
44-
- name: Check w/ object_store
45-
run: cargo check -F object_store
42+
- name: Check w/ object-store
43+
run: cargo check -F object-store
4644
- name: Check w/ reqwest
4745
run: cargo check -F reqwest
4846
- name: Test
49-
run: cargo test -F geo -F geoarrow -F geoparquet -F geoparquet-compression -F reqwest -F object_store
47+
run: cargo test -F geo -F geoparquet-compression -F reqwest -F object-store-full
5048
test-core-with-gdal:
5149
runs-on: ubuntu-latest
5250
steps:

cli/Cargo.toml

Lines changed: 13 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -15,29 +15,34 @@ rust-version = "1.75"
1515
default = ["gdal", "geoparquet", "pgstac"]
1616
duckdb = ["dep:stac-duckdb", "dep:duckdb"]
1717
gdal = ["stac/gdal"]
18-
geoparquet = ["dep:geoarrow", "stac/geoparquet-compression", "parquet"]
18+
geoparquet = ["stac/geoparquet-compression"]
1919
pgstac = ["stac-server/pgstac"]
2020
python = ["dep:pyo3", "pgstac", "geoparquet"]
2121

2222
[dependencies]
2323
axum = "0.7"
24-
bytes = { version = "1" }
2524
clap = { version = "4", features = ["derive"] }
26-
duckdb = { version = "1", optional = true } # We have this dependency only to allow us to bundle it
27-
geoarrow = { version = "0.3", optional = true }
28-
object_store = { version = "0.11", features = ["aws", "gcp", "azure", "http"] }
29-
parquet = { version = "52", optional = true }
25+
duckdb = { version = "1", optional = true } # We have this dependency only to allow us to bundle it
26+
object_store = "0.11"
3027
pyo3 = { version = "0.22", optional = true }
3128
reqwest = "0.12"
3229
serde = "1"
3330
serde_json = "1"
34-
stac = { version = "0.9.0", path = "../core", features = ["reqwest"] }
31+
stac = { version = "0.9.0", path = "../core", features = [
32+
"reqwest",
33+
"object-store-full",
34+
] }
3535
stac-api = { version = "0.5.0", path = "../api", features = ["client"] }
3636
stac-duckdb = { version = "0.0.1", path = "../duckdb", optional = true }
3737
stac-server = { version = "0.2.0", path = "../server", features = ["axum"] }
3838
stac-validate = { version = "0.2.2", path = "../validate" }
3939
thiserror = "1"
40-
tokio = { version = "1.23", features = ["macros", "io-std", "rt-multi-thread"] }
40+
tokio = { version = "1.23", features = [
41+
"macros",
42+
"io-std",
43+
"rt-multi-thread",
44+
"fs",
45+
] }
4146
tokio-stream = "0.1"
4247
tracing = "0.1"
4348
tracing-subscriber = "0.3"

cli/src/args/items.rs

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -35,7 +35,7 @@ impl Run for Args {
3535
let mut join_set = JoinSet::new();
3636
let mut items = Vec::with_capacity(self.hrefs.len());
3737
for href in self.hrefs {
38-
let input = input.with_href(&href)?;
38+
let input = input.with_href(href.clone());
3939
let sender = stream.clone();
4040
let args = ItemArgs {
4141
id_or_href: href,

cli/src/args/mod.rs

Lines changed: 28 additions & 17 deletions
Original file line number | Diff line number | Diff line change
@@ -10,11 +10,11 @@ mod serve;
1010
mod translate;
1111
mod validate;
1212

13-
use crate::{config::Entry, input::Input, output::Output, Result, Value};
13+
use crate::{input::Input, options::KeyValue, output::Output, Result, Value};
1414
use clap::Parser;
15+
use stac::Format;
1516
use tokio::sync::mpsc::Sender;
1617
use tokio::task::JoinHandle;
17-
use tracing::info;
1818
use tracing::metadata::Level;
1919

2020
const BUFFER: usize = 100;
@@ -25,19 +25,23 @@ const BUFFER: usize = 100;
2525
pub struct Args {
2626
/// The input format, if not provided will be inferred from the input file's extension, falling back to json
2727
#[arg(short, long, global = true)]
28-
input_format: Option<stac::Format>,
28+
input_format: Option<Format>,
2929

3030
/// key=value pairs to use for the input object store
31-
#[arg(short = 'k', long)]
32-
input_config: Vec<Entry>,
31+
#[arg(long = "input-option")]
32+
input_options: Vec<KeyValue>,
3333

3434
/// The output format, if not provided will be inferred from the output file's extension, falling back to json
3535
#[arg(short, long, global = true)]
36-
output_format: Option<crate::output::Format>,
36+
output_format: Option<Format>,
3737

3838
/// key=value pairs to use for the output object store
39-
#[arg(short = 'c', long)]
40-
output_config: Vec<Entry>,
39+
#[arg(long = "output-option")]
40+
output_options: Vec<KeyValue>,
41+
42+
/// key=value pairs to use for both the input and the output object store
43+
#[arg(short = 'c', long = "option")]
44+
options: Vec<KeyValue>,
4145

4246
/// If the output is a local file, create its parent directories before creating the file
4347
#[arg(long, default_value_t = true)]
@@ -129,23 +133,30 @@ impl Args {
129133
let input = Input::new(
130134
self.subcommand.take_infile(),
131135
self.input_format,
132-
self.input_config,
133-
)?;
136+
self.options
137+
.clone()
138+
.into_iter()
139+
.chain(self.input_options)
140+
.collect::<Vec<_>>(),
141+
);
134142
let mut output = Output::new(
135143
self.subcommand.take_outfile(),
136144
self.output_format.or({
137145
if self.stream {
138-
Some(crate::output::Format::NdJson)
146+
Some(Format::NdJson)
139147
} else {
140148
None
141149
}
142150
}),
143-
self.output_config,
151+
self.options
152+
.into_iter()
153+
.chain(self.output_options)
154+
.collect::<Vec<_>>(),
144155
self.create_parent_directories,
145156
)
146157
.await?;
147158
let value = if self.stream {
148-
if output.format != crate::output::Format::NdJson {
159+
if output.format != Format::NdJson {
149160
tracing::warn!(
150161
"format was set to {}, but stream=true so re-setting to nd-json",
151162
output.format
@@ -166,10 +177,10 @@ impl Args {
166177
};
167178
if let Some(value) = value {
168179
if let Some(put_result) = output.put(value).await? {
169-
info!(
170-
"put result: etag={}, version={}",
171-
put_result.e_tag.as_deref().unwrap_or("<none>"),
172-
put_result.version.as_deref().unwrap_or("<none>")
180+
tracing::info!(
181+
"put result: etag={} version={}",
182+
put_result.e_tag.unwrap_or_default(),
183+
put_result.version.unwrap_or_default()
173184
);
174185
}
175186
}

cli/src/args/serve.rs

Lines changed: 3 additions & 7 deletions
Original file line number | Diff line number | Diff line change
@@ -78,12 +78,8 @@ impl Run for Args {
7878
reading_from_stdin = true;
7979
}
8080
}
81-
let input = input.with_href(&href)?;
82-
let _ = join_set.spawn(async move {
83-
let mut value = input.get().await?;
84-
value.set_href(href);
85-
Ok(value)
86-
});
81+
let input = input.with_href(href);
82+
let _ = join_set.spawn(async move { input.get().await });
8783
}
8884
let mut item_join_set = JoinSet::new();
8985
let mut collections = HashSet::new();
@@ -102,7 +98,7 @@ impl Run for Args {
10298
collection.make_relative_links_absolute(href)?;
10399
for link in collection.iter_item_links() {
104100
let href = link.href.to_string();
105-
let input = input.with_href(&href)?;
101+
let input = input.with_href(href);
106102
let _ = item_join_set.spawn(async move { input.get().await });
107103
}
108104
}

cli/src/error.rs

Lines changed: 0 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -17,11 +17,6 @@ pub enum Error {
1717
#[error(transparent)]
1818
ObjectStorePath(#[from] object_store::path::Error),
1919

20-
/// [parquet::errors::ParquetError]
21-
#[error(transparent)]
22-
#[cfg(feature = "geoparquet")]
23-
Parquet(#[from] parquet::errors::ParquetError),
24-
2520
/// [reqwest::Error]
2621
#[error(transparent)]
2722
Reqwest(#[from] reqwest::Error),

cli/src/input.rs

Lines changed: 20 additions & 97 deletions
Original file line number | Diff line number | Diff line change
@@ -1,120 +1,43 @@
1-
use crate::{config::Config, Error, Result};
2-
use object_store::{local::LocalFileSystem, path::Path, ObjectStore};
3-
use stac::{Format, Item, ItemCollection, Value};
4-
use std::io::BufReader;
5-
use url::Url;
1+
use crate::{options::Options, Error, Result};
2+
use stac::{io::Config, Format, Value};
63

74
/// The input to a CLI run.
85
#[derive(Debug, Default)]
96
pub(crate) struct Input {
10-
format: Format,
11-
reader: Reader,
127
config: Config,
13-
}
14-
15-
#[derive(Debug, Default)]
16-
enum Reader {
17-
ObjectStore {
18-
object_store: Box<dyn ObjectStore>,
19-
path: Path,
20-
},
21-
#[default]
22-
Stdin,
8+
href: Option<String>,
239
}
2410

2511
impl Input {
2612
/// Creates a new input.
2713
pub(crate) fn new(
28-
infile: impl Into<Option<String>>,
14+
href: impl Into<Option<String>>,
2915
format: impl Into<Option<Format>>,
30-
config: impl Into<Config>,
31-
) -> Result<Input> {
32-
let infile = infile
16+
options: impl Into<Options>,
17+
) -> Input {
18+
let href = href
3319
.into()
34-
.and_then(|infile| if infile == "-" { None } else { Some(infile) });
35-
let format = format
36-
.into()
37-
.or_else(|| infile.as_deref().and_then(Format::infer_from_href))
38-
.unwrap_or_default();
39-
let config = config.into();
40-
let reader = if let Some(infile) = infile {
41-
let (object_store, path) = parse_href_opts(&infile, config.iter())?;
42-
Reader::ObjectStore { object_store, path }
43-
} else {
44-
Reader::Stdin
45-
};
46-
Ok(Input {
47-
format,
48-
reader,
49-
config,
50-
})
20+
.and_then(|href| if href == "-" { None } else { Some(href) });
21+
let config = Config::new().format(format).options(options.into());
22+
Input { config, href }
5123
}
5224

5325
/// Creates a new input with the given href.
54-
pub(crate) fn with_href(&self, href: &str) -> Result<Input> {
55-
let (object_store, path) = parse_href_opts(href, self.config.iter())?;
56-
let reader = Reader::ObjectStore { object_store, path };
57-
Ok(Input {
58-
format: self.format,
59-
reader,
26+
pub(crate) fn with_href(&self, href: impl Into<Option<String>>) -> Input {
27+
Input {
6028
config: self.config.clone(),
61-
})
29+
href: href.into(),
30+
}
6231
}
6332

6433
/// Gets a STAC value from the input.
6534
pub(crate) async fn get(&self) -> Result<Value> {
66-
tracing::debug!("getting {}", self.format);
67-
match &self.reader {
68-
Reader::ObjectStore { object_store, path } => {
69-
let bytes = object_store.get(path).await?.bytes().await?;
70-
match self.format {
71-
Format::Json => serde_json::from_slice(&bytes).map_err(Error::from),
72-
Format::NdJson => bytes
73-
.split(|c| *c == b'\n')
74-
.map(|line| serde_json::from_slice::<Item>(line).map_err(Error::from))
75-
.collect::<Result<Vec<_>>>()
76-
.map(ItemCollection::from)
77-
.map(Value::from),
78-
#[cfg(feature = "geoparquet")]
79-
Format::Geoparquet => stac::geoparquet::from_reader(bytes)
80-
.map(Value::from)
81-
.map_err(Error::from),
82-
}
83-
}
84-
Reader::Stdin => match self.format {
85-
Format::Json => serde_json::from_reader(std::io::stdin()).map_err(Error::from),
86-
Format::NdJson => stac::ndjson::from_buf_reader(BufReader::new(std::io::stdin()))
87-
.map(Value::from)
88-
.map_err(Error::from),
89-
#[cfg(feature = "geoparquet")]
90-
Format::Geoparquet => {
91-
use std::io::Read;
92-
93-
let mut buf = Vec::new();
94-
let _ = std::io::stdin().read_to_end(&mut buf)?;
95-
stac::geoparquet::from_reader(bytes::Bytes::from(buf))
96-
.map(Value::from)
97-
.map_err(Error::from)
98-
}
99-
},
35+
if let Some(href) = self.href.as_ref() {
36+
self.config.get(href.clone()).await.map_err(Error::from)
37+
} else {
38+
self.config
39+
.from_reader(std::io::stdin())
40+
.map_err(Error::from)
10041
}
10142
}
10243
}
103-
104-
pub(crate) fn parse_href_opts<I, K, V>(
105-
href: &str,
106-
options: I,
107-
) -> Result<(Box<dyn ObjectStore>, Path)>
108-
where
109-
I: IntoIterator<Item = (K, V)>,
110-
K: AsRef<str>,
111-
V: Into<String>,
112-
{
113-
if let Ok(url) = Url::parse(href) {
114-
object_store::parse_url_opts(&url, options).map_err(Error::from)
115-
} else {
116-
let path = Path::from_filesystem_path(href)?;
117-
let object_store = LocalFileSystem::new();
118-
Ok((Box::new(object_store), path))
119-
}
120-
}

cli/src/lib.rs

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -13,7 +13,7 @@
1313
//! ```
1414
1515
#![cfg_attr(docsrs, feature(doc_auto_cfg))]
16-
#![warn(
16+
#![deny(
1717
elided_lifetimes_in_paths,
1818
explicit_outlives_requirements,
1919
keyword_idents,
@@ -44,9 +44,9 @@
4444
)]
4545

4646
mod args;
47-
mod config;
4847
mod error;
4948
mod input;
49+
mod options;
5050
mod output;
5151
#[cfg(feature = "python")]
5252
mod python;

0 commit comments

Comments
 (0)