Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -198,7 +198,8 @@ impl GlueCatalog {
}
}

#[async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Catalog for GlueCatalog {
/// List namespaces from glue catalog.
///
Expand Down
3 changes: 2 additions & 1 deletion crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ impl HmsCatalog {
}
}

#[async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Catalog for HmsCatalog {
/// HMS doesn't support nested namespaces.
///
Expand Down
6 changes: 4 additions & 2 deletions crates/catalog/loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@ pub fn supported_types() -> Vec<&'static str> {
CATALOG_REGISTRY.iter().map(|(k, _)| *k).collect()
}

#[async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
pub trait BoxedCatalogBuilder {
async fn load(
self: Box<Self>,
Expand All @@ -50,7 +51,8 @@ pub trait BoxedCatalogBuilder {
) -> Result<Arc<dyn Catalog>>;
}

#[async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl<T: CatalogBuilder + 'static> BoxedCatalogBuilder for T {
async fn load(
self: Box<Self>,
Expand Down
3 changes: 2 additions & 1 deletion crates/catalog/rest/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -435,7 +435,8 @@ impl RestCatalog {

/// All requests and expected responses are derived from the REST catalog API spec:
/// https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml
#[async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Catalog for RestCatalog {
async fn list_namespaces(
&self,
Expand Down
3 changes: 2 additions & 1 deletion crates/catalog/s3tables/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,8 @@ impl S3TablesCatalog {
}
}

#[async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Catalog for S3TablesCatalog {
/// List namespaces from s3tables catalog.
///
Expand Down
3 changes: 2 additions & 1 deletion crates/catalog/sql/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,8 @@ impl SqlCatalog {
}
}

#[async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Catalog for SqlCatalog {
async fn list_namespaces(
&self,
Expand Down
14 changes: 11 additions & 3 deletions crates/iceberg/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ storage-oss = ["opendal/services-oss"]
storage-s3 = ["opendal/services-s3", "reqsign"]

smol = ["dep:smol"]
tokio = ["tokio/rt-multi-thread"]
tokio = []

[dependencies]
anyhow = { workspace = true }
Expand Down Expand Up @@ -74,8 +74,8 @@ opendal = { workspace = true }
ordered-float = { workspace = true }
parquet = { workspace = true, features = ["async"] }
rand = { workspace = true }
reqwest = { workspace = true }
reqsign = { version = "0.16.3", optional = true, default-features = false }
reqwest = { workspace = true }
roaring = { workspace = true }
rust_decimal = { workspace = true }
serde = { workspace = true }
Expand All @@ -87,7 +87,6 @@ serde_with = { workspace = true }
smol = { workspace = true, optional = true }
strum = { workspace = true, features = ["derive"] }
thrift = { workspace = true }
tokio = { workspace = true, optional = false, features = ["sync"] }
typed-builder = { workspace = true }
url = { workspace = true }
uuid = { workspace = true }
Expand All @@ -104,6 +103,15 @@ regex = { workspace = true }
tempfile = { workspace = true }
tera = { workspace = true }

[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
tokio = { workspace = true, optional = false, features = [
"rt-multi-thread",
"sync",
] }

[target.'cfg(target_arch = "wasm32")'.dependencies]
tokio = { workspace = true, optional = false, features = ["rt", "sync"] }

[package.metadata.cargo-machete]
# These dependencies are added to ensure minimal dependency version
ignored = ["tap"]
6 changes: 4 additions & 2 deletions crates/iceberg/src/arrow/delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,8 @@ use crate::{Error, ErrorKind, Result};

/// Delete File Loader
#[allow(unused)]
#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait DeleteFileLoader {
/// Read the delete file referred to in the task
///
Expand Down Expand Up @@ -96,7 +97,8 @@ impl BasicDeleteFileLoader {
}
}

#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl DeleteFileLoader for BasicDeleteFileLoader {
async fn read_delete_file(
&self,
Expand Down
13 changes: 6 additions & 7 deletions crates/iceberg/src/arrow/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,8 @@ use arrow_schema::{
use arrow_string::like::starts_with;
use bytes::Bytes;
use fnv::FnvHashSet;
use futures::future::BoxFuture;
use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
use futures::{StreamExt, TryFutureExt, TryStreamExt, try_join};
use opendal::raw::BoxedFuture;
use parquet::arrow::arrow_reader::{
ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
};
Expand Down Expand Up @@ -1373,7 +1373,7 @@ impl<R: FileRead> ArrowFileReader<R> {
}

impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
fn get_bytes(&mut self, range: Range<u64>) -> BoxedFuture<'_, parquet::errors::Result<Bytes>> {
Box::pin(
self.r
.read(range.start..range.end)
Expand All @@ -1386,8 +1386,8 @@ impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
fn get_metadata(
&mut self,
_options: Option<&'_ ArrowReaderOptions>,
) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
async move {
) -> BoxedFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
Box::pin(async move {
let reader = ParquetMetaDataReader::new()
.with_prefetch_hint(self.metadata_size_hint)
.with_column_indexes(self.preload_column_index)
Expand All @@ -1397,8 +1397,7 @@ impl<R: FileRead> AsyncFileReader for ArrowFileReader<R> {
let meta = reader.load_and_finish(self, size).await?;

Ok(Arc::new(meta))
}
.boxed()
})
}
}

Expand Down
3 changes: 2 additions & 1 deletion crates/iceberg/src/catalog/memory/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,8 @@ impl MemoryCatalog {
}
}

#[async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl Catalog for MemoryCatalog {
/// List namespaces inside the catalog.
async fn list_namespaces(
Expand Down
3 changes: 2 additions & 1 deletion crates/iceberg/src/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,8 @@ use crate::table::Table;
use crate::{Error, ErrorKind, Result};

/// The catalog API for Iceberg Rust.
#[async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
#[cfg_attr(test, automock)]
pub trait Catalog: Debug + Sync + Send {
/// List namespaces inside the catalog.
Expand Down
2 changes: 1 addition & 1 deletion crates/iceberg/src/delete_file_index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ impl DeleteFileIndex {
let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating(
notify.clone(),
)));
let delete_file_stream = rx.boxed();
let delete_file_stream = Box::pin(rx);

spawn({
let state = state.clone();
Expand Down
38 changes: 38 additions & 0 deletions crates/iceberg/src/future_util.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! A module for `futures`-related utilities.
//!
//! The primary purpose of this module is to provide a type alias to maintain
//! compatibility between WebAssembly (WASM) and native targets.
//!
//! `BoxedStream` is defined as a different type depending on the compilation target:
//! - **Native environments (`not(target_arch = "wasm32")`)**: Uses `futures::stream::BoxStream`.
//! This stream implements the `Send` trait, allowing it to be safely sent across threads.
//! - **WASM environments (`target_arch = "wasm32"`)**: Uses `futures::stream::LocalBoxStream`.
//! Since WASM is typically a single-threaded environment, `LocalBoxStream`, which is `!Send`,
//! is the appropriate choice.
//!
//! This conditional compilation allows for seamless support of various platforms
//! while maintaining a single codebase.

/// BoxedStream is the type alias of [`futures::stream::BoxStream`].
#[cfg(not(target_arch = "wasm32"))]
pub type BoxedStream<'a, T> = futures::stream::BoxStream<'a, T>;
#[cfg(target_arch = "wasm32")]
/// BoxedStream is the type alias of [`futures::stream::LocalBoxStream`].
pub type BoxedStream<'a, T> = futures::stream::LocalBoxStream<'a, T>;
4 changes: 2 additions & 2 deletions crates/iceberg/src/inspect/manifests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use arrow_array::builder::{
};
use arrow_array::types::{Int32Type, Int64Type};
use arrow_schema::{DataType, Field, Fields};
use futures::{StreamExt, stream};
use futures::stream;

use crate::Result;
use crate::arrow::schema_to_arrow_schema;
Expand Down Expand Up @@ -212,7 +212,7 @@ impl<'a> ManifestsTable<'a> {
Arc::new(deleted_delete_files_count.finish()),
Arc::new(partition_summaries.finish()),
])?;
Ok(stream::iter(vec![Ok(batch)]).boxed())
Ok(Box::pin(stream::iter(vec![Ok(batch)])))
}

fn partition_summary_builder(&self) -> Result<GenericListBuilder<i32, StructBuilder>> {
Expand Down
4 changes: 2 additions & 2 deletions crates/iceberg/src/inspect/snapshots.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use arrow_array::RecordBatch;
use arrow_array::builder::{MapBuilder, MapFieldNames, PrimitiveBuilder, StringBuilder};
use arrow_array::types::{Int64Type, TimestampMicrosecondType};
use arrow_schema::{DataType, Field};
use futures::{StreamExt, stream};
use futures::stream;
use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

use crate::Result;
Expand Down Expand Up @@ -130,7 +130,7 @@ impl<'a> SnapshotsTable<'a> {
Arc::new(summary.finish()),
])?;

Ok(stream::iter(vec![Ok(batch)]).boxed())
Ok(Box::pin(stream::iter(vec![Ok(batch)])))
}
}

Expand Down
15 changes: 10 additions & 5 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -295,15 +295,17 @@ pub struct FileMetadata {
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait FileRead: Send + Sync + Unpin + 'static {
/// Read file content with given range.
///
/// TODO: we can support reading non-contiguous bytes in the future.
async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
}

#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl FileRead for opendal::Reader {
async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
Ok(opendal::Reader::read(self, range).await?.to_bytes())
Expand Down Expand Up @@ -365,7 +367,8 @@ impl InputFile {
///
/// It's possible for us to remove the async_trait, but we need to figure
/// out how to handle the object safety.
#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
pub trait FileWrite: Send + Unpin + 'static {
/// Write bytes to file.
///
Expand All @@ -378,7 +381,8 @@ pub trait FileWrite: Send + Unpin + 'static {
async fn close(&mut self) -> crate::Result<()>;
}

#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl FileWrite for opendal::Writer {
async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
Ok(opendal::Writer::write(self, bs).await?)
Expand All @@ -390,7 +394,8 @@ impl FileWrite for opendal::Writer {
}
}

#[async_trait::async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
impl FileWrite for Box<dyn FileWrite> {
async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
self.as_mut().write(bs).await
Expand Down
3 changes: 2 additions & 1 deletion crates/iceberg/src/io/storage_s3.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,8 @@ impl CustomAwsCredentialLoader {
}
}

#[async_trait]
#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
impl AwsCredentialLoad for CustomAwsCredentialLoader {
async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
self.0.load_credential(client).await
Expand Down
1 change: 1 addition & 0 deletions crates/iceberg/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ mod runtime;

pub mod arrow;
pub(crate) mod delete_file_index;
pub mod future_util;
pub mod test_utils;
mod utils;
pub mod writer;
Expand Down
23 changes: 17 additions & 6 deletions crates/iceberg/src/runtime/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@ use std::future::Future;
use std::pin::Pin;
use std::task::{Context, Poll};

use opendal::raw::MaybeSend;

pub enum JoinHandle<T> {
#[cfg(feature = "tokio")]
Tokio(tokio::task::JoinHandle<T>),
Expand Down Expand Up @@ -50,14 +52,23 @@ impl<T: Send + 'static> Future for JoinHandle<T> {
#[allow(dead_code)]
pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
where
F: Future + Send + 'static,
F::Output: Send + 'static,
F: Future + MaybeSend + 'static,
F::Output: MaybeSend + 'static,
{
#[cfg(feature = "tokio")]
return JoinHandle::Tokio(tokio::task::spawn(f));
#[cfg(not(target_arch = "wasm32"))]
{
#[cfg(feature = "tokio")]
return JoinHandle::Tokio(tokio::task::spawn(f));

#[cfg(all(feature = "smol", not(feature = "tokio")))]
return JoinHandle::Smol(smol::spawn(f));
#[cfg(all(feature = "smol", not(feature = "tokio")))]
return JoinHandle::Smol(smol::spawn(f));
}

#[cfg(target_arch = "wasm32")]
{
#[cfg(feature = "tokio")]
return JoinHandle::Tokio(tokio::task::spawn_local(f));
}

#[cfg(all(not(feature = "smol"), not(feature = "tokio")))]
unimplemented!("no runtime has been enabled")
Expand Down
Loading
Loading