diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs
index 9b383df2b1..d2212ac737 100644
--- a/crates/catalog/glue/src/catalog.rs
+++ b/crates/catalog/glue/src/catalog.rs
@@ -198,7 +198,8 @@ impl GlueCatalog {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl Catalog for GlueCatalog {
     /// List namespaces from glue catalog.
     ///
diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs
index c8f046cb7e..3744b66571 100644
--- a/crates/catalog/hms/src/catalog.rs
+++ b/crates/catalog/hms/src/catalog.rs
@@ -210,7 +210,8 @@ impl HmsCatalog {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl Catalog for HmsCatalog {
     /// HMS doesn't support nested namespaces.
     ///
diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs
index 9c18ab4e5f..aa6bdb460e 100644
--- a/crates/catalog/loader/src/lib.rs
+++ b/crates/catalog/loader/src/lib.rs
@@ -41,7 +41,8 @@ pub fn supported_types() -> Vec<&'static str> {
     CATALOG_REGISTRY.iter().map(|(k, _)| *k).collect()
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 pub trait BoxedCatalogBuilder {
     async fn load(
         self: Box<Self>,
@@ -50,7 +51,8 @@ pub trait BoxedCatalogBuilder {
     ) -> Result<Arc<dyn Catalog>>;
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl<T: CatalogBuilder + 'static> BoxedCatalogBuilder for T {
     async fn load(
         self: Box<Self>,
diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs
index 6cb4d93b99..c3546fbc1f 100644
--- a/crates/catalog/rest/src/catalog.rs
+++ b/crates/catalog/rest/src/catalog.rs
@@ -435,7 +435,8 @@ impl RestCatalog {
 
 /// All requests and expected responses are derived from the REST catalog API spec:
 /// https://github.com/apache/iceberg/blob/main/open-api/rest-catalog-open-api.yaml
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl Catalog for RestCatalog {
     async fn list_namespaces(
         &self,
diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs
index 858bb92896..daf3a1d035 100644
--- a/crates/catalog/s3tables/src/catalog.rs
+++ b/crates/catalog/s3tables/src/catalog.rs
@@ -191,7 +191,8 @@ impl S3TablesCatalog {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl Catalog for S3TablesCatalog {
     /// List namespaces from s3tables catalog.
     ///
diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs
index 35889d451a..99b2b66fef 100644
--- a/crates/catalog/sql/src/catalog.rs
+++ b/crates/catalog/sql/src/catalog.rs
@@ -218,7 +218,8 @@ impl SqlCatalog {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl Catalog for SqlCatalog {
     async fn list_namespaces(
         &self,
diff --git a/crates/iceberg/Cargo.toml b/crates/iceberg/Cargo.toml
index d592700b73..5ca8e0f775 100644
--- a/crates/iceberg/Cargo.toml
+++ b/crates/iceberg/Cargo.toml
@@ -40,7 +40,7 @@ storage-oss = ["opendal/services-oss"]
 storage-s3 = ["opendal/services-s3", "reqsign"]
 smol = ["dep:smol"]
-tokio = ["tokio/rt-multi-thread"]
+tokio = []
 
 [dependencies]
 anyhow = { workspace = true }
@@ -74,8 +74,8 @@ opendal = { workspace = true }
 ordered-float = { workspace = true }
 parquet = { workspace = true, features = ["async"] }
 rand = { workspace = true }
-reqwest = { workspace = true }
 reqsign = { version = "0.16.3", optional = true, default-features = false }
+reqwest = { workspace = true }
 roaring = { workspace = true }
 rust_decimal = { workspace = true }
 serde = { workspace = true }
@@ -87,7 +87,6 @@ serde_with = { workspace = true }
 smol = { workspace = true, optional = true }
 strum = { workspace = true, features = ["derive"] }
 thrift = { workspace = true }
-tokio = { workspace = true, optional = false, features = ["sync"] }
 typed-builder = { workspace = true }
 url = { workspace = true }
 uuid = { workspace = true }
@@ -104,6 +103,15 @@ regex = { workspace = true }
 tempfile = { workspace = true }
 tera = { workspace = true }
 
+[target.'cfg(not(target_arch = "wasm32"))'.dependencies]
+tokio = { workspace = true, optional = false, features = [
+    "rt-multi-thread",
+    "sync",
+] }
+
+[target.'cfg(target_arch = "wasm32")'.dependencies]
+tokio = { workspace = true, optional = false, features = ["rt", "sync"] }
+
 [package.metadata.cargo-machete]
 # These dependencies are added to ensure minimal dependency version
 ignored = ["tap"]
diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs
index 592ef2eb4a..053adbff5c 100644
--- a/crates/iceberg/src/arrow/delete_file_loader.rs
+++ b/crates/iceberg/src/arrow/delete_file_loader.rs
@@ -28,7 +28,8 @@ use crate::{Error, ErrorKind, Result};
 
 /// Delete File Loader
 #[allow(unused)]
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 pub trait DeleteFileLoader {
     /// Read the delete file referred to in the task
     ///
@@ -96,7 +97,8 @@ impl BasicDeleteFileLoader {
     }
 }
 
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 impl DeleteFileLoader for BasicDeleteFileLoader {
     async fn read_delete_file(
         &self,
diff --git a/crates/iceberg/src/arrow/reader.rs b/crates/iceberg/src/arrow/reader.rs
index 05aa6a4c9f..3a4455107f 100644
--- a/crates/iceberg/src/arrow/reader.rs
+++ b/crates/iceberg/src/arrow/reader.rs
@@ -32,8 +32,8 @@ use arrow_schema::{
 use arrow_string::like::starts_with;
 use bytes::Bytes;
 use fnv::FnvHashSet;
-use futures::future::BoxFuture;
-use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt, try_join};
+use futures::{StreamExt, TryFutureExt, TryStreamExt, try_join};
+use opendal::raw::BoxedFuture;
 use parquet::arrow::arrow_reader::{
     ArrowPredicateFn, ArrowReaderOptions, RowFilter, RowSelection, RowSelector,
 };
@@ -1373,7 +1373,7 @@ impl ArrowFileReader {
     }
 }
 
 impl AsyncFileReader for ArrowFileReader {
-    fn get_bytes(&mut self, range: Range<u64>) -> BoxFuture<'_, parquet::errors::Result<Bytes>> {
+    fn get_bytes(&mut self, range: Range<u64>) -> BoxedFuture<'_, parquet::errors::Result<Bytes>> {
         Box::pin(
             self.r
                 .read(range.start..range.end)
@@ -1386,8 +1386,8 @@
     fn get_metadata(
         &mut self,
         _options: Option<&'_ ArrowReaderOptions>,
-    ) -> BoxFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
-        async move {
+    ) -> BoxedFuture<'_, parquet::errors::Result<Arc<ParquetMetaData>>> {
+        Box::pin(async move {
             let reader = ParquetMetaDataReader::new()
                 .with_prefetch_hint(self.metadata_size_hint)
                 .with_column_indexes(self.preload_column_index)
@@ -1397,8 +1397,7 @@
             let meta = reader.load_and_finish(self, size).await?;
 
             Ok(Arc::new(meta))
-        }
-        .boxed()
+        })
     }
 }
diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs
index 6ce3111419..c05b27bd58 100644
--- a/crates/iceberg/src/catalog/memory/catalog.rs
+++ b/crates/iceberg/src/catalog/memory/catalog.rs
@@ -140,7 +140,8 @@ impl MemoryCatalog {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl Catalog for MemoryCatalog {
     /// List namespaces inside the catalog.
     async fn list_namespaces(
diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs
index 77b171ee30..2691364c41 100644
--- a/crates/iceberg/src/catalog/mod.rs
+++ b/crates/iceberg/src/catalog/mod.rs
@@ -47,7 +47,8 @@ use crate::table::Table;
 use crate::{Error, ErrorKind, Result};
 
 /// The catalog API for Iceberg Rust.
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 #[cfg_attr(test, automock)]
 pub trait Catalog: Debug + Sync + Send {
     /// List namespaces inside the catalog.
diff --git a/crates/iceberg/src/delete_file_index.rs b/crates/iceberg/src/delete_file_index.rs
index d8f7a872e1..b14b70c5fa 100644
--- a/crates/iceberg/src/delete_file_index.rs
+++ b/crates/iceberg/src/delete_file_index.rs
@@ -60,7 +60,7 @@ impl DeleteFileIndex {
         let state = Arc::new(RwLock::new(DeleteFileIndexState::Populating(
             notify.clone(),
         )));
-        let delete_file_stream = rx.boxed();
+        let delete_file_stream = Box::pin(rx);
 
         spawn({
             let state = state.clone();
diff --git a/crates/iceberg/src/future_util.rs b/crates/iceberg/src/future_util.rs
new file mode 100644
index 0000000000..c6c180f612
--- /dev/null
+++ b/crates/iceberg/src/future_util.rs
@@ -0,0 +1,38 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+//! A module for `futures`-related utilities.
+//!
+//! The primary purpose of this module is to provide a type alias to maintain
+//! compatibility between WebAssembly (WASM) and native targets.
+//!
+//! `BoxedStream` is defined as a different type depending on the compilation target:
+//! - **Native environments (`not(target_arch = "wasm32")`)**: Uses `futures::stream::BoxStream`.
+//!   This stream implements the `Send` trait, allowing it to be safely sent across threads.
+//! - **WASM environments (`target_arch = "wasm32"`)**: Uses `futures::stream::LocalBoxStream`.
+//!   Since WASM is typically a single-threaded environment, `LocalBoxStream`, which is `!Send`,
+//!   is the appropriate choice.
+//!
+//! This conditional compilation allows for seamless support of various platforms
+//! while maintaining a single codebase.
+
+/// BoxedStream is the type alias of [`futures::stream::BoxStream`].
+#[cfg(not(target_arch = "wasm32"))]
+pub type BoxedStream<'a, T> = futures::stream::BoxStream<'a, T>;
+#[cfg(target_arch = "wasm32")]
+/// BoxedStream is the type alias of [`futures::stream::LocalBoxStream`].
+pub type BoxedStream<'a, T> = futures::stream::LocalBoxStream<'a, T>;
diff --git a/crates/iceberg/src/inspect/manifests.rs b/crates/iceberg/src/inspect/manifests.rs
index 60854b8bae..9e43da7d77 100644
--- a/crates/iceberg/src/inspect/manifests.rs
+++ b/crates/iceberg/src/inspect/manifests.rs
@@ -24,7 +24,7 @@ use arrow_array::builder::{
 };
 use arrow_array::types::{Int32Type, Int64Type};
 use arrow_schema::{DataType, Field, Fields};
-use futures::{StreamExt, stream};
+use futures::stream;
 
 use crate::Result;
 use crate::arrow::schema_to_arrow_schema;
@@ -212,7 +212,7 @@ impl<'a> ManifestsTable<'a> {
             Arc::new(deleted_delete_files_count.finish()),
             Arc::new(partition_summaries.finish()),
         ])?;
-        Ok(stream::iter(vec![Ok(batch)]).boxed())
+        Ok(Box::pin(stream::iter(vec![Ok(batch)])))
     }
 
     fn partition_summary_builder(&self) -> Result> {
diff --git a/crates/iceberg/src/inspect/snapshots.rs b/crates/iceberg/src/inspect/snapshots.rs
index 6081ec165b..2187013ee3 100644
--- a/crates/iceberg/src/inspect/snapshots.rs
+++ b/crates/iceberg/src/inspect/snapshots.rs
@@ -22,7 +22,7 @@ use arrow_array::RecordBatch;
 use arrow_array::builder::{MapBuilder, MapFieldNames, PrimitiveBuilder, StringBuilder};
 use arrow_array::types::{Int64Type, TimestampMicrosecondType};
 use arrow_schema::{DataType, Field};
-use futures::{StreamExt, stream};
+use futures::stream;
 use parquet::arrow::PARQUET_FIELD_ID_META_KEY;
 
 use crate::Result;
@@ -130,7 +130,7 @@ impl<'a> SnapshotsTable<'a> {
             Arc::new(summary.finish()),
         ])?;
 
-        Ok(stream::iter(vec![Ok(batch)]).boxed())
+        Ok(Box::pin(stream::iter(vec![Ok(batch)])))
     }
 }
diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs
index 6e2d152ed7..d10ebd6490 100644
--- a/crates/iceberg/src/io/file_io.rs
+++ b/crates/iceberg/src/io/file_io.rs
@@ -295,7 +295,8 @@ pub struct FileMetadata {
 ///
 /// It's possible for us to remove the async_trait, but we need to figure
 /// out how to handle the object safety.
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 pub trait FileRead: Send + Sync + Unpin + 'static {
     /// Read file content with given range.
     ///
@@ -303,7 +304,8 @@ pub trait FileRead: Send + Sync + Unpin + 'static {
     async fn read(&self, range: Range<u64>) -> crate::Result<Bytes>;
 }
 
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 impl FileRead for opendal::Reader {
     async fn read(&self, range: Range<u64>) -> crate::Result<Bytes> {
         Ok(opendal::Reader::read(self, range).await?.to_bytes())
@@ -365,7 +367,8 @@ impl InputFile {
 ///
 /// It's possible for us to remove the async_trait, but we need to figure
 /// out how to handle the object safety.
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 pub trait FileWrite: Send + Unpin + 'static {
     /// Write bytes to file.
     ///
@@ -378,7 +381,8 @@ pub trait FileWrite: Send + Unpin + 'static {
     async fn close(&mut self) -> crate::Result<()>;
 }
 
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 impl FileWrite for opendal::Writer {
     async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
         Ok(opendal::Writer::write(self, bs).await?)
@@ -390,7 +394,8 @@ impl FileWrite for opendal::Writer {
     }
 }
 
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 impl FileWrite for Box<dyn FileWrite> {
     async fn write(&mut self, bs: Bytes) -> crate::Result<()> {
         self.as_mut().write(bs).await
diff --git a/crates/iceberg/src/io/storage_s3.rs b/crates/iceberg/src/io/storage_s3.rs
index f2408331c5..29dcdeec28 100644
--- a/crates/iceberg/src/io/storage_s3.rs
+++ b/crates/iceberg/src/io/storage_s3.rs
@@ -208,7 +208,8 @@ impl CustomAwsCredentialLoader {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl AwsCredentialLoad for CustomAwsCredentialLoader {
     async fn load_credential(&self, client: Client) -> anyhow::Result<Option<AwsCredential>> {
         self.0.load_credential(client).await
diff --git a/crates/iceberg/src/lib.rs b/crates/iceberg/src/lib.rs
index aae8efed74..edd47ce3d6 100644
--- a/crates/iceberg/src/lib.rs
+++ b/crates/iceberg/src/lib.rs
@@ -91,6 +91,7 @@ mod runtime;
 
 pub mod arrow;
 pub(crate) mod delete_file_index;
+pub mod future_util;
 pub mod test_utils;
 mod utils;
 pub mod writer;
diff --git a/crates/iceberg/src/runtime/mod.rs b/crates/iceberg/src/runtime/mod.rs
index d0a3ce6602..531b442f5a 100644
--- a/crates/iceberg/src/runtime/mod.rs
+++ b/crates/iceberg/src/runtime/mod.rs
@@ -21,6 +21,8 @@ use std::future::Future;
 use std::pin::Pin;
 use std::task::{Context, Poll};
 
+use opendal::raw::MaybeSend;
+
 pub enum JoinHandle<T> {
     #[cfg(feature = "tokio")]
     Tokio(tokio::task::JoinHandle<T>),
@@ -50,14 +52,23 @@ impl<T> Future for JoinHandle<T> {
 #[allow(dead_code)]
 pub fn spawn<F>(f: F) -> JoinHandle<F::Output>
 where
-    F: Future + Send + 'static,
-    F::Output: Send + 'static,
+    F: Future + MaybeSend + 'static,
+    F::Output: MaybeSend + 'static,
 {
-    #[cfg(feature = "tokio")]
-    return JoinHandle::Tokio(tokio::task::spawn(f));
+    #[cfg(not(target_arch = "wasm32"))]
+    {
+        #[cfg(feature = "tokio")]
+        return JoinHandle::Tokio(tokio::task::spawn(f));
 
-    #[cfg(all(feature = "smol", not(feature = "tokio")))]
-    return JoinHandle::Smol(smol::spawn(f));
+        #[cfg(all(feature = "smol", not(feature = "tokio")))]
+        return JoinHandle::Smol(smol::spawn(f));
+    }
+
"wasm32")] + { + #[cfg(feature = "tokio")] + return JoinHandle::Tokio(tokio::task::spawn_local(f)); + } #[cfg(all(not(feature = "smol"), not(feature = "tokio")))] unimplemented!("no runtime has been enabled") diff --git a/crates/iceberg/src/scan/mod.rs b/crates/iceberg/src/scan/mod.rs index 3d14b3cce4..944c3a9f65 100644 --- a/crates/iceberg/src/scan/mod.rs +++ b/crates/iceberg/src/scan/mod.rs @@ -27,7 +27,6 @@ use std::sync::Arc; use arrow_array::RecordBatch; use futures::channel::mpsc::{Sender, channel}; -use futures::stream::BoxStream; use futures::{SinkExt, StreamExt, TryStreamExt}; pub use task::*; @@ -35,6 +34,7 @@ use crate::arrow::ArrowReaderBuilder; use crate::delete_file_index::DeleteFileIndex; use crate::expr::visitors::inclusive_metrics_evaluator::InclusiveMetricsEvaluator; use crate::expr::{Bind, BoundPredicate, Predicate}; +use crate::future_util::BoxedStream; use crate::io::FileIO; use crate::runtime::spawn; use crate::spec::{DataContentType, SnapshotRef}; @@ -43,7 +43,7 @@ use crate::utils::available_parallelism; use crate::{Error, ErrorKind, Result}; /// A stream of arrow [`RecordBatch`]es. -pub type ArrowRecordBatchStream = BoxStream<'static, Result>; +pub type ArrowRecordBatchStream = BoxedStream<'static, Result>; /// Builder to create table scan. pub struct TableScanBuilder<'a> { @@ -423,7 +423,7 @@ impl TableScan { } }); - Ok(file_scan_task_rx.boxed()) + Ok(Box::pin(file_scan_task_rx)) } /// Returns an [`ArrowRecordBatchStream`]. diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index 32fe3ae309..48617f1d69 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -15,15 +15,15 @@ // specific language governing permissions and limitations // under the License. -use futures::stream::BoxStream; use serde::{Deserialize, Serialize}; use crate::Result; use crate::expr::BoundPredicate; +use crate::future_util::BoxedStream; use crate::spec::{DataContentType, DataFileFormat, ManifestEntryRef, Schema, SchemaRef}; /// A stream of [`FileScanTask`]. -pub type FileScanTaskStream = BoxStream<'static, Result>; +pub type FileScanTaskStream = BoxedStream<'static, Result>; /// A task to scan part of file. #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] diff --git a/crates/iceberg/src/transaction/action.rs b/crates/iceberg/src/transaction/action.rs index aa0a05d0d9..c11de1d7fb 100644 --- a/crates/iceberg/src/transaction/action.rs +++ b/crates/iceberg/src/transaction/action.rs @@ -33,7 +33,8 @@ pub(crate) type BoxedTransactionAction = Arc; /// Implementors of this trait define how a specific action is committed to a table. /// Each action is responsible for generating the updates and requirements needed /// to modify the table metadata. -#[async_trait] +#[cfg_attr(not(target_arch = "wasm32"), async_trait)] +#[cfg_attr(target_arch = "wasm32", async_trait(?Send))] pub(crate) trait TransactionAction: AsAny + Sync + Send { /// Commits this action against the provided table and returns the resulting updates. /// NOTE: This function is intended for internal use only and should not be called directly by users. 
@@ -120,7 +121,8 @@ mod tests {
 
     struct TestAction;
 
-    #[async_trait]
+    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
     impl TransactionAction for TestAction {
         async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
             Ok(ActionCommit::new(
diff --git a/crates/iceberg/src/transaction/append.rs b/crates/iceberg/src/transaction/append.rs
index f248543df2..bdd25ae831 100644
--- a/crates/iceberg/src/transaction/append.rs
+++ b/crates/iceberg/src/transaction/append.rs
@@ -81,7 +81,8 @@ impl FastAppendAction {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl TransactionAction for FastAppendAction {
     async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
         let snapshot_producer = SnapshotProducer::new(
diff --git a/crates/iceberg/src/transaction/snapshot.rs b/crates/iceberg/src/transaction/snapshot.rs
index 48dc2b5b90..e23106a429 100644
--- a/crates/iceberg/src/transaction/snapshot.rs
+++ b/crates/iceberg/src/transaction/snapshot.rs
@@ -16,7 +16,6 @@
 // under the License.
 
 use std::collections::{HashMap, HashSet};
-use std::future::Future;
 use std::ops::RangeFrom;
 
 use uuid::Uuid;
@@ -38,14 +37,14 @@ const META_ROOT_PATH: &str = "metadata";
 
 pub(crate) trait SnapshotProduceOperation: Send + Sync {
     fn operation(&self) -> Operation;
     #[allow(unused)]
-    fn delete_entries(
+    async fn delete_entries(
         &self,
         snapshot_produce: &SnapshotProducer,
-    ) -> impl Future<Output = Result<Vec<ManifestEntry>>> + Send;
-    fn existing_manifest(
+    ) -> Result<Vec<ManifestEntry>>;
+    async fn existing_manifest(
         &self,
         snapshot_produce: &SnapshotProducer<'_>,
-    ) -> impl Future<Output = Result<Vec<ManifestFile>>> + Send;
+    ) -> Result<Vec<ManifestFile>>;
 }
 
 pub(crate) struct DefaultManifestProcess;
diff --git a/crates/iceberg/src/transaction/sort_order.rs b/crates/iceberg/src/transaction/sort_order.rs
index dfa1328c09..9991a3f186 100644
--- a/crates/iceberg/src/transaction/sort_order.rs
+++ b/crates/iceberg/src/transaction/sort_order.rs
@@ -97,7 +97,8 @@ impl Default for ReplaceSortOrderAction {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl TransactionAction for ReplaceSortOrderAction {
     async fn commit(self: Arc<Self>, table: &Table) -> Result<ActionCommit> {
         let current_schema = table.metadata().current_schema();
diff --git a/crates/iceberg/src/transaction/update_location.rs b/crates/iceberg/src/transaction/update_location.rs
index 0c32c75355..08f14266af 100644
--- a/crates/iceberg/src/transaction/update_location.rs
+++ b/crates/iceberg/src/transaction/update_location.rs
@@ -59,7 +59,8 @@ impl Default for UpdateLocationAction {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl TransactionAction for UpdateLocationAction {
     async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
         let updates: Vec<TableUpdate>;
diff --git a/crates/iceberg/src/transaction/update_properties.rs b/crates/iceberg/src/transaction/update_properties.rs
index 825d6bec8d..57181b0724 100644
--- a/crates/iceberg/src/transaction/update_properties.rs
+++ b/crates/iceberg/src/transaction/update_properties.rs
@@ -80,7 +80,8 @@ impl Default for UpdatePropertiesAction {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl TransactionAction for UpdatePropertiesAction {
     async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
         if let Some(overlapping_key) = self.removals.iter().find(|k| self.updates.contains_key(*k))
diff --git a/crates/iceberg/src/transaction/update_statistics.rs b/crates/iceberg/src/transaction/update_statistics.rs
index dfce95d898..5df35845de 100644
--- a/crates/iceberg/src/transaction/update_statistics.rs
+++ b/crates/iceberg/src/transaction/update_statistics.rs
@@ -74,7 +74,8 @@ impl Default for UpdateStatisticsAction {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl TransactionAction for UpdateStatisticsAction {
     async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
         let mut updates: Vec<TableUpdate> = vec![];
diff --git a/crates/iceberg/src/transaction/upgrade_format_version.rs b/crates/iceberg/src/transaction/upgrade_format_version.rs
index ff15926d00..2f7e7159f6 100644
--- a/crates/iceberg/src/transaction/upgrade_format_version.rs
+++ b/crates/iceberg/src/transaction/upgrade_format_version.rs
@@ -63,7 +63,8 @@ impl Default for UpgradeFormatVersionAction {
     }
 }
 
-#[async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
 impl TransactionAction for UpgradeFormatVersionAction {
     async fn commit(self: Arc<Self>, _table: &Table) -> Result<ActionCommit> {
         let format_version = self.format_version.ok_or_else(|| {
diff --git a/crates/iceberg/src/writer/base_writer/data_file_writer.rs b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
index f215673df1..f041462c55 100644
--- a/crates/iceberg/src/writer/base_writer/data_file_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/data_file_writer.rs
@@ -44,7 +44,8 @@ impl DataFileWriterBuilder {
     }
 }
 
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 impl IcebergWriterBuilder for DataFileWriterBuilder {
     type R = DataFileWriter;
 
@@ -65,7 +66,8 @@ pub struct DataFileWriter {
     partition_spec_id: i32,
 }
 
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 impl IcebergWriter for DataFileWriter {
     async fn write(&mut self, batch: RecordBatch) -> Result<()> {
         self.inner_writer.as_mut().unwrap().write(&batch).await
diff --git a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
index 765ff1cacd..79d93bd284 100644
--- a/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
+++ b/crates/iceberg/src/writer/base_writer/equality_delete_writer.rs
@@ -109,7 +109,8 @@ impl EqualityDeleteWriterConfig {
     }
 }
 
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 impl IcebergWriterBuilder for EqualityDeleteFileWriterBuilder {
     type R = EqualityDeleteFileWriter;
 
@@ -134,7 +135,8 @@ pub struct EqualityDeleteFileWriter {
     partition_spec_id: i32,
 }
 
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 impl IcebergWriter for EqualityDeleteFileWriter {
     async fn write(&mut self, batch: RecordBatch) -> Result<()> {
         let batch = self.projector.project_batch(batch)?;
diff --git a/crates/iceberg/src/writer/file_writer/mod.rs b/crates/iceberg/src/writer/file_writer/mod.rs
index 2a5a735534..124ae79bc1 100644
--- a/crates/iceberg/src/writer/file_writer/mod.rs
+++ b/crates/iceberg/src/writer/file_writer/mod.rs
@@ -19,6 +19,7 @@
 
 use arrow_array::RecordBatch;
 use futures::Future;
+use opendal::raw::MaybeSend;
 
 use super::CurrentFileStatus;
 use crate::Result;
@@ -44,7 +45,7 @@ pub trait FileWriterBuilder: Send + Clone + 'static {
 /// File writer focus on writing record batch to different physical file format.(Such as parquet. orc)
 pub trait FileWriter: Send + CurrentFileStatus + 'static {
     /// Write record batch to file.
-    fn write(&mut self, batch: &RecordBatch) -> impl Future<Output = Result<()>> + Send;
+    fn write(&mut self, batch: &RecordBatch) -> impl Future<Output = Result<()>> + MaybeSend;
     /// Close file writer.
-    fn close(self) -> impl Future<Output = Result<Vec<DataFileBuilder>>> + Send;
+    fn close(self) -> impl Future<Output = Result<Vec<DataFileBuilder>>> + MaybeSend;
 }
diff --git a/crates/iceberg/src/writer/file_writer/parquet_writer.rs b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
index 0a8a095ea8..ded1f3d335 100644
--- a/crates/iceberg/src/writer/file_writer/parquet_writer.rs
+++ b/crates/iceberg/src/writer/file_writer/parquet_writer.rs
@@ -22,8 +22,8 @@ use std::sync::Arc;
 
 use arrow_schema::SchemaRef as ArrowSchemaRef;
 use bytes::Bytes;
-use futures::future::BoxFuture;
 use itertools::Itertools;
+use opendal::raw::BoxedFuture;
 use parquet::arrow::AsyncArrowWriter;
 use parquet::arrow::async_reader::AsyncFileReader;
 use parquet::arrow::async_writer::AsyncFileWriter as ArrowAsyncFileWriter;
@@ -659,7 +659,7 @@ impl AsyncFileWriter {
     }
 }
 
 impl ArrowAsyncFileWriter for AsyncFileWriter {
-    fn write(&mut self, bs: Bytes) -> BoxFuture<'_, parquet::errors::Result<()>> {
+    fn write(&mut self, bs: Bytes) -> BoxedFuture<'_, parquet::errors::Result<()>> {
         Box::pin(async {
             self.0
                 .write(bs)
@@ -668,7 +668,7 @@ impl ArrowAsyncFileWriter for AsyncFileWriter {
         })
     }
 
-    fn complete(&mut self) -> BoxFuture<'_, parquet::errors::Result<()>> {
+    fn complete(&mut self) -> BoxedFuture<'_, parquet::errors::Result<()>> {
         Box::pin(async {
             self.0
                 .close()
diff --git a/crates/iceberg/src/writer/mod.rs b/crates/iceberg/src/writer/mod.rs
index 8f17d50e27..4b3b89c0da 100644
--- a/crates/iceberg/src/writer/mod.rs
+++ b/crates/iceberg/src/writer/mod.rs
@@ -136,7 +136,8 @@
 //!     }
 //! }
 //!
-//! #[async_trait::async_trait]
+//! #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+//! #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 //! impl IcebergWriterBuilder for LatencyRecordWriterBuilder {
 //!     type R = LatencyRecordWriter;
 //!
@@ -150,7 +151,8 @@
 //!     inner_writer: W,
 //! }
 //!
-//! #[async_trait::async_trait]
+//! #[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+//! #[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 //! impl IcebergWriter for LatencyRecordWriter {
 //!     async fn write(&mut self, input: RecordBatch) -> Result<()> {
 //!         let start = Instant::now();
@@ -235,7 +237,8 @@ type DefaultInput = RecordBatch;
 type DefaultOutput = Vec<DataFile>;
 
 /// The builder for iceberg writer.
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
     Send + Clone + 'static
 {
@@ -246,7 +249,8 @@ pub trait IcebergWriterBuilder<I = DefaultInput, O = DefaultOutput>:
 }
 
 /// The iceberg writer used to write data to iceberg table.
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 pub trait IcebergWriter<I = DefaultInput, O = DefaultOutput>: Send + 'static {
     /// Write data to iceberg table.
     async fn write(&mut self, input: I) -> Result<()>;
diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs
index 0ee0504faa..61098d2f2b 100644
--- a/crates/integrations/datafusion/src/physical_plan/commit.rs
+++ b/crates/integrations/datafusion/src/physical_plan/commit.rs
@@ -180,7 +180,7 @@ impl ExecutionPlan for IcebergCommitExec {
         let catalog = Arc::clone(&self.catalog);
 
         // Process the input streams from all partitions and commit the data files
-        let stream = futures::stream::once(async move {
+        let stream = Box::pin(futures::stream::once(async move {
             let mut data_files: Vec<DataFile> = Vec::new();
             let mut total_record_count: u64 = 0;
@@ -246,8 +246,7 @@ impl ExecutionPlan for IcebergCommitExec {
                 .map_err(to_datafusion_error)?;
 
             Self::make_count_batch(total_record_count)
-        })
-        .boxed();
+        }));
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             Arc::clone(&self.count_schema),
@@ -270,7 +269,6 @@ mod tests {
     use datafusion::physical_plan::execution_plan::Boundedness;
    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
     use datafusion::physical_plan::{DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties};
-    use futures::StreamExt;
     use iceberg::memory::{MEMORY_CATALOG_WAREHOUSE, MemoryCatalogBuilder};
     use iceberg::spec::{
         DataContentType, DataFileBuilder, DataFileFormat, NestedField, PrimitiveType, Schema,
@@ -350,7 +348,7 @@ mod tests {
             let batch = RecordBatch::try_new(self.schema.clone(), vec![array])?;
 
             // Create a stream that returns this batch
-            let stream = futures::stream::once(async move { Ok(batch) }).boxed();
+            let stream = Box::pin(futures::stream::once(async move { Ok(batch) }));
             Ok(Box::pin(RecordBatchStreamAdapter::new(
                 self.schema(),
                 stream,
diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs
index 625405c95b..b22e273787 100644
--- a/crates/integrations/datafusion/src/physical_plan/write.rs
+++ b/crates/integrations/datafusion/src/physical_plan/write.rs
@@ -278,7 +278,7 @@ impl ExecutionPlan for IcebergWriteExec {
         )?;
 
         // Create write stream
-        let stream = futures::stream::once(async move {
+        let stream = Box::pin(futures::stream::once(async move {
             let mut writer = data_file_writer_builder
                 .build()
                 .await
@@ -301,8 +301,7 @@
                 .collect::>>()?;
 
             Self::make_result_batch(data_files_strs)
-        })
-        .boxed();
+        }));
 
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             Arc::clone(&self.result_schema),
@@ -414,7 +413,7 @@ mod tests {
         let stream = stream::iter(batches.into_iter().map(Ok));
         Ok(Box::pin(RecordBatchStreamAdapter::new(
             self.schema.clone(),
-            stream.boxed(),
+            Box::pin(stream),
         )))
     }
 }
diff --git a/crates/integrations/datafusion/src/table/metadata_table.rs b/crates/integrations/datafusion/src/table/metadata_table.rs
index 38148b4084..7a16936931 100644
--- a/crates/integrations/datafusion/src/table/metadata_table.rs
+++ b/crates/integrations/datafusion/src/table/metadata_table.rs
@@ -27,8 +27,8 @@ use datafusion::error::Result as DFResult;
 use datafusion::logical_expr::Expr;
 use datafusion::physical_plan::ExecutionPlan;
 use futures::TryStreamExt;
-use futures::stream::BoxStream;
 use iceberg::arrow::schema_to_arrow_schema;
+use iceberg::future_util::BoxedStream;
 use iceberg::inspect::MetadataTableType;
 use iceberg::table::Table;
 
@@ -74,7 +74,7 @@ impl TableProvider for IcebergMetadataTableProvider {
 }
 
 impl IcebergMetadataTableProvider {
-    pub async fn scan(self) -> DFResult<BoxStream<'static, DFResult<RecordBatch>>> {
+    pub async fn scan(self) -> DFResult<BoxedStream<'static, DFResult<RecordBatch>>> {
         let metadata_table = self.table.inspect();
         let stream = match self.r#type {
             MetadataTableType::Snapshots => metadata_table.snapshots().scan().await,
diff --git a/crates/sqllogictest/src/engine/datafusion.rs b/crates/sqllogictest/src/engine/datafusion.rs
index 49d7273d60..95a63a472b 100644
--- a/crates/sqllogictest/src/engine/datafusion.rs
+++ b/crates/sqllogictest/src/engine/datafusion.rs
@@ -33,7 +33,8 @@ pub struct DataFusionEngine {
     datafusion: DataFusion,
 }
 
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 impl EngineRunner for DataFusionEngine {
     async fn run_slt_file(&mut self, path: &Path) -> Result<()> {
         let content = std::fs::read_to_string(path)
diff --git a/crates/sqllogictest/src/engine/mod.rs b/crates/sqllogictest/src/engine/mod.rs
index a1d34dd9bc..b1e6e2989a 100644
--- a/crates/sqllogictest/src/engine/mod.rs
+++ b/crates/sqllogictest/src/engine/mod.rs
@@ -27,7 +27,8 @@ use crate::error::Result;
 const KEY_TYPE: &str = "type";
 const TYPE_DATAFUSION: &str = "datafusion";
 
-#[async_trait::async_trait]
+#[cfg_attr(not(target_arch = "wasm32"), async_trait::async_trait)]
+#[cfg_attr(target_arch = "wasm32", async_trait::async_trait(?Send))]
 pub trait EngineRunner: Sized {
     async fn run_slt_file(&mut self, path: &Path) -> Result<()>;
 }
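Note for reviewers: the dual `cfg_attr` attribute applied throughout this patch expands to the plain `#[async_trait]` macro on native targets and to `#[async_trait(?Send)]` on wasm32, where the generated futures need not be `Send`. A minimal, self-contained sketch of the pattern follows; the `Fetch` trait and `InMemory` type are illustrative only and are not part of this patch.

    use async_trait::async_trait;

    // Native targets keep async-trait's default `Send` bound on the generated
    // futures; wasm32 opts out via the `?Send` variant of the macro.
    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    trait Fetch {
        async fn fetch(&self, key: &str) -> Option<String>;
    }

    struct InMemory;

    #[cfg_attr(not(target_arch = "wasm32"), async_trait)]
    #[cfg_attr(target_arch = "wasm32", async_trait(?Send))]
    impl Fetch for InMemory {
        async fn fetch(&self, key: &str) -> Option<String> {
            Some(format!("value for {key}"))
        }
    }

    fn main() {
        // block_on keeps the sketch runnable without pulling in a full runtime.
        futures::executor::block_on(async {
            assert_eq!(InMemory.fetch("k").await.as_deref(), Some("value for k"));
        });
    }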
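Similarly, the new `future_util::BoxedStream` alias resolves to `futures::stream::BoxStream` (`Send`) on native targets and to `LocalBoxStream` (`!Send`) on wasm32. The `.boxed()` calls are replaced with `Box::pin(..)` because `.boxed()` always produces the `Send`-bounded `BoxStream`, while `Box::pin(..)` coerces to either alias. A hedged sketch mirroring that alias, with a hypothetical `numbers` function standing in for the crate's stream-returning APIs:

    use futures::stream;

    // Same shape as `iceberg::future_util::BoxedStream`.
    #[cfg(not(target_arch = "wasm32"))]
    pub type BoxedStream<'a, T> = futures::stream::BoxStream<'a, T>;
    #[cfg(target_arch = "wasm32")]
    pub type BoxedStream<'a, T> = futures::stream::LocalBoxStream<'a, T>;

    // `Box::pin(..)` coerces to either alias, which is why it replaces the
    // `StreamExt::boxed()` calls that always yield the `Send`-bounded stream.
    fn numbers() -> BoxedStream<'static, u32> {
        Box::pin(stream::iter(vec![1, 2, 3]))
    }

    fn main() {
        use futures::StreamExt;
        futures::executor::block_on(async {
            let collected: Vec<u32> = numbers().collect().await;
            assert_eq!(collected, vec![1, 2, 3]);
        });
    }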