Skip to content

Commit f032fae

Browse files
authored
Adds the _cluster object to the AsyncStatusResponse XContent when Clusters has a non-zero count total (#96535)
Adds the _cluster object to the AsyncStatusResponse XContent when Clusters has a non-zero count total. When CCS is performed, the _cluster object will have a non-zero `total` count and thus is useful status information for the user. When a local cluster only async search is done, the _cluster object will not be present in the AsyncStatusResponse (same as it is now) A TransportVersion bump was required since Clusters is now an optionally written to the AsyncStatusResponse object.
1 parent ec6b790 commit f032fae

File tree

5 files changed

+320
-47
lines changed

5 files changed

+320
-47
lines changed

server/src/main/java/org/elasticsearch/TransportVersion.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -132,12 +132,13 @@ private static TransportVersion registerTransportVersion(int id, String uniqueId
132132
public static final TransportVersion V_8_500_006 = registerTransportVersion(8_500_006, "7BB5621A-80AC-425F-BA88-75543C442F23");
133133
public static final TransportVersion V_8_500_007 = registerTransportVersion(8_500_007, "77261d43-4149-40af-89c5-7e71e0454fce");
134134
public static final TransportVersion V_8_500_008 = registerTransportVersion(8_500_008, "8884ab9d-94cd-4bac-aff8-01f2c394f47c");
135+
public static final TransportVersion V_8_500_009 = registerTransportVersion(8_500_009, "35091358-fd41-4106-a6e2-d2a1315494c1");
135136

136137
/**
137138
* Reference to the most recent transport version.
138139
* This should be the transport version with the highest id.
139140
*/
140-
public static final TransportVersion CURRENT = findCurrent(V_8_500_008);
141+
public static final TransportVersion CURRENT = findCurrent(V_8_500_009);
141142

142143
/**
143144
* Reference to the earliest compatible transport version to this version of the codebase.

server/src/main/java/org/elasticsearch/action/search/SearchResponse.java

Lines changed: 13 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -516,9 +516,17 @@ public Clusters(int total, int successful, int skipped, int remoteClusters, bool
516516
* We are not tracking number of remote clusters in this search.
517517
*/
518518
public Clusters(int total, int successful, int skipped) {
519-
assert total >= 0 && successful >= 0 && skipped >= 0
519+
this(total, successful, skipped, true);
520+
}
521+
522+
/**
523+
* @param finalState if true, then do an assert that total = successful + skipped. This is true
524+
* only when the cluster is in its final state, not an initial or intermediate state.
525+
*/
526+
Clusters(int total, int successful, int skipped, boolean finalState) {
527+
assert total >= 0 && successful >= 0 && skipped >= 0 && successful <= total
520528
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
521-
assert successful <= total && skipped == total - successful
529+
assert finalState == false || skipped == total - successful
522530
: "total: " + total + " successful: " + successful + " skipped: " + skipped;
523531
this.total = total;
524532
this.successful = successful;
@@ -527,8 +535,9 @@ public Clusters(int total, int successful, int skipped) {
527535
this.ccsMinimizeRoundtrips = false;
528536
}
529537

530-
private Clusters(StreamInput in) throws IOException {
531-
this(in.readVInt(), in.readVInt(), in.readVInt());
538+
public Clusters(StreamInput in) throws IOException {
539+
// when coming across the wire, we don't have context to know if this Cluster is in a final state, so set finalState=false
540+
this(in.readVInt(), in.readVInt(), in.readVInt(), false);
532541
}
533542

534543
@Override

x-pack/plugin/async-search/src/main/java/org/elasticsearch/xpack/search/MutableSearchResponse.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,11 @@ synchronized AsyncSearchResponse toAsyncSearchResponse(AsyncSearchTask task, lon
241241
* @return response representing the status of async search
242242
*/
243243
synchronized AsyncStatusResponse toStatusResponse(String asyncExecutionId, long startTime, long expirationTime) {
244+
SearchResponse.Clusters clustersInStatus = null;
245+
if (clusters != null && clusters.getTotal() > 0) {
246+
// include clusters in the status if present and not Clusters.EMPTY (the case for local searches only)
247+
clustersInStatus = clusters;
248+
}
244249
if (finalResponse != null) {
245250
return new AsyncStatusResponse(
246251
asyncExecutionId,
@@ -252,7 +257,8 @@ synchronized AsyncStatusResponse toStatusResponse(String asyncExecutionId, long
252257
finalResponse.getSuccessfulShards(),
253258
finalResponse.getSkippedShards(),
254259
finalResponse.getShardFailures() != null ? finalResponse.getShardFailures().length : 0,
255-
finalResponse.status()
260+
finalResponse.status(),
261+
clustersInStatus
256262
);
257263
}
258264
if (failure != null) {
@@ -266,7 +272,8 @@ synchronized AsyncStatusResponse toStatusResponse(String asyncExecutionId, long
266272
successfulShards,
267273
skippedShards,
268274
queryFailures == null ? 0 : queryFailures.nonNullLength(),
269-
ExceptionsHelper.status(ExceptionsHelper.unwrapCause(failure))
275+
ExceptionsHelper.status(ExceptionsHelper.unwrapCause(failure)),
276+
clustersInStatus
270277
);
271278
}
272279
return new AsyncStatusResponse(
@@ -279,7 +286,8 @@ synchronized AsyncStatusResponse toStatusResponse(String asyncExecutionId, long
279286
successfulShards,
280287
skippedShards,
281288
queryFailures == null ? 0 : queryFailures.nonNullLength(),
282-
null // for a still running search, completion status is null
289+
null, // for a still running search, completion status is null
290+
clustersInStatus
283291
);
284292
}
285293

0 commit comments

Comments
 (0)