1717
1818import argparse
1919import ray
20+ from datafusion import SessionContext , SessionConfig , RuntimeConfig
2021from datafusion_ray import DatafusionRayContext
2122from datetime import datetime
2223import json
@@ -41,22 +42,32 @@ def main(benchmark: str, data_path: str, query_path: str, concurrency: int):
4142 # use ray job submit
4243 ray .init ()
4344
44- ctx = DatafusionRayContext (concurrency )
45+ runtime = (
46+ RuntimeConfig ()
47+ )
48+ config = (
49+ SessionConfig ()
50+ .with_target_partitions (concurrency )
51+ .set ("datafusion.execution.parquet.pushdown_filters" , "true" )
52+ )
53+ df_ctx = SessionContext (config , runtime )
54+
55+ ray_ctx = DatafusionRayContext (df_ctx )
4556
4657 for table in table_names :
4758 path = f"{ data_path } /{ table } .parquet"
4859 print (f"Registering table { table } using path { path } " )
49- ctx .register_parquet (table , path )
60+ df_ctx .register_parquet (table , path )
5061
5162 results = {
5263 'engine' : 'datafusion-python' ,
5364 'benchmark' : benchmark ,
5465 'data_path' : data_path ,
5566 'query_path' : query_path ,
56- 'concurrency' : concurrency ,
5767 }
5868
5969 for query in range (1 , num_queries + 1 ):
70+
6071 # read text file
6172 path = f"{ query_path } /q{ query } .sql"
6273 print (f"Reading query { query } using path { path } " )
@@ -70,7 +81,7 @@ def main(benchmark: str, data_path: str, query_path: str, concurrency: int):
7081 sql = sql .strip ()
7182 if len (sql ) > 0 :
7283 print (f"Executing: { sql } " )
73- rows = ctx .sql (sql )
84+ rows = ray_ctx .sql (sql )
7485
7586 print (f"Query { query } returned { len (rows )} rows" )
7687 end_time = time .time ()
@@ -86,6 +97,9 @@ def main(benchmark: str, data_path: str, query_path: str, concurrency: int):
8697 with open (results_path , "w" ) as f :
8798 f .write (str )
8899
100+ # write results to stdout
101+ print (str )
102+
89103if __name__ == "__main__" :
90104 parser = argparse .ArgumentParser (description = "DataFusion benchmark derived from TPC-H / TPC-DS" )
91105 parser .add_argument ("--benchmark" , required = True , help = "Benchmark to run (tpch or tpcds)" )
0 commit comments