Skip to content

Commit e95523f

Browse files
author
Devdutt Shenoi
committed
refactor: execution_plans
1 parent 0b2e19e commit e95523f

File tree

1 file changed

+90
-101
lines changed

1 file changed

+90
-101
lines changed

src/query/stream_schema_provider.rs

Lines changed: 90 additions & 101 deletions
Original file line number | Diff line number | Diff line change
@@ -118,6 +118,7 @@ impl StandardTableProvider {
118118
#[allow(clippy::too_many_arguments)]
119119
async fn create_parquet_physical_plan(
120120
&self,
121+
execution_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
121122
object_store_url: ObjectStoreUrl,
122123
partitions: Vec<Vec<PartitionedFile>>,
123124
statistics: Statistics,
@@ -126,7 +127,7 @@ impl StandardTableProvider {
126127
limit: Option<usize>,
127128
state: &dyn Session,
128129
time_partition: Option<String>,
129-
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
130+
) -> Result<(), DataFusionError> {
130131
let filters = if let Some(expr) = conjunction(filters.to_vec()) {
131132
let table_df_schema = self.schema.as_ref().clone().to_dfschema()?;
132133
let filters = create_physical_expr(&expr, &table_df_schema, state.execution_props())?;
@@ -165,20 +166,23 @@ impl StandardTableProvider {
165166
filters.as_ref(),
166167
)
167168
.await?;
168-
Ok(plan)
169+
execution_plans.push(plan);
170+
171+
Ok(())
169172
}
170173

171174
#[allow(clippy::too_many_arguments)]
172175
async fn get_cache_exectuion_plan(
173176
&self,
177+
execution_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
174178
cache_manager: &LocalCacheManager,
175179
manifest_files: &mut Vec<File>,
176180
projection: Option<&Vec<usize>>,
177181
filters: &[Expr],
178182
limit: Option<usize>,
179183
state: &dyn Session,
180184
time_partition: Option<String>,
181-
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
185+
) -> Result<(), DataFusionError> {
182186
let (cached, remainder) = cache_manager
183187
.partition_on_cached(&self.stream, manifest_files.clone(), |file: &File| {
184188
&file.file_path
@@ -200,33 +204,34 @@ impl StandardTableProvider {
200204
.collect();
201205

202206
let (partitioned_files, statistics) = self.partitioned_files(cached);
203-
let plan = self
204-
.create_parquet_physical_plan(
205-
ObjectStoreUrl::parse("file:///").unwrap(),
206-
partitioned_files,
207-
statistics,
208-
projection,
209-
filters,
210-
limit,
211-
state,
212-
time_partition.clone(),
213-
)
214-
.await?;
207+
self.create_parquet_physical_plan(
208+
execution_plans,
209+
ObjectStoreUrl::parse("file:///").unwrap(),
210+
partitioned_files,
211+
statistics,
212+
projection,
213+
filters,
214+
limit,
215+
state,
216+
time_partition.clone(),
217+
)
218+
.await?;
215219

216-
Ok(Some(plan))
220+
Ok(())
217221
}
218222

219223
#[allow(clippy::too_many_arguments)]
220224
async fn get_hottier_exectuion_plan(
221225
&self,
226+
execution_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
222227
hot_tier_manager: &HotTierManager,
223228
manifest_files: &mut Vec<File>,
224229
projection: Option<&Vec<usize>>,
225230
filters: &[Expr],
226231
limit: Option<usize>,
227232
state: &dyn Session,
228233
time_partition: Option<String>,
229-
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
234+
) -> Result<(), DataFusionError> {
230235
let hot_tier_files = hot_tier_manager
231236
.get_hot_tier_manifest_files(&self.stream, manifest_files)
232237
.await
@@ -247,25 +252,26 @@ impl StandardTableProvider {
247252
.collect();
248253

249254
let (partitioned_files, statistics) = self.partitioned_files(hot_tier_files);
250-
let plan = self
251-
.create_parquet_physical_plan(
252-
ObjectStoreUrl::parse("file:///").unwrap(),
253-
partitioned_files,
254-
statistics,
255-
projection,
256-
filters,
257-
limit,
258-
state,
259-
time_partition.clone(),
260-
)
261-
.await?;
255+
self.create_parquet_physical_plan(
256+
execution_plans,
257+
ObjectStoreUrl::parse("file:///").unwrap(),
258+
partitioned_files,
259+
statistics,
260+
projection,
261+
filters,
262+
limit,
263+
state,
264+
time_partition.clone(),
265+
)
266+
.await?;
262267

263-
Ok(Some(plan))
268+
Ok(())
264269
}
265270

266271
#[allow(clippy::too_many_arguments)]
267272
async fn legacy_listing_table(
268273
&self,
274+
execution_plans: &mut Vec<Arc<dyn ExecutionPlan>>,
269275
glob_storage: Arc<dyn ObjectStorage>,
270276
object_store: Arc<dyn ObjectStore>,
271277
time_filters: &[PartialTimeFilter],
@@ -274,33 +280,32 @@ impl StandardTableProvider {
274280
filters: &[Expr],
275281
limit: Option<usize>,
276282
time_partition: Option<String>,
277-
) -> Result<Option<Arc<dyn ExecutionPlan>>, DataFusionError> {
278-
let remote_table = ListingTableBuilder::new(self.stream.to_owned())
283+
) -> Result<(), DataFusionError> {
284+
ListingTableBuilder::new(self.stream.to_owned())
279285
.populate_via_listing(glob_storage.clone(), object_store, time_filters)
280286
.and_then(|builder| async {
281287
let table = builder.build(
282288
self.schema.clone(),
283289
|x| glob_storage.query_prefixes(x),
284290
time_partition,
285291
)?;
286-
let res = match table {
287-
Some(table) => Some(table.scan(state, projection, filters, limit).await?),
288-
_ => None,
289-
};
290-
Ok(res)
292+
if let Some(table) = table {
293+
let plan = table.scan(state, projection, filters, limit).await?;
294+
execution_plans.push(plan);
295+
}
296+
297+
Ok(())
291298
})
292299
.await?;
293300

294-
Ok(remote_table)
301+
Ok(())
295302
}
296303

297304
fn final_plan(
298305
&self,
299-
execution_plans: Vec<Option<Arc<dyn ExecutionPlan>>>,
306+
mut execution_plans: Vec<Arc<dyn ExecutionPlan>>,
300307
projection: Option<&Vec<usize>>,
301308
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
302-
let mut execution_plans = execution_plans.into_iter().flatten().collect_vec();
303-
304309
let exec: Arc<dyn ExecutionPlan> = if execution_plans.is_empty() {
305310
let schema = match projection {
306311
Some(projection) => Arc::new(self.schema.project(projection)?),
@@ -462,10 +467,7 @@ impl TableProvider for StandardTableProvider {
462467
filters: &[Expr],
463468
limit: Option<usize>,
464469
) -> Result<Arc<dyn ExecutionPlan>, DataFusionError> {
465-
let mut memory_exec = None;
466-
let mut cache_exec = None;
467-
let mut hot_tier_exec = None;
468-
let mut listing_exec = None;
470+
let mut execution_plans = vec![];
469471
let object_store = state
470472
.runtime_env()
471473
.object_store_registry
@@ -488,11 +490,11 @@ impl TableProvider for StandardTableProvider {
488490
event::STREAM_WRITERS.recordbatches_cloned(&self.stream, &self.schema)
489491
{
490492
let reversed_mem_table = reversed_mem_table(records, self.schema.clone())?;
491-
memory_exec = Some(
492-
reversed_mem_table
493-
.scan(state, projection, filters, limit)
494-
.await?,
495-
);
493+
494+
let memory_exec = reversed_mem_table
495+
.scan(state, projection, filters, limit)
496+
.await?;
497+
execution_plans.push(memory_exec);
496498
}
497499
};
498500
let mut merged_snapshot: snapshot::Snapshot = Snapshot::default();
@@ -526,8 +528,9 @@ impl TableProvider for StandardTableProvider {
526528
let listing_time_fiters =
527529
return_listing_time_filters(&merged_snapshot.manifest_list, &mut time_filters);
528530

529-
listing_exec = if let Some(listing_time_filter) = listing_time_fiters {
531+
if let Some(listing_time_filter) = listing_time_fiters {
530532
self.legacy_listing_table(
533+
&mut execution_plans,
531534
glob_storage.clone(),
532535
object_store.clone(),
533536
&listing_time_filter,
@@ -537,10 +540,8 @@ impl TableProvider for StandardTableProvider {
537540
limit,
538541
time_partition.clone(),
539542
)
540-
.await?
541-
} else {
542-
None
543-
};
543+
.await?;
544+
}
544545
}
545546

546547
let mut manifest_files = collect_from_snapshot(
@@ -553,14 +554,30 @@ impl TableProvider for StandardTableProvider {
553554
.await?;
554555

555556
if manifest_files.is_empty() {
556-
return self.final_plan(vec![listing_exec, memory_exec], projection);
557+
return self.final_plan(execution_plans, projection);
557558
}
558559

559560
// Based on entries in the manifest files, find them in the cache and create a physical plan.
560561
if let Some(cache_manager) = LocalCacheManager::global() {
561-
cache_exec = self
562-
.get_cache_exectuion_plan(
563-
cache_manager,
562+
self.get_cache_exectuion_plan(
563+
&mut execution_plans,
564+
cache_manager,
565+
&mut manifest_files,
566+
projection,
567+
filters,
568+
limit,
569+
state,
570+
time_partition.clone(),
571+
)
572+
.await?;
573+
}
574+
575+
// Hot tier data fetch
576+
if let Some(hot_tier_manager) = HotTierManager::global() {
577+
if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) {
578+
self.get_hottier_exectuion_plan(
579+
&mut execution_plans,
580+
hot_tier_manager,
564581
&mut manifest_files,
565582
projection,
566583
filters,
@@ -569,56 +586,28 @@ impl TableProvider for StandardTableProvider {
569586
time_partition.clone(),
570587
)
571588
.await?;
572-
}
573-
574-
// Hot tier data fetch
575-
if let Some(hot_tier_manager) = HotTierManager::global() {
576-
if hot_tier_manager.check_stream_hot_tier_exists(&self.stream) {
577-
hot_tier_exec = self
578-
.get_hottier_exectuion_plan(
579-
hot_tier_manager,
580-
&mut manifest_files,
581-
projection,
582-
filters,
583-
limit,
584-
state,
585-
time_partition.clone(),
586-
)
587-
.await?;
588589
}
589590
}
590591
if manifest_files.is_empty() {
591592
QUERY_CACHE_HIT.with_label_values(&[&self.stream]).inc();
592-
return self.final_plan(
593-
vec![listing_exec, memory_exec, cache_exec, hot_tier_exec],
594-
projection,
595-
);
593+
return self.final_plan(execution_plans, projection);
596594
}
597595

598596
let (partitioned_files, statistics) = self.partitioned_files(manifest_files);
599-
let remote_exec = self
600-
.create_parquet_physical_plan(
601-
ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),
602-
partitioned_files,
603-
statistics,
604-
projection,
605-
filters,
606-
limit,
607-
state,
608-
time_partition.clone(),
609-
)
610-
.await?;
611-
612-
Ok(self.final_plan(
613-
vec![
614-
listing_exec,
615-
memory_exec,
616-
cache_exec,
617-
hot_tier_exec,
618-
Some(remote_exec),
619-
],
597+
self.create_parquet_physical_plan(
598+
&mut execution_plans,
599+
ObjectStoreUrl::parse(glob_storage.store_url()).unwrap(),
600+
partitioned_files,
601+
statistics,
620602
projection,
621-
)?)
603+
filters,
604+
limit,
605+
state,
606+
time_partition.clone(),
607+
)
608+
.await?;
609+
610+
Ok(self.final_plan(execution_plans, projection)?)
622611
}
623612

624613
/*

0 commit comments

Comments
 (0)