Skip to content

Commit 33a2bc9

Browse files
authored
Deduplicate IngestStats and IngestStats.Stats identity records when deserializing (#122496)
This commit makes sure we reuse the existing static instance when deserializing to avoid excessive heap usage.
1 parent 1b452c7 commit 33a2bc9

File tree

3 files changed

+32
-10
lines changed

3 files changed

+32
-10
lines changed

docs/changelog/122496.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122496
2+
summary: Deduplicate `IngestStats` and `IngestStats.Stats` identity records when deserializing
3+
area: Ingest Node
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/ingest/IngestStats.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,22 +57,25 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
5757
* Read from a stream.
5858
*/
5959
public static IngestStats read(StreamInput in) throws IOException {
60-
var stats = new Stats(in);
60+
var stats = readStats(in);
6161
var size = in.readVInt();
62+
if (stats == Stats.IDENTITY && size == 0) {
63+
return IDENTITY;
64+
}
6265
var pipelineStats = new ArrayList<PipelineStat>(size);
6366
var processorStats = Maps.<String, List<ProcessorStat>>newMapWithExpectedSize(size);
6467

6568
for (var i = 0; i < size; i++) {
6669
var pipelineId = in.readString();
67-
var pipelineStat = new Stats(in);
70+
var pipelineStat = readStats(in);
6871
var byteStat = in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) ? new ByteStats(in) : new ByteStats(0, 0);
6972
pipelineStats.add(new PipelineStat(pipelineId, pipelineStat, byteStat));
7073
int processorsSize = in.readVInt();
7174
var processorStatsPerPipeline = new ArrayList<ProcessorStat>(processorsSize);
7275
for (var j = 0; j < processorsSize; j++) {
7376
var processorName = in.readString();
7477
var processorType = in.readString();
75-
var processorStat = new Stats(in);
78+
var processorStat = readStats(in);
7679
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
7780
}
7881
processorStats.put(pipelineId, Collections.unmodifiableList(processorStatsPerPipeline));
@@ -167,20 +170,28 @@ static Map<String, List<ProcessorStat>> merge(Map<String, List<ProcessorStat>> f
167170
return totalsPerPipelineProcessor;
168171
}
169172

173+
/**
174+
* Read {@link Stats} from a stream.
175+
*/
176+
private static Stats readStats(StreamInput in) throws IOException {
177+
long ingestCount = in.readVLong();
178+
long ingestTimeInMillis = in.readVLong();
179+
long ingestCurrent = in.readVLong();
180+
long ingestFailedCount = in.readVLong();
181+
if (ingestCount == 0 && ingestTimeInMillis == 0 && ingestCurrent == 0 && ingestFailedCount == 0) {
182+
return Stats.IDENTITY;
183+
} else {
184+
return new Stats(ingestCount, ingestTimeInMillis, ingestCurrent, ingestFailedCount);
185+
}
186+
}
187+
170188
public record Stats(long ingestCount, long ingestTimeInMillis, long ingestCurrent, long ingestFailedCount)
171189
implements
172190
Writeable,
173191
ToXContentFragment {
174192

175193
public static final Stats IDENTITY = new Stats(0, 0, 0, 0);
176194

177-
/**
178-
* Read from a stream.
179-
*/
180-
public Stats(StreamInput in) throws IOException {
181-
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
182-
}
183-
184195
@Override
185196
public void writeTo(StreamOutput out) throws IOException {
186197
out.writeVLong(ingestCount);

server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020

2121
import static org.hamcrest.Matchers.containsInAnyOrder;
22+
import static org.hamcrest.Matchers.sameInstance;
2223

2324
public class IngestStatsTests extends ESTestCase {
2425

@@ -31,6 +32,11 @@ public void testSerialization() throws IOException {
3132
assertIngestStats(ingestStats, serializedStats);
3233
}
3334

35+
public void testIdentitySerialization() throws IOException {
36+
IngestStats serializedStats = serialize(IngestStats.IDENTITY);
37+
assertThat(serializedStats, sameInstance(IngestStats.IDENTITY));
38+
}
39+
3440
public void testStatsMerge() {
3541
var first = randomStats();
3642
var second = randomStats();

0 commit comments

Comments
 (0)