Skip to content

Commit 4ca957f

Browse files
authored
Merge branch 'main' into refactor-downsampling-dlm
2 parents e23f767 + e1c852b commit 4ca957f

File tree

32 files changed

+1214
-161
lines changed

32 files changed

+1214
-161
lines changed

build-tools-internal/src/integTest/groovy/org/elasticsearch/gradle/internal/transport/TransportVersionGenerationFuncTest.groovy

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,4 +513,19 @@ class TransportVersionGenerationFuncTest extends AbstractTransportVersionFuncTes
513513
assertGenerateAndValidateSuccess(result)
514514
assertUpperBound("9.2", "existing_92,8123000")
515515
}
516+
517+
def "generation cannot run on release branch"() {
518+
given:
519+
file("myserver/build.gradle") << """
520+
tasks.named('generateTransportVersion') {
521+
currentUpperBoundName = '9.1'
522+
}
523+
"""
524+
525+
when:
526+
def result = runGenerateTask().buildAndFail()
527+
528+
then:
529+
assertGenerateFailure(result, "Transport version generation cannot run on release branches")
530+
}
516531
}

build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/transport/GenerateTransportVersionDefinitionTask.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,12 @@ public abstract class GenerateTransportVersionDefinitionTask extends DefaultTask
9696
@TaskAction
9797
public void run() throws IOException {
9898
TransportVersionResourcesService resources = getResourceService().get();
99+
List<TransportVersionUpperBound> upstreamUpperBounds = resources.getUpperBoundsFromGitBase();
100+
boolean onReleaseBranch = resources.checkIfDefinitelyOnReleaseBranch(upstreamUpperBounds, getCurrentUpperBoundName().get());
101+
if (onReleaseBranch) {
102+
throw new IllegalArgumentException("Transport version generation cannot run on release branches");
103+
}
104+
99105
Set<String> referencedNames = TransportVersionReference.collectNames(getReferencesFiles());
100106
Set<String> changedDefinitionNames = resources.getChangedReferableDefinitionNames();
101107
String targetDefinitionName = getTargetDefinitionName(resources, referencedNames, changedDefinitionNames);
@@ -109,7 +115,7 @@ public void run() throws IOException {
109115
resetAllUpperBounds(resources, idsByBase);
110116
} else {
111117
getLogger().lifecycle("Generating transport version name: " + targetDefinitionName);
112-
List<TransportVersionUpperBound> upstreamUpperBounds = resources.getUpperBoundsFromGitBase();
118+
113119
Set<String> targetUpperBoundNames = getTargetUpperBoundNames(resources, upstreamUpperBounds, targetDefinitionName);
114120

115121
List<TransportVersionId> ids = updateUpperBounds(

build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/transport/TransportVersionResourcesService.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.nio.file.Files;
2626
import java.nio.file.Path;
2727
import java.util.ArrayList;
28+
import java.util.Collection;
2829
import java.util.Collections;
2930
import java.util.Comparator;
3031
import java.util.HashMap;
@@ -259,6 +260,20 @@ private Path getUpperBoundRelativePath(String name) {
259260
return UPPER_BOUNDS_DIR.resolve(name + ".csv");
260261
}
261262

263+
boolean checkIfDefinitelyOnReleaseBranch(Collection<TransportVersionUpperBound> upperBounds, String currentUpperBoundName) {
264+
// only want to look at definitions <= the current upper bound.
265+
// TODO: we should filter all of the upper bounds/definitions that are validated by this, not just in this method
266+
TransportVersionUpperBound currentUpperBound = upperBounds.stream()
267+
.filter(u -> u.name().equals(currentUpperBoundName))
268+
.findFirst()
269+
.orElse(null);
270+
if (currentUpperBound == null) {
271+
// since there is no current upper bound, we don't know if we are on a release branch
272+
return false;
273+
}
274+
return upperBounds.stream().anyMatch(u -> u.definitionId().complete() > currentUpperBound.definitionId().complete());
275+
}
276+
262277
private String getBaseRefName() {
263278
if (baseRefName.get() == null) {
264279
synchronized (baseRefName) {

build-tools-internal/src/main/java/org/elasticsearch/gradle/internal/transport/ValidateTransportVersionResourcesTask.java

Lines changed: 1 addition & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -84,7 +84,7 @@ public void validateTransportVersions() throws IOException {
8484
Map<Integer, List<IdAndDefinition>> idsByBase = resources.getIdsByBase();
8585
Map<String, TransportVersionUpperBound> upperBounds = resources.getUpperBounds();
8686
TransportVersionUpperBound currentUpperBound = upperBounds.get(getCurrentUpperBoundName().get());
87-
boolean onReleaseBranch = checkIfDefinitelyOnReleaseBranch(upperBounds);
87+
boolean onReleaseBranch = resources.checkIfDefinitelyOnReleaseBranch(upperBounds.values(), getCurrentUpperBoundName().get());
8888
boolean validateModifications = onReleaseBranch == false || getCI().get();
8989

9090
for (var definition : referableDefinitions.values()) {
@@ -330,15 +330,6 @@ private void validatePrimaryIds(
330330
);
331331
}
332332

333-
private boolean checkIfDefinitelyOnReleaseBranch(Map<String, TransportVersionUpperBound> upperBounds) {
334-
// only want to look at definitions <= the current upper bound.
335-
// TODO: we should filter all of the upper bounds/definitions that are validated by this, not just in this method
336-
String currentUpperBoundName = getCurrentUpperBoundName().get();
337-
TransportVersionUpperBound currentUpperBound = upperBounds.get(currentUpperBoundName);
338-
339-
return upperBounds.values().stream().anyMatch(u -> u.definitionId().complete() > currentUpperBound.definitionId().complete());
340-
}
341-
342333
private void throwDefinitionFailure(TransportVersionDefinition definition, String message) {
343334
Path relativePath = getResources().get().getDefinitionPath(definition);
344335
throw new VerificationException("Transport version definition file [" + relativePath + "] " + message);

docs/changelog/135231.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 135231
2+
summary: Improve retrying PIT contexts for read-only indices
3+
area: Search
4+
type: enhancement
5+
issues: []

docs/changelog/137302.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 137302
2+
summary: Add audit log testing for cert-based cross-cluster authentication
3+
area: Security
4+
type: enhancement
5+
issues: []

docs/changelog/137387.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 137387
2+
summary: Extends constant MVs handling with warnings to general binary comparisons
3+
area: ES|QL
4+
type: bug
5+
issues: []

muted-tests.yml

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -498,6 +498,15 @@ tests:
498498
- class: org.elasticsearch.xpack.security.CoreWithSecurityClientYamlTestSuiteIT
499499
method: test {yaml=indices.validate_query/20_query_string/validate_query with query_string parameters}
500500
issue: https://github.com/elastic/elasticsearch/issues/137391
501+
- class: org.elasticsearch.xpack.downsample.ILMDownsampleDisruptionIT
502+
method: testILMDownsampleRollingRestart
503+
issue: https://github.com/elastic/elasticsearch/issues/137489
504+
- class: org.elasticsearch.smoketest.SmokeTestIngestWithAllDepsClientYamlTestSuiteIT
505+
method: test {yaml=ingest/100_sampling_with_reroute/Test get sample with multiple reroutes}
506+
issue: https://github.com/elastic/elasticsearch/issues/137457
507+
- class: org.elasticsearch.xpack.esql.qa.single_node.PushQueriesIT
508+
method: testEqualityAndOther {SEMANTIC_TEXT_WITH_KEYWORD}
509+
issue: https://github.com/elastic/elasticsearch/issues/137491
501510

502511
# Examples:
503512
#

server/src/internalClusterTest/java/org/elasticsearch/search/scroll/SearchScrollIT.java

Lines changed: 11 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,12 @@
1111

1212
import org.elasticsearch.ExceptionsHelper;
1313
import org.elasticsearch.action.search.ClearScrollResponse;
14+
import org.elasticsearch.action.search.ParsedScrollId;
1415
import org.elasticsearch.action.search.SearchPhaseExecutionException;
1516
import org.elasticsearch.action.search.SearchRequestBuilder;
1617
import org.elasticsearch.action.search.SearchResponse;
18+
import org.elasticsearch.action.search.SearchScrollRequestBuilder;
1719
import org.elasticsearch.action.search.SearchType;
18-
import org.elasticsearch.action.search.ShardSearchFailure;
1920
import org.elasticsearch.cluster.metadata.IndexMetadata;
2021
import org.elasticsearch.common.Priority;
2122
import org.elasticsearch.common.bytes.BytesReference;
@@ -28,6 +29,7 @@
2829
import org.elasticsearch.index.query.RangeQueryBuilder;
2930
import org.elasticsearch.rest.RestStatus;
3031
import org.elasticsearch.search.SearchHit;
32+
import org.elasticsearch.search.internal.ShardSearchContextId;
3133
import org.elasticsearch.search.sort.FieldSortBuilder;
3234
import org.elasticsearch.search.sort.SortOrder;
3335
import org.elasticsearch.test.ESIntegTestCase;
@@ -703,13 +705,15 @@ public void testRestartDataNodesDuringScrollSearch() throws Exception {
703705
} finally {
704706
respFromProdIndex.decRef();
705707
}
706-
SearchPhaseExecutionException error = expectThrows(
707-
SearchPhaseExecutionException.class,
708-
client().prepareSearchScroll(respFromDemoIndexScrollId)
708+
SearchScrollRequestBuilder searchScrollRequestBuilder = client().prepareSearchScroll(respFromDemoIndexScrollId);
709+
SearchPhaseExecutionException error = expectThrows(SearchPhaseExecutionException.class, searchScrollRequestBuilder);
710+
assertEquals(1, error.shardFailures().length);
711+
ParsedScrollId parsedScrollId = searchScrollRequestBuilder.request().parseScrollId();
712+
ShardSearchContextId shardSearchContextId = parsedScrollId.getContext()[0].getSearchContextId();
713+
assertThat(
714+
error.shardFailures()[0].getCause().getMessage(),
715+
containsString("No search context found for id [" + shardSearchContextId + "]")
709716
);
710-
for (ShardSearchFailure shardSearchFailure : error.shardFailures()) {
711-
assertThat(shardSearchFailure.getCause().getMessage(), containsString("No search context found for id [1]"));
712-
}
713717
client().prepareSearchScroll(respFromProdIndexScrollId).get().decRef();
714718
}
715719

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 93 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313
import org.apache.lucene.util.SetOnce;
1414
import org.elasticsearch.ElasticsearchException;
1515
import org.elasticsearch.ExceptionsHelper;
16+
import org.elasticsearch.TransportVersion;
1617
import org.elasticsearch.action.ActionListener;
1718
import org.elasticsearch.action.NoShardAvailableActionException;
1819
import org.elasticsearch.action.OriginalIndices;
@@ -21,6 +22,7 @@
2122
import org.elasticsearch.action.support.SubscribableListener;
2223
import org.elasticsearch.action.support.TransportActions;
2324
import org.elasticsearch.cluster.ClusterState;
25+
import org.elasticsearch.cluster.node.DiscoveryNodes;
2426
import org.elasticsearch.common.bytes.BytesReference;
2527
import org.elasticsearch.common.io.stream.NamedWriteableRegistry;
2628
import org.elasticsearch.common.util.Maps;
@@ -30,8 +32,10 @@
3032
import org.elasticsearch.rest.action.search.SearchResponseMetrics;
3133
import org.elasticsearch.search.SearchContextMissingException;
3234
import org.elasticsearch.search.SearchPhaseResult;
35+
import org.elasticsearch.search.SearchService;
3336
import org.elasticsearch.search.SearchShardTarget;
3437
import org.elasticsearch.search.builder.PointInTimeBuilder;
38+
import org.elasticsearch.search.builder.SearchSourceBuilder;
3539
import org.elasticsearch.search.internal.AliasFilter;
3640
import org.elasticsearch.search.internal.SearchContext;
3741
import org.elasticsearch.search.internal.ShardSearchContextId;
@@ -40,6 +44,8 @@
4044

4145
import java.util.ArrayList;
4246
import java.util.Arrays;
47+
import java.util.Collection;
48+
import java.util.HashMap;
4349
import java.util.List;
4450
import java.util.Map;
4551
import java.util.concurrent.ConcurrentHashMap;
@@ -53,6 +59,7 @@
5359
import java.util.function.Supplier;
5460
import java.util.stream.Collectors;
5561

62+
import static org.elasticsearch.action.search.TransportClosePointInTimeAction.closeContexts;
5663
import static org.elasticsearch.core.Strings.format;
5764

5865
/**
@@ -94,11 +101,13 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
94101
private final Map<String, PendingExecutions> pendingExecutionsPerNode;
95102
private final AtomicBoolean requestCancelled = new AtomicBoolean();
96103
private final int skippedCount;
104+
private final TransportVersion mintransportVersion;
97105
protected final SearchResponseMetrics searchResponseMetrics;
98106
protected long phaseStartTimeInNanos;
99107

100108
// protected for tests
101109
protected final SubscribableListener<Void> doneFuture = new SubscribableListener<>();
110+
private final Supplier<DiscoveryNodes> discoveryNodes;
102111

103112
AbstractSearchAsyncAction(
104113
String name,
@@ -153,6 +162,8 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
153162
this.nodeIdToConnection = nodeIdToConnection;
154163
this.concreteIndexBoosts = concreteIndexBoosts;
155164
this.clusterStateVersion = clusterState.version();
165+
this.mintransportVersion = clusterState.getMinTransportVersion();
166+
this.discoveryNodes = clusterState::nodes;
156167
this.aliasFilter = aliasFilter;
157168
this.results = resultConsumer;
158169
// register the release of the query consumer to free up the circuit breaker memory
@@ -422,6 +433,7 @@ protected final void onShardFailure(final int shardIndex, SearchShardTarget shar
422433
onShardGroupFailure(shardIndex, shard, e);
423434
}
424435
if (lastShard == false) {
436+
logger.debug("Retrying shard [{}] with target [{}]", shard.getShardId(), nextShard);
425437
performPhaseOnShard(shardIndex, shardIt, nextShard);
426438
} else {
427439
// count down outstanding shards, we're done with this shard as there's no more copies to try
@@ -613,10 +625,87 @@ public void sendSearchResponse(SearchResponseSections internalSearchResponse, At
613625
}
614626

615627
protected BytesReference buildSearchContextId(ShardSearchFailure[] failures) {
616-
var source = request.source();
617-
return source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false
618-
? source.pointInTimeBuilder().getEncodedId()
619-
: null;
628+
SearchSourceBuilder source = request.source();
629+
// only (re-)build a search context id if we are running a long-lived point-in-time request
630+
if (source != null && source.pointInTimeBuilder() != null && source.pointInTimeBuilder().singleSession() == false) {
631+
if (SearchService.PIT_RELOCATION_FEATURE_FLAG.isEnabled()) {
632+
// we want to change node ids in the PIT id if any shards and its PIT context have moved
633+
return maybeReEncodeNodeIds(
634+
source.pointInTimeBuilder(),
635+
results.getAtomicArray().asList(),
636+
namedWriteableRegistry,
637+
mintransportVersion,
638+
searchTransportService,
639+
discoveryNodes.get(),
640+
logger
641+
);
642+
} else {
643+
return source.pointInTimeBuilder().getEncodedId();
644+
}
645+
} else {
646+
return null;
647+
}
648+
}
649+
650+
static <Result extends SearchPhaseResult> BytesReference maybeReEncodeNodeIds(
651+
PointInTimeBuilder originalPit,
652+
List<Result> results,
653+
NamedWriteableRegistry namedWriteableRegistry,
654+
TransportVersion mintransportVersion,
655+
SearchTransportService searchTransportService,
656+
DiscoveryNodes nodes,
657+
Logger logger
658+
) {
659+
SearchContextId original = originalPit.getSearchContextId(namedWriteableRegistry);
660+
// only create the following two collections if we detect an id change
661+
Map<ShardId, SearchContextIdForNode> updatedShardMap = null;
662+
Collection<SearchContextIdForNode> contextsToClose = null;
663+
logger.debug("checking search result shards to detect PIT node changes");
664+
for (Result result : results) {
665+
SearchShardTarget searchShardTarget = result.getSearchShardTarget();
666+
ShardId shardId = searchShardTarget.getShardId();
667+
SearchContextIdForNode originalShard = original.shards().get(shardId);
668+
if (originalShard != null && originalShard.getSearchContextId() != null && originalShard.getSearchContextId().isRetryable()) {
669+
// check if the node is different, if so we need to re-encode the PIT
670+
String originalNode = originalShard.getNode();
671+
if (originalNode != null && originalNode.equals(searchShardTarget.getNodeId()) == false) {
672+
// the target node for this shard entry in the PIT has changed, we need to update it
673+
if (updatedShardMap == null) {
674+
// initialize the map with entries from old map to keep ids for shards that have not responded in this results
675+
updatedShardMap = new HashMap<>(original.shards());
676+
contextsToClose = new ArrayList<>();
677+
}
678+
SearchContextIdForNode updatedId = new SearchContextIdForNode(
679+
searchShardTarget.getClusterAlias(),
680+
searchShardTarget.getNodeId(),
681+
result.getContextId()
682+
);
683+
684+
logger.debug("changing node for PIT shard id from [{}] to [{}]", originalShard, updatedId);
685+
updatedShardMap.put(shardId, updatedId);
686+
contextsToClose.add(original.shards().get(shardId));
687+
688+
}
689+
}
690+
}
691+
if (updatedShardMap != null) {
692+
// we free all old contexts that have moved, just in case we have re-tried them elsewhere
693+
// but they still exist in the old location
694+
closeContexts(nodes, searchTransportService, contextsToClose, new ActionListener<Integer>() {
695+
@Override
696+
public void onResponse(Integer integer) {
697+
// ignore
698+
}
699+
700+
@Override
701+
public void onFailure(Exception e) {
702+
logger.trace("Failure while freeing old point in time contexts", e);
703+
}
704+
});
705+
return SearchContextId.encode(updatedShardMap, original.aliasFilter(), mintransportVersion, ShardSearchFailure.EMPTY_ARRAY);
706+
} else {
707+
return originalPit.getEncodedId();
708+
}
620709
}
621710

622711
/**

0 commit comments

Comments
 (0)