Skip to content

Commit 110843f

Browse files
committed
Changes
1 parent eb95625 commit 110843f

File tree

5 files changed

+52
-15
lines changed

5 files changed

+52
-15
lines changed

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -964,12 +964,11 @@ public Index getConcreteWriteIndex(IndexAbstraction ia, ProjectMetadata project)
964964

965965
@Override
966966
public int route(IndexRouting indexRouting) {
967-
// TODO: avoid materializing the source atm when not using extract from source. Extract from source can be changed to use structured
968-
// source.
969-
if (indexRouting instanceof IndexRouting.ExtractFromSource) {
970-
return indexRouting.indexShard(id, routing, contentType, source());
967+
if (useStructuredSource) {
968+
assert structuredSource != null;
969+
return indexRouting.indexShard(id, routing, contentType, structuredSource);
971970
} else {
972-
return indexRouting.indexShard(id, routing, contentType, null);
971+
return indexRouting.indexShard(id, routing, contentType, source());
973972
}
974973
}
975974

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 43 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,11 @@
3030
import org.elasticsearch.index.IndexVersion;
3131
import org.elasticsearch.index.IndexVersions;
3232
import org.elasticsearch.index.mapper.TimeSeriesRoutingHashFieldMapper;
33+
import org.elasticsearch.ingest.ESONSource;
34+
import org.elasticsearch.ingest.ESONXContentParser;
3335
import org.elasticsearch.transport.Transports;
36+
import org.elasticsearch.xcontent.DeprecationHandler;
37+
import org.elasticsearch.xcontent.NamedXContentRegistry;
3438
import org.elasticsearch.xcontent.XContentParser;
3539
import org.elasticsearch.xcontent.XContentParser.Token;
3640
import org.elasticsearch.xcontent.XContentParserConfiguration;
@@ -100,6 +104,8 @@ public void postProcess(IndexRequest indexRequest) {}
100104
*/
101105
public abstract int indexShard(String id, @Nullable String routing, XContentType sourceType, BytesReference source);
102106

107+
public abstract int indexShard(String id, @Nullable String routing, XContentType sourceType, ESONSource.ESONObject structuredSource);
108+
103109
/**
104110
* Called when updating a document to generate the shard id that should contain
105111
* a document with the provided {@code _id} and (optional) {@code _routing}.
@@ -220,6 +226,11 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
220226
return rerouteIfResharding(shardId);
221227
}
222228

229+
@Override
230+
public int indexShard(String id, String routing, XContentType sourceType, ESONSource.ESONObject structuredSource) {
231+
return indexShard(id, routing, sourceType, (BytesReference) null);
232+
}
233+
223234
@Override
224235
public int updateShard(String id, @Nullable String routing) {
225236
checkRoutingRequired(id, routing);
@@ -342,6 +353,26 @@ public int indexShard(String id, @Nullable String routing, XContentType sourceTy
342353
return (rerouteIfResharding(shardId));
343354
}
344355

356+
@Override
357+
public int indexShard(String id, @Nullable String routing, XContentType sourceType, ESONSource.ESONObject structuredSource) {
358+
assert Transports.assertNotTransportThread("parsing the _source can get slow");
359+
checkNoRouting(routing);
360+
try (
361+
XContentParser parser = new ESONXContentParser(
362+
structuredSource,
363+
NamedXContentRegistry.EMPTY,
364+
DeprecationHandler.IGNORE_DEPRECATIONS,
365+
sourceType
366+
)
367+
) {
368+
hash = hashSource(parser).buildHash(IndexRouting.ExtractFromSource::defaultOnEmpty);
369+
int shardId = hashToShardId(hash);
370+
return (rerouteIfResharding(shardId));
371+
} catch (IOException | ParsingException e) {
372+
throw new IllegalArgumentException("Error extracting routing: " + e.getMessage(), e);
373+
}
374+
}
375+
345376
public String createId(XContentType sourceType, BytesReference source, byte[] suffix) {
346377
return hashSource(sourceType, source).createId(suffix, IndexRouting.ExtractFromSource::defaultOnEmpty);
347378
}
@@ -371,18 +402,22 @@ public Builder builder() {
371402
}
372403

373404
private Builder hashSource(XContentType sourceType, BytesReference source) {
374-
Builder b = builder();
375405
try (XContentParser parser = XContentHelper.createParserNotCompressed(parserConfig, source, sourceType)) {
376-
parser.nextToken(); // Move to first token
377-
if (parser.currentToken() == null) {
378-
throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
379-
}
380-
parser.nextToken();
381-
b.extractObject(null, parser);
382-
ensureExpectedToken(null, parser.nextToken(), parser);
406+
return hashSource(parser);
383407
} catch (IOException | ParsingException e) {
384408
throw new IllegalArgumentException("Error extracting routing: " + e.getMessage(), e);
385409
}
410+
}
411+
412+
private Builder hashSource(XContentParser parser) throws IOException {
413+
Builder b = builder();
414+
parser.nextToken(); // Move to first token
415+
if (parser.currentToken() == null) {
416+
throw new IllegalArgumentException("Error extracting routing: source didn't contain any routing fields");
417+
}
418+
parser.nextToken();
419+
b.extractObject(null, parser);
420+
ensureExpectedToken(null, parser.nextToken(), parser);
386421
return b;
387422
}
388423

server/src/test/java/org/elasticsearch/cluster/routing/IndexRoutingTests.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -460,7 +460,7 @@ public void testRequiredRouting() {
460460
*/
461461
private int shardIdFromSimple(IndexRouting indexRouting, String id, @Nullable String routing) {
462462
return switch (between(0, 3)) {
463-
case 0 -> indexRouting.indexShard(id, routing, null, null);
463+
case 0 -> indexRouting.indexShard(id, routing, null, (BytesReference) null);
464464
case 1 -> indexRouting.updateShard(id, routing);
465465
case 2 -> indexRouting.deleteShard(id, routing);
466466
case 3 -> indexRouting.getShard(id, routing);

test/framework/src/main/java/org/elasticsearch/test/InternalTestCluster.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import org.elasticsearch.common.Randomness;
5252
import org.elasticsearch.common.Strings;
5353
import org.elasticsearch.common.breaker.CircuitBreaker;
54+
import org.elasticsearch.common.bytes.BytesReference;
5455
import org.elasticsearch.common.component.LifecycleListener;
5556
import org.elasticsearch.common.io.FileSystemUtils;
5657
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
@@ -2397,7 +2398,7 @@ synchronized String routingKeyForShard(Index index, int shard, Random random) {
23972398
IndexRouting indexRouting = IndexRouting.fromIndexMetadata(clusterState.metadata().getProject().getIndexSafe(index));
23982399
while (true) {
23992400
String routing = RandomStrings.randomAsciiLettersOfLength(random, 10);
2400-
if (shard == indexRouting.indexShard("id", routing, null, null)) {
2401+
if (shard == indexRouting.indexShard("id", routing, null, (BytesReference) null)) {
24012402
return routing;
24022403
}
24032404
}

x-pack/plugin/esql/qa/server/multi-clusters/src/javaRestTest/java/org/elasticsearch/xpack/esql/ccq/MultiClusterSpecIT.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@
1212

1313
import org.apache.http.HttpEntity;
1414
import org.apache.http.HttpHost;
15+
import org.apache.lucene.tests.util.LuceneTestCase;
1516
import org.elasticsearch.Version;
1617
import org.elasticsearch.client.Request;
1718
import org.elasticsearch.client.Response;
@@ -69,6 +70,7 @@
6970
* This suite loads the data into either the local cluster or the remote cluster, then run spec tests with CCQ.
7071
* TODO: Some spec tests prevents us from splitting data across multiple shards/indices/clusters
7172
*/
73+
@LuceneTestCase.AwaitsFix(bugUrl = "double to keyword issue")
7274
@ThreadLeakFilters(filters = TestClustersThreadFilter.class)
7375
public class MultiClusterSpecIT extends EsqlSpecTestCase {
7476

0 commit comments

Comments
 (0)