Skip to content

Commit 7f9f22b

Browse files
authored
Merge pull request #76 from SpringMT/feature/add-stream-writer-and-reader
feat: add StreamWriter and StreamReader
2 parents 5395b01 + ca52ddf commit 7f9f22b

9 files changed

+140
-40
lines changed

ext/zstdruby/streaming_compress.c

Lines changed: 46 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -125,6 +125,7 @@ rb_streaming_compress_compress(VALUE obj, VALUE src)
125125

126126
struct streaming_compress_t* sc;
127127
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
128+
128129
const char* output_data = RSTRING_PTR(sc->buf);
129130
VALUE result = rb_str_new(0, 0);
130131
while (input.pos < input.size) {
@@ -139,27 +140,54 @@ rb_streaming_compress_compress(VALUE obj, VALUE src)
139140
}
140141

141142
static VALUE
142-
rb_streaming_compress_addstr(VALUE obj, VALUE src)
143+
rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
143144
{
144-
StringValue(src);
145-
const char* input_data = RSTRING_PTR(src);
146-
size_t input_size = RSTRING_LEN(src);
147-
ZSTD_inBuffer input = { input_data, input_size, 0 };
148-
145+
size_t total = 0;
146+
VALUE result = rb_str_new(0, 0);
149147
struct streaming_compress_t* sc;
150148
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
151149
const char* output_data = RSTRING_PTR(sc->buf);
152-
153-
while (input.pos < input.size) {
154-
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
155-
size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
156-
if (ZSTD_isError(result)) {
157-
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
150+
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
151+
152+
while (argc-- > 0) {
153+
VALUE str = *argv++;
154+
StringValue(str);
155+
const char* input_data = RSTRING_PTR(str);
156+
size_t input_size = RSTRING_LEN(str);
157+
ZSTD_inBuffer input = { input_data, input_size, 0 };
158+
159+
while (input.pos < input.size) {
160+
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
161+
if (ZSTD_isError(ret)) {
162+
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
163+
}
164+
total += RSTRING_LEN(str);
158165
}
159166
}
160-
return obj;
167+
return SIZET2NUM(total);
161168
}
162169

170+
/*
171+
* Document-method: <<
172+
* Same as IO.
173+
*/
174+
#define rb_streaming_compress_addstr rb_io_addstr
175+
/*
176+
* Document-method: printf
177+
* Same as IO.
178+
*/
179+
#define rb_streaming_compress_printf rb_io_printf
180+
/*
181+
* Document-method: print
182+
* Same as IO.
183+
*/
184+
#define rb_streaming_compress_print rb_io_print
185+
/*
186+
* Document-method: puts
187+
* Same as IO.
188+
*/
189+
#define rb_streaming_compress_puts rb_io_puts
190+
163191
static VALUE
164192
rb_streaming_compress_flush(VALUE obj)
165193
{
@@ -186,12 +214,16 @@ zstd_ruby_streaming_compress_init(void)
186214
rb_define_alloc_func(cStreamingCompress, rb_streaming_compress_allocate);
187215
rb_define_method(cStreamingCompress, "initialize", rb_streaming_compress_initialize, -1);
188216
rb_define_method(cStreamingCompress, "compress", rb_streaming_compress_compress, 1);
217+
rb_define_method(cStreamingCompress, "write", rb_streaming_compress_write, -1);
189218
rb_define_method(cStreamingCompress, "<<", rb_streaming_compress_addstr, 1);
219+
rb_define_method(cStreamingCompress, "printf", rb_streaming_compress_printf, -1);
220+
rb_define_method(cStreamingCompress, "print", rb_streaming_compress_print, -1);
221+
rb_define_method(cStreamingCompress, "puts", rb_streaming_compress_puts, -1);
222+
190223
rb_define_method(cStreamingCompress, "flush", rb_streaming_compress_flush, 0);
191224
rb_define_method(cStreamingCompress, "finish", rb_streaming_compress_finish, 0);
192225

193226
rb_define_const(cStreamingCompress, "CONTINUE", INT2FIX(ZSTD_e_continue));
194227
rb_define_const(cStreamingCompress, "FLUSH", INT2FIX(ZSTD_e_flush));
195228
rb_define_const(cStreamingCompress, "END", INT2FIX(ZSTD_e_end));
196229
}
197-

ext/zstdruby/streaming_decompress.c

Lines changed: 1 addition & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -101,35 +101,13 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src)
101101
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
102102
size_t const ret = ZSTD_decompressStream(sd->ctx, &output, &input);
103103
if (ZSTD_isError(ret)) {
104-
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
104+
rb_raise(rb_eRuntimeError, "decompress error error code: %s", ZSTD_getErrorName(ret));
105105
}
106106
rb_str_cat(result, output.dst, output.pos);
107107
}
108108
return result;
109109
}
110110

111-
static VALUE
112-
rb_streaming_decompress_addstr(VALUE obj, VALUE src)
113-
{
114-
StringValue(src);
115-
const char* input_data = RSTRING_PTR(src);
116-
size_t input_size = RSTRING_LEN(src);
117-
ZSTD_inBuffer input = { input_data, input_size, 0 };
118-
119-
struct streaming_decompress_t* sd;
120-
TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd);
121-
const char* output_data = RSTRING_PTR(sd->buf);
122-
123-
while (input.pos < input.size) {
124-
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
125-
size_t const result = ZSTD_decompressStream(sd->ctx, &output, &input);
126-
if (ZSTD_isError(result)) {
127-
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
128-
}
129-
}
130-
return obj;
131-
}
132-
133111
extern VALUE rb_mZstd, cStreamingDecompress;
134112
void
135113
zstd_ruby_streaming_decompress_init(void)
@@ -138,6 +116,4 @@ zstd_ruby_streaming_decompress_init(void)
138116
rb_define_alloc_func(cStreamingDecompress, rb_streaming_decompress_allocate);
139117
rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, 0);
140118
rb_define_method(cStreamingDecompress, "decompress", rb_streaming_decompress_decompress, 1);
141-
rb_define_method(cStreamingDecompress, "<<", rb_streaming_decompress_addstr, 1);
142119
}
143-

lib/zstd-ruby/stream_reader.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
module Zstd
2+
# @todo Exprimental
3+
class StreamReader
4+
def initialize(io)
5+
@io = io
6+
@stream = Zstd::StreamingDecompress.new
7+
end
8+
9+
def read(length)
10+
if @io.eof?
11+
raise StandardError, "EOF"
12+
end
13+
data = @io.read(length)
14+
@stream.decompress(data)
15+
end
16+
17+
def close
18+
@io.write(@stream.finish)
19+
@io.close
20+
end
21+
end
22+
end

lib/zstd-ruby/stream_writer.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
module Zstd
2+
# @todo Exprimental
3+
class StreamWriter
4+
def initialize(io, level: nil)
5+
@io = io
6+
@stream = Zstd::StreamingCompress.new(level)
7+
end
8+
9+
def write(*data)
10+
@stream.write(*data)
11+
@io.write(@stream.flush)
12+
end
13+
14+
def finish
15+
@io.write(@stream.finish)
16+
end
17+
18+
def close
19+
@io.write(@stream.finish)
20+
@io.close
21+
end
22+
end
23+
end

spec/zstd-ruby-stream_reader_spec.rb

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
require "spec_helper"
2+
require 'zstd-ruby'
3+
require 'zstd-ruby/stream_writer'
4+
require 'zstd-ruby/stream_reader'
5+
require 'pry'
6+
7+
RSpec.describe Zstd::StreamReader do
8+
describe 'read' do
9+
it 'shoud work' do
10+
io = StringIO.new
11+
writer = Zstd::StreamWriter.new(io)
12+
writer.write("abc")
13+
writer.write("def")
14+
writer.finish
15+
io.rewind
16+
17+
reader = Zstd::StreamReader.new(io)
18+
expect(reader.read(10)).to eq('a')
19+
expect(reader.read(10)).to eq('bcdef')
20+
expect(reader.read(10)).to eq('')
21+
end
22+
end
23+
end

spec/zstd-ruby-stream_writer_spec.rb

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
require "spec_helper"
2+
require 'zstd-ruby'
3+
require 'zstd-ruby/stream_writer'
4+
5+
RSpec.describe Zstd::StreamWriter do
6+
describe 'write' do
7+
it 'shoud work' do
8+
io = StringIO.new
9+
stream = Zstd::StreamWriter.new(io)
10+
stream.write("abc")
11+
stream.write("def")
12+
stream.finish
13+
io.rewind
14+
expect(Zstd.decompress(io.read)).to eq('abcdef')
15+
end
16+
end
17+
end

spec/zstd-ruby-streaming-compress_spec.rb

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,4 +66,3 @@
6666
end
6767
end
6868
end
69-

spec/zstd-ruby_spec.rb

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,13 @@ def to_str
6060
expect(decompressed).to eq('')
6161
end
6262

63+
it 'should work for non-ascii string' do
64+
compressed = Zstd.compress('あああ')
65+
expect(compressed.bytesize).to eq(18)
66+
decompressed = Zstd.decompress(compressed)
67+
expect(decompressed.force_encoding('UTF-8')).to eq('あああ')
68+
end
69+
6370
it 'should raise exception with unsupported object' do
6471
expect { Zstd.decompress(Object.new) }.to raise_error(TypeError)
6572
end

zstd-ruby.gemspec

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,4 +35,5 @@ Gem::Specification.new do |spec|
3535
spec.add_development_dependency "rake", "~> 13.0"
3636
spec.add_development_dependency "rake-compiler", '~> 1'
3737
spec.add_development_dependency "rspec", "~> 3.0"
38+
spec.add_development_dependency "pry"
3839
end

0 commit comments

Comments
 (0)