2121
2222This directory contains examples for executing distributed queries with Ballista.
2323
24- # Standalone Examples
24+ ## Standalone Examples
2525
2626The standalone example is the easiest to get started with. Ballista supports a standalone mode where a scheduler
2727and executor are started in-process.
@@ -33,18 +33,35 @@ cargo run --example standalone_sql --features="ballista/standalone"
3333### Source code for standalone SQL example
3434
3535``` rust
36+ use ballista :: {
37+ extension :: SessionConfigExt ,
38+ prelude :: *
39+ };
40+ use datafusion :: {
41+ execution :: {options :: ParquetReadOptions , SessionStateBuilder },
42+ prelude :: {SessionConfig , SessionContext },
43+ };
44+
3645#[tokio:: main]
3746async fn main () -> Result <()> {
38- let config = BallistaConfig :: builder ()
39- . set ( " ballista.shuffle.partitions " , " 1 " )
40- . build () ? ;
47+ let config = SessionConfig :: new_with_ballista ()
48+ . with_target_partitions ( 1 )
49+ . with_ballista_standalone_parallelism ( 2 ) ;
4150
42- let ctx = BallistaContext :: standalone (& config , 2 ). await ? ;
51+ let state = SessionStateBuilder :: new ()
52+ . with_config (config )
53+ . with_default_features ()
54+ . build ();
4355
44- ctx . register_csv (
56+ let ctx = SessionContext :: standalone_with_state (state ). await ? ;
57+
58+ let test_data = test_util :: examples_test_data ();
59+
60+ // register parquet file with the execution context
61+ ctx . register_parquet (
4562 " test" ,
46- " testdata/aggregate_test_100.csv " ,
47- CsvReadOptions :: new (),
63+ & format! ( " {test_data}/alltypes_plain.parquet " ) ,
64+ ParquetReadOptions :: default (),
4865 )
4966 . await ? ;
5067
@@ -56,12 +73,12 @@ async fn main() -> Result<()> {
5673
5774```
5875
59- # Distributed Examples
76+ ## Distributed Examples
6077
6178For background information on the Ballista architecture, refer to
6279the [Ballista README](../ballista/client/README.md).
6380
64- ## Start a standalone cluster
81+ ### Start a standalone cluster
6582
6683From the root of the project, build release binaries.
6784
@@ -83,40 +100,49 @@ RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50051
83100RUST_LOG=info ./target/release/ballista-executor -c 2 -p 50052
84101```
85102
86- ## Running the examples
103+ ### Running the examples
87104
88105The examples can be run using the `cargo run --example` syntax.
89106
90- ## Distributed SQL Example
107+ ### Distributed SQL Example
91108
92109``` bash
93110cargo run --release --example remote-sql
94111```
95112
96- ### Source code for distributed SQL example
113+ #### Source code for distributed SQL example
97114
98115``` rust
99- use ballista :: prelude :: * ;
100- use datafusion :: prelude :: CsvReadOptions ;
116+ use ballista :: {extension :: SessionConfigExt , prelude :: * };
117+ use datafusion :: {
118+ execution :: SessionStateBuilder ,
119+ prelude :: {CsvReadOptions , SessionConfig , SessionContext },
120+ };
101121
102122/// This example demonstrates executing a simple query against an Arrow data source (CSV) and
103123/// fetching results, using SQL
104124#[tokio:: main]
105125async fn main () -> Result <()> {
106- let config = BallistaConfig :: builder ()
107- . set (" ballista.shuffle.partitions" , " 4" )
108- . build ()? ;
109- let ctx = BallistaContext :: remote (" localhost" , 50050 , & config ). await ? ;
126+ let config = SessionConfig :: new_with_ballista ()
127+ . with_target_partitions (4 )
128+ . with_ballista_job_name (" Remote SQL Example" );
129+
130+ let state = SessionStateBuilder :: new ()
131+ . with_config (config )
132+ . with_default_features ()
133+ . build ();
134+
135+ let ctx = SessionContext :: remote_with_state (" df://localhost:50050" , state ). await ? ;
136+
137+ let test_data = test_util :: examples_test_data ();
110138
111- // register csv file with the execution context
112139 ctx . register_csv (
113140 " test" ,
114- " testdata /aggregate_test_100.csv" ,
141+ & format! ( " {test_data} /aggregate_test_100.csv" ) ,
115142 CsvReadOptions :: new (),
116143 )
117144 . await ? ;
118145
119- // execute the query
120146 let df = ctx
121147 . sql (
122148 " SELECT c1, MIN(c12), MAX(c12) \
@@ -126,39 +152,49 @@ async fn main() -> Result<()> {
126152 )
127153 . await ? ;
128154
129- // print the results
130155 df . show (). await ? ;
131156
132157 Ok (())
133158}
134159```
135160
136- ## Distributed DataFrame Example
161+ ### Distributed DataFrame Example
137162
138163``` bash
139164cargo run --release --example remote-dataframe
140165```
141166
142- ### Source code for distributed DataFrame example
167+ #### Source code for distributed DataFrame example
143168
144169``` rust
170+ use ballista :: {extension :: SessionConfigExt , prelude :: * };
171+ use datafusion :: {
172+ execution :: SessionStateBuilder ,
173+ prelude :: {col, lit, ParquetReadOptions , SessionConfig , SessionContext },
174+ };
175+
176+ /// This example demonstrates executing a simple query against an Arrow data source (Parquet) and
177+ /// fetching results, using the DataFrame trait
145178#[tokio:: main]
146179async fn main () -> Result <()> {
147- let config = BallistaConfig :: builder ()
148- . set (" ballista.shuffle.partitions" , " 4" )
149- . build ()? ;
150- let ctx = BallistaContext :: remote (" localhost" , 50050 , & config ). await ? ;
180+ let config = SessionConfig :: new_with_ballista (). with_target_partitions (4 );
181+
182+ let state = SessionStateBuilder :: new ()
183+ . with_config (config )
184+ . with_default_features ()
185+ . build ();
186+
187+ let ctx = SessionContext :: remote_with_state (" df://localhost:50050" , state ). await ? ;
151188
152- let filename = " testdata/alltypes_plain.parquet" ;
189+ let test_data = test_util :: examples_test_data ();
190+ let filename = format! (" {test_data}/alltypes_plain.parquet" );
153191
154- // define the query using the DataFrame trait
155192 let df = ctx
156193 . read_parquet (filename , ParquetReadOptions :: default ())
157194 . await ?
158195 . select_columns (& [" id" , " bool_col" , " timestamp_col" ])?
159196 . filter (col (" id" ). gt (lit (1 )))? ;
160197
161- // print the results
162198 df . show (). await ? ;
163199
164200 Ok (())
0 commit comments