@@ -25,10 +25,7 @@ use crate::io_object_store::IoObjectStore;
2525use crate :: stream:: RecordBatchStreamAdapter ;
2626use crate :: SendableRecordBatchStream ;
2727use datafusion_common:: DataFusionError ;
28- use futures:: {
29- future:: { BoxFuture , Shared } ,
30- Future , FutureExt , TryFutureExt ,
31- } ;
28+ use futures:: { future:: { BoxFuture , Shared } , Future , FutureExt , Stream , TryFutureExt } ;
3229use log:: { info, warn} ;
3330use object_store:: ObjectStore ;
3431use parking_lot:: RwLock ;
@@ -59,6 +56,8 @@ impl From<Builder> for DedicatedExecutorBuilder {
5956/// tasks on the same threadpool by running futures (and any `tasks` that are
6057/// `tokio::task::spawned` by them) on a separate tokio [`Executor`].
6158///
59+ /// DedicatedExecutor can be `clone`ed and all clones share the same threadpool.
60+ ///
6261/// TODO add note about `io_thread`
6362///
6463/// TODO: things we use in InfluxData
@@ -119,7 +118,7 @@ impl From<Builder> for DedicatedExecutorBuilder {
119118/// happens when a runtime is dropped from within an asynchronous
120119/// context.', .../tokio-1.4.0/src/runtime/blocking/shutdown.rs:51:21
121120///
122- #[ derive( Clone ) ]
121+ #[ derive( Clone , Debug ) ]
123122pub struct DedicatedExecutor {
124123 state : Arc < RwLock < State > > ,
125124}
@@ -222,8 +221,8 @@ impl DedicatedExecutor {
222221 /// Note that this object store will only work correctly if run on this
223222 /// dedicated executor. If you try and use it on another executor, it will
224223 /// panic with "no IO runtime registered" type error.
225- pub fn wrap_object_store ( object_store : Arc < dyn ObjectStore > ) -> Arc < IoObjectStore > {
226- Arc :: new ( IoObjectStore :: new ( object_store) )
224+ pub fn wrap_object_store ( & self , object_store : Arc < dyn ObjectStore > ) -> Arc < IoObjectStore > {
225+ Arc :: new ( IoObjectStore :: new ( self . clone ( ) , object_store) )
227226 }
228227
229228 /// Returns a SendableRecordBatchStream that will run on this executor's thread pool
@@ -237,6 +236,28 @@ impl DedicatedExecutor {
237236 Box :: pin ( RecordBatchStreamAdapter :: new ( schema, cross_rt_stream) )
238237 }
239238
239+ /// Runs an stream that produces Results on the executor's thread pool
240+ ///
241+ /// Ths stream must produce Results so that any errors on the dedicated
242+ /// executor (like a panic or shutdown) can be communicated back.
243+ ///
244+ /// # Arguments:
245+ /// - stream: the stream to run on this dedicated executor
246+ /// - converter: a function that converts a [`JobError`] to the error type of the stream
247+ pub fn run_stream < X , E , S , C > (
248+ & self ,
249+ stream : S ,
250+ converter : C ,
251+ ) -> impl Stream < Item = Result < X , E > > + Send + ' static
252+ where
253+ X : Send + ' static ,
254+ E : Send + ' static ,
255+ S : Stream < Item = Result < X , E > > + Send + ' static ,
256+ C : Fn ( JobError ) -> E + Send + ' static ,
257+ {
258+ CrossRtStream :: new_with_error_stream ( stream, self . clone ( ) , converter)
259+ }
260+
240261 /// Registers `handle` as the IO runtime for this thread
241262 ///
242263 /// This sets a thread-local variable
@@ -306,6 +327,7 @@ impl<T> Future for DropGuard<T> {
306327/// The state is only used by the "outer" API, not by the newly created runtime. The new runtime waits for
307328/// [`start_shutdown`](Self::start_shutdown) and signals the completion via
308329/// [`completed_shutdown`](Self::completed_shutdown) (for which is owns the sender side).
330+ #[ derive( Debug ) ]
309331struct State {
310332 /// Runtime handle.
311333 ///
0 commit comments