Skip to content

Commit 4bdc3f9

Browse files
authored
Deduplicate IngestStats and IngestStats.Stats identity records when deserializing (elastic#122496) (elastic#122514)
This commit makes sure we reuse the existing static instance when deserializing to avoid excessive heap usage.
1 parent dba9d58 commit 4bdc3f9

File tree

3 files changed

+32
-10
lines changed

3 files changed

+32
-10
lines changed

docs/changelog/122496.yaml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
pr: 122496
2+
summary: Deduplicate `IngestStats` and `IngestStats.Stats` identity records when deserializing
3+
area: Ingest Node
4+
type: bug
5+
issues: []

server/src/main/java/org/elasticsearch/ingest/IngestStats.java

Lines changed: 21 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -57,22 +57,25 @@ public record IngestStats(Stats totalStats, List<PipelineStat> pipelineStats, Ma
5757
* Read from a stream.
5858
*/
5959
public static IngestStats read(StreamInput in) throws IOException {
60-
var stats = new Stats(in);
60+
var stats = readStats(in);
6161
var size = in.readVInt();
62+
if (stats == Stats.IDENTITY && size == 0) {
63+
return IDENTITY;
64+
}
6265
var pipelineStats = new ArrayList<PipelineStat>(size);
6366
var processorStats = Maps.<String, List<ProcessorStat>>newMapWithExpectedSize(size);
6467

6568
for (var i = 0; i < size; i++) {
6669
var pipelineId = in.readString();
67-
var pipelineStat = new Stats(in);
70+
var pipelineStat = readStats(in);
6871
var byteStat = in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) ? new ByteStats(in) : new ByteStats(0, 0);
6972
pipelineStats.add(new PipelineStat(pipelineId, pipelineStat, byteStat));
7073
int processorsSize = in.readVInt();
7174
var processorStatsPerPipeline = new ArrayList<ProcessorStat>(processorsSize);
7275
for (var j = 0; j < processorsSize; j++) {
7376
var processorName = in.readString();
7477
var processorType = in.readString();
75-
var processorStat = new Stats(in);
78+
var processorStat = readStats(in);
7679
processorStatsPerPipeline.add(new ProcessorStat(processorName, processorType, processorStat));
7780
}
7881
processorStats.put(pipelineId, Collections.unmodifiableList(processorStatsPerPipeline));
@@ -169,20 +172,28 @@ static Map<String, List<ProcessorStat>> merge(Map<String, List<ProcessorStat>> f
169172
return totalsPerPipelineProcessor;
170173
}
171174

175+
/**
176+
* Read {@link Stats} from a stream.
177+
*/
178+
private static Stats readStats(StreamInput in) throws IOException {
179+
long ingestCount = in.readVLong();
180+
long ingestTimeInMillis = in.readVLong();
181+
long ingestCurrent = in.readVLong();
182+
long ingestFailedCount = in.readVLong();
183+
if (ingestCount == 0 && ingestTimeInMillis == 0 && ingestCurrent == 0 && ingestFailedCount == 0) {
184+
return Stats.IDENTITY;
185+
} else {
186+
return new Stats(ingestCount, ingestTimeInMillis, ingestCurrent, ingestFailedCount);
187+
}
188+
}
189+
172190
public record Stats(long ingestCount, long ingestTimeInMillis, long ingestCurrent, long ingestFailedCount)
173191
implements
174192
Writeable,
175193
ToXContentFragment {
176194

177195
public static final Stats IDENTITY = new Stats(0, 0, 0, 0);
178196

179-
/**
180-
* Read from a stream.
181-
*/
182-
public Stats(StreamInput in) throws IOException {
183-
this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong());
184-
}
185-
186197
@Override
187198
public void writeTo(StreamOutput out) throws IOException {
188199
out.writeVLong(ingestCount);

server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020

2121
import static org.hamcrest.Matchers.containsInAnyOrder;
22+
import static org.hamcrest.Matchers.sameInstance;
2223

2324
public class IngestStatsTests extends ESTestCase {
2425

@@ -31,6 +32,11 @@ public void testSerialization() throws IOException {
3132
assertIngestStats(ingestStats, serializedStats);
3233
}
3334

35+
public void testIdentitySerialization() throws IOException {
36+
IngestStats serializedStats = serialize(IngestStats.IDENTITY);
37+
assertThat(serializedStats, sameInstance(IngestStats.IDENTITY));
38+
}
39+
3440
public void testStatsMerge() {
3541
var first = randomStats();
3642
var second = randomStats();

0 commit comments

Comments
 (0)