From 5bdf35d51280bdcacaef83152fd812d67e15cc67 Mon Sep 17 00:00:00 2001 From: Nipun Batra <83050981+nipunbatra8@users.noreply.github.com> Date: Tue, 12 Aug 2025 10:25:58 -0700 Subject: [PATCH 1/8] Update Storm Configuration --- src/main/perf/IndexThreads.java | 13 +++++----- src/main/perf/Indexer.java | 2 +- src/main/perf/NRTPerfTest.java | 40 +++++++++++++++++++++++++++---- src/main/perf/SearchPerfTest.java | 2 +- src/main/perf/TaskThreads.java | 13 ++++++---- 5 files changed, 54 insertions(+), 16 deletions(-) diff --git a/src/main/perf/IndexThreads.java b/src/main/perf/IndexThreads.java index f68a028be..6cfcd5a36 100644 --- a/src/main/perf/IndexThreads.java +++ b/src/main/perf/IndexThreads.java @@ -50,7 +50,7 @@ public enum Mode { UPDATE, ADD, NDV_UPDATE, BDV_UPDATE } final AtomicLong lastRefreshNS; public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit, - boolean addGroupingFields, boolean printDPS, Mode mode, float docsPerSecPerThread, UpdatesListener updatesListener, + boolean addGroupingFields, boolean printDPS, Mode mode, java.util.concurrent.atomic.AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax) throws IOException, InterruptedException { final AtomicInteger groupBlockIndex; @@ -77,7 +77,7 @@ public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, for(int thread=0;thread<numThreads;thread++) { - threads[thread] = new IndexThread(random, startLatch, stopLatch, w, lineFileDocs, docCountLimit, count, mode, groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThread, failed, updatesListener, nrtEverySec, randomDocIDMax); + threads[thread] = new IndexThread(random, startLatch, stopLatch, w, lineFileDocs, docCountLimit, count, mode, groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThreadRef, failed, updatesListener, nrtEverySec, randomDocIDMax); } @@ -141,7 +141,7 @@ private static class IndexThread extends Thread { private final Mode mode; private final CountDownLatch startLatch; private final CountDownLatch stopLatch; - private final float docsPerSec; + private final java.util.concurrent.atomic.AtomicReference<Double> docsPerSecRef; private final Random random; private final AtomicBoolean failed; private final UpdatesListener updatesListener; @@ -151,7 +151,7 @@ private static class IndexThread extends Thread { final int randomDocIDMax; public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stopLatch, IndexWriter w, LineFileDocs docs, int numTotalDocs, AtomicInteger count, Mode mode, AtomicInteger groupBlockIndex, - AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, float docsPerSec, + AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, java.util.concurrent.atomic.AtomicReference<Double> docsPerSecRef, AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax) { this.startLatch = startLatch; this.stopLatch = stopLatch; @@ -162,7 +162,7 @@ public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stop this.mode = mode; this.groupBlockIndex = groupBlockIndex; this.stop = stop; - this.docsPerSec = docsPerSec; + this.docsPerSecRef = docsPerSecRef; this.random = random; this.failed = failed; this.updatesListener = updatesListener; @@ -329,7 +329,7 @@ public void remove() { docState.doc.removeField("groupend"); } - } else if (docsPerSec > 0 && mode != null) { + } else if (docsPerSecRef.get() > 0 && mode != null) { System.out.println("Indexing single docs (add/updateDocument)"); final long startNS = System.nanoTime(); int threadCount = 0; @@ -382,6 +382,7 @@ public void remove() { System.out.println("Indexer: " + docCount + " docs... 
(" + (System.currentTimeMillis() - tStart) + " msec)"); } + double docsPerSec = docsPerSecRef.get(); final long sleepNS = startNS + (long) (1000000000*(threadCount/docsPerSec)) - System.nanoTime(); if (sleepNS > 0) { final long sleepMS = sleepNS/1000000; diff --git a/src/main/perf/Indexer.java b/src/main/perf/Indexer.java index 966fa795d..02b15d030 100644 --- a/src/main/perf/Indexer.java +++ b/src/main/perf/Indexer.java @@ -573,7 +573,7 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) { float docsPerSecPerThread = -1f; //float docsPerSecPerThread = 100f; - IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, docsPerSecPerThread, null, nrtEverySec, + IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, new java.util.concurrent.atomic.AtomicReference<>((double)docsPerSecPerThread), null, nrtEverySec, randomDocIDMax); System.out.println("\nIndexer: start"); diff --git a/src/main/perf/NRTPerfTest.java b/src/main/perf/NRTPerfTest.java index a892dd7c5..d9b8703e9 100644 --- a/src/main/perf/NRTPerfTest.java +++ b/src/main/perf/NRTPerfTest.java @@ -21,9 +21,12 @@ import java.io.FileNotFoundException; import java.io.FileReader; import java.io.IOException; +import java.io.PrintStream; +import java.io.FileOutputStream; import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; +import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Random; @@ -331,6 +334,11 @@ public DocValuesFormat getDocValuesFormatForField(String field) { tmp.setMaxMergedSegmentMB(1000000.0); //tmp.setReclaimDeletesWeight(3.0); //tmp.setMaxMergedSegmentMB(7000.0); + + // AGGRESSIVE UPDATE STORM SETTINGS: + // Allow more deletes before forcing merges + tmp.setDeletesPctAllowed(2.0); + conf.setMergePolicy(tmp); if (!commit.equals("none")) { @@ -339,10 +347,13 @@ public DocValuesFormat getDocValuesFormatForField(String field) { // Make sure merges run @ higher prio than indexing: final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) conf.getMergeScheduler(); - cms.setMaxMergesAndThreads(4, 1); + // cms.setMaxMergesAndThreads(4, 1); conf.setMergedSegmentWarmer(new MergedReaderWarmer(field)); + // Set infoStream to log to file + PrintStream infoStream = new PrintStream(new FileOutputStream("lucene-infostream.log", true), true, "UTF-8"); + conf.setInfoStream(infoStream); final IndexWriter w = new IndexWriter(dir, conf); // w.setInfoStream(System.out); @@ -360,8 +371,9 @@ public void afterUpdate() { } }; IndexWriter.DocStats stats = w.getDocStats(); + final AtomicReference docsPerSecRef = new AtomicReference<>(docsPerSec / numIndexThreads); IndexThreads indexThreads = new IndexThreads(random, w, new AtomicBoolean(false), docs, numIndexThreads, -1, false, false, mode, - (float) (docsPerSec / numIndexThreads), updatesListener, -1.0, stats.maxDoc); + docsPerSecRef, updatesListener, -1.0, stats.maxDoc); // NativePosixUtil.mlockTermsDict(startR, "id"); final SearcherManager manager = new SearcherManager(w, null); @@ -373,15 +385,35 @@ public void afterUpdate() { } final DirectSpellChecker spellChecker = new DirectSpellChecker(); - final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, "FastVectorHighlighter", null, null); + final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, "FastVectorHighlighter", null, new 
HashMap<>()); TaskParserFactory taskParserFactory = new TaskParserFactory(indexState, field, analyzer, field, 10, random, null, null, -1, true, TestContext.parse("")); - final TaskSource tasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) { + // Periodically increase docsPerSec + Thread docsPerSecIncreaser = new Thread(() -> { + try { + int increaseCount = 0; + final int maxIncreases = 8; + while (increaseCount < maxIncreases) { + Thread.sleep(20000); // every 20 seconds + double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2); + System.out.println("Increased docsPerSec per thread to " + newRate); + increaseCount++; + } + System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode"); + } catch (InterruptedException e) { + // exit thread + } + }); + docsPerSecIncreaser.setDaemon(true); + docsPerSecIncreaser.start(); + // Aggressive delete storm task source + final TaskSource baseTasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) { @Override public void taskDone(Task task, long queueTimeNS, TotalHits toalHitCount) { searchesByTime[currentQT.get()].incrementAndGet(); } }; + final TaskSource tasks = baseTasks; System.out.println("Task repeat count 1"); System.out.println("Tasks file " + tasksFile); System.out.println("Num task per cat 20"); diff --git a/src/main/perf/SearchPerfTest.java b/src/main/perf/SearchPerfTest.java index fd8310412..57264becd 100755 --- a/src/main/perf/SearchPerfTest.java +++ b/src/main/perf/SearchPerfTest.java @@ -416,7 +416,7 @@ public void warm(LeafReader reader) throws IOException { // hardwired false: boolean addDVFields = mode == Mode.BDV_UPDATE || mode == Mode.NDV_UPDATE; LineFileDocs lineFileDocs = new LineFileDocs(lineDocsFile, false, storeBody, tvsBody, false, cloneDocs, null, null, null, addDVFields, null, 0, null); - IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, docsPerSecPerThread, null, -1.0, -1); + IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new java.util.concurrent.atomic.AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1); threads.start(); mgr = new SearcherManager(writer, new SearcherFactory() { diff --git a/src/main/perf/TaskThreads.java b/src/main/perf/TaskThreads.java index 450db80b1..8e1c17d8b 100644 --- a/src/main/perf/TaskThreads.java +++ b/src/main/perf/TaskThreads.java @@ -115,21 +115,26 @@ public void run() { // Run the task in the IndexSearcher's executor. This is important because IndexSearcher#search also uses the current thread to // search, so not running #search from the executor would artificially use one more thread than configured via luceneutil. // We're counting time within the task to not include forking time for the top-level search in the reported time. 
+ final Task clonedTask = task.clone(); executor.submit(() -> { - task.startTimeNanos = System.nanoTime(); + clonedTask.startTimeNanos = System.nanoTime(); try { - task.go(indexState, taskParser); + clonedTask.go(indexState, taskParser); } catch (IOException ioe) { throw new RuntimeException(ioe); } try { - tasks.taskDone(task, task.startTimeNanos-task.recvTimeNS, task.totalHitCount); + tasks.taskDone(clonedTask, clonedTask.startTimeNanos-clonedTask.recvTimeNS, clonedTask.totalHitCount); } catch (Exception e) { System.out.println(Thread.currentThread().getName() + ": ignoring exc:"); e.printStackTrace(); } - task.runTimeNanos = System.nanoTime()-task.startTimeNanos; + clonedTask.runTimeNanos = System.nanoTime()-clonedTask.startTimeNanos; }).get(); + + // Copy results back to original task for reporting + task.totalHitCount = clonedTask.totalHitCount; + task.runTimeNanos = clonedTask.runTimeNanos; task.threadID = threadID; } From 2aae0edd5dc8f61a2b98bfe0f5fdc752e3f192cc Mon Sep 17 00:00:00 2001 From: Nipun Batra <83050981+nipunbatra8@users.noreply.github.com> Date: Tue, 12 Aug 2025 11:26:56 -0700 Subject: [PATCH 2/8] nocommit --- src/main/perf/NRTPerfTest.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/main/perf/NRTPerfTest.java b/src/main/perf/NRTPerfTest.java index d9b8703e9..28f7e9544 100644 --- a/src/main/perf/NRTPerfTest.java +++ b/src/main/perf/NRTPerfTest.java @@ -337,6 +337,7 @@ public DocValuesFormat getDocValuesFormatForField(String field) { // AGGRESSIVE UPDATE STORM SETTINGS: // Allow more deletes before forcing merges + // nocommit tmp.setDeletesPctAllowed(2.0); conf.setMergePolicy(tmp); @@ -347,6 +348,7 @@ public DocValuesFormat getDocValuesFormatForField(String field) { // Make sure merges run @ higher prio than indexing: final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) conf.getMergeScheduler(); + // Can swap to your own MergeScheduler impl // cms.setMaxMergesAndThreads(4, 1); conf.setMergedSegmentWarmer(new MergedReaderWarmer(field)); From 5d69dfe2c3f1e861ddf346db06a55cb58f392c04 Mon Sep 17 00:00:00 2001 From: Nipun Batra <83050981+nipunbatra8@users.noreply.github.com> Date: Wed, 13 Aug 2025 13:43:33 -0700 Subject: [PATCH 3/8] fix imports and remove unnecessary things --- src/main/perf/IndexThreads.java | 10 +++++++--- src/main/perf/Indexer.java | 3 ++- src/main/perf/NRTPerfTest.java | 5 ++--- src/main/perf/SearchPerfTest.java | 2 +- src/main/perf/TaskThreads.java | 20 +++++++++----------- 5 files changed, 21 insertions(+), 19 deletions(-) diff --git a/src/main/perf/IndexThreads.java b/src/main/perf/IndexThreads.java index 6cfcd5a36..5f4d819b9 100644 --- a/src/main/perf/IndexThreads.java +++ b/src/main/perf/IndexThreads.java @@ -25,6 +25,7 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.document.Document; import org.apache.lucene.document.Field; @@ -49,8 +50,11 @@ public enum Mode { UPDATE, ADD, NDV_UPDATE, BDV_UPDATE } final AtomicBoolean refreshing; final AtomicLong lastRefreshNS; + /** + * @param docsPerSecPerThreadRef AtomicReference for thread-safe rate updates across multiple threads. 
+ */ public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit, - boolean addGroupingFields, boolean printDPS, Mode mode, java.util.concurrent.atomic.AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener, + boolean addGroupingFields, boolean printDPS, Mode mode, AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax) throws IOException, InterruptedException { final AtomicInteger groupBlockIndex; @@ -141,7 +145,7 @@ private static class IndexThread extends Thread { private final Mode mode; private final CountDownLatch startLatch; private final CountDownLatch stopLatch; - private final java.util.concurrent.atomic.AtomicReference<Double> docsPerSecRef; + private final AtomicReference<Double> docsPerSecRef; private final Random random; private final AtomicBoolean failed; private final UpdatesListener updatesListener; @@ -151,7 +155,7 @@ private static class IndexThread extends Thread { final int randomDocIDMax; public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stopLatch, IndexWriter w, LineFileDocs docs, int numTotalDocs, AtomicInteger count, Mode mode, AtomicInteger groupBlockIndex, - AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, java.util.concurrent.atomic.AtomicReference<Double> docsPerSecRef, + AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, AtomicReference<Double> docsPerSecRef, AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax) { this.startLatch = startLatch; this.stopLatch = stopLatch; diff --git a/src/main/perf/Indexer.java b/src/main/perf/Indexer.java index 02b15d030..9e1bc409c 100644 --- a/src/main/perf/Indexer.java +++ b/src/main/perf/Indexer.java @@ -35,6 +35,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.apache.lucene.analysis.Analyzer; import org.apache.lucene.analysis.CharArraySet; @@ -573,7 +574,7 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) { float docsPerSecPerThread = -1f; //float docsPerSecPerThread = 100f; - IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, new java.util.concurrent.atomic.AtomicReference<>((double)docsPerSecPerThread), null, nrtEverySec, + IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, new AtomicReference<>((double)docsPerSecPerThread), null, nrtEverySec, randomDocIDMax); System.out.println("\nIndexer: start"); diff --git a/src/main/perf/NRTPerfTest.java b/src/main/perf/NRTPerfTest.java index 28f7e9544..ec76388fa 100644 --- a/src/main/perf/NRTPerfTest.java +++ b/src/main/perf/NRTPerfTest.java @@ -26,7 +26,6 @@ import java.nio.file.Files; import java.nio.file.Paths; import java.util.Collections; -import java.util.HashMap; import java.util.List; import java.util.Locale; import java.util.Random; @@ -387,14 +386,14 @@ public void afterUpdate() { } final DirectSpellChecker spellChecker = new DirectSpellChecker(); - final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, "FastVectorHighlighter", null, new HashMap<>()); + final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, 
"FastVectorHighlighter", null, null); TaskParserFactory taskParserFactory = new TaskParserFactory(indexState, field, analyzer, field, 10, random, null, null, -1, true, TestContext.parse("")); // Periodically increase docsPerSec Thread docsPerSecIncreaser = new Thread(() -> { try { int increaseCount = 0; - final int maxIncreases = 8; + int maxIncreases = 8; while (increaseCount < maxIncreases) { Thread.sleep(20000); // every 20 seconds double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2); diff --git a/src/main/perf/SearchPerfTest.java b/src/main/perf/SearchPerfTest.java index 57264becd..a36a66c3c 100755 --- a/src/main/perf/SearchPerfTest.java +++ b/src/main/perf/SearchPerfTest.java @@ -416,7 +416,7 @@ public void warm(LeafReader reader) throws IOException { // hardwired false: boolean addDVFields = mode == Mode.BDV_UPDATE || mode == Mode.NDV_UPDATE; LineFileDocs lineFileDocs = new LineFileDocs(lineDocsFile, false, storeBody, tvsBody, false, cloneDocs, null, null, null, addDVFields, null, 0, null); - IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new java.util.concurrent.atomic.AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1); + IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1); threads.start(); mgr = new SearcherManager(writer, new SearcherFactory() { diff --git a/src/main/perf/TaskThreads.java b/src/main/perf/TaskThreads.java index 8e1c17d8b..b0e56a860 100644 --- a/src/main/perf/TaskThreads.java +++ b/src/main/perf/TaskThreads.java @@ -103,38 +103,36 @@ public void run() { try { while (stop.get() == false) { - final Task task = tasks.nextTask(); - if (task == null) { + final Task originalTask = tasks.nextTask(); + if (originalTask == null) { // Done this.tasksStopNanos = System.nanoTime(); // first thread that finishes snapshots all threads. this way we do not include "winddown" time in our measurement. endThreadDetails.compareAndSet(null, new SearchPerfTest.ThreadDetails()); break; } + + // Clone the task to avoid reuse issues + final Task task = originalTask.clone(); // Run the task in the IndexSearcher's executor. This is important because IndexSearcher#search also uses the current thread to // search, so not running #search from the executor would artificially use one more thread than configured via luceneutil. // We're counting time within the task to not include forking time for the top-level search in the reported time. 
- final Task clonedTask = task.clone(); executor.submit(() -> { - clonedTask.startTimeNanos = System.nanoTime(); + task.startTimeNanos = System.nanoTime(); try { - clonedTask.go(indexState, taskParser); + task.go(indexState, taskParser); } catch (IOException ioe) { throw new RuntimeException(ioe); } try { - tasks.taskDone(clonedTask, clonedTask.startTimeNanos-clonedTask.recvTimeNS, clonedTask.totalHitCount); + tasks.taskDone(task, task.startTimeNanos-task.recvTimeNS, task.totalHitCount); } catch (Exception e) { System.out.println(Thread.currentThread().getName() + ": ignoring exc:"); e.printStackTrace(); } - clonedTask.runTimeNanos = System.nanoTime()-clonedTask.startTimeNanos; + task.runTimeNanos = System.nanoTime()-task.startTimeNanos; }).get(); - - // Copy results back to original task for reporting - task.totalHitCount = clonedTask.totalHitCount; - task.runTimeNanos = clonedTask.runTimeNanos; task.threadID = threadID; } From fcab7291d22d59ba2179b6c37712229a50c1aeba Mon Sep 17 00:00:00 2001 From: Nipun Batra <83050981+nipunbatra8@users.noreply.github.com> Date: Mon, 18 Aug 2025 16:26:58 -0700 Subject: [PATCH 4/8] loop war-peace time cycle --- src/main/perf/NRTPerfTest.java | 25 ++++++++++++++----------- 1 file changed, 14 insertions(+), 11 deletions(-) diff --git a/src/main/perf/NRTPerfTest.java b/src/main/perf/NRTPerfTest.java index ec76388fa..8625d8dc2 100644 --- a/src/main/perf/NRTPerfTest.java +++ b/src/main/perf/NRTPerfTest.java @@ -391,18 +391,21 @@ public void afterUpdate() { new TaskParserFactory(indexState, field, analyzer, field, 10, random, null, null, -1, true, TestContext.parse("")); // Periodically increase docsPerSec Thread docsPerSecIncreaser = new Thread(() -> { - try { - int increaseCount = 0; - int maxIncreases = 8; - while (increaseCount < maxIncreases) { - Thread.sleep(20000); // every 20 seconds - double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2); - System.out.println("Increased docsPerSec per thread to " + newRate); - increaseCount++; + while (true) { + try { + int increaseCount = 0; + int maxIncreases = 8; + while (increaseCount < maxIncreases) { + Thread.sleep(20000); // every 20 seconds + double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2); + System.out.println("Increased docsPerSec per thread to " + newRate); + increaseCount++; + } + System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode"); + Thread.sleep(300000); // 5 minutes of peace time + } catch (InterruptedException e) { + // exit thread } - System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode"); - } catch (InterruptedException e) { - // exit thread } }); docsPerSecIncreaser.setDaemon(true); From 0c09bea61309823a5110bb7ad2066d48f30b43d3 Mon Sep 17 00:00:00 2001 From: Nipun Batra <83050981+nipunbatra8@users.noreply.github.com> Date: Mon, 25 Aug 2025 17:18:31 -0700 Subject: [PATCH 5/8] Opt-in option for update storms --- src/main/perf/IndexThreads.java | 27 +++++++-- src/main/perf/Indexer.java | 2 +- src/main/perf/NRTPerfTest.java | 95 +++++++++++++++++++++---------- src/main/perf/SearchPerfTest.java | 2 +- 4 files changed, 90 insertions(+), 36 deletions(-) diff --git a/src/main/perf/IndexThreads.java b/src/main/perf/IndexThreads.java index 5f4d819b9..744277939 100644 --- a/src/main/perf/IndexThreads.java +++ b/src/main/perf/IndexThreads.java @@ -49,17 +49,19 @@ public enum Mode { UPDATE, ADD, NDV_UPDATE, BDV_UPDATE } final Thread[] threads; final AtomicBoolean refreshing; final AtomicLong lastRefreshNS; + final boolean 
enableUpdateStorms; /** * @param docsPerSecPerThreadRef AtomicReference for thread-safe rate updates across multiple threads. */ public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit, boolean addGroupingFields, boolean printDPS, Mode mode, AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener, - double nrtEverySec, int randomDocIDMax) + double nrtEverySec, int randomDocIDMax, boolean enableUpdateStorms) throws IOException, InterruptedException { final AtomicInteger groupBlockIndex; this.docs = lineFileDocs; + this.enableUpdateStorms = enableUpdateStorms; if (addGroupingFields) { IndexThread.group100 = randomStrings(100, random); IndexThread.group10K = randomStrings(10000, random); @@ -82,7 +84,7 @@ public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, for(int thread=0;thread<numThreads;thread++) { - threads[thread] = new IndexThread(random, startLatch, stopLatch, w, lineFileDocs, docCountLimit, count, mode, groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThreadRef, failed, updatesListener, nrtEverySec, randomDocIDMax); + threads[thread] = new IndexThread(random, startLatch, stopLatch, w, lineFileDocs, docCountLimit, count, mode, groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThreadRef, failed, updatesListener, nrtEverySec, randomDocIDMax, enableUpdateStorms); } @@ -155,7 +158,8 @@ private static class IndexThread extends Thread { final int randomDocIDMax; + final boolean enableUpdateStorms; public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stopLatch, IndexWriter w, LineFileDocs docs, int numTotalDocs, AtomicInteger count, Mode mode, AtomicInteger groupBlockIndex, AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, AtomicReference<Double> docsPerSecRef, - AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax) { + AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax, boolean enableUpdateStorms) { this.startLatch = startLatch; this.stopLatch = stopLatch; this.w = w; @@ -174,6 +177,7 @@ public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stop this.lastRefreshNS = lastRefreshNS; this.nrtEverySec = nrtEverySec; this.randomDocIDMax = randomDocIDMax; + this.enableUpdateStorms = enableUpdateStorms; } private static Field getStringIDField(Document doc) { @@ -389,8 +393,21 @@ public void remove() { double docsPerSec = docsPerSecRef.get(); final long sleepNS = startNS + (long) (1000000000*(threadCount/docsPerSec)) - System.nanoTime(); if (sleepNS > 0) { - final long sleepMS = sleepNS/1000000; - final int sleepNS2 = (int) (sleepNS - sleepMS*1000000); + final long sleepMS; + final int sleepNS2; + + if (enableUpdateStorms) { + // Cap sleep at 100ms for update storms to maintain responsiveness + final long maxSleepNS = 100000000L; // 100 ms in nanoseconds + final long actualSleepNS = Math.min(sleepNS, maxSleepNS); + sleepMS = actualSleepNS/1000000; + sleepNS2 = (int) (actualSleepNS - sleepMS*1000000); + System.out.println("Update storms: capped indexer sleep at " + sleepMS + " ms, " + sleepNS2 + " ns (requested: " + (sleepNS/1000000) + " ms)"); + } else { + // Normal operation: no sleep cap, use original behavior + sleepMS = sleepNS/1000000; + sleepNS2 = (int) (sleepNS - sleepMS*1000000); + } Thread.sleep(sleepMS, sleepNS2); } diff --git a/src/main/perf/Indexer.java b/src/main/perf/Indexer.java index 9e1bc409c..afa496320 100644 --- a/src/main/perf/Indexer.java +++ b/src/main/perf/Indexer.java @@ -575,7 +575,7 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) { //float docsPerSecPerThread = 100f; IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, new AtomicReference<>((double)docsPerSecPerThread), null, nrtEverySec, - randomDocIDMax); + randomDocIDMax, false); System.out.println("\nIndexer: start"); final long t0 = System.currentTimeMillis(); diff --git a/src/main/perf/NRTPerfTest.java b/src/main/perf/NRTPerfTest.java index 8625d8dc2..eccc76d46 100644 --- a/src/main/perf/NRTPerfTest.java +++ b/src/main/perf/NRTPerfTest.java @@ -46,6 +46,7 @@ import org.apache.lucene.index.LeafReader; import org.apache.lucene.index.NoDeletionPolicy; import org.apache.lucene.index.Term; +import org.apache.lucene.index.VectorEncoding; import 
org.apache.lucene.index.TieredMergePolicy; import org.apache.lucene.queryparser.classic.ParseException; import org.apache.lucene.queryparser.classic.QueryParser; @@ -245,6 +246,9 @@ public static void main(String[] args) throws Exception { throw new FileNotFoundException("tasks file not found " + tasksFile); } + // By default, disable update storms + final boolean enableUpdateStorms = args.length > 15 ? Boolean.parseBoolean(args[15]) : false; + final boolean hasProcMemInfo = Files.exists(Paths.get("/proc/meminfo")); System.out.println("DIR=" + dirImpl); @@ -258,6 +262,7 @@ public static void main(String[] args) throws Exception { System.out.println("Reopen/sec=" + reopenPerSec); System.out.println("Mode=" + mode); System.out.println("tasksFile=" + tasksFile); + System.out.println("EnableUpdateStorms=" + enableUpdateStorms); System.out.println("Record stats every " + statsEverySec + " seconds"); final int count = (int) ((runTimeSec / statsEverySec) + 2); @@ -275,7 +280,13 @@ public static void main(String[] args) throws Exception { System.out.println("Max merge MB/sec = " + (mergeMaxWriteMBPerSec <= 0.0 ? "unlimited" : mergeMaxWriteMBPerSec)); final Random random = new Random(seed); - final LineFileDocs docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null); + final LineFileDocs docs; + if (enableUpdateStorms) { + docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, "../data/cohere-wikipedia-docs-5M-768d.vec", 768, VectorEncoding.FLOAT32); + System.out.println("Update storms: vector indexing enabled"); + } else { + docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null); + } final Directory dir0; if (dirImpl.equals("MMapDirectory")) { @@ -300,7 +311,12 @@ public static void main(String[] args) throws Exception { StandardAnalyzer analyzer = new StandardAnalyzer(CharArraySet.EMPTY_SET); final IndexWriterConfig conf = new IndexWriterConfig(analyzer); conf.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE); - conf.setRAMBufferSizeMB(256.0); + if (enableUpdateStorms) { + conf.setRAMBufferSizeMB(2048.0); // 2GB RAM buffer for aggressive update storms + } else { + conf.setRAMBufferSizeMB(256.0); // 256MB RAM buffer for normal operation + } + //iwc.setMergeScheduler(ms); /* @@ -334,10 +350,15 @@ public DocValuesFormat getDocValuesFormatForField(String field) { //tmp.setReclaimDeletesWeight(3.0); //tmp.setMaxMergedSegmentMB(7000.0); - // AGGRESSIVE UPDATE STORM SETTINGS: - // Allow more deletes before forcing merges - // nocommit - tmp.setDeletesPctAllowed(2.0); + // AGGRESSIVE UPDATE STORM SETTINGS (only if enabled): + if (enableUpdateStorms) { + // Allow fewer deletes before forcing merges + tmp.setDeletesPctAllowed(2.0); + System.out.println("Update storms enabled: using aggressive deletesPctAllowed=2.0"); + } else { + // Use default settings for normal operation + System.out.println("Update storms disabled: using default merge policy settings"); + } conf.setMergePolicy(tmp); @@ -348,13 +369,18 @@ public DocValuesFormat getDocValuesFormatForField(String field) { // Make sure merges run @ higher prio than indexing: final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) conf.getMergeScheduler(); // Can swap to your own MergeScheduler impl - // cms.setMaxMergesAndThreads(4, 1); + if (enableUpdateStorms) { + // cms.setMaxMergesAndThreads(4, 1); + // Use Lucene default settings for max merges 
& threads + } else { + cms.setMaxMergesAndThreads(4, 1); + } conf.setMergedSegmentWarmer(new MergedReaderWarmer(field)); // Set infoStream to log to file - PrintStream infoStream = new PrintStream(new FileOutputStream("lucene-infostream.log", true), true, "UTF-8"); - conf.setInfoStream(infoStream); + // PrintStream infoStream = new PrintStream(new FileOutputStream("lucene-infostream.log", true), true, "UTF-8"); + // conf.setInfoStream(infoStream); final IndexWriter w = new IndexWriter(dir, conf); // w.setInfoStream(System.out); @@ -374,7 +400,7 @@ public void afterUpdate() { IndexWriter.DocStats stats = w.getDocStats(); final AtomicReference docsPerSecRef = new AtomicReference<>(docsPerSec / numIndexThreads); IndexThreads indexThreads = new IndexThreads(random, w, new AtomicBoolean(false), docs, numIndexThreads, -1, false, false, mode, - docsPerSecRef, updatesListener, -1.0, stats.maxDoc); + docsPerSecRef, updatesListener, -1.0, stats.maxDoc, enableUpdateStorms); // NativePosixUtil.mlockTermsDict(startR, "id"); final SearcherManager manager = new SearcherManager(w, null); @@ -389,27 +415,38 @@ public void afterUpdate() { final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, "FastVectorHighlighter", null, null); TaskParserFactory taskParserFactory = new TaskParserFactory(indexState, field, analyzer, field, 10, random, null, null, -1, true, TestContext.parse("")); - // Periodically increase docsPerSec - Thread docsPerSecIncreaser = new Thread(() -> { - while (true) { - try { - int increaseCount = 0; - int maxIncreases = 8; - while (increaseCount < maxIncreases) { - Thread.sleep(20000); // every 20 seconds - double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2); - System.out.println("Increased docsPerSec per thread to " + newRate); - increaseCount++; + final double peaceTimeRate = docsPerSec / numIndexThreads; + // Conditionally create update storm thread + if (enableUpdateStorms) { + System.out.println("Starting update storms thread..."); + // Periodically increase docsPerSec + Thread docsPerSecIncreaser = new Thread(() -> { + // Loop of update storm followed by peace time + while (true) { + try { + int increaseCount = 0; + int maxIncreases = 6; + Thread.sleep(10000); // 10 seconds peace time at the beginning + while (increaseCount < maxIncreases) { + double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2); + System.out.println("Increased docsPerSec per thread to " + newRate); + Thread.sleep(20000); // every 20 seconds + increaseCount++; + } + System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode"); + docsPerSecRef.set(peaceTimeRate); + System.out.println("Decreased docsPerSec per thread to " + (peaceTimeRate)); + Thread.sleep(900000); // 15 minutes peace time + } catch (InterruptedException e) { + // exit thread + } } - System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode"); - Thread.sleep(300000); // 5 minutes of peace time - } catch (InterruptedException e) { - // exit thread - } + }); + docsPerSecIncreaser.setDaemon(true); + docsPerSecIncreaser.start(); + } else { + System.out.println("Update storms disabled - maintaining constant indexing rate"); } - }); - docsPerSecIncreaser.setDaemon(true); - docsPerSecIncreaser.start(); // Aggressive delete storm task source final TaskSource baseTasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) { @Override diff --git a/src/main/perf/SearchPerfTest.java b/src/main/perf/SearchPerfTest.java index 
a36a66c3c..ef20556aa 100755 --- a/src/main/perf/SearchPerfTest.java +++ b/src/main/perf/SearchPerfTest.java @@ -416,7 +416,7 @@ public void warm(LeafReader reader) throws IOException { // hardwired false: boolean addDVFields = mode == Mode.BDV_UPDATE || mode == Mode.NDV_UPDATE; LineFileDocs lineFileDocs = new LineFileDocs(lineDocsFile, false, storeBody, tvsBody, false, cloneDocs, null, null, null, addDVFields, null, 0, null); - IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1); + IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1, false); threads.start(); mgr = new SearcherManager(writer, new SearcherFactory() { From 36a88c0b2e6dd55f310f59d65eab637e6b7597dd Mon Sep 17 00:00:00 2001 From: Nipun Batra <83050981+nipunbatra8@users.noreply.github.com> Date: Wed, 27 Aug 2025 21:21:33 -0700 Subject: [PATCH 6/8] no unlimited segment size --- src/main/perf/NRTPerfTest.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/perf/NRTPerfTest.java b/src/main/perf/NRTPerfTest.java index eccc76d46..64da149df 100644 --- a/src/main/perf/NRTPerfTest.java +++ b/src/main/perf/NRTPerfTest.java @@ -346,7 +346,9 @@ public DocValuesFormat getDocValuesFormatForField(String field) { TieredMergePolicy tmp = new TieredMergePolicy(); tmp.setNoCFSRatio(0.0); - tmp.setMaxMergedSegmentMB(1000000.0); + if (!enableUpdateStorms) { + tmp.setMaxMergedSegmentMB(1000000.0); // effectively unlimited + } //tmp.setReclaimDeletesWeight(3.0); //tmp.setMaxMergedSegmentMB(7000.0); From eee5cf48171b9a693916894c4b74b8c1a94236e5 Mon Sep 17 00:00:00 2001 From: Nipun Batra <83050981+nipunbatra8@users.noreply.github.com> Date: Tue, 2 Sep 2025 16:21:44 -0700 Subject: [PATCH 7/8] Backwards compatible --- src/main/perf/IndexThreads.java | 11 +++++++++ 1 file changed, 11 insertions(+) diff --git a/src/main/perf/IndexThreads.java b/src/main/perf/IndexThreads.java index 744277939..b40b6430e 100644 --- a/src/main/perf/IndexThreads.java +++ b/src/main/perf/IndexThreads.java @@ -51,6 +51,17 @@ public enum Mode { UPDATE, ADD, NDV_UPDATE, BDV_UPDATE } final AtomicLong lastRefreshNS; final boolean enableUpdateStorms; + /** + * Constructor with default enableUpdateStorms=false for backward compatibility + * @param docsPerSecPerThreadRef AtomicReference for thread-safe rate updates across multiple threads. + */ + public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit, + boolean addGroupingFields, boolean printDPS, Mode mode, AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener, + double nrtEverySec, int randomDocIDMax) + throws IOException, InterruptedException { + this(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, docsPerSecPerThreadRef, updatesListener, nrtEverySec, randomDocIDMax, false); + } + /** * @param docsPerSecPerThreadRef AtomicReference for thread-safe rate updates across multiple threads. 
*/ From 28e906f11968bea8f94dd66d8d13a23896fc5f86 Mon Sep 17 00:00:00 2001 From: Nipun Batra <83050981+nipunbatra8@users.noreply.github.com> Date: Fri, 5 Sep 2025 15:04:10 -0700 Subject: [PATCH 8/8] configs written out --- src/main/perf/NRTPerfTest.java | 50 ++++++++++++++-------------------- src/python/nrtPerf.py | 13 +++++++++ 2 files changed, 34 insertions(+), 29 deletions(-) diff --git a/src/main/perf/NRTPerfTest.java b/src/main/perf/NRTPerfTest.java index 64da149df..5702b5569 100644 --- a/src/main/perf/NRTPerfTest.java +++ b/src/main/perf/NRTPerfTest.java @@ -72,6 +72,22 @@ // - hmm: we really should have a separate line file, shuffled, that holds the IDs for each line; this way we can update doc w/ same original doc and then we can assert hit counts match // - share *Task code from SearchPerfTest +// CONFIGURATION SETTINGS: +// +// Normal Operation Settings: +// - RAM Buffer Size: 256MB +// - TieredMergePolicy deletesPctAllowed: default (10.0) +// - Max merged segment size: 1000000MB (effectively unlimited) +// - ConcurrentMergeScheduler: 4 max merges, 1 thread +// +// Update Storms Settings (-updateStorms true, see nrtPerf.py for example): +// - RAM Buffer Size: 2GB (2048 MB) +// - TieredMergePolicy deletesPctAllowed: 2.0 +// - Max merged segment size: use Lucene defaults +// - ConcurrentMergeScheduler: use Lucene defaults for max merges & threads +// - Vector indexing enabled for testing heavy update workloads: +// docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, "/path/to/vectors_file", 768, VectorEncoding.FLOAT32); + public class NRTPerfTest { static final class MergedReaderWarmer implements IndexWriter.IndexReaderWarmer { @@ -281,12 +297,7 @@ public static void main(String[] args) throws Exception { final Random random = new Random(seed); final LineFileDocs docs; - if (enableUpdateStorms) { - docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, "../data/cohere-wikipedia-docs-5M-768d.vec", 768, VectorEncoding.FLOAT32); - System.out.println("Update storms: vector indexing enabled"); - } else { - docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null); - } + docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null); final Directory dir0; if (dirImpl.equals("MMapDirectory")) { @@ -311,11 +322,7 @@ public static void main(String[] args) throws Exception { StandardAnalyzer analyzer = new StandardAnalyzer(CharArraySet.EMPTY_SET); final IndexWriterConfig conf = new IndexWriterConfig(analyzer); conf.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE); - if (enableUpdateStorms) { - conf.setRAMBufferSizeMB(2048.0); // 2GB RAM buffer for aggressive update storms - } else { - conf.setRAMBufferSizeMB(256.0); // 256MB RAM buffer for normal operation - } + conf.setRAMBufferSizeMB(256.0); // 256MB RAM buffer for normal operation //iwc.setMergeScheduler(ms); /* @@ -346,21 +353,11 @@ public DocValuesFormat getDocValuesFormatForField(String field) { TieredMergePolicy tmp = new TieredMergePolicy(); tmp.setNoCFSRatio(0.0); - if (!enableUpdateStorms) { - tmp.setMaxMergedSegmentMB(1000000.0); // effectively unlimited - } + tmp.setMaxMergedSegmentMB(1000000.0); // effectively unlimited //tmp.setReclaimDeletesWeight(3.0); //tmp.setMaxMergedSegmentMB(7000.0); - // AGGRESSIVE UPDATE STORM SETTINGS (only if enabled): - if (enableUpdateStorms) { - // Allow fewer 
deletes before forcing merges - tmp.setDeletesPctAllowed(2.0); - System.out.println("Update storms enabled: using aggressive deletesPctAllowed=2.0"); - } else { - // Use default settings for normal operation - System.out.println("Update storms disabled: using default merge policy settings"); - } + tmp.setDeletesPctAllowed(2.0); conf.setMergePolicy(tmp); @@ -371,12 +368,7 @@ public DocValuesFormat getDocValuesFormatForField(String field) { // Make sure merges run @ higher prio than indexing: final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) conf.getMergeScheduler(); // Can swap to your own MergeScheduler impl - if (enableUpdateStorms) { - // cms.setMaxMergesAndThreads(4, 1); - // Use Lucene default settings for max merges & threads - } else { - cms.setMaxMergesAndThreads(4, 1); - } + cms.setMaxMergesAndThreads(4, 1); conf.setMergedSegmentWarmer(new MergedReaderWarmer(field)); diff --git a/src/python/nrtPerf.py b/src/python/nrtPerf.py index 6342d585b..c08e54736 100644 --- a/src/python/nrtPerf.py +++ b/src/python/nrtPerf.py @@ -17,6 +17,14 @@ # Measures the NRT performance under different document updates mode # (add, update, ndv_update, bdv_update) +# +# EXAMPLE USAGE: +# +# Normal operation: +# python nrtPerf.py -source /path/to/source -dps 100 -rps 1.0 -rts 3000 +# +# With update storms enabled (aggressive indexing bursts): +# python nrtPerf.py -source /path/to/source -dps 200 -rps 0.06 -nst 8 -nit 8 -rts 3000 -updateStorms true import os import re @@ -53,6 +61,7 @@ def runOne( numIndexThreads=constants.INDEX_NUM_THREADS, statsEverySec=1, commit="no", + enableUpdateStorms=False, ): logFileName = "%s/%s_dps%s_reopen%s.txt" % (constants.LOGS_DIR, mode, docsPerSec, reopensPerSec) print("log: %s" % logFileName) @@ -75,6 +84,8 @@ def runOne( command += " %s" % commit command += " 0.0" command += " %s" % data.tasksFile + if enableUpdateStorms: + command += " true" command += " > %s 2>&1" % logFileName if VERBOSE: @@ -190,6 +201,7 @@ def toString(self): for rps in reopenPerSec.split(","): print() print("params: mode=%s docs/sec=%s reopen/sec=%s runTime(s)=%s searchThreads=%s indexThreads=%s" % (mode, dps, rps, runTimeSec, numSearchThreads, numIndexThreads)) + enableUpdateStorms = benchUtil.getArg("-updateStorms", "false", True).lower() == "true" reopenStats = runOne( classpath=cp, mode=mode, @@ -200,6 +212,7 @@ def toString(self): runTimeSec=runTimeSec, numSearchThreads=numSearchThreads, numIndexThreads=numIndexThreads, + enableUpdateStorms=enableUpdateStorms, ) allStats.append((dps, rps, runTimeSec, reopenStats))
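Note on the rate-control pattern: across these patches the fixed float docsPerSecPerThread becomes an AtomicReference<Double> shared between the indexing threads and a controller thread, so the target rate can be changed while indexing runs. Below is a minimal, self-contained sketch of that pattern; the class name, rates, and timings are illustrative only and are not part of luceneutil.

import java.util.concurrent.atomic.AtomicReference;

public class RateControlSketch {
  public static void main(String[] args) throws InterruptedException {
    // Shared, mutable rate; worker threads re-read it on every iteration.
    final AtomicReference<Double> docsPerSecRef = new AtomicReference<>(10.0);

    // Controller ("storm") thread: double the rate a few times,
    // like NRTPerfTest's docsPerSecIncreaser.
    Thread increaser = new Thread(() -> {
      try {
        for (int i = 0; i < 3; i++) {
          Thread.sleep(1000);
          double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2);
          System.out.println("rate is now " + newRate + " docs/sec");
        }
      } catch (InterruptedException e) {
        // exit thread
      }
    });
    increaser.setDaemon(true);
    increaser.start();

    // Worker loop: pace against the *current* rate, mirroring the
    // cumulative sleep computation in IndexThread.
    final long startNS = System.nanoTime();
    for (int count = 1; count <= 60; count++) {
      double docsPerSec = docsPerSecRef.get();  // may have changed since the last doc
      long sleepNS = startNS + (long) (1000000000L * (count / docsPerSec)) - System.nanoTime();
      if (sleepNS > 0) {
        Thread.sleep(sleepNS / 1000000, (int) (sleepNS % 1000000));
      }
      System.out.println("indexed doc " + count);
    }
  }
}

Because each worker re-reads the reference before computing its next sleep, a doubled rate takes effect on the very next document rather than at a batch boundary. Note that the schedule is cumulative from startNS, so dropping the rate back down can produce one very long sleep; the 100 ms sleep cap added in PATCH 5 guards the real loop against exactly that when a storm ends and the peace-time rate is restored.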