27 changes: 22 additions & 5 deletions src/main/perf/IndexThreads.java
@@ -49,17 +49,19 @@ public enum Mode { UPDATE, ADD, NDV_UPDATE, BDV_UPDATE }
final Thread[] threads;
final AtomicBoolean refreshing;
final AtomicLong lastRefreshNS;
final boolean enableUpdateStorms;

/**
* @param docsPerSecPerThreadRef AtomicReference for thread-safe rate updates across multiple threads.
*/
public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit,
boolean addGroupingFields, boolean printDPS, Mode mode, AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener,
double nrtEverySec, int randomDocIDMax)
double nrtEverySec, int randomDocIDMax, boolean enableUpdateStorms)
throws IOException, InterruptedException {
final AtomicInteger groupBlockIndex;

this.docs = lineFileDocs;
this.enableUpdateStorms = enableUpdateStorms;
if (addGroupingFields) {
IndexThread.group100 = randomStrings(100, random);
IndexThread.group10K = randomStrings(10000, random);
@@ -82,7 +84,7 @@ public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed,
for(int thread=0;thread<numThreads;thread++) {
threads[thread] = new IndexThread(random, startLatch, stopLatch, w, docs, docCountLimit, count, mode,
groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThreadRef, failed, updatesListener,
nrtEverySec, randomDocIDMax);
nrtEverySec, randomDocIDMax, enableUpdateStorms);
threads[thread].setName("Index #" + thread);
threads[thread].start();
}
@@ -153,10 +155,11 @@ private static class IndexThread extends Thread {
private final AtomicLong lastRefreshNS;
private final double nrtEverySec;
final int randomDocIDMax;
private final boolean enableUpdateStorms;
public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stopLatch, IndexWriter w,
LineFileDocs docs, int numTotalDocs, AtomicInteger count, Mode mode, AtomicInteger groupBlockIndex,
AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, AtomicReference<Double> docsPerSecRef,
AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax) {
AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax, boolean enableUpdateStorms) {
this.startLatch = startLatch;
this.stopLatch = stopLatch;
this.w = w;
@@ -174,6 +177,7 @@ public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stop
this.lastRefreshNS = lastRefreshNS;
this.nrtEverySec = nrtEverySec;
this.randomDocIDMax = randomDocIDMax;
this.enableUpdateStorms = enableUpdateStorms;
}

private static Field getStringIDField(Document doc) {
@@ -389,8 +393,21 @@ public void remove() {
double docsPerSec = docsPerSecRef.get();
final long sleepNS = startNS + (long) (1000000000*(threadCount/docsPerSec)) - System.nanoTime();
if (sleepNS > 0) {
final long sleepMS = sleepNS/1000000;
final int sleepNS2 = (int) (sleepNS - sleepMS*1000000);
final long sleepMS;
final int sleepNS2;

if (enableUpdateStorms) {
// Cap sleep at 100ms for update storms to maintain responsiveness
final long maxSleepNS = 100000000L; // 100 ms in nanoseconds
final long actualSleepNS = Math.min(sleepNS, maxSleepNS);
sleepMS = actualSleepNS/1000000;
sleepNS2 = (int) (actualSleepNS - sleepMS*1000000);
System.out.println("Update storms: capped indexer sleep at " + sleepMS + " ms, " + sleepNS2 + " ns (requested: " + (sleepNS/1000000) + " ms)");
} else {
// Normal operation: no sleep cap, use original behavior
sleepMS = sleepNS/1000000;
sleepNS2 = (int) (sleepNS - sleepMS*1000000);
}
Thread.sleep(sleepMS, sleepNS2);
}

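The hunk above changes the per-thread rate limiter. Each IndexThread sleeps just long enough to hold its target docs/sec; with update storms enabled that sleep is capped at 100 ms so the thread re-reads the shared rate soon after the storm thread doubles it, instead of sleeping through the change. A minimal sketch of the capped limiter, assuming startNS was captured when indexing began and threadCount counts the docs this thread has indexed (names mirror the fields in IndexThread):

// Sleep until this thread's doc count matches the target rate, but never
// longer than 100 ms in storm mode, so rate changes take effect quickly.
double docsPerSec = docsPerSecRef.get();
long sleepNS = startNS + (long) (1_000_000_000L * (threadCount / docsPerSec)) - System.nanoTime();
if (sleepNS > 0) {
  if (enableUpdateStorms) {
    sleepNS = Math.min(sleepNS, 100_000_000L);  // cap at 100 ms
  }
  final long sleepMS = sleepNS / 1_000_000L;
  final int restNS = (int) (sleepNS - sleepMS * 1_000_000L);
  Thread.sleep(sleepMS, restNS);
}
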
2 changes: 1 addition & 1 deletion src/main/perf/Indexer.java
@@ -575,7 +575,7 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
//float docsPerSecPerThread = 100f;

IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, new AtomicReference<>((double)docsPerSecPerThread), null, nrtEverySec,
randomDocIDMax);
randomDocIDMax, false);

System.out.println("\nIndexer: start");
final long t0 = System.currentTimeMillis();
95 changes: 66 additions & 29 deletions src/main/perf/NRTPerfTest.java
@@ -46,6 +46,7 @@
import org.apache.lucene.index.LeafReader;
import org.apache.lucene.index.NoDeletionPolicy;
import org.apache.lucene.index.Term;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.TieredMergePolicy;
import org.apache.lucene.queryparser.classic.ParseException;
import org.apache.lucene.queryparser.classic.QueryParser;
@@ -245,6 +246,9 @@ public static void main(String[] args) throws Exception {
throw new FileNotFoundException("tasks file not found " + tasksFile);
}

// By default, disable update storms
final boolean enableUpdateStorms = args.length > 15 ? Boolean.parseBoolean(args[15]) : false;

final boolean hasProcMemInfo = Files.exists(Paths.get("/proc/meminfo"));

System.out.println("DIR=" + dirImpl);
@@ -258,6 +262,7 @@ public static void main(String[] args) throws Exception {
System.out.println("Reopen/sec=" + reopenPerSec);
System.out.println("Mode=" + mode);
System.out.println("tasksFile=" + tasksFile);
System.out.println("EnableUpdateStorms=" + enableUpdateStorms);

System.out.println("Record stats every " + statsEverySec + " seconds");
final int count = (int) ((runTimeSec / statsEverySec) + 2);
@@ -275,7 +280,13 @@ public static void main(String[] args) throws Exception {
System.out.println("Max merge MB/sec = " + (mergeMaxWriteMBPerSec <= 0.0 ? "unlimited" : mergeMaxWriteMBPerSec));
final Random random = new Random(seed);

final LineFileDocs docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null);
final LineFileDocs docs;
if (enableUpdateStorms) {
docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, "../data/cohere-wikipedia-docs-5M-768d.vec", 768, VectorEncoding.FLOAT32);
System.out.println("Update storms: vector indexing enabled");
} else {
docs = new LineFileDocs(lineDocFile, true, false, false, false, false, null, Collections.emptyMap(), null, true, null, 0, null);
}

final Directory dir0;
if (dirImpl.equals("MMapDirectory")) {
@@ -300,7 +311,12 @@ public static void main(String[] args) throws Exception {
StandardAnalyzer analyzer = new StandardAnalyzer(CharArraySet.EMPTY_SET);
final IndexWriterConfig conf = new IndexWriterConfig(analyzer);
conf.setIndexDeletionPolicy(NoDeletionPolicy.INSTANCE);
conf.setRAMBufferSizeMB(256.0);
if (enableUpdateStorms) {
conf.setRAMBufferSizeMB(2048.0); // 2GB RAM buffer for aggressive update storms
} else {
conf.setRAMBufferSizeMB(256.0); // 256MB RAM buffer for normal operation
}

//iwc.setMergeScheduler(ms);

/*
@@ -334,10 +350,15 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
//tmp.setReclaimDeletesWeight(3.0);
//tmp.setMaxMergedSegmentMB(7000.0);

// AGGRESSIVE UPDATE STORM SETTINGS:
// Allow more deletes before forcing merges
// nocommit
tmp.setDeletesPctAllowed(2.0);
// AGGRESSIVE UPDATE STORM SETTINGS (only if enabled):
if (enableUpdateStorms) {
// Allow fewer deletes before forcing merges
tmp.setDeletesPctAllowed(2.0);
System.out.println("Update storms enabled: using aggressive deletesPctAllowed=2.0");
} else {
// Use default settings for normal operation
System.out.println("Update storms disabled: using default merge policy settings");
}

conf.setMergePolicy(tmp);

@@ -348,13 +369,18 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
// Make sure merges run @ higher prio than indexing:
final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) conf.getMergeScheduler();
// Can swap to your own MergeScheduler impl
// cms.setMaxMergesAndThreads(4, 1);
if (enableUpdateStorms) {
// cms.setMaxMergesAndThreads(4, 1);
// Use Lucene default settings for max merges & threads
} else {
cms.setMaxMergesAndThreads(4, 1);
}

conf.setMergedSegmentWarmer(new MergedReaderWarmer(field));

// Set infoStream to log to file
PrintStream infoStream = new PrintStream(new FileOutputStream("lucene-infostream.log", true), true, "UTF-8");
conf.setInfoStream(infoStream);
// PrintStream infoStream = new PrintStream(new FileOutputStream("lucene-infostream.log", true), true, "UTF-8");
// conf.setInfoStream(infoStream);
final IndexWriter w = new IndexWriter(dir, conf);
// w.setInfoStream(System.out);

@@ -374,7 +400,7 @@ public void afterUpdate() {
IndexWriter.DocStats stats = w.getDocStats();
final AtomicReference<Double> docsPerSecRef = new AtomicReference<>(docsPerSec / numIndexThreads);
IndexThreads indexThreads = new IndexThreads(random, w, new AtomicBoolean(false), docs, numIndexThreads, -1, false, false, mode,
docsPerSecRef, updatesListener, -1.0, stats.maxDoc);
docsPerSecRef, updatesListener, -1.0, stats.maxDoc, enableUpdateStorms);

// NativePosixUtil.mlockTermsDict(startR, "id");
final SearcherManager manager = new SearcherManager(w, null);
@@ -389,27 +415,38 @@ public void afterUpdate() {
final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, "FastVectorHighlighter", null, null);
TaskParserFactory taskParserFactory =
new TaskParserFactory(indexState, field, analyzer, field, 10, random, null, null, -1, true, TestContext.parse(""));
// Periodically increase docsPerSec
Thread docsPerSecIncreaser = new Thread(() -> {
while (true) {
try {
int increaseCount = 0;
int maxIncreases = 8;
while (increaseCount < maxIncreases) {
Thread.sleep(20000); // every 20 seconds
double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2);
System.out.println("Increased docsPerSec per thread to " + newRate);
increaseCount++;
final double peaceTimeRate = docsPerSec / numIndexThreads;
// Conditionally create update storm thread
if (enableUpdateStorms) {
System.out.println("Starting update storms thread...");
// Periodically increase docsPerSec
Thread docsPerSecIncreaser = new Thread(() -> {
// Loop of update storm followed by peace time
while (true) {
try {
int increaseCount = 0;
int maxIncreases = 6;
Thread.sleep(10000); // 10 seconds peace time at the beginning
while (increaseCount < maxIncreases) {
double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2);
System.out.println("Increased docsPerSec per thread to " + newRate);
Thread.sleep(20000); // every 20 seconds
increaseCount++;
}
System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode");
docsPerSecRef.set(peaceTimeRate);
System.out.println("Decreased docsPerSec per thread to " + (peaceTimeRate));
Thread.sleep(900000); // 15 minutes peace time
} catch (InterruptedException e) {
// exit thread
}
}
System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode");
Thread.sleep(300000); // 5 minutes of peace time
} catch (InterruptedException e) {
// exit thread
}
});
docsPerSecIncreaser.setDaemon(true);
docsPerSecIncreaser.start();
} else {
System.out.println("Update storms disabled - maintaining constant indexing rate");
}
});
docsPerSecIncreaser.setDaemon(true);
docsPerSecIncreaser.start();
// Aggressive delete storm task source
final TaskSource baseTasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) {
@Override
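Taken together, the hunks above make storm behavior opt-in: with the flag set, the test reads vectors from ../data/cohere-wikipedia-docs-5M-768d.vec (768-dim FLOAT32), grows the RAM buffer from 256 MB to 2 GB, tightens deletesPctAllowed to 2.0, and leaves the ConcurrentMergeScheduler at Lucene's defaults. The storm thread itself alternates bursts and recovery: starting from the peace-time rate r, it doubles the shared per-thread rate every 20 seconds, six times, so indexing peaks at 64r (2^6) roughly two minutes into each cycle, then resets to r for a 15-minute recovery. A condensed sketch of one cycle, with docsPerSecRef and peaceTimeRate as defined in the hunk above:

// One storm/peace cycle; the real code loops this forever on a daemon thread.
Thread.sleep(10_000);                         // initial 10 s of peace time
for (int step = 0; step < 6; step++) {
  double newRate = docsPerSecRef.updateAndGet(r -> r * 2);
  System.out.println("Increased docsPerSec per thread to " + newRate);
  Thread.sleep(20_000);                       // hold each doubling for 20 s
}
docsPerSecRef.set(peaceTimeRate);             // back to baseline (peak was 64x)
Thread.sleep(900_000);                        // 15 min of peace before the next storm
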
2 changes: 1 addition & 1 deletion src/main/perf/SearchPerfTest.java
@@ -416,7 +416,7 @@ public void warm(LeafReader reader) throws IOException {
// hardwired false:
boolean addDVFields = mode == Mode.BDV_UPDATE || mode == Mode.NDV_UPDATE;
LineFileDocs lineFileDocs = new LineFileDocs(lineDocsFile, false, storeBody, tvsBody, false, cloneDocs, null, null, null, addDVFields, null, 0, null);
IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1);
IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1, false);
threads.start();

mgr = new SearcherManager(writer, new SearcherFactory() {
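The flag is positional: NRTPerfTest reads it from args[15], while Indexer and SearchPerfTest hard-code false at their call sites, so only NRTPerfTest can opt in. A hypothetical invocation, with the first fifteen arguments elided since their order is defined elsewhere in NRTPerfTest.main:

java perf.NRTPerfTest <arg0> <arg1> ... <arg14> true

Boolean.parseBoolean returns true only for the literal string "true" (case-insensitive), so any other sixteenth argument, or omitting it entirely, leaves update storms disabled.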