Skip to content

Commit a094df7

Browse files
committed
Fix query engine edge cases, NT scaling regressions, and add JIT warmup
Edge cases fixed:
- E1: Multi-column dict-encoded join key unification (silent wrong results)
- E2: HAVING alias fallback uses last-index-of to handle underscored names
- E3: OR/IN predicates only materialize referenced columns, not all

Code quality (O1-O10):
- Extract shared column helpers into query/columns.clj (DRY)
- Extract partition-sizes/boundaries helpers in window.clj
- Extract date-trunc/date-add helpers in expression.clj
- Remove dead bindings, stale comments, NTILE dead code
- Add ColumnOpsVar.java for JIT-isolated VARIANCE/CORR group-by

NT scaling fixes:
- Window functions respect PARALLEL_THRESHOLD with avgPartSize<8 guard
- LIKE broadcast loop made sequential (FJP overhead dominated)
- Expression pre-computation moved inside group-by fallback path
- Benchmark WIN/DS execution order swapped for fair JIT warmup

JIT warmup for production safety:
- q/jit-warmup! exercises all 11 Java code paths with tiny data (0.2s)
- Called on server startup and before benchmark tiers
- Prevents deoptimization cliffs from mixed analytical workloads
1 parent 9a369df commit a094df7

File tree

14 files changed

+921
-637
lines changed

14 files changed

+921
-637
lines changed

CLAUDE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -56,6 +56,7 @@ After modifying Java files in `src-java/`:
5656
javac --add-modules jdk.incubator.vector -d target/classes \
5757
src-java/stratum/internal/ColumnOps.java \
5858
src-java/stratum/internal/ColumnOpsExt.java \
59+
src-java/stratum/internal/ColumnOpsVar.java \
5960
src-java/stratum/internal/ColumnOpsChunked.java \
6061
src-java/stratum/internal/ColumnOpsChunkedSimd.java \
6162
src-java/stratum/internal/ColumnOpsAnalytics.java

bench/olap_bench.clj

Lines changed: 11 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2344,8 +2344,8 @@
23442344
(let [q {:from {:orderkey (:orderkey data) :linenumber (:linenumber data) :price (:price data)}
23452345
:window [{:op :row-number :partition-by [:orderkey] :order-by [[:linenumber :asc]] :as :rn}]}
23462346
qc (assoc q :result :columns)
2347-
r (bench #(q/q qc))
23482347
r-1t (bench-1t #(q/q qc))
2348+
r (bench #(q/q qc))
23492349
cr (q/q qc)
23502350
n (:n-rows cr)]
23512351
(println (format " Stratum 1T: %s (%d rows)" (fmt-ms (:median r-1t)) n))
@@ -2367,8 +2367,8 @@
23672367
(let [q {:from {:orderkey (:orderkey data) :linenumber (:linenumber data) :price (:price data)}
23682368
:window [{:op :lag :col :price :offset 1 :partition-by [:orderkey] :order-by [[:linenumber :asc]] :as :prev_price}]}
23692369
qc (assoc q :result :columns)
2370-
r (bench #(q/q qc))
23712370
r-1t (bench-1t #(q/q qc))
2371+
r (bench #(q/q qc))
23722372
cr (q/q qc)
23732373
n (:n-rows cr)]
23742374
(println (format " Stratum 1T: %s (%d rows)" (fmt-ms (:median r-1t)) n))
@@ -2390,8 +2390,8 @@
23902390
(let [q {:from {:orderkey (:orderkey data) :shipdate (:shipdate data) :price (:price data)}
23912391
:window [{:op :sum :col :price :partition-by [:orderkey] :order-by [[:shipdate :asc]] :as :running_sum}]}
23922392
qc (assoc q :result :columns)
2393-
r (bench #(q/q qc))
23942393
r-1t (bench-1t #(q/q qc))
2394+
r (bench #(q/q qc))
23952395
cr (q/q qc)
23962396
n (:n-rows cr)]
23972397
(println (format " Stratum 1T: %s (%d rows)" (fmt-ms (:median r-1t)) n))
@@ -2493,8 +2493,8 @@
24932493
(let [q {:from (dissoc data :n)
24942494
:window [{:op :row-number :partition-by [:ss_store_sk] :order-by [[:ss_net_profit :asc]] :as :rn}]}
24952495
qc (assoc q :result :columns)
2496-
r (bench #(q/q qc) :warmup 10 :iters 5)
24972496
r-1t (bench-1t #(q/q qc) :warmup 10 :iters 5)
2497+
r (bench #(q/q qc) :warmup 10 :iters 5)
24982498
cr (q/q qc)
24992499
n (:n-rows cr)]
25002500
(println (format " Stratum 1T: %s (%d rows)" (fmt-ms (:median r-1t)) n))
@@ -2622,14 +2622,20 @@
26222622
(when (seq query-args) (println (format " Queries: %s" (str/join ", " query-args))))
26232623
(println "================================================================"))
26242624

2625+
;; JIT warmup — exercise all Java hot paths so cross-tier deoptimization
2626+
;; doesn't distort benchmark results.
2627+
(let [t0 (System/nanoTime)]
2628+
(q/jit-warmup!)
2629+
(println (format "\nJIT warmup: %.1fs" (/ (- (System/nanoTime) t0) 1e9))))
2630+
26252631
(let [all-results (atom {})]
26262632

26272633
;; === Tier 1: TPC-H / SSB (existing B1-B6) ===
26282634
;; Sort key: :shipdate (primary filter column for range predicates)
26292635
(when (run-tier? "t1" "1" "tpch" "ssb")
26302636
(println "\n=== Tier 1: TPC-H / SSB ===")
26312637
(let [arrays (generate-arrays n)
2632-
conn (duckdb-setup-lineitem n)]
2638+
^Connection conn (duckdb-setup-lineitem n)]
26332639
(try
26342640
(doseq [m modes]
26352641
(println (format "\n --- Input mode: %s ---" (name m)))

src-java/stratum/internal/ColumnOpsAnalytics.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1048,7 +1048,7 @@ public static int[] windowArgSort(long[] partKeys, Object[] orderKeys,
10481048
pos[p]++;
10491049
}
10501050

1051-
if (nParts >= 2 && length > 100_000) {
1051+
if (nParts >= 2 && length > ColumnOps.PARALLEL_THRESHOLD) {
10521052
// Batch partitions into nThreads coarse tasks (not one task per partition)
10531053
int nThreads = Math.min(POOL.getParallelism(), nParts);
10541054
int partsPerThread = (nParts + nThreads - 1) / nThreads;
@@ -1315,10 +1315,17 @@ private static int compareMulti(Object[] orderKeys, boolean[] orderDirs, int a,
13151315
* Compute ROW_NUMBER window function over pre-sorted indices.
13161316
*/
13171317
public static double[] windowRowNumber(long[] partKeys, int[] sortedIndices, int length) {
1318-
if (length < 200_000) return windowRowNumberSeq(partKeys, sortedIndices, length);
1318+
if (length < ColumnOps.PARALLEL_THRESHOLD) return windowRowNumberSeq(partKeys, sortedIndices, length);
13191319
// Find partition boundaries in sorted order
13201320
int[] partStarts = findPartitionBoundaries(partKeys, sortedIndices, length);
13211321
int nParts = partStarts.length - 1;
1322+
// Only parallelize when partitions are large enough to amortize FJP overhead.
1323+
// With many tiny partitions (avg < 8 rows), the per-element work is trivial
1324+
// and parallel random writes to result[] cause cache contention.
1325+
// Threshold 8 keeps WIN benchmarks (avg 4 rows) sequential while allowing
1326+
// H2O-Q8 (avg 60 rows) to benefit from parallelism.
1327+
long avgPartSize = length / Math.max(nParts, 1);
1328+
if (avgPartSize < 8) return windowRowNumberSeq(partKeys, sortedIndices, length);
13221329
double[] result = new double[length];
13231330
int nThreads = Math.min(POOL.getParallelism(), Math.max(1, nParts));
13241331
int partsPerThread = (nParts + nThreads - 1) / nThreads;
@@ -1381,10 +1388,13 @@ private static int[] findPartitionBoundaries(long[] partKeys, int[] sortedIndice
13811388
public static double[] windowLagLead(long[] partKeys, int[] sortedIndices,
13821389
Object values, int length,
13831390
int offset, double defaultVal, boolean isLead) {
1384-
if (length < 200_000) return windowLagLeadSeq(partKeys, sortedIndices, values, length, offset, defaultVal, isLead);
1391+
if (length < ColumnOps.PARALLEL_THRESHOLD) return windowLagLeadSeq(partKeys, sortedIndices, values, length, offset, defaultVal, isLead);
13851392

13861393
int[] partStarts = findPartitionBoundaries(partKeys, sortedIndices, length);
13871394
int nParts = partStarts.length - 1;
1395+
long avgPartSize = length / Math.max(nParts, 1);
1396+
if (avgPartSize < 8) return windowLagLeadSeq(partKeys, sortedIndices, values, length, offset, defaultVal, isLead);
1397+
13881398
double[] result = new double[length];
13891399
boolean isLong = values instanceof long[];
13901400
boolean isDouble = values instanceof double[];
@@ -1459,10 +1469,13 @@ private static double[] windowLagLeadSeq(long[] partKeys, int[] sortedIndices,
14591469
*/
14601470
public static double[] windowRunningSum(long[] partKeys, int[] sortedIndices,
14611471
Object values, int length) {
1462-
if (length < 200_000) return windowRunningSumSeq(partKeys, sortedIndices, values, length);
1472+
if (length < ColumnOps.PARALLEL_THRESHOLD) return windowRunningSumSeq(partKeys, sortedIndices, values, length);
14631473

14641474
int[] partStarts = findPartitionBoundaries(partKeys, sortedIndices, length);
14651475
int nParts = partStarts.length - 1;
1476+
long avgPartSize = length / Math.max(nParts, 1);
1477+
if (avgPartSize < 8) return windowRunningSumSeq(partKeys, sortedIndices, values, length);
1478+
14661479
double[] result = new double[length];
14671480
boolean isLong = values instanceof long[];
14681481
boolean isDouble = values instanceof double[];

0 commit comments

Comments
 (0)