Skip to content

Commit 963f255

Browse files
authored
Deduplicate IngestStats and IngestStats.Stats identity records when deserializing (elastic#122496) (elastic#122516)
This commit makes sure we reuse the existing static instance when deserializing to avoid excessive heap usage. # Conflicts: # server/src/main/java/org/elasticsearch/ingest/IngestStats.java
1 parent d9ef61f commit 963f255

File tree

3 files changed

+32
-10
lines changed

3 files changed

+32
-10
lines changed

docs/changelog/122496.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122496
2+
summary: Deduplicate `IngestStats` and `IngestStats.Stats` identity records when deserializing
3+
area: Ingest Node
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/ingest/IngestStats.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,14 +57,17 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
5757
* Read from a stream.
5858
*/
5959
public static IngestStats read(StreamInput in) throws IOException {
60-
var stats = new Stats(in);
60+
var stats = readStats(in);
6161
var size = in.readVInt();
62+
if (stats == Stats.IDENTITY && size == 0) {
63+
return IDENTITY;
64+
}
6265
var pipelineStats = new ArrayList<PipelineStat>(size);
6366
var processorStats = Maps.<String, List<ProcessorStat>>newMapWithExpectedSize(size);
6467

6568
for (var i = 0; i < size; i++) {
6669
var pipelineId = in.readString();
67-
var pipelineStat = new Stats(in);
70+
var pipelineStat = readStats(in);
6871
var byteStat = in.getTransportVersion().onOrAfter(TransportVersions.NODE_STATS_INGEST_BYTES)
6972
? new ByteStats(in)
7073
: new ByteStats(0, 0);
@@ -74,7 +77,7 @@ public static IngestStats read(StreamInput in) throws IOException {
7477
for (var j = 0; j < processorsSize; j++) {
7578
var processorName = in.readString();
7679
var processorType = in.readString();
77-
var processorStat = new Stats(in);
80+
var processorStat = readStats(in);
7881
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
7982
}
8083
processorStats.put(pipelineId, Collections.unmodifiableList(processorStatsPerPipeline));
@@ -171,20 +174,28 @@ static Map<String, List<ProcessorStat>> merge(Map<String, List<ProcessorStat>> f
171174
return totalsPerPipelineProcessor;
172175
}
173176

177+
/**
178+
* Read {@link Stats} from a stream.
179+
*/
180+
private static Stats readStats(StreamInput in) throws IOException {
181+
long ingestCount = in.readVLong();
182+
long ingestTimeInMillis = in.readVLong();
183+
long ingestCurrent = in.readVLong();
184+
long ingestFailedCount = in.readVLong();
185+
if (ingestCount == 0 && ingestTimeInMillis == 0 && ingestCurrent == 0 && ingestFailedCount == 0) {
186+
return Stats.IDENTITY;
187+
} else {
188+
return new Stats(ingestCount, ingestTimeInMillis, ingestCurrent, ingestFailedCount);
189+
}
190+
}
191+
174192
public record Stats(long ingestCount, long ingestTimeInMillis, long ingestCurrent, long ingestFailedCount)
175193
implements
176194
Writeable,
177195
ToXContentFragment {
178196

179197
public static final Stats IDENTITY = new Stats(0, 0, 0, 0);
180198

181-
/**
182-
* Read from a stream.
183-
*/
184-
public Stats(StreamInput in) throws IOException {
185-
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
186-
}
187-
188199
@Override
189200
public void writeTo(StreamOutput out) throws IOException {
190201
out.writeVLong(ingestCount);

server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020

2121
import static org.hamcrest.Matchers.containsInAnyOrder;
22+
import static org.hamcrest.Matchers.sameInstance;
2223

2324
public class IngestStatsTests extends ESTestCase {
2425

@@ -31,6 +32,11 @@ public void testSerialization() throws IOException {
3132
assertIngestStats(ingestStats, serializedStats);
3233
}
3334

35+
public void testIdentitySerialization() throws IOException {
36+
IngestStats serializedStats = serialize(IngestStats.IDENTITY);
37+
assertThat(serializedStats, sameInstance(IngestStats.IDENTITY));
38+
}
39+
3440
public void testStatsMerge() {
3541
var first = randomStats();
3642
var second = randomStats();

0 commit comments

Comments
 (0)