Skip to content

Commit df15d6e

Browse files
Add RuntimeHandle support for Tokio runtime segregation
1 parent d4c4bd4 commit df15d6e

File tree

3 files changed

+351
-25
lines changed

3 files changed

+351
-25
lines changed

crates/iceberg/src/io/file_io.rs

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -182,6 +182,53 @@ impl Extensions {
182182
}
183183
}
184184

185+
/// Runtime handle for executing async I/O operations.
186+
///
187+
/// When provided via FileIOBuilder extensions, OpenDAL operations will spawn
188+
/// tasks on this runtime instead of using the current runtime context.
189+
///
190+
/// This is useful for runtime segregation scenarios where you want to separate
191+
/// CPU-bound query execution from I/O-bound operations (e.g., blob storage access).
192+
///
193+
/// # Example
194+
///
195+
/// ```rust,ignore
196+
/// use iceberg::io::{FileIOBuilder, RuntimeHandle};
197+
/// use tokio::runtime::Builder;
198+
///
199+
/// // Create dedicated I/O runtime
200+
/// let io_runtime = Builder::new_multi_thread()
201+
/// .worker_threads(8)
202+
/// .thread_name("io-pool")
203+
/// .enable_io()
204+
/// .enable_time()
205+
/// .build()?;
206+
///
207+
/// // Configure FileIO with runtime handle
208+
/// let file_io = FileIOBuilder::new("s3")
209+
/// .with_extension(RuntimeHandle::new(io_runtime.handle().clone()))
210+
/// .with_props(s3_config)
211+
/// .build()?;
212+
/// ```
213+
#[derive(Clone, Debug)]
214+
pub struct RuntimeHandle(pub tokio::runtime::Handle);
215+
216+
impl RuntimeHandle {
217+
/// Create a new RuntimeHandle from a Tokio runtime handle.
218+
pub fn new(handle: tokio::runtime::Handle) -> Self {
219+
Self(handle)
220+
}
221+
222+
/// Get the current runtime handle.
223+
///
224+
/// # Panics
225+
///
226+
/// Panics if called outside of a Tokio runtime context.
227+
pub fn current() -> Self {
228+
Self(tokio::runtime::Handle::current())
229+
}
230+
}
231+
185232
/// Builder for [`FileIO`].
186233
#[derive(Clone, Debug)]
187234
pub struct FileIOBuilder {

crates/iceberg/src/io/storage.rs

Lines changed: 75 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@
1515
// specific language governing permissions and limitations
1616
// under the License.
1717

18+
use std::future::Future;
19+
use std::pin::Pin;
1820
#[cfg(any(
1921
feature = "storage-s3",
2022
feature = "storage-gcs",
@@ -36,14 +38,34 @@ use opendal::{Operator, Scheme};
3638

3739
#[cfg(feature = "storage-azdls")]
3840
use super::AzureStorageScheme;
39-
use super::FileIOBuilder;
41+
use super::{FileIOBuilder, RuntimeHandle};
4042
#[cfg(feature = "storage-s3")]
4143
use crate::io::CustomAwsCredentialLoader;
4244
use crate::{Error, ErrorKind};
4345

46+
/// Custom OpenDAL executor that spawns tasks on a specific Tokio runtime.
47+
///
48+
/// This executor implements the OpenDAL Execute trait and routes all spawned
49+
/// tasks to a configured Tokio runtime handle, enabling runtime segregation.
50+
#[derive(Clone)]
51+
struct CustomTokioExecutor {
52+
handle: tokio::runtime::Handle,
53+
}
54+
55+
impl opendal::Execute for CustomTokioExecutor {
56+
fn execute(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
57+
self.handle.spawn(f);
58+
}
59+
}
60+
4461
/// The storage carries all supported storage services in iceberg
62+
pub(crate) struct Storage {
63+
backend: StorageBackend,
64+
executor: Option<opendal::Executor>,
65+
}
66+
4567
#[derive(Debug)]
46-
pub(crate) enum Storage {
68+
enum StorageBackend {
4769
#[cfg(feature = "storage-memory")]
4870
Memory(Operator),
4971
#[cfg(feature = "storage-fs")]
@@ -73,48 +95,70 @@ pub(crate) enum Storage {
7395
},
7496
}
7597

98+
impl std::fmt::Debug for Storage {
99+
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100+
f.debug_struct("Storage")
101+
.field("backend", &self.backend)
102+
.field(
103+
"executor",
104+
&self.executor.as_ref().map(|_| "Some(Executor)"),
105+
)
106+
.finish()
107+
}
108+
}
109+
76110
impl Storage {
77111
/// Convert iceberg config to opendal config.
78112
pub(crate) fn build(file_io_builder: FileIOBuilder) -> crate::Result<Self> {
79113
let (scheme_str, props, extensions) = file_io_builder.into_parts();
80114
let _ = (&props, &extensions);
81115
let scheme = Self::parse_scheme(&scheme_str)?;
82116

83-
match scheme {
117+
// Extract runtime handle and create executor if provided
118+
let executor = extensions.get::<RuntimeHandle>().map(|runtime_handle| {
119+
let handle = Arc::unwrap_or_clone(runtime_handle).0;
120+
opendal::Executor::with(CustomTokioExecutor { handle })
121+
});
122+
123+
let backend = match scheme {
84124
#[cfg(feature = "storage-memory")]
85-
Scheme::Memory => Ok(Self::Memory(super::memory_config_build()?)),
125+
Scheme::Memory => StorageBackend::Memory(super::memory_config_build()?),
86126
#[cfg(feature = "storage-fs")]
87-
Scheme::Fs => Ok(Self::LocalFs),
127+
Scheme::Fs => StorageBackend::LocalFs,
88128
#[cfg(feature = "storage-s3")]
89-
Scheme::S3 => Ok(Self::S3 {
129+
Scheme::S3 => StorageBackend::S3 {
90130
configured_scheme: scheme_str,
91131
config: super::s3_config_parse(props)?.into(),
92132
customized_credential_load: extensions
93133
.get::<CustomAwsCredentialLoader>()
94134
.map(Arc::unwrap_or_clone),
95-
}),
135+
},
96136
#[cfg(feature = "storage-gcs")]
97-
Scheme::Gcs => Ok(Self::Gcs {
137+
Scheme::Gcs => StorageBackend::Gcs {
98138
config: super::gcs_config_parse(props)?.into(),
99-
}),
139+
},
100140
#[cfg(feature = "storage-oss")]
101-
Scheme::Oss => Ok(Self::Oss {
141+
Scheme::Oss => StorageBackend::Oss {
102142
config: super::oss_config_parse(props)?.into(),
103-
}),
143+
},
104144
#[cfg(feature = "storage-azdls")]
105145
Scheme::Azdls => {
106146
let scheme = scheme_str.parse::<AzureStorageScheme>()?;
107-
Ok(Self::Azdls {
147+
StorageBackend::Azdls {
108148
config: super::azdls_config_parse(props)?.into(),
109149
configured_scheme: scheme,
110-
})
150+
}
111151
}
112152
// Update doc on [`FileIO`] when adding new schemes.
113-
_ => Err(Error::new(
114-
ErrorKind::FeatureUnsupported,
115-
format!("Constructing file io from scheme: {scheme} not supported now",),
116-
)),
117-
}
153+
_ => {
154+
return Err(Error::new(
155+
ErrorKind::FeatureUnsupported,
156+
format!("Constructing file io from scheme: {scheme} not supported now",),
157+
));
158+
}
159+
};
160+
161+
Ok(Self { backend, executor })
118162
}
119163

120164
/// Creates operator from path.
@@ -135,17 +179,17 @@ impl Storage {
135179
) -> crate::Result<(Operator, &'a str)> {
136180
let path = path.as_ref();
137181
let _ = path;
138-
let (operator, relative_path): (Operator, &str) = match self {
182+
let (operator, relative_path): (Operator, &str) = match &self.backend {
139183
#[cfg(feature = "storage-memory")]
140-
Storage::Memory(op) => {
184+
StorageBackend::Memory(op) => {
141185
if let Some(stripped) = path.strip_prefix("memory:/") {
142186
Ok::<_, crate::Error>((op.clone(), stripped))
143187
} else {
144188
Ok::<_, crate::Error>((op.clone(), &path[1..]))
145189
}
146190
}
147191
#[cfg(feature = "storage-fs")]
148-
Storage::LocalFs => {
192+
StorageBackend::LocalFs => {
149193
let op = super::fs_config_build()?;
150194

151195
if let Some(stripped) = path.strip_prefix("file:/") {
@@ -155,7 +199,7 @@ impl Storage {
155199
}
156200
}
157201
#[cfg(feature = "storage-s3")]
158-
Storage::S3 {
202+
StorageBackend::S3 {
159203
configured_scheme,
160204
config,
161205
customized_credential_load,
@@ -175,7 +219,7 @@ impl Storage {
175219
}
176220
}
177221
#[cfg(feature = "storage-gcs")]
178-
Storage::Gcs { config } => {
222+
StorageBackend::Gcs { config } => {
179223
let operator = super::gcs_config_build(config, path)?;
180224
let prefix = format!("gs://{}/", operator.info().name());
181225
if path.starts_with(&prefix) {
@@ -188,7 +232,7 @@ impl Storage {
188232
}
189233
}
190234
#[cfg(feature = "storage-oss")]
191-
Storage::Oss { config } => {
235+
StorageBackend::Oss { config } => {
192236
let op = super::oss_config_build(config, path)?;
193237

194238
// Check prefix of oss path.
@@ -203,7 +247,7 @@ impl Storage {
203247
}
204248
}
205249
#[cfg(feature = "storage-azdls")]
206-
Storage::Azdls {
250+
StorageBackend::Azdls {
207251
configured_scheme,
208252
config,
209253
} => super::azdls_create_operator(path, config, configured_scheme),
@@ -220,6 +264,12 @@ impl Storage {
220264
)),
221265
}?;
222266

267+
// Apply custom executor if configured for runtime segregation
268+
if let Some(ref executor) = self.executor {
269+
let executor_clone = executor.clone();
270+
operator.update_executor(|_| executor_clone);
271+
}
272+
223273
// Transient errors are common for object stores; however there's no
224274
// harm in retrying temporary failures for other storage backends as well.
225275
let operator = operator.layer(RetryLayer::new());

0 commit comments

Comments
 (0)