@@ -38,6 +38,12 @@ class ConcatFilter < Filter
3838 config_param :partial_metadata_format , :enum , list : [ :"docker-fluentd" , :"docker-journald" , :"docker-journald-lowercase" ] , default : :"docker-fluentd"
3939 desc "If true, keep partial metadata"
4040 config_param :keep_partial_metadata , :bool , default : false
41+ desc "Use cri log tag to concatenate multiple records"
42+ config_param :use_partial_cri_logtag , :bool , default : false
43+ desc "The key name that is referred to concatenate records on cri log"
44+ config_param :partial_cri_logtag_key , :string , default : nil
45+ desc "The key name that is referred to detect stream name on cri log"
46+ config_param :partial_cri_stream_key , :string , default : "stream"
4147
4248 class TimeoutError < StandardError
4349 end
@@ -52,11 +58,18 @@ def initialize
5258 end
5359 end
5460
61+ def required_params
62+ params = [ @n_lines . nil? , @multiline_start_regexp . nil? , @multiline_end_regexp . nil? , @partial_key . nil? , !@use_partial_metadata , !@use_partial_cri_logtag ]
63+ names = [ "n_lines" , "multiline_start_regexp" , "multiline_end_regexp" , "partial_key" , "use_partial_metadata" , "use_partial_cri_logtag" ]
64+ return params , names
65+ end
66+
5567 def configure ( conf )
5668 super
5769
58- if @n_lines . nil? && @multiline_start_regexp . nil? && @multiline_end_regexp . nil? && @partial_key . nil? && !@use_partial_metadata
59- raise Fluent ::ConfigError , "Either n_lines, multiline_start_regexp, multiline_end_regexp, partial_key or use_partial_metadata is required"
70+ params , names = required_params
71+ if params . all?
72+ raise Fluent ::ConfigError , "Either #{ [ names [ 0 ..-2 ] . join ( ", " ) , names [ -1 ] ] . join ( " or " ) } is required"
6073 end
6174 if @n_lines && ( @multiline_start_regexp || @multiline_end_regexp )
6275 raise Fluent ::ConfigError , "n_lines and multiline_start_regexp/multiline_end_regexp are exclusive"
@@ -79,6 +92,15 @@ def configure(conf)
7992 if @use_partial_metadata && @partial_key
8093 raise Fluent ::ConfigError , "use_partial_metadata and partial_key are exclusive"
8194 end
95+ if @use_partial_cri_logtag && @n_lines
96+ raise Fluent ::ConfigError , "use_partial_cri_logtag and n_lines are exclusive"
97+ end
98+ if @use_partial_cri_logtag && ( @multiline_start_regexp || @multiline_end_regexp )
99+ raise Fluent ::ConfigError , "use_partial_cri_logtag and multiline_start_regexp/multiline_end_regexp are exclusive"
100+ end
101+ if @use_partial_cri_logtag && @partial_key
102+ raise Fluent ::ConfigError , "use_partial_cri_logtag and partial_key are exclusive"
103+ end
82104
83105 @mode = nil
84106 case
@@ -110,6 +132,11 @@ def configure(conf)
110132 @partial_last_field = "container_partial_last" . freeze
111133 @partial_message_indicator = @partial_id_field
112134 end
135+ when @use_partial_cri_logtag
136+ @mode = :partial_cri
137+ @partial_logtag_delimiter = ":" . freeze
138+ @partial_logtag_continue = "P" . freeze
139+ @partial_logtag_full = "F" . freeze
113140 when @multiline_start_regexp || @multiline_end_regexp
114141 @mode = :regexp
115142 if @multiline_start_regexp
@@ -175,6 +202,9 @@ def filter_stream(tag, es)
175202 merged_record . delete ( @partial_ordinal_field )
176203 merged_record . delete ( @partial_last_field )
177204 end
205+ when :partial_cri
206+ merged_record . delete ( @partial_cri_logtag_key ) unless @keep_partial_key
207+ merged_record . delete ( @partial_cri_stream_key )
178208 end
179209 new_es . add ( time , merged_record )
180210 end
@@ -220,6 +250,8 @@ def process(tag, time, record)
220250 process_partial ( stream_identity , tag , time , record )
221251 when :partial_metadata
222252 process_partial_metadata ( stream_identity , tag , time , record )
253+ when :partial_cri
254+ process_partial_cri ( stream_identity , tag , time , record )
223255 when :regexp
224256 process_regexp ( stream_identity , tag , time , record )
225257 end
@@ -248,6 +280,18 @@ def process_partial(stream_identity, tag, time, record)
248280 new_es
249281 end
250282
283+ def process_partial_cri ( stream_identity , tag , time , record )
284+ new_es = Fluent ::MultiEventStream . new
285+ @buffer [ stream_identity ] << [ tag , time , record ]
286+ if record [ @partial_cri_logtag_key ] . split ( @partial_logtag_delimiter ) [ 0 ] == @partial_logtag_full
287+ new_time , new_record = flush_buffer ( stream_identity )
288+ time = new_time if @use_first_timestamp
289+ new_record . delete ( @partial_cri_logtag_key )
290+ new_es . add ( time , new_record )
291+ end
292+ new_es
293+ end
294+
251295 def process_partial_metadata ( stream_identity , tag , time , record )
252296 new_es = Fluent ::MultiEventStream . new
253297 @buffer [ stream_identity ] << [ tag , time , record ]
0 commit comments