Skip to content

Commit 7d07d1a

Browse files
committed
Merge branch 'main' into feature/unmapped_fields_squashed
2 parents 8f3c235 + 09bc343 commit 7d07d1a

File tree

9 files changed

+59
-390
lines changed

9 files changed

+59
-390
lines changed

server/src/main/java/org/elasticsearch/action/search/AbstractSearchAsyncAction.java

Lines changed: 19 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -90,15 +90,13 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
9090
private final Object shardFailuresMutex = new Object();
9191
private final AtomicBoolean hasShardResponse = new AtomicBoolean(false);
9292
private final AtomicInteger successfulOps = new AtomicInteger();
93-
private final AtomicInteger skippedOps = new AtomicInteger();
9493
private final SearchTimeProvider timeProvider;
9594
private final SearchResponse.Clusters clusters;
9695

9796
protected final GroupShardsIterator<SearchShardIterator> toSkipShardsIts;
9897
protected final GroupShardsIterator<SearchShardIterator> shardsIts;
9998
private final SearchShardIterator[] shardIterators;
100-
private final int expectedTotalOps;
101-
private final AtomicInteger totalOps = new AtomicInteger();
99+
private final AtomicInteger outstandingShards;
102100
private final int maxConcurrentRequestsPerNode;
103101
private final Map<String, PendingExecutions> pendingExecutionsPerNode = new ConcurrentHashMap<>();
104102
private final boolean throttleConcurrentRequests;
@@ -139,18 +137,12 @@ abstract class AbstractSearchAsyncAction<Result extends SearchPhaseResult> exten
139137
}
140138
this.toSkipShardsIts = new GroupShardsIterator<>(toSkipIterators);
141139
this.shardsIts = new GroupShardsIterator<>(iterators);
142-
140+
outstandingShards = new AtomicInteger(shardsIts.size());
143141
this.shardIterators = iterators.toArray(new SearchShardIterator[0]);
144142
// we later compute the shard index based on the natural order of the shards
145143
// that participate in the search request. This means that this number is
146144
// consistent between two requests that target the same shards.
147145
Arrays.sort(shardIterators);
148-
149-
// we need to add 1 for non active partition, since we count it in the total. This means for each shard in the iterator we sum up
150-
// it's number of active shards but use 1 as the default if no replica of a shard is active at this point.
151-
// on a per shards level we use shardIt.remaining() to increment the totalOps pointer but add 1 for the current shard result
152-
// we process hence we add one for the non active partition here.
153-
this.expectedTotalOps = shardsIts.totalSizeWith1ForEmpty();
154146
this.maxConcurrentRequestsPerNode = maxConcurrentRequestsPerNode;
155147
// in the case where we have fewer shards than maxConcurrentRequestsPerNode we don't need to throttle
156148
this.throttleConcurrentRequests = maxConcurrentRequestsPerNode < shardsIts.size();
@@ -251,9 +243,8 @@ protected final void run() {
251243

252244
void skipShard(SearchShardIterator iterator) {
253245
successfulOps.incrementAndGet();
254-
skippedOps.incrementAndGet();
255246
assert iterator.skip();
256-
successfulShardExecution(iterator);
247+
successfulShardExecution();
257248
}
258249

259250
private static boolean assertExecuteOnStartThread() {
@@ -380,7 +371,7 @@ protected void executeNextPhase(String currentPhase, Supplier<SearchPhase> nextP
380371
"Partial shards failure (unavailable: {}, successful: {}, skipped: {}, num-shards: {}, phase: {})",
381372
discrepancy,
382373
successfulOps.get(),
383-
skippedOps.get(),
374+
toSkipShardsIts.size(),
384375
getNumShards(),
385376
currentPhase
386377
);
@@ -449,17 +440,14 @@ private void onShardFailure(final int shardIndex, SearchShardTarget shard, final
449440
}
450441
onShardGroupFailure(shardIndex, shard, e);
451442
}
452-
final int totalOps = this.totalOps.incrementAndGet();
453-
if (totalOps == expectedTotalOps) {
454-
onPhaseDone();
455-
} else if (totalOps > expectedTotalOps) {
456-
throw new AssertionError(
457-
"unexpected higher total ops [" + totalOps + "] compared to expected [" + expectedTotalOps + "]",
458-
new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures())
459-
);
443+
if (lastShard == false) {
444+
performPhaseOnShard(shardIndex, shardIt, nextShard);
460445
} else {
461-
if (lastShard == false) {
462-
performPhaseOnShard(shardIndex, shardIt, nextShard);
446+
// count down outstanding shards, we're done with this shard as there are no more copies to try
447+
final int outstanding = outstandingShards.decrementAndGet();
448+
assert outstanding >= 0 : "outstanding: " + outstanding;
449+
if (outstanding == 0) {
450+
onPhaseDone();
463451
}
464452
}
465453
}
@@ -535,10 +523,10 @@ protected void onShardResult(Result result, SearchShardIterator shardIt) {
535523
if (logger.isTraceEnabled()) {
536524
logger.trace("got first-phase result from {}", result != null ? result.getSearchShardTarget() : null);
537525
}
538-
results.consumeResult(result, () -> onShardResultConsumed(result, shardIt));
526+
results.consumeResult(result, () -> onShardResultConsumed(result));
539527
}
540528

541-
private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
529+
private void onShardResultConsumed(Result result) {
542530
successfulOps.incrementAndGet();
543531
// clean a previous error on this shard group (note, this code will be serialized on the same shardIndex value level
544532
// so it's ok concurrency-wise to potentially miss the shard failures being created because of another failure
@@ -552,28 +540,14 @@ private void onShardResultConsumed(Result result, SearchShardIterator shardIt) {
552540
// cause the successor to read a wrong value from successfulOps if second phase is very fast ie. count etc.
553541
// increment all the "future" shards to update the total ops since some may work and some may not...
554542
// and when that happens, we break on total ops, so we must maintain them
555-
successfulShardExecution(shardIt);
543+
successfulShardExecution();
556544
}
557545

558-
private void successfulShardExecution(SearchShardIterator shardsIt) {
559-
final int remainingOpsOnIterator;
560-
if (shardsIt.skip()) {
561-
// It's possible that we're skipping a shard that's unavailable
562-
// but its range was available in the IndexMetadata, in that
563-
// case the shardsIt.remaining() would be 0, expectedTotalOps
564-
// accounts for unavailable shards too.
565-
remainingOpsOnIterator = Math.max(shardsIt.remaining(), 1);
566-
} else {
567-
remainingOpsOnIterator = shardsIt.remaining() + 1;
568-
}
569-
final int xTotalOps = totalOps.addAndGet(remainingOpsOnIterator);
570-
if (xTotalOps == expectedTotalOps) {
546+
private void successfulShardExecution() {
547+
final int outstanding = outstandingShards.decrementAndGet();
548+
assert outstanding >= 0 : "outstanding: " + outstanding;
549+
if (outstanding == 0) {
571550
onPhaseDone();
572-
} else if (xTotalOps > expectedTotalOps) {
573-
throw new AssertionError(
574-
"unexpected higher total ops [" + xTotalOps + "] compared to expected [" + expectedTotalOps + "]",
575-
new SearchPhaseExecutionException(getName(), "Shard failures", null, buildShardFailures())
576-
);
577551
}
578552
}
579553

@@ -640,7 +614,7 @@ private SearchResponse buildSearchResponse(
640614
scrollId,
641615
getNumShards(),
642616
numSuccess,
643-
skippedOps.get(),
617+
toSkipShardsIts.size(),
644618
buildTookInMillis(),
645619
failures,
646620
clusters,

server/src/main/java/org/elasticsearch/cluster/routing/GroupShardsIterator.java

Lines changed: 0 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -41,26 +41,6 @@ public GroupShardsIterator(List<ShardIt> iterators) {
4141
this.iterators = iterators;
4242
}
4343

44-
/**
45-
* Returns the total number of shards within all groups
46-
* @return total number of shards
47-
*/
48-
public int totalSize() {
49-
return iterators.stream().mapToInt(Countable::size).sum();
50-
}
51-
52-
/**
53-
* Returns the total number of shards plus the number of empty groups
54-
* @return number of shards and empty groups
55-
*/
56-
public int totalSizeWith1ForEmpty() {
57-
int size = 0;
58-
for (ShardIt shard : iterators) {
59-
size += Math.max(1, shard.size());
60-
}
61-
return size;
62-
}
63-
6444
/**
6545
* Return the number of groups
6646
* @return number of groups

server/src/main/java/org/elasticsearch/transport/TransportHandshaker.java

Lines changed: 7 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -44,49 +44,17 @@ final class TransportHandshaker {
4444
* ignores the body of the request. After the handshake, the OutboundHandler uses the min(local,remote) protocol version for all later
4545
* messages.
4646
*
47-
* This version supports three handshake protocols, v6080099, v7170099 and v8800000, which respectively have the same message structure
48-
* as the transport protocols of v6.8.0, v7.17.0, and v8.18.0. This node only sends v7170099 requests, but it can send a valid response
49-
* to any v6080099 or v8800000 requests that it receives.
47+
* This version supports two handshake protocols, v7170099 and v8800000, which respectively have the same message structure as the
48+
* transport protocols of v7.17.0 and v8.18.0. This node only sends v8800000 requests, but it can send a valid response to any v7170099
49+
* requests that it receives.
5050
*
5151
* Note that these are not really TransportVersion constants as used elsewhere in ES, they're independent things that just happen to be
52-
* stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did
53-
* rely on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
52+
* stored in the same location in the message header and which roughly match the same ID numbering scheme. Older versions of ES did rely
53+
* on them matching the real transport protocol (which itself matched the release version numbers), but these days that's no longer
5454
* true.
5555
*
5656
* Here are some example messages, broken down to show their structure. See TransportHandshakerRawMessageTests for supporting tests.
5757
*
58-
* ## v6080099 Request:
59-
*
60-
* 45 53 -- 'ES' marker
61-
* 00 00 00 34 -- total message length
62-
* 00 00 00 00 00 00 00 01 -- request ID
63-
* 08 -- status flags (0b1000 == handshake request)
64-
* 00 5c c6 63 -- handshake protocol version (0x5cc663 == 6080099)
65-
* 00 -- no request headers [1]
66-
* 00 -- no response headers [1]
67-
* 01 -- one feature [2]
68-
* 06 -- feature name length
69-
* 78 2d 70 61 63 6b -- feature name 'x-pack'
70-
* 16 -- action string size
71-
* 69 6e 74 65 72 6e 61 6c }
72-
* 3a 74 63 70 2f 68 61 6e }- ASCII representation of HANDSHAKE_ACTION_NAME
73-
* 64 73 68 61 6b 65 }
74-
* 00 -- no parent task ID [3]
75-
* 04 -- payload length
76-
* 8b d5 b5 03 -- max acceptable protocol version (vInt: 00000011 10110101 11010101 10001011 == 7170699)
77-
*
78-
* ## v6080099 Response:
79-
*
80-
* 45 53 -- 'ES' marker
81-
* 00 00 00 13 -- total message length
82-
* 00 00 00 00 00 00 00 01 -- request ID (copied from request)
83-
* 09 -- status flags (0b1001 == handshake response)
84-
* 00 5c c6 63 -- handshake protocol version (0x5cc663 == 6080099, copied from request)
85-
* 00 -- no request headers [1]
86-
* 00 -- no response headers [1]
87-
* c3 f9 eb 03 -- max acceptable protocol version (vInt: 00000011 11101011 11111001 11000011 == 8060099)
88-
*
89-
*
9058
* ## v7170099 Requests:
9159
*
9260
* 45 53 -- 'ES' marker
@@ -158,14 +126,9 @@ final class TransportHandshaker {
158126
* [3] Parent task ID should be empty; see org.elasticsearch.tasks.TaskId.writeTo for its structure.
159127
*/
160128

161-
static final TransportVersion V7_HANDSHAKE_VERSION = TransportVersion.fromId(6_08_00_99);
162129
static final TransportVersion V8_HANDSHAKE_VERSION = TransportVersion.fromId(7_17_00_99);
163130
static final TransportVersion V9_HANDSHAKE_VERSION = TransportVersion.fromId(8_800_00_0);
164-
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(
165-
V7_HANDSHAKE_VERSION,
166-
V8_HANDSHAKE_VERSION,
167-
V9_HANDSHAKE_VERSION
168-
);
131+
static final Set<TransportVersion> ALLOWED_HANDSHAKE_VERSIONS = Set.of(V8_HANDSHAKE_VERSION, V9_HANDSHAKE_VERSION);
169132

170133
static final String HANDSHAKE_ACTION_NAME = "internal:tcp/handshake";
171134
private final ConcurrentMap<Long, HandshakeResponseHandler> pendingHandshakes = new ConcurrentHashMap<>();
@@ -203,7 +166,7 @@ void sendHandshake(
203166
);
204167
boolean success = false;
205168
try {
206-
handshakeRequestSender.sendRequest(node, channel, requestId, V8_HANDSHAKE_VERSION);
169+
handshakeRequestSender.sendRequest(node, channel, requestId, V9_HANDSHAKE_VERSION);
207170

208171
threadPool.schedule(
209172
() -> handler.handleLocalException(new ConnectTransportException(node, "handshake_timeout[" + timeout + "]")),

server/src/test/java/org/elasticsearch/action/search/ExpandSearchPhaseTests.java

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -144,7 +144,6 @@ protected void run() {
144144
hits.decRef();
145145
}
146146
} finally {
147-
mockSearchPhaseContext.execute(() -> {});
148147
var resp = mockSearchPhaseContext.searchResponse.get();
149148
if (resp != null) {
150149
resp.decRef();

server/src/test/java/org/elasticsearch/cluster/routing/GroupShardsIteratorTests.java

Lines changed: 0 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -38,33 +38,6 @@ private static List<ShardRouting> randomShardRoutings(ShardId shardId, int numRe
3838
return shardRoutings;
3939
}
4040

41-
public void testSize() {
42-
List<ShardIterator> list = new ArrayList<>();
43-
Index index = new Index("foo", "na");
44-
{
45-
ShardId shardId = new ShardId(index, 0);
46-
list.add(new PlainShardIterator(shardId, randomShardRoutings(shardId, 2)));
47-
}
48-
list.add(new PlainShardIterator(new ShardId(index, 1), Collections.emptyList()));
49-
{
50-
ShardId shardId = new ShardId(index, 2);
51-
list.add(new PlainShardIterator(shardId, randomShardRoutings(shardId, 0)));
52-
}
53-
index = new Index("foo_1", "na");
54-
{
55-
ShardId shardId = new ShardId(index, 0);
56-
list.add(new PlainShardIterator(shardId, randomShardRoutings(shardId, 0)));
57-
}
58-
{
59-
ShardId shardId = new ShardId(index, 1);
60-
list.add(new PlainShardIterator(shardId, randomShardRoutings(shardId, 0)));
61-
}
62-
GroupShardsIterator<ShardIterator> iter = new GroupShardsIterator<>(list);
63-
assertEquals(7, iter.totalSizeWith1ForEmpty());
64-
assertEquals(5, iter.size());
65-
assertEquals(6, iter.totalSize());
66-
}
67-
6841
public void testIterate() {
6942
List<ShardIterator> list = new ArrayList<>();
7043
Index index = new Index("foo", "na");

0 commit comments

Comments
 (0)