Commit 5b941f3

Add support for running benchmarks in both single and partitioned modes & add comments

1 parent f853e46 commit 5b941f3

File tree

3 files changed: +56 -25 lines changed

daft-parquet/benchmark.sh

Lines changed: 16 additions & 2 deletions
@@ -1,5 +1,16 @@
 #!/bin/bash
 
+machine=${1:-"c6a.4xlarge"}
+case "$machine" in
+    "c6a.4xlarge"|"c6a.metal")
+        machine_name="$machine"
+        ;;
+    *)
+        echo "Invalid machine parameter. Allowed: c6a.4xlarge or c6a.metal"
+        exit 1
+        ;;
+esac
+
 # Install
 sudo apt-get update
 sudo apt-get install -y python3-pip
@@ -8,10 +19,13 @@ pip install --break-system-packages packaging
 pip install --break-system-packages daft==0.4.9
 
 # Use for Daft (Parquet, partitioned)
-# seq 0 99 | xargs -P100 -I{} bash -c 'wget --continue https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{}.parquet'
+seq 0 99 | xargs -P100 -I{} bash -c 'wget --continue https://datasets.clickhouse.com/hits_compatible/athena_partitioned/hits_{}.parquet'
 
 # Use for Daft (Parquet, single)
 wget --continue https://datasets.clickhouse.com/hits_compatible/athena/hits.parquet
 
 # Run the queries
-./run.sh 2>&1 | tee daft_log.txt
+for mode in partitioned single; do
+    echo "Running $mode mode..."
+    ./run.sh $machine_name $mode 2>&1 | tee "daft_log_${mode}.txt"
+done
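
Note: the restored download line fans out 100 parallel wget processes via xargs -P100. A rough Python equivalent of that fan-out, purely for illustration (thread pool in place of processes):

    from concurrent.futures import ThreadPoolExecutor
    import subprocess

    BASE = "https://datasets.clickhouse.com/hits_compatible/athena_partitioned"

    def fetch(i: int) -> None:
        # --continue resumes partial downloads on re-runs, as in the script
        subprocess.run(["wget", "--continue", f"{BASE}/hits_{i}.parquet"], check=True)

    # 100 workers mirror xargs -P100; each worker grabs one partition file
    with ThreadPoolExecutor(max_workers=100) as pool:
        list(pool.map(fetch, range(100)))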

daft-parquet/query.py

Lines changed: 33 additions & 20 deletions
@@ -1,13 +1,21 @@
 #!/usr/bin/env python3
 
 import daft
+import os
+import sys
 import timeit
 import traceback
 import pandas as pd
 from daft import col, DataType, TimeUnit
-from pathlib import Path
 
 hits = None
+current_dir = os.path.dirname(os.path.abspath(__file__))
+query_idx = int(sys.argv[1]) - 1
+is_single_mode = len(sys.argv) > 2 and sys.argv[2] == "single"
+parquet_path = os.path.join(
+    current_dir,
+    "hits.parquet" if is_single_mode else "hits_*.parquet"
+)
 
 with open("queries.sql") as f:
     sql_list = [q.strip() for q in f.read().split(';') if q.strip()]
@@ -21,8 +29,17 @@ def daft_offset(df, start ,end):
 for idx, sql in enumerate(sql_list):
     query_entry = {"sql": sql}
 
-    if idx+1 in [19, 36, 43]:
-        if idx+1 == 19:
+    # Current limitations and workarounds for Daft execution:
+
+    # 1. Queries q18, q35, q42 require manual API workarounds:
+    #    - q18: The function `extract(minute FROM EventTime)` causes an error:
+    #      `expected input to minute to be temporal, got UInt32`.
+    #    - q35: The error is `duplicate field name ClientIP in the schema`;
+    #      aliasing the column in SQL was attempted but still failed.
+    #    - q42: The function `DATE_TRUNC('minute', EventTime)` causes an error:
+    #      `Unsupported SQL: Function date_trunc not found`.
+    if idx in [18, 35, 42]:
+        if idx == 18:
             query_entry["lambda"] = lambda: (
                 hits.with_column("m", col("EventTime").dt.minute())
                 .groupby("UserID", "m", "SearchPhrase")
@@ -31,7 +48,7 @@ def daft_offset(df, start ,end):
                 .limit(10)
                 .select("UserID", "m", "SearchPhrase", "COUNT(*)")
             )
-        elif idx+1 == 36:
+        elif idx == 35:
             query_entry["lambda"] = lambda: (
                 hits.groupby(
                     "ClientIP",
@@ -43,7 +60,7 @@ def daft_offset(df, start ,end):
                 .limit(10)
                 .select("ClientIP", "ClientIP - 1", "ClientIP - 2", "ClientIP - 3", "c")
             )
-        elif idx+1 == 43:
+        elif idx == 42:
             query_entry["lambda"] = lambda: (
                 hits.with_column("M", col("EventTime").dt.truncate("1 minute"))
                 .where("CounterID = 62 AND EventDate >= '2013-07-14' AND EventDate <= '2013-07-15' AND IsRefresh = 0 AND DontCountHits = 0")
@@ -54,16 +71,19 @@ def daft_offset(df, start ,end):
                 .select("M", "PageViews")
             )
 
-    if 39 <= idx+1 <= 43:
-        if idx+1 == 39:
+    # 2. OFFSET operator not supported in Daft:
+    #    For queries q38, q39, q40, q41, q42, after executing the query,
+    #    manually implement the `OFFSET` truncation logic via the API.
+    if 38 <= idx <= 42:
+        if idx == 38:
             query_entry["extra_api"] = lambda df: daft_offset(df, 1000, 1010)
-        elif idx+1 == 40:
+        elif idx == 39:
             query_entry["extra_api"] = lambda df: daft_offset(df, 1000, 1010)
-        elif idx+1 == 41:
+        elif idx == 40:
             query_entry["extra_api"] = lambda df: daft_offset(df, 100, 110)
-        elif idx+1 == 42:
+        elif idx == 41:
             query_entry["extra_api"] = lambda df: daft_offset(df, 10000, 10010)
-        elif idx+1 == 43:
+        elif idx == 42:
             query_entry["extra_api"] = lambda df: daft_offset(df, 1000, 1010)
 
     queries.append(query_entry)
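
The daft_offset helper named in the hunk headers is defined outside this diff; only its signature is visible. A minimal sketch of what such a helper could look like, assuming the (already LIMITed) result is small enough to collect — the body below is hypothetical:

    import daft

    # Hypothetical body: Daft has no OFFSET, so collect the result and
    # slice rows [start, end) positionally with pandas
    def daft_offset(df: daft.DataFrame, start: int, end: int):
        return df.to_pandas().iloc[start:end]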
@@ -74,12 +94,7 @@ def run_single_query(query, i):
 
     global hits
     if hits is None:
-        # Use for Daft (Parquet, partitioned)
-        # Use absolute path when using wildcards
-        # hits = daft.read_parquet("/path/to/hits_*.parquet")
-
-        # Use for Daft (Parquet, single)
-        hits = daft.read_parquet("hits.parquet")
+        hits = daft.read_parquet(parquet_path)
         hits = hits.with_column("EventTime", col("EventTime").cast(daft.DataType.timestamp("s")))
         hits = hits.with_column("EventDate", col("EventDate").cast(daft.DataType.date()))
         hits = hits.with_column("URL", col("URL").decode("utf-8"))
@@ -104,13 +119,11 @@ def run_single_query(query, i):
 
         return run_time
     except Exception as e:
-        print(f"Error executing query {query_idx+1}: {str(e)[:100]}", file=sys.stderr)
+        print(f"Error executing query {query_idx}: {str(e)[:100]}", file=sys.stderr)
         traceback.print_exc()
         return None
 
 if __name__ == "__main__":
-    import sys
-    query_idx = int(sys.argv[1]) - 1
     query = queries[query_idx]
 
     times = []
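
For context, daft.read_parquet accepts glob patterns, which is why switching modes above only changes the file name joined into parquet_path. A standalone sketch of the two read paths (paths are illustrative):

    import daft

    # Partitioned mode: the glob matches hits_0.parquet .. hits_99.parquet,
    # scanned together as one table
    hits = daft.read_parquet("/abs/path/hits_*.parquet")

    # Single mode reads one monolithic file instead:
    # hits = daft.read_parquet("/abs/path/hits.parquet")

    print(hits.count_rows())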

daft-parquet/run.sh

Lines changed: 7 additions & 3 deletions
@@ -1,8 +1,12 @@
 #!/bin/bash
 
+machine_name=${1}
+mode=${2}
+full_machine="${1}, 500gb gp2"
+
 TRIES=3
 QUERY_COUNT=43
-RESULT_FILE="results/c6a.metal.json"
+RESULT_FILE="results/${machine_name}.${mode}.json"
 FILE_SIZE=$(wc -c < hits.parquet | awk '{print $1}')
 
 declare -a results=()
@@ -19,7 +23,7 @@ for ((q=1; q<=QUERY_COUNT; q++)); do
     sync
     echo 3 | sudo tee /proc/sys/vm/drop_caches > /dev/null
 
-    output=$(python3 query.py $q 2>&1)
+    output=$(python3 query.py $q $mode 2>&1)
     IFS=',' read -r t1 t2 t3 <<< "$(echo "$output" | tail -1)"
 
     results[$((q-1))]="[${t1:-null},${t2:-null},${t3:-null}]"
@@ -28,7 +32,7 @@ done
 echo '{
     "system": "Daft",
     "date": "'$(date +%Y-%m-%d)'",
-    "machine": "c6a.4xlarge, 500gb gp2",
+    "machine": "'$full_machine'",
    "cluster_size": 1,
    "comment": "",
    "tags": [
