Commit bebaa4f

Merge branch 'main' into esql_pragma_load_source
2 parents: 87c3446 + 2ae80c7

File tree

38 files changed (+516, -137 lines)


docs/changelog/120952.yaml

Lines changed: 5 additions & 0 deletions
@@ -0,0 +1,5 @@
+pr: 120952
+summary: Add `_metric_names_hash` field to OTel metric mappings
+area: Data streams
+type: bug
+issues: []

docs/changelog/122409.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+pr: 122409
+summary: Allow setting the `type` in the reroute processor
+area: Ingest Node
+type: enhancement
+issues:
+ - 121553

docs/changelog/122637.yaml

Lines changed: 6 additions & 0 deletions
@@ -0,0 +1,6 @@
+pr: 122637
+summary: Use `FallbackSyntheticSourceBlockLoader` for `unsigned_long` and `scaled_float`
+  fields
+area: Mapping
+type: enhancement
+issues: []

docs/internal/DistributedArchitectureGuide.md

Lines changed: 39 additions & 13 deletions
@@ -229,19 +229,45 @@ works in parallel with the storage engine.)
 
 # Allocation
 
-(AllocationService runs on the master node)
-
-(Discuss different deciders that limit allocation. Sketch / list the different deciders that we have.)
-
-### APIs for Balancing Operations
-
-(Significant internal APIs for balancing a cluster)
-
-### Heuristics for Allocation
-
-### Cluster Reroute Command
-
-(How does this command behave with the desired auto balancer.)
+### Core Components
+
+The `DesiredBalanceShardsAllocator` runs the shard allocation decisions. It uses the `DesiredBalanceComputer` to produce `DesiredBalance`
+instances for the cluster based on the latest cluster changes (adding/removing nodes, creating/removing indices, load, etc.). The
+`DesiredBalanceReconciler` is then invoked to choose the next steps to move the cluster from the current shard allocation towards the
+latest computed `DesiredBalance` allocation. The `DesiredBalanceReconciler` applies its changes to a copy of the `RoutingNodes`, which is
+then published in a cluster state update that reaches the data nodes and starts the individual shard recovery/deletion/move work.
+
+The `DesiredBalanceReconciler` is throttled by cluster settings, such as the maximum number of concurrent shard moves and recoveries per
+cluster and per node: this is why the `DesiredBalanceReconciler` makes, and publishes via cluster state updates, incremental changes to
+the cluster's shard allocation. The `DesiredBalanceShardsAllocator` is the entry point for reroute requests, which may trigger immediate
+runs of the `DesiredBalanceReconciler` but only asynchronous requests to the `DesiredBalanceComputer`, via the `ContinuousComputation`
+component. Cluster state changes that affect shard balancing (for example, index deletion) all call some reroute method that reaches the
+`DesiredBalanceShardsAllocator`, which runs reconciliation and queues a request for the `DesiredBalanceComputer`, leading to desired
+balance computation and further reconciliation actions. Asynchronous completion of a new `DesiredBalance` also triggers a reconciliation
+action, as do cluster state updates that complete shard moves/recoveries (unthrottling the next shard move/recovery).
+
+The `ContinuousComputation` saves the latest desired balance computation request, which holds the cluster information at the time of that
+request, and owns a thread that runs the `DesiredBalanceComputer`. The `ContinuousComputation` thread takes the latest request, with its
+associated cluster information, feeds it into the `DesiredBalanceComputer`, and publishes a `DesiredBalance` back to the
+`DesiredBalanceShardsAllocator` to use for reconciliation actions. Sometimes the `ContinuousComputation` thread's desired balance
+computation is signalled to exit early and publish the improvements it has made so far, either because newer rebalancing requests (due to
+cluster state changes) have arrived, or in order to begin recovery of unassigned shards as quickly as possible.
+
+### Rebalancing Process
+
+There are different priorities in shard allocation, reflected in which moves the `DesiredBalanceReconciler` selects first, given that it
+can only move, recover, or remove a limited number of shards at once. The first priority is assigning unassigned shards, primaries being
+more important than replicas. The second is moving shards that violate any rule (such as node resource limits) defined by an
+`AllocationDecider`. `AllocationDeciders` holds the group of `AllocationDecider` implementations that place hard constraints on shard
+allocation: for example, the `DiskThresholdDecider` manages disk usage thresholds, so that a node may not accept further shards, or shards
+may have to move off a node because they grew to exceed the available disk space; the `FilterAllocationDecider` excludes a configurable
+list of indices from certain nodes; and the `MaxRetryAllocationDecider` stops attempting to recover a shard on a node after a number of
+failed retries. The third priority is rebalancing shards to even out the relative weight of shards on each node: the intention is to
+avoid, or ease, future hot-spotting on data nodes caused by too many shards being placed on the same data node. Node shard weight is based
+on a sum of factors: disk usage, projected shard write load, total number of shards, and an incentive to distribute shards within the same
+index across different nodes. See the `WeightFunction` and `NodeAllocationStatsAndWeightsCalculator` classes for more details on the
+weight calculations that support the `DesiredBalanceComputer` decisions.
 
 # Autoscaling
 
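The `ContinuousComputation` behaviour described in the guide above (a single worker thread that repeatedly feeds the most recently submitted request into the computer, discarding superseded requests) can be illustrated with a minimal sketch. This is not the Elasticsearch implementation: the names `LatestWinsComputation`, `onNewInput`, and `processLatest` are hypothetical, and the sketch only captures the queueing discipline.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;
import java.util.function.Function;

/**
 * Minimal sketch of a "latest request wins" computation loop, loosely modelled on the
 * ContinuousComputation idea above. Only the most recently submitted input is kept;
 * older inputs that were never processed are silently superseded.
 */
final class LatestWinsComputation<I, O> {
    private final AtomicReference<I> latestInput = new AtomicReference<>();
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final Function<I, O> computer;  // e.g. cluster info -> desired balance
    private final Consumer<O> publisher;    // e.g. hand the result to the reconciler

    LatestWinsComputation(Function<I, O> computer, Consumer<O> publisher) {
        this.computer = computer;
        this.publisher = publisher;
    }

    /** Submit a new input; any previous input that has not been processed yet is dropped. */
    void onNewInput(I input) {
        if (latestInput.getAndSet(input) == null) {
            // No computation pending for an unprocessed input: schedule one.
            executor.execute(this::processLatest);
        }
    }

    private void processLatest() {
        I input;
        while ((input = latestInput.get()) != null) {
            O result = computer.apply(input);
            publisher.accept(result);
            // Clear the slot only if no newer input arrived while we were computing;
            // otherwise loop again with the newer input.
            latestInput.compareAndSet(input, null);
        }
    }
}
```

In the real allocator the input is a snapshot of the cluster state and allocation information, the computation is the `DesiredBalanceComputer`, and the published `DesiredBalance` is handed back to the `DesiredBalanceShardsAllocator` to drive reconciliation.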

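The node weight described in the "Rebalancing Process" section above is, at its core, a weighted sum of per-node factors. The toy sketch below illustrates that idea only; `NodeStats`, `ToyWeightFunction`, and the coefficient values are invented for the example and are not the names or values used by `WeightFunction`.

```java
/**
 * Toy node-weight calculation mirroring the factors listed above: disk usage, projected
 * write load, total shard count, and an incentive to spread shards of the same index
 * across nodes. The coefficients are arbitrary illustration values.
 */
record NodeStats(double diskUsageFraction, double projectedWriteLoad, int totalShards, int shardsOfThisIndex) {}

final class ToyWeightFunction {
    // Arbitrary example coefficients, not the values Elasticsearch uses.
    private static final double DISK_FACTOR = 1.0;
    private static final double WRITE_LOAD_FACTOR = 1.0;
    private static final double SHARD_COUNT_FACTOR = 0.1;
    private static final double INDEX_SPREAD_FACTOR = 0.5;

    /** A heavier node gets a larger weight; the balancer prefers moving shards toward lighter nodes. */
    static double nodeWeight(NodeStats stats) {
        return DISK_FACTOR * stats.diskUsageFraction()
            + WRITE_LOAD_FACTOR * stats.projectedWriteLoad()
            + SHARD_COUNT_FACTOR * stats.totalShards()
            // Penalise piling shards of the same index onto one node.
            + INDEX_SPREAD_FACTOR * stats.shardsOfThisIndex();
    }
}
```

Rebalancing then prefers moves that narrow the spread of these weights across nodes, always subject to the hard constraints imposed by the `AllocationDecider` implementations.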
docs/reference/ingest/processors/reroute.asciidoc

Lines changed: 3 additions & 0 deletions
@@ -45,6 +45,9 @@ Otherwise, the document will be rejected with a security exception which looks l
 |======
 | Name | Required | Default | Description
 | `destination` | no | - | A static value for the target. Can't be set when the `dataset` or `namespace` option is set.
+| `type` | no | `{{data_stream.type}}` a| Field references or a static value for the type part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `logs` and `metrics`.
+
+Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<type>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
 | `dataset` | no | `{{data_stream.dataset}}` a| Field references or a static value for the dataset part of the data stream name. In addition to the criteria for <<indices-create-api-path-params, index names>>, cannot contain `-` and must be no longer than 100 characters. Example values are `nginx.access` and `nginx.error`.
 
 Supports field references with a mustache-like syntax (denoted as `{{double}}` or `{{{triple}}}` curly braces). When resolving field references, the processor replaces invalid characters with `_`. Uses the `<dataset>` part of the index name as a fallback if all field references resolve to a `null`, missing, or non-string value.
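
The resolution rule the new `type` row describes (resolve the configured value or field reference, replace disallowed characters with `_`, and fall back to the `<type>` part of the current index name when nothing usable is found) can be sketched in isolation. The sketch below is a simplified, hypothetical illustration; `RerouteTypeResolutionSketch`, `resolveType`, and `sanitizeType` are not real classes or methods, the actual logic lives in `RerouteProcessor.DataStreamValueSource` (see the diff further down), and details such as the 100-character length limit are ignored.

```java
import java.util.List;
import java.util.regex.Pattern;

/**
 * Simplified illustration of how the reroute processor's `type` option is resolved:
 * try the resolved values in order, sanitize the first usable one, and fall back to
 * the current data stream type if none of them is usable.
 */
final class RerouteTypeResolutionSketch {
    // Same character class the processor disallows in the type part (each match becomes "_").
    private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]");

    static String sanitizeType(String value) {
        return DISALLOWED_IN_TYPE.matcher(value).replaceAll("_");
    }

    /**
     * @param resolvedValues values the configured references resolved to (null for missing/non-string)
     * @param currentType    the `<type>` part of the current index name, used as the fallback
     */
    static String resolveType(List<String> resolvedValues, String currentType) {
        for (String value : resolvedValues) {
            if (value != null && value.isEmpty() == false) {
                return sanitizeType(value);
            }
        }
        return currentType;
    }

    public static void main(String[] args) {
        // The space is a disallowed character, so it is replaced: prints "my_type".
        System.out.println(resolveType(List.of("my type"), "logs"));
        // Nothing usable resolved, so the current type is used: prints "logs".
        System.out.println(resolveType(java.util.Arrays.asList((String) null), "logs"));
    }
}
```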

modules/ingest-common/src/main/java/org/elasticsearch/ingest/common/RerouteProcessor.java

Lines changed: 29 additions & 5 deletions
@@ -26,6 +26,7 @@
 import static org.elasticsearch.ingest.ConfigurationUtils.newConfigurationException;
 import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.DATASET_VALUE_SOURCE;
 import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.NAMESPACE_VALUE_SOURCE;
+import static org.elasticsearch.ingest.common.RerouteProcessor.DataStreamValueSource.TYPE_VALUE_SOURCE;
 
 public final class RerouteProcessor extends AbstractProcessor {
 
@@ -39,18 +40,25 @@ public final class RerouteProcessor extends AbstractProcessor {
     private static final String DATA_STREAM_DATASET = DATA_STREAM_PREFIX + "dataset";
     private static final String DATA_STREAM_NAMESPACE = DATA_STREAM_PREFIX + "namespace";
     private static final String EVENT_DATASET = "event.dataset";
+    private final List<DataStreamValueSource> type;
     private final List<DataStreamValueSource> dataset;
     private final List<DataStreamValueSource> namespace;
     private final String destination;
 
     RerouteProcessor(
         String tag,
         String description,
+        List<DataStreamValueSource> type,
         List<DataStreamValueSource> dataset,
         List<DataStreamValueSource> namespace,
         String destination
     ) {
         super(tag, description);
+        if (type.isEmpty()) {
+            this.type = List.of(TYPE_VALUE_SOURCE);
+        } else {
+            this.type = type;
+        }
         if (dataset.isEmpty()) {
             this.dataset = List.of(DATASET_VALUE_SOURCE);
         } else {
@@ -71,7 +79,7 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
             return ingestDocument;
         }
         final String indexName = ingestDocument.getFieldValue(IngestDocument.Metadata.INDEX.getFieldName(), String.class);
-        final String type;
+        final String currentType;
         final String currentDataset;
         final String currentNamespace;
 
@@ -84,10 +92,11 @@ public IngestDocument execute(IngestDocument ingestDocument) throws Exception {
         if (indexOfSecondDash < 0) {
             throw new IllegalArgumentException(format(NAMING_SCHEME_ERROR_MESSAGE, indexName));
         }
-        type = parseDataStreamType(indexName, indexOfFirstDash);
+        currentType = parseDataStreamType(indexName, indexOfFirstDash);
         currentDataset = parseDataStreamDataset(indexName, indexOfFirstDash, indexOfSecondDash);
         currentNamespace = parseDataStreamNamespace(indexName, indexOfSecondDash);
 
+        String type = determineDataStreamField(ingestDocument, this.type, currentType);
         String dataset = determineDataStreamField(ingestDocument, this.dataset, currentDataset);
         String namespace = determineDataStreamField(ingestDocument, this.namespace, currentNamespace);
         String newTarget = type + "-" + dataset + "-" + namespace;
@@ -168,6 +177,15 @@ public RerouteProcessor create(
             String description,
             Map<String, Object> config
         ) throws Exception {
+            List<DataStreamValueSource> type;
+            try {
+                type = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "type")
+                    .stream()
+                    .map(DataStreamValueSource::type)
+                    .toList();
+            } catch (IllegalArgumentException e) {
+                throw newConfigurationException(TYPE, tag, "type", e.getMessage());
+            }
             List<DataStreamValueSource> dataset;
             try {
                 dataset = ConfigurationUtils.readOptionalListOrString(TYPE, tag, config, "dataset")
@@ -188,11 +206,11 @@ public RerouteProcessor create(
             }
 
             String destination = ConfigurationUtils.readOptionalStringProperty(TYPE, tag, config, "destination");
-            if (destination != null && (dataset.isEmpty() == false || namespace.isEmpty() == false)) {
-                throw newConfigurationException(TYPE, tag, "destination", "can only be set if dataset and namespace are not set");
+            if (destination != null && (type.isEmpty() == false || dataset.isEmpty() == false || namespace.isEmpty() == false)) {
+                throw newConfigurationException(TYPE, tag, "destination", "can only be set if type, dataset, and namespace are not set");
             }
 
-            return new RerouteProcessor(tag, description, dataset, namespace, destination);
+            return new RerouteProcessor(tag, description, type, dataset, namespace, destination);
         }
     }
 
@@ -203,15 +221,21 @@ static final class DataStreamValueSource {
 
         private static final int MAX_LENGTH = 100;
         private static final String REPLACEMENT = "_";
+        private static final Pattern DISALLOWED_IN_TYPE = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
         private static final Pattern DISALLOWED_IN_DATASET = Pattern.compile("[\\\\/*?\"<>| ,#:-]");
         private static final Pattern DISALLOWED_IN_NAMESPACE = Pattern.compile("[\\\\/*?\"<>| ,#:]");
+        static final DataStreamValueSource TYPE_VALUE_SOURCE = type("{{" + DATA_STREAM_TYPE + "}}");
         static final DataStreamValueSource DATASET_VALUE_SOURCE = dataset("{{" + DATA_STREAM_DATASET + "}}");
         static final DataStreamValueSource NAMESPACE_VALUE_SOURCE = namespace("{{" + DATA_STREAM_NAMESPACE + "}}");
 
         private final String value;
         private final String fieldReference;
         private final Function<String, String> sanitizer;
 
+        public static DataStreamValueSource type(String type) {
+            return new DataStreamValueSource(type, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_TYPE));
+        }
+
         public static DataStreamValueSource dataset(String dataset) {
             return new DataStreamValueSource(dataset, ds -> sanitizeDataStreamField(ds, DISALLOWED_IN_DATASET));
         }

modules/ingest-common/src/test/java/org/elasticsearch/ingest/common/RerouteProcessorFactoryTests.java

Lines changed: 1 addition & 1 deletion
@@ -47,7 +47,7 @@ public void testDestinationAndDataset() {
             ElasticsearchParseException.class,
             () -> create(Map.of("destination", "foo", "dataset", "bar"))
         );
-        assertThat(e.getMessage(), equalTo("[destination] can only be set if dataset and namespace are not set"));
+        assertThat(e.getMessage(), equalTo("[destination] can only be set if type, dataset, and namespace are not set"));
     }
 
     public void testFieldReference() throws Exception {

0 commit comments
