5 changes: 5 additions & 0 deletions docs/changelog/130834.yaml
@@ -0,0 +1,5 @@
pr: 130834
summary: Ensure vectors are always included in reindex actions
area: Vector Search
type: enhancement
issues: []
@@ -44,6 +44,7 @@
import org.elasticsearch.script.Script;
import org.elasticsearch.script.ScriptService;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.threadpool.ThreadPool;

@@ -119,6 +120,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
BulkByScrollTask task,
boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm,
boolean needsVectors,
Logger logger,
ParentTaskAssigningClient client,
ThreadPool threadPool,
@@ -131,6 +133,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
task,
needsSourceDocumentVersions,
needsSourceDocumentSeqNoAndPrimaryTerm,
needsVectors,
logger,
client,
client,
@@ -146,6 +149,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
BulkByScrollTask task,
boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm,
boolean needsVectors,
Logger logger,
ParentTaskAssigningClient searchClient,
ParentTaskAssigningClient bulkClient,
@@ -173,7 +177,7 @@ public abstract class AbstractAsyncBulkByScrollAction<
bulkRetry = new Retry(BackoffPolicy.wrap(backoffPolicy, worker::countBulkRetry), threadPool);
scrollSource = buildScrollableResultSource(
backoffPolicy,
prepareSearchRequest(mainRequest, needsSourceDocumentVersions, needsSourceDocumentSeqNoAndPrimaryTerm)
prepareSearchRequest(mainRequest, needsSourceDocumentVersions, needsSourceDocumentSeqNoAndPrimaryTerm, needsVectors)
);
scriptApplier = Objects.requireNonNull(buildScriptApplier(), "script applier must not be null");
}
@@ -186,7 +190,8 @@ public abstract class AbstractAsyncBulkByScrollAction<
static <Request extends AbstractBulkByScrollRequest<Request>> SearchRequest prepareSearchRequest(
Request mainRequest,
boolean needsSourceDocumentVersions,
boolean needsSourceDocumentSeqNoAndPrimaryTerm
boolean needsSourceDocumentSeqNoAndPrimaryTerm,
boolean needsVectors
) {
var preparedSearchRequest = new SearchRequest(mainRequest.getSearchRequest());

@@ -205,6 +210,16 @@ static <Request extends AbstractBulkByScrollRequest<Request>> SearchRequest prep
sourceBuilder.version(needsSourceDocumentVersions);
sourceBuilder.seqNoAndPrimaryTerm(needsSourceDocumentSeqNoAndPrimaryTerm);

if (needsVectors) {
// always include vectors in the response unless explicitly set
var fetchSource = sourceBuilder.fetchSource();
if (fetchSource == null) {
sourceBuilder.fetchSource(FetchSourceContext.FETCH_ALL_SOURCE);
} else if (fetchSource.excludeVectors() == null) {
sourceBuilder.excludeVectors(false);
}
}

/*
* Do not open scroll if max docs <= scroll size and not resuming on version conflicts
*/
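A minimal standalone sketch (not part of the diff) of the vector-inclusion rule that prepareSearchRequest applies when needsVectors is true, restating only the SearchSourceBuilder and FetchSourceContext calls visible in the hunk above; the class and method names here are illustrative:

import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;

final class VectorInclusionSketch {
    // Mirrors the needsVectors branch above: opt into vectors only when the
    // request has not already stated a preference.
    static void includeVectorsUnlessSpecified(SearchSourceBuilder sourceBuilder) {
        FetchSourceContext fetchSource = sourceBuilder.fetchSource();
        if (fetchSource == null) {
            // No _source filtering was requested: fetch the full source, which also
            // carries vectors for indices that store them synthetically.
            sourceBuilder.fetchSource(FetchSourceContext.FETCH_ALL_SOURCE);
        } else if (fetchSource.excludeVectors() == null) {
            // _source filtering exists but expresses no vector preference: include
            // vectors so reindex/update-by-query can write them to the destination.
            sourceBuilder.excludeVectors(false);
        }
        // An explicit exclude_vectors choice made by the caller is left untouched.
    }
}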
@@ -34,7 +34,7 @@ public AsyncDeleteByQueryAction(
ScriptService scriptService,
ActionListener<BulkByScrollResponse> listener
) {
super(task, false, true, logger, client, threadPool, request, listener, scriptService, null);
super(task, false, true, false, logger, client, threadPool, request, listener, scriptService, null);
}

@Override
@@ -237,6 +237,7 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Re
*/
request.getDestination().versionType() != VersionType.INTERNAL,
false,
true,
logger,
searchClient,
bulkClient,
@@ -124,6 +124,7 @@ static class AsyncIndexBySearchAction extends AbstractAsyncBulkByScrollAction<Up
// use sequence number powered optimistic concurrency control unless requested
request.getSearchRequest().source() != null && Boolean.TRUE.equals(request.getSearchRequest().source().version()),
true,
true,
logger,
client,
threadPool,
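Taken together, the three call sites wire the new needsVectors flag as follows: AsyncDeleteByQueryAction passes false (delete-by-query only deletes and never rewrites document source), while the reindex and update-by-query actions above pass true so the fetched source, and with it the vectors, can be written back out.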
@@ -20,6 +20,7 @@
import org.elasticsearch.common.xcontent.LoggingDeprecationHandler;
import org.elasticsearch.common.xcontent.XContentHelper;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.search.fetch.subphase.FetchSourceContext;
import org.elasticsearch.search.sort.FieldSortBuilder;
import org.elasticsearch.search.sort.SortBuilder;
import org.elasticsearch.xcontent.NamedXContentRegistry;
@@ -140,13 +141,33 @@ static Request initialSearch(SearchRequest searchRequest, BytesReference query,
}
}

if (searchRequest.source().fetchSource() != null) {
entity.field("_source", searchRequest.source().fetchSource());
} else {
var fetchSource = searchRequest.source().fetchSource();
if (fetchSource == null) {
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
// Versions before 1.0 don't support `"_source": true` so we have to ask for the source as a stored field.
entity.field("_source", true);
}
} else {
if (remoteVersion.onOrAfter(Version.V_9_1_0) || fetchSource.excludeVectors() == null) {
entity.field("_source", fetchSource);
} else {
// Versions before 9.1.0 don't support "exclude_vectors" so we need to manually convert.
if (fetchSource.includes().length == 0 && fetchSource.excludes().length == 0) {
if (remoteVersion.onOrAfter(Version.fromId(1000099))) {
// Versions before 1.0 don't support `"_source": true` so we have to ask for the source as a stored field.
entity.field("_source", true);
}
} else {
entity.startObject("_source");
if (fetchSource.includes().length > 0) {
entity.field(FetchSourceContext.INCLUDES_FIELD.getPreferredName(), fetchSource.includes());
}
if (fetchSource.excludes().length > 0) {
entity.field(FetchSourceContext.EXCLUDES_FIELD.getPreferredName(), fetchSource.excludes());
}
entity.endObject();
}
}
}

entity.endObject();
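For illustration, a hedged sketch (not part of the diff) of the two request-body shapes the fallback above produces; the include/exclude values are made up, only the key names (includes, excludes, exclude_vectors) come from the code:

final class RemoteSourceShapes {
    // Sent when the remote is 9.1.0+ or no explicit vector preference was set:
    // the FetchSourceContext is serialized as-is and may carry exclude_vectors.
    static final String WITH_EXCLUDE_VECTORS = """
        { "_source": { "includes": ["title"], "excludes": ["meta.*"], "exclude_vectors": false } }
        """;
    // Sent to a pre-9.1.0 remote that did set a vector preference: exclude_vectors
    // is dropped; if no includes/excludes were given either, the request degrades
    // further to "_source": true (or to stored-field fetching for pre-1.0 remotes).
    static final String WITHOUT_EXCLUDE_VECTORS = """
        { "_source": { "includes": ["title"], "excludes": ["meta.*"] } }
        """;
}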
@@ -876,15 +876,15 @@ public void done(TimeValue extraKeepAlive) {}
}

public void testEnableScrollByDefault() {
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);
assertThat(preparedSearchRequest.scroll(), notNullValue());
}

public void testEnableScrollWhenMaxDocsIsGreaterThenScrollSize() {
testRequest.setMaxDocs(between(101, 1000));
testRequest.getSearchRequest().source().size(100);

var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);

assertThat(preparedSearchRequest.scroll(), notNullValue());
}
@@ -893,7 +893,7 @@ public void testDisableScrollWhenMaxDocsIsLessThenScrollSize() {
testRequest.setMaxDocs(between(1, 100));
testRequest.getSearchRequest().source().size(100);

var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);

assertThat(preparedSearchRequest.scroll(), nullValue());
}
@@ -903,7 +903,7 @@ public void testEnableScrollWhenProceedOnVersionConflict() {
testRequest.getSearchRequest().source().size(100);
testRequest.setAbortOnVersionConflict(false);

var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false);
var preparedSearchRequest = AbstractAsyncBulkByScrollAction.prepareSearchRequest(testRequest, false, false, false);

assertThat(preparedSearchRequest.scroll(), notNullValue());
}
@@ -943,6 +943,7 @@ private class DummyAsyncBulkByScrollAction extends AbstractAsyncBulkByScrollActi
testTask,
randomBoolean(),
randomBoolean(),
randomBoolean(),
AsyncBulkByScrollActionTests.this.logger,
new ParentTaskAssigningClient(client, localNode, testTask),
client.threadPool(),
@@ -10,6 +10,8 @@
package org.elasticsearch.reindex;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.ReindexRequestBuilder;
@@ -22,7 +24,10 @@
import java.util.stream.Collectors;

import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
@@ -175,4 +180,60 @@ public void testReindexFromComplexDateMathIndexName() throws Exception {
assertHitCount(prepareSearch(destIndexName).setSize(0), 4);
}

public void testReindexIncludeVectors() throws Exception {
var resp1 = prepareCreate("test").setSettings(
Settings.builder().put(IndexSettings.INDEX_MAPPING_SOURCE_SYNTHETIC_VECTORS_SETTING.getKey(), true).build()
).setMapping("foo", "type=dense_vector,similarity=l2_norm", "bar", "type=sparse_vector").get();
assertAcked(resp1);

var resp2 = prepareCreate("test_reindex").setSettings(
Settings.builder().put(IndexSettings.INDEX_MAPPING_SOURCE_SYNTHETIC_VECTORS_SETTING.getKey(), true).build()
).setMapping("foo", "type=dense_vector,similarity=l2_norm", "bar", "type=sparse_vector").get();
assertAcked(resp2);

indexRandom(
true,
prepareIndex("test").setId("1").setSource("foo", List.of(3f, 2f, 1.5f), "bar", Map.of("token_1", 4f, "token_2", 7f))
);

var searchResponse = prepareSearch("test").get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.size(), equalTo(0));
} finally {
searchResponse.decRef();
}

// Copy all the docs
ReindexRequestBuilder copy = reindex().source("test").destination("test_reindex").refresh(true);
var reindexResponse = copy.get();
assertThat(reindexResponse, matcher().created(1));

searchResponse = prepareSearch("test_reindex").get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.size(), equalTo(0));
} finally {
searchResponse.decRef();
}

searchResponse = prepareSearch("test_reindex").setExcludeVectors(false).get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.get("foo"), anyOf(equalTo(List.of(3f, 2f, 1.5f)), equalTo(List.of(3d, 2d, 1.5d))));
assertThat(
sourceMap.get("bar"),
anyOf(equalTo(Map.of("token_1", 4f, "token_2", 7f)), equalTo(Map.of("token_1", 4d, "token_2", 7d)))
);
} finally {
searchResponse.decRef();
}
}

}
@@ -10,6 +10,8 @@
package org.elasticsearch.reindex;

import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.index.IndexSettings;
import org.elasticsearch.index.reindex.AbstractBulkByScrollRequest;
import org.elasticsearch.index.reindex.BulkByScrollResponse;
import org.elasticsearch.index.reindex.UpdateByQueryRequestBuilder;
@@ -23,7 +25,10 @@
import java.util.stream.Collectors;

import static org.elasticsearch.index.query.QueryBuilders.termQuery;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertAcked;
import static org.elasticsearch.test.hamcrest.ElasticsearchAssertions.assertHitCount;
import static org.hamcrest.Matchers.anyOf;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.hasSize;

public class UpdateByQueryBasicTests extends ReindexTestCase {
@@ -150,4 +155,54 @@ public void testMissingSources() {
.get();
assertThat(response, matcher().updated(0).slices(hasSize(0)));
}

public void testUpdateByQueryIncludeVectors() throws Exception {
var resp1 = prepareCreate("test").setSettings(
Settings.builder().put(IndexSettings.INDEX_MAPPING_SOURCE_SYNTHETIC_VECTORS_SETTING.getKey(), true).build()
).setMapping("foo", "type=dense_vector,similarity=l2_norm", "bar", "type=sparse_vector").get();
assertAcked(resp1);

indexRandom(
true,
prepareIndex("test").setId("1").setSource("foo", List.of(3.0f, 2.0f, 1.5f), "bar", Map.of("token_1", 4f, "token_2", 7f))
);

var searchResponse = prepareSearch("test").get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.size(), equalTo(0));
} finally {
searchResponse.decRef();
}

// Copy all the docs
var updateByQueryResponse = updateByQuery().source("test").refresh(true).get();
assertThat(updateByQueryResponse, matcher().updated(1L));

searchResponse = prepareSearch("test").get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.size(), equalTo(0));
} finally {
searchResponse.decRef();
}

searchResponse = prepareSearch("test").setExcludeVectors(false).get();
try {
assertThat(searchResponse.getHits().getTotalHits().value(), equalTo(1L));
assertThat(searchResponse.getHits().getHits().length, equalTo(1));
var sourceMap = searchResponse.getHits().getAt(0).getSourceAsMap();
assertThat(sourceMap.get("foo"), anyOf(equalTo(List.of(3f, 2f, 1.5f)), equalTo(List.of(3d, 2d, 1.5d))));
assertThat(
sourceMap.get("bar"),
anyOf(equalTo(Map.of("token_1", 4f, "token_2", 7f)), equalTo(Map.of("token_1", 4d, "token_2", 7d)))
);
} finally {
searchResponse.decRef();
}
}
}