Skip to content

Commit d80cbdd

Browse files
kkrik-es, martijnvg, elasticsearchmachine
authored
Add LogsDB option to route on sort fields (#116687)
* Add LogsDB option to route on sort fields
* fix encoding
* Update docs/changelog/116687.yaml
* tests
* tests
* tests
* fix mode
* tests
* tests
* tests
* add test
* fix test
* sync
* updates from review
* test fixes
* test fixes
* test fixes
* Move logic to SyntheticSourceIndexSettingsProvider
* fix test
* sync
* merge, no fallback
* comments
* fix test
* address comments
* address comments
* address comments
* Update x-pack/plugin/logsdb/src/main/java/org/elasticsearch/xpack/logsdb/LogsdbIndexModeSettingsProvider.java Co-authored-by: Martijn van Groningen <[email protected]>
* [CI] Auto commit changes from spotless
* update tests
* [CI] Auto commit changes from spotless
* update tests
* fix rest compat tests
---------
Co-authored-by: Martijn van Groningen <[email protected]>
Co-authored-by: elasticsearchmachine <[email protected]>
1 parent 5487927 commit d80cbdd

File tree

25 files changed

+682
-58
lines changed

25 files changed

+682
-58
lines changed

docs/changelog/116687.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 116687
2+
summary: Add LogsDB option to route on sort fields
3+
area: Logs
4+
type: enhancement
5+
issues: []

rest-api-spec/build.gradle

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,15 @@ tasks.named("yamlRestCompatTestTransform").configure ({ task ->
5959
task.replaceValueInMatch("profile.shards.0.dfs.knn.0.query.0.description", "DocAndScoreQuery[0,...][0.009673266,...],0.009673266", "dfs knn vector profiling with vector_operations_count")
6060
task.skipTest("cat.aliases/10_basic/Deprecated local parameter", "CAT APIs not covered by compatibility policy")
6161
task.skipTest("cat.shards/10_basic/Help", "sync_id is removed in 9.0")
62+
task.skipTest("tsdb/20_mapping/exact match object type", "skip until pr/116687 gets backported")
63+
task.skipTest("tsdb/25_id_generation/delete over _bulk", "skip until pr/116687 gets backported")
64+
task.skipTest("tsdb/80_index_resize/split", "skip until pr/116687 gets backported")
65+
task.skipTest("tsdb/90_unsupported_operations/noop update", "skip until pr/116687 gets backported")
66+
task.skipTest("tsdb/90_unsupported_operations/regular update", "skip until pr/116687 gets backported")
67+
task.skipTest("tsdb/90_unsupported_operations/search with routing", "skip until pr/116687 gets backported")
68+
task.skipTest("tsdb/90_unsupported_operations/index with routing over _bulk", "skip until pr/116687 gets backported")
69+
task.skipTest("tsdb/90_unsupported_operations/update over _bulk", "skip until pr/116687 gets backported")
70+
task.skipTest("tsdb/90_unsupported_operations/index with routing", "skip until pr/116687 gets backported")
6271
task.skipTest("search/500_date_range/from, to, include_lower, include_upper deprecated", "deprecated parameters are removed in 9.0")
6372
task.skipTest("tsdb/20_mapping/stored source is supported", "no longer serialize source_mode")
6473
task.skipTest("tsdb/20_mapping/Synthetic source", "no longer serialize source_mode")

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/logsdb/10_settings.yml

Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,48 @@ routing path not allowed in logs mode:
513513
- match: { error.type: "illegal_argument_exception" }
514514
- match: { error.reason: "[index.routing_path] requires [index.mode=time_series]" }
515515

516+
---
517+
routing path allowed in logs mode with routing on sort fields:
518+
- requires:
519+
cluster_features: [ "routing.logsb_route_on_sort_fields" ]
520+
reason: introduction of route on index sorting fields
521+
522+
- do:
523+
indices.create:
524+
index: test
525+
body:
526+
settings:
527+
index:
528+
mode: logsdb
529+
number_of_replicas: 0
530+
number_of_shards: 2
531+
routing_path: [ host.name, agent_id ]
532+
logsdb:
533+
route_on_sort_fields: true
534+
mappings:
535+
properties:
536+
"@timestamp":
537+
type: date
538+
host.name:
539+
type: keyword
540+
agent_id:
541+
type: keyword
542+
process_id:
543+
type: integer
544+
http_method:
545+
type: keyword
546+
message:
547+
type: text
548+
549+
- do:
550+
indices.get_settings:
551+
index: test
552+
553+
- is_true: test
554+
- match: { test.settings.index.mode: logsdb }
555+
- match: { test.settings.index.logsdb.route_on_sort_fields: "true" }
556+
- match: { test.settings.index.routing_path: [ host.name, agent_id ] }
557+
516558
---
517559
start time not allowed in logs mode:
518560
- requires:

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/20_mapping.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ exact match object type:
127127
reason: routing_path error message updated in 8.14.0
128128

129129
- do:
130-
catch: '/All fields that match routing_path must be configured with \[time_series_dimension: true\] or flattened fields with a list of dimensions in \[time_series_dimensions\] and without the \[script\] parameter. \[dim\] was \[object\]./'
130+
catch: '/All fields that match routing_path must be .*flattened fields.* \[dim\] was \[object\]./'
131131
indices.create:
132132
index: tsdb_index
133133
body:

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/25_id_generation.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -427,7 +427,7 @@ delete over _bulk:
427427
- match: {items.0.delete.result: deleted}
428428
- match: {items.1.delete.result: deleted}
429429
- match: {items.2.delete.status: 404}
430-
- match: {items.2.delete.error.reason: "invalid id [not found ++ not found] for index [id_generation_test] in time series mode"}
430+
- match: {items.2.delete.error.reason: '/invalid\ id\ \[not\ found\ \+\+\ not\ found\]\ for\ index\ \[id_generation_test\]\ in\ time.series\ mode/'}
431431

432432
---
433433
routing_path matches deep object:

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/80_index_resize.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,7 @@ split:
9595
reason: tsdb indexing changed in 8.2.0
9696

9797
- do:
98-
catch: /index-split is not supported because the destination index \[test\] is in time series mode/
98+
catch: /index-split is not supported because the destination index \[test\] is in time.series mode/
9999
indices.split:
100100
index: test
101101
target: test_split

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/tsdb/90_unsupported_operations.yml

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ index with routing:
7575
reason: tsdb indexing changed in 8.2.0
7676

7777
- do:
78-
catch: /specifying routing is not supported because the destination index \[test\] is in time series mode/
78+
catch: /specifying routing is not supported because the destination index \[test\] is in time.series mode/
7979
index:
8080
index: test
8181
routing: foo
@@ -104,7 +104,7 @@ index with routing over _bulk:
104104
body:
105105
- '{"index": {"routing": "foo"}}'
106106
- '{"@timestamp": "2021-04-28T18:50:04.467Z", "metricset": "pod", "k8s": {"pod": {"name": "cat", "uid":"947e4ced-1786-4e53-9e0c-5c447e959507", "ip": "10.10.55.1", "network": {"tx": 2001818691, "rx": 802133794}}}}'
107-
- match: {items.0.index.error.reason: "specifying routing is not supported because the destination index [test] is in time series mode"}
107+
- match: {items.0.index.error.reason: '/specifying\ routing\ is\ not\ supported\ because\ the\ destination\ index\ \[test\]\ is\ in\ time.series\ mode/'}
108108

109109
---
110110
noop update:
@@ -120,7 +120,7 @@ noop update:
120120
- length: {hits.hits: 1}
121121

122122
- do:
123-
catch: /update is not supported because the destination index \[test\] is in time series mode/
123+
catch: /update is not supported because the destination index \[test\] is in time.series mode/
124124
update:
125125
index: test
126126
id: "1"
@@ -136,7 +136,7 @@ regular update:
136136

137137
# We fail even though the document isn't found.
138138
- do:
139-
catch: /update is not supported because the destination index \[test\] is in time series mode/
139+
catch: /update is not supported because the destination index \[test\] is in time.series mode/
140140
update:
141141
index: test
142142
id: "1"
@@ -165,7 +165,7 @@ update over _bulk:
165165
body:
166166
- '{"update": {"_id": 1}}'
167167
- '{"doc":{"@timestamp": "2021-04-28T18:03:24.467Z", "metricset": "pod", "k8s": {"pod": {"name": "dog", "uid":"df3145b3-0563-4d3b-a0f7-897eb2876ea9", "ip": "10.10.55.3", "network": {"tx": 1434595272, "rx": 530605511}}}}}'
168-
- match: {items.0.update.error.reason: "update is not supported because the destination index [test] is in time series mode"}
168+
- match: {items.0.update.error.reason: '/update\ is\ not\ supported\ because\ the\ destination\ index\ \[test\]\ is\ in\ time.series\ mode/'}
169169

170170
---
171171
search with routing:
@@ -175,7 +175,7 @@ search with routing:
175175

176176
# We fail even though the document isn't found.
177177
- do:
178-
catch: /searching with a specified routing is not supported because the destination index \[test\] is in time series mode/
178+
catch: /searching with a specified routing is not supported because the destination index \[test\] is in time.series mode/
179179
search:
180180
index: test
181181
routing: rrrr

server/src/main/java/org/elasticsearch/action/index/IndexRequest.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,7 @@
5151
import java.util.Locale;
5252
import java.util.Map;
5353
import java.util.Objects;
54+
import java.util.OptionalInt;
5455
import java.util.function.Supplier;
5556

5657
import static org.elasticsearch.action.ValidateActions.addValidationError;
@@ -78,7 +79,6 @@ public class IndexRequest extends ReplicatedWriteRequest<IndexRequest> implement
7879
private static final TransportVersion PIPELINES_HAVE_RUN_FIELD_ADDED = TransportVersions.V_8_10_X;
7980

8081
private static final Supplier<String> ID_GENERATOR = UUIDs::base64UUID;
81-
private static final Supplier<String> K_SORTED_TIME_BASED_ID_GENERATOR = UUIDs::base64TimeBasedKOrderedUUID;
8282

8383
/**
8484
* Max length of the source document to include into string()
@@ -705,9 +705,18 @@ public void autoGenerateId() {
705705
}
706706

707707
public void autoGenerateTimeBasedId() {
708+
autoGenerateTimeBasedId(OptionalInt.empty());
709+
}
710+
711+
/**
712+
* Set the {@code #id()} to an automatically generated one, optimized for storage (compression) efficiency.
713+
* If a routing hash is passed, it is included in the generated id starting at 9 bytes before the end.
714+
* @param hash optional routing hash value, used to route requests by id to the right shard.
715+
*/
716+
public void autoGenerateTimeBasedId(OptionalInt hash) {
708717
assertBeforeGeneratingId();
709718
autoGenerateTimestamp();
710-
id(K_SORTED_TIME_BASED_ID_GENERATOR.get());
719+
id(UUIDs.base64TimeBasedKOrderedUUIDWithHash(hash));
711720
}
712721

713722
private void autoGenerateTimestamp() {

server/src/main/java/org/elasticsearch/cluster/routing/IndexRouting.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,7 @@
4040
import java.util.Collections;
4141
import java.util.List;
4242
import java.util.Map;
43+
import java.util.OptionalInt;
4344
import java.util.Set;
4445
import java.util.function.IntConsumer;
4546
import java.util.function.IntSupplier;
@@ -55,6 +56,7 @@ public abstract class IndexRouting {
5556

5657
static final NodeFeature BOOLEAN_ROUTING_PATH = new NodeFeature("routing.boolean_routing_path");
5758
static final NodeFeature MULTI_VALUE_ROUTING_PATH = new NodeFeature("routing.multi_value_routing_path");
59+
static final NodeFeature LOGSB_ROUTE_ON_SORT_FIELDS = new NodeFeature("routing.logsb_route_on_sort_fields");
5860

5961
/**
6062
* Build the routing from {@link IndexMetadata}.
@@ -165,7 +167,8 @@ private abstract static class IdAndRoutingOnly extends IndexRouting {
165167

166168
@Override
167169
public void preProcess(IndexRequest indexRequest) {
168-
// generate id if not already provided
170+
// Generate id if not already provided.
171+
// This is needed for routing, so it has to happen in pre-processing.
169172
final String id = indexRequest.id();
170173
if (id == null) {
171174
if (shouldUseTimeBasedId(indexMode, creationVersion)) {
@@ -272,15 +275,20 @@ public void collectSearchShards(String routing, IntConsumer consumer) {
272275
public static class ExtractFromSource extends IndexRouting {
273276
private final Predicate<String> isRoutingPath;
274277
private final XContentParserConfiguration parserConfig;
278+
private final IndexMode indexMode;
275279
private final boolean trackTimeSeriesRoutingHash;
280+
private final boolean addIdWithRoutingHash;
276281
private int hash = Integer.MAX_VALUE;
277282

278283
ExtractFromSource(IndexMetadata metadata) {
279284
super(metadata);
280285
if (metadata.isRoutingPartitionedIndex()) {
281286
throw new IllegalArgumentException("routing_partition_size is incompatible with routing_path");
282287
}
283-
trackTimeSeriesRoutingHash = metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID);
288+
indexMode = metadata.getIndexMode();
289+
trackTimeSeriesRoutingHash = indexMode == IndexMode.TIME_SERIES
290+
&& metadata.getCreationVersion().onOrAfter(IndexVersions.TIME_SERIES_ROUTING_HASH_IN_ID);
291+
addIdWithRoutingHash = indexMode == IndexMode.LOGSDB;
284292
List<String> routingPaths = metadata.getRoutingPaths();
285293
isRoutingPath = Regex.simpleMatcher(routingPaths.toArray(String[]::new));
286294
this.parserConfig = XContentParserConfiguration.EMPTY.withFiltering(null, Set.copyOf(routingPaths), null, true);
@@ -292,8 +300,13 @@ public boolean matchesField(String fieldName) {
292300

293301
@Override
294302
public void postProcess(IndexRequest indexRequest) {
303+
// Update the request with the routing hash, if needed.
304+
// This needs to happen in post-processing, after the routing hash is calculated.
295305
if (trackTimeSeriesRoutingHash) {
296306
indexRequest.routing(TimeSeriesRoutingHashFieldMapper.encode(hash));
307+
} else if (addIdWithRoutingHash) {
308+
assert hash != Integer.MAX_VALUE;
309+
indexRequest.autoGenerateTimeBasedId(OptionalInt.of(hash));
297310
}
298311
}
299312

@@ -461,12 +474,15 @@ private int idToHash(String id) {
461474
try {
462475
idBytes = Base64.getUrlDecoder().decode(id);
463476
} catch (IllegalArgumentException e) {
464-
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in time series mode", id, indexName);
477+
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName);
465478
}
466479
if (idBytes.length < 4) {
467-
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in time series mode", id, indexName);
480+
throw new ResourceNotFoundException("invalid id [{}] for index [{}] in " + indexMode.getName() + " mode", id, indexName);
468481
}
469-
return hashToShardId(ByteUtils.readIntLE(idBytes, 0));
482+
// For TSDB, the hash is stored as the id prefix.
483+
// For LogsDB with routing on sort fields, the routing hash is stored in the byte range [id.length - 9, id.length - 5) of the decoded id,
484+
// see IndexRequest#autoGenerateTimeBasedId.
485+
return hashToShardId(ByteUtils.readIntLE(idBytes, addIdWithRoutingHash ? idBytes.length - 9 : 0));
470486
}
471487

472488
@Override
@@ -480,7 +496,7 @@ public void collectSearchShards(String routing, IntConsumer consumer) {
480496
}
481497

482498
private String error(String operation) {
483-
return operation + " is not supported because the destination index [" + indexName + "] is in time series mode";
499+
return operation + " is not supported because the destination index [" + indexName + "] is in " + indexMode.getName() + " mode";
484500
}
485501
}
486502

server/src/main/java/org/elasticsearch/cluster/routing/RoutingFeatures.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,4 +20,9 @@ public class RoutingFeatures implements FeatureSpecification {
2020
public Set<NodeFeature> getFeatures() {
2121
return Set.of(IndexRouting.BOOLEAN_ROUTING_PATH, IndexRouting.MULTI_VALUE_ROUTING_PATH);
2222
}
23+
24+
@Override
25+
public Set<NodeFeature> getTestFeatures() {
26+
return Set.of(IndexRouting.LOGSB_ROUTE_ON_SORT_FIELDS);
27+
}
2328
}

0 commit comments

Comments
 (0)