Skip to content

Commit d290218

Browse files
authored
Measure average batch byte size and event count (#18000)
Implements average batch event count and byte size metrics. The collection of such metric could be disabled, enabled for each batch or done on a sample of the total batches. Exposing metric related to average batch byte size and event count let the user discover the average structure of their batches, understanding if the batches are fulfilled and eventually understand how to set `pipeline.batch.size` and `pipeline.batch.delay` so that goal is reached. - Instantiate metric `pipelines.<pipeline id>.batch.count` to count number of matches to compute the average events and bytes per batch - Instantiate metric `pipelines.<pipeline id>.batch.total_bytes` to sumup all the batches event's byte estimation. Exposed metric `pipelines.<pipeline id>.batch.byte_size.average.lifetime` containing the average byte size of each batch. - create new setting `pipeline.batch.metrics.sampling_mode` which could have 3 values: `disabled`, `minimal` and `full`. In this case id `disable` no `batch` metric is exposed in the `_node/stats` API. `minimal` count batches and estimates the size only for 1% of the total while `full` is for every batch. This setting leverages existing Logstash setting infrastructure so that one defined at pipeline level (defined in `pipelines.yml`) takes precedence over the global one (defined in `logstash.yml`).
1 parent 105eecc commit d290218

29 files changed

+805
-45
lines changed

config/logstash.yml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,13 @@
4949
#
5050
# pipeline.batch.delay: 50
5151
#
52+
# Set the pipeline's batch metrics reporting mode. It can be "disabled" to disable it.
53+
# "minimal" to collect only 1% of the batches metrics, "full" to collect all batches.
54+
# Default is "minimal".
55+
#
56+
# pipeline.batch.metrics.sampling_mode: "minimal"
57+
pipeline.batch.metrics.sampling_mode: minimal
58+
#
5259
# Force Logstash to exit during shutdown even if there are still inflight
5360
# events in memory. By default, logstash will refuse to quit until all
5461
# received events have been pushed to the outputs.

logstash-core/lib/logstash/api/commands/stats.rb

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -172,6 +172,23 @@ def plugin_stats(stats, plugin_type)
172172
end
173173
end
174174

175+
def refine_batch_metrics(stats)
176+
{
177+
:event_count => {
178+
:average => {
179+
# average return a FlowMetric which and we need to invoke getValue to obtain the map with metric details.
180+
:lifetime => stats[:batch][:event_count][:average].value["lifetime"] ? stats[:batch][:event_count][:average].value["lifetime"].round : 0
181+
}
182+
},
183+
:byte_size => {
184+
:average => {
185+
:lifetime => stats[:batch][:byte_size][:average].value["lifetime"] ? stats[:batch][:byte_size][:average].value["lifetime"].round : 0
186+
}
187+
}
188+
}
189+
end
190+
private :refine_batch_metrics
191+
175192
def report(stats, extended_stats = nil, opts = {})
176193
ret = {
177194
:events => stats[:events],
@@ -190,6 +207,7 @@ def report(stats, extended_stats = nil, opts = {})
190207
:batch_delay => stats.dig(:config, :batch_delay),
191208
}
192209
}
210+
ret[:batch] = refine_batch_metrics(stats) if stats.include?(:batch)
193211
ret[:dead_letter_queue] = stats[:dlq] if stats.include?(:dlq)
194212

195213
# if extended_stats were provided, enrich the return value

logstash-core/lib/logstash/environment.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def self.as_java_range(r)
8686
Setting::ExistingFilePath.new("api.ssl.keystore.path", nil, false).nullable,
8787
Setting::Password.new("api.ssl.keystore.password", nil, false).nullable,
8888
Setting::StringArray.new("api.ssl.supported_protocols", nil, true, %w[TLSv1 TLSv1.1 TLSv1.2 TLSv1.3]),
89+
Setting::StringSetting.new("pipeline.batch.metrics.sampling_mode", "minimal", true, ["disabled", "minimal", "full"]),
8990
Setting::StringSetting.new("queue.type", "memory", true, ["persisted", "memory"]),
9091
Setting::BooleanSetting.new("queue.drain", false),
9192
Setting::Bytes.new("queue.page_capacity", "64mb"),

logstash-core/lib/logstash/java_pipeline.rb

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ def start_workers
267267
@preserve_event_order = preserve_event_order?(pipeline_workers)
268268
batch_size = settings.get("pipeline.batch.size")
269269
batch_delay = settings.get("pipeline.batch.delay")
270+
batch_metric_sampling = settings.get("pipeline.batch.metrics.sampling_mode")
270271

271272
max_inflight = batch_size * pipeline_workers
272273

@@ -287,6 +288,7 @@ def start_workers
287288
"pipeline.batch.size" => batch_size,
288289
"pipeline.batch.delay" => batch_delay,
289290
"pipeline.max_inflight" => max_inflight,
291+
"batch_metric_sampling" => batch_metric_sampling,
290292
"pipeline.sources" => pipeline_source_details)
291293
@logger.info("Starting pipeline", pipeline_log_params)
292294

logstash-core/lib/logstash/settings.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ def self.included(base)
5858
"path.dead_letter_queue",
5959
"path.queue",
6060
"pipeline.batch.delay",
61+
"pipeline.batch.metrics.sampling_mode",
6162
"pipeline.batch.size",
6263
"pipeline.id",
6364
"pipeline.reloadable",

logstash-core/spec/logstash/acked_queue_concurrent_stress_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@
3838
end
3939

4040
let(:queue) do
41-
described_class.new(queue_settings)
41+
described_class.new(queue_settings, org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode::DISABLED)
4242
end
4343

4444
let(:writer_threads) do

logstash-core/spec/logstash/api/modules/node_stats_spec.rb

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,13 @@
2121
require "logstash/api/modules/node_stats"
2222

2323
describe LogStash::Api::Modules::NodeStats do
24-
# enable PQ to ensure PQ-related metrics are present
25-
include_context "api setup", {"queue.type" => "persisted"}
24+
25+
include_context "api setup", {
26+
# enable PQ to ensure PQ-related metrics are present
27+
"queue.type" => "persisted",
28+
#enable batch metrics
29+
"pipeline.batch.metrics.sampling_mode" => "full"
30+
}
2631
include_examples "not found"
2732

2833
extend ResourceDSLMethods
@@ -142,6 +147,18 @@
142147
"path" => String,
143148
"free_space_in_bytes" => Numeric
144149
}
150+
},
151+
"batch" => {
152+
"event_count" => {
153+
"average" => {
154+
"lifetime" => Numeric
155+
}
156+
},
157+
"byte_size" => {
158+
"average" => {
159+
"lifetime" => Numeric
160+
}
161+
}
145162
}
146163
}
147164
},

logstash-core/spec/logstash/instrument/wrapped_write_client_spec.rb

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ def threaded_read_client
113113
end
114114

115115
context "WrappedSynchronousQueue" do
116-
let(:queue) { LogStash::WrappedSynchronousQueue.new(1024) }
116+
let(:queue) { LogStash::WrappedSynchronousQueue.new(1024, org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode::DISABLED) }
117117

118118
before do
119119
read_client.set_events_metric(metric.namespace([:stats, :events]))
@@ -136,7 +136,9 @@ def threaded_read_client
136136
.build
137137
end
138138

139-
let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }
139+
let(:queue) do
140+
LogStash::WrappedAckedQueue.new(queue_settings, org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode::DISABLED)
141+
end
140142

141143
before do
142144
read_client.set_events_metric(metric.namespace([:stats, :events]))

logstash-core/spec/logstash/queue_factory_spec.rb

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
LogStash::Setting::NumericSetting.new("queue.checkpoint.writes", 1024),
3232
LogStash::Setting::BooleanSetting.new("queue.checkpoint.retry", false),
3333
LogStash::Setting::StringSetting.new("pipeline.id", pipeline_id),
34+
LogStash::Setting::StringSetting.new("pipeline.batch.metrics.sampling_mode", "minimal", true, ["disabled", "minimal", "full"]),
3435
LogStash::Setting::PositiveIntegerSetting.new("pipeline.batch.size", 125),
3536
LogStash::Setting::PositiveIntegerSetting.new("pipeline.workers", LogStash::Config::CpuCoreStrategy.maximum)
3637
]

logstash-core/spec/logstash/util/wrapped_acked_queue_spec.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -64,7 +64,7 @@
6464
.build
6565
end
6666

67-
let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings) }
67+
let(:queue) { LogStash::WrappedAckedQueue.new(queue_settings, org.logstash.ackedqueue.QueueFactoryExt::BatchMetricMode::DISABLED) }
6868

6969
after do
7070
queue.close

0 commit comments

Comments
 (0)