Skip to content

Commit 894a9f8

Browse files
authored
Merge pull request #43 from SpringMT/feature/add-streaming-decompression
feat: add streaming decompression
2 parents 3ff6a32 + 3616e4f commit 894a9f8

File tree

7 files changed

+163
-5
lines changed

7 files changed

+163
-5
lines changed

.github/workflows/ruby.yml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,14 @@ jobs:
1515
runs-on: ubuntu-latest
1616
strategy:
1717
matrix:
18-
ruby-version: ['2.6', '2.7', '3.0', '3.1']
18+
ruby-version: ['2.7', '3.0', '3.1']
1919

2020
steps:
2121
- uses: actions/checkout@v2
2222
- name: Set up Ruby
2323
# To automatically get bug fixes and new Ruby versions for ruby/setup-ruby,
2424
# change this to (see https://github.com/ruby/setup-ruby#versioning):
25-
# uses: ruby/setup-ruby@v1
26-
uses: ruby/setup-ruby@168d6a54b412713c0ed60998a78093a270ca8d84
25+
uses: ruby/setup-ruby@v1
2726
with:
2827
ruby-version: ${{ matrix.ruby-version }}
2928
bundler-cache: true # runs 'bundle install' and caches installed gems automatically

.gitignore

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,3 +17,5 @@ vendor/
1717

1818
# rspec failure tracking
1919
.rspec_status
20+
21+
.ruby-version

ext/zstdruby/main.c

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,13 @@
22
VALUE rb_mZstd;
33
void zstd_ruby_init(void);
44
void zstd_ruby_streaming_compress_init(void);
5+
void zstd_ruby_streaming_decompress_init(void);
56

67
void
78
Init_zstdruby(void)
89
{
910
rb_mZstd = rb_define_module("Zstd");
1011
zstd_ruby_init();
1112
zstd_ruby_streaming_compress_init();
13+
zstd_ruby_streaming_decompress_init();
1214
}

ext/zstdruby/streaming_compress.c

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,13 @@ struct streaming_compress_t {
55
ZSTD_CCtx* ctx;
66
VALUE buf;
77
size_t buf_size;
8-
size_t pos;
98
};
109

1110
static void
1211
streaming_compress_mark(void *p)
1312
{
1413
struct streaming_compress_t *sc = p;
15-
rb_gc_mark((VALUE)sc->ctx);
14+
// rb_gc_mark((VALUE)sc->ctx);
1615
rb_gc_mark(sc->buf);
1716
rb_gc_mark(sc->buf_size);
1817
}

ext/zstdruby/streaming_decompress.c

Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
#include <common.h>
2+
3+
struct streaming_decompress_t {
4+
ZSTD_DCtx* ctx;
5+
VALUE buf;
6+
size_t buf_size;
7+
};
8+
9+
static void
10+
streaming_decompress_mark(void *p)
11+
{
12+
struct streaming_decompress_t *sd = p;
13+
// rb_gc_mark((VALUE)sd->ctx);
14+
rb_gc_mark(sd->buf);
15+
rb_gc_mark(sd->buf_size);
16+
}
17+
18+
static void
19+
streaming_decompress_free(void *p)
20+
{
21+
struct streaming_decompress_t *sd = p;
22+
ZSTD_DCtx* ctx = sd->ctx;
23+
if (ctx != NULL) {
24+
ZSTD_freeDCtx(ctx);
25+
}
26+
xfree(sd);
27+
}
28+
29+
static size_t
30+
streaming_decompress_memsize(const void *p)
31+
{
32+
return sizeof(struct streaming_decompress_t);
33+
}
34+
35+
static const rb_data_type_t streaming_decompress_type = {
36+
"streaming_decompress",
37+
{ streaming_decompress_mark, streaming_decompress_free, streaming_decompress_memsize, },
38+
0, 0, RUBY_TYPED_FREE_IMMEDIATELY
39+
};
40+
41+
static VALUE
42+
rb_streaming_decompress_allocate(VALUE klass)
43+
{
44+
struct streaming_decompress_t* sd;
45+
VALUE obj = TypedData_Make_Struct(klass, struct streaming_decompress_t, &streaming_decompress_type, sd);
46+
sd->ctx = NULL;
47+
sd->buf = Qnil;
48+
sd->buf_size = 0;
49+
return obj;
50+
}
51+
52+
static VALUE
53+
rb_streaming_decompress_initialize(VALUE obj)
54+
{
55+
struct streaming_decompress_t* sd;
56+
TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd);
57+
size_t const buffOutSize = ZSTD_DStreamOutSize();
58+
59+
ZSTD_DCtx* ctx = ZSTD_createDCtx();
60+
if (ctx == NULL) {
61+
rb_raise(rb_eRuntimeError, "%s", "ZSTD_createDCtx error");
62+
}
63+
sd->ctx = ctx;
64+
sd->buf = rb_str_new(NULL, buffOutSize);
65+
sd->buf_size = buffOutSize;
66+
67+
return obj;
68+
}
69+
70+
static VALUE
71+
rb_streaming_decompress_decompress(VALUE obj, VALUE src)
72+
{
73+
StringValue(src);
74+
const char* input_data = RSTRING_PTR(src);
75+
size_t input_size = RSTRING_LEN(src);
76+
ZSTD_inBuffer input = { input_data, input_size, 0 };
77+
78+
struct streaming_decompress_t* sd;
79+
TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd);
80+
const char* output_data = RSTRING_PTR(sd->buf);
81+
VALUE result = rb_str_new(0, 0);
82+
while (input.pos < input.size) {
83+
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
84+
size_t const ret = ZSTD_decompressStream(sd->ctx, &output, &input);
85+
if (ZSTD_isError(ret)) {
86+
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
87+
}
88+
rb_str_cat(result, output.dst, output.pos);
89+
}
90+
return result;
91+
}
92+
93+
static VALUE
94+
rb_streaming_decompress_addstr(VALUE obj, VALUE src)
95+
{
96+
StringValue(src);
97+
const char* input_data = RSTRING_PTR(src);
98+
size_t input_size = RSTRING_LEN(src);
99+
ZSTD_inBuffer input = { input_data, input_size, 0 };
100+
101+
struct streaming_decompress_t* sd;
102+
TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd);
103+
const char* output_data = RSTRING_PTR(sd->buf);
104+
105+
while (input.pos < input.size) {
106+
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
107+
size_t const result = ZSTD_decompressStream(sd->ctx, &output, &input);
108+
if (ZSTD_isError(result)) {
109+
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
110+
}
111+
}
112+
return obj;
113+
}
114+
115+
extern VALUE rb_mZstd, cStreamingDecompress;
116+
void
117+
zstd_ruby_streaming_decompress_init(void)
118+
{
119+
VALUE cStreamingDecompress = rb_define_class_under(rb_mZstd, "StreamingDecompress", rb_cObject);
120+
rb_define_alloc_func(cStreamingDecompress, rb_streaming_decompress_allocate);
121+
rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, 0);
122+
rb_define_method(cStreamingDecompress, "decompress", rb_streaming_decompress_decompress, 1);
123+
rb_define_method(cStreamingDecompress, "<<", rb_streaming_decompress_addstr, 1);
124+
}
125+

spec/zstd-ruby-streaming-compress_spec.rb

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,17 @@
1111
end
1212
end
1313

14+
describe '<< + GC.compat' do
15+
it 'shoud work' do
16+
stream = Zstd::StreamingCompress.new
17+
stream << "abc" << "def"
18+
GC.compact
19+
stream << "ghi"
20+
res = stream.finish
21+
expect(Zstd.decompress(res)).to eq('abcdefghi')
22+
end
23+
end
24+
1425
describe '<< + flush' do
1526
it 'shoud work' do
1627
stream = Zstd::StreamingCompress.new
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
require "spec_helper"
2+
require 'zstd-ruby'
3+
require 'securerandom'
4+
5+
RSpec.describe Zstd::StreamingDecompress do
6+
describe 'streaming decompress' do
7+
it 'shoud work' do
8+
# str = SecureRandom.hex(150)
9+
str = "foo bar buzz" * 100
10+
cstr = Zstd.compress(str)
11+
stream = Zstd::StreamingDecompress.new
12+
result = ''
13+
result << stream.decompress(cstr[0, 5])
14+
result << stream.decompress(cstr[5, 5])
15+
result << stream.decompress(cstr[10..-1])
16+
expect(result).to eq(str)
17+
end
18+
end
19+
end
20+

0 commit comments

Comments
 (0)