Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 47 additions & 0 deletions crates/iceberg/src/io/file_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,53 @@ impl Extensions {
}
}

/// Runtime handle for executing async I/O operations.
///
/// When provided via FileIOBuilder extensions, OpenDAL operations will spawn
/// tasks on this runtime instead of using the current runtime context.
///
/// This is useful for runtime segregation scenarios where you want to separate
/// CPU-bound query execution from I/O-bound operations (e.g., blob storage access).
///
/// # Example
///
/// ```rust,ignore
/// use iceberg::io::{FileIOBuilder, RuntimeHandle};
/// use tokio::runtime::Builder;
///
/// // Create dedicated I/O runtime
/// let io_runtime = Builder::new_multi_thread()
/// .worker_threads(8)
/// .thread_name("io-pool")
/// .enable_io()
/// .enable_time()
/// .build()?;
///
/// // Configure FileIO with runtime handle
/// let file_io = FileIOBuilder::new("s3")
/// .with_extension(RuntimeHandle::new(io_runtime.handle().clone()))
/// .with_props(s3_config)
/// .build()?;
/// ```
#[derive(Clone, Debug)]
pub struct RuntimeHandle(pub tokio::runtime::Handle);

impl RuntimeHandle {
    /// Wraps an existing Tokio runtime handle.
    pub fn new(handle: tokio::runtime::Handle) -> Self {
        RuntimeHandle(handle)
    }

    /// Captures the handle of the runtime the caller is currently executing on.
    ///
    /// # Panics
    ///
    /// Panics when no Tokio runtime is active on the current thread.
    pub fn current() -> Self {
        RuntimeHandle(tokio::runtime::Handle::current())
    }
}

/// Builder for [`FileIO`].
#[derive(Clone, Debug)]
pub struct FileIOBuilder {
Expand Down
128 changes: 97 additions & 31 deletions crates/iceberg/src/io/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,8 @@
// specific language governing permissions and limitations
// under the License.

#[cfg(any(
feature = "storage-s3",
feature = "storage-gcs",
feature = "storage-oss",
feature = "storage-azdls",
))]
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;

use opendal::layers::RetryLayer;
Expand All @@ -36,14 +32,34 @@ use opendal::{Operator, Scheme};

#[cfg(feature = "storage-azdls")]
use super::AzureStorageScheme;
use super::FileIOBuilder;
use super::{FileIOBuilder, RuntimeHandle};
#[cfg(feature = "storage-s3")]
use crate::io::CustomAwsCredentialLoader;
use crate::{Error, ErrorKind};

/// Custom OpenDAL executor that spawns tasks on a specific Tokio runtime.
///
/// Implements [`opendal::Execute`] so that every task OpenDAL spawns runs on
/// the configured runtime handle instead of the runtime of the calling task,
/// enabling segregation of I/O work from query-execution runtimes.
#[derive(Clone)]
struct CustomTokioExecutor {
    /// Handle of the dedicated runtime that will run the spawned futures.
    handle: tokio::runtime::Handle,
}

impl opendal::Execute for CustomTokioExecutor {
    fn execute(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
        // Fire-and-forget: dropping the returned JoinHandle detaches the
        // task, which still runs to completion on the target runtime.
        self.handle.spawn(f);
    }
}

/// The storage carries all supported storage services in iceberg
pub(crate) struct Storage {
    /// The concrete storage service (memory, fs, s3, ...) backing this instance.
    backend: StorageBackend,
    /// Optional custom executor; when set, it is applied to each created
    /// operator so spawned I/O tasks run on the configured runtime instead
    /// of the caller's runtime context.
    executor: Option<opendal::Executor>,
}

#[derive(Debug)]
pub(crate) enum Storage {
enum StorageBackend {
#[cfg(feature = "storage-memory")]
Memory(Operator),
#[cfg(feature = "storage-fs")]
Expand Down Expand Up @@ -73,48 +89,92 @@ pub(crate) enum Storage {
},
}

impl std::fmt::Debug for Storage {
    /// Manual `Debug` implementation that reports the executor only by
    /// presence (presumably because `opendal::Executor` does not implement
    /// `Debug` itself — the hand-rolled impl suggests so; confirm upstream).
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.debug_struct("Storage")
            .field("backend", &self.backend)
            // Map to a bare "Executor" marker: mapping to "Some(Executor)"
            // would double-wrap in the Option's own formatting and print
            // `executor: Some("Some(Executor)")`.
            .field("executor", &self.executor.as_ref().map(|_| "Executor"))
            .finish()
    }
}

impl Storage {
/// Convert iceberg config to opendal config.
pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
let (scheme_str, props, extensions) = file_io_builder.into_parts();
let _ = (&props, &extensions);
let scheme = Self::parse_scheme(&scheme_str)?;

match scheme {
// Extract runtime handle and create executor if provided
let executor = extensions.get::<RuntimeHandle>().map(|runtime_handle| {
let handle = Arc::unwrap_or_clone(runtime_handle).0;
opendal::Executor::with(CustomTokioExecutor { handle })
});

let backend = match scheme {
#[cfg(feature = "storage-memory")]
Scheme::Memory => Ok(Self::Memory(super::memory_config_build()?)),
Scheme::Memory => StorageBackend::Memory(super::memory_config_build()?),
#[cfg(feature = "storage-fs")]
Scheme::Fs => Ok(Self::LocalFs),
Scheme::Fs => StorageBackend::LocalFs,
#[cfg(feature = "storage-s3")]
Scheme::S3 => Ok(Self::S3 {
Scheme::S3 => StorageBackend::S3 {
configured_scheme: scheme_str,
config: super::s3_config_parse(props)?.into(),
customized_credential_load: extensions
.get::<CustomAwsCredentialLoader>()
.map(Arc::unwrap_or_clone),
}),
},
#[cfg(feature = "storage-gcs")]
Scheme::Gcs => Ok(Self::Gcs {
Scheme::Gcs => StorageBackend::Gcs {
config: super::gcs_config_parse(props)?.into(),
}),
},
#[cfg(feature = "storage-oss")]
Scheme::Oss => Ok(Self::Oss {
Scheme::Oss => StorageBackend::Oss {
config: super::oss_config_parse(props)?.into(),
}),
},
#[cfg(feature = "storage-azdls")]
Scheme::Azdls => {
let scheme = scheme_str.parse::<AzureStorageScheme>()?;
Ok(Self::Azdls {
StorageBackend::Azdls {
config: super::azdls_config_parse(props)?.into(),
configured_scheme: scheme,
})
}
}
// Update doc on [`FileIO`] when adding new schemes.
_ => Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Constructing file io from scheme: {scheme} not supported now",),
)),
}
#[cfg(not(any(
feature = "storage-memory",
feature = "storage-fs",
feature = "storage-s3",
feature = "storage-gcs",
feature = "storage-oss",
feature = "storage-azdls"
)))]
_ => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
"No storage service has been enabled".to_string(),
));
}
#[cfg(any(
feature = "storage-memory",
feature = "storage-fs",
feature = "storage-s3",
feature = "storage-gcs",
feature = "storage-oss",
feature = "storage-azdls"
))]
_ => {
return Err(Error::new(
ErrorKind::FeatureUnsupported,
format!("Constructing file io from scheme: {scheme} not supported now"),
));
}
};

Ok(Self { backend, executor })
}

/// Creates operator from path.
Expand All @@ -135,17 +195,17 @@ impl Storage {
) -> crate::Result<(Operator, &'a str)> {
let path = path.as_ref();
let _ = path;
let (operator, relative_path): (Operator, &str) = match self {
let (operator, relative_path): (Operator, &str) = match &self.backend {
#[cfg(feature = "storage-memory")]
Storage::Memory(op) => {
StorageBackend::Memory(op) => {
if let Some(stripped) = path.strip_prefix("memory:/") {
Ok::<_, crate::Error>((op.clone(), stripped))
} else {
Ok::<_, crate::Error>((op.clone(), &path[1..]))
}
}
#[cfg(feature = "storage-fs")]
Storage::LocalFs => {
StorageBackend::LocalFs => {
let op = super::fs_config_build()?;

if let Some(stripped) = path.strip_prefix("file:/") {
Expand All @@ -155,7 +215,7 @@ impl Storage {
}
}
#[cfg(feature = "storage-s3")]
Storage::S3 {
StorageBackend::S3 {
configured_scheme,
config,
customized_credential_load,
Expand All @@ -175,7 +235,7 @@ impl Storage {
}
}
#[cfg(feature = "storage-gcs")]
Storage::Gcs { config } => {
StorageBackend::Gcs { config } => {
let operator = super::gcs_config_build(config, path)?;
let prefix = format!("gs://{}/", operator.info().name());
if path.starts_with(&prefix) {
Expand All @@ -188,7 +248,7 @@ impl Storage {
}
}
#[cfg(feature = "storage-oss")]
Storage::Oss { config } => {
StorageBackend::Oss { config } => {
let op = super::oss_config_build(config, path)?;

// Check prefix of oss path.
Expand All @@ -203,7 +263,7 @@ impl Storage {
}
}
#[cfg(feature = "storage-azdls")]
Storage::Azdls {
StorageBackend::Azdls {
configured_scheme,
config,
} => super::azdls_create_operator(path, config, configured_scheme),
Expand All @@ -220,6 +280,12 @@ impl Storage {
)),
}?;

// Apply custom executor if configured for runtime segregation
if let Some(ref executor) = self.executor {
let executor_clone = executor.clone();
operator.update_executor(|_| executor_clone);
}

// Transient errors are common for object stores; however there's no
// harm in retrying temporary failures for other storage backends as well.
let operator = operator.layer(RetryLayer::new());
Expand Down
Loading
Loading