File tree Expand file tree Collapse file tree 2 files changed +26
-0
lines changed
Expand file tree Collapse file tree 2 files changed +26
-0
lines changed Original file line number Diff line number Diff line change @@ -2,6 +2,7 @@ use arrow::util::pretty::pretty_format_batches;
22use arrow_flight:: flight_service_client:: FlightServiceClient ;
33use async_trait:: async_trait;
44use datafusion:: common:: DataFusionError ;
5+ use datafusion:: common:: utils:: get_available_parallelism;
56use datafusion:: execution:: SessionStateBuilder ;
67use datafusion:: physical_plan:: displayable;
78use datafusion:: prelude:: { ParquetReadOptions , SessionContext } ;
@@ -27,6 +28,12 @@ struct Args {
2728
2829 #[ structopt( long) ]
2930 explain : bool ,
31+
32+ #[ structopt( long) ]
33+ files_per_task : Option < usize > ,
34+
35+ #[ structopt( long) ]
36+ cardinality_task_sf : Option < f64 > ,
3037}
3138
3239#[ tokio:: main]
@@ -37,6 +44,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
3744 . with_default_features ( )
3845 . with_distributed_channel_resolver ( InMemoryChannelResolver :: new ( ) )
3946 . with_physical_optimizer_rule ( Arc :: new ( DistributedPhysicalOptimizerRule ) )
47+ . with_distributed_files_per_task (
48+ args. files_per_task . unwrap_or ( get_available_parallelism ( ) ) ,
49+ ) ?
50+ . with_distributed_cardinality_effect_task_scale_factor (
51+ args. cardinality_task_sf . unwrap_or ( 1. ) ,
52+ ) ?
4053 . build ( ) ;
4154
4255 let ctx = SessionContext :: from ( state) ;
Original file line number Diff line number Diff line change @@ -3,6 +3,7 @@ use arrow_flight::flight_service_client::FlightServiceClient;
33use async_trait:: async_trait;
44use dashmap:: { DashMap , Entry } ;
55use datafusion:: common:: DataFusionError ;
6+ use datafusion:: common:: utils:: get_available_parallelism;
67use datafusion:: execution:: SessionStateBuilder ;
78use datafusion:: physical_plan:: displayable;
89use datafusion:: prelude:: { ParquetReadOptions , SessionContext } ;
@@ -28,6 +29,12 @@ struct Args {
2829
2930 #[ structopt( long) ]
3031 explain : bool ,
32+
33+ #[ structopt( long) ]
34+ files_per_task : Option < usize > ,
35+
36+ #[ structopt( long) ]
37+ cardinality_task_sf : Option < f64 > ,
3138}
3239
3340#[ tokio:: main]
@@ -43,6 +50,12 @@ async fn main() -> Result<(), Box<dyn Error>> {
4350 . with_default_features ( )
4451 . with_distributed_channel_resolver ( localhost_resolver)
4552 . with_physical_optimizer_rule ( Arc :: new ( DistributedPhysicalOptimizerRule ) )
53+ . with_distributed_files_per_task (
54+ args. files_per_task . unwrap_or ( get_available_parallelism ( ) ) ,
55+ ) ?
56+ . with_distributed_cardinality_effect_task_scale_factor (
57+ args. cardinality_task_sf . unwrap_or ( 1. ) ,
58+ ) ?
4659 . build ( ) ;
4760
4861 let ctx = SessionContext :: from ( state) ;
You can’t perform that action at this time.
0 commit comments