Skip to content

Commit a825a5b

Browse files
Merge branch 'main' into layout-build-npe
2 parents 92985fa + f097818 commit a825a5b

File tree

24 files changed

+395
-212
lines changed

24 files changed

+395
-212
lines changed

docs/changelog/124913.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 124913
2+
summary: Report `original_types`
3+
area: ES|QL
4+
type: enhancement
5+
issues: []

docs/internal/DistributedArchitectureGuide.md

Lines changed: 18 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -110,6 +110,16 @@ to communicate with Elasticsearch.
110110

111111
### Cluster State
112112

113+
[ClusterState]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/cluster/ClusterState.java
114+
[Metadata]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java
115+
[ProjectMetadata]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/cluster/metadata/ProjectMetadata.java
116+
117+
The [Metadata] of a [ClusterState] is persisted on disk and comprises information from two categories:
118+
1. Cluster scope information such as `clusterUUID`, `CoordinationMetadata`
119+
2. Project scope information ([ProjectMetadata]) such as indices and templates belong to each project.
120+
121+
Some concepts are applicable to both cluster and project scopes, e.g. [persistent tasks](#persistent-tasks). The state of a persistent task is therefore stored accordingly depending on the task's scope.
122+
113123
#### Master Service
114124

115125
#### Cluster State Publication
@@ -306,7 +316,7 @@ policies.
306316

307317
### How cluster capacity is determined
308318

309-
[AutoscalingMetadata][] implements [Metadata.Custom][] in order to persist autoscaling policies. Each
319+
[AutoscalingMetadata][] implements [Metadata.ClusterCustom][] in order to persist autoscaling policies. Each
310320
Decider is an implementation of [AutoscalingDeciderService][]. The [AutoscalingCalculateCapacityService][]
311321
is responsible for running the calculation.
312322

@@ -322,7 +332,7 @@ calls [through the CapacityResponseCache][], into the `AutoscalingCalculateCapac
322332
concurrent callers.
323333

324334
[AutoscalingMetadata]: https://github.com/elastic/elasticsearch/blob/v8.13.2/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/AutoscalingMetadata.java#L38
325-
[Metadata.Custom]: https://github.com/elastic/elasticsearch/blob/v8.13.2/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java#L141-L145
335+
[Metadata.ClusterCustom]: https://github.com/elastic/elasticsearch/blob/f461731a30a6fe55d7d7b343d38426ddca1ac873/server/src/main/java/org/elasticsearch/cluster/metadata/Metadata.java#L147
326336
[AutoscalingDeciderService]: https://github.com/elastic/elasticsearch/blob/v8.13.2/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingDeciderService.java#L16-L19
327337
[AutoscalingCalculateCapacityService]: https://github.com/elastic/elasticsearch/blob/v8.13.2/x-pack/plugin/autoscaling/src/main/java/org/elasticsearch/xpack/autoscaling/capacity/AutoscalingCalculateCapacityService.java#L43
328338

@@ -505,6 +515,8 @@ The [Task management API] also exposes an endpoint where a task ID can be specif
505515

506516
[PersistentTaskPlugin]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/plugins/PersistentTaskPlugin.java
507517
[PersistentTasksExecutor]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java
518+
[PersistentTasksExecutor.Scope.Cluster]:https://github.com/elastic/elasticsearch/blob/f461731a30a6fe55d7d7b343d38426ddca1ac873/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java#L52
519+
[PersistentTasksExecutor.Scope.Project]:https://github.com/elastic/elasticsearch/blob/f461731a30a6fe55d7d7b343d38426ddca1ac873/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutor.java#L48
508520
[PersistentTasksExecutorRegistry]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/persistent/PersistentTasksExecutorRegistry.java
509521
[PersistentTasksNodeService]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/persistent/PersistentTasksNodeService.java
510522
[PersistentTasksClusterService]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/persistent/PersistentTasksClusterService.java
@@ -513,13 +525,14 @@ The [Task management API] also exposes an endpoint where a task ID can be specif
513525
[HealthNodeTaskExecutor]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/health/node/selection/HealthNodeTaskExecutor.java
514526
[SystemIndexMigrationExecutor]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/upgrades/SystemIndexMigrationExecutor.java
515527
[PersistentTasksCustomMetadata]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java
528+
[ClusterPersistentTasksCustomMetadata]:https://github.com/elastic/elasticsearch/blob/main/server/src/main/java/org/elasticsearch/persistent/ClusterPersistentTasksCustomMetadata.java
516529
[PersistentTasksCustomMetadata.PersistentTask]:https://github.com/elastic/elasticsearch/blob/d466ad1c3c4cedc7d5f6ab5794abe7bfd72aef4e/server/src/main/java/org/elasticsearch/persistent/PersistentTasksCustomMetadata.java#L305
517530

518531
Up until now we have discussed only ephemeral tasks. If we want a task to survive node failures, it needs to be registered as a persistent task at the cluster level.
519532

520-
Plugins can register persistent tasks definitions by implementing [PersistentTaskPlugin] and returning one or more [PersistentTasksExecutor] instances. These are collated into a [PersistentTasksExecutorRegistry] which is provided to [PersistentTasksNodeService] active on each node in the cluster, and a [PersistentTasksClusterService] active on the master.
533+
Plugins can register persistent tasks definitions by implementing [PersistentTaskPlugin] and returning one or more [PersistentTasksExecutor] instances. These are collated into a [PersistentTasksExecutorRegistry] which is provided to [PersistentTasksNodeService] active on each node in the cluster, and a [PersistentTasksClusterService] active on the master. A [PersistentTasksExecutor] can declare either [project][PersistentTasksExecutor.Scope.Project] or [cluster][PersistentTasksExecutor.Scope.Cluster] scope, but not both. A project scope task is not able to access data on a different project.
521534

522-
The [PersistentTasksClusterService] runs on the master to manage the set of running persistent tasks. It periodically checks that all persistent tasks are assigned to live nodes and handles the creation, completion, removal and updates-to-the-state of persistent task instances in the cluster state (see [PersistentTasksCustomMetadata]).
535+
The [PersistentTasksClusterService] runs on the master to manage the set of running persistent tasks. It periodically checks that all persistent tasks are assigned to live nodes and handles the creation, completion, removal and updates-to-the-state of persistent task instances in the cluster state (see [PersistentTasksCustomMetadata] and [ClusterPersistentTasksCustomMetadata]).
523536

524537
The [PersistentTasksNodeService] monitors the cluster state to:
525538
- Start any tasks allocated to it (tracked in the local [TaskManager] by an [AllocatedPersistentTask])
@@ -529,7 +542,7 @@ If a node leaves the cluster while it has a persistent task allocated to it, the
529542

530543
Some examples of the use of persistent tasks include:
531544
- [ShardFollowTasksExecutor]: Defined by [cross-cluster replication](#cross-cluster-replication-ccr) to poll a remote cluster for updates
532-
- [HealthNodeTaskExecutor]: Used to schedule work related to monitoring cluster health
545+
- [HealthNodeTaskExecutor]: Used to schedule work related to monitoring cluster health. This is currently the only example of a cluster scope persistent task.
533546
- [SystemIndexMigrationExecutor]: Manages the migration of system indices after an upgrade
534547

535548
### Integration with APM

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,7 @@ static TransportVersion def(int id) {
203203
public static final TransportVersion ESQL_AGGREGATE_METRIC_DOUBLE_LITERAL = def(9_035_0_00);
204204
public static final TransportVersion INDEX_METADATA_INCLUDES_RECENT_WRITE_LOAD = def(9_036_0_00);
205205
public static final TransportVersion RERANK_COMMON_OPTIONS_ADDED = def(9_037_0_00);
206+
public static final TransportVersion ESQL_REPORT_ORIGINAL_TYPES = def(9_038_00_0);
206207

207208
/*
208209
* STOP! READ THIS FIRST! No, really,

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/expression/Attribute.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -137,4 +137,14 @@ public String nodeString() {
137137
}
138138

139139
protected abstract String label();
140+
141+
/**
142+
* If this field is unsupported this contains the underlying ES types. If there
143+
* is a type conflict this will have many elements, some or all of which may
144+
* be actually supported types.
145+
*/
146+
@Nullable
147+
public List<String> originalTypes() {
148+
return null;
149+
}
140150
}

x-pack/plugin/esql-core/src/main/java/org/elasticsearch/xpack/esql/core/type/UnsupportedEsField.java

Lines changed: 27 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -6,10 +6,14 @@
66
*/
77
package org.elasticsearch.xpack.esql.core.type;
88

9+
import org.elasticsearch.TransportVersions;
910
import org.elasticsearch.common.io.stream.StreamInput;
1011
import org.elasticsearch.common.io.stream.StreamOutput;
12+
import org.elasticsearch.xpack.esql.core.util.PlanStreamInput;
13+
import org.elasticsearch.xpack.esql.core.util.PlanStreamOutput;
1114

1215
import java.io.IOException;
16+
import java.util.List;
1317
import java.util.Map;
1418
import java.util.Objects;
1519
import java.util.TreeMap;
@@ -23,32 +27,39 @@
2327
*/
2428
public class UnsupportedEsField extends EsField {
2529

26-
private final String originalType;
30+
private final List<String> originalTypes;
2731
private final String inherited; // for fields belonging to parents (or grandparents) that have an unsupported type
2832

29-
public UnsupportedEsField(String name, String originalType) {
30-
this(name, originalType, null, new TreeMap<>());
33+
public UnsupportedEsField(String name, List<String> originalTypes) {
34+
this(name, originalTypes, null, new TreeMap<>());
3135
}
3236

33-
public UnsupportedEsField(String name, String originalType, String inherited, Map<String, EsField> properties) {
37+
public UnsupportedEsField(String name, List<String> originalTypes, String inherited, Map<String, EsField> properties) {
3438
super(name, DataType.UNSUPPORTED, properties, false);
35-
this.originalType = originalType;
39+
this.originalTypes = originalTypes;
3640
this.inherited = inherited;
3741
}
3842

3943
public UnsupportedEsField(StreamInput in) throws IOException {
40-
this(
41-
readCachedStringWithVersionCheck(in),
42-
readCachedStringWithVersionCheck(in),
43-
in.readOptionalString(),
44-
in.readImmutableMap(EsField::readFrom)
45-
);
44+
this(readCachedStringWithVersionCheck(in), readOriginalTypes(in), in.readOptionalString(), in.readImmutableMap(EsField::readFrom));
45+
}
46+
47+
private static List<String> readOriginalTypes(StreamInput in) throws IOException {
48+
if (in.getTransportVersion().onOrAfter(TransportVersions.ESQL_REPORT_ORIGINAL_TYPES)) {
49+
return in.readCollectionAsList(i -> ((PlanStreamInput) i).readCachedString());
50+
} else {
51+
return List.of(readCachedStringWithVersionCheck(in).split(","));
52+
}
4653
}
4754

4855
@Override
4956
public void writeContent(StreamOutput out) throws IOException {
5057
writeCachedStringWithVersionCheck(out, getName());
51-
writeCachedStringWithVersionCheck(out, getOriginalType());
58+
if (out.getTransportVersion().onOrAfter(TransportVersions.ESQL_REPORT_ORIGINAL_TYPES)) {
59+
out.writeCollection(getOriginalTypes(), (o, s) -> ((PlanStreamOutput) o).writeCachedString(s));
60+
} else {
61+
writeCachedStringWithVersionCheck(out, String.join(",", getOriginalTypes()));
62+
}
5263
out.writeOptionalString(getInherited());
5364
out.writeMap(getProperties(), (o, x) -> x.writeTo(out));
5465
}
@@ -57,8 +68,8 @@ public String getWriteableName() {
5768
return "UnsupportedEsField";
5869
}
5970

60-
public String getOriginalType() {
61-
return originalType;
71+
public List<String> getOriginalTypes() {
72+
return originalTypes;
6273
}
6374

6475
public String getInherited() {
@@ -81,11 +92,11 @@ public boolean equals(Object o) {
8192
return false;
8293
}
8394
UnsupportedEsField that = (UnsupportedEsField) o;
84-
return Objects.equals(originalType, that.originalType) && Objects.equals(inherited, that.inherited);
95+
return Objects.equals(originalTypes, that.originalTypes) && Objects.equals(inherited, that.inherited);
8596
}
8697

8798
@Override
8899
public int hashCode() {
89-
return Objects.hash(super.hashCode(), originalType, inherited);
100+
return Objects.hash(super.hashCode(), originalTypes, inherited);
90101
}
91102
}

0 commit comments

Comments
 (0)