Skip to content

Commit 7fc2817

Browse files
authored
feat: crawl (#756)
## Closes

- Closes #750 as OBE
1 parent 970c488 commit 7fc2817

File tree

7 files changed

+174
-17
lines changed

7 files changed

+174
-17
lines changed

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,8 @@ cql2 = "0.3.7"
4949
duckdb = "1.2.2"
5050
fluent-uri = "0.3.2"
5151
futures = "0.3.31"
52+
futures-core = "0.3.31"
53+
futures-util = "0.3.31"
5254
geo = "0.30.0"
5355
geo-traits = "0.2.0"
5456
geo-types = "0.7.15"
@@ -98,6 +100,7 @@ tracing-subscriber = { version = "0.3.18", features = [
98100
"env-filter",
99101
"tracing-log",
100102
] }
103+
tracing-indicatif = "0.3.9"
101104
url = "2.3"
102105
webpki-roots = "1.0.0"
103106

crates/cli/CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
99
### Added
1010

1111
- DuckDB server backend ([#651](https://github.com/stac-utils/rustac/pull/651))
12+
- Crawl ([#756](https://github.com/stac-utils/rustac/pull/756))
1213

1314
## [0.5.3] - 2025-02-20
1415

crates/cli/Cargo.toml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,8 +18,11 @@ duckdb-bundled = ["stac-duckdb/bundled"]
1818

1919
[dependencies]
2020
anyhow.workspace = true
21+
async-stream.workspace = true
2122
axum.workspace = true
2223
clap = { workspace = true, features = ["derive"] }
24+
futures-core.workspace = true
25+
futures-util.workspace = true
2326
serde_json.workspace = true
2427
stac.workspace = true
2528
stac-api = { workspace = true, features = ["client"] }
@@ -38,7 +41,9 @@ tokio = { workspace = true, features = [
3841
"fs",
3942
] }
4043
tracing.workspace = true
44+
tracing-indicatif.workspace = true
4145
tracing-subscriber = { workspace = true, features = ["env-filter"] }
46+
url.workspace = true
4247

4348
[dev-dependencies]
4449
assert_cmd.workspace = true

crates/cli/src/lib.rs

Lines changed: 131 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,15 +3,26 @@
33
#![deny(unused_crate_dependencies)]
44

55
use anyhow::{Error, Result, anyhow};
6+
use async_stream::try_stream;
67
use clap::{Parser, Subcommand};
7-
use stac::{Collection, Href, Item, Links, Migrate, geoparquet::Compression};
8+
use futures_core::TryStream;
9+
use futures_util::{TryStreamExt, pin_mut};
10+
use stac::{Assets, Collection, Href, Item, Links, Migrate, SelfHref, geoparquet::Compression};
811
use stac_api::{GetItems, GetSearch, Search};
9-
use stac_io::{Format, Validate};
12+
use stac_io::{Format, StacStore, Validate};
1013
use stac_server::Backend;
11-
use std::{collections::HashMap, io::Write, str::FromStr};
12-
use tokio::{io::AsyncReadExt, net::TcpListener, runtime::Handle};
14+
use std::{
15+
collections::{HashMap, VecDeque},
16+
io::Write,
17+
str::FromStr,
18+
};
19+
use tokio::{io::AsyncReadExt, net::TcpListener, runtime::Handle, task::JoinSet};
1320
use tracing::metadata::Level;
14-
use tracing_subscriber::EnvFilter;
21+
use tracing_indicatif::IndicatifLayer;
22+
use tracing_subscriber::{
23+
fmt::writer::MakeWriterExt, layer::SubscriberExt, util::SubscriberInitExt,
24+
};
25+
use url::Url;
1526

1627
const DEFAULT_COLLECTION_ID: &str = "default-collection-id";
1728

@@ -232,6 +243,19 @@ pub enum Command {
232243
create_collections: bool,
233244
},
234245

246+
/// Crawls a STAC Catalog or Collection by following its links.
247+
///
248+
/// Items are saved as item collections (in the output format) in the output directory.
249+
Crawl {
250+
/// The href of a STAC Catalog or Collection
251+
href: String,
252+
253+
/// The output directory
254+
///
255+
/// This doesn't have to be local, by the way.
256+
directory: String,
257+
},
258+
235259
/// Validates a STAC value.
236260
///
237261
/// The default output format is plain text — use `--output-format=json` to
@@ -264,11 +288,16 @@ impl Rustac {
264288
/// is setting up the appropriate logging (e.g. Python).
265289
pub async fn run(self, init_tracing_subscriber: bool) -> Result<()> {
266290
if init_tracing_subscriber {
267-
tracing_subscriber::fmt()
268-
.with_env_filter(EnvFilter::from_default_env())
269-
.with_max_level(self.log_level())
270-
.with_writer(std::io::stderr)
271-
.pretty()
291+
let indicatif_layer = IndicatifLayer::new();
292+
tracing_subscriber::registry()
293+
.with(
294+
tracing_subscriber::fmt::layer().with_writer(
295+
indicatif_layer
296+
.get_stderr_writer()
297+
.with_max_level(self.log_level().unwrap_or(Level::WARN)),
298+
),
299+
)
300+
.with(indicatif_layer)
272301
.init();
273302
}
274303
match self.command {
@@ -418,6 +447,45 @@ impl Rustac {
418447
load_and_serve(addr, backend, collections, items, create_collections).await
419448
}
420449
}
450+
Command::Crawl {
    ref href,
    ref directory,
} => {
    // Crawl the STAC tree rooted at `href`, bucket every yielded item by
    // its collection id, and write one item collection per bucket into
    // `directory` using the configured output format.
    let opts = self.opts();
    let (store, path) = stac_io::parse_href_opts(href.clone(), opts.clone())?;
    // Propagate read failures on the root href as an error instead of
    // panicking; the href is user input, so failure here is not a bug.
    let value: stac::Value = store.get(path).await?;
    let mut items: HashMap<Option<String>, Vec<Item>> = HashMap::new();
    let crawl = crawl(value, store).await;
    pin_mut!(crawl);
    // Only warn once, no matter how many items carry the default id.
    let mut warned = false;
    while let Some(item) = crawl.try_next().await? {
        let collection = item.collection.clone();
        if collection.as_deref() == Some(DEFAULT_COLLECTION_ID) && !warned {
            warned = true;
            tracing::warn!(
                "collection id matches the default collection id, so any collection-less items will be grouped into this collection: {DEFAULT_COLLECTION_ID}"
            )
        }
        items.entry(collection).or_default().push(item);
    }
    // The output location is parsed the same way as the input href.
    let (store, path) = stac_io::parse_href_opts(directory.clone(), opts)?;
    let format = self.output_format(None);
    for (collection, items) in items {
        // Items without a collection are written under the default id.
        let file_name = format!(
            "{}.{}",
            collection.as_deref().unwrap_or(DEFAULT_COLLECTION_ID),
            format.extension()
        );
        store
            .put_format(
                path.child(file_name),
                stac::ItemCollection::from(items),
                format,
            )
            .await?;
    }
    Ok(())
}
421489
Command::Validate { ref infile } => {
422490
let value = self.get(infile.as_deref()).await?;
423491
let result = Handle::current()
@@ -651,6 +719,59 @@ fn level_value(level: Option<Level>) -> i8 {
651719
}
652720
}
653721

722+
async fn crawl(value: stac::Value, store: StacStore) -> impl TryStream<Item = Result<Item>> {
723+
use stac::Value::*;
724+
725+
try_stream! {
726+
let mut values = VecDeque::from([value]);
727+
while let Some(mut value) = values.pop_front() {
728+
value.make_links_absolute()?;
729+
match value {
730+
Catalog(_) | Collection(_) => {
731+
if let Catalog(ref catalog) = value {
732+
tracing::info!("got catalog={}", catalog.id);
733+
}
734+
if let Collection(ref collection) = value {
735+
tracing::info!("got collection={}", collection.id);
736+
}
737+
let mut join_set: JoinSet<Result<stac::Value>> = JoinSet::new();
738+
for link in value
739+
.links()
740+
.iter()
741+
.filter(|link| link.is_child() || link.is_item())
742+
.cloned()
743+
{
744+
let store = store.clone();
745+
let url = Url::try_from(link.href)?;
746+
join_set.spawn(async move {
747+
let value: stac::Value = store.get(url.path()).await?;
748+
Ok(value)
749+
});
750+
}
751+
while let Some(result) = join_set.join_next().await {
752+
let value = result??;
753+
values.push_back(value);
754+
}
755+
}
756+
Item(mut item) => {
757+
if let Some(self_href) = item.self_href().cloned() {
758+
item.make_assets_absolute(self_href)?;
759+
}
760+
yield item;
761+
}
762+
ItemCollection(item_collection) => {
763+
for mut item in item_collection.items {
764+
if let Some(self_href) = item.self_href().cloned() {
765+
item.make_assets_absolute(self_href)?;
766+
}
767+
yield item;
768+
}
769+
}
770+
}
771+
}
772+
}
773+
}
774+
654775
#[cfg(test)]
655776
mod tests {
656777
use super::Rustac;

crates/core/CHANGELOG.md

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -12,10 +12,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1212
- `Clone` for `Container` ([#666](https://github.com/stac-utils/rustac/pull/666))
1313
- `Serialize` for `Container` ([#667](https://github.com/stac-utils/rustac/pull/667))
1414
- More permissive datetime interval parsing ([#715](https://github.com/stac-utils/rustac/pull/715))
15-
- `Format` methods for providing your own object store ([#730](https://github.com/stac-utils/rustac/pull/730))
1615
- `type` field to **stac-geoparquet** writes ([#736](https://github.com/stac-utils/rustac/pull/736))
1716
- `SelfHref::set_self_href` and `SelfHref::clear_self_href` ([#746](https://github.com/stac-utils/rustac/pull/746))
1817
- `Assets::make_assets_absolute` ([#753](https://github.com/stac-utils/rustac/pull/753))
18+
- `Format::extension` ([#756](https://github.com/stac-utils/rustac/pull/756))
1919

2020
### Changed
2121

@@ -28,6 +28,10 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2828

2929
- Support geometry columns other than "geometry" for **stac-geoparquet** ([#723](https://github.com/stac-utils/rustac/pull/723), [#727](https://github.com/stac-utils/rustac/pull/727))
3030

31+
### Removed
32+
33+
- IO (moved to **stac-io**) ([#739](https://github.com/stac-utils/rustac/pull/739))
34+
3135
## [0.12.0] - 2025-01-31
3236

3337
### Added

crates/io/src/format.rs

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,26 @@ impl Format {
3333
href.rsplit_once('.').and_then(|(_, ext)| ext.parse().ok())
3434
}
3535

36+
/// Returns this format's file extension.
37+
///
38+
/// # Examples
39+
///
40+
/// ```
41+
/// use stac_io::Format;
42+
/// assert_eq!(Format::json().extension(), "json");
43+
/// assert_eq!(Format::ndjson().extension(), "ndjson");
44+
/// #[cfg(feature = "geoparquet")]
45+
/// assert_eq!(Format::geoparquet().extension(), "parquet");
46+
/// ```
47+
pub fn extension(&self) -> &'static str {
48+
match self {
49+
Format::Json(_) => "json",
50+
Format::NdJson => "ndjson",
51+
#[cfg(feature = "geoparquet")]
52+
Format::Geoparquet(_) => "parquet",
53+
}
54+
}
55+
3656
/// Returns true if this is a geoparquet href.
3757
#[cfg(feature = "geoparquet")]
3858
pub fn is_geoparquet_href(href: &str) -> bool {

crates/io/src/store.rs

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,8 @@
11
use crate::{Format, Readable, Result, Writeable};
22
use object_store::{ObjectStore, ObjectStoreScheme, PutResult, path::Path};
33
use stac::Href;
4-
use std::sync::Arc;
4+
use std::{fmt::Debug, sync::Arc};
5+
use tracing::instrument;
56
use url::Url;
67

78
/// Parses an href into a [StacStore] and a [Path].
@@ -72,7 +73,7 @@ where
7273
}
7374

7475
/// Reads STAC from an [ObjectStore].
75-
#[derive(Debug)]
76+
#[derive(Debug, Clone)]
7677
pub struct StacStore {
7778
store: Arc<dyn ObjectStore>,
7879
root: Url,
@@ -126,7 +127,8 @@ impl StacStore {
126127
}
127128

128129
/// Gets a STAC value from the store in a specific format.
129-
pub async fn get_format<T>(&self, path: impl Into<Path>, format: Format) -> Result<T>
130+
#[instrument(skip(self))]
131+
pub async fn get_format<T>(&self, path: impl Into<Path> + Debug, format: Format) -> Result<T>
130132
where
131133
T: Readable,
132134
{
@@ -141,22 +143,23 @@ impl StacStore {
141143
/// Puts a STAC value to the store.
142144
pub async fn put<T>(&self, path: impl Into<Path>, value: T) -> Result<PutResult>
143145
where
144-
T: Writeable,
146+
T: Writeable + Debug,
145147
{
146148
let path = path.into();
147149
let format = Format::infer_from_href(path.as_ref()).unwrap_or_default();
148150
self.put_format(path, value, format).await
149151
}
150152

151153
/// Puts a STAC value to the store in a specific format.
154+
#[instrument(skip(self))]
152155
pub async fn put_format<T>(
153156
&self,
154-
path: impl Into<Path>,
157+
path: impl Into<Path> + Debug,
155158
value: T,
156159
format: Format,
157160
) -> Result<PutResult>
158161
where
159-
T: Writeable,
162+
T: Writeable + Debug,
160163
{
161164
let path = path.into();
162165
let bytes = format.into_vec(value)?;

0 commit comments

Comments
 (0)