11use std:: sync:: Arc ;
22
3+ #[ cfg( feature = "tokio" ) ]
4+ use tokio:: runtime:: Handle ;
5+
36use crate :: exec:: inline:: InlineDriver ;
7+ #[ cfg( feature = "tokio" ) ]
48use crate :: exec:: tokio:: TokioDriver ;
59use crate :: exec:: ExecDriver ;
610
@@ -11,20 +15,32 @@ pub enum ExecutionMode {
1115 /// [`vortex_array::stream::ArrayStream`]. In other words, uses the same runtime.
1216 Inline ,
1317 /// Spawns the tasks onto a provided Rayon thread pool.
14- // TODO(ngates): feature-flag this dependency.
18+ # [ cfg ( feature = "rayon" ) ]
1519 RayonThreadPool ( Arc < rayon:: ThreadPool > ) ,
1620 /// Spawns the tasks onto a provided Tokio runtime.
17- // TODO(ngates): feature-flag this dependency.
18- TokioRuntime ( tokio :: runtime :: Handle ) ,
21+ # [ cfg ( feature = "tokio" ) ]
22+ TokioRuntime ( Handle ) ,
1923}
2024
2125impl ExecutionMode {
2226 pub fn into_driver ( self ) -> Arc < dyn ExecDriver > {
2327 match self {
24- ExecutionMode :: Inline => Arc :: new ( InlineDriver ) ,
28+ ExecutionMode :: Inline => {
29+ // Default to tokio-specific behavior if its enabled and there's a runtime running.
30+ #[ cfg( feature = "tokio" ) ]
31+ match Handle :: try_current ( ) {
32+ Ok ( h) => Arc :: new ( TokioDriver ( h) ) ,
33+ Err ( _) => Arc :: new ( InlineDriver ) ,
34+ }
35+
36+ #[ cfg( not( feature = "tokio" ) ) ]
37+ Arc :: new ( InlineDriver )
38+ }
39+ #[ cfg( feature = "rayon" ) ]
2540 ExecutionMode :: RayonThreadPool ( _) => {
2641 todo ! ( )
2742 }
43+ #[ cfg( feature = "tokio" ) ]
2844 ExecutionMode :: TokioRuntime ( handle) => Arc :: new ( TokioDriver ( handle) ) ,
2945 }
3046 }
0 commit comments