Skip to content

Commit 4df4d62

Browse files
authored
Merge pull request #53 from tidewise/nogvl
unlock the GVL for compression/decompression operations
2 parents ad0437e + 2759041 commit 4df4d62

File tree

4 files changed

+123
-22
lines changed

4 files changed

+123
-22
lines changed

ext/zstdruby/streaming_compress.c

Lines changed: 42 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,12 @@
11
#include <common.h>
22
#include <streaming_compress.h>
3+
#include <ruby/thread.h>
34

45
struct streaming_compress_t {
56
ZSTD_CCtx* ctx;
67
VALUE buf;
78
size_t buf_size;
9+
char nogvl;
810
};
911

1012
static void
@@ -52,11 +54,18 @@ static VALUE
5254
rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
5355
{
5456
VALUE compression_level_value;
55-
rb_scan_args(argc, argv, "01", &compression_level_value);
57+
VALUE kwargs;
58+
rb_scan_args(argc, argv, "01:", &compression_level_value, &kwargs);
5659
int compression_level = convert_compression_level(compression_level_value);
5760

61+
ID kwargs_keys[1];
62+
kwargs_keys[0] = rb_intern("no_gvl");
63+
VALUE kwargs_values[1];
64+
rb_get_kwargs(kwargs, kwargs_keys, 0, 1, kwargs_values);
65+
5866
struct streaming_compress_t* sc;
5967
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
68+
sc->nogvl = kwargs_values[0] != Qundef && RTEST(kwargs_values[0]);
6069
size_t const buffOutSize = ZSTD_CStreamOutSize();
6170

6271
ZSTD_CCtx* ctx = ZSTD_createCCtx();
@@ -76,6 +85,35 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
7685
: (FIX2INT((val))))
7786
#define ARG_CONTINUE(val) FIXNUMARG((val), ZSTD_e_continue)
7887

88+
struct compress_stream_nogvl_t {
89+
ZSTD_CCtx* ctx;
90+
ZSTD_outBuffer* output;
91+
ZSTD_inBuffer* input;
92+
ZSTD_EndDirective endOp;
93+
size_t ret;
94+
};
95+
96+
static void*
97+
compressStream2_nogvl(void* arg)
98+
{
99+
struct compress_stream_nogvl_t* params = arg;
100+
params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp);
101+
return NULL;
102+
}
103+
104+
static size_t
105+
compressStream2(char nogvl, ZSTD_CCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp)
106+
{
107+
struct compress_stream_nogvl_t params = { ctx, output, input, endOp, 0 };
108+
if (nogvl) {
109+
rb_thread_call_without_gvl(compressStream2_nogvl, &params, NULL, NULL);
110+
}
111+
else {
112+
compressStream2_nogvl(&params);
113+
}
114+
return params.ret;
115+
}
116+
79117
static VALUE
80118
no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp)
81119
{
@@ -86,7 +124,7 @@ no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp)
86124
do {
87125
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
88126

89-
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, endOp);
127+
size_t const ret = compressStream2(sc->nogvl, sc->ctx, &output, &input, endOp);
90128
if (ZSTD_isError(ret)) {
91129
rb_raise(rb_eRuntimeError, "flush error error code: %s", ZSTD_getErrorName(ret));
92130
}
@@ -109,7 +147,7 @@ rb_streaming_compress_compress(VALUE obj, VALUE src)
109147
VALUE result = rb_str_new(0, 0);
110148
while (input.pos < input.size) {
111149
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
112-
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
150+
size_t const ret = compressStream2(sc->nogvl, sc->ctx, &output, &input, ZSTD_e_continue);
113151
if (ZSTD_isError(ret)) {
114152
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
115153
}
@@ -132,7 +170,7 @@ rb_streaming_compress_addstr(VALUE obj, VALUE src)
132170

133171
while (input.pos < input.size) {
134172
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
135-
size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
173+
size_t const result = compressStream2(sc->nogvl, sc->ctx, &output, &input, ZSTD_e_continue);
136174
if (ZSTD_isError(result)) {
137175
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
138176
}

ext/zstdruby/streaming_decompress.c

Lines changed: 43 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
#include <common.h>
2+
#include <ruby/thread.h>
23

34
struct streaming_decompress_t {
45
ZSTD_DCtx* ctx;
56
VALUE buf;
67
size_t buf_size;
8+
char nogvl;
79
};
810

911
static void
@@ -48,10 +50,19 @@ rb_streaming_decompress_allocate(VALUE klass)
4850
}
4951

5052
static VALUE
51-
rb_streaming_decompress_initialize(VALUE obj)
53+
rb_streaming_decompress_initialize(int argc, VALUE *argv, VALUE obj)
5254
{
55+
VALUE kwargs;
56+
rb_scan_args(argc, argv, "00:", &kwargs);
57+
58+
ID kwargs_keys[1];
59+
kwargs_keys[0] = rb_intern("no_gvl");
60+
VALUE kwargs_values[1];
61+
rb_get_kwargs(kwargs, kwargs_keys, 0, 1, kwargs_values);
62+
5363
struct streaming_decompress_t* sd;
5464
TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd);
65+
sd->nogvl = kwargs_values[0] != Qundef && RTEST(kwargs_values[0]);
5566
size_t const buffOutSize = ZSTD_DStreamOutSize();
5667

5768
ZSTD_DCtx* ctx = ZSTD_createDCtx();
@@ -65,6 +76,34 @@ rb_streaming_decompress_initialize(VALUE obj)
6576
return obj;
6677
}
6778

79+
struct decompress_stream_nogvl_t {
80+
ZSTD_DCtx* ctx;
81+
ZSTD_outBuffer* output;
82+
ZSTD_inBuffer* input;
83+
size_t ret;
84+
};
85+
86+
static void*
87+
decompressStream_nogvl(void* args)
88+
{
89+
struct decompress_stream_nogvl_t* params = args;
90+
params->ret = ZSTD_decompressStream(params->ctx, params->output, params->input);
91+
return NULL;
92+
}
93+
94+
static size_t
95+
decompressStream(char nogvl, ZSTD_DCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
96+
{
97+
struct decompress_stream_nogvl_t params = { ctx, output, input, 0 };
98+
if (nogvl) {
99+
rb_thread_call_without_gvl(decompressStream_nogvl, &params, NULL, NULL);
100+
}
101+
else {
102+
decompressStream_nogvl(&params);
103+
}
104+
return params.ret;
105+
}
106+
68107
static VALUE
69108
rb_streaming_decompress_decompress(VALUE obj, VALUE src)
70109
{
@@ -79,7 +118,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src)
79118
VALUE result = rb_str_new(0, 0);
80119
while (input.pos < input.size) {
81120
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
82-
size_t const ret = ZSTD_decompressStream(sd->ctx, &output, &input);
121+
size_t const ret = decompressStream(sd->nogvl, sd->ctx, &output, &input);
83122
if (ZSTD_isError(ret)) {
84123
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
85124
}
@@ -102,7 +141,7 @@ rb_streaming_decompress_addstr(VALUE obj, VALUE src)
102141

103142
while (input.pos < input.size) {
104143
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
105-
size_t const result = ZSTD_decompressStream(sd->ctx, &output, &input);
144+
size_t const result = decompressStream(sd->nogvl, sd->ctx, &output, &input);
106145
if (ZSTD_isError(result)) {
107146
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
108147
}
@@ -116,7 +155,7 @@ zstd_ruby_streaming_decompress_init(void)
116155
{
117156
VALUE cStreamingDecompress = rb_define_class_under(rb_mZstd, "StreamingDecompress", rb_cObject);
118157
rb_define_alloc_func(cStreamingDecompress, rb_streaming_decompress_allocate);
119-
rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, 0);
158+
rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, -1);
120159
rb_define_method(cStreamingDecompress, "decompress", rb_streaming_decompress_decompress, 1);
121160
rb_define_method(cStreamingDecompress, "<<", rb_streaming_decompress_addstr, 1);
122161
}

spec/zstd-ruby-streaming-compress_spec.rb

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
require "spec_helper"
22
require 'zstd-ruby'
33

4-
RSpec.describe Zstd::StreamingCompress do
4+
shared_examples "a streaming compressor" do
55
describe '<<' do
66
it 'shoud work' do
7-
stream = Zstd::StreamingCompress.new
7+
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
88
stream << "abc" << "def"
99
res = stream.finish
1010
expect(Zstd.decompress(res)).to eq('abcdef')
@@ -13,7 +13,7 @@
1313

1414
describe '<< + GC.compat' do
1515
it 'shoud work' do
16-
stream = Zstd::StreamingCompress.new
16+
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
1717
stream << "abc" << "def"
1818
GC.compact
1919
stream << "ghi"
@@ -24,7 +24,7 @@
2424

2525
describe '<< + flush' do
2626
it 'shoud work' do
27-
stream = Zstd::StreamingCompress.new
27+
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
2828
stream << "abc" << "def"
2929
res = stream.flush
3030
stream << "ghi"
@@ -35,7 +35,7 @@
3535

3636
describe 'compress + flush' do
3737
it 'shoud work' do
38-
stream = Zstd::StreamingCompress.new
38+
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
3939
res = stream.compress("abc")
4040
res << stream.flush
4141
res << stream.compress("def")
@@ -46,7 +46,7 @@
4646

4747
describe 'compression level' do
4848
it 'shoud work' do
49-
stream = Zstd::StreamingCompress.new(5)
49+
stream = Zstd::StreamingCompress.new(5, no_gvl: no_gvl)
5050
stream << "abc" << "def"
5151
res = stream.finish
5252
expect(Zstd.decompress(res)).to eq('abcdef')
@@ -56,14 +56,26 @@
5656
if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0.0')
5757
describe 'Ractor' do
5858
it 'should be supported' do
59-
r = Ractor.new {
60-
stream = Zstd::StreamingCompress.new(5)
59+
r = Ractor.new(no_gvl) do |no_gvl|
60+
stream = Zstd::StreamingCompress.new(5, no_gvl: no_gvl)
6161
stream << "abc" << "def"
6262
res = stream.finish
63-
}
63+
end
6464
expect(Zstd.decompress(r.take)).to eq('abcdef')
6565
end
6666
end
6767
end
6868
end
6969

70+
RSpec.describe Zstd::StreamingCompress do
71+
describe "with the global lock" do
72+
let(:no_gvl) { false }
73+
it_behaves_like "a streaming compressor"
74+
end
75+
76+
describe "without the global lock" do
77+
let(:no_gvl) { true }
78+
it_behaves_like "a streaming compressor"
79+
end
80+
end
81+

spec/zstd-ruby-streaming-decompress_spec.rb

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
require 'zstd-ruby'
33
require 'securerandom'
44

5-
RSpec.describe Zstd::StreamingDecompress do
5+
shared_examples "a streaming decompressor" do
66
describe 'streaming decompress' do
77
it 'shoud work' do
88
# str = SecureRandom.hex(150)
@@ -22,7 +22,7 @@
2222
# str = SecureRandom.hex(150)
2323
str = "foo bar buzz" * 100
2424
cstr = Zstd.compress(str)
25-
stream = Zstd::StreamingDecompress.new
25+
stream = Zstd::StreamingDecompress.new(no_gvl: no_gvl)
2626
result = ''
2727
result << stream.decompress(cstr[0, 5])
2828
result << stream.decompress(cstr[5, 5])
@@ -35,18 +35,30 @@
3535
if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0.0')
3636
describe 'Ractor' do
3737
it 'should be supported' do
38-
r = Ractor.new {
38+
r = Ractor.new(no_gvl) do |no_gvl|
3939
cstr = Zstd.compress('foo bar buzz')
40-
stream = Zstd::StreamingDecompress.new
40+
stream = Zstd::StreamingDecompress.new(no_gvl: no_gvl)
4141
result = ''
4242
result << stream.decompress(cstr[0, 5])
4343
result << stream.decompress(cstr[5, 5])
4444
result << stream.decompress(cstr[10..-1])
4545
result
46-
}
46+
end
4747
expect(r.take).to eq('foo bar buzz')
4848
end
4949
end
5050
end
5151
end
5252

53+
RSpec.describe Zstd::StreamingDecompress do
54+
describe "with the gvl" do
55+
let(:no_gvl) { false }
56+
it_behaves_like "a streaming decompressor"
57+
end
58+
59+
describe "without the gvl" do
60+
let(:no_gvl) { true }
61+
it_behaves_like "a streaming decompressor"
62+
end
63+
end
64+

0 commit comments

Comments
 (0)