Skip to content

Commit 2fc2b78

Browse files
committed
feat: add flush and compress methods
1 parent dd18e48 commit 2fc2b78

File tree

2 files changed

+117
-23
lines changed

2 files changed

+117
-23
lines changed

ext/zstdruby/streaming_compress.c

Lines changed: 84 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,9 @@ static void
1313
streaming_compress_mark(void *p)
1414
{
1515
struct streaming_compress_t *sc = p;
16+
rb_gc_mark((VALUE)sc->ctx);
1617
rb_gc_mark(sc->buf);
1718
rb_gc_mark(sc->buf_size);
18-
rb_gc_mark(sc->pos);
1919
}
2020

2121
static void
@@ -50,13 +50,22 @@ rb_streaming_compress_allocate(VALUE klass)
5050
sc->ctx = NULL;
5151
sc->buf = Qnil;
5252
sc->buf_size = 0;
53-
sc->pos = 0;
5453
return obj;
5554
}
5655

5756
static VALUE
5857
rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
5958
{
59+
VALUE compression_level_value;
60+
rb_scan_args(argc, argv, "01", &compression_level_value);
61+
62+
int compression_level;
63+
if (NIL_P(compression_level_value)) {
64+
compression_level = 0; // The default. See ZSTD_CLEVEL_DEFAULT in zstd_compress.c
65+
} else {
66+
compression_level = NUM2INT(compression_level_value);
67+
}
68+
6069
struct streaming_compress_t* sc;
6170
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
6271
size_t const buffOutSize = ZSTD_CStreamOutSize();
@@ -65,14 +74,38 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
6574
if (ctx == NULL) {
6675
rb_raise(rb_eRuntimeError, "%s", "ZSTD_createCCtx error");
6776
}
68-
ZSTD_CCtx_setParameter(ctx, ZSTD_c_compressionLevel, 1);
77+
ZSTD_CCtx_setParameter(ctx, ZSTD_c_compressionLevel, compression_level);
6978
sc->ctx = ctx;
7079
sc->buf = rb_str_new(NULL, buffOutSize);
7180
sc->buf_size = buffOutSize;
7281

7382
return obj;
7483
}
7584

85+
#define FIXNUMARG(val, ifnil) \
86+
(NIL_P((val)) ? (ifnil) \
87+
: (FIX2INT((val))))
88+
#define ARG_CONTINUE(val) FIXNUMARG((val), ZSTD_e_continue)
89+
90+
static VALUE
91+
no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp)
92+
{
93+
ZSTD_inBuffer input = { NULL, 0, 0 };
94+
const char* output_data = RSTRING_PTR(sc->buf);
95+
VALUE result = rb_str_new(0, 0);
96+
size_t ret;
97+
do {
98+
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
99+
100+
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, endOp);
101+
if (ZSTD_isError(ret)) {
102+
rb_raise(rb_eRuntimeError, "flush error error code: %s", ZSTD_getErrorName(ret));
103+
}
104+
rb_str_cat(result, output.dst, output.pos);
105+
} while (ret > 0);
106+
return result;
107+
}
108+
76109
static VALUE
77110
rb_streaming_compress_compress(VALUE obj, VALUE src)
78111
{
@@ -84,33 +117,58 @@ rb_streaming_compress_compress(VALUE obj, VALUE src)
84117
struct streaming_compress_t* sc;
85118
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
86119
const char* output_data = RSTRING_PTR(sc->buf);
87-
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
88-
89-
size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
90-
if (ZSTD_isError(result)) {
91-
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
120+
VALUE result = rb_str_new(0, 0);
121+
while (input.pos < input.size) {
122+
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
123+
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
124+
if (ZSTD_isError(ret)) {
125+
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
126+
}
127+
rb_str_cat(result, output.dst, output.pos);
92128
}
93-
sc->pos += output.pos;
94-
return obj;
129+
return result;
95130
}
96131

97132
static VALUE
98-
rb_streaming_compress_finish(VALUE obj)
133+
rb_streaming_compress_addstr(VALUE obj, VALUE src)
99134
{
100-
ZSTD_inBuffer input = { NULL, 0, 0 };
135+
StringValue(src);
136+
const char* input_data = RSTRING_PTR(src);
137+
size_t input_size = RSTRING_LEN(src);
138+
ZSTD_inBuffer input = { input_data, input_size, 0 };
101139

102140
struct streaming_compress_t* sc;
103141
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
104142
const char* output_data = RSTRING_PTR(sc->buf);
105-
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
106143

107-
size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_end);
108-
if (ZSTD_isError(result)) {
109-
rb_raise(rb_eRuntimeError, "finish error error code: %s", ZSTD_getErrorName(result));
144+
while (input.pos < input.size) {
145+
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
146+
size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
147+
if (ZSTD_isError(result)) {
148+
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
149+
}
110150
}
111-
sc->pos += output.pos;
112-
rb_str_resize(sc->buf, sc->pos);
113-
return sc->buf;
151+
return obj;
152+
}
153+
154+
155+
static VALUE
156+
rb_streaming_compress_flush(VALUE obj)
157+
{
158+
struct streaming_compress_t* sc;
159+
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
160+
VALUE result = no_compress(sc, ZSTD_e_flush);
161+
return result;
162+
}
163+
164+
165+
static VALUE
166+
rb_streaming_compress_finish(VALUE obj)
167+
{
168+
struct streaming_compress_t* sc;
169+
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
170+
VALUE result = no_compress(sc, ZSTD_e_end);
171+
return result;
114172
}
115173

116174
extern VALUE rb_mZstd, cStreamingCompress;
@@ -120,7 +178,13 @@ zstd_ruby_streaming_compress_init(void)
120178
VALUE cStreamingCompress = rb_define_class_under(rb_mZstd, "StreamingCompress", rb_cObject);
121179
rb_define_alloc_func(cStreamingCompress, rb_streaming_compress_allocate);
122180
rb_define_method(cStreamingCompress, "initialize", rb_streaming_compress_initialize, -1);
123-
rb_define_method(cStreamingCompress, "<<", rb_streaming_compress_compress, 1);
181+
rb_define_method(cStreamingCompress, "compress", rb_streaming_compress_compress, 1);
182+
rb_define_method(cStreamingCompress, "<<", rb_streaming_compress_addstr, 1);
183+
rb_define_method(cStreamingCompress, "flush", rb_streaming_compress_flush, 0);
124184
rb_define_method(cStreamingCompress, "finish", rb_streaming_compress_finish, 0);
185+
186+
rb_define_const(cStreamingCompress, "CONTINUE", INT2FIX(ZSTD_e_continue));
187+
rb_define_const(cStreamingCompress, "FLUSH", INT2FIX(ZSTD_e_flush));
188+
rb_define_const(cStreamingCompress, "END", INT2FIX(ZSTD_e_end));
125189
}
126190

spec/zstd-ruby-streaming-compress_spec.rb

Lines changed: 33 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -2,14 +2,44 @@
22
require 'zstd-ruby'
33

44
RSpec.describe Zstd::StreamingCompress do
5-
describe 'new' do
5+
describe '<<' do
66
it 'shoud work' do
77
stream = Zstd::StreamingCompress.new
8-
stream << "test" << "test"
8+
stream << "abc" << "def"
99
res = stream.finish
10-
expect(Zstd.decompress(res)).to eq('testtest')
10+
expect(Zstd.decompress(res)).to eq('abcdef')
1111
end
1212
end
1313

14+
describe '<< + flush' do
15+
it 'shoud work' do
16+
stream = Zstd::StreamingCompress.new
17+
stream << "abc" << "def"
18+
res = stream.flush
19+
stream << "ghi"
20+
res << stream.finish
21+
expect(Zstd.decompress(res)).to eq('abcdefghi')
22+
end
23+
end
24+
25+
describe 'compress + flush' do
26+
it 'shoud work' do
27+
stream = Zstd::StreamingCompress.new
28+
res = stream.compress("abc")
29+
res << stream.flush
30+
res << stream.compress("def")
31+
res << stream.finish
32+
expect(Zstd.decompress(res)).to eq('abcdef')
33+
end
34+
end
35+
36+
describe 'compression level' do
37+
it 'shoud work' do
38+
stream = Zstd::StreamingCompress.new(5)
39+
stream << "abc" << "def"
40+
res = stream.finish
41+
expect(Zstd.decompress(res)).to eq('abcdef')
42+
end
43+
end
1444
end
1545

0 commit comments

Comments
 (0)