@@ -97,6 +97,7 @@ use super::udfs::{
9797 aggregate_udf_by_kind, registerable_aggregate_udfs, registerable_arc_aggregate_udfs,
9898 registerable_arc_scalar_udfs, CubeAggregateUDFKind ,
9999} ;
100+ use super :: QueryPlannerImpl ;
100101
101102#[ automock]
102103#[ async_trait]
@@ -139,14 +140,21 @@ pub trait QueryExecutor: DIService + Send + Sync {
139140crate :: di_service!( MockQueryExecutor , [ QueryExecutor ] ) ;
140141
141142pub struct QueryExecutorImpl {
142- // TODO: Why do we need a MetadataCacheFactory when we have a ParquetMetadataCache?
143+ // TODO: Why do we need a MetadataCacheFactory when we have a ParquetMetadataCache? (We use its make_session_config() now, TODO rename stuff)
143144 metadata_cache_factory : Arc < dyn MetadataCacheFactory > ,
144145 parquet_metadata_cache : Arc < dyn CubestoreParquetMetadataCache > ,
145146 memory_handler : Arc < dyn MemoryHandler > ,
146147}
147148
148149crate :: di_service!( QueryExecutorImpl , [ QueryExecutor ] ) ;
149150
151+ impl QueryExecutorImpl {
152+ fn execution_context ( & self ) -> Result < Arc < SessionContext > , CubeError > {
153+ // This is supposed to be identical to QueryImplImpl::execution_context.
154+ Ok ( Arc :: new ( QueryPlannerImpl :: execution_context_helper ( self . metadata_cache_factory . make_session_config ( ) ) ) )
155+ }
156+ }
157+
150158#[ async_trait]
151159impl QueryExecutor for QueryExecutorImpl {
152160 #[ instrument( level = "trace" , skip( self , plan, cluster) ) ]
@@ -174,7 +182,8 @@ impl QueryExecutor for QueryExecutorImpl {
174182
175183 let execution_time = SystemTime :: now ( ) ;
176184
177- let results = collect ( split_plan. clone ( ) , Arc :: new ( TaskContext :: default ( ) ) )
185+ let session_context = self . execution_context ( ) ?;
186+ let results = collect ( split_plan. clone ( ) , session_context. task_ctx ( ) )
178187 . instrument ( collect_span)
179188 . await ;
180189 let execution_time = execution_time. elapsed ( ) ?;
@@ -241,8 +250,9 @@ impl QueryExecutor for QueryExecutorImpl {
241250 ) ;
242251
243252 let execution_time = SystemTime :: now ( ) ;
253+ let session_context = self . execution_context ( ) ?;
244254 // TODO context
245- let results = collect ( worker_plan. clone ( ) , Arc :: new ( TaskContext :: default ( ) ) )
255+ let results = collect ( worker_plan. clone ( ) , session_context . task_ctx ( ) )
246256 . instrument ( tracing:: span!(
247257 tracing:: Level :: TRACE ,
248258 "collect_physical_plan"
0 commit comments