diff --git a/lib/fluent/plugin/compressable.rb b/lib/fluent/plugin/compressable.rb index 2ec2122931..94c595b1de 100644 --- a/lib/fluent/plugin/compressable.rb +++ b/lib/fluent/plugin/compressable.rb @@ -26,14 +26,29 @@ def compress(data, type: :gzip, **kwargs) io = output_io || StringIO.new if type == :gzip writer = Zlib::GzipWriter.new(io) + writer.write(data) + writer.finish + io.string elsif type == :zstd - writer = Zstd::StreamWriter.new(io) + sc = Zstd::StreamingCompress.new(**kwargs) + chunk_size = kwargs[:chunk_size] || (256 * 1024) + + if data.is_a?(String) + i = 0 + while i < data.bytesize + io << sc.compress(data.byteslice(i, chunk_size)) + i += chunk_size + end + else + data.each { |d| io << sc.compress(d) } + end + + # Need to close frame to prevent unknown frame descriptor errors + io << sc.finish + io.string else raise ArgumentError, "Unknown compression type: #{type}" end - writer.write(data) - writer.finish - output_io || io.string end # compressed_data is String like `compress(data1) + compress(data2) + ... + compress(dataN)`