Skip to content
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions docs/changelog/132548.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 132548
summary: Have top-level kNN searches tracked in query stats
area: Vector Search
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,7 @@ private static Timer maybeStartTimer(DfsProfiler profiler, DfsTimingType dtt) {
return null;
};

private static void executeKnnVectorQuery(SearchContext context) throws IOException {
static void executeKnnVectorQuery(SearchContext context) throws IOException {
SearchSourceBuilder source = context.request().source();
if (source == null || source.knnSearch().isEmpty()) {
return;
Expand All @@ -195,12 +195,18 @@ private static void executeKnnVectorQuery(SearchContext context) throws IOExcept
}
}
List<DfsKnnResults> knnResults = new ArrayList<>(knnVectorQueryBuilders.size());
final long afterQueryTime;
final long beforeQueryTime = System.nanoTime();
var opsListener = context.indexShard().getSearchOperationListener();
opsListener.onPreQueryPhase(context);
for (int i = 0; i < knnSearch.size(); i++) {
String knnField = knnVectorQueryBuilders.get(i).getFieldName();
String knnNestedPath = searchExecutionContext.nestedLookup().getNestedParent(knnField);
Query knnQuery = searchExecutionContext.toQuery(knnVectorQueryBuilders.get(i)).query();
knnResults.add(singleKnnSearch(knnQuery, knnSearch.get(i).k(), context.getProfilers(), context.searcher(), knnNestedPath));
}
afterQueryTime = System.nanoTime();
opsListener.onQueryPhase(context, afterQueryTime - beforeQueryTime);
context.dfsResult().knnResults(knnResults);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5608,6 +5608,7 @@ public void testShardExposesWriteLoadStats() throws Exception {
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
fakeClock,
Collections.emptyList(),
// Use a listener to advance the fake clock once per indexing operation:
new IndexingOperationListener() {
@Override
Expand Down Expand Up @@ -5753,6 +5754,7 @@ public void testShardExposesWriteLoadStats_variableRates() throws IOException {
RetentionLeaseSyncer.EMPTY,
EMPTY_EVENT_LISTENER,
fakeClock,
Collections.emptyList(),
// Use a listener to advance the fake clock once per indexing operation:
new IndexingOperationListener() {
@Override
Expand Down
125 changes: 123 additions & 2 deletions server/src/test/java/org/elasticsearch/search/dfs/DfsPhaseTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,44 @@
import org.apache.lucene.search.Query;
import org.apache.lucene.store.Directory;
import org.apache.lucene.tests.index.RandomIndexWriter;
import org.apache.lucene.util.Accountable;
import org.elasticsearch.action.search.SearchShardTask;
import org.elasticsearch.cluster.metadata.IndexMetadata;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.IndexVersion;
import org.elasticsearch.index.cache.bitset.BitsetFilterCache;
import org.elasticsearch.index.mapper.MapperMetrics;
import org.elasticsearch.index.mapper.MappingLookup;
import org.elasticsearch.index.query.ParsedQuery;
import org.elasticsearch.index.query.SearchExecutionContext;
import org.elasticsearch.index.shard.IndexShard;
import org.elasticsearch.index.shard.IndexShardTestCase;
import org.elasticsearch.index.shard.SearchOperationListener;
import org.elasticsearch.index.shard.ShardId;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.internal.ContextIndexSearcher;
import org.elasticsearch.search.internal.SearchContext;
import org.elasticsearch.search.profile.Profilers;
import org.elasticsearch.search.profile.SearchProfileDfsPhaseResult;
import org.elasticsearch.search.profile.query.CollectorResult;
import org.elasticsearch.search.profile.query.QueryProfileShardResult;
import org.elasticsearch.test.ESTestCase;
import org.elasticsearch.search.vectors.KnnSearchBuilder;
import org.elasticsearch.test.TestSearchContext;
import org.elasticsearch.threadpool.TestThreadPool;
import org.elasticsearch.threadpool.ThreadPool;
import org.junit.After;
import org.junit.Before;

import java.io.IOException;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.atomic.AtomicLong;

public class DfsPhaseTests extends ESTestCase {
import static org.elasticsearch.search.dfs.DfsPhase.executeKnnVectorQuery;

public class DfsPhaseTests extends IndexShardTestCase {

ThreadPoolExecutor threadPoolExecutor;
private TestThreadPool threadPool;
Expand All @@ -49,6 +71,105 @@ public void cleanup() {
terminate(threadPool);
}

// Verifies that a top-level kNN search executed during the DFS phase is reported to the
// shard's SearchOperationListener as query-phase work: after executeKnnVectorQuery runs,
// the listener must have seen at least one onQueryPhase callback with a positive took-time.
public void testKnnSearch() throws IOException {
// Accumulators written by the listener below; asserted on at the end of the test.
AtomicLong queryCount = new AtomicLong();
AtomicLong queryTime = new AtomicLong();

// Shard wired with a listener that records every query phase it is notified about.
IndexShard indexShard = newShard(true, List.of(new SearchOperationListener() {
@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
queryCount.incrementAndGet();
queryTime.addAndGet(tookInNanos);
}
}));
try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir, newIndexWriterConfig())) {
// Index enough float-vector docs that the kNN query has real work to do.
int numDocs = randomIntBetween(900, 1000);
for (int i = 0; i < numDocs; i++) {
Document d = new Document();
d.add(new KnnFloatVectorField("float_vector", new float[] { i, 0, 0 }));
w.addDocument(d);
}
w.flush();

IndexReader reader = w.getReader();
// Searcher backed by the test thread pool; concurrency (randomBoolean) should not
// affect whether the operation listener is invoked.
ContextIndexSearcher searcher = new ContextIndexSearcher(
reader,
IndexSearcher.getDefaultSimilarity(),
IndexSearcher.getDefaultQueryCache(),
IndexSearcher.getDefaultQueryCachingPolicy(),
randomBoolean(),
threadPoolExecutor,
threadPoolExecutor.getMaximumPoolSize(),
1
);
// Minimal single-shard index settings on the current index version.
IndexSettings indexSettings = new IndexSettings(
IndexMetadata.builder("index")
.settings(Settings.builder().put(IndexMetadata.SETTING_VERSION_CREATED, IndexVersion.current()))
.numberOfShards(1)
.numberOfReplicas(0)
.creationDate(System.currentTimeMillis())
.build(),
Settings.EMPTY
);
// No-op cache listener: this test does not care about bitset cache events.
BitsetFilterCache bitsetFilterCache = new BitsetFilterCache(indexSettings, new BitsetFilterCache.Listener() {
@Override
public void onCache(ShardId shardId, Accountable accountable) {

}

@Override
public void onRemoval(ShardId shardId, Accountable accountable) {

}
});
// Bare-bones execution context; only the pieces executeKnnVectorQuery touches are
// non-null. NOTE(review): the long positional null list must match the
// SearchExecutionContext constructor exactly — verify against the current signature.
SearchExecutionContext searchExecutionContext = new SearchExecutionContext(
0,
0,
indexSettings,
bitsetFilterCache,
null,
null,
MappingLookup.EMPTY,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
null,
Collections.emptyMap(),
null,
MapperMetrics.NOOP
);

Query query = new KnnFloatVectorQuery("float_vector", new float[] { 0, 0, 0 }, numDocs, null);
// TestSearchContext with dfsResult() overridden so executeKnnVectorQuery has a
// result holder to write knnResults into.
try (TestSearchContext context = new TestSearchContext(searchExecutionContext, indexShard, searcher) {
@Override
public DfsSearchResult dfsResult() {
return new DfsSearchResult(null, null, null);
}
}) {
// Request a top-level kNN search (k = numDocs) on the indexed vector field.
context.request()
.source(
new SearchSourceBuilder().knnSearch(
List.of(new KnnSearchBuilder("float_vector", new float[] { 0, 0, 0 }, numDocs, numDocs, null, null))
)
);
context.setTask(new SearchShardTask(123L, "", "", "", null, Collections.emptyMap()));
context.parsedQuery(new ParsedQuery(query));
// Method under test: must bracket the kNN execution with
// onPreQueryPhase/onQueryPhase on the shard's listener.
executeKnnVectorQuery(context);
// The listener fired at least once, and the reported took-time is positive.
assertTrue(queryCount.get() > 0);
assertTrue(queryTime.get() > 0);
reader.close();
closeShards(indexShard);
}
}
}

public void testSingleKnnSearch() throws IOException {
try (Directory dir = newDirectory(); RandomIndexWriter w = new RandomIndexWriter(random(), dir, newIndexWriterConfig())) {
int numDocs = randomIntBetween(900, 1000);
Expand Down
Loading