diff --git a/src/main/perf/IndexThreads.java b/src/main/perf/IndexThreads.java
index f68a028b..b40b6430 100644
--- a/src/main/perf/IndexThreads.java
+++ b/src/main/perf/IndexThreads.java
@@ -25,6 +25,7 @@
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.lucene.document.Document;
 import org.apache.lucene.document.Field;
@@ -48,14 +49,30 @@ public enum Mode { UPDATE, ADD, NDV_UPDATE, BDV_UPDATE }
   final Thread[] threads;
   final AtomicBoolean refreshing;
   final AtomicLong lastRefreshNS;
+  final boolean enableUpdateStorms;
 
+  /**
+   * Constructor with default enableUpdateStorms=false, for backward compatibility.
+   * @param docsPerSecPerThreadRef AtomicReference for thread-safe rate updates across multiple threads.
+   */
   public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit,
-                      boolean addGroupingFields, boolean printDPS, Mode mode, float docsPerSecPerThread, UpdatesListener updatesListener,
+                      boolean addGroupingFields, boolean printDPS, Mode mode, AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener,
                       double nrtEverySec, int randomDocIDMax) throws IOException, InterruptedException {
+    this(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, docsPerSecPerThreadRef, updatesListener, nrtEverySec, randomDocIDMax, false);
+  }
+
+  /**
+   * @param docsPerSecPerThreadRef AtomicReference for thread-safe rate updates across multiple threads.
+   */
+  public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit,
+                      boolean addGroupingFields, boolean printDPS, Mode mode, AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener,
+                      double nrtEverySec, int randomDocIDMax, boolean enableUpdateStorms)
+      throws IOException, InterruptedException {
     final AtomicInteger groupBlockIndex;
 
     this.docs = lineFileDocs;
+    this.enableUpdateStorms = enableUpdateStorms;
     if (addGroupingFields) {
       IndexThread.group100 = randomStrings(100, random);
       IndexThread.group10K = randomStrings(10000, random);
@@ -77,8 +94,8 @@ public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed,
     for(int thread=0;thread<numThreads;thread++) {
-      threads[thread] = new IndexThread(random, startLatch, stopLatch, w, docs, docCountLimit, count, mode, groupBlockIndex, stop, refreshing, lastRefreshNS,
-                                        docsPerSecPerThread, indexingFailed, updatesListener, nrtEverySec, randomDocIDMax);
+      threads[thread] = new IndexThread(random, startLatch, stopLatch, w, docs, docCountLimit, count, mode, groupBlockIndex, stop, refreshing, lastRefreshNS,
+                                        docsPerSecPerThreadRef, indexingFailed, updatesListener, nrtEverySec, randomDocIDMax, enableUpdateStorms);
       threads[thread].start();
     }
@@ ... @@ private static class IndexThread extends Thread {
-    private final float docsPerSec;
+    private final AtomicReference<Double> docsPerSecRef;
     private final Random random;
     private final AtomicBoolean failed;
     private final UpdatesListener updatesListener;
@@ -149,10 +166,11 @@ private static class IndexThread extends Thread {
     private final AtomicLong lastRefreshNS;
     private final double nrtEverySec;
     final int randomDocIDMax;
+    private final boolean enableUpdateStorms;
 
     public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stopLatch, IndexWriter w, LineFileDocs docs, int numTotalDocs, AtomicInteger count, Mode mode, AtomicInteger groupBlockIndex,
-                       AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, float docsPerSec,
-                       AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax) {
+                       AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, AtomicReference<Double> docsPerSecRef,
+                       AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax, boolean enableUpdateStorms) {
       this.startLatch = startLatch;
       this.stopLatch = stopLatch;
       this.w = w;
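The constructor change above swaps the fixed `float docsPerSecPerThread` for a shared `AtomicReference<Double>`, so a controller thread can retune the per-thread rate while the indexer threads are running. Here is a minimal self-contained sketch of that pattern (class and variable names are illustrative, not from the patch):

```java
import java.util.concurrent.atomic.AtomicReference;

public class RateSharingSketch {
  public static void main(String[] args) throws InterruptedException {
    AtomicReference<Double> docsPerSecRef = new AtomicReference<>(100.0);

    Thread indexer = new Thread(() -> {
      for (int i = 0; i < 5; i++) {
        // Readers re-read (and unbox) on every iteration, so rate changes take effect mid-run:
        double docsPerSec = docsPerSecRef.get();
        System.out.println("indexing at " + docsPerSec + " docs/sec/thread");
        try { Thread.sleep(200); } catch (InterruptedException e) { return; }
      }
    });
    indexer.start();

    Thread.sleep(500);
    // Controller side: updateAndGet applies the change atomically for all readers.
    double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2);
    System.out.println("doubled rate to " + newRate);
    indexer.join();
  }
}
```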
@@ -162,7 +180,7 @@ public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stop
       this.mode = mode;
       this.groupBlockIndex = groupBlockIndex;
       this.stop = stop;
-      this.docsPerSec = docsPerSec;
+      this.docsPerSecRef = docsPerSecRef;
       this.random = random;
       this.failed = failed;
       this.updatesListener = updatesListener;
@@ -170,6 +188,7 @@ public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stop
       this.lastRefreshNS = lastRefreshNS;
       this.nrtEverySec = nrtEverySec;
       this.randomDocIDMax = randomDocIDMax;
+      this.enableUpdateStorms = enableUpdateStorms;
     }
 
     private static Field getStringIDField(Document doc) {
@@ -329,7 +348,7 @@ public void remove() {
             docState.doc.removeField("groupend");
           }
 
-      } else if (docsPerSec > 0 && mode != null) {
+      } else if (docsPerSecRef.get() > 0 && mode != null) {
         System.out.println("Indexing single docs (add/updateDocument)");
         final long startNS = System.nanoTime();
         int threadCount = 0;
@@ -382,10 +401,24 @@ public void remove() {
            System.out.println("Indexer: " + docCount + " docs... (" + (System.currentTimeMillis() - tStart) + " msec)");
          }
 
+          double docsPerSec = docsPerSecRef.get();
          final long sleepNS = startNS + (long) (1000000000*(threadCount/docsPerSec)) - System.nanoTime();
          if (sleepNS > 0) {
-            final long sleepMS = sleepNS/1000000;
-            final int sleepNS2 = (int) (sleepNS - sleepMS*1000000);
+            final long sleepMS;
+            final int sleepNS2;
+
+            if (enableUpdateStorms) {
+              // Cap sleep at 100ms for update storms to maintain responsiveness
+              final long maxSleepNS = 100000000L; // 100 ms in nanoseconds
+              final long actualSleepNS = Math.min(sleepNS, maxSleepNS);
+              sleepMS = actualSleepNS/1000000;
+              sleepNS2 = (int) (actualSleepNS - sleepMS*1000000);
+              System.out.println("Update storms: capped indexer sleep at " + sleepMS + " ms, " + sleepNS2 + " ns (requested: " + (sleepNS/1000000) + " ms)");
+            } else {
+              // Normal operation: no sleep cap, use original behavior
+              sleepMS = sleepNS/1000000;
+              sleepNS2 = (int) (sleepNS - sleepMS*1000000);
+            }
 
            Thread.sleep(sleepMS, sleepNS2);
          }
diff --git a/src/main/perf/Indexer.java b/src/main/perf/Indexer.java
index 966fa795..afa49632 100644
--- a/src/main/perf/Indexer.java
+++ b/src/main/perf/Indexer.java
@@ -35,6 +35,7 @@
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.lucene.analysis.Analyzer;
 import org.apache.lucene.analysis.CharArraySet;
@@ -573,8 +574,8 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
     float docsPerSecPerThread = -1f;
     //float docsPerSecPerThread = 100f;
-    IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, docsPerSecPerThread, null, nrtEverySec,
-                                            randomDocIDMax);
+    IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, new AtomicReference<>((double) docsPerSecPerThread), null, nrtEverySec,
+                                            randomDocIDMax, false);
 
     System.out.println("\nIndexer: start");
     final long t0 = System.currentTimeMillis();
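The capped-sleep hunk in IndexThreads.java above matters because the throttle computes a single sleep from the target rate; without a cap, a thread pacing itself at a low rate could sleep for whole seconds and miss a rate increase from the storm scheduler. A standalone sketch of the arithmetic, assuming the patch's 100 ms cap:

```java
public class SleepCapSketch {
  public static void main(String[] args) {
    long sleepNS = 1_234_567_890L;          // pretend the rate math asked for ~1.23 s of sleep
    final long maxSleepNS = 100_000_000L;   // 100 ms cap keeps the thread responsive to rate changes
    long actualSleepNS = Math.min(sleepNS, maxSleepNS);

    // Thread.sleep(long millis, int nanos) needs the duration split into two parts:
    long sleepMS = actualSleepNS / 1_000_000;
    int sleepNS2 = (int) (actualSleepNS - sleepMS * 1_000_000);
    System.out.println(sleepMS + " ms + " + sleepNS2 + " ns");  // prints "100 ms + 0 ns"
  }
}
```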
diff --git a/src/main/perf/NRTPerfTest.java b/src/main/perf/NRTPerfTest.java
index a892dd7c..5702b556 100644
--- a/src/main/perf/NRTPerfTest.java
+++ b/src/main/perf/NRTPerfTest.java
@@ -21,6 +21,8 @@
 import java.io.FileNotFoundException;
 import java.io.FileReader;
 import java.io.IOException;
+import java.io.PrintStream;
+import java.io.FileOutputStream;
 import java.nio.file.Files;
 import java.nio.file.Paths;
 import java.util.Collections;
@@ -44,6 +46,7 @@
 import org.apache.lucene.index.LeafReader;
 import org.apache.lucene.index.NoDeletionPolicy;
 import org.apache.lucene.index.Term;
+import org.apache.lucene.index.VectorEncoding;
 import org.apache.lucene.index.TieredMergePolicy;
 import org.apache.lucene.queryparser.classic.ParseException;
 import org.apache.lucene.queryparser.classic.QueryParser;
@@ -69,6 +72,22 @@
 //   - hmm: we really should have a separate line file, shuffled, that holds the IDs for each line; this way we can update doc w/ same original doc and then we can assert hit counts match
 //   - share *Task code from SearchPerfTest
 
+// CONFIGURATION SETTINGS:
+//
+// Normal Operation Settings:
+//   - RAM Buffer Size: 256MB
+//   - TieredMergePolicy deletesPctAllowed: default (10.0)
+//   - Max merged segment size: 1000000MB (effectively unlimited)
+//   - ConcurrentMergeScheduler: 4 max merges, 1 thread
+//
+// Update Storms Settings (-updateStorms true, see nrtPerf.py for example):
+//   - RAM Buffer Size: 4GB
+//   - TieredMergePolicy deletesPctAllowed: 2.0
+//   - Max merged segment size: use Lucene defaults
+//   - ConcurrentMergeScheduler: use Lucene defaults for max merges & threads
+//   - Vector indexing enabled for testing heavy update workloads:
+//     docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, "/path/to/vectors_file", 768, VectorEncoding.FLOAT32);
+
 public class NRTPerfTest {
 
   static final class MergedReaderWarmer implements IndexWriter.IndexReaderWarmer {
@@ -243,6 +262,9 @@ public static void main(String[] args) throws Exception {
       throw new FileNotFoundException("tasks file not found " + tasksFile);
     }
 
+    // By default, disable update storms
+    final boolean enableUpdateStorms = args.length > 15 ? Boolean.parseBoolean(args[15]) : false;
+
     final boolean hasProcMemInfo = Files.exists(Paths.get("/proc/meminfo"));
 
     System.out.println("DIR=" + dirImpl);
@@ -256,6 +278,7 @@ public static void main(String[] args) throws Exception {
     System.out.println("Reopen/sec=" + reopenPerSec);
     System.out.println("Mode=" + mode);
     System.out.println("tasksFile=" + tasksFile);
+    System.out.println("EnableUpdateStorms=" + enableUpdateStorms);
     System.out.println("Record stats every " + statsEverySec + " seconds");
     final int count = (int) ((runTimeSec / statsEverySec) + 2);
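The CONFIGURATION SETTINGS comment above describes two profiles, but the hunks below only edit the normal-operation values in place. As a rough sketch of how the update-storms profile could be wired into an `IndexWriterConfig` (my assumption from the comment block, not code from the patch):

```java
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.TieredMergePolicy;

public class UpdateStormsConfigSketch {
  public static IndexWriterConfig updateStormsConfig() {
    IndexWriterConfig conf = new IndexWriterConfig(new StandardAnalyzer());
    conf.setRAMBufferSizeMB(4 * 1024.0);   // 4GB RAM buffer, per the comment block

    TieredMergePolicy tmp = new TieredMergePolicy();
    tmp.setDeletesPctAllowed(2.0);         // merge away deletes much more aggressively
    // Max merged segment size left at Lucene defaults (no setMaxMergedSegmentMB call)
    conf.setMergePolicy(tmp);

    // ConcurrentMergeScheduler left at Lucene defaults (no setMaxMergesAndThreads call)
    return conf;
  }
}
```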
"unlimited" : mergeMaxWriteMBPerSec)); final Random random = new Random(seed); - final LineFileDocs docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null); + final LineFileDocs docs; + docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null); final Directory dir0; if (dirImpl.equals("MMapDirectory")) { @@ -298,7 +322,8 @@ public static void main(String[] args) throws Exception { StandardAnalyzer analyzer = new StandardAnalyzer(CharArraySet.EMPTY_SET); final IndexWriterConfig conf = new IndexWriterConfig(analyzer); conf.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE); - conf.setRAMBufferSizeMB(256.0); + conf.setRAMBufferSizeMB(256.0); // 256MB RAM buffer for normal operation + //iwc.setMergeScheduler(ms); /* @@ -328,9 +353,12 @@ public DocValuesFormat getDocValuesFormatForField(String field) { TieredMergePolicy tmp = new TieredMergePolicy(); tmp.setNoCFSRatio(0.0); - tmp.setMaxMergedSegmentMB(1000000.0); + tmp.setMaxMergedSegmentMB(1000000.0); // effectively unlimited //tmp.setReclaimDeletesWeight(3.0); //tmp.setMaxMergedSegmentMB(7000.0); + + tmp.setDeletesPctAllowed(2.0); + conf.setMergePolicy(tmp); if (!commit.equals("none")) { @@ -339,10 +367,14 @@ public DocValuesFormat getDocValuesFormatForField(String field) { // Make sure merges run @ higher prio than indexing: final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) conf.getMergeScheduler(); + // Can swap to your own MergeScheduler impl cms.setMaxMergesAndThreads(4, 1); conf.setMergedSegmentWarmer(new MergedReaderWarmer(field)); + // Set infoStream to log to file + // PrintStream infoStream = new PrintStream(new FileOutputStream("lucene-infostream.log", true), true, "UTF-8"); + // conf.setInfoStream(infoStream); final IndexWriter w = new IndexWriter(dir, conf); // w.setInfoStream(System.out); @@ -360,8 +392,9 @@ public void afterUpdate() { } }; IndexWriter.DocStats stats = w.getDocStats(); + final AtomicReference docsPerSecRef = new AtomicReference<>(docsPerSec / numIndexThreads); IndexThreads indexThreads = new IndexThreads(random, w, new AtomicBoolean(false), docs, numIndexThreads, -1, false, false, mode, - (float) (docsPerSec / numIndexThreads), updatesListener, -1.0, stats.maxDoc); + docsPerSecRef, updatesListener, -1.0, stats.maxDoc, enableUpdateStorms); // NativePosixUtil.mlockTermsDict(startR, "id"); final SearcherManager manager = new SearcherManager(w, null); @@ -376,12 +409,46 @@ public void afterUpdate() { final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, "FastVectorHighlighter", null, null); TaskParserFactory taskParserFactory = new TaskParserFactory(indexState, field, analyzer, field, 10, random, null, null, -1, true, TestContext.parse("")); - final TaskSource tasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) { + final double peaceTimeRate = docsPerSec / numIndexThreads; + // Conditionally create update storm thread + if (enableUpdateStorms) { + System.out.println("Starting update storms thread..."); + // Periodically increase docsPerSec + Thread docsPerSecIncreaser = new Thread(() -> { + // Loop of update storm followed by peace time + while (true) { + try { + int increaseCount = 0; + int maxIncreases = 6; + Thread.sleep(10000); // 10 seconds peace time at the beginning + while (increaseCount < maxIncreases) { + double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2); + 
System.out.println("Increased docsPerSec per thread to " + newRate); + Thread.sleep(20000); // every 20 seconds + increaseCount++; + } + System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode"); + docsPerSecRef.set(peaceTimeRate); + System.out.println("Decreased docsPerSec per thread to " + (peaceTimeRate)); + Thread.sleep(900000); // 15 minutes peace time + } catch (InterruptedException e) { + // exit thread + } + } + }); + docsPerSecIncreaser.setDaemon(true); + docsPerSecIncreaser.start(); + } else { + System.out.println("Update storms disabled - maintaining constant indexing rate"); + } + // Aggressive delete storm task source + final TaskSource baseTasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) { @Override public void taskDone(Task task, long queueTimeNS, TotalHits toalHitCount) { searchesByTime[currentQT.get()].incrementAndGet(); } }; + final TaskSource tasks = baseTasks; System.out.println("Task repeat count 1"); System.out.println("Tasks file " + tasksFile); System.out.println("Num task per cat 20"); diff --git a/src/main/perf/SearchPerfTest.java b/src/main/perf/SearchPerfTest.java index fd831041..ef20556a 100755 --- a/src/main/perf/SearchPerfTest.java +++ b/src/main/perf/SearchPerfTest.java @@ -416,7 +416,7 @@ public void warm(LeafReader reader) throws IOException { // hardwired false: boolean addDVFields = mode == Mode.BDV_UPDATE || mode == Mode.NDV_UPDATE; LineFileDocs lineFileDocs = new LineFileDocs(lineDocsFile, false, storeBody, tvsBody, false, cloneDocs, null, null, null, addDVFields, null, 0, null); - IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, docsPerSecPerThread, null, -1.0, -1); + IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1, false); threads.start(); mgr = new SearcherManager(writer, new SearcherFactory() { diff --git a/src/main/perf/TaskThreads.java b/src/main/perf/TaskThreads.java index 450db80b..b0e56a86 100644 --- a/src/main/perf/TaskThreads.java +++ b/src/main/perf/TaskThreads.java @@ -103,14 +103,17 @@ public void run() { try { while (stop.get() == false) { - final Task task = tasks.nextTask(); - if (task == null) { + final Task originalTask = tasks.nextTask(); + if (originalTask == null) { // Done this.tasksStopNanos = System.nanoTime(); // first thread that finishes snapshots all threads. this way we do not include "winddown" time in our measurement. endThreadDetails.compareAndSet(null, new SearchPerfTest.ThreadDetails()); break; } + + // Clone the task to avoid reuse issues + final Task task = originalTask.clone(); // Run the task in the IndexSearcher's executor. This is important because IndexSearcher#search also uses the current thread to // search, so not running #search from the executor would artificially use one more thread than configured via luceneutil. 
diff --git a/src/python/nrtPerf.py b/src/python/nrtPerf.py
index 6342d585..c08e5473 100644
--- a/src/python/nrtPerf.py
+++ b/src/python/nrtPerf.py
@@ -17,6 +17,14 @@
 
 # Measures the NRT performance under different document updates mode
 # (add, update, ndv_update, bdv_update)
+#
+# EXAMPLE USAGE:
+#
+# Normal operation:
+#   python nrtPerf.py -source /path/to/source -dps 100 -rps 1.0 -rts 3000
+#
+# With update storms enabled (aggressive indexing bursts):
+#   python nrtPerf.py -source /path/to/source -dps 200 -rps 0.06 -nst 8 -nit 8 -rts 3000 -updateStorms true
 
 import os
 import re
@@ -53,6 +61,7 @@ def runOne(
     numIndexThreads=constants.INDEX_NUM_THREADS,
     statsEverySec=1,
     commit="no",
+    enableUpdateStorms=False,
 ):
     logFileName = "%s/%s_dps%s_reopen%s.txt" % (constants.LOGS_DIR, mode, docsPerSec, reopensPerSec)
     print("log: %s" % logFileName)
@@ -75,6 +84,8 @@ def runOne(
     command += " %s" % commit
     command += " 0.0"
     command += " %s" % data.tasksFile
+    if enableUpdateStorms:
+        command += " true"
     command += " > %s 2>&1" % logFileName
 
     if VERBOSE:
@@ -190,6 +201,7 @@ def toString(self):
   for rps in reopenPerSec.split(","):
     print()
     print("params: mode=%s docs/sec=%s reopen/sec=%s runTime(s)=%s searchThreads=%s indexThreads=%s" % (mode, dps, rps, runTimeSec, numSearchThreads, numIndexThreads))
+    enableUpdateStorms = benchUtil.getArg("-updateStorms", "false", True).lower() == "true"
     reopenStats = runOne(
       classpath=cp,
       mode=mode,
@@ -200,6 +212,7 @@ def toString(self):
       runTimeSec=runTimeSec,
       numSearchThreads=numSearchThreads,
       numIndexThreads=numIndexThreads,
+      enableUpdateStorms=enableUpdateStorms,
     )
     allStats.append((dps, rps, runTimeSec, reopenStats))