Commit a152238

yaauiejsvdrobbavey authored
PQ: Add support for event-level compression using ZStandard (ZSTD) (#18121)
* noop: add pq compression-codec with no-op implementation

* pq: add support for event compression using zstd

  Adds non-breaking support for event compression to the persisted queue, as configured by a new per-pipeline setting `queue.compression`, which supports:

  - `none` (default): no compression is performed, but if compressed events are encountered in the queue they will be decompressed
  - `speed`: compression optimized for speed
  - `balanced`: compression balancing speed against result size
  - `size`: compression optimized for maximum reduction of size
  - `disabled`: compression support entirely disabled; if a pipeline is run in this configuration against a PQ that already contains unacked compressed events, the pipeline WILL crash

  To accomplish this, we provide an abstract base implementation of the CompressionCodec whose decode method is capable of _detecting_ and decoding zstd-encoded payloads while letting other payloads through unmodified. The detection is done with an operation on the first four bytes of the payload, so no additional context is needed.

  An instance of this zstd-aware compression codec is provided with a pass-through encode operation when configured with `queue.compression: none` (the default), ensuring that by default Logstash is able to decode any event that had previously been written. We provide an additional implementation that is capable of _encoding_ events with a configurable goal: speed, size, or a balance of the two.

* license: add notice for `com.github.luben:zstd-jni`

* pq: log compression encode/decode from the codec

* Apply docs suggestions from code review

  Co-authored-by: João Duarte <[email protected]>

* remove CleanerThreadLocal utility

* license: add mapping for com.github.luben:zstd-jni

* Apply suggestions from code review

  Co-authored-by: Rob Bavey <[email protected]>

---------

Co-authored-by: João Duarte <[email protected]>
Co-authored-by: Rob Bavey <[email protected]>
1 parent 0f9b2fe commit a152238

25 files changed: +792 -2 lines

config/logstash.yml

Lines changed: 4 additions & 0 deletions
@@ -229,6 +229,10 @@
 #
 # queue.checkpoint.writes: 1024
 #
+# If using queue.type: persisted, the compression goal. Valid values are `none`, `speed`, `balanced`, and `size`.
+# The default `none` is able to decompress previously-written events, even if they were compressed.
+#
+# queue.compression: none
 #
 # ------------ Dead-Letter Queue Settings --------------
 # Flag to turn on dead-letter queue.

docker/data/logstash/env2yaml/env2yaml.go

Lines changed: 1 addition & 0 deletions
@@ -58,6 +58,7 @@ var validSettings = []string{
     "queue.checkpoint.acks",
     "queue.checkpoint.writes",
     "queue.checkpoint.interval", // remove it for #17155
+    "queue.compression",
     "queue.drain",
     "dead_letter_queue.enable",
     "dead_letter_queue.max_bytes",

docs/reference/logstash-settings-file.md

Lines changed: 1 addition & 0 deletions
@@ -68,6 +68,7 @@ The `logstash.yml` file includes these settings.
 | `queue.checkpoint.acks` | The maximum number of ACKed events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.acks: 0` to set this value to unlimited. | 1024 |
 | `queue.checkpoint.writes` | The maximum number of written events before forcing a checkpoint when persistent queues are enabled (`queue.type: persisted`). Specify `queue.checkpoint.writes: 0` to set this value to unlimited. | 1024 |
 | `queue.checkpoint.retry` | When enabled, Logstash will retry four times per attempted checkpoint write for any checkpoint writes that fail. Any subsequent errors are not retried. This is a workaround for failed checkpoint writes that have been seen only on Windows platform, filesystems with non-standard behavior such as SANs and is not recommended except in those specific circumstances. (`queue.type: persisted`) | `true` |
+| `queue.compression` {applies_to}`stack: ga 9.2` | Set a persisted queue compression level, which allows the pipeline to reduce the event size on disk at the cost of CPU usage. Possible values are `speed`, `balanced`, and `size`. | `none` |
 | `queue.drain` | When enabled, Logstash waits until the persistent queue (`queue.type: persisted`) is drained before shutting down. | `false` |
 | `dead_letter_queue.enable` | Flag to instruct Logstash to enable the DLQ feature supported by plugins. | `false` |
 | `dead_letter_queue.max_bytes` | The maximum size of each dead letter queue. Entries will be dropped if they would increase the size of the dead letter queue beyond this setting. | `1024mb` |

docs/reference/persistent-queues.md

Lines changed: 11 additions & 0 deletions
@@ -84,6 +84,17 @@ If you want to define values for a specific pipeline, use [`pipelines.yml`](/ref
 `queue.checkpoint.interval` {applies_to}`stack: deprecated 9.1`
 : Sets the interval in milliseconds when a checkpoint is forced on the head page. Default is `1000`. Set to `0` to eliminate periodic checkpoints.
 
+`queue.compression` {applies_to}`stack: ga 9.2`
+: Sets the event compression level for use with the Persisted Queue. Default is `none`. Possible values are:
+    * `none`: does not perform compression, but reads compressed events
+    * `speed`: optimize for fastest compression operation
+    * `size`: optimize for smallest possible size on disk, spending more CPU
+    * `balanced`: a balance between the `speed` and `size` settings
+:::{important}
+Compression can be enabled for an existing PQ, but once compressed elements have been added to a PQ, that PQ cannot be read by previous Logstash releases that did not support compression.
+If you need to downgrade Logstash after enabling the PQ, you will need to either delete the PQ or run the pipeline with `queue.drain: true` first to ensure that no compressed elements remain.
+:::
+
 ## Configuration notes [pq-config-notes]
 
 Every situation and environment is different, and the "ideal" configuration varies. If you optimize for performance, you may increase your risk of losing data. If you optimize for data protection, you may impact performance.
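Because `queue.compression` is a per-pipeline setting, it can also be scoped to a single pipeline rather than set globally. A minimal sketch, assuming the standard `pipelines.yml` layout (the pipeline id is illustrative):

```yaml
- pipeline.id: compressed-ingest   # illustrative pipeline id
  queue.type: persisted            # compression only applies to the persisted queue
  queue.compression: balanced      # trade CPU for smaller on-disk events
```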

logstash-core/build.gradle

Lines changed: 1 addition & 0 deletions
@@ -239,6 +239,7 @@ dependencies {
     implementation 'commons-codec:commons-codec:1.17.0' // transitively required by httpclient
     // Jackson version moved to versions.yml in the project root (the JrJackson version is there too)
     implementation "com.fasterxml.jackson.core:jackson-core:${jacksonVersion}"
+    implementation "com.github.luben:zstd-jni:1.5.7-4"
     api "com.fasterxml.jackson.core:jackson-databind:${jacksonDatabindVersion}"
     api "com.fasterxml.jackson.core:jackson-annotations:${jacksonVersion}"
     implementation 'org.codehaus.janino:janino:3.1.0'

logstash-core/lib/logstash/environment.rb

Lines changed: 1 addition & 0 deletions
@@ -96,6 +96,7 @@ def self.as_java_range(r)
     Setting::NumericSetting.new("queue.checkpoint.writes", 1024), # 0 is unlimited
     Setting::NumericSetting.new("queue.checkpoint.interval", 1000), # remove it for #17155
     Setting::BooleanSetting.new("queue.checkpoint.retry", true),
+    Setting::StringSetting.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
     Setting::BooleanSetting.new("dead_letter_queue.enable", false),
     Setting::Bytes.new("dead_letter_queue.max_bytes", "1024mb"),
     Setting::NumericSetting.new("dead_letter_queue.flush_interval", 5000),

logstash-core/lib/logstash/settings.rb

Lines changed: 1 addition & 0 deletions
@@ -70,6 +70,7 @@ def self.included(base)
     "queue.checkpoint.interval", # remove it for #17155
     "queue.checkpoint.writes",
     "queue.checkpoint.retry",
+    "queue.compression",
     "queue.drain",
     "queue.max_bytes",
     "queue.max_events",

logstash-core/spec/logstash/queue_factory_spec.rb

Lines changed: 1 addition & 0 deletions
@@ -30,6 +30,7 @@
     LogStash::Setting::NumericSetting.new("queue.checkpoint.acks", 1024),
     LogStash::Setting::NumericSetting.new("queue.checkpoint.writes", 1024),
     LogStash::Setting::BooleanSetting.new("queue.checkpoint.retry", false),
+    LogStash::Setting::StringSetting.new("queue.compression", "none", true, %w(none speed balanced size disabled)),
     LogStash::Setting::StringSetting.new("pipeline.id", pipeline_id),
     LogStash::Setting::StringSetting.new("pipeline.batch.metrics.sampling_mode", "minimal", true, ["disabled", "minimal", "full"]),
     LogStash::Setting::PositiveIntegerSetting.new("pipeline.batch.size", 125),
logstash-core/src/main/java/org/logstash/ackedqueue/AbstractZstdAwareCompressionCodec.java

Lines changed: 41 additions & 0 deletions
@@ -0,0 +1,41 @@
+package org.logstash.ackedqueue;
+
+import com.github.luben.zstd.Zstd;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+/**
+ * Subclasses of {@link AbstractZstdAwareCompressionCodec} are {@link CompressionCodec}s that are capable
+ * of detecting and decompressing zstd-compressed events. When decoding byte sequences that are <em>NOT</em>
+ * zstd-compressed, the given bytes are emitted verbatim.
+ */
+abstract class AbstractZstdAwareCompressionCodec implements CompressionCodec {
+    // log from the concrete class
+    protected final Logger logger = LogManager.getLogger(this.getClass());
+
+    @Override
+    public byte[] decode(byte[] data) {
+        if (!isZstd(data)) {
+            return data;
+        }
+        try {
+            final byte[] decoded = Zstd.decompress(data);
+            logger.trace("decoded {} -> {}", data.length, decoded.length);
+            return decoded;
+        } catch (Exception e) {
+            throw new RuntimeException("Exception while decoding", e);
+        }
+    }
+
+    private static final byte[] ZSTD_FRAME_MAGIC = { (byte) 0x28, (byte) 0xB5, (byte) 0x2F, (byte) 0xFD };
+
+    static boolean isZstd(byte[] data) {
+        if (data.length < 4) { return false; }
+
+        for (int i = 0; i < 4; i++) {
+            if (data[i] != ZSTD_FRAME_MAGIC[i]) { return false; }
+        }
+
+        return true;
+    }
+}
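The decode path above keys off the four-byte zstd frame magic (0x28 B5 2F FD), so no additional context is needed to tell compressed payloads from plain ones. Below is a hypothetical sketch, not part of this commit, showing that detection in action; it assumes zstd-jni's `Zstd.compress(byte[], int)` and sits in the same package so it can reach the package-private `isZstd`:

```java
package org.logstash.ackedqueue;

import com.github.luben.zstd.Zstd;
import java.nio.charset.StandardCharsets;

// Hypothetical demo class (not part of this commit) exercising the frame-magic detection.
class ZstdDetectionDemo {
    public static void main(String[] args) {
        byte[] plain = "hello, logstash".getBytes(StandardCharsets.UTF_8);
        byte[] framed = Zstd.compress(plain, 3); // any zstd level produces a framed payload

        // Plain bytes do not begin with the 0x28 B5 2F FD magic, so decode() passes them through...
        System.out.println(AbstractZstdAwareCompressionCodec.isZstd(plain));  // false
        // ...while every zstd frame does, so decode() routes it through Zstd.decompress.
        System.out.println(AbstractZstdAwareCompressionCodec.isZstd(framed)); // true
    }
}
```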
logstash-core/src/main/java/org/logstash/ackedqueue/CompressionCodec.java

Lines changed: 58 additions & 0 deletions
@@ -0,0 +1,58 @@
+package org.logstash.ackedqueue;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+public interface CompressionCodec {
+    Logger LOGGER = LogManager.getLogger(CompressionCodec.class);
+
+    byte[] encode(byte[] data);
+    byte[] decode(byte[] data);
+
+    /**
+     * The {@link CompressionCodec#NOOP} is a {@link CompressionCodec} that
+     * does nothing when encoding and decoding. It is only meant to be activated
+     * as a safety-latch in the event of compression being broken.
+     */
+    CompressionCodec NOOP = new CompressionCodec() {
+        @Override
+        public byte[] encode(byte[] data) {
+            return data;
+        }
+
+        @Override
+        public byte[] decode(byte[] data) {
+            return data;
+        }
+    };
+
+    static CompressionCodec fromConfigValue(final String configValue) {
+        return fromConfigValue(configValue, LOGGER);
+    }
+
+    static CompressionCodec fromConfigValue(final String configValue, final Logger logger) {
+        return switch (configValue) {
+            case "disabled" -> {
+                logger.warn("compression support has been disabled");
+                yield CompressionCodec.NOOP;
+            }
+            case "none" -> {
+                logger.info("compression support is enabled (read-only)");
+                yield ZstdAwareCompressionCodec.getInstance();
+            }
+            case "speed" -> {
+                logger.info("compression support is enabled (goal: speed)");
+                yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SPEED);
+            }
+            case "balanced" -> {
+                logger.info("compression support is enabled (goal: balanced)");
+                yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.BALANCED);
+            }
+            case "size" -> {
+                logger.info("compression support is enabled (goal: size)");
+                yield new ZstdEnabledCompressionCodec(ZstdEnabledCompressionCodec.Goal.SIZE);
+            }
+            default -> throw new IllegalArgumentException(String.format("Unsupported compression setting `%s`", configValue));
+        };
+    }
+}
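As a hypothetical usage sketch, not part of this commit, the following shows how `fromConfigValue` maps the `queue.compression` setting to a codec, and why a pipeline configured with `none` can still read events written under `size` (the demo class name and payload are illustrative):

```java
package org.logstash.ackedqueue;

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical sketch: round-trip a payload across differently-configured codecs.
class CompressionCodecDemo {
    public static void main(String[] args) {
        final CompressionCodec writer = CompressionCodec.fromConfigValue("size"); // zstd-encodes, optimized for size
        final CompressionCodec reader = CompressionCodec.fromConfigValue("none"); // pass-through encode, zstd-aware decode

        final byte[] event = "{\"message\":\"hello\"}".getBytes(StandardCharsets.UTF_8);
        final byte[] onDisk = writer.encode(event);
        final byte[] restored = reader.decode(onDisk);

        System.out.println(Arrays.equals(event, restored)); // true: `none` still decodes zstd frames
    }
}
```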
