diff --git a/lib/fluent/plugin/buffer/file_chunk.rb b/lib/fluent/plugin/buffer/file_chunk.rb index 1c42b4705b..24dbaaeb55 100644 --- a/lib/fluent/plugin/buffer/file_chunk.rb +++ b/lib/fluent/plugin/buffer/file_chunk.rb @@ -219,13 +219,17 @@ def restore_metadata(bindata) # old type of restore data = Fluent::MessagePackFactory.msgpack_unpacker(symbolize_keys: true).feed(bindata).read rescue {} end + raise FileChunkError, "invalid meta data" if data.nil? || !data.is_a?(Hash) + raise FileChunkError, "invalid unique_id" unless data[:id] + raise FileChunkError, "invalid created_at" unless data[:c].to_i > 0 + raise FileChunkError, "invalid modified_at" unless data[:m].to_i > 0 now = Fluent::Clock.real_now - @unique_id = data[:id] || self.class.unique_id_from_path(@path) || @unique_id + @unique_id = data[:id] @size = data[:s] || 0 - @created_at = data.fetch(:c, now.to_i) - @modified_at = data.fetch(:m, now.to_i) + @created_at = data[:c] + @modified_at = data[:m] @metadata.timekey = data[:timekey] @metadata.tag = data[:tag] diff --git a/test/plugin/test_buf_file.rb b/test/plugin/test_buf_file.rb index ac23334140..17174d9c48 100644 --- a/test/plugin/test_buf_file.rb +++ b/test/plugin/test_buf_file.rb @@ -1322,6 +1322,36 @@ def compare_log(plugin, msg) assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(c2id)}.log") } end + test '#resume backups enqueued broken metadata which has broken id, c, m fields' do + setup_plugins({'path' => @bufpath}) + cid, path = create_first_chunk('q') + metadata = File.read(path + '.meta') + File.open(path + '.meta', 'wb') { |f| f.write(metadata[0..6] + "\0" * (metadata.size - 6)) } # create enqueued broken meta file + + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + + compare_log(@p, 'enqueued meta file is broken') + assert { not File.exist?(path) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(cid)}.log") } + end + + test '#resume backups enqueued broken metadata by truncated' do + setup_plugins({'path' => @bufpath}) + cid, path = create_first_chunk('q') + metadata = File.read(path + '.meta') + File.open(path + '.meta', 'wb') { |f| f.write(metadata[0..-2]) } # create enqueued broken meta file with last byte truncated + + Fluent::SystemConfig.overwrite_system_config('root_dir' => @bufdir) do + @p.start + end + + compare_log(@p, 'enqueued meta file is broken') + assert { not File.exist?(path) } + assert { File.exist?("#{@bufdir}/backup/worker0/#{@id_output}/#{@d.dump_unique_id_hex(cid)}.log") } + end + test '#resume throws away broken chunk with disable_chunk_backup' do setup_plugins({'path' => @bufpath, 'disable_chunk_backup' => true}) c1id, _ = create_first_chunk('b')