Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 11 additions & 6 deletions src/main/perf/IndexThreads.java
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.Field;
Expand All @@ -49,8 +50,11 @@ public enum Mode { UPDATE, ADD, NDV_UPDATE, BDV_UPDATE }
final AtomicBoolean refreshing;
final AtomicLong lastRefreshNS;

/**
* @param docsPerSecPerThreadRef AtomicReference for thread-safe rate updates across multiple threads.
*/
public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit,
boolean addGroupingFields, boolean printDPS, Mode mode, float docsPerSecPerThread, UpdatesListener updatesListener,
boolean addGroupingFields, boolean printDPS, Mode mode, AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener,
double nrtEverySec, int randomDocIDMax)
throws IOException, InterruptedException {
final AtomicInteger groupBlockIndex;
Expand All @@ -77,7 +81,7 @@ public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed,

for(int thread=0;thread<numThreads;thread++) {
threads[thread] = new IndexThread(random, startLatch, stopLatch, w, docs, docCountLimit, count, mode,
groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThread, failed, updatesListener,
groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThreadRef, failed, updatesListener,
nrtEverySec, randomDocIDMax);
threads[thread].setName("Index #" + thread);
threads[thread].start();
Expand Down Expand Up @@ -141,7 +145,7 @@ private static class IndexThread extends Thread {
private final Mode mode;
private final CountDownLatch startLatch;
private final CountDownLatch stopLatch;
private final float docsPerSec;
private final AtomicReference<Double> docsPerSecRef;
private final Random random;
private final AtomicBoolean failed;
private final UpdatesListener updatesListener;
Expand All @@ -151,7 +155,7 @@ private static class IndexThread extends Thread {
final int randomDocIDMax;
public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stopLatch, IndexWriter w,
LineFileDocs docs, int numTotalDocs, AtomicInteger count, Mode mode, AtomicInteger groupBlockIndex,
AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, float docsPerSec,
AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, AtomicReference<Double> docsPerSecRef,
AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax) {
this.startLatch = startLatch;
this.stopLatch = stopLatch;
Expand All @@ -162,7 +166,7 @@ public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stop
this.mode = mode;
this.groupBlockIndex = groupBlockIndex;
this.stop = stop;
this.docsPerSec = docsPerSec;
this.docsPerSecRef = docsPerSecRef;
this.random = random;
this.failed = failed;
this.updatesListener = updatesListener;
Expand Down Expand Up @@ -329,7 +333,7 @@ public void remove() {

docState.doc.removeField("groupend");
}
} else if (docsPerSec > 0 && mode != null) {
} else if (docsPerSecRef.get() > 0 && mode != null) {
System.out.println("Indexing single docs (add/updateDocument)");
final long startNS = System.nanoTime();
int threadCount = 0;
Expand Down Expand Up @@ -382,6 +386,7 @@ public void remove() {
System.out.println("Indexer: " + docCount + " docs... (" + (System.currentTimeMillis() - tStart) + " msec)");
}

double docsPerSec = docsPerSecRef.get();
final long sleepNS = startNS + (long) (1000000000*(threadCount/docsPerSec)) - System.nanoTime();
if (sleepNS > 0) {
final long sleepMS = sleepNS/1000000;
Expand Down
3 changes: 2 additions & 1 deletion src/main/perf/Indexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.CharArraySet;
Expand Down Expand Up @@ -573,7 +574,7 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
float docsPerSecPerThread = -1f;
//float docsPerSecPerThread = 100f;

IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, docsPerSecPerThread, null, nrtEverySec,
IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, new AtomicReference<>((double)docsPerSecPerThread), null, nrtEverySec,
randomDocIDMax);

System.out.println("\nIndexer: start");
Expand Down
42 changes: 39 additions & 3 deletions src/main/perf/NRTPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.io.FileOutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
Expand Down Expand Up @@ -331,6 +333,12 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
tmp.setMaxMergedSegmentMB(1000000.0);
//tmp.setReclaimDeletesWeight(3.0);
//tmp.setMaxMergedSegmentMB(7000.0);

// AGGRESSIVE UPDATE STORM SETTINGS:
// Allow more deletes before forcing merges
// nocommit
tmp.setDeletesPctAllowed(2.0);

conf.setMergePolicy(tmp);

if (!commit.equals("none")) {
Expand All @@ -339,10 +347,14 @@ public DocValuesFormat getDocValuesFormatForField(String field) {

// Make sure merges run @ higher prio than indexing:
final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) conf.getMergeScheduler();
cms.setMaxMergesAndThreads(4, 1);
// Can swap to your own MergeScheduler impl
// cms.setMaxMergesAndThreads(4, 1);

conf.setMergedSegmentWarmer(new MergedReaderWarmer(field));

// Set infoStream to log to file
PrintStream infoStream = new PrintStream(new FileOutputStream("lucene-infostream.log", true), true, "UTF-8");
conf.setInfoStream(infoStream);
final IndexWriter w = new IndexWriter(dir, conf);
// w.setInfoStream(System.out);

Expand All @@ -360,8 +372,9 @@ public void afterUpdate() {
}
};
IndexWriter.DocStats stats = w.getDocStats();
final AtomicReference<Double> docsPerSecRef = new AtomicReference<>(docsPerSec / numIndexThreads);
IndexThreads indexThreads = new IndexThreads(random, w, new AtomicBoolean(false), docs, numIndexThreads, -1, false, false, mode,
(float) (docsPerSec / numIndexThreads), updatesListener, -1.0, stats.maxDoc);
docsPerSecRef, updatesListener, -1.0, stats.maxDoc);

// NativePosixUtil.mlockTermsDict(startR, "id");
final SearcherManager manager = new SearcherManager(w, null);
Expand All @@ -376,12 +389,35 @@ public void afterUpdate() {
final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, "FastVectorHighlighter", null, null);
TaskParserFactory taskParserFactory =
new TaskParserFactory(indexState, field, analyzer, field, 10, random, null, null, -1, true, TestContext.parse(""));
final TaskSource tasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) {
// Periodically increase docsPerSec
Thread docsPerSecIncreaser = new Thread(() -> {
while (true) {
try {
int increaseCount = 0;
int maxIncreases = 8;
while (increaseCount < maxIncreases) {
Thread.sleep(20000); // every 20 seconds
double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2);
System.out.println("Increased docsPerSec per thread to " + newRate);
increaseCount++;
}
System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode");
Thread.sleep(300000); // 5 minutes of peace time
} catch (InterruptedException e) {
// exit thread
}
}
});
docsPerSecIncreaser.setDaemon(true);
docsPerSecIncreaser.start();
// Aggressive delete storm task source
final TaskSource baseTasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) {
@Override
public void taskDone(Task task, long queueTimeNS, TotalHits toalHitCount) {
searchesByTime[currentQT.get()].incrementAndGet();
}
};
final TaskSource tasks = baseTasks;
System.out.println("Task repeat count 1");
System.out.println("Tasks file " + tasksFile);
System.out.println("Num task per cat 20");
Expand Down
2 changes: 1 addition & 1 deletion src/main/perf/SearchPerfTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -416,7 +416,7 @@ public void warm(LeafReader reader) throws IOException {
// hardwired false:
boolean addDVFields = mode == Mode.BDV_UPDATE || mode == Mode.NDV_UPDATE;
LineFileDocs lineFileDocs = new LineFileDocs(lineDocsFile, false, storeBody, tvsBody, false, cloneDocs, null, null, null, addDVFields, null, 0, null);
IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, docsPerSecPerThread, null, -1.0, -1);
IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1);
threads.start();

mgr = new SearcherManager(writer, new SearcherFactory() {
Expand Down
7 changes: 5 additions & 2 deletions src/main/perf/TaskThreads.java
Original file line number Diff line number Diff line change
Expand Up @@ -103,14 +103,17 @@ public void run() {

try {
while (stop.get() == false) {
final Task task = tasks.nextTask();
if (task == null) {
final Task originalTask = tasks.nextTask();
if (originalTask == null) {
// Done
this.tasksStopNanos = System.nanoTime();
// first thread that finishes snapshots all threads. this way we do not include "winddown" time in our measurement.
endThreadDetails.compareAndSet(null, new SearchPerfTest.ThreadDetails());
break;
}

// Clone the task to avoid reuse issues
final Task task = originalTask.clone();

// Run the task in the IndexSearcher's executor. This is important because IndexSearcher#search also uses the current thread to
// search, so not running #search from the executor would artificially use one more thread than configured via luceneutil.
Expand Down