Skip to content

Commit 2759041

Browse files
committed
fix: make the GVL unlock optional
1 parent acadd6e commit 2759041

File tree

4 files changed

+78
-26
lines changed

4 files changed

+78
-26
lines changed

ext/zstdruby/streaming_compress.c

Lines changed: 19 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ struct streaming_compress_t {
66
ZSTD_CCtx* ctx;
77
VALUE buf;
88
size_t buf_size;
9+
char nogvl;
910
};
1011

1112
static void
@@ -53,11 +54,18 @@ static VALUE
5354
rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
5455
{
5556
VALUE compression_level_value;
56-
rb_scan_args(argc, argv, "01", &compression_level_value);
57+
VALUE kwargs;
58+
rb_scan_args(argc, argv, "01:", &compression_level_value, &kwargs);
5759
int compression_level = convert_compression_level(compression_level_value);
5860

61+
ID kwargs_keys[1];
62+
kwargs_keys[0] = rb_intern("no_gvl");
63+
VALUE kwargs_values[1];
64+
rb_get_kwargs(kwargs, kwargs_keys, 0, 1, kwargs_values);
65+
5966
struct streaming_compress_t* sc;
6067
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
68+
sc->nogvl = kwargs_values[0] != Qundef && RTEST(kwargs_values[0]);
6169
size_t const buffOutSize = ZSTD_CStreamOutSize();
6270

6371
ZSTD_CCtx* ctx = ZSTD_createCCtx();
@@ -94,10 +102,15 @@ compressStream2_nogvl(void* arg)
94102
}
95103

96104
static size_t
97-
compressStream2(ZSTD_CCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp)
105+
compressStream2(char nogvl, ZSTD_CCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp)
98106
{
99107
struct compress_stream_nogvl_t params = { ctx, output, input, endOp, 0 };
100-
rb_thread_call_without_gvl(compressStream2_nogvl, &params, NULL, NULL);
108+
if (nogvl) {
109+
rb_thread_call_without_gvl(compressStream2_nogvl, &params, NULL, NULL);
110+
}
111+
else {
112+
compressStream2_nogvl(&params);
113+
}
101114
return params.ret;
102115
}
103116

@@ -111,7 +124,7 @@ no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp)
111124
do {
112125
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
113126

114-
size_t const ret = compressStream2(sc->ctx, &output, &input, endOp);
127+
size_t const ret = compressStream2(sc->nogvl, sc->ctx, &output, &input, endOp);
115128
if (ZSTD_isError(ret)) {
116129
rb_raise(rb_eRuntimeError, "flush error error code: %s", ZSTD_getErrorName(ret));
117130
}
@@ -134,7 +147,7 @@ rb_streaming_compress_compress(VALUE obj, VALUE src)
134147
VALUE result = rb_str_new(0, 0);
135148
while (input.pos < input.size) {
136149
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
137-
size_t const ret = compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
150+
size_t const ret = compressStream2(sc->nogvl, sc->ctx, &output, &input, ZSTD_e_continue);
138151
if (ZSTD_isError(ret)) {
139152
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
140153
}
@@ -157,7 +170,7 @@ rb_streaming_compress_addstr(VALUE obj, VALUE src)
157170

158171
while (input.pos < input.size) {
159172
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
160-
size_t const result = compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
173+
size_t const result = compressStream2(sc->nogvl, sc->ctx, &output, &input, ZSTD_e_continue);
161174
if (ZSTD_isError(result)) {
162175
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
163176
}

ext/zstdruby/streaming_decompress.c

Lines changed: 21 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@ struct streaming_decompress_t {
55
ZSTD_DCtx* ctx;
66
VALUE buf;
77
size_t buf_size;
8+
char nogvl;
89
};
910

1011
static void
@@ -49,10 +50,19 @@ rb_streaming_decompress_allocate(VALUE klass)
4950
}
5051

5152
static VALUE
52-
rb_streaming_decompress_initialize(VALUE obj)
53+
rb_streaming_decompress_initialize(int argc, VALUE *argv, VALUE obj)
5354
{
55+
VALUE kwargs;
56+
rb_scan_args(argc, argv, "00:", &kwargs);
57+
58+
ID kwargs_keys[1];
59+
kwargs_keys[0] = rb_intern("no_gvl");
60+
VALUE kwargs_values[1];
61+
rb_get_kwargs(kwargs, kwargs_keys, 0, 1, kwargs_values);
62+
5463
struct streaming_decompress_t* sd;
5564
TypedData_Get_Struct(obj, struct streaming_decompress_t, &streaming_decompress_type, sd);
65+
sd->nogvl = kwargs_values[0] != Qundef && RTEST(kwargs_values[0]);
5666
size_t const buffOutSize = ZSTD_DStreamOutSize();
5767

5868
ZSTD_DCtx* ctx = ZSTD_createDCtx();
@@ -82,10 +92,15 @@ decompressStream_nogvl(void* args)
8292
}
8393

8494
static size_t
85-
decompressStream(ZSTD_DCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
95+
decompressStream(char nogvl, ZSTD_DCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
8696
{
8797
struct decompress_stream_nogvl_t params = { ctx, output, input, 0 };
88-
rb_thread_call_without_gvl(decompressStream_nogvl, &params, NULL, NULL);
98+
if (nogvl) {
99+
rb_thread_call_without_gvl(decompressStream_nogvl, &params, NULL, NULL);
100+
}
101+
else {
102+
decompressStream_nogvl(&params);
103+
}
89104
return params.ret;
90105
}
91106

@@ -103,7 +118,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src)
103118
VALUE result = rb_str_new(0, 0);
104119
while (input.pos < input.size) {
105120
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
106-
size_t const ret = decompressStream(sd->ctx, &output, &input);
121+
size_t const ret = decompressStream(sd->nogvl, sd->ctx, &output, &input);
107122
if (ZSTD_isError(ret)) {
108123
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
109124
}
@@ -126,7 +141,7 @@ rb_streaming_decompress_addstr(VALUE obj, VALUE src)
126141

127142
while (input.pos < input.size) {
128143
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
129-
size_t const result = decompressStream(sd->ctx, &output, &input);
144+
size_t const result = decompressStream(sd->nogvl, sd->ctx, &output, &input);
130145
if (ZSTD_isError(result)) {
131146
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
132147
}
@@ -140,7 +155,7 @@ zstd_ruby_streaming_decompress_init(void)
140155
{
141156
VALUE cStreamingDecompress = rb_define_class_under(rb_mZstd, "StreamingDecompress", rb_cObject);
142157
rb_define_alloc_func(cStreamingDecompress, rb_streaming_decompress_allocate);
143-
rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, 0);
158+
rb_define_method(cStreamingDecompress, "initialize", rb_streaming_decompress_initialize, -1);
144159
rb_define_method(cStreamingDecompress, "decompress", rb_streaming_decompress_decompress, 1);
145160
rb_define_method(cStreamingDecompress, "<<", rb_streaming_decompress_addstr, 1);
146161
}

spec/zstd-ruby-streaming-compress_spec.rb

Lines changed: 21 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,10 @@
11
require "spec_helper"
22
require 'zstd-ruby'
33

4-
RSpec.describe Zstd::StreamingCompress do
4+
shared_examples "a streaming compressor" do
55
describe '<<' do
66
it 'shoud work' do
7-
stream = Zstd::StreamingCompress.new
7+
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
88
stream << "abc" << "def"
99
res = stream.finish
1010
expect(Zstd.decompress(res)).to eq('abcdef')
@@ -13,7 +13,7 @@
1313

1414
describe '<< + GC.compat' do
1515
it 'shoud work' do
16-
stream = Zstd::StreamingCompress.new
16+
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
1717
stream << "abc" << "def"
1818
GC.compact
1919
stream << "ghi"
@@ -24,7 +24,7 @@
2424

2525
describe '<< + flush' do
2626
it 'shoud work' do
27-
stream = Zstd::StreamingCompress.new
27+
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
2828
stream << "abc" << "def"
2929
res = stream.flush
3030
stream << "ghi"
@@ -35,7 +35,7 @@
3535

3636
describe 'compress + flush' do
3737
it 'shoud work' do
38-
stream = Zstd::StreamingCompress.new
38+
stream = Zstd::StreamingCompress.new(no_gvl: no_gvl)
3939
res = stream.compress("abc")
4040
res << stream.flush
4141
res << stream.compress("def")
@@ -46,7 +46,7 @@
4646

4747
describe 'compression level' do
4848
it 'shoud work' do
49-
stream = Zstd::StreamingCompress.new(5)
49+
stream = Zstd::StreamingCompress.new(5, no_gvl: no_gvl)
5050
stream << "abc" << "def"
5151
res = stream.finish
5252
expect(Zstd.decompress(res)).to eq('abcdef')
@@ -56,14 +56,26 @@
5656
if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0.0')
5757
describe 'Ractor' do
5858
it 'should be supported' do
59-
r = Ractor.new {
60-
stream = Zstd::StreamingCompress.new(5)
59+
r = Ractor.new(no_gvl) do |no_gvl|
60+
stream = Zstd::StreamingCompress.new(5, no_gvl: no_gvl)
6161
stream << "abc" << "def"
6262
res = stream.finish
63-
}
63+
end
6464
expect(Zstd.decompress(r.take)).to eq('abcdef')
6565
end
6666
end
6767
end
6868
end
6969

70+
RSpec.describe Zstd::StreamingCompress do
71+
describe "with the global lock" do
72+
let(:no_gvl) { false }
73+
it_behaves_like "a streaming compressor"
74+
end
75+
76+
describe "without the global lock" do
77+
let(:no_gvl) { true }
78+
it_behaves_like "a streaming compressor"
79+
end
80+
end
81+

spec/zstd-ruby-streaming-decompress_spec.rb

Lines changed: 17 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22
require 'zstd-ruby'
33
require 'securerandom'
44

5-
RSpec.describe Zstd::StreamingDecompress do
5+
shared_examples "a streaming decompressor" do
66
describe 'streaming decompress' do
77
it 'shoud work' do
88
# str = SecureRandom.hex(150)
@@ -22,7 +22,7 @@
2222
# str = SecureRandom.hex(150)
2323
str = "foo bar buzz" * 100
2424
cstr = Zstd.compress(str)
25-
stream = Zstd::StreamingDecompress.new
25+
stream = Zstd::StreamingDecompress.new(no_gvl: no_gvl)
2626
result = ''
2727
result << stream.decompress(cstr[0, 5])
2828
result << stream.decompress(cstr[5, 5])
@@ -35,18 +35,30 @@
3535
if Gem::Version.new(RUBY_VERSION) >= Gem::Version.new('3.0.0')
3636
describe 'Ractor' do
3737
it 'should be supported' do
38-
r = Ractor.new {
38+
r = Ractor.new(no_gvl) do |no_gvl|
3939
cstr = Zstd.compress('foo bar buzz')
40-
stream = Zstd::StreamingDecompress.new
40+
stream = Zstd::StreamingDecompress.new(no_gvl: no_gvl)
4141
result = ''
4242
result << stream.decompress(cstr[0, 5])
4343
result << stream.decompress(cstr[5, 5])
4444
result << stream.decompress(cstr[10..-1])
4545
result
46-
}
46+
end
4747
expect(r.take).to eq('foo bar buzz')
4848
end
4949
end
5050
end
5151
end
5252

53+
RSpec.describe Zstd::StreamingDecompress do
54+
describe "with the gvl" do
55+
let(:no_gvl) { false }
56+
it_behaves_like "a streaming decompressor"
57+
end
58+
59+
describe "without the gvl" do
60+
let(:no_gvl) { true }
61+
it_behaves_like "a streaming decompressor"
62+
end
63+
end
64+

0 commit comments

Comments
 (0)