13 changes: 7 additions & 6 deletions src/main/perf/IndexThreads.java
@@ -50,7 +50,7 @@ public enum Mode { UPDATE, ADD, NDV_UPDATE, BDV_UPDATE }
final AtomicLong lastRefreshNS;

public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed, LineFileDocs lineFileDocs, int numThreads, int docCountLimit,
boolean addGroupingFields, boolean printDPS, Mode mode, float docsPerSecPerThread, UpdatesListener updatesListener,
boolean addGroupingFields, boolean printDPS, Mode mode, java.util.concurrent.atomic.AtomicReference<Double> docsPerSecPerThreadRef, UpdatesListener updatesListener,
double nrtEverySec, int randomDocIDMax)
throws IOException, InterruptedException {
final AtomicInteger groupBlockIndex;
@@ -77,7 +77,7 @@ public IndexThreads(Random random, IndexWriter w, AtomicBoolean indexingFailed,

for(int thread=0;thread<numThreads;thread++) {
threads[thread] = new IndexThread(random, startLatch, stopLatch, w, docs, docCountLimit, count, mode,
groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThread, failed, updatesListener,
groupBlockIndex, stop, refreshing, lastRefreshNS, docsPerSecPerThreadRef, failed, updatesListener,
nrtEverySec, randomDocIDMax);
threads[thread].setName("Index #" + thread);
threads[thread].start();
@@ -141,7 +141,7 @@ private static class IndexThread extends Thread {
private final Mode mode;
private final CountDownLatch startLatch;
private final CountDownLatch stopLatch;
private final float docsPerSec;
private final java.util.concurrent.atomic.AtomicReference<Double> docsPerSecRef;
private final Random random;
private final AtomicBoolean failed;
private final UpdatesListener updatesListener;
@@ -151,7 +151,7 @@ private static class IndexThread extends Thread {
final int randomDocIDMax;
public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stopLatch, IndexWriter w,
LineFileDocs docs, int numTotalDocs, AtomicInteger count, Mode mode, AtomicInteger groupBlockIndex,
AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, float docsPerSec,
AtomicBoolean stop, AtomicBoolean refreshing, AtomicLong lastRefreshNS, java.util.concurrent.atomic.AtomicReference<Double> docsPerSecRef,
AtomicBoolean failed, UpdatesListener updatesListener, double nrtEverySec, int randomDocIDMax) {
this.startLatch = startLatch;
this.stopLatch = stopLatch;
@@ -162,7 +162,7 @@ public IndexThread(Random random, CountDownLatch startLatch, CountDownLatch stop
this.mode = mode;
this.groupBlockIndex = groupBlockIndex;
this.stop = stop;
this.docsPerSec = docsPerSec;
this.docsPerSecRef = docsPerSecRef;
this.random = random;
this.failed = failed;
this.updatesListener = updatesListener;
@@ -329,7 +329,7 @@ public void remove() {

docState.doc.removeField("groupend");
}
} else if (docsPerSec > 0 && mode != null) {
} else if (docsPerSecRef.get() > 0 && mode != null) {
System.out.println("Indexing single docs (add/updateDocument)");
final long startNS = System.nanoTime();
int threadCount = 0;
@@ -382,6 +382,7 @@ public void remove() {
System.out.println("Indexer: " + docCount + " docs... (" + (System.currentTimeMillis() - tStart) + " msec)");
}

double docsPerSec = docsPerSecRef.get();
final long sleepNS = startNS + (long) (1000000000*(threadCount/docsPerSec)) - System.nanoTime();
if (sleepNS > 0) {
final long sleepMS = sleepNS/1000000;
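The net effect of the IndexThreads.java changes is that the per-thread rate is no longer a float captured at construction time: each indexing loop re-reads the current value from an AtomicReference<Double>, so the throttle can be changed while the threads are running. A minimal standalone sketch of that pattern (class and variable names here are illustrative, not the actual luceneutil code):

import java.util.concurrent.atomic.AtomicReference;

public class LiveRateSketch {
  public static void main(String[] args) throws InterruptedException {
    // Shared, mutable target rate (docs/sec); the worker re-reads it on every iteration.
    AtomicReference<Double> rateRef = new AtomicReference<>(100.0);

    Thread worker = new Thread(() -> {
      final long startNS = System.nanoTime();
      int docCount = 0;
      while (!Thread.currentThread().isInterrupted()) {
        docCount++;  // stand-in for indexing one document
        double docsPerSec = rateRef.get();  // latest rate, not a constructor-time copy
        long sleepNS = startNS + (long) (1_000_000_000L * (docCount / docsPerSec)) - System.nanoTime();
        if (sleepNS > 0) {
          try {
            Thread.sleep(sleepNS / 1_000_000, (int) (sleepNS % 1_000_000));
          } catch (InterruptedException e) {
            return;
          }
        }
      }
    });
    worker.start();

    Thread.sleep(2000);
    // Any other thread can raise the rate; the worker picks it up on its next loop.
    rateRef.updateAndGet(rate -> rate * 2);
    Thread.sleep(2000);
    worker.interrupt();
    worker.join();
  }
}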
2 changes: 1 addition & 1 deletion src/main/perf/Indexer.java
@@ -573,7 +573,7 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
float docsPerSecPerThread = -1f;
//float docsPerSecPerThread = 100f;

IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, docsPerSecPerThread, null, nrtEverySec,
IndexThreads threads = new IndexThreads(random, w, indexingFailed, lineFileDocs, numThreads, docCountLimit, addGroupingFields, printDPS, mode, new java.util.concurrent.atomic.AtomicReference<>((double)docsPerSecPerThread), null, nrtEverySec,
randomDocIDMax);

System.out.println("\nIndexer: start");
42 changes: 38 additions & 4 deletions src/main/perf/NRTPerfTest.java
@@ -21,9 +21,12 @@
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.io.PrintStream;
import java.io.FileOutputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Locale;
import java.util.Random;
@@ -331,6 +334,12 @@ public DocValuesFormat getDocValuesFormatForField(String field) {
tmp.setMaxMergedSegmentMB(1000000.0);
//tmp.setReclaimDeletesWeight(3.0);
//tmp.setMaxMergedSegmentMB(7000.0);

// AGGRESSIVE UPDATE STORM SETTINGS:
// Allow more deletes before forcing merges
// nocommit
tmp.setDeletesPctAllowed(2.0);

conf.setMergePolicy(tmp);

if (!commit.equals("none")) {
@@ -339,10 +348,14 @@ public DocValuesFormat getDocValuesFormatForField(String field) {

// Make sure merges run @ higher prio than indexing:
final ConcurrentMergeScheduler cms = (ConcurrentMergeScheduler) conf.getMergeScheduler();
cms.setMaxMergesAndThreads(4, 1);
// Can swap to your own MergeScheduler impl
// cms.setMaxMergesAndThreads(4, 1);

conf.setMergedSegmentWarmer(new MergedReaderWarmer(field));

// Set infoStream to log to file
PrintStream infoStream = new PrintStream(new FileOutputStream("lucene-infostream.log", true), true, "UTF-8");
conf.setInfoStream(infoStream);
final IndexWriter w = new IndexWriter(dir, conf);
// w.setInfoStream(System.out);

@@ -360,8 +373,9 @@ public void afterUpdate() {
}
};
IndexWriter.DocStats stats = w.getDocStats();
final AtomicReference<Double> docsPerSecRef = new AtomicReference<>(docsPerSec / numIndexThreads);
IndexThreads indexThreads = new IndexThreads(random, w, new AtomicBoolean(false), docs, numIndexThreads, -1, false, false, mode,
(float) (docsPerSec / numIndexThreads), updatesListener, -1.0, stats.maxDoc);
docsPerSecRef, updatesListener, -1.0, stats.maxDoc);

// NativePosixUtil.mlockTermsDict(startR, "id");
final SearcherManager manager = new SearcherManager(w, null);
@@ -373,15 +387,35 @@ }
}

final DirectSpellChecker spellChecker = new DirectSpellChecker();
final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, "FastVectorHighlighter", null, null);
final IndexState indexState = new IndexState(null, manager, null, field, spellChecker, "FastVectorHighlighter", null, new HashMap<>());
TaskParserFactory taskParserFactory =
new TaskParserFactory(indexState, field, analyzer, field, 10, random, null, null, -1, true, TestContext.parse(""));
final TaskSource tasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) {
// Periodically increase docsPerSec
Thread docsPerSecIncreaser = new Thread(() -> {
try {
int increaseCount = 0;
final int maxIncreases = 8;
while (increaseCount < maxIncreases) {
Thread.sleep(20000); // every 20 seconds
double newRate = docsPerSecRef.updateAndGet(rate -> rate * 2);
System.out.println("Increased docsPerSec per thread to " + newRate);
increaseCount++;
}
System.out.println("Reached max increases (" + maxIncreases + "), now peace time mode");
} catch (InterruptedException e) {
// exit thread
}
});
docsPerSecIncreaser.setDaemon(true);
docsPerSecIncreaser.start();
// Aggressive delete storm task source
final TaskSource baseTasks = new RandomTaskSource(tasksFile, random, taskParserFactory.getTaskParser()) {
@Override
public void taskDone(Task task, long queueTimeNS, TotalHits toalHitCount) {
searchesByTime[currentQT.get()].incrementAndGet();
}
};
final TaskSource tasks = baseTasks;
System.out.println("Task repeat count 1");
System.out.println("Tasks file " + tasksFile);
System.out.println("Num task per cat 20");
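The new docsPerSecIncreaser thread in NRTPerfTest.java doubles the shared per-thread rate every 20 seconds, up to 8 times, so the indexing load ramps exponentially before settling into "peace time mode". A quick illustration of the schedule that produces, assuming a starting rate of 100 docs/sec/thread purely for the sake of example (the real starting value is docsPerSec / numIndexThreads from the command line):

public class RampSchedule {
  public static void main(String[] args) {
    double rate = 100.0;  // assumed starting rate, for illustration only
    for (int i = 1; i <= 8; i++) {
      rate *= 2;  // mirrors docsPerSecRef.updateAndGet(r -> r * 2) every 20 seconds
      System.out.println("t=" + (i * 20) + "s  rate=" + rate + " docs/sec/thread");
    }
    // After the 8th doubling (~160 s in), the rate is 256x the start and then holds steady.
  }
}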
2 changes: 1 addition & 1 deletion src/main/perf/SearchPerfTest.java
@@ -416,7 +416,7 @@ public void warm(LeafReader reader) throws IOException {
// hardwired false:
boolean addDVFields = mode == Mode.BDV_UPDATE || mode == Mode.NDV_UPDATE;
LineFileDocs lineFileDocs = new LineFileDocs(lineDocsFile, false, storeBody, tvsBody, false, cloneDocs, null, null, null, addDVFields, null, 0, null);
IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, docsPerSecPerThread, null, -1.0, -1);
IndexThreads threads = new IndexThreads(new Random(17), writer, new AtomicBoolean(false), lineFileDocs, indexThreadCount, -1, false, false, mode, new java.util.concurrent.atomic.AtomicReference<>((double)docsPerSecPerThread), null, -1.0, -1);
threads.start();

mgr = new SearcherManager(writer, new SearcherFactory() {
13 changes: 9 additions & 4 deletions src/main/perf/TaskThreads.java
@@ -115,21 +115,26 @@ public void run() {
// Run the task in the IndexSearcher's executor. This is important because IndexSearcher#search also uses the current thread to
// search, so not running #search from the executor would artificially use one more thread than configured via luceneutil.
// We're counting time within the task to not include forking time for the top-level search in the reported time.
final Task clonedTask = task.clone();
executor.submit(() -> {
task.startTimeNanos = System.nanoTime();
clonedTask.startTimeNanos = System.nanoTime();
try {
task.go(indexState, taskParser);
clonedTask.go(indexState, taskParser);
} catch (IOException ioe) {
throw new RuntimeException(ioe);
}
try {
tasks.taskDone(task, task.startTimeNanos-task.recvTimeNS, task.totalHitCount);
tasks.taskDone(clonedTask, clonedTask.startTimeNanos-clonedTask.recvTimeNS, clonedTask.totalHitCount);
} catch (Exception e) {
System.out.println(Thread.currentThread().getName() + ": ignoring exc:");
e.printStackTrace();
}
task.runTimeNanos = System.nanoTime()-task.startTimeNanos;
clonedTask.runTimeNanos = System.nanoTime()-clonedTask.startTimeNanos;
}).get();

// Copy results back to original task for reporting
task.totalHitCount = clonedTask.totalHitCount;
task.runTimeNanos = clonedTask.runTimeNanos;

task.threadID = threadID;
}
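TaskThreads.java now clones each task before handing it to the executor and copies the measured results back afterwards, so the instance owned by the task source is not mutated from inside the executor while the submitting thread is still accounting for it. A minimal sketch of the clone-run-copy-back pattern, using a hypothetical Task stand-in rather than luceneutil's real Task class:

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class CloneRunCopyBackSketch {
  // Hypothetical stand-in for the benchmark task: mutable timing/result fields.
  static class Task implements Cloneable {
    long startTimeNanos, runTimeNanos, totalHitCount;
    void go() { totalHitCount = 42; }  // stand-in for running the search
    @Override public Task clone() {
      try {
        return (Task) super.clone();
      } catch (CloneNotSupportedException e) {
        throw new AssertionError(e);
      }
    }
  }

  public static void main(String[] args) throws Exception {
    ExecutorService executor = Executors.newFixedThreadPool(2);
    Task task = new Task();

    // Run against a private copy inside the executor...
    Task cloned = task.clone();
    executor.submit(() -> {
      cloned.startTimeNanos = System.nanoTime();
      cloned.go();
      cloned.runTimeNanos = System.nanoTime() - cloned.startTimeNanos;
    }).get();  // .get() blocks so the submitting thread's timing stays comparable

    // ...then copy just the results back to the shared instance.
    task.totalHitCount = cloned.totalHitCount;
    task.runTimeNanos = cloned.runTimeNanos;

    System.out.println("hits=" + task.totalHitCount + " runTimeNanos=" + task.runTimeNanos);
    executor.shutdown();
  }
}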