Skip to content
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public static class TrainedModelStats implements ToXContentObject, Writeable {
private final int pipelineCount;

private static final IngestStats EMPTY_INGEST_STATS = new IngestStats(
new IngestStats.Stats(0, 0, 0, 0),
IngestStats.Stats.IDENTITY,
Collections.emptyList(),
Collections.emptyMap()
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -428,24 +428,11 @@ static IngestStats ingestStatsForPipelineIds(NodeStats nodeStats, Set<String> pi
.stream()
.filter(pipelineStat -> pipelineIds.contains(pipelineStat.pipelineId()))
.collect(Collectors.toList());
CounterMetric ingestCount = new CounterMetric();
CounterMetric ingestTimeInMillis = new CounterMetric();
CounterMetric ingestCurrent = new CounterMetric();
CounterMetric ingestFailedCount = new CounterMetric();
IngestStatsAccumulator accumulator = new IngestStatsAccumulator();

filteredPipelineStats.forEach(pipelineStat -> {
IngestStats.Stats stats = pipelineStat.stats();
ingestCount.inc(stats.ingestCount());
ingestTimeInMillis.inc(stats.ingestTimeInMillis());
ingestCurrent.inc(stats.ingestCurrent());
ingestFailedCount.inc(stats.ingestFailedCount());
});
filteredPipelineStats.forEach(pipelineStat -> accumulator.inc(pipelineStat.stats()));

return new IngestStats(
new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis.count(), ingestCurrent.count(), ingestFailedCount.count()),
filteredPipelineStats,
filteredProcessorStats
);
return new IngestStats(accumulator.build(), filteredPipelineStats, filteredProcessorStats);
}

private static IngestStats mergeStats(List<IngestStats> ingestStatsList) {
Expand Down Expand Up @@ -515,7 +502,14 @@ void inc(IngestStats.Stats s) {
}

IngestStats.Stats build() {
return new IngestStats.Stats(ingestCount.count(), ingestTimeInMillis.count(), ingestCurrent.count(), ingestFailedCount.count());
long ingestCount = this.ingestCount.count();
long ingestTimeInMillis = this.ingestTimeInMillis.count();
long ingestCurrent = this.ingestCurrent.count();
long ingestFailedCount = this.ingestFailedCount.count();
if (ingestCount == 0 && ingestTimeInMillis == 0 && ingestCurrent == 0 && ingestFailedCount == 0) {
return IngestStats.Stats.IDENTITY;
}
return new IngestStats.Stats(ingestCount, ingestTimeInMillis, ingestCurrent, ingestFailedCount);
}
}

Expand All @@ -535,7 +529,12 @@ IngestStats.Stats buildStats() {
}

IngestStats.ByteStats buildByteStats() {
return new IngestStats.ByteStats(ingestBytesConsumed.count(), ingestBytesProduced.count());
long ingestBytesConsumed = this.ingestBytesConsumed.count();
long ingestBytesProduced = this.ingestBytesProduced.count();
if (ingestBytesConsumed == 0 && ingestBytesProduced == 0) {
return IngestStats.ByteStats.IDENTITY;
}
return new IngestStats.ByteStats(ingestBytesConsumed, ingestBytesProduced);
}

}
Expand Down