-
Notifications
You must be signed in to change notification settings - Fork 25.6k
Release source bytes when source rewritten #134017
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Conversation
Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Left a few initial comments.
client.bulk(bulkRequest, ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), content)); | ||
}; | ||
|
||
// The actual bulk request items are mutable during the bulk process so we must create a copy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This seems odd given that we go through transport? I'd probably find it better to handle that mutation differently?
List<DocWriteRequest<?>> toClose = new ArrayList<>(bulkRequest.requests()); | ||
return channel -> client.bulk( | ||
bulkRequest, | ||
ActionListener.releaseAfter(new RestRefCountedChunkedToXContentListener<>(channel), () -> Releasables.close(toClose)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the bulk should retain up front so we can release here immediately rather than only when the bulk completes? Since if we only do it when it completes we have not gained anything. I guess the streaming case is the primary case so I am probably ok with this, just seemed slightly counter intuitive.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I wonder if the bulk should retain up front so we can release here immediately rather than only when the bulk completes?
I would prefer to not expose reference counting into the bulk action. I would prefer to operate I guess on a close model although I am open to something else if there is a strong requirement for reference counting.
Since if we only do it when it completes we have not gained anything.
IMO this PR provides the following gain which I would describe as first step: when the source is rewritten for an ingest pipeline instead of keeping the original source and the ingest source allocated we release the original source immediately.
I believe you are describing "early release" which is when a doc write request completes early (due to failure, being on an early shard, etc) can we release those bytes. I agree that would be a nice improvement, but I feel like that is a PR by itself with testing.
request.getRestApiVersion() | ||
); | ||
|
||
// The actual bulk request items are mutable during the bulk process so we must create a copy |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am not sure I found the place where it is not mutating, maybe I musunderstood something? Can you elaborate or point me to where it is now mutating?
Feels like an anti-pattern?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
failRequestsWhenPrerequisiteActionFailed
. We mutate the request list to remove failed requests.
There is also pretty mutating type of stuff going on in BulkRequestModifier
. Although I don't think it rewrites the original requests list.
Feels like an anti-pattern?
I agree. Although I think refactoring the bulk request into an immutable version, introducing some type of builder pattern, etc will be a several hundred if not thousand line PR by itself. The ingest pipeline code, failure store, bulk action code, etc all have a lot of baggage built on an mutable bulk request.
This commit modifies the IndexSource to release any existing source
bytes whenever the source is rewritten. This will reduce memory usage
in the scenario where ingest pipelines rewrite and allocate a new
source. After this change, the original source will be released instead
of retained through the whole request.