Skip to content

Commit bdc221c

Browse files
authored
Merge pull request #41 from SpringMT/feature/add-streaming-compress
feat: add streaming compress
2 parents c66e860 + e798e54 commit bdc221c

File tree

6 files changed

+254
-6
lines changed

6 files changed

+254
-6
lines changed

ext/zstdruby/zstdruby.h renamed to ext/zstdruby/common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,7 @@
22
#define ZSTD_RUBY_H 1
33

44
#include "ruby.h"
5+
#include "./libzstd/zstd.h"
6+
57

68
#endif /* ZSTD_RUBY_H */

ext/zstdruby/main.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#include <common.h>
2+
VALUE rb_mZstd;
3+
void zstd_ruby_init(void);
4+
void zstd_ruby_streaming_compress_init(void);
5+
6+
void
7+
Init_zstdruby(void)
8+
{
9+
rb_mZstd = rb_define_module("Zstd");
10+
zstd_ruby_init();
11+
zstd_ruby_streaming_compress_init();
12+
}

ext/zstdruby/streaming_compress.c

Lines changed: 186 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,186 @@
1+
#include <common.h>
2+
#include <streaming_compress.h>
3+
4+
struct streaming_compress_t {
5+
ZSTD_CCtx* ctx;
6+
VALUE buf;
7+
size_t buf_size;
8+
size_t pos;
9+
};
10+
11+
static void
12+
streaming_compress_mark(void *p)
13+
{
14+
struct streaming_compress_t *sc = p;
15+
rb_gc_mark((VALUE)sc->ctx);
16+
rb_gc_mark(sc->buf);
17+
rb_gc_mark(sc->buf_size);
18+
}
19+
20+
static void
21+
streaming_compress_free(void *p)
22+
{
23+
struct streaming_compress_t *sc = p;
24+
ZSTD_CCtx* ctx = sc->ctx;
25+
if (ctx != NULL) {
26+
ZSTD_freeCCtx(ctx);
27+
}
28+
xfree(sc);
29+
}
30+
31+
static size_t
32+
streaming_compress_memsize(const void *p)
33+
{
34+
return sizeof(struct streaming_compress_t);
35+
}
36+
37+
static const rb_data_type_t streaming_compress_type = {
38+
"streaming_compress",
39+
{ streaming_compress_mark, streaming_compress_free, streaming_compress_memsize, },
40+
0, 0, RUBY_TYPED_FREE_IMMEDIATELY
41+
};
42+
43+
static VALUE
44+
rb_streaming_compress_allocate(VALUE klass)
45+
{
46+
struct streaming_compress_t* sc;
47+
VALUE obj = TypedData_Make_Struct(klass, struct streaming_compress_t, &streaming_compress_type, sc);
48+
sc->ctx = NULL;
49+
sc->buf = Qnil;
50+
sc->buf_size = 0;
51+
return obj;
52+
}
53+
54+
static VALUE
55+
rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
56+
{
57+
VALUE compression_level_value;
58+
rb_scan_args(argc, argv, "01", &compression_level_value);
59+
60+
int compression_level;
61+
if (NIL_P(compression_level_value)) {
62+
compression_level = 0; // The default. See ZSTD_CLEVEL_DEFAULT in zstd_compress.c
63+
} else {
64+
compression_level = NUM2INT(compression_level_value);
65+
}
66+
67+
struct streaming_compress_t* sc;
68+
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
69+
size_t const buffOutSize = ZSTD_CStreamOutSize();
70+
71+
ZSTD_CCtx* ctx = ZSTD_createCCtx();
72+
if (ctx == NULL) {
73+
rb_raise(rb_eRuntimeError, "%s", "ZSTD_createCCtx error");
74+
}
75+
ZSTD_CCtx_setParameter(ctx, ZSTD_c_compressionLevel, compression_level);
76+
sc->ctx = ctx;
77+
sc->buf = rb_str_new(NULL, buffOutSize);
78+
sc->buf_size = buffOutSize;
79+
80+
return obj;
81+
}
82+
83+
#define FIXNUMARG(val, ifnil) \
84+
(NIL_P((val)) ? (ifnil) \
85+
: (FIX2INT((val))))
86+
#define ARG_CONTINUE(val) FIXNUMARG((val), ZSTD_e_continue)
87+
88+
static VALUE
89+
no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp)
90+
{
91+
ZSTD_inBuffer input = { NULL, 0, 0 };
92+
const char* output_data = RSTRING_PTR(sc->buf);
93+
VALUE result = rb_str_new(0, 0);
94+
size_t ret;
95+
do {
96+
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
97+
98+
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, endOp);
99+
if (ZSTD_isError(ret)) {
100+
rb_raise(rb_eRuntimeError, "flush error error code: %s", ZSTD_getErrorName(ret));
101+
}
102+
rb_str_cat(result, output.dst, output.pos);
103+
} while (ret > 0);
104+
return result;
105+
}
106+
107+
static VALUE
108+
rb_streaming_compress_compress(VALUE obj, VALUE src)
109+
{
110+
StringValue(src);
111+
const char* input_data = RSTRING_PTR(src);
112+
size_t input_size = RSTRING_LEN(src);
113+
ZSTD_inBuffer input = { input_data, input_size, 0 };
114+
115+
struct streaming_compress_t* sc;
116+
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
117+
const char* output_data = RSTRING_PTR(sc->buf);
118+
VALUE result = rb_str_new(0, 0);
119+
while (input.pos < input.size) {
120+
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
121+
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
122+
if (ZSTD_isError(ret)) {
123+
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
124+
}
125+
rb_str_cat(result, output.dst, output.pos);
126+
}
127+
return result;
128+
}
129+
130+
static VALUE
131+
rb_streaming_compress_addstr(VALUE obj, VALUE src)
132+
{
133+
StringValue(src);
134+
const char* input_data = RSTRING_PTR(src);
135+
size_t input_size = RSTRING_LEN(src);
136+
ZSTD_inBuffer input = { input_data, input_size, 0 };
137+
138+
struct streaming_compress_t* sc;
139+
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
140+
const char* output_data = RSTRING_PTR(sc->buf);
141+
142+
while (input.pos < input.size) {
143+
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
144+
size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
145+
if (ZSTD_isError(result)) {
146+
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
147+
}
148+
}
149+
return obj;
150+
}
151+
152+
static VALUE
153+
rb_streaming_compress_flush(VALUE obj)
154+
{
155+
struct streaming_compress_t* sc;
156+
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
157+
VALUE result = no_compress(sc, ZSTD_e_flush);
158+
return result;
159+
}
160+
161+
static VALUE
162+
rb_streaming_compress_finish(VALUE obj)
163+
{
164+
struct streaming_compress_t* sc;
165+
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
166+
VALUE result = no_compress(sc, ZSTD_e_end);
167+
return result;
168+
}
169+
170+
extern VALUE rb_mZstd, cStreamingCompress;
171+
void
172+
zstd_ruby_streaming_compress_init(void)
173+
{
174+
VALUE cStreamingCompress = rb_define_class_under(rb_mZstd, "StreamingCompress", rb_cObject);
175+
rb_define_alloc_func(cStreamingCompress, rb_streaming_compress_allocate);
176+
rb_define_method(cStreamingCompress, "initialize", rb_streaming_compress_initialize, -1);
177+
rb_define_method(cStreamingCompress, "compress", rb_streaming_compress_compress, 1);
178+
rb_define_method(cStreamingCompress, "<<", rb_streaming_compress_addstr, 1);
179+
rb_define_method(cStreamingCompress, "flush", rb_streaming_compress_flush, 0);
180+
rb_define_method(cStreamingCompress, "finish", rb_streaming_compress_finish, 0);
181+
182+
rb_define_const(cStreamingCompress, "CONTINUE", INT2FIX(ZSTD_e_continue));
183+
rb_define_const(cStreamingCompress, "FLUSH", INT2FIX(ZSTD_e_flush));
184+
rb_define_const(cStreamingCompress, "END", INT2FIX(ZSTD_e_end));
185+
}
186+

ext/zstdruby/streaming_compress.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#if !defined(STREAMING_COMPRESS_H)
2+
#define STREAMING_COMPRESS_H
3+
4+
5+
#endif // STREAMING_COMPRESS_H

ext/zstdruby/zstdruby.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
#include "zstdruby.h"
2-
#include "./libzstd/zstd.h"
1+
#include <common.h>
2+
3+
extern VALUE rb_mZstd;
34

45
static VALUE zstdVersion(VALUE self)
56
{
@@ -104,12 +105,9 @@ static VALUE decompress(VALUE self, VALUE input)
104105
return output;
105106
}
106107

107-
VALUE rb_mZstd;
108-
109108
void
110-
Init_zstdruby(void)
109+
zstd_ruby_init(void)
111110
{
112-
rb_mZstd = rb_define_module("Zstd");
113111
rb_define_module_function(rb_mZstd, "zstd_version", zstdVersion, 0);
114112
rb_define_module_function(rb_mZstd, "compress", compress, -1);
115113
rb_define_module_function(rb_mZstd, "decompress", decompress, 1);
Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
require "spec_helper"
2+
require 'zstd-ruby'
3+
4+
RSpec.describe Zstd::StreamingCompress do
5+
describe '<<' do
6+
it 'shoud work' do
7+
stream = Zstd::StreamingCompress.new
8+
stream << "abc" << "def"
9+
res = stream.finish
10+
expect(Zstd.decompress(res)).to eq('abcdef')
11+
end
12+
end
13+
14+
describe '<< + flush' do
15+
it 'shoud work' do
16+
stream = Zstd::StreamingCompress.new
17+
stream << "abc" << "def"
18+
res = stream.flush
19+
stream << "ghi"
20+
res << stream.finish
21+
expect(Zstd.decompress(res)).to eq('abcdefghi')
22+
end
23+
end
24+
25+
describe 'compress + flush' do
26+
it 'shoud work' do
27+
stream = Zstd::StreamingCompress.new
28+
res = stream.compress("abc")
29+
res << stream.flush
30+
res << stream.compress("def")
31+
res << stream.finish
32+
expect(Zstd.decompress(res)).to eq('abcdef')
33+
end
34+
end
35+
36+
describe 'compression level' do
37+
it 'shoud work' do
38+
stream = Zstd::StreamingCompress.new(5)
39+
stream << "abc" << "def"
40+
res = stream.finish
41+
expect(Zstd.decompress(res)).to eq('abcdef')
42+
end
43+
end
44+
end
45+

0 commit comments

Comments
 (0)