Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132548.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132548
summary: Have top level knn searches tracked in query stats
area: Vector Search
type: bug
issues: []
25 changes: 19 additions & 6 deletions server/src/main/java/org/elasticsearch/search/dfs/DfsPhase.java
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private static Timer maybeStartTimer(DfsProfiler profiler, DfsTimingType dtt) {
return null;
};

private static void executeKnnVectorQuery(SearchContext context) throws IOException {
static void executeKnnVectorQuery(SearchContext context) throws IOException {
SearchSourceBuilder source = context.request().source();
if (source == null || source.knnSearch().isEmpty()) {
return;
Expand All @@ -195,11 +195,24 @@ private static void executeKnnVectorQuery(SearchContext context) throws IOExcept
}
}
List<DfsKnnResults> knnResults = new ArrayList<>(knnVectorQueryBuilders.size());
for (int i = 0; i < knnSearch.size(); i++) {
String knnField = knnVectorQueryBuilders.get(i).getFieldName();
String knnNestedPath = searchExecutionContext.nestedLookup().getNestedParent(knnField);
Query knnQuery = searchExecutionContext.toQuery(knnVectorQueryBuilders.get(i)).query();
knnResults.add(singleKnnSearch(knnQuery, knnSearch.get(i).k(), context.getProfilers(), context.searcher(), knnNestedPath));
final long afterQueryTime;
final long beforeQueryTime = System.nanoTime();
var opsListener = context.indexShard().getSearchOperationListener();
opsListener.onPreQueryPhase(context);
try {
for (int i = 0; i < knnSearch.size(); i++) {
String knnField = knnVectorQueryBuilders.get(i).getFieldName();
String knnNestedPath = searchExecutionContext.nestedLookup().getNestedParent(knnField);
Query knnQuery = searchExecutionContext.toQuery(knnVectorQueryBuilders.get(i)).query();
knnResults.add(singleKnnSearch(knnQuery, knnSearch.get(i).k(), context.getProfilers(), context.searcher(), knnNestedPath));
}
afterQueryTime = System.nanoTime();
opsListener.onQueryPhase(context, afterQueryTime - beforeQueryTime);
opsListener = null;
} finally {
if (opsListener != null) {
opsListener.onFailedQueryPhase(context);
}
}
context.dfsResult().knnResults(knnResults);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5627,6 +5627,7 @@ public void testShardExposesWriteLoadStats() throws Exception {
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
fakeClock,
Collections.emptyList(),
// Use a listener to advance the fake clock once per indexing operation:
new IndexingOperationListener() {
@Override
Expand Down Expand Up @@ -5772,6 +5773,7 @@ public void testShardExposesWriteLoadStats_variableRates() throws IOException {
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
fakeClock,
Collections.emptyList(),
// Use a listener to advance the fake clock once per indexing operation:
new IndexingOperationListener() {
@Override
Expand Down
125 changes: 123 additions & 2 deletions server/src/test/java/org/elasticsearch/search/dfs/DfsPhaseTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,44 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.profile.SearchProfileDfsPhaseResult;
import org.elasticsearch.search.profile.query.CollectorResult;
import org.elasticsearch.search.profile.query.QueryProfileShardResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.search.vectors.KnnSearchBuilder;
import org.elasticsearch.test.TestSearchContext;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

public class DfsPhaseTests extends ESTestCase {
import static org.elasticsearch.search.dfs.DfsPhase.executeKnnVectorQuery;

public class DfsPhaseTests extends IndexShardTestCase {

ThreadPoolExecutor threadPoolExecutor;
private TestThreadPool threadPool;
Expand All @@ -49,6 +71,105 @@ public void cleanup() {
terminate(threadPool);
}

public void testKnnSearch() throws IOException {
AtomicLong queryCount = new AtomicLong();
AtomicLong queryTime = new AtomicLong();

IndexShard indexShard = newShard(true, List.of(new SearchOperationListener() {
@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
queryCount.incrementAndGet();
queryTime.addAndGet(tookInNanos);
}
}));
try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir, newIndexWriterConfig())) {
int numDocs = randomIntBetween(900, 1000);
for (int i = 0; i < numDocs; i++) {
Document d = new Document();
d.add(new KnnFloatVectorField("float_vector", new float[] { i, 0, 0 }));
w.addDocument(d);
}
w.flush();

IndexReader reader = w.getReader();
ContextIndexSearcher searcher = new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
randomBoolean(),
threadPoolExecutor,
threadPoolExecutor.getMaximumPoolSize(),
1
);
IndexSettings indexSettings = new IndexSettings(
IndexMetadata.builder("index")
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()))
.numberOfShards(1)
.numberOfReplicas(0)
.creationDate(System.currentTimeMillis())
.build(),
Settings.EMPTY
);
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetFilterCache.Listener() {
@Override
public void onCache(ShardId shardId, Accountable accountable) {

}

@Override
public void onRemoval(ShardId shardId, Accountable accountable) {

}
});
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(
0,
0,
indexSettings,
bitsetFilterCache,
null,
null,
MappingLookup.EMPTY,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
Collections.emptyMap(),
null,
MapperMetrics.NOOP
);

Query query = new KnnFloatVectorQuery("float_vector", new float[] { 0, 0, 0 }, numDocs, null);
try (TestSearchContext context = new TestSearchContext(searchExecutionContext, indexShard, searcher) {
@Override
public DfsSearchResult dfsResult() {
return new DfsSearchResult(null, null, null);
}
}) {
context.request()
.source(
new SearchSourceBuilder().knnSearch(
List.of(new KnnSearchBuilder("float_vector", new float[] { 0, 0, 0 }, numDocs, numDocs, null, null))
)
);
context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()));
context.parsedQuery(new ParsedQuery(query));
executeKnnVectorQuery(context);
assertTrue(queryCount.get() > 0);
assertTrue(queryTime.get() > 0);
reader.close();
closeShards(indexShard);
}
}
}

public void testSingleKnnSearch() throws IOException {
try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir, newIndexWriterConfig())) {
int numDocs = randomIntBetween(900, 1000);
Expand Down
Loading