Skip to content

Conversation

@mhl-b
Copy link
Contributor

@mhl-b mhl-b commented Nov 30, 2024

This PR moves HTTP content aggregation from the Netty pipeline(HttpObjectAggregator) to the REST controller. Immediate benefit is that any RestHandler can consume streamed body by toggling RestHandler#supportContentStream flag and implementing BaseRestHandler.RequestBodyChunkConsumer. Also opens up opportunity for further improvements: circuit-breakers on a stream not full content, fast-fail on headers check, move Authentication to the REST layer and simplify HTTP code, content length limits per API, incremental JSON parsing.

Notable changes:

  • HttpObjectAggregate is replaced by Netty4HttpContentSizeHandler. Netty's aggregator handled 100-continue and oversized content besides combining content chunks into full content. This PR preserves that behaviour, but removes aggregation.
  • Add RestContentAggregator to the RestController. See RestController#maybeAggregateAndDispatch.
  • Coordinated access to Netty's ChannelConfig#setAutoRead with new AutoReadSync class. A more reliable autoRead toggle between Netty4HttpHeaderValidator and Netty4HttpRequestBodyStream. Should be removed in the future when Authentication is moved to the REST layer and HTTP flow control can be simplified.
  • Add ThreadWatchdog.ActivityTracker to Netty4HttpRequestBodyStream for off-pipeline channel event loop executions.

Closes:

  • ES-9968
  • ES-9166
  • Partly ES-9571
  • ES-9309

@mhl-b mhl-b added >enhancement :Distributed Coordination/Network Http and internode communication implementations Team:Distributed Coordination Meta label for Distributed Coordination team labels Nov 30, 2024
@elasticsearchmachine
Copy link
Collaborator

Hi @mhl-b, I've created a changelog YAML for you.

@mhl-b mhl-b force-pushed the rest-content-aggregation branch from cae8263 to c7d3e4d Compare December 5, 2024 05:55
@mhl-b mhl-b force-pushed the rest-content-aggregation branch from 37ba19b to 650738e Compare December 6, 2024 01:09
@mhl-b mhl-b changed the title [WIP] Add rest content aggregator [WIP] Add REST content aggregator Dec 9, 2024
@mhl-b mhl-b marked this pull request as ready for review December 9, 2024 20:15
@mhl-b mhl-b requested a review from a team December 9, 2024 20:15
@elasticsearchmachine
Copy link
Collaborator

Pinging @elastic/es-distributed-coordination (Team:Distributed Coordination)

@mhl-b mhl-b changed the title [WIP] Add REST content aggregator Add REST content aggregator Dec 9, 2024
send();
} catch (Exception e) {
channel.pipeline().fireExceptionCaught(e);
} catch (Throwable t) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

die-with-dignity test throws OOM exception which is discarded by evenloop executor, need to catch and throw into pipeline

Copy link
Contributor

@DaveCTurner DaveCTurner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm finding this quite hard to get a handle on, there's a lot to review here. Could we pull out the changes to fix the auto-read problems and deal with that first?

I'm also wondering if auto-read is even necessary any more. We're already starting to move to an explicit-read model with stuff like org.elasticsearch.http.HttpBody.Stream#next. Could we go all-in with that model instead and avoid using auto-read at all?

}

int readBytes(int bytes) {
int readBytes(int bytes) throws InterruptedException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we make this method interrupt-safe rather than pushing that responsibility up to the caller? I.e. catch InterruptedException, reinstate the thread's interrupt status flag, and throw an AssertionError since we do not expect to be interrupted in these tests.

);

static {
EXPECTATION_FAILED.headers().set(CONTENT_LENGTH, 0);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: I'd rather we finished constructing these values (i.e. setting their headers) before assigning them to their respective fields.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I copied that from the netty's HttpObjectAggregator. Can instantiate those inline.

if (msg instanceof HttpRequest request) {
handleRequest(ctx, request);
} else {
handleContent(ctx, (HttpContent) msg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: HttpObject has other possibilities (e.g. io.netty.handler.codec.http.DefaultHttpObject), could we assert this is a HttpContent before casting?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

HttpObjectDecoder emits HttpRequest and HttpContent only. I dont think DefaultHttpObject is a valid type here.

}
})
.addLast("aggregator", aggregator);
.addLast("content_size", contentSizeHandler);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: it'd be more harmonious with the other code here if we inlined contentSizeHandler. Wasn't possible with aggregator because of the setMaxCumulationBufferComponents call, but we can clean that up now.

private final Map<String, List<String>> headers;
private final AtomicBoolean released;
private final Exception inboundException;
private final boolean pooled;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

somewhat unrelated: is this ever false?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It should be true, I did not add these changes here due to already large scope.

}

public void handleNettyContent(HttpContent httpContent) {
public void handleNettyContent(HttpContent httpContent) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm uncomfortable with relaxing all these signatures to permit any checked exception. AFAICT this is because we can now end up calling dispatchRequest which calls various methods that claim they might throw an IOException, although in practice they shouldn't do so. Could we handle this better in dispatchRequest instead?

} else {
channel.eventLoop().submit(this::doClose);
if (channel.eventLoop().isShutdown() == false) {
channel.eventLoop().submit(this::doClose);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I suspect this might not be threadsafe: what if the event loop shuts down after checking isShutdown but before submitting the task? Should we do something more like org.elasticsearch.transport.netty4.Netty4Utils#safeWriteAndFlush?

That said, don't we clean up all in-flight requests and close all HTTP channels etc. before shutting down the event loop anyway? If so, isShutdown() is always false here right?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yup, not threadsafe. I think I should catch RejectedException here from eventLoop().submit(). I dont remember all details, but shutdown integ tests invoke this code and isShutdown() is true.

this.threadContext = threadContext;
this.activityTracker = activityTracker;
this.requestContext = threadContext.newStoredContext();
this.autoRead = AutoReadSync.getHandle(channel);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm uncomfortable with dynamically acquiring a new auto-read flag for each body stream in the channel. We're only reading one of these at once, can we do something more obviously bounded here?

Copy link
Contributor Author

@mhl-b mhl-b Dec 30, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We can read fairly large chunk from network that can contain multiple HTTP requests. We will dispatch each request headers to RestHandlers while holding content for each of them in the Netty4HttpRequestBodyStream until it's processed. So it's possible that multiple stream handlers can exists in the channel that toggle off auto-read at the same time. In particular when stream.next() is not invoked immediately by ChunkHandler.

The ultimate and clean solution that I see here is moving Netty4HttpHeaderValidator to the rest layer. Place FlowControlHandler in front of HttpContentDecompressor that will hold all incoming http parts, and toggle auto-read only in the Netty4HttpRequestBodyStream. The bitset toggle I introduced here only for the time being until we move auth stuff.

@mhl-b
Copy link
Contributor Author

mhl-b commented Jun 19, 2025

closed in favour of #129302

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

:Distributed Coordination/Network Http and internode communication implementations >enhancement Team:Distributed Coordination Meta label for Distributed Coordination team v9.1.0

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants