2121from pyspark .sql import SparkSession
2222import time
2323
def dedup_columns(df):
    """Return *df* with duplicate column names renamed to be unique.

    Duplicate occurrences get a numeric suffix, e.g. ``a, a, b, b`` becomes
    ``a, a_1, b, b_1``. This matters when writing results to formats such as
    Parquet, where column-name uniqueness is required.

    The suffix counter skips any candidate name that already exists in the
    DataFrame (or was already generated), so e.g. ``a, a, a_1`` becomes
    ``a, a_2, a_1`` instead of producing a second ``a_1`` collision.

    :param df: a PySpark DataFrame (anything exposing ``columns`` and ``toDF``)
    :return: a new DataFrame with uniquely-named columns
    """
    counts = {}
    # All names that are no longer available: every original column name,
    # plus each suffixed name we hand out below.
    taken = set(df.columns)
    new_cols = []
    for c in df.columns:
        if c not in counts:
            # First occurrence keeps its original name.
            counts[c] = 0
            new_cols.append(c)
        else:
            # Bump the suffix until the candidate does not collide with an
            # existing column or a previously generated name.
            while True:
                counts[c] += 1
                candidate = f"{c}_{counts[c]}"
                if candidate not in taken:
                    break
            taken.add(candidate)
            new_cols.append(candidate)
    return df.toDF(*new_cols)
39+
2440def main (benchmark : str , data_path : str , query_path : str , iterations : int , output : str , name : str , query_num : int = None , write_path : str = None ):
2541
2642 # Initialize a SparkSession
@@ -91,9 +107,11 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
91107 df .explain ()
92108
93109 if write_path is not None :
110+ # skip results with empty schema
111+ # coming across for running DDL stmt
94112 if len (df .columns ) > 0 :
95113 output_path = f"{ write_path } /q{ query } "
96- df .coalesce (1 ).write .mode ("overwrite" ).parquet (output_path )
114+ dedup_columns ( df ) .coalesce (1 ).write .mode ("overwrite" ).parquet (output_path )
97115 print (f"Query { query } results written to { output_path } " )
98116 else :
99117 print (f"Skipping write: DataFrame has no schema for { output_path } " )
@@ -135,4 +153,5 @@ def main(benchmark: str, data_path: str, query_path: str, iterations: int, outpu
135153 parser .add_argument ("--write" , required = False , help = "Path to save query results to, in Parquet format." )
136154 args = parser .parse_args ()
137155
138- main (args .benchmark , args .data , args .queries , int (args .iterations ), args .output , args .name , args .query , args .write )
156+ main (args .benchmark , args .data , args .queries , int (args .iterations ), args .output , args .name , args .query , args .write )
157+
0 commit comments