Skip to content
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ build/
**/.local*
.vagrant/
/logs/
**/target/

# osx stuff
.DS_Store
Expand Down
16 changes: 15 additions & 1 deletion qa/vector/build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -51,13 +51,27 @@ tasks.register("checkVec", JavaExec) {
systemProperty "es.logger.out", "console"
systemProperty "es.logger.level", "INFO" // Change to DEBUG if needed
systemProperty 'es.nativelibs.path', TestUtil.getTestLibraryPath(file("../../libs/native/libraries/build/platform/").toString())
jvmArgs '-Xms4g', '-Xmx4g', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError'
jvmArgs '-Xms16g', '-Xmx16g', '-Djava.util.concurrent.ForkJoinPool.common.parallelism=8', '-XX:+UnlockDiagnosticVMOptions', '-XX:+DebugNonSafepoints', '-XX:+HeapDumpOnOutOfMemoryError'
if (buildParams.getRuntimeJavaVersion().map { it.majorVersion.toInteger() }.get() >= 21) {
jvmArgs '--add-modules=jdk.incubator.vector', '--enable-native-access=ALL-UNNAMED'
}
if (System.getenv("DO_PROFILING") != null) {
jvmArgs '-XX:StartFlightRecording=dumponexit=true,maxsize=250M,filename=knn.jfr,settings=profile.jfc'
}
def asyncProfilerPath = System.getProperty("asyncProfiler.path", null)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am uncertain about this part about async profiler, but I trust your expertise on this.

if (asyncProfilerPath != null) {
if (OS.current().equals(OS.MAC)) {
def asyncProfilerAgent = "${asyncProfilerPath}/lib/libasyncProfiler.dylib"
println "Using async-profiler agent ${asyncProfilerAgent}"
jvmArgs "-agentpath:${asyncProfilerAgent}=start,event=cpu,interval=10ms,file=${layout.buildDirectory.asFile.get()}/tmp/elasticsearch-0_%t_%p.jfr"
} else if (OS.current().equals(OS.LINUX)) {
def asyncProfilerAgent = "${asyncProfilerPath}/lib/libasyncProfiler.so"
println "Using async-profiler agent ${asyncProfilerAgent}"
jvmArgs "-agentpath:${asyncProfilerAgent}=start,event=cpu,interval=10ms,wall=50ms,file=${layout.buildDirectory.asFile.get()}/tmp/elasticsearch-0_%t_%p.jfr"
} else {
println "Ignoring 'asyncProfiler.path': not available on ${OS.current()}";
}
}
Comment on lines +62 to +74
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am cool with this. However, why don't we add wall to MAC as well?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried it and had to back off. Looking at the error I got and at the async-profiler code, apparently only Linux has an implementation that uses perf events, which let you record both cpu time and wall time at the same time. On Mac, the engine behind is less flexible/precise, and you can have one or the other.

I'm wondering: maybe I should add an option of that, like adding a -DasyncProfiler.event=, default to cpu, but can be changed to wall so Mac users can have this choice?

if (buildParams.getIsRuntimeJavaHomeSet()) {
executable = "${buildParams.runtimeJavaHome.get()}/bin/java" + (OS.current() == OS.WINDOWS ? '.exe' : '')
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ record CmdLineArgs(
VectorEncoding vectorEncoding,
int dimensions,
boolean earlyTermination,
KnnIndexTester.MergePolicyType mergePolicy
KnnIndexTester.MergePolicyType mergePolicy,
double writerBufferSizeInMb
) implements ToXContentObject {

static final ParseField DOC_VECTORS_FIELD = new ParseField("doc_vectors");
Expand Down Expand Up @@ -82,6 +83,9 @@ record CmdLineArgs(
static final ParseField FILTER_SELECTIVITY_FIELD = new ParseField("filter_selectivity");
static final ParseField SEED_FIELD = new ParseField("seed");
static final ParseField MERGE_POLICY_FIELD = new ParseField("merge_policy");
static final ParseField WRITER_BUFFER_FIELD = new ParseField("writer_buffer_mb");

static final double DEFAULT_WRITER_BUFFER_MB = 128;

static CmdLineArgs fromXContent(XContentParser parser) throws IOException {
Builder builder = PARSER.apply(parser, null);
Expand Down Expand Up @@ -117,6 +121,7 @@ static CmdLineArgs fromXContent(XContentParser parser) throws IOException {
PARSER.declareFloat(Builder::setFilterSelectivity, FILTER_SELECTIVITY_FIELD);
PARSER.declareLong(Builder::setSeed, SEED_FIELD);
PARSER.declareString(Builder::setMergePolicy, MERGE_POLICY_FIELD);
PARSER.declareDouble(Builder::setWriterBufferMb, WRITER_BUFFER_FIELD);
}

@Override
Expand Down Expand Up @@ -186,6 +191,7 @@ static class Builder {
private float filterSelectivity = 1f;
private long seed = 1751900822751L;
private KnnIndexTester.MergePolicyType mergePolicy = null;
private double writerBufferSizeInMb = DEFAULT_WRITER_BUFFER_MB;

public Builder setDocVectors(List<String> docVectors) {
if (docVectors == null || docVectors.isEmpty()) {
Expand Down Expand Up @@ -316,6 +322,11 @@ public Builder setMergePolicy(String mergePolicy) {
return this;
}

public Builder setWriterBufferMb(double writerBufferSizeInMb) {
this.writerBufferSizeInMb = writerBufferSizeInMb;
return this;
}

public CmdLineArgs build() {
if (docVectors == null) {
throw new IllegalArgumentException("Document vectors path must be provided");
Expand Down Expand Up @@ -350,7 +361,8 @@ public CmdLineArgs build() {
vectorEncoding,
dimensions,
earlyTermination,
mergePolicy
mergePolicy,
writerBufferSizeInMb
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,8 @@ public static void main(String[] args) throws Exception {
cmdLineArgs.dimensions(),
cmdLineArgs.vectorSpace(),
cmdLineArgs.numDocs(),
mergePolicy
mergePolicy,
cmdLineArgs.writerBufferSizeInMb()
);
if (cmdLineArgs.reindex() == false && Files.exists(indexPath) == false) {
throw new IllegalArgumentException("Index path does not exist: " + indexPath);
Expand Down
29 changes: 23 additions & 6 deletions qa/vector/src/main/java/org/elasticsearch/test/knn/KnnIndexer.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import org.apache.lucene.index.ConcurrentMergeScheduler;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.IndexableField;
import org.apache.lucene.index.MergePolicy;
import org.apache.lucene.index.VectorEncoding;
import org.apache.lucene.index.VectorSimilarityFunction;
Expand Down Expand Up @@ -65,7 +66,6 @@
import static org.elasticsearch.test.knn.KnnIndexTester.logger;

class KnnIndexer {
private static final double WRITER_BUFFER_MB = 128;
static final String ID_FIELD = "id";
static final String VECTOR_FIELD = "vector";

Expand All @@ -78,6 +78,7 @@ class KnnIndexer {
private final int numDocs;
private final int numIndexThreads;
private final MergePolicy mergePolicy;
private final double writerBufferSizeInMb;

KnnIndexer(
List<Path> docsPath,
Expand All @@ -88,7 +89,8 @@ class KnnIndexer {
int dim,
VectorSimilarityFunction similarityFunction,
int numDocs,
MergePolicy mergePolicy
MergePolicy mergePolicy,
double writerBufferSizeInMb
) {
this.docsPath = docsPath;
this.indexPath = indexPath;
Expand All @@ -99,12 +101,14 @@ class KnnIndexer {
this.similarityFunction = similarityFunction;
this.numDocs = numDocs;
this.mergePolicy = mergePolicy;
this.writerBufferSizeInMb = writerBufferSizeInMb;
}

void createIndex(KnnIndexTester.Results result) throws IOException, InterruptedException, ExecutionException {
IndexWriterConfig iwc = new IndexWriterConfig().setOpenMode(IndexWriterConfig.OpenMode.CREATE);
iwc.setCodec(codec);
iwc.setRAMBufferSizeMB(WRITER_BUFFER_MB);
iwc.setMaxBufferedDocs(IndexWriterConfig.DISABLE_AUTO_FLUSH);
iwc.setRAMBufferSizeMB(writerBufferSizeInMb);
Comment on lines +110 to +111
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to be careful, we should by default benchmark with ES defaults. Optimizing our benchmarks but not our production code can give a false sense of improvement

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

setRAMBufferSizeMB is the same default as before, but now I'm not sure it is the same as ES. I'll check.
As for the number of docs... I'll check that too. I supposed these settings are exclusive, but it might be they are "first met wins" instead.
If that's the case, I'll default to what ES has and add an input option for that too.

iwc.setUseCompoundFile(false);
if (mergePolicy != null) {
iwc.setMergePolicy(mergePolicy);
Expand Down Expand Up @@ -248,6 +252,9 @@ static class IndexerThread extends Thread {
private final float[] floatVectorBuffer;
private final VectorReader in;

long readTime;
long docAddTime;

private IndexerThread(
IndexWriter iw,
VectorReader in,
Expand Down Expand Up @@ -283,6 +290,7 @@ public void run() {
} catch (IOException ioe) {
throw new UncheckedIOException(ioe);
}
logger.debug("Index thread times: [{}s] read, [{}s] add doc", readTime / 1e-9, docAddTime / 1e-9);
}

private void _run() throws IOException {
Expand All @@ -294,23 +302,32 @@ private void _run() throws IOException {
continue;
}

Document doc = new Document();
var startRead = System.nanoTime();
final IndexableField field;
switch (vectorEncoding) {
case BYTE -> {
in.next(byteVectorBuffer);
doc.add(new KnnByteVectorField(VECTOR_FIELD, byteVectorBuffer, fieldType));
field = new KnnByteVectorField(VECTOR_FIELD, byteVectorBuffer, fieldType);
}
case FLOAT32 -> {
in.next(floatVectorBuffer);
doc.add(new KnnFloatVectorField(VECTOR_FIELD, floatVectorBuffer, fieldType));
field = new KnnFloatVectorField(VECTOR_FIELD, floatVectorBuffer, fieldType);
}
default -> throw new UnsupportedOperationException();
}
long endRead = System.nanoTime();
readTime += (endRead - startRead);

Document doc = new Document();
doc.add(field);

if ((id + 1) % 25000 == 0) {
logger.debug("Done indexing " + (id + 1) + " documents.");
}
doc.add(new StoredField(ID_FIELD, id));
iw.addDocument(doc);

docAddTime += (System.nanoTime() - endRead);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@ private int numLockedResources() {
@Override
public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataType dataType) throws InterruptedException {
try {
var started = System.nanoTime();
lock.lock();

boolean allConditionsMet = false;
Expand Down Expand Up @@ -181,6 +182,8 @@ public ManagedCuVSResources acquire(int numVectors, int dims, CuVSMatrix.DataTyp
enoughResourcesCondition.await();
}
}
var elapsed = started - System.nanoTime();
logger.debug("Resource acquired in [{}ms]", elapsed / 1e-6);
res.locked = true;
return res;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -162,17 +162,21 @@ public KnnFieldVectorsWriter<?> addField(FieldInfo fieldInfo) throws IOException
@Override
// TODO: fix sorted index case
public void flush(int maxDoc, Sorter.DocMap sortMap) throws IOException {
var started = System.nanoTime();
flatVectorWriter.flush(maxDoc, sortMap);
try {
flushFieldsWithoutMemoryMappedFile(sortMap);
} catch (Throwable t) {
throw new IOException("Failed to flush GPU index: ", t);
}
var elapsed = started - System.nanoTime();
logger.debug("Flush total time [{}ms]", elapsed / 1e-6);
}

private void flushFieldsWithoutMemoryMappedFile(Sorter.DocMap sortMap) throws IOException, InterruptedException {
// No tmp file written, or the file cannot be mmapped
for (FieldWriter field : fields) {
var started = System.nanoTime();
var fieldInfo = field.fieldInfo;

var numVectors = field.flatFieldVectorsWriter.getVectors().size();
Expand Down Expand Up @@ -205,6 +209,8 @@ private void flushFieldsWithoutMemoryMappedFile(Sorter.DocMap sortMap) throws IO
cuVSResourceManager.release(cuVSResources);
}
}
var elapsed = started - System.nanoTime();
logger.debug("Flushed [{}] vectors in [{}ms]", numVectors, elapsed / 1e-6);
}
}

Expand Down Expand Up @@ -447,6 +453,7 @@ private static void deleteFilesIgnoringExceptions(Directory dir, String fileName
@Override
// fix sorted index case
public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOException {
var started = System.nanoTime();
flatVectorWriter.mergeOneField(fieldInfo, mergeState);
final int numVectors;
String tempRawVectorsFileName = null;
Expand Down Expand Up @@ -526,6 +533,8 @@ public void mergeOneField(FieldInfo fieldInfo, MergeState mergeState) throws IOE
} finally {
deleteFilesIgnoringExceptions(mergeState.segmentInfo.dir, tempRawVectorsFileName);
}
var elapsed = started - System.nanoTime();
logger.debug("Merged [{}] vectors in [{}ms]", numVectors, elapsed / 1e-6);
}

private ByteVectorValues getMergedByteVectorValues(FieldInfo fieldInfo, MergeState mergeState) throws IOException {
Expand Down