Skip to content

Commit 737d8a4

Browse files
Atri SharmaAtri Sharma
authored andcommitted
Cleanup
Signed-off-by: Atri Sharma <atrisharma@Atris-Mac-Studio.local>
1 parent a72741d commit 737d8a4

File tree

5 files changed

+56
-122
lines changed

5 files changed

+56
-122
lines changed

server/src/main/java/org/opensearch/action/search/PreviewFirstPartialReceiver.java

Lines changed: 0 additions & 26 deletions
This file was deleted.

server/src/main/java/org/opensearch/action/search/SearchResponse.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,11 @@ public class SearchResponse extends ActionResponse implements StatusToXContentOb
9191
private static final ParseField TIMED_OUT = new ParseField("timed_out");
9292
private static final ParseField TERMINATED_EARLY = new ParseField("terminated_early");
9393
private static final ParseField NUM_REDUCE_PHASES = new ParseField("num_reduce_phases");
94+
95+
// Streaming REST fields
96+
private static final ParseField IS_PARTIAL = new ParseField("is_partial");
97+
private static final ParseField SEQUENCE_NUMBER = new ParseField("sequence_number");
98+
private static final ParseField TOTAL_PARTIALS = new ParseField("total_partials");
9499

95100
private final SearchResponseSections internalResponse;
96101
private final String scrollId;
@@ -132,6 +137,12 @@ public SearchResponse(StreamInput in) throws IOException {
132137
}
133138
skippedShards = in.readVInt();
134139
pointInTimeId = in.readOptionalString();
140+
// Read streaming fields
141+
if (in.getVersion().onOrAfter(Version.V_2_15_0)) {
142+
isPartial = in.readBoolean();
143+
sequenceNumber = in.readVInt();
144+
totalPartials = in.readVInt();
145+
}
135146
}
136147

137148
public SearchResponse(
@@ -385,6 +396,17 @@ public XContentBuilder innerToXContent(XContentBuilder builder, Params params) t
385396
if (getNumReducePhases() != 1) {
386397
builder.field(NUM_REDUCE_PHASES.getPreferredName(), getNumReducePhases());
387398
}
399+
400+
// Write streaming fields if it's a streaming partial
401+
// or if we have sequenced streaming responses
402+
if (isPartial) {
403+
builder.field(IS_PARTIAL.getPreferredName(), isPartial);
404+
builder.field(SEQUENCE_NUMBER.getPreferredName(), sequenceNumber);
405+
if (totalPartials > 0) {
406+
builder.field(TOTAL_PARTIALS.getPreferredName(), totalPartials);
407+
}
408+
}
409+
388410
RestActions.buildBroadcastShardsHeader(
389411
builder,
390412
params,
@@ -423,6 +445,9 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
423445
int skippedShards = 0; // 0 for BWC
424446
String scrollId = null;
425447
String searchContextId = null;
448+
boolean isPartial = false;
449+
int sequenceNumber = 0;
450+
int totalPartials = 0;
426451
List<ShardSearchFailure> failures = new ArrayList<>();
427452
Clusters clusters = Clusters.EMPTY;
428453
List<SearchExtBuilder> extBuilders = new ArrayList<>();
@@ -443,6 +468,12 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
443468
terminatedEarly = parser.booleanValue();
444469
} else if (NUM_REDUCE_PHASES.match(currentFieldName, parser.getDeprecationHandler())) {
445470
numReducePhases = parser.intValue();
471+
} else if (IS_PARTIAL.match(currentFieldName, parser.getDeprecationHandler())) {
472+
isPartial = parser.booleanValue();
473+
} else if (SEQUENCE_NUMBER.match(currentFieldName, parser.getDeprecationHandler())) {
474+
sequenceNumber = parser.intValue();
475+
} else if (TOTAL_PARTIALS.match(currentFieldName, parser.getDeprecationHandler())) {
476+
totalPartials = parser.intValue();
446477
} else {
447478
parser.skipChildren();
448479
}
@@ -583,6 +614,22 @@ public static SearchResponse innerFromXContent(XContentParser parser) throws IOE
583614
clusters,
584615
searchContextId
585616
);
617+
SearchResponse response = new SearchResponse(
618+
internalResponse,
619+
scrollId,
620+
totalShards,
621+
successfulShards,
622+
skippedShards,
623+
tookInMillis,
624+
phaseTook,
625+
failures.toArray(ShardSearchFailure.EMPTY_ARRAY),
626+
clusters,
627+
searchContextId
628+
);
629+
response.setPartial(isPartial);
630+
response.setSequenceNumber(sequenceNumber);
631+
response.setTotalPartials(totalPartials);
632+
return response;
586633
}
587634

588635
@Override
@@ -603,6 +650,13 @@ public void writeTo(StreamOutput out) throws IOException {
603650
}
604651
out.writeVInt(skippedShards);
605652
out.writeOptionalString(pointInTimeId);
653+
654+
// Write streaming fields
655+
if (out.getVersion().onOrAfter(Version.V_2_15_0)) {
656+
out.writeBoolean(isPartial);
657+
out.writeVInt(sequenceNumber);
658+
out.writeVInt(totalPartials);
659+
}
606660
}
607661

608662
@Override

server/src/main/java/org/opensearch/rest/action/search/StreamingPreviewFirstListener.java

Lines changed: 0 additions & 92 deletions
This file was deleted.

server/src/main/java/org/opensearch/search/aggregations/bucket/terms/StreamNumericTermsAggregator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,6 @@
3232
import org.opensearch.search.aggregations.bucket.LocalBucketCountThresholds;
3333
import org.opensearch.search.aggregations.support.ValuesSource;
3434
import org.opensearch.search.internal.SearchContext;
35-
import org.opensearch.search.streaming.Streamable;
3635
import org.opensearch.search.streaming.StreamingCostMetrics;
3736

3837
import java.io.IOException;
@@ -46,7 +45,7 @@
4645
import static java.util.Collections.emptyList;
4746
import static org.opensearch.search.aggregations.InternalOrder.isKeyOrder;
4847

49-
public class StreamNumericTermsAggregator extends TermsAggregator implements Streamable {
48+
public class StreamNumericTermsAggregator extends TermsAggregator {
5049
private static final Logger logger = LogManager.getLogger(StreamNumericTermsAggregator.class);
5150
private final ResultStrategy<?, ?> resultStrategy;
5251
private final ValuesSource.Numeric valuesSource;

server/src/main/java/org/opensearch/search/aggregations/metrics/StreamCardinalityAggregator.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@
1616
import org.opensearch.search.aggregations.support.ValuesSource;
1717
import org.opensearch.search.aggregations.support.ValuesSourceConfig;
1818
import org.opensearch.search.internal.SearchContext;
19-
import org.opensearch.search.streaming.Streamable;
2019
import org.opensearch.search.streaming.StreamingCostMetrics;
2120

2221
import java.io.IOException;
@@ -29,7 +28,7 @@
2928
*
3029
* @opensearch.internal
3130
*/
32-
public class StreamCardinalityAggregator extends CardinalityAggregator implements Streamable {
31+
public class StreamCardinalityAggregator extends CardinalityAggregator {
3332

3433
private Collector streamCollector;
3534

0 commit comments

Comments
 (0)