Skip to content

Commit 6311e6f

Browse files
authored
Merge branch 'main' into 2025/03/13/upgrade-aws-sdk-v2
2 parents 6b29766 + 5f20493 commit 6311e6f

File tree

18 files changed

+172
-184
lines changed

18 files changed

+172
-184
lines changed

modules/aggregations/src/main/java/org/elasticsearch/aggregations/bucket/histogram/InternalAutoDateHistogram.java

Lines changed: 5 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -532,15 +532,11 @@ public InternalAggregation get() {
532532

533533
@Override
534534
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
535-
return new InternalAutoDateHistogram(
536-
getName(),
537-
buckets.stream().map(b -> b.finalizeSampling(samplingContext)).toList(),
538-
targetBuckets,
539-
bucketInfo,
540-
format,
541-
getMetadata(),
542-
bucketInnerInterval
543-
);
535+
final List<Bucket> buckets = new ArrayList<>(this.buckets);
536+
for (int i = 0; i < buckets.size(); i++) {
537+
buckets.set(i, buckets.get(i).finalizeSampling(samplingContext));
538+
}
539+
return new InternalAutoDateHistogram(getName(), buckets, targetBuckets, bucketInfo, format, getMetadata(), bucketInnerInterval);
544540
}
545541

546542
private BucketReduceResult maybeMergeConsecutiveBuckets(BucketReduceResult current, AggregationReduceContext reduceContext) {

server/src/main/java/org/elasticsearch/rest/RestController.java

Lines changed: 15 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -34,6 +34,7 @@
3434
import org.elasticsearch.core.RestApiVersion;
3535
import org.elasticsearch.core.Streams;
3636
import org.elasticsearch.core.TimeValue;
37+
import org.elasticsearch.core.UpdateForV10;
3738
import org.elasticsearch.http.HttpHeadersValidationException;
3839
import org.elasticsearch.http.HttpRouteStats;
3940
import org.elasticsearch.http.HttpRouteStatsTracker;
@@ -874,12 +875,17 @@ public void sendResponse(RestResponse response) {
874875
}
875876
}
876877

878+
// exposed for tests; marked as UpdateForV10 because this assertion should have flushed out all double-close bugs by the time v10 is
879+
// released so we should be able to drop the tests that check we behave reasonably in production on this impossible path
880+
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
881+
static boolean PERMIT_DOUBLE_RESPONSE = false;
882+
877883
private static final class ResourceHandlingHttpChannel extends DelegatingRestChannel {
878884
private final CircuitBreakerService circuitBreakerService;
879885
private final int contentLength;
880886
private final HttpRouteStatsTracker statsTracker;
881887
private final long startTime;
882-
private final AtomicBoolean closed = new AtomicBoolean();
888+
private final AtomicBoolean responseSent = new AtomicBoolean();
883889

884890
ResourceHandlingHttpChannel(
885891
RestChannel delegate,
@@ -898,7 +904,14 @@ private static final class ResourceHandlingHttpChannel extends DelegatingRestCha
898904
public void sendResponse(RestResponse response) {
899905
boolean success = false;
900906
try {
901-
close();
907+
// protect against double-response bugs
908+
if (responseSent.compareAndSet(false, true) == false) {
909+
final var message = "have already sent a response to this request, cannot send another";
910+
assert PERMIT_DOUBLE_RESPONSE : message;
911+
throw new IllegalStateException(message);
912+
}
913+
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);
914+
902915
statsTracker.addRequestStats(contentLength);
903916
statsTracker.addResponseTime(rawRelativeTimeInMillis() - startTime);
904917
if (response.isChunked() == false) {
@@ -929,14 +942,6 @@ public void sendResponse(RestResponse response) {
929942
private static long rawRelativeTimeInMillis() {
930943
return TimeValue.nsecToMSec(System.nanoTime());
931944
}
932-
933-
private void close() {
934-
// attempt to close once atomically
935-
if (closed.compareAndSet(false, true) == false) {
936-
throw new IllegalStateException("Channel is already closed");
937-
}
938-
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);
939-
}
940945
}
941946

942947
private static class ResponseLengthRecorder extends AtomicReference<HttpRouteStatsTracker> implements Releasable {

server/src/main/java/org/elasticsearch/search/aggregations/bucket/composite/InternalComposite.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -270,12 +270,16 @@ public InternalAggregation get() {
270270

271271
@Override
272272
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
273+
final List<InternalBucket> buckets = new ArrayList<>(this.buckets.size());
274+
for (InternalBucket bucket : this.buckets) {
275+
buckets.add(bucket.finalizeSampling(samplingContext));
276+
}
273277
return new InternalComposite(
274278
name,
275279
size,
276280
sourceNames,
277281
buckets.isEmpty() ? formats : buckets.get(buckets.size() - 1).formats,
278-
buckets.stream().map(b -> b.finalizeSampling(samplingContext)).toList(),
282+
buckets,
279283
buckets.isEmpty() ? afterKey : buckets.get(buckets.size() - 1).getRawKey(),
280284
reverseMuls,
281285
missingOrders,

server/src/main/java/org/elasticsearch/search/aggregations/bucket/filter/InternalFilters.java

Lines changed: 5 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -223,13 +223,11 @@ public void close() {
223223

224224
@Override
225225
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
226-
return new InternalFilters(
227-
name,
228-
buckets.stream().map(b -> b.finalizeSampling(samplingContext)).toList(),
229-
keyed,
230-
keyedBucket,
231-
getMetadata()
232-
);
226+
final List<InternalBucket> buckets = new ArrayList<>(this.buckets.size());
227+
for (InternalBucket bucket : this.buckets) {
228+
buckets.add(bucket.finalizeSampling(samplingContext));
229+
}
230+
return new InternalFilters(name, buckets, keyed, keyedBucket, getMetadata());
233231
}
234232

235233
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/geogrid/InternalGeoGrid.java

Lines changed: 11 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.elasticsearch.xcontent.XContentBuilder;
2424

2525
import java.io.IOException;
26+
import java.util.ArrayList;
2627
import java.util.Arrays;
2728
import java.util.List;
2829
import java.util.Map;
@@ -137,20 +138,17 @@ public void close() {
137138

138139
@Override
139140
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
140-
return create(
141-
getName(),
142-
requiredSize,
143-
buckets.stream()
144-
.<InternalGeoGridBucket>map(
145-
b -> this.createBucket(
146-
b.hashAsLong,
147-
samplingContext.scaleUp(b.docCount),
148-
InternalAggregations.finalizeSampling(b.aggregations, samplingContext)
149-
)
141+
final List<InternalGeoGridBucket> buckets = new ArrayList<>(this.buckets.size());
142+
for (InternalGeoGridBucket bucket : this.buckets) {
143+
buckets.add(
144+
this.createBucket(
145+
bucket.hashAsLong,
146+
samplingContext.scaleUp(bucket.docCount),
147+
InternalAggregations.finalizeSampling(bucket.aggregations, samplingContext)
150148
)
151-
.toList(),
152-
getMetadata()
153-
);
149+
);
150+
}
151+
return create(getName(), requiredSize, buckets, getMetadata());
154152
}
155153

156154
protected abstract B createBucket(long hashAsLong, long docCount, InternalAggregations aggregations);

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalDateHistogram.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -515,9 +515,13 @@ public InternalAggregation get() {
515515

516516
@Override
517517
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
518+
final List<Bucket> buckets = new ArrayList<>(this.buckets.size());
519+
for (Bucket bucket : this.buckets) {
520+
buckets.add(bucket.finalizeSampling(samplingContext));
521+
}
518522
return new InternalDateHistogram(
519523
getName(),
520-
buckets.stream().map(b -> b.finalizeSampling(samplingContext)).toList(),
524+
buckets,
521525
order,
522526
minDocCount,
523527
offset,

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalHistogram.java

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -446,16 +446,11 @@ public InternalAggregation get() {
446446

447447
@Override
448448
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
449-
return new InternalHistogram(
450-
getName(),
451-
buckets.stream().map(b -> b.finalizeSampling(samplingContext)).toList(),
452-
order,
453-
minDocCount,
454-
emptyBucketInfo,
455-
format,
456-
keyed,
457-
getMetadata()
458-
);
449+
final List<Bucket> buckets = new ArrayList<>(this.buckets.size());
450+
for (Bucket bucket : this.buckets) {
451+
buckets.add(bucket.finalizeSampling(samplingContext));
452+
}
453+
return new InternalHistogram(getName(), buckets, order, minDocCount, emptyBucketInfo, format, keyed, getMetadata());
459454
}
460455

461456
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/histogram/InternalVariableWidthHistogram.java

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -539,14 +539,11 @@ public InternalAggregation get() {
539539

540540
@Override
541541
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
542-
return new InternalVariableWidthHistogram(
543-
getName(),
544-
buckets.stream().map(b -> b.finalizeSampling(samplingContext)).toList(),
545-
emptyBucketInfo,
546-
targetNumBuckets,
547-
format,
548-
getMetadata()
549-
);
542+
final List<Bucket> buckets = new ArrayList<>(this.buckets.size());
543+
for (Bucket bucket : this.buckets) {
544+
buckets.add(bucket.finalizeSampling(samplingContext));
545+
}
546+
return new InternalVariableWidthHistogram(getName(), buckets, emptyBucketInfo, targetNumBuckets, format, getMetadata());
550547
}
551548

552549
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalBinaryRange.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.elasticsearch.xcontent.XContentBuilder;
2626

2727
import java.io.IOException;
28+
import java.util.ArrayList;
2829
import java.util.List;
2930
import java.util.Map;
3031
import java.util.Objects;
@@ -258,13 +259,11 @@ public void close() {
258259

259260
@Override
260261
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
261-
return new InternalBinaryRange(
262-
name,
263-
format,
264-
keyed,
265-
buckets.stream().map(b -> b.finalizeSampling(samplingContext)).toList(),
266-
metadata
267-
);
262+
final List<Bucket> buckets = new ArrayList<>(this.buckets.size());
263+
for (Bucket bucket : this.buckets) {
264+
buckets.add(bucket.finalizeSampling(samplingContext));
265+
}
266+
return new InternalBinaryRange(name, format, keyed, buckets, metadata);
268267
}
269268

270269
@Override

server/src/main/java/org/elasticsearch/search/aggregations/bucket/range/InternalRange.java

Lines changed: 13 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -325,24 +325,20 @@ public void close() {
325325
@Override
326326
public InternalAggregation finalizeSampling(SamplingContext samplingContext) {
327327
InternalRange.Factory<B, R> factory = getFactory();
328-
return factory.create(
329-
name,
330-
ranges.stream()
331-
.map(
332-
b -> factory.createBucket(
333-
b.getKey(),
334-
b.from,
335-
b.to,
336-
samplingContext.scaleUp(b.getDocCount()),
337-
InternalAggregations.finalizeSampling(b.getAggregations(), samplingContext),
338-
b.format
339-
)
328+
final List<B> buckets = new ArrayList<>(ranges.size());
329+
for (B range : ranges) {
330+
buckets.add(
331+
factory.createBucket(
332+
range.getKey(),
333+
range.from,
334+
range.to,
335+
samplingContext.scaleUp(range.getDocCount()),
336+
InternalAggregations.finalizeSampling(range.getAggregations(), samplingContext),
337+
range.format
340338
)
341-
.toList(),
342-
format,
343-
keyed,
344-
getMetadata()
345-
);
339+
);
340+
}
341+
return factory.create(name, buckets, format, keyed, getMetadata());
346342
}
347343

348344
@Override

0 commit comments

Comments
 (0)