
Conversation

@ankikuma
Contributor

@ankikuma ankikuma commented Jun 16, 2025

This PR addresses ES-12071.

We want to collect metrics for the time spent waiting for the next chunk of a bulk request. This can help with diagnosing high bulk latency when the latency is attributable to external factors such as a slow network connection.

@ankikuma ankikuma added Team:Distributed Indexing Meta label for Distributed Indexing team >non-issue labels Jun 16, 2025
@ankikuma ankikuma marked this pull request as ready for review June 17, 2025 10:57
@ankikuma ankikuma requested a review from a team as a code owner June 17, 2025 10:57
@ankikuma ankikuma requested review from henningandersen and tlrx June 17, 2025 10:57
@elasticsearchmachine elasticsearchmachine added needs:triage Requires assignment of a team area label and removed Team:Distributed Indexing Meta label for Distributed Indexing team labels Jun 17, 2025
@ankikuma ankikuma added the Team:Distributed Indexing Meta label for Distributed Indexing team label Jun 17, 2025
@elasticsearchmachine elasticsearchmachine removed the Team:Distributed Indexing Meta label for Distributed Indexing team label Jun 17, 2025
@ankikuma ankikuma added the :Distributed Indexing/Distributed A catch all label for anything in the Distributed Indexing Area. Please avoid if you can. label Jun 17, 2025
@elasticsearchmachine elasticsearchmachine added the Team:Distributed Indexing Meta label for Distributed Indexing team label Jun 17, 2025
@elasticsearchmachine
Collaborator

Pinging @elastic/es-distributed-indexing (Team:Distributed Indexing)

@elasticsearchmachine elasticsearchmachine removed the needs:triage Requires assignment of a team area label label Jun 17, 2025
Member

@tlrx tlrx left a comment


Looks good, I left some comments

public IncrementalBulkService(
Client client,
IndexingPressure indexingPressure,
BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics
Member

I wonder what the advantages are of using a dedicated BulkOperationWaitForChunkMetrics object here? Maybe just injecting the MeterRegistry and declaring the histogram metric in IncrementalBulkService would be simpler.

Member

Later on, calling updateWaitForChunkMetrics would update the metric directly instead of delegating to BulkOperationWaitForChunkMetrics too.
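
A minimal sketch of what that alternative could look like, assuming the MeterRegistry API already shown in this diff (registerLongHistogram returning a LongHistogram with a record method); the metric name follows the naming suggested later in this review, and the field, constructor, and method names are illustrative rather than the final ones:

import org.elasticsearch.client.internal.Client;
import org.elasticsearch.index.IndexingPressure;
import org.elasticsearch.telemetry.metric.LongHistogram;
import org.elasticsearch.telemetry.metric.MeterRegistry;

public class IncrementalBulkService {

    private final LongHistogram chunkWaitTimeInMillisHistogram;

    // Constructor takes the MeterRegistry directly instead of a dedicated metrics wrapper.
    public IncrementalBulkService(Client client, IndexingPressure indexingPressure, MeterRegistry meterRegistry) {
        // other fields omitted for brevity
        this.chunkWaitTimeInMillisHistogram = meterRegistry.registerLongHistogram(
            "es.rest.incremental_bulk.wait_for_next_chunk.duration.histogram",
            "Total time in millis spent waiting for the next chunk of a bulk request",
            "ms"
        );
    }

    // Updates the histogram directly, no delegation to a separate metrics object.
    public void recordWaitForNextChunkTime(long waitForNextChunkTimeInMillis) {
        chunkWaitTimeInMillisHistogram.record(waitForNextChunkTimeInMillis);
    }
}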

return incrementalOperation;
}

public void updateWaitForChunkMetrics(long chunkWaitTimeCentis) {
Member

Maybe something like this?

Suggested change
public void updateWaitForChunkMetrics(long chunkWaitTimeCentis) {
public void recordWaitForNextChunkTime(long waitForNextChunkTimeInMillis) {

final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
client,
indexingLimits,
bulkOperationWaitForChunkMetrics
Member

I would inject telemetryProvider.getMeterRegistry() directly
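
A sketch of that wiring at the construction site, assuming a telemetryProvider is in scope here as it is elsewhere in Node construction:

final IncrementalBulkService incrementalBulkService = new IncrementalBulkService(
    client,
    indexingLimits,
    telemetryProvider.getMeterRegistry()  // pass the registry itself, not a wrapper object
);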

b.bind(PageCacheRecycler.class).toInstance(pageCacheRecycler);
b.bind(IngestService.class).toInstance(ingestService);
b.bind(IndexingPressure.class).toInstance(indexingLimits);
b.bind(BulkOperationWaitForChunkMetrics.class).toInstance(bulkOperationWaitForChunkMetrics);
Member

Binding is only necessary if BulkOperationWaitForChunkMetrics is injected through Guice somewhere, and I don't think that is the case here?

import org.elasticsearch.telemetry.metric.MeterRegistry;

public class BulkOperationWaitForChunkMetrics {
public static final String CHUNK_WAIT_TIME_HISTOGRAM = "es.rest.wait.duration.histogram";
Member

I think we should mention incremental bulk request / chunking:
es.rest.incremental_bulk.wait_for_next_chunk.duration.histogram

(or something along those lines)

public static final String CHUNK_WAIT_TIME_HISTOGRAM = "es.rest.wait.duration.histogram";

/* Capture in milliseconds because the APM histogram only has a range of 100,000 */
private final LongHistogram chunkWaitTimeMillisHistogram;
Member

Suggested change
private final LongHistogram chunkWaitTimeMillisHistogram;
private final LongHistogram chunkWaitTimeInMillisHistogram;

meterRegistry.registerLongHistogram(
CHUNK_WAIT_TIME_HISTOGRAM,
"Total time in millis spent waiting for next chunk of a bulk request",
"centis"
Member

Suggested change
"centis"
"ms"

this.restChannel = restChannel;
this.handler = handlerSupplier.get();
request.contentStream().next();
requestNextChunkTime = System.nanoTime();
Member

We often pass a LongSupplier instead of calling the real time-measurement method directly; it makes testing easier.
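
A small self-contained sketch of that pattern; the class and method names are illustrative, not from the PR:

import java.util.function.LongSupplier;

class ChunkWaitTimer {
    private final LongSupplier relativeTimeInNanos;  // System::nanoTime in production
    private long requestNextChunkTime;

    ChunkWaitTimer(LongSupplier relativeTimeInNanos) {
        this.relativeTimeInNanos = relativeTimeInNanos;
    }

    // Capture the moment the next chunk was requested.
    void markNextChunkRequested() {
        requestNextChunkTime = relativeTimeInNanos.getAsLong();
    }

    // Elapsed time since the last request was issued.
    long elapsedSinceRequestInNanos() {
        return relativeTimeInNanos.getAsLong() - requestNextChunkTime;
    }
}

// In production: new ChunkWaitTimer(System::nanoTime)
// In a test, a fake clock makes elapsed-time assertions deterministic:
//   AtomicLong fakeClock = new AtomicLong();
//   ChunkWaitTimer timer = new ChunkWaitTimer(fakeClock::get);
//   timer.markNextChunkRequested();
//   fakeClock.addAndGet(5_000_000L);  // advance 5 ms
//   assert timer.elapsedSinceRequestInNanos() == 5_000_000L;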

}
totalChunkWaitTime = TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTime);
handler.updateWaitForChunkMetrics(totalChunkWaitTime);
totalChunkWaitTime = 0L;
Member

Suggested change
totalChunkWaitTime = 0L;
totalChunkWaitTime = -1L;

and then assert totalChunkWaitTime >= 0L in the handleChunk method?
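
A self-contained sketch of that sentinel-plus-assert idea, under the assumption that the accounting is re-armed at the start of each request (names are illustrative):

import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

class WaitTimeAccounting {
    private long totalChunkWaitTime = 0L;

    // Accumulate wait time for a chunk; the assert is the check suggested above.
    void onChunk(long elapsedNanos) {
        assert totalChunkWaitTime >= 0L : "chunk handled after reporting without re-arming";
        totalChunkWaitTime += elapsedNanos;
    }

    // Report the accumulated time and disarm, so a stray handleChunk trips the assert.
    void report(LongConsumer metricsSink) {
        metricsSink.accept(TimeUnit.NANOSECONDS.toMillis(totalChunkWaitTime));
        totalChunkWaitTime = -1L;
    }

    // Re-arm before the next bulk request starts accumulating.
    void reset() {
        totalChunkWaitTime = 0L;
    }
}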

handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
handler.addItems(toPass, () -> Releasables.close(releasables), () -> {
request.contentStream().next();
requestNextChunkTime = System.nanoTime();
Member

Do you know if request.contentStream().next(); immediately calls handleChunk if data is available? Otherwise I wonder if we want to capture the time before calling next.

Contributor

I think we should do it prior, seems more correct regardless.
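
In terms of the hunk above, the reordering would look roughly like this (keeping System.nanoTime() as in the hunk; swapping in the LongSupplier suggested earlier would not change the ordering):

handler.addItems(toPass, () -> Releasables.close(releasables), () -> {
    // capture the start time before asking for the next chunk, in case
    // request.contentStream().next() invokes handleChunk synchronously
    requestNextChunkTime = System.nanoTime();
    request.contentStream().next();
});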

Contributor

@henningandersen henningandersen left a comment

LGTM.

long elapsedTime = System.nanoTime() - requestNextChunkTime;
if (elapsedTime > 0) {
totalChunkWaitTime += elapsedTime;
requestNextChunkTime = 0L;
Contributor

Should we instead reset this to what System.nanoTime() gave above? 0 is not a special value and we do not seem to guard against it.
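
A sketch of the hunk with that change applied, reading System.nanoTime() once and reusing it (names follow the diff):

long now = System.nanoTime();
long elapsedTime = now - requestNextChunkTime;
if (elapsedTime > 0) {
    totalChunkWaitTime += elapsedTime;
}
// reset to the measurement just taken rather than to 0, which is not a guarded sentinel
requestNextChunkTime = now;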

handler.addItems(toPass, () -> Releasables.close(releasables), () -> request.contentStream().next());
handler.addItems(toPass, () -> Releasables.close(releasables), () -> {
request.contentStream().next();
requestNextChunkTime = System.nanoTime();
Contributor

I think we should do it prior, seems more correct regardless.

} else {
Releasables.close(releasables);
request.contentStream().next();
requestNextChunkTime = System.nanoTime();
Contributor

Also set this before calling next here.

private boolean bulkInProgress = false;
private Exception bulkActionLevelFailure = null;
private BulkRequest bulkRequest = null;
private final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics;
Contributor

Can we comment that the only reason this lives in this class is that it is simpler to inject here than into RestBulkAction?
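
One possible wording for such a comment (mine, not from the PR):

/* Only lives in IncrementalBulkService because it is simpler to inject here than into
 * RestBulkAction, which is where the wait-for-next-chunk time is actually measured. */
private final BulkOperationWaitForChunkMetrics bulkOperationWaitForChunkMetrics;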

}

public void updateWaitForChunkMetrics(long chunkWaitTimeCentis) {
if (bulkOperationWaitForChunkMetrics != null) {
Contributor

I wonder if we can assert that it is not null instead here?
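
A sketch of the null check turned into an assertion, assuming the metrics object is always wired in production; the delegate method name is illustrative:

public void recordWaitForNextChunkTime(long waitForNextChunkTimeInMillis) {
    assert bulkOperationWaitForChunkMetrics != null : "wait-for-chunk metrics not wired";
    bulkOperationWaitForChunkMetrics.recordChunkWaitTime(waitForNextChunkTimeInMillis);
}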

@fcofdez
Contributor

fcofdez commented Jun 19, 2025

@tlrx I think that I went through all the comments, maybe you can take a look before we merge this? thanks!

Member

@tlrx tlrx left a comment

LGTM

@fcofdez
Contributor

fcofdez commented Jun 19, 2025

@elasticmachine test this (unrelated test failure GetSnapshotsIT > testFilterByState FAILED)

@fcofdez fcofdez merged commit 9e19b85 into elastic:main Jun 20, 2025
27 checks passed
kderusso pushed a commit to kderusso/elasticsearch that referenced this pull request Jun 23, 2025

Co-authored-by: Francisco Fernández Castaño <[email protected]>
mridula-s109 pushed a commit to mridula-s109/elasticsearch that referenced this pull request Jun 25, 2025

Co-authored-by: Francisco Fernández Castaño <[email protected]>

Labels

:Distributed Indexing/Distributed A catch all label for anything in the Distributed Indexing Area. Please avoid if you can. >non-issue Team:Distributed Indexing Meta label for Distributed Indexing team v9.1.0
