Skip to content

Commit 1b3b3ee

Browse files
authored
Pq compression user metrics (#18227)
* pq-compression: wire through custom user metrics for ratio/spend * add test for queue compression metrics * pq metrics: IORatioMetric edge-case logging, use int at interface By using `int` type in `IORatioMetric#incrementBy(int,int)`, we simplify the failure scenarios while still allowing the desired behaviour, since this is always called in practice with `byte[]#length`. We ensure that attempts to decrement the value are ignored, and result in a log message, and that overflows reduce precision and are also logged. Together, these ensure that long overflows won't ever result in pipeline crashes.
1 parent 88b853a commit 1b3b3ee

27 files changed

+716
-73
lines changed

docs/static/spec/openapi/logstash-api.yaml

Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2340,6 +2340,59 @@ components:
23402340
max_queue_size_in_bytes:
23412341
type: integer
23422342
format: int64
2343+
compression:
2344+
type: object
2345+
properties:
2346+
encode:
2347+
type: object
2348+
properties:
2349+
ratio:
2350+
type: object
2351+
description: the ratio of event size in bytes to its representation on disk
2352+
properties:
2353+
lifetime:
2354+
oneOf:
2355+
- type: number
2356+
- enum:
2357+
- "Infinity"
2358+
- "NaN"
2359+
- "-Infinity"
2360+
spend:
2361+
type: object
2362+
description: the fraction of wall-clock time spent encoding events
2363+
properties:
2364+
lifetime:
2365+
oneOf:
2366+
- type: number
2367+
- enum:
2368+
- "Infinity"
2369+
- "NaN"
2370+
- "-Infinity"
2371+
decode:
2372+
type: object
2373+
properties:
2374+
ratio:
2375+
type: object
2376+
description: the ratio of event representation on disk to event size
2377+
properties:
2378+
lifetime:
2379+
oneOf:
2380+
- type: number
2381+
- enum:
2382+
- "Infinity"
2383+
- "NaN"
2384+
- "-Infinity"
2385+
spend:
2386+
type: object
2387+
description: the fraction of wall-clock time spent decoding events
2388+
properties:
2389+
lifetime:
2390+
oneOf:
2391+
- type: number
2392+
- enum:
2393+
- "Infinity"
2394+
- "NaN"
2395+
- "-Infinity"
23432396
- type: object
23442397
description: "The metrics of memory queue."
23452398
required:

logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java

Lines changed: 16 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
package org.logstash.ackedqueue;
22

3+
import co.elastic.logstash.api.Metric;
4+
import co.elastic.logstash.api.NamespacedMetric;
35
import com.github.luben.zstd.Zstd;
46
import org.apache.logging.log4j.LogManager;
57
import org.apache.logging.log4j.Logger;
@@ -13,13 +15,26 @@ abstract class AbstractZstdAwareCompressionCodec implements CompressionCodec {
1315
// log from the concrete class
1416
protected final Logger logger = LogManager.getLogger(this.getClass());
1517

18+
private final IORatioMetric decodeRatioMetric;
19+
private final RelativeSpendMetric decodeTimerMetric;
20+
21+
public AbstractZstdAwareCompressionCodec(Metric queueMetric) {
22+
final NamespacedMetric decodeNamespace = queueMetric.namespace("compression", "decode");
23+
decodeRatioMetric = decodeNamespace.namespace("ratio")
24+
.register("lifetime", AtomicIORatioMetric.FACTORY);
25+
decodeTimerMetric = decodeNamespace.namespace("spend")
26+
.register("lifetime", CalculatedRelativeSpendMetric.FACTORY);
27+
}
28+
1629
@Override
1730
public byte[] decode(byte[] data) {
1831
if (!isZstd(data)) {
32+
decodeRatioMetric.incrementBy(data.length, data.length);
1933
return data;
2034
}
2135
try {
22-
final byte[] decoded = Zstd.decompress(data);
36+
final byte[] decoded = decodeTimerMetric.time(() -> Zstd.decompress(data));
37+
decodeRatioMetric.incrementBy(data.length, decoded.length);
2338
logger.trace("decoded {} -> {}", data.length, decoded.length);
2439
return decoded;
2540
} catch (Exception e) {
Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
package org.logstash.ackedqueue;

import co.elastic.logstash.api.UserMetric;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.logstash.instrument.metrics.AbstractMetric;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;
import java.util.concurrent.atomic.AtomicReference;

/**
 * An {@link IORatioMetric} that tracks lifetime bytes-in/bytes-out as an atomically-swapped
 * immutable pair. It uses {@code long} under the hood, and is capable of handling sustained
 * 1GiB/sec for ~272 years before overflowing; on overflow it halves the accumulated counters
 * (reducing precision, preserving the ratio) instead of crashing.
 */
class AtomicIORatioMetric extends AbstractMetric<Double> implements IORatioMetric {

    // NOTE(review): declared final — this shared factory must never be reassigned at runtime.
    public static final UserMetric.Factory<IORatioMetric> FACTORY = IORatioMetric.PROVIDER.getFactory(AtomicIORatioMetric::new);

    // 4 significant digits is plenty for a human-facing ratio, and guarantees divide() terminates.
    private static final MathContext LIMITED_PRECISION = new MathContext(4, RoundingMode.HALF_UP);
    private static final ImmutableRatio ZERO = new ImmutableRatio(0L, 0L);
    private static final Logger LOGGER = LogManager.getLogger(AtomicIORatioMetric.class);

    // both counters are swapped together so readers always see a consistent in/out pair
    private final AtomicReference<ImmutableRatio> atomicReference = new AtomicReference<>(ZERO);
    private final Logger logger;

    AtomicIORatioMetric(final String name) {
        this(name, LOGGER);
    }

    // package-private: logger injectable for tests
    AtomicIORatioMetric(final String name, final Logger logger) {
        super(name);
        this.logger = logger;
    }

    /**
     * @return a consistent snapshot of the lifetime bytes-in/bytes-out pair
     */
    @Override
    public Value getLifetime() {
        return atomicReference.get();
    }

    /**
     * Accumulates one IO observation. Negative arguments would decrement the lifetime
     * counters, which is not meaningful for this metric; they are ignored with a warning.
     *
     * @param bytesIn  number of input bytes observed (must be non-negative)
     * @param bytesOut number of output bytes observed (must be non-negative)
     */
    @Override
    public void incrementBy(int bytesIn, int bytesOut) {
        if (bytesIn < 0 || bytesOut < 0) {
            logger.warn("cannot decrement IORatioMetric {}", this.getName());
            return;
        }
        this.atomicReference.getAndUpdate((existing) -> doIncrement(existing, bytesIn, bytesOut));
    }

    // test injection
    void setTo(long bytesIn, long bytesOut) {
        this.atomicReference.set(new ImmutableRatio(bytesIn, bytesOut));
    }

    /**
     * @return bytesOut/bytesIn at limited precision; when bytesIn is zero this degenerates to
     *         signed infinity (matching the sign of bytesOut) or NaN when both are zero.
     */
    @Override
    public Double getValue() {
        final Value snapshot = getLifetime();

        final BigDecimal bytesIn = BigDecimal.valueOf(snapshot.bytesIn());
        final BigDecimal bytesOut = BigDecimal.valueOf(snapshot.bytesOut());

        if (bytesIn.signum() == 0) {
            return switch(bytesOut.signum()) {
                case -1 -> Double.NEGATIVE_INFINITY;
                case 1 -> Double.POSITIVE_INFINITY;
                default -> Double.NaN;
            };
        }

        return bytesOut.divide(bytesIn, LIMITED_PRECISION).doubleValue();
    }

    /** Resets both lifetime counters to zero. */
    public void reset() {
        this.atomicReference.set(ZERO);
    }

    // pure function over the existing snapshot; safe to retry inside getAndUpdate
    private ImmutableRatio doIncrement(final ImmutableRatio existing, final int bytesIn, final int bytesOut) {

        final long combinedBytesIn = existing.bytesIn() + bytesIn;
        final long combinedBytesOut = existing.bytesOut() + bytesOut;

        // a negative sum implies long overflow (both inputs are guaranteed non-negative);
        // halve BOTH accumulated counters so the reported ratio is preserved, then add.
        // Math.addExact cannot throw here: existing/2 + Integer.MAX_VALUE < Long.MAX_VALUE.
        if (combinedBytesIn < 0 || combinedBytesOut < 0) {
            logger.warn("long overflow; precision will be reduced");
            final long reducedBytesIn = Math.addExact(Math.floorDiv(existing.bytesIn(), 2), bytesIn);
            final long reducedBytesOut = Math.addExact(Math.floorDiv(existing.bytesOut(), 2), bytesOut);

            return new ImmutableRatio(reducedBytesIn, reducedBytesOut);
        }

        return new ImmutableRatio(combinedBytesIn, combinedBytesOut);
    }

    /** Immutable consistent pair of lifetime counters. */
    public record ImmutableRatio(long bytesIn, long bytesOut) implements Value { }
}
Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
package org.logstash.ackedqueue;

import org.logstash.instrument.metrics.AbstractMetric;
import org.logstash.instrument.metrics.UptimeMetric;
import org.logstash.instrument.metrics.timer.TimerMetric;
import org.logstash.instrument.metrics.timer.TimerMetricFactory;

import java.math.BigDecimal;
import java.math.MathContext;
import java.math.RoundingMode;

/**
 * A {@link RelativeSpendMetric} whose value is derived on read as the fraction of
 * wall-clock uptime that has been spent inside tracked operations
 * (tracked-time / uptime, at limited precision).
 */
class CalculatedRelativeSpendMetric extends AbstractMetric<Double> implements RelativeSpendMetric {
    // 4 significant digits is plenty for a human-facing fraction, and guarantees divide() terminates
    private static final MathContext LIMITED_PRECISION = new MathContext(4, RoundingMode.HALF_UP);

    private final TimerMetric spendMetric;
    private final UptimeMetric uptimeMetric;

    // NOTE(review): declared final — this shared factory must never be reassigned at runtime.
    public static final Factory<RelativeSpendMetric> FACTORY = RelativeSpendMetric.PROVIDER.getFactory(CalculatedRelativeSpendMetric::new);

    public CalculatedRelativeSpendMetric(final String name) {
        this(name, TimerMetricFactory.getInstance().create(name + ":spend"), new UptimeMetric(name + ":uptime"));
    }

    // package-private: component metrics injectable for tests
    CalculatedRelativeSpendMetric(String name, TimerMetric spendMetric, UptimeMetric uptimeMetric) {
        super(name);
        this.spendMetric = spendMetric;
        this.uptimeMetric = uptimeMetric;
    }

    /**
     * Times the supplied operation, accumulating its duration into the spend tracker.
     */
    @Override
    public <T, E extends Throwable> T time(ExceptionalSupplier<T, E> exceptionalSupplier) throws E {
        return this.spendMetric.time(exceptionalSupplier);
    }

    @Override
    public void reportUntrackedMillis(long untrackedMillis) {
        this.spendMetric.reportUntrackedMillis(untrackedMillis);
    }

    /**
     * @return spend/uptime at limited precision; when uptime is zero this degenerates to
     *         signed infinity (matching the sign of spend) or 0.0 when both are zero.
     */
    @Override
    public Double getValue() {
        BigDecimal spend = BigDecimal.valueOf(spendMetric.getValue());
        BigDecimal uptime = BigDecimal.valueOf(uptimeMetric.getValue());

        // switch expression (matching AtomicIORatioMetric#getValue) instead of the
        // fall-through-prone statement form
        if (uptime.signum() == 0) {
            return switch (spend.signum()) {
                case -1 -> Double.NEGATIVE_INFINITY;
                case 1 -> Double.POSITIVE_INFINITY;
                default -> 0.0;
            };
        }

        return spend.divide(uptime, LIMITED_PRECISION).doubleValue();
    }
}
Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package org.logstash.ackedqueue;
22

3+
import co.elastic.logstash.api.Metric;
34
import org.apache.logging.log4j.LogManager;
45
import org.apache.logging.log4j.Logger;
6+
import org.logstash.ackedqueue.ZstdEnabledCompressionCodec.Goal;
7+
import org.logstash.plugins.NamespacedMetricImpl;
58

69
public interface CompressionCodec {
710
Logger LOGGER = LogManager.getLogger(CompressionCodec.class);
@@ -26,33 +29,41 @@ public byte[] decode(byte[] data) {
2629
}
2730
};
2831

29-
static CompressionCodec fromConfigValue(final String configValue) {
30-
return fromConfigValue(configValue, LOGGER);
32+
@FunctionalInterface
33+
interface Factory {
34+
CompressionCodec create(final Metric metric);
35+
default CompressionCodec create() {
36+
return create(NamespacedMetricImpl.getNullMetric());
37+
}
3138
}
3239

33-
static CompressionCodec fromConfigValue(final String configValue, final Logger logger) {
34-
return switch (configValue) {
35-
case "disabled" -> {
40+
static CompressionCodec.Factory fromConfigValue(final String configValue, final Logger logger) {
41+
return switch(configValue) {
42+
case "disabled" -> (metric) -> {
3643
logger.warn("compression support has been disabled");
37-
yield CompressionCodec.NOOP;
38-
}
39-
case "none" -> {
44+
return CompressionCodec.NOOP;
45+
};
46+
case "none" -> (metric) -> {
4047
logger.info("compression support is enabled (read-only)");
41-
yield ZstdAwareCompressionCodec.getInstance();
42-
}
43-
case "speed" -> {
48+
return new ZstdAwareCompressionCodec(metric);
49+
};
50+
case "speed" -> (metric) -> {
4451
logger.info("compression support is enabled (goal: speed)");
45-
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SPEED);
46-
}
47-
case "balanced" -> {
52+
return new ZstdEnabledCompressionCodec(Goal.SPEED, metric);
53+
};
54+
case "balanced" -> (metric) -> {
4855
logger.info("compression support is enabled (goal: balanced)");
49-
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.BALANCED);
50-
}
51-
case "size" -> {
56+
return new ZstdEnabledCompressionCodec(Goal.BALANCED, metric);
57+
};
58+
case "size" -> (metric) -> {
5259
logger.info("compression support is enabled (goal: size)");
53-
yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SIZE);
54-
}
60+
return new ZstdEnabledCompressionCodec(Goal.SIZE, metric);
61+
};
5562
default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue));
5663
};
5764
}
65+
66+
static CompressionCodec.Factory fromConfigValue(final String configValue) {
67+
return fromConfigValue(configValue, LOGGER);
68+
}
5869
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
package org.logstash.ackedqueue;

import co.elastic.logstash.api.UserMetric;
import org.logstash.instrument.metrics.MetricType;

/**
 * A {@code IORatioMetric} is a custom metric that tracks the ratio of input to output.
 * Its point-in-time value is the ratio {@code bytesOut/bytesIn}; its lifetime snapshot
 * exposes the raw byte counters.
 */
interface IORatioMetric extends UserMetric<Double>, org.logstash.instrument.metrics.Metric<Double> {
    // the derived ratio; may degenerate to +/-Infinity or NaN when bytesIn is zero
    Double getValue();

    // consistent snapshot of the lifetime counters backing the ratio
    Value getLifetime();

    // int (not long) keeps failure scenarios simple; callers pass byte[]#length in practice
    void incrementBy(int bytesIn, int bytesOut);

    @Override
    default MetricType getType() {
        return MetricType.USER;
    }

    // NOTE(review): corrected from "100GiB/sec" — 2^63 bytes at a sustained ~1GiB/sec
    // gives the implementation's long counters capacity for ~272 years.
    interface Value {
        long bytesIn();

        long bytesOut();
    }

    // Provider with a null-object fallback: reports NaN, a null lifetime, and swallows increments.
    Provider<IORatioMetric> PROVIDER = new Provider<>(IORatioMetric.class, new IORatioMetric() {
        @Override
        public Double getValue() {
            return Double.NaN;
        }

        @Override
        public Value getLifetime() {
            return null;
        }

        @Override
        public void incrementBy(int bytesIn, int bytesOut) {
            // no-op
        }

        @Override
        public String getName() {
            return "NULL";
        }
    });
}

0 commit comments

Comments
 (0)