@@ -1,31 +1,23 @@
 import duckdb
 
 import sys
-import os
-import multiprocessing
 
 conn = duckdb.connect()
 
 
-def make(scale_factor: int, partitions: int, output_path: str, step: int):
+def make(scale_factor: int, output_path: str):
     statements = [
         "install tpch",
         "load tpch",
+        f"call dbgen(sf = {scale_factor})",
     ]
     execute(statements)
 
-    print(f"step {step}")
-    sql = f"call dbgen(sf={scale_factor}, children={partitions}, step={step})"
-    conn.execute(sql)
-    conn.sql("show tables").show()
-
     statements = []
-
     for row in conn.execute("show tables").fetchall():
         table = row[0]
-        os.makedirs(f"{output_path}/{table}.parquet", exist_ok=True)
         statements.append(
-            f"copy {table} to '{output_path}/{table}.parquet/part{step}.parquet' (format parquet, compression zstd)"
+            f"copy {table} to '{output_path}/{table}.parquet' (format parquet, compression zstd)"
         )
     execute(statements)
 
@@ -37,15 +29,4 @@ def execute(statements):
 
 
 if __name__ == "__main__":
-    # this is quick and dirty, it should be tidied up with click to process args
-    scale_factor = int(sys.argv[1])
-    partitions = int(sys.argv[2])
-    data_path = sys.argv[3]
-    procs = int(sys.argv[4])
-
-    def go(step):
-        make(scale_factor, partitions, data_path, step)
-
-    steps = list(range(partitions))
-    with multiprocessing.Pool(processes=procs) as pool:
-        pool.map(go, steps)
+    make(int(sys.argv[1]), sys.argv[2])
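After this change the script takes only a scale factor and an output directory. A minimal usage sketch, assuming the module is saved as make_tpch.py (the filename and output location are not part of this diff):

    from make_tpch import make  # hypothetical module name

    # generate TPC-H at scale factor 1 and write one zstd-compressed
    # Parquet file per table into the current directory
    make(scale_factor=1, output_path=".")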