Skip to content

Commit cb96cc7

Browse files
committed
Handle partial CRI-O log format on CRI-O specific mode
In the newer kubernetes, it start to use CRI-O format for logging. We should handle this format on concat plugin with specific mode. ref: https://github.com/kubernetes/community/blob/master/contributors/design-proposals/node/kubelet-cri-logging.md On kubelet implementation for parsing, stream type is just for checking stream names validity and then they are thrown away. Users want to obtain concatenated log contents not for attached metadata. ref: https://github.com/kubernetes/kubernetes/blob/629d5ab21349021cf7d38236620785071ee541b4/pkg/kubelet/kuberuntime/logs/logs.go#L124-L168 When we use regexp to concat CRI-O log, it is hard to remove attached metadata records such as stream names(stdout|stderr) and partial_flag(P or F). Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent 321126d commit cb96cc7

File tree

2 files changed

+70
-3
lines changed

2 files changed

+70
-3
lines changed

lib/fluent/plugin/filter_concat.rb

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,12 @@ class ConcatFilter < Filter
3838
config_param :partial_metadata_format, :enum, list: [:"docker-fluentd", :"docker-journald", :"docker-journald-lowercase"], default: :"docker-fluentd"
3939
desc "If true, keep partial metadata"
4040
config_param :keep_partial_metadata, :bool, default: false
41+
desc "Use cri log tag to concatenate multiple records"
42+
config_param :use_partial_cri_logtag, :bool, default: false
43+
desc "The key name that is referred to concatenate records on cri log"
44+
config_param :partial_cri_logtag_key, :string, default: nil
45+
desc "The key name that is referred to detect stream name on cri log"
46+
config_param :partial_cri_stream_key, :string, default: "stream"
4147

4248
class TimeoutError < StandardError
4349
end
@@ -55,8 +61,8 @@ def initialize
5561
def configure(conf)
5662
super
5763

58-
if @n_lines.nil? && @multiline_start_regexp.nil? && @multiline_end_regexp.nil? && @partial_key.nil? && !@use_partial_metadata
59-
raise Fluent::ConfigError, "Either n_lines, multiline_start_regexp, multiline_end_regexp, partial_key or use_partial_metadata is required"
64+
if @n_lines.nil? && @multiline_start_regexp.nil? && @multiline_end_regexp.nil? && @partial_key.nil? && !@use_partial_metadata && !@use_partial_cri_logtag
65+
raise Fluent::ConfigError, "Either n_lines, multiline_start_regexp, multiline_end_regexp, partial_key, use_partial_metadata or use_partial_cri_logtag is required"
6066
end
6167
if @n_lines && (@multiline_start_regexp || @multiline_end_regexp)
6268
raise Fluent::ConfigError, "n_lines and multiline_start_regexp/multiline_end_regexp are exclusive"
@@ -79,6 +85,15 @@ def configure(conf)
7985
if @use_partial_metadata && @partial_key
8086
raise Fluent::ConfigError, "use_partial_metadata and partial_key are exclusive"
8187
end
88+
if @use_partial_cri_logtag && @n_lines
89+
raise Fluent::ConfigError, "use_partial_cri_logtag and n_lines are exclusive"
90+
end
91+
if @use_partial_cri_logtag && (@multiline_start_regexp || @multiline_end_regexp)
92+
raise Fluent::ConfigError, "use_partial_cri_logtag and multiline_start_regexp/multiline_end_regexp are exclusive"
93+
end
94+
if @use_partial_cri_logtag && @partial_key
95+
raise Fluent::ConfigError, "use_partial_cri_logtag and partial_key are exclusive"
96+
end
8297

8398
@mode = nil
8499
case
@@ -110,6 +125,11 @@ def configure(conf)
110125
@partial_last_field = "container_partial_last".freeze
111126
@partial_message_indicator = @partial_id_field
112127
end
128+
when @use_partial_cri_logtag
129+
@mode = :partial_cri
130+
@partial_logtag_delimiter = ":".freeze
131+
@partial_logtag_continue = "P".freeze
132+
@partial_logtag_full = "F".freeze
113133
when @multiline_start_regexp || @multiline_end_regexp
114134
@mode = :regexp
115135
if @multiline_start_regexp
@@ -175,6 +195,9 @@ def filter_stream(tag, es)
175195
merged_record.delete(@partial_ordinal_field)
176196
merged_record.delete(@partial_last_field)
177197
end
198+
when :partial_cri
199+
merged_record.delete(@partial_cri_logtag_key) unless @keep_partial_key
200+
merged_record.delete(@partial_cri_stream_key)
178201
end
179202
new_es.add(time, merged_record)
180203
end
@@ -220,6 +243,8 @@ def process(tag, time, record)
220243
process_partial(stream_identity, tag, time, record)
221244
when :partial_metadata
222245
process_partial_metadata(stream_identity, tag, time, record)
246+
when :partial_cri
247+
process_partial_cri(stream_identity, tag, time, record)
223248
when :regexp
224249
process_regexp(stream_identity, tag, time, record)
225250
end
@@ -248,6 +273,18 @@ def process_partial(stream_identity, tag, time, record)
248273
new_es
249274
end
250275

276+
def process_partial_cri(stream_identity, tag, time, record)
277+
new_es = Fluent::MultiEventStream.new
278+
@buffer[stream_identity] << [tag, time, record]
279+
if record[@partial_cri_logtag_key].split(@partial_logtag_delimiter)[0] == @partial_logtag_full
280+
new_time, new_record = flush_buffer(stream_identity)
281+
time = new_time if @use_first_timestamp
282+
new_record.delete(@partial_cri_logtag_key)
283+
new_es.add(time, new_record)
284+
end
285+
new_es
286+
end
287+
251288
def process_partial_metadata(stream_identity, tag, time, record)
252289
new_es = Fluent::MultiEventStream.new
253290
@buffer[stream_identity] << [tag, time, record]

test/plugin/test_filter_concat.rb

Lines changed: 31 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,7 +59,7 @@ def filter_with_time(conf, messages, wait: nil)
5959
end
6060

6161
test "either" do
62-
assert_raise(Fluent::ConfigError.new("Either n_lines, multiline_start_regexp, multiline_end_regexp, partial_key or use_partial_metadata is required")) do
62+
assert_raise(Fluent::ConfigError.new("Either n_lines, multiline_start_regexp, multiline_end_regexp, partial_key, use_partial_metadata or use_partial_cri_logtag is required")) do
6363
create_driver(<<-CONFIG)
6464
key message
6565
CONFIG
@@ -125,6 +125,15 @@ def filter_with_time(conf, messages, wait: nil)
125125
CONFIG
126126
assert_equal(:regexp, d.instance.instance_variable_get(:@mode))
127127
end
128+
129+
test "use_partial_cri_logtag" do
130+
d = create_driver(<<-CONFIG)
131+
key message
132+
use_partial_cri_logtag true
133+
partial_cri_logtag_key logtag
134+
CONFIG
135+
assert_equal(:partial_cri, d.instance.instance_variable_get(:@mode))
136+
end
128137
end
129138

130139
sub_test_case "lines" do
@@ -1000,6 +1009,27 @@ def filter_with_time(conf, messages, wait: nil)
10001009
end
10011010
end
10021011

1012+
sub_test_case "CRI-O format log" do
1013+
test "filter with CRI-O style events" do
1014+
config = <<-CONFIG
1015+
key message
1016+
use_partial_cri_logtag true
1017+
partial_cri_logtag_key logtag
1018+
CONFIG
1019+
messages = [
1020+
{"stream" => "stdout", "logtag" => "F", "message" => "The content of the log entry 1"},
1021+
{"stream" => "stdout", "logtag" => "P", "message" => "First line of log entry 2"},
1022+
{"stream" => "stdout", "logtag" => "P", "message" => "Second line of the log entry 2"},
1023+
{"stream" => "stderr", "logtag" => "F", "message" => "Last line of the log entry 2"},
1024+
]
1025+
filtered = filter(config, messages, wait: 3)
1026+
expected = [
1027+
{"message" => "The content of the log entry 1"},
1028+
{"message" => "First line of log entry 2\nSecond line of the log entry 2\nLast line of the log entry 2"}]
1029+
assert_equal(expected, filtered)
1030+
end
1031+
end
1032+
10031033
sub_test_case "raise exception in on_timer" do
10041034
# See also https://github.com/fluent/fluentd/issues/1946
10051035
test "failed to flush timeout buffer" do

0 commit comments

Comments
 (0)