Skip to content

Commit dd18e48

Browse files
committed
feat: add streaming compress
1 parent fe1b63f commit dd18e48

File tree

6 files changed

+164
-6
lines changed

6 files changed

+164
-6
lines changed

ext/zstdruby/zstdruby.h renamed to ext/zstdruby/common.h

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,5 +2,7 @@
22
#define ZSTD_RUBY_H 1
33

44
#include "ruby.h"
5+
#include "./libzstd/zstd.h"
6+
57

68
#endif /* ZSTD_RUBY_H */

ext/zstdruby/main.c

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,12 @@
1+
#include <common.h>
2+
VALUE rb_mZstd;
3+
void zstd_ruby_init(void);
4+
void zstd_ruby_streaming_compress_init(void);
5+
6+
void
7+
Init_zstdruby(void)
8+
{
9+
rb_mZstd = rb_define_module("Zstd");
10+
zstd_ruby_init();
11+
zstd_ruby_streaming_compress_init();
12+
}

ext/zstdruby/streaming_compress.c

Lines changed: 126 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,126 @@
1+
#include <common.h>
2+
#include <streaming_compress.h>
3+
4+
struct streaming_compress_t {
5+
ZSTD_CCtx* ctx;
6+
VALUE buf;
7+
size_t buf_size;
8+
size_t pos;
9+
};
10+
11+
12+
static void
13+
streaming_compress_mark(void *p)
14+
{
15+
struct streaming_compress_t *sc = p;
16+
rb_gc_mark(sc->buf);
17+
rb_gc_mark(sc->buf_size);
18+
rb_gc_mark(sc->pos);
19+
}
20+
21+
static void
22+
streaming_compress_free(void *p)
23+
{
24+
struct streaming_compress_t *sc = p;
25+
ZSTD_CCtx* ctx = sc->ctx;
26+
if (ctx != NULL) {
27+
ZSTD_freeCCtx(ctx);
28+
}
29+
xfree(sc);
30+
}
31+
32+
static size_t
33+
streaming_compress_memsize(const void *p)
34+
{
35+
/* n.b. this does not track memory managed via zalloc/zfree callbacks */
36+
return sizeof(struct streaming_compress_t);
37+
}
38+
39+
static const rb_data_type_t streaming_compress_type = {
40+
"streaming_compress",
41+
{ streaming_compress_mark, streaming_compress_free, streaming_compress_memsize, },
42+
0, 0, RUBY_TYPED_FREE_IMMEDIATELY
43+
};
44+
45+
static VALUE
46+
rb_streaming_compress_allocate(VALUE klass)
47+
{
48+
struct streaming_compress_t* sc;
49+
VALUE obj = TypedData_Make_Struct(klass, struct streaming_compress_t, &streaming_compress_type, sc);
50+
sc->ctx = NULL;
51+
sc->buf = Qnil;
52+
sc->buf_size = 0;
53+
sc->pos = 0;
54+
return obj;
55+
}
56+
57+
static VALUE
58+
rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
59+
{
60+
struct streaming_compress_t* sc;
61+
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
62+
size_t const buffOutSize = ZSTD_CStreamOutSize();
63+
64+
ZSTD_CCtx* ctx = ZSTD_createCCtx();
65+
if (ctx == NULL) {
66+
rb_raise(rb_eRuntimeError, "%s", "ZSTD_createCCtx error");
67+
}
68+
ZSTD_CCtx_setParameter(ctx, ZSTD_c_compressionLevel, 1);
69+
sc->ctx = ctx;
70+
sc->buf = rb_str_new(NULL, buffOutSize);
71+
sc->buf_size = buffOutSize;
72+
73+
return obj;
74+
}
75+
76+
static VALUE
77+
rb_streaming_compress_compress(VALUE obj, VALUE src)
78+
{
79+
StringValue(src);
80+
const char* input_data = RSTRING_PTR(src);
81+
size_t input_size = RSTRING_LEN(src);
82+
ZSTD_inBuffer input = { input_data, input_size, 0 };
83+
84+
struct streaming_compress_t* sc;
85+
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
86+
const char* output_data = RSTRING_PTR(sc->buf);
87+
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
88+
89+
size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
90+
if (ZSTD_isError(result)) {
91+
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
92+
}
93+
sc->pos += output.pos;
94+
return obj;
95+
}
96+
97+
static VALUE
98+
rb_streaming_compress_finish(VALUE obj)
99+
{
100+
ZSTD_inBuffer input = { NULL, 0, 0 };
101+
102+
struct streaming_compress_t* sc;
103+
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
104+
const char* output_data = RSTRING_PTR(sc->buf);
105+
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
106+
107+
size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_end);
108+
if (ZSTD_isError(result)) {
109+
rb_raise(rb_eRuntimeError, "finish error error code: %s", ZSTD_getErrorName(result));
110+
}
111+
sc->pos += output.pos;
112+
rb_str_resize(sc->buf, sc->pos);
113+
return sc->buf;
114+
}
115+
116+
extern VALUE rb_mZstd, cStreamingCompress;
117+
void
118+
zstd_ruby_streaming_compress_init(void)
119+
{
120+
VALUE cStreamingCompress = rb_define_class_under(rb_mZstd, "StreamingCompress", rb_cObject);
121+
rb_define_alloc_func(cStreamingCompress, rb_streaming_compress_allocate);
122+
rb_define_method(cStreamingCompress, "initialize", rb_streaming_compress_initialize, -1);
123+
rb_define_method(cStreamingCompress, "<<", rb_streaming_compress_compress, 1);
124+
rb_define_method(cStreamingCompress, "finish", rb_streaming_compress_finish, 0);
125+
}
126+

ext/zstdruby/streaming_compress.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
#if !defined(STREAMING_COMPRESS_H)
2+
#define STREAMING_COMPRESS_H
3+
4+
5+
#endif // STREAMING_COMPRESS_H

ext/zstdruby/zstdruby.c

Lines changed: 4 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
1-
#include "zstdruby.h"
2-
#include "./libzstd/zstd.h"
1+
#include <common.h>
2+
3+
extern VALUE rb_mZstd;
34

45
static VALUE zstdVersion(VALUE self)
56
{
@@ -104,12 +105,9 @@ static VALUE decompress(VALUE self, VALUE input)
104105
return output;
105106
}
106107

107-
VALUE rb_mZstd;
108-
109108
void
110-
Init_zstdruby(void)
109+
zstd_ruby_init(void)
111110
{
112-
rb_mZstd = rb_define_module("Zstd");
113111
rb_define_module_function(rb_mZstd, "zstd_version", zstdVersion, 0);
114112
rb_define_module_function(rb_mZstd, "compress", compress, -1);
115113
rb_define_module_function(rb_mZstd, "decompress", decompress, 1);
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
require "spec_helper"
2+
require 'zstd-ruby'
3+
4+
RSpec.describe Zstd::StreamingCompress do
5+
describe 'new' do
6+
it 'shoud work' do
7+
stream = Zstd::StreamingCompress.new
8+
stream << "test" << "test"
9+
res = stream.finish
10+
expect(Zstd.decompress(res)).to eq('testtest')
11+
end
12+
end
13+
14+
end
15+

0 commit comments

Comments
 (0)