diff --git a/docs/contributing.md b/docs/contributing.md index 6dffbd0..760cc79 100644 --- a/docs/contributing.md +++ b/docs/contributing.md @@ -80,7 +80,7 @@ RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tips.py --data-dir=$(pwd)/../testdata - In the `tpch` directory, use `make_data.py` to create a TPCH dataset at a provided scale factor, then ```bash -RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --qnum 2 +RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --qnum 2 ``` To execute the TPCH query #2. To execute an arbitrary query against the TPCH dataset, provide it with `--query` instead of `--qnum`. This is useful for validating plans that DataFusion Ray will create. @@ -88,7 +88,7 @@ To execute the TPCH query #2. To execute an arbitrary query against the TPCH dat For example, to execute the following query: ```bash -RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpc.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1' +RAY_COLOR_PREFIX=1 RAY_DEDUP_LOGS=0 python tpcbench.py --data=file:///path/to/your/tpch/directory/ --concurrency=2 --batch-size=8182 --worker-pool-min=10 --query 'select c.c_name, sum(o.o_totalprice) as total from orders o inner join customer c on o.o_custkey = c.c_custkey group by c_name limit 1' ``` To further parallelize execution, you can choose how many partitions will be served by each Stage with `--partitions-per-processor`. If this number is less than `--concurrency`, then multiple Actors will host portions of the stage. 
For example, if there are 10 stages calculated for a query, `concurrency=16` and `partitions-per-processor=4`, then `40` `RayStage` Actors will be created. If `partitions-per-processor=16` or is absent, then `10` `RayStage` Actors will be created. diff --git a/tpch/tpcbench.py b/tpch/tpcbench.py index 16288fb..341ba99 100644 --- a/tpch/tpcbench.py +++ b/tpch/tpcbench.py @@ -31,7 +31,7 @@ def tpch_query(qnum: int) -> str: def main( - qnum: int, + queries: list[(str, str)], data_path: str, concurrency: int, batch_size: int, @@ -99,10 +99,7 @@ def main( if validate: results["validated"] = {} - queries = range(1, 23) if qnum == -1 else [qnum] - for qnum in queries: - sql = tpch_query(qnum) - + for (qid, sql) in queries: statements = list( filter(lambda x: len(x) > 0, map(lambda x: x.strip(), sql.split(";"))) ) @@ -115,7 +112,7 @@ def main( df = ctx.sql(sql) all_batches.append(df.collect()) end_time = time.time() - results["queries"][qnum] = end_time - start_time + results["queries"][qid] = end_time - start_time calculated = "\n".join([prettify(b) for b in all_batches]) print(calculated) @@ -125,8 +122,8 @@ def main( all_batches.append(local.collect_sql(sql)) expected = "\n".join([prettify(b) for b in all_batches]) - results["validated"][qnum] = calculated == expected - print(f"done with query {qnum}") + results["validated"][qid] = calculated == expected + print(f"done with query {qid}") # write the results as we go, so you can peek at them results_dump = json.dumps(results, indent=4) @@ -154,7 +151,10 @@ def main( parser.add_argument( "--concurrency", required=True, help="Number of concurrent tasks" ) - parser.add_argument("--qnum", type=int, default=-1, help="TPCH query number, 1-22") + parser.add_argument("--qnum", type=int, default=-1, + help="TPCH query number, 1-22") + parser.add_argument("--query", required=False, type=str, + help="Custom query to run with tpch tables") parser.add_argument("--listing-tables", action="store_true") parser.add_argument("--validate", 
action="store_true") parser.add_argument( @@ -186,8 +186,28 @@ def main( args = parser.parse_args() + if (args.qnum != -1 and args.query is not None): + print("Please specify either --qnum or --query, but not both") + exit(1) + + queries = [] + if (args.qnum != -1): + if args.qnum < 1 or args.qnum > 22: + print("Invalid query number. Please specify a number between 1 and 22.") + exit(1) + else: + queries.append((str(args.qnum), tpch_query(args.qnum))) + print("Executing tpch query ", args.qnum) + + elif (args.query is not None): + queries.append(("custom query", args.query)) + print("Executing custom query: ", args.query) + else: + print("Executing all tpch queries") + queries = [(str(i), tpch_query(i)) for i in range(1, 23)] + main( - args.qnum, + queries, args.data, int(args.concurrency), int(args.batch_size),