Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,12 @@ void postIngestBytes(long bytesProduced) {
* Creates a serializable representation for these metrics.
*/
IngestStats.ByteStats createByteStats() {
return new IngestStats.ByteStats(this.bytesIngested.count(), this.bytesProduced.count());
long bytesIngested = this.bytesIngested.count();
long bytesProduced = this.bytesProduced.count();
if (bytesIngested == 0L && bytesProduced == 0L) {
return IngestStats.ByteStats.IDENTITY;
}
return new IngestStats.ByteStats(bytesIngested, bytesProduced);
}

}
21 changes: 17 additions & 4 deletions server/src/main/java/org/elasticsearch/ingest/IngestStats.java
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public static IngestStats read(StreamInput in) throws IOException {
for (var i = 0; i < size; i++) {
var pipelineId = in.readString();
var pipelineStat = readStats(in);
var byteStat = in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) ? new ByteStats(in) : new ByteStats(0, 0);
var byteStat = in.getTransportVersion().onOrAfter(TransportVersions.V_8_15_0) ? readByteStats(in) : ByteStats.IDENTITY;
pipelineStats.add(new PipelineStat(pipelineId, pipelineStat, byteStat));
int processorsSize = in.readVInt();
var processorStatsPerPipeline = new ArrayList<ProcessorStat>(processorsSize);
Expand Down Expand Up @@ -287,13 +287,21 @@ private static PipelineStat merge(PipelineStat first, PipelineStat second) {
}
}

static ByteStats readByteStats(StreamInput in) throws IOException {
long bytesIngested = in.readVLong();
long bytesProduced = in.readVLong();
if (bytesProduced == 0L && bytesIngested == 0L) {
return ByteStats.IDENTITY;
}
return new ByteStats(bytesIngested, bytesProduced);
}

/**
* Container for ingested byte stats
*/
public record ByteStats(long bytesIngested, long bytesProduced) implements Writeable, ToXContentFragment {
public ByteStats(StreamInput in) throws IOException {
this(in.readVLong(), in.readVLong());
}

public static final ByteStats IDENTITY = new ByteStats(0L, 0L);

@Override
public void writeTo(StreamOutput out) throws IOException {
Expand All @@ -317,6 +325,11 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws
}

static ByteStats merge(ByteStats first, ByteStats second) {
if (first == IDENTITY) {
return second;
} else if (second == IDENTITY) {
return first;
}
return new ByteStats((first.bytesIngested + second.bytesIngested), first.bytesProduced + second.bytesProduced);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import java.util.Map;

import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.sameInstance;

Expand Down Expand Up @@ -65,6 +66,39 @@ public void testProcessorNameAndTypeIdentitySerialization() throws IOException {
assertThat(processorStats.get(2).type(), sameInstance(set));
}

public void testBytesStatsSerialization() throws IOException {
{
IngestPipelineMetric metric = new IngestPipelineMetric();
IngestStats.ByteStats byteStats = metric.createByteStats();
assertThat(byteStats, sameInstance(IngestStats.ByteStats.IDENTITY));

IngestStats.ByteStats serializedByteStats = serialize(byteStats);
assertThat(serializedByteStats, sameInstance(IngestStats.ByteStats.IDENTITY));
assertThat(IngestStats.ByteStats.merge(IngestStats.ByteStats.IDENTITY, byteStats), sameInstance(byteStats));
}
{
long ingestBytes = randomLongBetween(0, Long.MAX_VALUE);
long producedBytes = randomLongBetween(0, Long.MAX_VALUE);
IngestPipelineMetric metric = new IngestPipelineMetric();
metric.preIngestBytes(ingestBytes);
metric.postIngestBytes(producedBytes);
IngestStats.ByteStats byteStats = metric.createByteStats();
assertThat(byteStats.bytesIngested(), equalTo(ingestBytes));
assertThat(byteStats.bytesProduced(), equalTo(producedBytes));

IngestStats.ByteStats serializedByteStats = serialize(byteStats);
assertThat(serializedByteStats.bytesIngested(), equalTo(ingestBytes));
assertThat(serializedByteStats.bytesProduced(), equalTo(producedBytes));

assertThat(IngestStats.ByteStats.merge(byteStats, IngestStats.ByteStats.IDENTITY), sameInstance(byteStats));
assertThat(IngestStats.ByteStats.merge(IngestStats.ByteStats.IDENTITY, byteStats), sameInstance(byteStats));
assertThat(
IngestStats.ByteStats.merge(IngestStats.ByteStats.IDENTITY, IngestStats.ByteStats.IDENTITY),
sameInstance(IngestStats.ByteStats.IDENTITY)
);
}
}

public void testStatsMerge() {
var first = randomStats();
var second = randomStats();
Expand Down Expand Up @@ -273,6 +307,13 @@ private static IngestStats serialize(IngestStats stats) throws IOException {
return IngestStats.read(in);
}

private static IngestStats.ByteStats serialize(IngestStats.ByteStats stats) throws IOException {
var out = new BytesStreamOutput();
stats.writeTo(out);
var in = out.bytes().streamInput();
return IngestStats.readByteStats(in);
}

private static void assertIngestStats(IngestStats ingestStats, IngestStats serializedStats) {
assertNotSame(ingestStats, serializedStats);
assertNotSame(ingestStats.totalStats(), serializedStats.totalStats());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
pipelineStat -> new IngestStats.PipelineStat(
pipelineStat.pipelineId(),
pipelineStat.stats(),
new IngestStats.ByteStats(0, 0)
IngestStats.ByteStats.IDENTITY
)
)
.toList(),
Expand Down Expand Up @@ -141,7 +141,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
pipelineStat -> new IngestStats.PipelineStat(
pipelineStat.pipelineId(),
pipelineStat.stats(),
new IngestStats.ByteStats(0, 0)
IngestStats.ByteStats.IDENTITY
)
)
.toList(),
Expand Down Expand Up @@ -214,7 +214,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
pipelineStat -> new IngestStats.PipelineStat(
pipelineStat.pipelineId(),
pipelineStat.stats(),
new IngestStats.ByteStats(0, 0)
IngestStats.ByteStats.IDENTITY
)
)
.toList(),
Expand Down Expand Up @@ -287,7 +287,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
pipelineStat -> new IngestStats.PipelineStat(
pipelineStat.pipelineId(),
pipelineStat.stats(),
new IngestStats.ByteStats(0, 0)
IngestStats.ByteStats.IDENTITY
)
)
.toList(),
Expand Down Expand Up @@ -360,7 +360,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
pipelineStat -> new IngestStats.PipelineStat(
pipelineStat.pipelineId(),
pipelineStat.stats(),
new IngestStats.ByteStats(0, 0)
IngestStats.ByteStats.IDENTITY
)
)
.toList(),
Expand Down Expand Up @@ -434,7 +434,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
pipelineStat -> new IngestStats.PipelineStat(
pipelineStat.pipelineId(),
pipelineStat.stats(),
new IngestStats.ByteStats(0, 0)
IngestStats.ByteStats.IDENTITY
)
)
.toList(),
Expand Down Expand Up @@ -508,7 +508,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
pipelineStat -> new IngestStats.PipelineStat(
pipelineStat.pipelineId(),
pipelineStat.stats(),
new IngestStats.ByteStats(0, 0)
IngestStats.ByteStats.IDENTITY
)
)
.toList(),
Expand Down Expand Up @@ -582,7 +582,7 @@ protected Response mutateInstanceForVersion(Response instance, TransportVersion
pipelineStat -> new IngestStats.PipelineStat(
pipelineStat.pipelineId(),
pipelineStat.stats(),
new IngestStats.ByteStats(0, 0)
IngestStats.ByteStats.IDENTITY
)
)
.toList(),
Expand Down