Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 13 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 3 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,14 @@ pyo3-async-runtimes = { version = "0.23.0", features = [
pyo3-log = "0.12.1"
pythonize = "0.23.0"
serde = "1.0.217"
serde_json = "1.0.135"
stac = { version = "0.11.1", features = [
serde_json = "1.0.138"
stac = { version = "0.12.0", features = [
"geoparquet-compression",
"object-store-all",
], git = "https://github.com/stac-utils/stac-rs" }
stac-api = { version = "0.7.0", features = [
stac-api = { version = "0.7.1", features = [
"client",
"python",
], git = "https://github.com/stac-utils/stac-rs" }
stac-duckdb = { version = "0.1.0", git = "https://github.com/stac-utils/stac-rs" }
thiserror = "2.0.11"
tokio = { version = "1.43.0", features = ["rt"] }
154 changes: 65 additions & 89 deletions src/search.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,10 @@
use crate::Error;
use pyo3::{
prelude::*,
types::{PyDict, PyList},
};
use crate::{Error, Json, Result};
use pyo3::{prelude::*, types::PyDict};
use stac::Format;
use stac_api::{
python::{StringOrDict, StringOrList},
BlockingClient, Item, ItemCollection,
Search,
};
use stac_duckdb::Client;
use tokio::runtime::Builder;

#[pyfunction]
#[pyo3(signature = (href, *, intersects=None, ids=None, collections=None, max_items=None, limit=None, bbox=None, datetime=None, include=None, exclude=None, sortby=None, filter=None, query=None, use_duckdb=None, **kwargs))]
Expand All @@ -31,13 +26,11 @@ pub fn search<'py>(
query: Option<Bound<'py, PyDict>>,
use_duckdb: Option<bool>,
kwargs: Option<Bound<'_, PyDict>>,
) -> PyResult<Bound<'py, PyList>> {
let items = search_items(
href,
) -> PyResult<Bound<'py, PyAny>> {
let search = stac_api::python::search(
intersects,
ids,
collections,
max_items,
limit,
bbox,
datetime,
Expand All @@ -46,18 +39,28 @@ pub fn search<'py>(
sortby,
filter,
query,
use_duckdb,
kwargs,
)?;
pythonize::pythonize(py, &items)
.map_err(PyErr::from)
.and_then(|v| v.extract())
if use_duckdb
.unwrap_or_else(|| matches!(Format::infer_from_href(&href), Some(Format::Geoparquet(_))))
{
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let value = search_duckdb(href, search, max_items)?;
Ok(Json(value.items))
})
} else {
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let value = search_api(href, search, max_items).await?;
Ok(Json(value.items))
})
}
}

#[pyfunction]
#[pyo3(signature = (outfile, href, *, intersects=None, ids=None, collections=None, max_items=None, limit=None, bbox=None, datetime=None, include=None, exclude=None, sortby=None, filter=None, query=None, format=None, options=None, use_duckdb=None, **kwargs))]
#[allow(clippy::too_many_arguments)]
pub fn search_to<'py>(
py: Python<'py>,
outfile: String,
href: String,
intersects: Option<StringOrDict>,
Expand All @@ -76,13 +79,11 @@ pub fn search_to<'py>(
options: Option<Vec<(String, String)>>,
use_duckdb: Option<bool>,
kwargs: Option<Bound<'_, PyDict>>,
) -> PyResult<usize> {
let items = search_items(
href,
) -> PyResult<Bound<'py, PyAny>> {
let search = stac_api::python::search(
intersects,
ids,
collections,
max_items,
limit,
bbox,
datetime,
Expand All @@ -91,7 +92,6 @@ pub fn search_to<'py>(
sortby,
filter,
query,
use_duckdb,
kwargs,
)?;
let format = format
Expand All @@ -100,77 +100,53 @@ pub fn search_to<'py>(
.map_err(Error::from)?
.or_else(|| Format::infer_from_href(&outfile))
.unwrap_or_default();
let item_collection = ItemCollection::from(items);
let count = item_collection.items.len();
Builder::new_current_thread()
.build()?
.block_on(format.put_opts(
outfile,
serde_json::to_value(item_collection).map_err(Error::from)?,
options.unwrap_or_default(),
))
.map_err(Error::from)?;
Ok(count)
}

#[allow(clippy::too_many_arguments)]
fn search_items<'py>(
href: String,
intersects: Option<StringOrDict>,
ids: Option<StringOrList>,
collections: Option<StringOrList>,
max_items: Option<usize>,
limit: Option<u64>,
bbox: Option<Vec<f64>>,
datetime: Option<String>,
include: Option<StringOrList>,
exclude: Option<StringOrList>,
sortby: Option<StringOrList>,
filter: Option<StringOrDict>,
query: Option<Bound<'py, PyDict>>,
use_duckdb: Option<bool>,
kwargs: Option<Bound<'py, PyDict>>,
) -> PyResult<Vec<Item>> {
let mut search = stac_api::python::search(
intersects,
ids,
collections,
limit,
bbox,
datetime,
include,
exclude,
sortby,
filter,
query,
kwargs,
)?;
if use_duckdb
.unwrap_or_else(|| matches!(Format::infer_from_href(&href), Some(Format::Geoparquet(_))))
{
if let Some(max_items) = max_items {
search.items.limit = Some(max_items.try_into()?);
}
let client = Client::new().map_err(Error::from)?;
client
.search_to_json(&href, search)
.map(|item_collection| item_collection.items)
.map_err(Error::from)
.map_err(PyErr::from)
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let value = search_duckdb(href, search, max_items)?;
let count = value.items.len();
let _ = format
.put_opts(
outfile,
serde_json::to_value(value).map_err(Error::from)?,
options.unwrap_or_default(),
)
.await
.map_err(Error::from)?;
Ok(count)
})
} else {
let client = BlockingClient::new(&href).map_err(Error::from)?;
let items = client.search(search).map_err(Error::from)?;
if let Some(max_items) = max_items {
items
.take(max_items)
.collect::<Result<_, _>>()
.map_err(Error::from)
.map_err(PyErr::from)
} else {
items
.collect::<Result<_, _>>()
.map_err(Error::from)
.map_err(PyErr::from)
}
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let value = search_api(href, search, max_items).await?;
let count = value.items.len();
let _ = format
.put_opts(
outfile,
serde_json::to_value(value).map_err(Error::from)?,
options.unwrap_or_default(),
)
.await
.map_err(Error::from)?;
Ok(count)
})
}
}

/// Runs `search` against `href` through `stac_duckdb::search`.
///
/// `href`, `search` and `max_items` are forwarded to the library call
/// unchanged. Any error it reports is converted into this crate's error
/// type, as required by the crate-level `Result` alias.
///
/// Note that this function is synchronous: callers invoke it without
/// `.await`, even from inside an `async` block.
fn search_duckdb(
    href: String,
    search: Search,
    max_items: Option<usize>,
) -> Result<stac_api::ItemCollection> {
    // `Into::into` performs the same `From`-based conversion that `?` would.
    stac_duckdb::search(&href, search, max_items).map_err(Into::into)
}

/// Runs `search` against `href` through `stac_api::client::search` and
/// awaits the result.
///
/// `href`, `search` and `max_items` are forwarded to the library call
/// unchanged. Any error it reports is converted into this crate's error
/// type, as required by the crate-level `Result` alias.
async fn search_api(
    href: String,
    search: Search,
    max_items: Option<usize>,
) -> Result<stac_api::ItemCollection> {
    // `Into::into` performs the same `From`-based conversion that `?` would.
    stac_api::client::search(&href, search, max_items)
        .await
        .map_err(Into::into)
}
10 changes: 5 additions & 5 deletions stacrs.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ async def read(
>>> item = await stacrs.read("item.json")
"""

def search(
async def search(
href: str,
*,
intersects: Optional[str | dict[str, Any]] = None,
Expand Down Expand Up @@ -201,7 +201,7 @@ def search(
list[dict[str, Any]]: A list of the returned STAC items.

Examples:
>>> items = stacrs.search(
>>> items = await stacrs.search(
... "https://landsatlook.usgs.gov/stac-server",
... collections=["landsat-c2l2-sr"],
... intersects={"type": "Point", "coordinates": [-105.119, 40.173]},
Expand All @@ -210,7 +210,7 @@ def search(
... )
"""

def search_to(
async def search_to(
outfile: str,
href: str,
*,
Expand Down Expand Up @@ -272,10 +272,10 @@ def search_to(
to None.

Returns:
list[dict[str, Any]]: A list of the returned STAC items.
int: The number of items written

Examples:
>>> items = stacrs.search_to("out.parquet",
>>> count = await stacrs.search_to("out.parquet",
... "https://landsatlook.usgs.gov/stac-server",
... collections=["landsat-c2l2-sr"],
... intersects={"type": "Point", "coordinates": [-105.119, 40.173]},
Expand Down
Loading