File tree Expand file tree Collapse file tree 3 files changed +19
-1
lines changed
logstash-core/src/main/java/org/logstash/ackedqueue Expand file tree Collapse file tree 3 files changed +19
-1
lines changed Original file line number Diff line number Diff line change @@ -2346,6 +2346,11 @@ components:
2346
2346
encode :
2347
2347
type : object
2348
2348
properties :
2349
+ goal :
2350
+ - enum :
2351
+ - speed
2352
+ - balanced
2353
+ - size
2349
2354
ratio :
2350
2355
type : object
2351
2356
description : the ratio of event size in bytes to its representation on disk
Original file line number Diff line number Diff line change 4
4
import co .elastic .logstash .api .NamespacedMetric ;
5
5
import com .github .luben .zstd .Zstd ;
6
6
7
+ import java .util .Locale ;
8
+
7
9
/**
8
10
* A {@link ZstdEnabledCompressionCodec} is a {@link CompressionCodec} that can decode deflate-compressed
9
11
* bytes and performs deflate compression when encoding.
@@ -34,6 +36,7 @@ public enum Goal {
34
36
this .internalLevel = internalLevel .internalLevel ;
35
37
36
38
final NamespacedMetric encodeNamespace = queueMetric .namespace ("compression" , "encode" );
39
+ encodeNamespace .gauge ("goal" , internalLevel .name ().toLowerCase (Locale .ROOT ));
37
40
encodeRatioMetric = encodeNamespace .namespace ("ratio" )
38
41
.register ("lifetime" , AtomicIORatioMetric .FACTORY );
39
42
encodeTimerMetric = encodeNamespace .namespace ("spend" )
Original file line number Diff line number Diff line change 208
208
shared_examples "pipeline metrics" do
209
209
# let(:pipeline_id) { defined?(super()) or fail NotImplementedError }
210
210
let ( :settings_overrides ) do
211
- super ( ) . merge ( { 'pipeline.id' => pipeline_id } )
211
+ super ( ) . dup . tap do |overrides |
212
+ overrides [ 'pipeline.id' ] = pipeline_id
213
+ if logstash_service . settings . feature_flag == "persistent_queues"
214
+ overrides [ 'queue.compression' ] = %w( none speed balanced size ) . sample
215
+ end
216
+ end
212
217
end
213
218
214
219
it "can retrieve queue stats" do
242
247
queue_compression_stats = queue_stats . fetch ( "compression" )
243
248
expect ( queue_compression_stats . dig ( 'decode' , 'ratio' , 'lifetime' ) ) . to be >= 1
244
249
expect ( queue_compression_stats . dig ( 'decode' , 'spend' , 'lifetime' ) ) . not_to be_nil
250
+ if settings_overrides [ 'queue.compression' ] != 'none'
251
+ expect ( queue_compression_stats . dig ( 'encode' , 'goal' ) ) . to eq ( settings_overrides [ 'queue.compression' ] )
252
+ expect ( queue_compression_stats . dig ( 'encode' , 'ratio' , 'lifetime' ) ) . to be <= 1
253
+ expect ( queue_compression_stats . dig ( 'encode' , 'spend' , 'lifetime' ) ) . not_to be_nil
254
+ end
245
255
else
246
256
expect ( queue_stats [ "type" ] ) . to eq ( "memory" )
247
257
end
You can’t perform that action at this time.
0 commit comments