Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
147 changes: 147 additions & 0 deletions crates/core/src/api/client.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
use super::{ItemCollection, Items, Search};
use crate::{Collection, Error, Item};
use std::future::Future;

/// A client that can search for STAC items.
///
/// [`SearchClient::search`] is the only required method. [`SearchClient::item`]
/// and [`SearchClient::items`] have default implementations that delegate to
/// `search`.
pub trait SearchClient: Send + Sync {
/// The error type for this client.
type Error: Send;

/// Searches for STAC items matching the given parameters.
fn search(
&self,
search: Search,
) -> impl Future<Output = Result<ItemCollection, Self::Error>> + Send;

/// Returns a single item from a collection.
///
/// The default implementation uses [`SearchClient::search`] with `ids` and
/// `collections` filters, then deserializes the result.
fn item(
&self,
collection_id: &str,
item_id: &str,
) -> impl Future<Output = Result<Option<Item>, Self::Error>> + Send
where
Self::Error: From<Error>,
{
async move {
let search = Search::default()
.ids(vec![item_id.to_string()])
.collections(vec![collection_id.to_string()]);
let mut item_collection = self.search(search).await?;
if item_collection.items.len() == 1 {
let api_item = item_collection.items.pop().expect("just checked length");
let item: Item = serde_json::from_value(serde_json::Value::Object(api_item))
.map_err(Error::from)?;
Ok(Some(item))
} else {
Ok(None)
}
}
}

/// Returns items from a collection.
///
/// The default implementation converts the request to a [`Search`] scoped
/// to the given collection and delegates to [`SearchClient::search`].
fn items(
&self,
collection_id: &str,
items: Items,
) -> impl Future<Output = Result<ItemCollection, Self::Error>> + Send {
async move {
let search = items.search_collection(collection_id);
self.search(search).await
}
}
}

/// A client capable of retrieving STAC collections.
///
/// Implementors only have to supply [`CollectionSearchClient::collections`].
/// [`CollectionSearchClient::collection`] comes with a default implementation
/// that picks the requested collection out of the full list.
pub trait CollectionSearchClient: Send + Sync {
    /// The error type produced by this client.
    type Error: Send;

    /// Returns every collection known to this client.
    fn collections(&self) -> impl Future<Output = Result<Vec<Collection>, Self::Error>> + Send;

    /// Looks up a single collection by its id.
    ///
    /// The default implementation awaits
    /// [`CollectionSearchClient::collections`] and returns the first
    /// collection whose `id` equals the requested one, or `Ok(None)` when no
    /// collection matches.
    fn collection(
        &self,
        id: &str,
    ) -> impl Future<Output = Result<Option<Collection>, Self::Error>> + Send {
        async move {
            let all = self.collections().await?;
            for candidate in all {
                if candidate.id == id {
                    return Ok(Some(candidate));
                }
            }
            Ok(None)
        }
    }
}

/// A client capable of adding STAC items and collections.
///
/// Implementors must supply [`TransactionClient::add_collection`] and
/// [`TransactionClient::add_item`]. [`TransactionClient::add_items`] has a
/// default implementation that repeatedly calls `add_item`.
pub trait TransactionClient: Send {
    /// The error type produced by this client.
    type Error: Send;

    /// Adds one collection.
    fn add_collection(
        &mut self,
        collection: Collection,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Adds one item.
    fn add_item(&mut self, item: Item) -> impl Future<Output = Result<(), Self::Error>> + Send;

    /// Adds several items.
    ///
    /// The default implementation awaits [`TransactionClient::add_item`] once
    /// per item, in order, and stops at the first error; items after the
    /// failing one are not attempted.
    fn add_items(
        &mut self,
        items: Vec<Item>,
    ) -> impl Future<Output = Result<(), Self::Error>> + Send {
        async move {
            for next_item in items {
                // One at a time: `add_item` needs `&mut self`, so the calls
                // cannot overlap.
                self.add_item(next_item).await?;
            }
            Ok(())
        }
    }
}

/// A client that can search for STAC items returning Arrow record batches.
///
/// [`ArrowSearchClient::search_to_arrow`] is the only required method.
///
/// Unlike the other client traits, this trait does not require `Send + Sync`
/// and `search_to_arrow` is synchronous. This allows implementations to return
/// borrowing iterators (e.g. iterators that borrow from a database connection).
///
/// Only available when the `geoarrow` feature is enabled.
#[cfg(feature = "geoarrow")]
pub trait ArrowSearchClient {
/// The error type for this client.
///
/// Unlike the async client traits, no `Send` bound is placed on this type.
type Error;

/// The record batch reader type returned by [`ArrowSearchClient::search_to_arrow`].
///
/// The lifetime parameter lets the reader borrow from the client: in
/// `search_to_arrow` it is tied to the `&self` borrow.
type RecordBatchStream<'a>: arrow_array::RecordBatchReader
where
Self: 'a;

/// Searches for STAC items, returning results as Arrow record batches.
///
/// The returned reader may borrow from `self`, so the client must outlive it.
fn search_to_arrow(&self, search: Search) -> Result<Self::RecordBatchStream<'_>, Self::Error>;
}
4 changes: 4 additions & 0 deletions crates/core/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

#![warn(missing_docs, unused_qualifications)]

mod client;
mod collections;
mod conformance;
mod fields;
Expand All @@ -45,6 +46,9 @@ mod search;
mod sort;
mod url_builder;

#[cfg(feature = "geoarrow")]
pub use client::ArrowSearchClient;
pub use client::{CollectionSearchClient, SearchClient, TransactionClient};
pub use collections::Collections;
pub use conformance::{
COLLECTIONS_URI, CORE_URI, Conformance, FEATURES_URI, FILTER_URIS, GEOJSON_URI,
Expand Down
1 change: 1 addition & 0 deletions crates/duckdb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,3 +35,4 @@ geo.workspace = true
rstest.workspace = true
stac-validate = { path = "../validate" }
tokio = { workspace = true, features = ["rt-multi-thread", "macros"] }
tokio-test.workspace = true
160 changes: 159 additions & 1 deletion crates/duckdb/src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,10 @@ use cql2::{Expr, ToDuckSQL};
use duckdb::{Connection, Statement, types::Value};
use geo::BoundingRect;
use geojson::Geometry;
use stac::api::{Direction, Search};
use stac::api::{ArrowSearchClient, CollectionSearchClient, Direction, Search, SearchClient};
use stac::{Collection, SpatialExtent, TemporalExtent, geoarrow::DATETIME_COLUMNS};
use std::ops::{Deref, DerefMut};
use std::sync::Mutex;

/// Default hive partitioning value
pub const DEFAULT_USE_HIVE_PARTITIONING: bool = false;
Expand Down Expand Up @@ -476,6 +477,163 @@ impl From<Connection> for Client {
}
}

/// A DuckDB client bound to a specific stac-geoparquet href.
///
/// This wraps a [`Client`] with a specific href, implementing the
/// [`ArrowSearchClient`] trait. `HrefClient` does not itself implement the
/// async client traits ([`SearchClient`] and [`CollectionSearchClient`]).
/// Because [`duckdb::Connection`] is not [`Sync`], use [`SyncHrefClient`],
/// which guards an `HrefClient` with a [`Mutex`](std::sync::Mutex), when you
/// need those traits.
///
/// # Examples
///
/// ```
/// use stac::api::ArrowSearchClient;
/// use stac_duckdb::HrefClient;
///
/// let client = HrefClient::new("data/100-sentinel-2-items.parquet").unwrap();
/// let record_batch_reader = client.search_to_arrow(Default::default()).unwrap();
/// ```
#[derive(Debug)]
pub struct HrefClient {
// The wrapped DuckDB client that executes the queries.
client: Client,
// The href passed to the wrapped client on every query.
href: String,
}

impl HrefClient {
    /// Builds an `HrefClient` for `href`, backed by a freshly created [`Client`].
    ///
    /// # Errors
    ///
    /// Returns an error if [`Client::new`] fails.
    pub fn new(href: impl ToString) -> Result<HrefClient> {
        let inner = Client::new()?;
        Ok(Self::from_client(inner, href))
    }

    /// Builds an `HrefClient` that reuses an existing [`Client`] for `href`.
    pub fn from_client(client: Client, href: impl ToString) -> HrefClient {
        let href = href.to_string();
        Self { client, href }
    }

    /// Borrows the wrapped [`Client`].
    pub fn client(&self) -> &Client {
        let Self { client, .. } = self;
        client
    }

    /// Mutably borrows the wrapped [`Client`].
    pub fn client_mut(&mut self) -> &mut Client {
        let Self { client, .. } = self;
        client
    }

    /// Returns the href this client is bound to.
    pub fn href(&self) -> &str {
        self.href.as_str()
    }
}

impl ArrowSearchClient for HrefClient {
    type Error = Error;
    type RecordBatchStream<'a> = ArrowBatchReader<'a>;

    /// Searches the bound href with the wrapped [`Client`] and wraps the
    /// resulting batch iterator in an [`ArrowBatchReader`].
    fn search_to_arrow(
        &self,
        search: Search,
    ) -> std::result::Result<Self::RecordBatchStream<'_>, Self::Error> {
        let batches = self.client.search_to_arrow(&self.href, search)?;
        Ok(ArrowBatchReader::new(batches))
    }
}

/// A thread-safe wrapper around [`HrefClient`] that implements
/// [`SearchClient`] and [`CollectionSearchClient`].
///
/// Use this when you need the async client traits. For [`ArrowSearchClient`],
/// use [`HrefClient`] directly.
///
/// The wrapped client sits behind a [`Mutex`], which is locked for the
/// duration of every trait method call. Those methods panic if the mutex has
/// been poisoned.
///
/// # Examples
///
/// ```
/// use stac::api::SearchClient;
/// use stac_duckdb::SyncHrefClient;
///
/// let client = SyncHrefClient::new("data/100-sentinel-2-items.parquet").unwrap();
/// # tokio_test::block_on(async {
/// let item_collection = client.search(Default::default()).await.unwrap();
/// # })
/// ```
#[derive(Debug)]
pub struct SyncHrefClient {
// The wrapped client; every trait method locks this mutex before querying.
inner: Mutex<HrefClient>,
}

impl SyncHrefClient {
    /// Builds a `SyncHrefClient` for `href`, backed by a new [`HrefClient`].
    ///
    /// # Errors
    ///
    /// Returns an error if [`HrefClient::new`] fails.
    pub fn new(href: impl ToString) -> Result<SyncHrefClient> {
        let href_client = HrefClient::new(href)?;
        Ok(Self {
            inner: Mutex::new(href_client),
        })
    }

    /// Builds a `SyncHrefClient` that reuses an existing [`Client`] for `href`.
    pub fn from_client(client: Client, href: impl ToString) -> SyncHrefClient {
        let href_client = HrefClient::from_client(client, href);
        Self {
            inner: Mutex::new(href_client),
        }
    }
}

impl SearchClient for SyncHrefClient {
type Error = Error;

async fn search(
&self,
search: Search,
) -> std::result::Result<stac::api::ItemCollection, Error> {
let guard = self.inner.lock().expect("SyncHrefClient mutex is poisoned");
guard.client.search(&guard.href, search)
}
}

impl CollectionSearchClient for SyncHrefClient {
type Error = Error;

async fn collections(&self) -> std::result::Result<Vec<Collection>, Error> {
let guard = self.inner.lock().expect("SyncHrefClient mutex is poisoned");
guard.client.collections(&guard.href)
}
}

/// A wrapper around [`SearchArrowBatchIter`] that implements
/// [`arrow_array::RecordBatchReader`].
///
/// The schema is captured once, when the reader is constructed, and handed
/// out from that stored copy afterwards.
pub struct ArrowBatchReader<'a> {
// The underlying batch iterator that `next` pulls from.
inner: SearchArrowBatchIter<'a>,
// Schema taken from `inner` at construction; an empty schema if `inner`
// reported none.
schema: SchemaRef,
}

impl<'a> ArrowBatchReader<'a> {
fn new(inner: SearchArrowBatchIter<'a>) -> Self {
let schema = inner
.schema()
.unwrap_or_else(|| arrow_schema::Schema::empty().into());
Self { inner, schema }
}
}

impl Iterator for ArrowBatchReader<'_> {
    type Item = std::result::Result<RecordBatch, ArrowError>;

    /// Yields the next record batch from the wrapped iterator.
    ///
    /// Errors from the wrapped iterator are boxed into
    /// [`ArrowError::ExternalError`].
    fn next(&mut self) -> Option<Self::Item> {
        let outcome = self.inner.next()?;
        Some(outcome.map_err(|source| ArrowError::ExternalError(Box::new(source))))
    }
}

impl arrow_array::RecordBatchReader for ArrowBatchReader<'_> {
fn schema(&self) -> SchemaRef {
self.schema.clone()
}
}

/// Iterator returned by [`Client::search_to_arrow`].
pub struct SearchArrowBatchIter<'conn> {
statement: Option<Statement<'conn>>,
Expand Down
7 changes: 6 additions & 1 deletion crates/duckdb/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,14 @@ mod client;
mod error;
mod extension;

pub use {client::Client, error::Error, extension::Extension};
pub use {
client::ArrowBatchReader, client::Client, client::HrefClient, client::SearchArrowBatchIter,
client::SyncHrefClient, error::Error, extension::Extension,
};

use getrandom as _;
#[cfg(test)]
use tokio_test as _;

/// Searches a stac-geoparquet file.
///
Expand Down
Loading