Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 15 additions & 10 deletions server/src/main/java/org/elasticsearch/rest/RestController.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import org.elasticsearch.core.RestApiVersion;
import org.elasticsearch.core.Streams;
import org.elasticsearch.core.TimeValue;
import org.elasticsearch.core.UpdateForV10;
import org.elasticsearch.http.HttpHeadersValidationException;
import org.elasticsearch.http.HttpRouteStats;
import org.elasticsearch.http.HttpRouteStatsTracker;
Expand Down Expand Up @@ -874,12 +875,17 @@ public void sendResponse(RestResponse response) {
}
}

// exposed for tests; marked as UpdateForV10 because this assertion should have flushed out all double-close bugs by the time v10 is
// released so we should be able to drop the tests that check we behave reasonably in production on this impossible path
@UpdateForV10(owner = UpdateForV10.Owner.DISTRIBUTED_COORDINATION)
static boolean PERMIT_DOUBLE_RESPONSE = false;

private static final class ResourceHandlingHttpChannel extends DelegatingRestChannel {
private final CircuitBreakerService circuitBreakerService;
private final int contentLength;
private final HttpRouteStatsTracker statsTracker;
private final long startTime;
private final AtomicBoolean closed = new AtomicBoolean();
private final AtomicBoolean responseSent = new AtomicBoolean();

ResourceHandlingHttpChannel(
RestChannel delegate,
Expand All @@ -898,7 +904,14 @@ private static final class ResourceHandlingHttpChannel extends DelegatingRestCha
public void sendResponse(RestResponse response) {
boolean success = false;
try {
close();
// protect against double-response bugs
if (responseSent.compareAndSet(false, true) == false) {
final var message = "have already sent a response to this request, cannot send another";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

cannot send another

we can but we wont

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's an implicit "without violating the HTTP spec" here I guess ;)

Copy link
Contributor

@mhl-b mhl-b Mar 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HTTP Spec:

First shalt thou create the HTTP Response. Then shalt thou count to one, no more, no less. One shall be the number thou shalt count, and the number of the counting shall be one. Two shalt thou not count, neither count thou three, excepting that thou then proceed to one. Five is right out. Once the number one, being the first number, be reached, then lobbest thou thy HTTP Response towards thy foe, who, being naughty in My sight, shall snuff it.

assert PERMIT_DOUBLE_RESPONSE : message;
throw new IllegalStateException(message);
}
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);

statsTracker.addRequestStats(contentLength);
statsTracker.addResponseTime(rawRelativeTimeInMillis() - startTime);
if (response.isChunked() == false) {
Expand Down Expand Up @@ -929,14 +942,6 @@ public void sendResponse(RestResponse response) {
private static long rawRelativeTimeInMillis() {
return TimeValue.nsecToMSec(System.nanoTime());
}

private void close() {
// attempt to close once atomically
if (closed.compareAndSet(false, true) == false) {
throw new IllegalStateException("Channel is already closed");
}
inFlightRequestsBreaker(circuitBreakerService).addWithoutBreaking(-contentLength);
}
}

private static class ResponseLengthRecorder extends AtomicReference<HttpRouteStatsTracker> implements Releasable {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -498,40 +498,50 @@ public void testDispatchRequestAddsAndFreesBytesOnError() {
}

public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnError() {
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAlphaOfLength((int) Math.round(contentLength / inFlightRequestsBreaker.getOverhead()));
// we will produce an error in the rest handler and one more when sending the error response
RestRequest request = testRestRequest("/error", content, XContentType.JSON);
ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, randomBoolean());

restController.dispatchRequest(request, channel, client.threadPool().getThreadContext());

assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
try {
RestController.PERMIT_DOUBLE_RESPONSE = true;
int contentLength = BREAKER_LIMIT.bytesAsInt();
String content = randomAlphaOfLength((int) Math.round(contentLength / inFlightRequestsBreaker.getOverhead()));
// we will produce an error in the rest handler and one more when sending the error response
RestRequest request = testRestRequest("/error", content, XContentType.JSON);
ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, randomBoolean());

restController.dispatchRequest(request, channel, client.threadPool().getThreadContext());

assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
} finally {
RestController.PERMIT_DOUBLE_RESPONSE = false;
}
}

public void testDispatchRequestAddsAndFreesBytesOnlyOnceOnErrorDuringSend() {
int contentLength = Math.toIntExact(BREAKER_LIMIT.getBytes());
String content = randomAlphaOfLength((int) Math.round(contentLength / inFlightRequestsBreaker.getOverhead()));
// use a real recycler that tracks leaks and create some content bytes in the test handler to check for leaks
final BytesRefRecycler recycler = new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY));
restController.registerHandler(
new Route(GET, "/foo"),
(request, c, client) -> new RestToXContentListener<>(c).onResponse((b, p) -> b.startObject().endObject())
);
// we will produce an error in the rest handler and one more when sending the error response
RestRequest request = testRestRequest("/foo", content, XContentType.JSON);
ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, randomBoolean()) {
@Override
protected BytesStream newBytesOutput() {
return new RecyclerBytesStreamOutput(recycler);
}
};
try {
RestController.PERMIT_DOUBLE_RESPONSE = true;
int contentLength = Math.toIntExact(BREAKER_LIMIT.getBytes());
String content = randomAlphaOfLength((int) Math.round(contentLength / inFlightRequestsBreaker.getOverhead()));
// use a real recycler that tracks leaks and create some content bytes in the test handler to check for leaks
final BytesRefRecycler recycler = new BytesRefRecycler(new MockPageCacheRecycler(Settings.EMPTY));
restController.registerHandler(
new Route(GET, "/foo"),
(request, c, client) -> new RestToXContentListener<>(c).onResponse((b, p) -> b.startObject().endObject())
);
// we will produce an error in the rest handler and one more when sending the error response
RestRequest request = testRestRequest("/foo", content, XContentType.JSON);
ExceptionThrowingChannel channel = new ExceptionThrowingChannel(request, randomBoolean()) {
@Override
protected BytesStream newBytesOutput() {
return new RecyclerBytesStreamOutput(recycler);
}
};

restController.dispatchRequest(request, channel, client.threadPool().getThreadContext());
restController.dispatchRequest(request, channel, client.threadPool().getThreadContext());

assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
assertEquals(0, inFlightRequestsBreaker.getTrippedCount());
assertEquals(0, inFlightRequestsBreaker.getUsed());
} finally {
RestController.PERMIT_DOUBLE_RESPONSE = false;
}
}

public void testDispatchRequestLimitsBytes() {
Expand Down