Skip to content

Commit f688096

Browse files
committed
reindex support build source from fields result
1 parent 6edc76d commit f688096

File tree

4 files changed

+319
-2
lines changed

4 files changed

+319
-2
lines changed

modules/reindex/src/test/java/org/elasticsearch/reindex/ClientScrollableHitSourceTests.java

Lines changed: 128 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,11 @@
2323
import org.elasticsearch.client.internal.support.AbstractClient;
2424
import org.elasticsearch.common.BackoffPolicy;
2525
import org.elasticsearch.common.bytes.BytesArray;
26+
import org.elasticsearch.common.bytes.BytesReference;
27+
import org.elasticsearch.common.document.DocumentField;
2628
import org.elasticsearch.common.settings.Settings;
2729
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
30+
import org.elasticsearch.common.xcontent.XContentHelper;
2831
import org.elasticsearch.core.TimeValue;
2932
import org.elasticsearch.index.reindex.ClientScrollableHitSource;
3033
import org.elasticsearch.index.reindex.ScrollableHitSource;
@@ -38,7 +41,9 @@
3841
import org.junit.After;
3942
import org.junit.Before;
4043

44+
import java.util.HashMap;
4145
import java.util.List;
46+
import java.util.Map;
4247
import java.util.concurrent.ArrayBlockingQueue;
4348
import java.util.concurrent.BlockingQueue;
4449
import java.util.concurrent.TimeUnit;
@@ -159,6 +164,129 @@ public void testScrollKeepAlive() {
159164
client.validateRequest(TransportSearchScrollAction.TYPE, (SearchScrollRequest r) -> assertEquals(r.scroll().seconds(), 110));
160165
}
161166

167+
public void testGenerateSourceWithFields() {
168+
// Test case: source exists and fields should be merged
169+
SearchHit hit = SearchHit.unpooled(0, "id");
170+
hit.sourceRef(new BytesArray("{\"existing_field\":\"existing_value\"}"));
171+
172+
Map<String, DocumentField> fields = new HashMap<>();
173+
fields.put("single_value_field", new DocumentField("single_value_field", List.of("single_value")));
174+
fields.put("multi_value_field", new DocumentField("multi_value_field", List.of("value1", "value2")));
175+
fields.put("existing_field", new DocumentField("existing_field", List.of("new_value")));
176+
hit.addDocumentFields(fields, new HashMap<>());
177+
178+
BytesReference result = ClientScrollableHitSource.generateSource(hit);
179+
Map<String, Object> resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2();
180+
181+
// Verify source contains all expected fields
182+
assertEquals("existing_value", resultSource.get("existing_field")); // existing field should not be overwritten
183+
assertEquals("single_value", resultSource.get("single_value_field"));
184+
assertEquals(List.of("value1", "value2"), resultSource.get("multi_value_field"));
185+
}
186+
187+
public void testGenerateSourceWithEmptyFields() {
188+
// Test case: fields is empty
189+
SearchHit hit = SearchHit.unpooled(0, "id");
190+
hit.sourceRef(new BytesArray("{\"field1\":\"value1\",\"field2\":\"value2\"}"));
191+
hit.addDocumentFields(new HashMap<>(), new HashMap<>());
192+
193+
BytesReference result = ClientScrollableHitSource.generateSource(hit);
194+
Map<String, Object> resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2();
195+
196+
// Source should remain unchanged
197+
assertEquals("value1", resultSource.get("field1"));
198+
assertEquals("value2", resultSource.get("field2"));
199+
}
200+
201+
public void testGenerateSourceWithEmptySource() {
202+
// Test case: source is empty, fields exist
203+
SearchHit hit = SearchHit.unpooled(0, "id");
204+
hit.sourceRef(new BytesArray("{}"));
205+
206+
Map<String, DocumentField> fields = new HashMap<>();
207+
fields.put("field1", new DocumentField("field1", List.of("value1")));
208+
fields.put("field2", new DocumentField("field2", List.of("value2", "value3")));
209+
hit.addDocumentFields(fields, new HashMap<>());
210+
211+
BytesReference result = ClientScrollableHitSource.generateSource(hit);
212+
Map<String, Object> resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2();
213+
214+
assertEquals("value1", resultSource.get("field1"));
215+
assertEquals(List.of("value2", "value3"), resultSource.get("field2"));
216+
}
217+
218+
public void testGenerateSourceWithBothEmpty() {
219+
// Test case: both source and fields are empty
220+
SearchHit hit = SearchHit.unpooled(0, "id");
221+
hit.sourceRef(new BytesArray("{}"));
222+
hit.addDocumentFields(new HashMap<>(), new HashMap<>());
223+
224+
BytesReference result = ClientScrollableHitSource.generateSource(hit);
225+
Map<String, Object> resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2();
226+
227+
assertTrue(resultSource.isEmpty());
228+
}
229+
230+
public void testGenerateSourceWithComplexSource() {
231+
// Test case: complex source with nested objects
232+
SearchHit hit = SearchHit.unpooled(0, "id");
233+
hit.sourceRef(new BytesArray("{\"nested\":{\"field1\":\"value1\"}}"));
234+
235+
Map<String, DocumentField> fields = new HashMap<>();
236+
fields.put("new_field", new DocumentField("new_field", List.of("value2")));
237+
hit.addDocumentFields(fields, new HashMap<>());
238+
239+
BytesReference result = ClientScrollableHitSource.generateSource(hit);
240+
Map<String, Object> resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2();
241+
242+
@SuppressWarnings("unchecked")
243+
Map<String, Object> nested = (Map<String, Object>) resultSource.get("nested");
244+
assertEquals("value1", nested.get("field1"));
245+
assertEquals("value2", resultSource.get("new_field"));
246+
}
247+
248+
public void testGenerateSourceWithNoSource() {
249+
// Test case: no source, only fields
250+
SearchHit hit = SearchHit.unpooled(0, "id");
251+
hit.sourceRef(null);
252+
253+
Map<String, DocumentField> fields = new HashMap<>();
254+
fields.put("field1", new DocumentField("field1", List.of("value1")));
255+
hit.addDocumentFields(fields, new HashMap<>());
256+
257+
BytesReference result = ClientScrollableHitSource.generateSource(hit);
258+
Map<String, Object> resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2();
259+
260+
// With no _source on the hit, the source is built from the field value
261+
assertEquals("value1", resultSource.get("field1"));
262+
}
263+
264+
public void testGenerateSourceWithNoSourceAndNoFields() {
265+
// Test case: no source and no fields
266+
SearchHit hit = SearchHit.unpooled(0, "id");
267+
hit.sourceRef(null);
268+
hit.addDocumentFields(new HashMap<>(), new HashMap<>());
269+
270+
BytesReference result = ClientScrollableHitSource.generateSource(hit);
271+
assertNull(result);
272+
}
273+
274+
public void testGenerateSourceWithExistingFieldInSource() {
275+
// Test case: field already exists in source
276+
SearchHit hit = SearchHit.unpooled(0, "id");
277+
hit.sourceRef(new BytesArray("{\"field1\":\"source_value\"}"));
278+
279+
Map<String, DocumentField> fields = new HashMap<>();
280+
fields.put("field1", new DocumentField("field1", List.of("field_value")));
281+
hit.addDocumentFields(fields, new HashMap<>());
282+
283+
BytesReference result = ClientScrollableHitSource.generateSource(hit);
284+
Map<String, Object> resultSource = XContentHelper.convertToMap(result, false, XContentHelper.xContentType(result)).v2();
285+
286+
// Source value should be preserved
287+
assertEquals("source_value", resultSource.get("field1"));
288+
}
289+
162290
private SearchResponse createSearchResponse() {
163291
// create a simulated response.
164292
SearchHit hit = SearchHit.unpooled(0, "id").sourceRef(new BytesArray("{}"));
Lines changed: 149 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,149 @@
1+
setup:
2+
- requires:
3+
cluster_features: [ "reindex.support_from_fields" ]
4+
reason: "reindexing support fields introduced"
5+
6+
- do:
7+
indices.create:
8+
index: source_docvalue
9+
body:
10+
mappings:
11+
_source:
12+
excludes: ["excluded_field"]
13+
properties:
14+
included_field:
15+
type: keyword
16+
excluded_field:
17+
type: keyword
18+
date_field:
19+
type: date
20+
long_field:
21+
type: long
22+
23+
- do:
24+
bulk:
25+
refresh: true
26+
index: source_docvalue
27+
body:
28+
- '{"index": {}}'
29+
- '{"included_field": "value1", "excluded_field": "excluded1", "date_field": "2024-01-01", "long_field": 123}'
30+
- '{"index": {}}'
31+
- '{"included_field": "value2", "excluded_field": "excluded2", "date_field": "2024-01-02", "long_field": 456}'
32+
33+
- do:
34+
indices.create:
35+
index: target_docvalue
36+
body:
37+
mappings:
38+
properties:
39+
included_field:
40+
type: keyword
41+
excluded_field:
42+
type: keyword
43+
date_field:
44+
type: date
45+
long_field:
46+
type: long
47+
48+
- do:
49+
indices.create:
50+
index: target_docvalue2
51+
body:
52+
mappings:
53+
properties:
54+
included_field:
55+
type: keyword
56+
excluded_field:
57+
type: keyword
58+
date_field:
59+
type: date
60+
long_field:
61+
type: long
62+
63+
---
64+
from docvalue fields:
65+
- do:
66+
reindex:
67+
refresh: true
68+
body:
69+
source:
70+
index: source_docvalue
71+
docvalue_fields: ["excluded_field"]
72+
dest:
73+
index: target_docvalue
74+
75+
- match: {created: 2}
76+
- match: {updated: 0}
77+
- match: {version_conflicts: 0}
78+
- match: {batches: 1}
79+
- match: {failures: []}
80+
- match: {throttled_millis: 0}
81+
- gte: { took: 0 }
82+
- is_false: task
83+
- is_false: deleted
84+
85+
- do:
86+
search:
87+
index: target_docvalue
88+
body:
89+
sort:
90+
- included_field: asc
91+
- match: { hits.total.value: 2 }
92+
- match:
93+
hits.hits.0._source:
94+
included_field: value1
95+
excluded_field: excluded1
96+
date_field: "2024-01-01"
97+
long_field: 123
98+
- match:
99+
hits.hits.1._source:
100+
included_field: value2
101+
excluded_field: excluded2
102+
date_field: "2024-01-02"
103+
long_field: 456
104+
105+
---
106+
from docvalue fields with partial source:
107+
- do:
108+
reindex:
109+
refresh: true
110+
body:
111+
source:
112+
index: source_docvalue
113+
_source:
114+
includes: ["date_field"]
115+
docvalue_fields: ["excluded_field", "long_field"]
116+
dest:
117+
index: target_docvalue2
118+
119+
- match: {created: 2}
120+
- match: {updated: 0}
121+
- match: {version_conflicts: 0}
122+
- match: {batches: 1}
123+
- match: {failures: []}
124+
- match: {throttled_millis: 0}
125+
- gte: { took: 0 }
126+
- is_false: task
127+
- is_false: deleted
128+
129+
- do:
130+
search:
131+
index: target_docvalue2
132+
body:
133+
sort:
134+
- long_field: asc
135+
- match: { hits.total.value: 2 }
136+
- match:
137+
hits.hits.0._source:
138+
excluded_field: excluded1
139+
date_field: "2024-01-01"
140+
long_field: 123
141+
- match:
142+
hits.hits.1._source:
143+
excluded_field: excluded2
144+
date_field: "2024-01-02"
145+
long_field: 456
146+
147+
- do:
148+
indices.delete:
149+
index: [source_docvalue, target_docvalue, target_docvalue2]

server/src/main/java/org/elasticsearch/index/IndexFeatures.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,8 @@
1414

1515
import java.util.Set;
1616

17+
import static org.elasticsearch.index.reindex.ClientScrollableHitSource.REINDEX_SUPPORT_FROM_FIELDS;
18+
1719
public class IndexFeatures implements FeatureSpecification {
1820

1921
@Override
@@ -39,7 +41,8 @@ public Set<NodeFeature> getTestFeatures() {
3941
LOGSDB_NO_HOST_NAME_FIELD,
4042
SYNONYMS_SET_LENIENT_ON_NON_EXISTING,
4143
THROW_EXCEPTION_FOR_UNKNOWN_TOKEN_IN_REST_INDEX_PUT_ALIAS_ACTION,
42-
THROW_EXCEPTION_ON_INDEX_CREATION_IF_UNSUPPORTED_VALUE_TYPE_IN_ALIAS
44+
THROW_EXCEPTION_ON_INDEX_CREATION_IF_UNSUPPORTED_VALUE_TYPE_IN_ALIAS,
45+
REINDEX_SUPPORT_FROM_FIELDS
4346
);
4447
}
4548
}

server/src/main/java/org/elasticsearch/index/reindex/ClientScrollableHitSource.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,13 +26,17 @@
2626
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;
2727
import org.elasticsearch.common.xcontent.XContentHelper;
2828
import org.elasticsearch.core.TimeValue;
29+
import org.elasticsearch.features.NodeFeature;
2930
import org.elasticsearch.index.mapper.RoutingFieldMapper;
3031
import org.elasticsearch.search.SearchHit;
32+
import org.elasticsearch.search.lookup.Source;
3133
import org.elasticsearch.threadpool.ThreadPool;
3234
import org.elasticsearch.xcontent.XContentType;
3335

3436
import java.util.ArrayList;
37+
import java.util.LinkedHashMap;
3538
import java.util.List;
39+
import java.util.Map;
3640
import java.util.function.Consumer;
3741

3842
import static java.util.Collections.emptyList;
@@ -47,6 +51,8 @@ public class ClientScrollableHitSource extends ScrollableHitSource {
4751
private final ParentTaskAssigningClient client;
4852
private final SearchRequest firstSearchRequest;
4953

54+
public static final NodeFeature REINDEX_SUPPORT_FROM_FIELDS = new NodeFeature("reindex.support_from_fields");
55+
5056
public ClientScrollableHitSource(
5157
Logger logger,
5258
BackoffPolicy backoffPolicy,
@@ -153,13 +159,44 @@ private static Response wrapSearchResponse(SearchResponse response) {
153159
return new Response(response.isTimedOut(), failures, total, hits, response.getScrollId());
154160
}
155161

162+
public static BytesReference generateSource(SearchHit hit) {
163+
Map<String, DocumentField> fields = hit.getDocumentFields();
164+
if (fields.isEmpty()) {
165+
return hit.hasSource() ? hit.getSourceRef() : null;
166+
}
167+
168+
Source sourceObj = Source.fromBytes(hit.getSourceRef());
169+
Map<String, Object> sourceAsMap = new LinkedHashMap<>(sourceObj.source());
170+
boolean changeSource = false;
171+
for (DocumentField field : fields.values()) {
172+
if (false == sourceAsMap.containsKey(field.getName())) {
173+
if (field.getValues() == null || field.getValues().isEmpty()) {
174+
continue;
175+
}
176+
177+
if (field.getValues().size() == 1) {
178+
sourceAsMap.put(field.getName(), field.getValue());
179+
changeSource = true;
180+
} else {
181+
sourceAsMap.put(field.getName(), field.getValues());
182+
changeSource = true;
183+
}
184+
}
185+
}
186+
if (changeSource) {
187+
return Source.fromMap(sourceAsMap, sourceObj.sourceContentType()).internalSourceRef();
188+
} else {
189+
return hit.hasSource() ? hit.getSourceRef() : null;
190+
}
191+
}
192+
156193
private static class ClientHit implements Hit {
157194
private final SearchHit delegate;
158195
private final BytesReference source;
159196

160197
/**
 * Wraps a search hit, keeping an unpooled copy of it and the source bytes
 * computed from that copy by {@code generateSource}.
 *
 * @param delegate the hit to wrap
 */
ClientHit(SearchHit delegate) {
    this.delegate = delegate.asUnpooled(); // TODO: use pooled version here
    // Compute the source from the unpooled copy, not from the constructor argument, so the
    // stored reference belongs to the hit this object keeps (as the code did before
    // generateSource was introduced).
    this.source = generateSource(this.delegate);
}
164201

165202
@Override

0 commit comments

Comments
 (0)