Implement incremental bulk execution #113044
Conversation
Pinging @elastic/es-distributed (Team:Distributed)
Force-pushed ff66351 to a85a02a.
The changes in BulkOperation LGTM. I do think it would be worth adding one test that ensures that short-circuited shard requests also end up in the failure store, maybe in IncrementalBulkRestIT.java or IncrementalBulkIT.java.
LGTM.
```java
if (request.isStreamedContent()) {
    assert action instanceof RequestBodyChunkConsumer;
    var chunkConsumer = (RequestBodyChunkConsumer) action;
    // ...
}
```
I think this is where we'd fail if there is a discrepancy between the handlers and the decider filter in Netty4HttpAggregator. This is fine for now (for the merge), and possibly also for our initial roll-out plans.

I do wonder, though, whether we could have a simple aggregating handler here that protects us against a discrepancy, in particular before we enable this for regular releases (or perhaps even before that)? Would it be feasible to always do the aggregation here (when the mode is enabled), assuming larger requests are handled by proper handlers?
It's a temporary measure to make a minimal change in the http/rest layer to allow partial content. The current intention is to remove HttpAggregator and do the aggregation in (Base)RestHandler. I drafted it before in #112120 and plan to revisit it once we merge to main. There are several options for how to do that; I will prepare a design doc.
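The fallback idea discussed here can be sketched in plain Java. This is a hypothetical illustration, not the actual Elasticsearch code: `ChunkConsumer`, `dispatch`, and the string-based chunks are all stand-ins. The point is that when a handler does not consume chunks incrementally, the streamed chunks are aggregated back into a single body, so a handler/decider mismatch degrades to the old fully-buffered behavior instead of failing.

```java
import java.util.List;

// Hypothetical sketch of an aggregating fallback: if no incremental consumer
// is registered for a request, buffer all chunks and hand over one body.
class AggregatingFallback {
    interface ChunkConsumer {
        void onChunk(String chunk, boolean last);
    }

    // Returns null when the consumer handled the chunks incrementally,
    // otherwise the aggregated body for a classic fully-buffered handler.
    static String dispatch(List<String> chunks, ChunkConsumer consumer) {
        if (consumer != null) {
            for (int i = 0; i < chunks.size(); i++) {
                consumer.onChunk(chunks.get(i), i == chunks.size() - 1);
            }
            return null;
        }
        StringBuilder body = new StringBuilder();
        chunks.forEach(body::append); // aggregate, then dispatch whole body
        return body.toString();
    }
}
```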
```java
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    assert msg instanceof HttpObject;
    if (msg instanceof HttpRequest request) {
        var preReq = HttpHeadersAuthenticatorUtils.asHttpPreRequest(request);
        // ...
    }
}
```
I find this somewhat unnecessary; could the decider not work on the URI directly instead, or even on the HttpRequest (the decider is quite intimate anyway)? Happy to leave as is for now, of course.
Yes. IMO we definitely have room for improvement here. There is a ticket tracking improved handling in this area.

Thanks, that makes sense. We should tackle that as a follow-up though, given that the incremental bulk handling is off by default, and I'd prefer to get this in and start some early testing on it.
BulkOperation changes look like they make sense to me! I second the ask for a test, but it's fine if that comes as a follow-up.
server/src/main/java/org/elasticsearch/action/bulk/BulkOperation.java (outdated, resolved)
Allow a single bulk request to be passed to Elasticsearch in multiple parts. Once a certain memory threshold or number of operations has been reached, the request can be split and submitted for processing.
This commit splits bulks once memory usage for indexing pressure has passed a configurable threshold.
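The accumulate-and-split behavior described in this commit message can be sketched as follows. This is a minimal illustration, not the real indexing-pressure accounting: the class name, the `handleChunk`/`flush` methods, and the string-based operations are all hypothetical, and the threshold constant stands in for the configurable setting.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: accumulate operations from a streamed bulk request
// and flush a partial bulk once a memory threshold is crossed.
class IncrementalBulkSketch {
    static final long MEMORY_THRESHOLD_BYTES = 1024; // configurable in the real setting

    final List<String> pending = new ArrayList<>();
    long pendingBytes = 0;
    int flushes = 0;

    void handleChunk(String op) {
        pending.add(op);
        pendingBytes += op.length(); // crude stand-in for memory accounting
        if (pendingBytes >= MEMORY_THRESHOLD_BYTES) {
            flush(); // split: submit the partial bulk for processing
        }
    }

    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushes++; // stand-in for dispatching the accumulated operations
        pending.clear();
        pendingBytes = 0;
    }
}
```

The caller would invoke `flush()` one final time when the last chunk of the request arrives.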
Integrate the incremental bulks into RestBulkAction
Currently the rest.incremental_bulk setting is read in two different places. This means that it will be applied in two steps, introducing unpredictable behavior. This commit ensures that it is only read in a single place.
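The shape of this fix can be sketched in a few lines. The class and method names here are illustrative, not the real RestBulkAction API: the setting is read once and the same value is used everywhere afterwards, so the two code paths can never disagree mid-request.

```java
// Minimal sketch: capture the setting once at construction time instead of
// re-reading it in two places where the answers could diverge.
class RestBulkActionSketch {
    final boolean incrementalEnabled;

    RestBulkActionSketch(boolean incrementalBulkSetting) {
        this.incrementalEnabled = incrementalBulkSetting; // single read
    }

    String prepareRequest() {
        return incrementalEnabled ? "incremental" : "buffered";
    }
}
```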
The header validator is very aggressive about adjusting autoread, on the belief that it is the only place where autoread is tweaked. However, with stream backpressure, we should only change it when we are starting or finishing header validation.
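The ownership rule in this commit message can be modeled without Netty. This is an illustrative sketch, not the actual header-validator code: the boolean field stands in for `Channel.config().isAutoRead()`, and the guard ensures the validator only flips autoread around header validation while stream backpressure keeps control otherwise.

```java
// Illustrative sketch: the validator toggles autoread only while header
// validation is in flight; stream-backpressure toggles are honored otherwise.
class AutoReadGuard {
    boolean autoRead = true;         // models Channel.config().isAutoRead()
    boolean validatingHeaders = false;

    void startHeaderValidation() {
        validatingHeaders = true;
        autoRead = false;            // pause reads while headers are validated
    }

    void finishHeaderValidation() {
        validatingHeaders = false;
        autoRead = true;             // resume reads once validation completes
    }

    // Backpressure from the stream layer; ignored only mid-validation.
    void streamBackpressure(boolean resume) {
        if (!validatingHeaders) {
            autoRead = resume;
        }
    }
}
```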
Currently, unless a rest handler specifies that it handles "unsafe" buffers, we must copy the http buffers in releaseAndCopy. Unfortunately, the original content was slipping through in the initial stream PR. This led to memory corruption on index and update requests, which depend on buffers being copied.
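The copy-versus-share distinction can be sketched with plain arrays. This is a hedged illustration rather than the real releaseAndCopy, which operates on Netty buffers: a handler that has not opted into "unsafe" buffers must receive a defensive copy, because the original buffer may be reused after the network layer releases it.

```java
import java.util.Arrays;

// Sketch of the copy-on-handoff rule: only handlers that promise not to
// retain the buffer may receive the original; everyone else gets a copy.
class BufferCopySketch {
    static byte[] releaseAndCopy(byte[] original, boolean handlerSupportsUnsafe) {
        if (handlerSupportsUnsafe) {
            return original;                             // shared, zero-copy
        }
        return Arrays.copyOf(original, original.length); // safe, owned copy
    }
}
```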
Currently, the entire close pipeline is not hooked up in the case of a channel close while a request is being buffered or executed. This commit resolves the issue by connecting channel closure to stream closure.
This commit ensures we properly throw exceptions when an empty bulk request is received with the incremental handling enabled.
Force-pushed a326505 to 50eb7f9.
Spotless broke during a rebase. Fixing in this commit.
Force-pushed 50eb7f9 to 529d349.
LGTM
This commit backports all of the work introduced in #113044:
* #111438 - 5e1f655
* #111865 - 478baf1
* #112179 - 1b77421
* #112227 - cbcbc34
* #112267 - c00768a
* #112154 - a03fb12
* #112479 - 95b42a7
* #112608 - ce2d648
* #112629 - 0d55dc6
* #112767 - 2dbbd7d
* #112724 - 58e3a39
* dce8a0b
* #112974 - 92daeeb
* 529d349
* #113161 - e3424bd
This commit adds a mechanism for splitting bulk requests when a coordinating node is under memory pressure. In order to do that, it adds a new mechanism for http stream handling, and the proper framework to use this with bulk requests.