Skip to content

Commit d8892ce

Browse files
authored
Introduce the constant ByteStats.IDENTITY (#123857) (#123885)
we already have IDENTITY constant for IngestStats and Stats, it make sense to introduce a similar pattern for ByteStats.
1 parent ca4a159 commit d8892ce

File tree

4 files changed

+72
-13
lines changed

4 files changed

+72
-13
lines changed

server/src/main/java/org/elasticsearch/ingest/IngestPipelineMetric.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,12 @@ void postIngestBytes(long bytesProduced) {
5252
* Creates a serializable representation for these metrics.
5353
*/
5454
IngestStats.ByteStats createByteStats() {
55-
return new IngestStats.ByteStats(this.bytesIngested.count(), this.bytesProduced.count());
55+
long bytesIngested = this.bytesIngested.count();
56+
long bytesProduced = this.bytesProduced.count();
57+
if (bytesIngested == 0L && bytesProduced == 0L) {
58+
return IngestStats.ByteStats.IDENTITY;
59+
}
60+
return new IngestStats.ByteStats(bytesIngested, bytesProduced);
5661
}
5762

5863
}

server/src/main/java/org/elasticsearch/ingest/IngestStats.java

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,7 @@ public static IngestStats read(StreamInput in) throws IOException {
7474
for (var i = 0; i < size; i++) {
7575
var pipelineId = in.readString();
7676
var pipelineStat = readStats(in);
77-
var byteStat = in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) ? new ByteStats(in) : new ByteStats(0, 0);
77+
var byteStat = in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) ? readByteStats(in) : ByteStats.IDENTITY;
7878
pipelineStats.add(new PipelineStat(pipelineId, pipelineStat, byteStat));
7979
int processorsSize = in.readVInt();
8080
var processorStatsPerPipeline = new ArrayList<ProcessorStat>(processorsSize);
@@ -287,13 +287,21 @@ private static PipelineStat merge(PipelineStat first, PipelineStat second) {
287287
}
288288
}
289289

290+
static ByteStats readByteStats(StreamInput in) throws IOException {
291+
long bytesIngested = in.readVLong();
292+
long bytesProduced = in.readVLong();
293+
if (bytesProduced == 0L && bytesIngested == 0L) {
294+
return ByteStats.IDENTITY;
295+
}
296+
return new ByteStats(bytesIngested, bytesProduced);
297+
}
298+
290299
/**
291300
* Container for ingested byte stats
292301
*/
293302
public record ByteStats(long bytesIngested, long bytesProduced) implements Writeable, ToXContentFragment {
294-
public ByteStats(StreamInput in) throws IOException {
295-
this(in.readVLong(), in.readVLong());
296-
}
303+
304+
public static final ByteStats IDENTITY = new ByteStats(0L, 0L);
297305

298306
@Override
299307
public void writeTo(StreamOutput out) throws IOException {
@@ -317,6 +325,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
317325
}
318326

319327
static ByteStats merge(ByteStats first, ByteStats second) {
328+
if (first == IDENTITY) {
329+
return second;
330+
} else if (second == IDENTITY) {
331+
return first;
332+
}
320333
return new ByteStats((first.bytesIngested + second.bytesIngested), first.bytesProduced + second.bytesProduced);
321334
}
322335
}

server/src/test/java/org/elasticsearch/ingest/IngestStatsTests.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import java.util.Map;
2020

2121
import static org.hamcrest.Matchers.containsInAnyOrder;
22+
import static org.hamcrest.Matchers.equalTo;
2223
import static org.hamcrest.Matchers.is;
2324
import static org.hamcrest.Matchers.sameInstance;
2425

@@ -65,6 +66,39 @@ public void testProcessorNameAndTypeIdentitySerialization() throws IOException {
6566
assertThat(processorStats.get(2).type(), sameInstance(set));
6667
}
6768

69+
public void testBytesStatsSerialization() throws IOException {
70+
{
71+
IngestPipelineMetric metric = new IngestPipelineMetric();
72+
IngestStats.ByteStats byteStats = metric.createByteStats();
73+
assertThat(byteStats, sameInstance(IngestStats.ByteStats.IDENTITY));
74+
75+
IngestStats.ByteStats serializedByteStats = serialize(byteStats);
76+
assertThat(serializedByteStats, sameInstance(IngestStats.ByteStats.IDENTITY));
77+
assertThat(IngestStats.ByteStats.merge(IngestStats.ByteStats.IDENTITY, byteStats), sameInstance(byteStats));
78+
}
79+
{
80+
long ingestBytes = randomLongBetween(0, Long.MAX_VALUE);
81+
long producedBytes = randomLongBetween(0, Long.MAX_VALUE);
82+
IngestPipelineMetric metric = new IngestPipelineMetric();
83+
metric.preIngestBytes(ingestBytes);
84+
metric.postIngestBytes(producedBytes);
85+
IngestStats.ByteStats byteStats = metric.createByteStats();
86+
assertThat(byteStats.bytesIngested(), equalTo(ingestBytes));
87+
assertThat(byteStats.bytesProduced(), equalTo(producedBytes));
88+
89+
IngestStats.ByteStats serializedByteStats = serialize(byteStats);
90+
assertThat(serializedByteStats.bytesIngested(), equalTo(ingestBytes));
91+
assertThat(serializedByteStats.bytesProduced(), equalTo(producedBytes));
92+
93+
assertThat(IngestStats.ByteStats.merge(byteStats, IngestStats.ByteStats.IDENTITY), sameInstance(byteStats));
94+
assertThat(IngestStats.ByteStats.merge(IngestStats.ByteStats.IDENTITY, byteStats), sameInstance(byteStats));
95+
assertThat(
96+
IngestStats.ByteStats.merge(IngestStats.ByteStats.IDENTITY, IngestStats.ByteStats.IDENTITY),
97+
sameInstance(IngestStats.ByteStats.IDENTITY)
98+
);
99+
}
100+
}
101+
68102
public void testStatsMerge() {
69103
var first = randomStats();
70104
var second = randomStats();
@@ -273,6 +307,13 @@ private static IngestStats serialize(IngestStats stats) throws IOException {
273307
return IngestStats.read(in);
274308
}
275309

310+
private static IngestStats.ByteStats serialize(IngestStats.ByteStats stats) throws IOException {
311+
var out = new BytesStreamOutput();
312+
stats.writeTo(out);
313+
var in = out.bytes().streamInput();
314+
return IngestStats.readByteStats(in);
315+
}
316+
276317
private static void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats) {
277318
assertNotSame(ingestStats, serializedStats);
278319
assertNotSame(ingestStats.totalStats(), serializedStats.totalStats());

x-pack/plugin/core/src/test/java/org/elasticsearch/xpack/core/ml/action/GetTrainedModelsStatsActionResponseTests.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
106106
pipelineStat -> new IngestStats.PipelineStat(
107107
pipelineStat.pipelineId(),
108108
pipelineStat.stats(),
109-
new IngestStats.ByteStats(0, 0)
109+
IngestStats.ByteStats.IDENTITY
110110
)
111111
)
112112
.toList(),
@@ -141,7 +141,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
141141
pipelineStat -> new IngestStats.PipelineStat(
142142
pipelineStat.pipelineId(),
143143
pipelineStat.stats(),
144-
new IngestStats.ByteStats(0, 0)
144+
IngestStats.ByteStats.IDENTITY
145145
)
146146
)
147147
.toList(),
@@ -214,7 +214,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
214214
pipelineStat -> new IngestStats.PipelineStat(
215215
pipelineStat.pipelineId(),
216216
pipelineStat.stats(),
217-
new IngestStats.ByteStats(0, 0)
217+
IngestStats.ByteStats.IDENTITY
218218
)
219219
)
220220
.toList(),
@@ -287,7 +287,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
287287
pipelineStat -> new IngestStats.PipelineStat(
288288
pipelineStat.pipelineId(),
289289
pipelineStat.stats(),
290-
new IngestStats.ByteStats(0, 0)
290+
IngestStats.ByteStats.IDENTITY
291291
)
292292
)
293293
.toList(),
@@ -360,7 +360,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
360360
pipelineStat -> new IngestStats.PipelineStat(
361361
pipelineStat.pipelineId(),
362362
pipelineStat.stats(),
363-
new IngestStats.ByteStats(0, 0)
363+
IngestStats.ByteStats.IDENTITY
364364
)
365365
)
366366
.toList(),
@@ -434,7 +434,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
434434
pipelineStat -> new IngestStats.PipelineStat(
435435
pipelineStat.pipelineId(),
436436
pipelineStat.stats(),
437-
new IngestStats.ByteStats(0, 0)
437+
IngestStats.ByteStats.IDENTITY
438438
)
439439
)
440440
.toList(),
@@ -508,7 +508,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
508508
pipelineStat -> new IngestStats.PipelineStat(
509509
pipelineStat.pipelineId(),
510510
pipelineStat.stats(),
511-
new IngestStats.ByteStats(0, 0)
511+
IngestStats.ByteStats.IDENTITY
512512
)
513513
)
514514
.toList(),
@@ -582,7 +582,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
582582
pipelineStat -> new IngestStats.PipelineStat(
583583
pipelineStat.pipelineId(),
584584
pipelineStat.stats(),
585-
new IngestStats.ByteStats(0, 0)
585+
IngestStats.ByteStats.IDENTITY
586586
)
587587
)
588588
.toList(),

0 commit comments

Comments
 (0)