Skip to content

Commit d183445

Browse files
committed
Merge remote-tracking branch 'origin/main' into remove-snapshot-failures
2 parents a748d09 + 94d7f22 commit d183445

File tree

24 files changed

+472
-516
lines changed

24 files changed

+472
-516
lines changed

muted-tests.yml

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,12 @@ tests:
134134
- class: org.elasticsearch.datastreams.DataStreamsClientYamlTestSuiteIT
135135
method: test {p0=data_stream/120_data_streams_stats/Multiple data stream}
136136
issue: https://github.com/elastic/elasticsearch/issues/118217
137+
# TODO: re-enable after backporting https://github.com/elastic/elasticsearch/pull/119110
138+
- class: org.elasticsearch.test.rest.ClientYamlTestSuiteIT
139+
method: test {yaml=update/100_synthetic_source/keyword}
140+
# TODO: re-enable after backporting https://github.com/elastic/elasticsearch/pull/119110
141+
- class: org.elasticsearch.test.rest.ClientYamlTestSuiteIT
142+
method: test {yaml=update/100_synthetic_source/stored text}
137143
- class: org.elasticsearch.xpack.searchablesnapshots.RetrySearchIntegTests
138144
method: testSearcherId
139145
issue: https://github.com/elastic/elasticsearch/issues/118374

qa/smoke-test-multinode/build.gradle

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,5 +28,7 @@ tasks.named("yamlRestTest").configure {
2828
'cat.templates/10_basic/No templates',
2929
'cat.templates/10_basic/Sort templates',
3030
'cat.templates/10_basic/Multiple template',
31+
'update/100_synthetic_source/keyword',
32+
'update/100_synthetic_source/stored text'
3133
].join(',')
3234
}

rest-api-spec/src/yamlRestTest/resources/rest-api-spec/test/update/100_synthetic_source.yml

Lines changed: 8 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ setup:
66
---
77
keyword:
88
- requires:
9-
cluster_features: ["gte_v8.4.0"]
10-
reason: introduced in 8.4.0
9+
cluster_features: [ "mapper.synthetic_recovery_source" ]
10+
reason: requires synthetic recovery source
1111

1212
- do:
1313
indices.create:
@@ -60,13 +60,14 @@ keyword:
6060
index: test
6161
run_expensive_tasks: true
6262
- is_false: test.fields._source
63-
- is_true: test.fields._recovery_source
63+
# When synthetic source is used there is no _recovery_source field
64+
- match: { test.fields._recovery_source: null }
6465

6566
---
6667
stored text:
6768
- requires:
68-
cluster_features: ["gte_v8.5.0"]
69-
reason: introduced in 8.5.0
69+
cluster_features: [ "mapper.synthetic_recovery_source" ]
70+
reason: requires synthetic recovery source
7071

7172
- do:
7273
indices.create:
@@ -121,4 +122,5 @@ stored text:
121122
index: test
122123
run_expensive_tasks: true
123124
- is_false: test.fields._source
124-
- is_true: test.fields._recovery_source
125+
# When synthetic source is used there is no _recovery_source field
126+
- match: { test.fields._recovery_source: null }

server/src/main/java/org/elasticsearch/TransportVersions.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,6 +176,7 @@ static TransportVersion def(int id) {
176176
public static final TransportVersion ELASTICSEARCH_9_0 = def(9_000_0_00);
177177
public static final TransportVersion COHERE_BIT_EMBEDDING_TYPE_SUPPORT_ADDED = def(9_001_0_00);
178178
public static final TransportVersion REMOVE_SNAPSHOT_FAILURES = def(9_002_0_00);
179+
public static final TransportVersion TRANSPORT_STATS_HANDLING_TIME_REQUIRED = def(9_003_0_00);
179180

180181
/*
181182
* STOP! READ THIS FIRST! No, really,

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 19 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,13 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9090
private final Object shardFailuresMutex = new Object();
9191
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
9292
private final AtomicInteger successfulOps = new AtomicInteger();
93-
private final AtomicInteger skippedOps = new AtomicInteger();
9493
private final SearchTimeProvider timeProvider;
9594
private final SearchResponse.Clusters clusters;
9695

9796
protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
9897
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
9998
private final SearchShardIterator[] shardIterators;
100-
private final int expectedTotalOps;
101-
private final AtomicInteger totalOps = new AtomicInteger();
99+
private final AtomicInteger outstandingShards;
102100
private final int maxConcurrentRequestsPerNode;
103101
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
104102
private final boolean throttleConcurrentRequests;
@@ -139,18 +137,12 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
139137
}
140138
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
141139
this.shardsIts = new GroupShardsIterator<>(iterators);
142-
140+
outstandingShards = new AtomicInteger(shardsIts.size());
143141
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
144142
// we later compute the shard index based on the natural order of the shards
145143
// that participate in the search request. This means that this number is
146144
// consistent between two requests that target the same shards.
147145
Arrays.sort(shardIterators);
148-
149-
// we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
150-
// it's number of active shards but use 1 as the default if no replica of a shard is active at this point.
151-
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
152-
// we process hence we add one for the non active partition here.
153-
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
154146
this.maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode;
155147
// in the case where we have fewer shards than maxConcurrentRequestsPerNode we don't need to throttle
156148
this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size();
@@ -251,9 +243,8 @@ protected final void run() {
251243

252244
void skipShard(SearchShardIterator iterator) {
253245
successfulOps.incrementAndGet();
254-
skippedOps.incrementAndGet();
255246
assert iterator.skip();
256-
successfulShardExecution(iterator);
247+
successfulShardExecution();
257248
}
258249

259250
private static boolean assertExecuteOnStartThread() {
@@ -380,7 +371,7 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
380371
"Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})",
381372
discrepancy,
382373
successfulOps.get(),
383-
skippedOps.get(),
374+
toSkipShardsIts.size(),
384375
getNumShards(),
385376
currentPhase
386377
);
@@ -449,17 +440,14 @@ private void onShardFailure(final int shardIndex, SearchShardTarget shard, final
449440
}
450441
onShardGroupFailure(shardIndex, shard, e);
451442
}
452-
final int totalOps = this.totalOps.incrementAndGet();
453-
if (totalOps == expectedTotalOps) {
454-
onPhaseDone();
455-
} else if (totalOps > expectedTotalOps) {
456-
throw new AssertionError(
457-
"unexpected higher total ops [" + totalOps + "] compared to expected [" + expectedTotalOps + "]",
458-
new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures())
459-
);
443+
if (lastShard == false) {
444+
performPhaseOnShard(shardIndex, shardIt, nextShard);
460445
} else {
461-
if (lastShard == false) {
462-
performPhaseOnShard(shardIndex, shardIt, nextShard);
446+
// count down outstanding shards; we're done with this shard as there are no more copies to try
447+
final int outstanding = outstandingShards.decrementAndGet();
448+
assert outstanding >= 0 : "outstanding: " + outstanding;
449+
if (outstanding == 0) {
450+
onPhaseDone();
463451
}
464452
}
465453
}
@@ -535,10 +523,10 @@ protected void onShardResult(Result result, SearchShardIterator shardIt) {
535523
if (logger.isTraceEnabled()) {
536524
logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
537525
}
538-
results.consumeResult(result, () -> onShardResultConsumed(result, shardIt));
526+
results.consumeResult(result, () -> onShardResultConsumed(result));
539527
}
540528

541-
private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
529+
private void onShardResultConsumed(Result result) {
542530
successfulOps.incrementAndGet();
543531
// clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
544532
// so it's OK concurrency-wise to potentially miss the shard failures being created because of another failure
@@ -552,28 +540,14 @@ private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
552540
// cause the successor to read a wrong value from successfulOps if second phase is very fast, i.e. count etc.
553541
// increment all the "future" shards to update the total ops since some may work and some may not...
554542
// and when that happens, we break on total ops, so we must maintain them
555-
successfulShardExecution(shardIt);
543+
successfulShardExecution();
556544
}
557545

558-
private void successfulShardExecution(SearchShardIterator shardsIt) {
559-
final int remainingOpsOnIterator;
560-
if (shardsIt.skip()) {
561-
// It's possible that we're skipping a shard that's unavailable
562-
// but its range was available in the IndexMetadata, in that
563-
// case the shardsIt.remaining() would be 0, expectedTotalOps
564-
// accounts for unavailable shards too.
565-
remainingOpsOnIterator = Math.max(shardsIt.remaining(), 1);
566-
} else {
567-
remainingOpsOnIterator = shardsIt.remaining() + 1;
568-
}
569-
final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
570-
if (xTotalOps == expectedTotalOps) {
546+
private void successfulShardExecution() {
547+
final int outstanding = outstandingShards.decrementAndGet();
548+
assert outstanding >= 0 : "outstanding: " + outstanding;
549+
if (outstanding == 0) {
571550
onPhaseDone();
572-
} else if (xTotalOps > expectedTotalOps) {
573-
throw new AssertionError(
574-
"unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]",
575-
new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures())
576-
);
577551
}
578552
}
579553

@@ -640,7 +614,7 @@ private SearchResponse buildSearchResponse(
640614
scrollId,
641615
getNumShards(),
642616
numSuccess,
643-
skippedOps.get(),
617+
toSkipShardsIts.size(),
644618
buildTookInMillis(),
645619
failures,
646620
clusters,

server/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,26 +41,6 @@ public GroupShardsIterator(List<ShardIt> iterators) {
4141
this.iterators = iterators;
4242
}
4343

44-
/**
45-
* Returns the total number of shards within all groups
46-
* @return total number of shards
47-
*/
48-
public int totalSize() {
49-
return iterators.stream().mapToInt(Countable::size).sum();
50-
}
51-
52-
/**
53-
* Returns the total number of shards plus the number of empty groups
54-
* @return number of shards and empty groups
55-
*/
56-
public int totalSizeWith1ForEmpty() {
57-
int size = 0;
58-
for (ShardIt shard : iterators) {
59-
size += Math.max(1, shard.size());
60-
}
61-
return size;
62-
}
63-
6444
/**
6545
* Return the number of groups
6646
* @return number of groups

server/src/main/java/org/elasticsearch/common/settings/Setting.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1580,6 +1580,15 @@ public static Setting<Boolean> boolSetting(String key, boolean defaultValue, Val
15801580
return new Setting<>(key, Boolean.toString(defaultValue), booleanParser(key, properties), validator, properties);
15811581
}
15821582

1583+
/**
 * Creates a boolean setting whose default value is computed from the supplied {@link Settings}
 * and whose value is checked by the given validator.
 *
 * @param key            the setting key; also passed to the boolean parser
 * @param defaultValueFn computes the default value, as a string, from the settings
 * @param validator      validator handed to the {@link Setting} constructor
 * @param properties     properties passed both to the boolean parser and to the {@link Setting} constructor
 * @return the newly constructed boolean setting
 */
public static Setting<Boolean> boolSetting(
1584+
String key,
1585+
Function<Settings, String> defaultValueFn,
1586+
Validator<Boolean> validator,
1587+
Property... properties
1588+
) {
1589+
return new Setting<>(key, defaultValueFn, booleanParser(key, properties), validator, properties);
1590+
}
1591+
15831592
public static Setting<Boolean> boolSetting(String key, Function<Settings, String> defaultValueFn, Property... properties) {
15841593
return new Setting<>(key, defaultValueFn, booleanParser(key, properties), properties);
15851594
}

server/src/main/java/org/elasticsearch/index/IndexSettings.java

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import org.elasticsearch.common.time.DateUtils;
2525
import org.elasticsearch.common.unit.ByteSizeUnit;
2626
import org.elasticsearch.common.unit.ByteSizeValue;
27+
import org.elasticsearch.common.util.FeatureFlag;
2728
import org.elasticsearch.core.TimeValue;
2829
import org.elasticsearch.index.mapper.IgnoredSourceFieldMapper;
2930
import org.elasticsearch.index.mapper.Mapper;
@@ -39,6 +40,7 @@
3940
import java.util.List;
4041
import java.util.Locale;
4142
import java.util.Map;
43+
import java.util.Objects;
4244
import java.util.concurrent.TimeUnit;
4345
import java.util.function.Consumer;
4446

@@ -722,9 +724,25 @@ public Iterator<Setting<?>> settings() {
722724
Setting.Property.ServerlessPublic
723725
);
724726

727+
public static final FeatureFlag RECOVERY_USE_SYNTHETIC_SOURCE = new FeatureFlag("index_recovery_use_synthetic_source");
725728
public static final Setting<Boolean> RECOVERY_USE_SYNTHETIC_SOURCE_SETTING = Setting.boolSetting(
726729
"index.recovery.use_synthetic_source",
727-
false,
730+
settings -> {
731+
boolean isSyntheticSourceRecoveryFeatureFlagEnabled = RECOVERY_USE_SYNTHETIC_SOURCE.isEnabled();
732+
boolean isNewIndexVersion = SETTING_INDEX_VERSION_CREATED.get(settings)
733+
.onOrAfter(IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY_BY_DEFAULT);
734+
boolean isIndexVersionInBackportRange = SETTING_INDEX_VERSION_CREATED.get(settings)
735+
.between(IndexVersions.USE_SYNTHETIC_SOURCE_FOR_RECOVERY_BY_DEFAULT_BACKPORT, IndexVersions.UPGRADE_TO_LUCENE_10_0_0);
736+
737+
boolean useSyntheticRecoverySource = isSyntheticSourceRecoveryFeatureFlagEnabled
738+
&& (isNewIndexVersion || isIndexVersionInBackportRange);
739+
740+
return String.valueOf(
741+
useSyntheticRecoverySource
742+
&& Objects.equals(INDEX_MAPPER_SOURCE_MODE_SETTING.get(settings), SourceFieldMapper.Mode.SYNTHETIC)
743+
);
744+
745+
},
728746
new Setting.Validator<>() {
729747
@Override
730748
public void validate(Boolean value) {}
@@ -1083,7 +1101,8 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
10831101
skipIgnoredSourceRead = scopedSettings.get(IgnoredSourceFieldMapper.SKIP_IGNORED_SOURCE_READ_SETTING);
10841102
indexMappingSourceMode = scopedSettings.get(INDEX_MAPPER_SOURCE_MODE_SETTING);
10851103
recoverySourceEnabled = RecoverySettings.INDICES_RECOVERY_SOURCE_ENABLED_SETTING.get(nodeSettings);
1086-
recoverySourceSyntheticEnabled = scopedSettings.get(RECOVERY_USE_SYNTHETIC_SOURCE_SETTING);
1104+
recoverySourceSyntheticEnabled = DiscoveryNode.isStateless(nodeSettings) == false
1105+
&& scopedSettings.get(RECOVERY_USE_SYNTHETIC_SOURCE_SETTING);
10871106
if (recoverySourceSyntheticEnabled) {
10881107
if (DiscoveryNode.isStateless(settings)) {
10891108
throw new IllegalArgumentException("synthetic recovery source is only allowed in stateful");

server/src/main/java/org/elasticsearch/index/IndexVersions.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -134,6 +134,7 @@ private static Version parseUnchecked(String version) {
134134
public static final IndexVersion UPGRADE_TO_LUCENE_9_12_1 = def(8_523_0_00, parseUnchecked("9.12.1"));
135135
public static final IndexVersion INFERENCE_METADATA_FIELDS_BACKPORT = def(8_524_0_00, parseUnchecked("9.12.1"));
136136
public static final IndexVersion LOGSB_OPTIONAL_SORTING_ON_HOST_NAME_BACKPORT = def(8_525_0_00, parseUnchecked("9.12.1"));
137+
public static final IndexVersion USE_SYNTHETIC_SOURCE_FOR_RECOVERY_BY_DEFAULT_BACKPORT = def(8_526_0_00, parseUnchecked("9.12.1"));
137138
public static final IndexVersion UPGRADE_TO_LUCENE_10_0_0 = def(9_000_0_00, Version.LUCENE_10_0_0);
138139
public static final IndexVersion LOGSDB_DEFAULT_IGNORE_DYNAMIC_BEYOND_LIMIT = def(9_001_0_00, Version.LUCENE_10_0_0);
139140
public static final IndexVersion TIME_BASED_K_ORDERED_DOC_ID = def(9_002_0_00, Version.LUCENE_10_0_0);
@@ -144,6 +145,7 @@ private static Version parseUnchecked(String version) {
144145
public static final IndexVersion SOURCE_MAPPER_MODE_ATTRIBUTE_NOOP = def(9_007_0_00, Version.LUCENE_10_0_0);
145146
public static final IndexVersion HOSTNAME_DOC_VALUES_SPARSE_INDEX = def(9_008_0_00, Version.LUCENE_10_0_0);
146147
public static final IndexVersion UPGRADE_TO_LUCENE_10_1_0 = def(9_009_0_00, Version.LUCENE_10_1_0);
148+
// Indices created on or after this version are candidates for defaulting index.recovery.use_synthetic_source to true (see IndexSettings).
public static final IndexVersion USE_SYNTHETIC_SOURCE_FOR_RECOVERY_BY_DEFAULT = def(9_010_0_00, Version.LUCENE_10_1_0);
147149
/*
148150
* STOP! READ THIS FIRST! No, really,
149151
* ____ _____ ___ ____ _ ____ _____ _ ____ _____ _ _ ___ ____ _____ ___ ____ ____ _____ _

0 commit comments

Comments
 (0)