@@ -34,6 +34,8 @@ class ConcatFilter < Filter
3434 config_param :keep_partial_key , :bool , default : false
3535 desc "Use partial metadata to concatenate multiple records"
3636 config_param :use_partial_metadata , :bool , default : false
37+ desc "Input format of the partial metadata (fluentd or journald docker log driver)"
38+ config_param :partial_metadata_format , :string , default : "docker-fluentd"
3739 desc "If true, keep partial metadata"
3840 config_param :keep_partial_metadata , :bool , default : false
3941
@@ -77,6 +79,9 @@ def configure(conf)
7779 if @use_partial_metadata && @partial_key
7880 raise Fluent ::ConfigError , "use_partial_metadata and partial_key are exclusive"
7981 end
82+ unless [ "docker-fluentd" , "docker-journald" , "docker-journald-lowercase" ] . include? ( @partial_metadata_format )
83+ raise Fluent ::ConfigError , "partial_metadata_format only supports docker-fluentd, docker-journald or docker-journald-lowercase"
84+ end
8085
8186 @mode = nil
8287 case
@@ -86,6 +91,28 @@ def configure(conf)
8691 @mode = :partial
8792 when @use_partial_metadata
8893 @mode = :partial_metadata
94+ case @partial_metadata_format
95+ when "docker-fluentd"
96+ @partial_message_field = "partial_message" . freeze
97+ @partial_id_field = "partial_id" . freeze
98+ @partial_ordinal_field = "partial_ordinal" . freeze
99+ @partial_last_field = "partial_last" . freeze
100+ @partial_message_indicator = @partial_message_field
101+ when "docker-journald"
102+ @partial_message_field = "CONTAINER_PARTIAL_MESSAGE" . freeze
103+ @partial_id_field = "CONTAINER_PARTIAL_ID" . freeze
104+ @partial_ordinal_field = "CONTAINER_PARTIAL_ORDINAL" . freeze
105+ @partial_last_field = "CONTAINER_PARTIAL_LAST" . freeze
106+ # the journald log driver does not add CONTAINER_PARTIAL_MESSAGE to the last message
107+ # so we help ourself by using another indicator
108+ @partial_message_indicator = @partial_id_field
109+ when "docker-journald-lowercase"
110+ @partial_message_field = "container_partial_message" . freeze
111+ @partial_id_field = "container_partial_id" . freeze
112+ @partial_ordinal_field = "container_partial_ordinal" . freeze
113+ @partial_last_field = "container_partial_last" . freeze
114+ @partial_message_indicator = @partial_id_field
115+ end
89116 when @multiline_start_regexp || @multiline_end_regexp
90117 @mode = :regexp
91118 if @multiline_start_regexp
@@ -130,7 +157,7 @@ def filter_stream(tag, es)
130157 end
131158 end
132159 if @mode == :partial_metadata
133- unless record . key? ( "partial_message" . freeze )
160+ unless record . key? ( @partial_message_indicator )
134161 new_es . add ( time , record )
135162 next
136163 end
@@ -146,10 +173,10 @@ def filter_stream(tag, es)
146173 merged_record . delete ( @partial_key ) unless @keep_partial_key
147174 when :partial_metadata
148175 unless @keep_partial_metadata
149- merged_record . delete ( "partial_message" . freeze )
150- merged_record . delete ( "partial_id" . freeze )
151- merged_record . delete ( "partial_ordinal" . freeze )
152- merged_record . delete ( "partial_last" . freeze )
176+ merged_record . delete ( @partial_message_field )
177+ merged_record . delete ( @partial_id_field )
178+ merged_record . delete ( @partial_ordinal_field )
179+ merged_record . delete ( @partial_last_field )
153180 end
154181 end
155182 new_es . add ( time , merged_record )
@@ -175,9 +202,9 @@ def on_timer
175202 def process ( tag , time , record )
176203 if @mode == :partial_metadata
177204 if @stream_identity_key
178- stream_identity = %Q(#{ tag } :#{ record [ @stream_identity_key ] } #{ record [ "partial_id" ] } )
205+ stream_identity = %Q(#{ tag } :#{ record [ @stream_identity_key ] } #{ record [ @partial_id_field ] } )
179206 else
180- stream_identity = %Q(#{ tag } :#{ record [ "partial_id" ] } )
207+ stream_identity = %Q(#{ tag } :#{ record [ @partial_id_field ] } )
181208 end
182209 else
183210 if @stream_identity_key
@@ -227,7 +254,7 @@ def process_partial(stream_identity, tag, time, record)
227254 def process_partial_metadata ( stream_identity , tag , time , record )
228255 new_es = Fluent ::MultiEventStream . new
229256 @buffer [ stream_identity ] << [ tag , time , record ]
230- if record [ "partial_last" ] == "true"
257+ if record [ @partial_last_field ] == "true"
231258 new_time , new_record = flush_buffer ( stream_identity )
232259 time = new_time if @use_first_timestamp
233260 new_record . delete ( @partial_key )
@@ -306,7 +333,7 @@ def continuous_line?(text)
306333 def flush_buffer ( stream_identity , new_element = nil )
307334 lines = if @mode == :partial_metadata
308335 @buffer [ stream_identity ]
309- . sort_by { |_tag , _time , record | record [ "partial_ordinal" ] . to_i }
336+ . sort_by { |_tag , _time , record | record [ @partial_ordinal_field ] . to_i }
310337 . map { |_tag , _time , record | record [ @key ] }
311338 else
312339 @buffer [ stream_identity ] . map { |_tag , _time , record | record [ @key ] }
0 commit comments