Commit f323967

Merge branch 'main' into reindex-data-stream-make-source-read-only

2 parents: 37c049a + 80b7879

File tree: 32 files changed, +416 −99 lines changed

docs/changelog/120952.yaml

Lines changed: 5 additions & 0 deletions

@@ -0,0 +1,5 @@
+pr: 120952
+summary: Add `_metric_names_hash` field to OTel metric mappings
+area: Data streams
+type: bug
+issues: []

docs/changelog/122637.yaml

Lines changed: 6 additions & 0 deletions

@@ -0,0 +1,6 @@
+pr: 122637
+summary: Use `FallbackSyntheticSourceBlockLoader` for `unsigned_long` and `scaled_float`
+  fields
+area: Mapping
+type: enhancement
+issues: []

docs/internal/DistributedArchitectureGuide.md

Lines changed: 39 additions & 13 deletions

@@ -229,19 +229,45 @@ works in parallel with the storage engine.)
 
 # Allocation
 
-(AllocationService runs on the master node)
-
-(Discuss different deciders that limit allocation. Sketch / list the different deciders that we have.)
-
-### APIs for Balancing Operations
-
-(Significant internal APIs for balancing a cluster)
-
-### Heuristics for Allocation
-
-### Cluster Reroute Command
-
-(How does this command behave with the desired auto balancer.)
+### Core Components
+
+The `DesiredBalanceShardsAllocator` runs shard allocation decisions. It leverages the `DesiredBalanceComputer` to produce
+`DesiredBalance` instances for the cluster based on the latest cluster changes (add/remove nodes, create/remove indices, load, etc.). Then
+the `DesiredBalanceReconciler` is invoked to choose the next steps to take to move the cluster from the current shard allocation to the
+latest computed `DesiredBalance` shard allocation. The `DesiredBalanceReconciler` applies changes to a copy of the `RoutingNodes`, which
+is then published in a cluster state update that reaches the data nodes to start the individual shard recovery/deletion/move work.
+
+The `DesiredBalanceReconciler` is throttled by cluster settings, such as the maximum number of concurrent shard moves and recoveries per
+cluster and node: this is why the `DesiredBalanceReconciler` makes, and publishes via cluster state updates, incremental changes to the
+cluster shard allocation. The `DesiredBalanceShardsAllocator` is the endpoint for reroute requests, which may trigger immediate requests
+to the `DesiredBalanceReconciler`, but asynchronous requests to the `DesiredBalanceComputer` via the `ContinuousComputation` component.
+Cluster state changes that affect shard balancing (for example index deletion) all call some reroute method interface that reaches the
+`DesiredBalanceShardsAllocator` to run reconciliation and queue a request for the `DesiredBalanceComputer`, leading to desired balance
+computation and reconciliation actions. Asynchronous completion of a new `DesiredBalance` will also invoke a reconciliation action, as
+will cluster state updates completing shard moves/recoveries (unthrottling the next shard move/recovery).
+
+The `ContinuousComputation` saves the latest desired balance computation request, which holds the cluster information at the time of that
+request, and a thread that runs the `DesiredBalanceComputer`. The `ContinuousComputation` thread takes the latest request, with the
+associated cluster information, feeds it into the `DesiredBalanceComputer`, and publishes a `DesiredBalance` back to the
+`DesiredBalanceShardsAllocator` to use for reconciliation actions. Sometimes the `ContinuousComputation` thread's desired balance
+computation is signalled to exit early and publish the initial `DesiredBalance` improvements it has made, either because newer rebalancing
+requests (due to cluster state changes) have arrived, or in order to begin recovery of unassigned shards as quickly as possible.
+
+### Rebalancing Process
+
+There are different priorities in shard allocation, reflected in which moves the `DesiredBalanceReconciler` selects to do first, given
+that it can only move, recover, or remove a limited number of shards at once. The first priority is assigning unassigned shards, primaries
+being more important than replicas. The second is to move shards that violate any rule (such as node resource limits) as defined by an
+`AllocationDecider`. The `AllocationDeciders` class holds a group of `AllocationDecider` implementations that place hard constraints on
+shard allocation. For example, the `DiskThresholdDecider` manages disk usage thresholds, such that further shards may not be allowed
+assignment to a node, or shards may be required to move off a node because they grew to exceed the available disk space; the
+`FilterAllocationDecider` excludes a configurable list of indices from certain nodes; and the `MaxRetryAllocationDecider` stops attempting
+to recover a shard on a node after a number of failed retries. The third priority is to rebalance shards to even out the relative weight
+of shards on each node: the intention is to avoid, or ease, future hot-spotting on data nodes caused by too many shards being placed on
+the same data node. Node shard weight is based on a sum of factors: disk usage, projected shard write load, total number of shards, and an
+incentive to distribute shards within the same index across different nodes. See the `WeightFunction` and
+`NodeAllocationStatsAndWeightsCalculator` classes for more details on the weight calculations that support the `DesiredBalanceComputer`
+decisions.
 
 # Autoscaling
 
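The "latest request wins" behaviour of `ContinuousComputation` described above (only the newest submitted input is kept, and a finished computation immediately picks up any input that arrived while it was running) can be sketched roughly as follows. This is a hypothetical, simplified sketch, not the actual Elasticsearch class: the real component runs `compute` on a dedicated thread, while this version runs it inline to stay deterministic.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;

// Simplified sketch of the latest-request-wins computation loop (made-up class
// name). A newer input replaces any not-yet-processed one, so stale cluster
// snapshots are never computed.
final class LatestWinsComputation<I, R> {
    private final AtomicReference<I> pending = new AtomicReference<>();
    private final Function<I, R> compute;
    private volatile R latestResult;

    LatestWinsComputation(Function<I, R> compute) {
        this.compute = compute;
    }

    void onNewInput(I input) {
        // Replace any pending input; start the loop only if it was idle.
        if (pending.getAndSet(input) == null) {
            runLoop();
        }
    }

    private void runLoop() {
        I input;
        while ((input = pending.get()) != null) {
            latestResult = compute.apply(input);
            // Clear only if no newer input arrived while computing;
            // otherwise loop again with the fresher input.
            pending.compareAndSet(input, null);
        }
    }

    R latestResult() {
        return latestResult;
    }
}
```

In the real allocator the input is the cluster information snapshot attached to a reroute request, and the published result is the `DesiredBalance` handed back for reconciliation.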

modules/mapper-extras/src/main/java/org/elasticsearch/index/mapper/extras/ScaledFloatFieldMapper.java

Lines changed: 78 additions & 5 deletions

@@ -33,6 +33,7 @@
 import org.elasticsearch.index.mapper.BlockLoader;
 import org.elasticsearch.index.mapper.BlockSourceReader;
 import org.elasticsearch.index.mapper.DocumentParserContext;
+import org.elasticsearch.index.mapper.FallbackSyntheticSourceBlockLoader;
 import org.elasticsearch.index.mapper.FieldMapper;
 import org.elasticsearch.index.mapper.IgnoreMalformedStoredValues;
 import org.elasticsearch.index.mapper.MapperBuilderContext;

@@ -195,7 +196,9 @@ public ScaledFloatFieldMapper build(MapperBuilderContext context) {
             scalingFactor.getValue(),
             nullValue.getValue(),
             metric.getValue(),
-            indexMode
+            indexMode,
+            coerce.getValue().value(),
+            context.isSourceSynthetic()
         );
         return new ScaledFloatFieldMapper(leafName(), type, builderParams(this, context), context.isSourceSynthetic(), this);
     }

@@ -209,6 +212,8 @@ public static final class ScaledFloatFieldType extends SimpleMappedFieldType {
         private final Double nullValue;
         private final TimeSeriesParams.MetricType metricType;
         private final IndexMode indexMode;
+        private final boolean coerce;
+        private final boolean isSyntheticSource;
 
         public ScaledFloatFieldType(
             String name,

@@ -219,21 +224,25 @@ public ScaledFloatFieldType(
             double scalingFactor,
             Double nullValue,
             TimeSeriesParams.MetricType metricType,
-            IndexMode indexMode
+            IndexMode indexMode,
+            boolean coerce,
+            boolean isSyntheticSource
         ) {
             super(name, indexed, stored, hasDocValues, TextSearchInfo.SIMPLE_MATCH_WITHOUT_TERMS, meta);
             this.scalingFactor = scalingFactor;
             this.nullValue = nullValue;
             this.metricType = metricType;
             this.indexMode = indexMode;
+            this.coerce = coerce;
+            this.isSyntheticSource = isSyntheticSource;
         }
 
         public ScaledFloatFieldType(String name, double scalingFactor) {
             this(name, scalingFactor, true);
         }
 
         public ScaledFloatFieldType(String name, double scalingFactor, boolean indexed) {
-            this(name, indexed, false, true, Collections.emptyMap(), scalingFactor, null, null, null);
+            this(name, indexed, false, true, Collections.emptyMap(), scalingFactor, null, null, null, false, false);
         }
 
         public double getScalingFactor() {

@@ -315,13 +324,73 @@ public BlockLoader blockLoader(BlockLoaderContext blContext) {
                 double scalingFactorInverse = 1d / scalingFactor;
                 return new BlockDocValuesReader.DoublesBlockLoader(name(), l -> l * scalingFactorInverse);
             }
+            if (isSyntheticSource) {
+                return new FallbackSyntheticSourceBlockLoader(fallbackSyntheticSourceBlockLoaderReader(), name()) {
+                    @Override
+                    public Builder builder(BlockFactory factory, int expectedCount) {
+                        return factory.doubles(expectedCount);
+                    }
+                };
+            }
+
             ValueFetcher valueFetcher = sourceValueFetcher(blContext.sourcePaths(name()));
             BlockSourceReader.LeafIteratorLookup lookup = isStored() || isIndexed()
                 ? BlockSourceReader.lookupFromFieldNames(blContext.fieldNames(), name())
                 : BlockSourceReader.lookupMatchingAll();
             return new BlockSourceReader.DoublesBlockLoader(valueFetcher, lookup);
         }
 
+        private FallbackSyntheticSourceBlockLoader.Reader<?> fallbackSyntheticSourceBlockLoaderReader() {
+            var nullValueAdjusted = nullValue != null ? adjustSourceValue(nullValue, scalingFactor) : null;
+
+            return new FallbackSyntheticSourceBlockLoader.ReaderWithNullValueSupport<>(nullValue) {
+                @Override
+                public void convertValue(Object value, List<Double> accumulator) {
+                    if (coerce && value.equals("")) {
+                        if (nullValueAdjusted != null) {
+                            accumulator.add(nullValueAdjusted);
+                        }
+                    }
+
+                    try {
+                        // Convert to doc_values format
+                        var converted = adjustSourceValue(NumberFieldMapper.NumberType.objectToDouble(value), scalingFactor);
+                        accumulator.add(converted);
+                    } catch (Exception e) {
+                        // Malformed value, skip it
+                    }
+                }
+
+                @Override
+                protected void parseNonNullValue(XContentParser parser, List<Double> accumulator) throws IOException {
+                    // Aligned with implementation of `parseCreateField(XContentParser)`
+                    if (coerce && parser.currentToken() == XContentParser.Token.VALUE_STRING && parser.textLength() == 0) {
+                        if (nullValueAdjusted != null) {
+                            accumulator.add(nullValueAdjusted);
+                        }
+                    }
+
+                    try {
+                        double value = parser.doubleValue(coerce);
+                        // Convert to doc_values format
+                        var converted = adjustSourceValue(value, scalingFactor);
+                        accumulator.add(converted);
+                    } catch (Exception e) {
+                        // Malformed value, skip it
+                    }
+                }
+
+                @Override
+                public void writeToBlock(List<Double> values, BlockLoader.Builder blockBuilder) {
+                    var longBuilder = (BlockLoader.DoubleBuilder) blockBuilder;
+
+                    for (var value : values) {
+                        longBuilder.appendDouble(value);
+                    }
+                }
+            };
+        }
+
         @Override
         public IndexFieldData.Builder fielddataBuilder(FieldDataContext fieldDataContext) {
             FielddataOperation operation = fieldDataContext.fielddataOperation();

@@ -386,12 +455,16 @@ protected Double parseSourceValue(Object value) {
                     doubleValue = NumberFieldMapper.NumberType.objectToDouble(value);
                 }
 
-                double factor = getScalingFactor();
-                return Math.round(doubleValue * factor) / factor;
+                return adjustSourceValue(doubleValue, getScalingFactor());
             }
         };
     }
 
+    // Adjusts precision of a double value so that it looks like it came from doc_values.
+    private static Double adjustSourceValue(double value, double scalingFactor) {
+        return Math.round(value * scalingFactor) / scalingFactor;
+    }
+
     @Override
     public Object valueForDisplay(Object value) {
         if (value == null) {
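The `adjustSourceValue` helper added above is the heart of this change: values read from `_source` are rounded the same way `scaled_float` doc_values round them, so both read paths return the same reduced-precision doubles. A self-contained illustration of that rounding (the method body copied out of the diff into a standalone class; the class name is made up):

```java
// Made-up wrapper class for illustration; the rounding logic mirrors the
// adjustSourceValue helper in the diff above.
final class ScaledFloatRounding {
    // scaled_float stores Math.round(value * scalingFactor) as a long in
    // doc_values, so reading it back yields a double whose precision is
    // reduced to 1 / scalingFactor.
    static double adjustSourceValue(double value, double scalingFactor) {
        return Math.round(value * scalingFactor) / scalingFactor;
    }

    public static void main(String[] args) {
        // With scaling_factor 100, only two decimal places survive.
        System.out.println(ScaledFloatRounding.adjustSourceValue(0.1234, 100.0)); // prints 0.12
        System.out.println(ScaledFloatRounding.adjustSourceValue(3.456, 100.0));  // prints 3.46
    }
}
```

Note that `Math.round` returns a `long`, so the division by the `double` scaling factor is what converts the stored integer representation back into a double.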
modules/mapper-extras/src/test/java/org/elasticsearch/index/mapper/extras/ScaledFloatFieldBlockLoaderTests.java

Lines changed: 48 additions & 0 deletions

@@ -0,0 +1,48 @@
+/*
+ * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
+ * or more contributor license agreements. Licensed under the "Elastic License
+ * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side
+ * Public License v 1"; you may not use this file except in compliance with, at
+ * your election, the "Elastic License 2.0", the "GNU Affero General Public
+ * License v3.0 only", or the "Server Side Public License, v 1".
+ */
+
+package org.elasticsearch.index.mapper.extras;
+
+import org.elasticsearch.index.mapper.NumberFieldBlockLoaderTestCase;
+import org.elasticsearch.logsdb.datageneration.FieldType;
+import org.elasticsearch.plugins.Plugin;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+public class ScaledFloatFieldBlockLoaderTests extends NumberFieldBlockLoaderTestCase<Double> {
+    public ScaledFloatFieldBlockLoaderTests() {
+        super(FieldType.SCALED_FLOAT);
+    }
+
+    @Override
+    protected Double convert(Number value, Map<String, Object> fieldMapping) {
+        var scalingFactor = ((Number) fieldMapping.get("scaling_factor")).doubleValue();
+
+        var docValues = (boolean) fieldMapping.getOrDefault("doc_values", false);
+
+        // There is a slight inconsistency between values that are read from doc_values and from source,
+        // due to how precision reduction is applied to source values so that they are consistent with doc_values.
+        // See #122547.
+        if (docValues) {
+            var reverseScalingFactor = 1d / scalingFactor;
+            return Math.round(value.doubleValue() * scalingFactor) * reverseScalingFactor;
+        }
+
+        // Adjust values coming from source to the way they are stored in doc_values.
+        // See mapper implementation.
+        return Math.round(value.doubleValue() * scalingFactor) / scalingFactor;
+    }
+
+    @Override
+    protected Collection<? extends Plugin> getPlugins() {
+        return List.of(new MapperExtrasPlugin());
+    }
+}
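The comment in `convert` above points at the subtlety tracked in #122547: the doc_values read path multiplies by a precomputed inverse factor, while the source path divides by the factor directly, and the two are not guaranteed to be bit-identical because `1d / scalingFactor` is itself a rounded double. A standalone sketch of the two computations (class and variable names are made up for illustration):

```java
// Made-up class name; demonstrates the two read paths the test branches on.
final class DocValuesVsSourcePrecision {
    public static void main(String[] args) {
        double scalingFactor = 100.0;
        double value = 0.1234;
        long stored = Math.round(value * scalingFactor); // what doc_values keep

        // doc_values read path: multiply by the precomputed inverse factor.
        double reverseScalingFactor = 1d / scalingFactor;
        double fromDocValues = stored * reverseScalingFactor;

        // source read path: divide by the factor directly.
        double fromSource = stored / scalingFactor;

        // The results agree to within an ulp or two, but may not be
        // bit-identical, which is why convert() branches on doc_values.
        System.out.println(fromDocValues + " vs " + fromSource);
    }
}
```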

modules/mapper-extras/src/test/java/org/elasticsearch/index/mapper/extras/ScaledFloatFieldTypeTests.java

Lines changed: 3 additions & 1 deletion

@@ -95,7 +95,9 @@ public void testRangeQuery() throws IOException {
             0.1 + randomDouble() * 100,
             null,
             null,
-            null
+            null,
+            false,
+            false
         );
         Directory dir = newDirectory();
         IndexWriter w = new IndexWriter(dir, new IndexWriterConfig(null));

muted-tests.yml

Lines changed: 0 additions & 6 deletions

@@ -315,15 +315,9 @@ tests:
 - class: org.elasticsearch.xpack.autoscaling.storage.ReactiveStorageIT
   method: testScaleWhileShrinking
   issue: https://github.com/elastic/elasticsearch/issues/122119
-- class: org.elasticsearch.xpack.searchablesnapshots.FrozenSearchableSnapshotsIntegTests
-  method: testCreateAndRestorePartialSearchableSnapshot
-  issue: https://github.com/elastic/elasticsearch/issues/122693
 - class: org.elasticsearch.search.basic.SearchWithRandomDisconnectsIT
   method: testSearchWithRandomDisconnects
   issue: https://github.com/elastic/elasticsearch/issues/122707
-- class: org.elasticsearch.indices.recovery.IndexRecoveryIT
-  method: testSourceThrottling
-  issue: https://github.com/elastic/elasticsearch/issues/122712
 - class: org.elasticsearch.xpack.esql.action.EsqlActionBreakerIT
   issue: https://github.com/elastic/elasticsearch/issues/122810
 - class: org.elasticsearch.snapshots.DedicatedClusterSnapshotRestoreIT

server/src/internalClusterTest/java/org/elasticsearch/indices/recovery/IndexRecoveryIT.java

Lines changed: 19 additions & 4 deletions

@@ -342,9 +342,17 @@ public void assertNodeHasThrottleTimeAndNoRecoveries(String nodeName, Boolean is
         assertThat(recoveryStats.currentAsSource(), equalTo(0));
         assertThat(recoveryStats.currentAsTarget(), equalTo(0));
         if (isRecoveryThrottlingNode) {
-            assertThat("Throttling should be >0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), greaterThan(0L));
+            assertThat(
+                "Throttling should be >0 for '" + nodeName + "'. Node stats: " + nodesStatsResponse,
+                recoveryStats.throttleTime().millis(),
+                greaterThan(0L)
+            );
         } else {
-            assertThat("Throttling should be =0 for '" + nodeName + "'", recoveryStats.throttleTime().millis(), equalTo(0L));
+            assertThat(
+                "Throttling should be =0 for '" + nodeName + "'. Node stats: " + nodesStatsResponse,
+                recoveryStats.throttleTime().millis(),
+                equalTo(0L)
+            );
         }
     }
 
@@ -1967,7 +1975,14 @@ public void testPostRecoveryMerge() throws Exception {
         internalCluster().startMasterOnlyNode();
         final var dataNode = internalCluster().startDataOnlyNode();
         final var indexName = randomIdentifier();
-        createIndex(indexName, indexSettings(1, 0).put(INDEX_MERGE_ENABLED, false).build());
+        final var indexSettingsBuilder = indexSettings(1, 0).put(INDEX_MERGE_ENABLED, false);
+        if (randomBoolean()) {
+            indexSettingsBuilder.put(
+                IndexMetadata.SETTING_VERSION_CREATED,
+                IndexVersionUtils.randomVersionBetween(random(), IndexVersions.UPGRADE_TO_LUCENE_10_0_0, IndexVersion.current())
+            );
+        }
+        createIndex(indexName, indexSettingsBuilder.build());
 
         final var initialSegmentCount = 20;
         for (int i = 0; i < initialSegmentCount; i++) {

@@ -2051,7 +2066,7 @@ public void testPostRecoveryMergeDisabledOnOlderIndices() throws Exception {
                     IndexVersionUtils.randomVersionBetween(
                         random(),
                         IndexVersionUtils.getLowestWriteCompatibleVersion(),
-                        IndexVersionUtils.getPreviousVersion(IndexVersions.MERGE_ON_RECOVERY_VERSION)
+                        IndexVersionUtils.getPreviousVersion(IndexVersions.UPGRADE_TO_LUCENE_10_0_0)
                     )
                 )
                 .build()

server/src/internalClusterTest/java/org/elasticsearch/search/retriever/RetrieverRewriteIT.java

Lines changed: 1 addition & 4 deletions

@@ -127,10 +127,7 @@ public void testRewriteCompoundRetrieverShouldThrowForPartialResults() throws Ex
                 SearchPhaseExecutionException.class,
                 client().prepareSearch(testIndex).setSource(source)::get
             );
-            assertThat(
-                ex.getDetailedMessage(),
-                containsString("[open_point_in_time] action requires all shards to be available. Missing shards")
-            );
+            assertThat(ex.getDetailedMessage(), containsString("Search rejected due to missing shards"));
         } finally {
             internalCluster().restartNode(randomDataNode);
         }

server/src/main/java/org/elasticsearch/action/search/CanMatchPreFilterSearchPhase.java

Lines changed: 1 addition & 1 deletion

@@ -201,7 +201,7 @@ private synchronized void consumeResult(int shardIndex, boolean canMatch, MinAnd
 
     private void checkNoMissingShards(List<SearchShardIterator> shards) {
         assert assertSearchCoordinationThread();
-        SearchPhase.doCheckNoMissingShards("can_match", request, shards, SearchPhase::makeMissingShardsError);
+        SearchPhase.doCheckNoMissingShards("can_match", request, shards);
     }
 
     private Map<SendingTarget, List<SearchShardIterator>> groupByNode(List<SearchShardIterator> shards) {
