Skip to content

Commit 2959fc9

Browse files
committed
feat: unlock GVL for simple compression and decomporession
1 parent 0d6de0f commit 2959fc9

File tree

8 files changed

+133
-17
lines changed

8 files changed

+133
-17
lines changed

.github/workflows/ruby.yml

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,3 +41,11 @@ jobs:
4141
run: bundle exec rake compile
4242
- name: Run tests
4343
run: bundle exec rspec
44+
- name: Run benchmarks
45+
working-directory: benchmarks
46+
run: |
47+
time THREADS=4 bundle exec ruby multi_thread_comporess.rb city.json
48+
time THREADS=4 bundle exec ruby multi_thread_decomporess.rb city.json
49+
time THREADS=4 bundle exec ruby multi_thread_streaming_comporess.rb city.json
50+
time THREADS=4 bundle exec ruby multi_thread_streaming_decomporess.rb city.json
51+
bundle exec ruby large_bytes.rb

benchmarks/large_bytes.rb

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2,21 +2,22 @@
22
require 'zstd-ruby'
33
require 'securerandom'
44

5-
source_data = ""
6-
512.times { source_data += SecureRandom.uuid }
5+
# source_data = ""
6+
# 512.times { source_data += SecureRandom.uuid }
7+
source_data = Random.bytes(1<<17 + 15)
78

89
puts "source_data.size:#{source_data.size}"
910

1011
# Test compressing and de-compressing our source data 100,000 times. The cycles
1112
# are intended to exercise the libary and reproduce a memory leak.
12-
100_000.times do |i|
13+
10.times do |i|
1314
compressed_data = Zstd.compress(source_data)
1415
expanded_data = Zstd.decompress(compressed_data)
1516
unless expanded_data == source_data
1617
puts "Error: expanded data does not match source data"
1718
end
18-
if i % 1000 == 0
19+
# if i % 10 == 0
1920
puts " - #{i}: c:#{compressed_data.size} e:#{expanded_data.size} memory:#{`ps -o rss= -p #{Process.pid}`.to_i}"
20-
end
21+
# end
2122

2223
end

benchmarks/multi_thread_comporess.rb

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
$LOAD_PATH.unshift '../lib'
2+
require 'zstd-ruby'
3+
require 'thread'
4+
5+
GUESSES = (ENV['GUESSES'] || 1000).to_i
6+
THREADS = (ENV['THREADS'] || 1).to_i
7+
8+
p GUESSES: GUESSES, THREADS: THREADS
9+
10+
sample_file_name = ARGV[0]
11+
json_string = File.read("./samples/#{sample_file_name}")
12+
13+
queue = Queue.new
14+
GUESSES.times { queue << json_string }
15+
THREADS.times { queue << nil }
16+
THREADS.times.map {
17+
Thread.new {
18+
while str = queue.pop
19+
Zstd.compress(json_string)
20+
end
21+
}
22+
}.each(&:join)
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
$LOAD_PATH.unshift '../lib'
2+
require 'zstd-ruby'
3+
require 'thread'
4+
5+
GUESSES = (ENV['GUESSES'] || 1000).to_i
6+
THREADS = (ENV['THREADS'] || 1).to_i
7+
8+
p GUESSES: GUESSES, THREADS: THREADS
9+
10+
sample_file_name = ARGV[0]
11+
json_string = File.read("./samples/#{sample_file_name}")
12+
target = Zstd.compress(json_string)
13+
14+
queue = Queue.new
15+
GUESSES.times { queue << target }
16+
THREADS.times { queue << nil }
17+
THREADS.times.map {
18+
Thread.new {
19+
while str = queue.pop
20+
Zstd.decompress(str)
21+
end
22+
}
23+
}.each(&:join)

benchmarks/results/city.json.gzip

0 Bytes
Binary file not shown.

ext/zstdruby/common.h

Lines changed: 68 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,37 @@ static size_t zstd_stream_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output,
7474
#endif
7575
}
7676

77+
struct compress_params {
78+
ZSTD_CCtx* ctx;
79+
char* output_data;
80+
size_t output_size;
81+
char* input_data;
82+
size_t input_size;
83+
size_t ret;
84+
};
85+
86+
static void* compress_wrapper(void* args)
87+
{
88+
struct compress_params* params = args;
89+
params->ret = ZSTD_compress2(params->ctx ,params->output_data, params->output_size, params->input_data, params->input_size);
90+
return NULL;
91+
}
92+
93+
static size_t zstd_compress(ZSTD_CCtx* const ctx, char* output_data, size_t output_size, char* input_data, size_t input_size, bool gvl)
94+
{
95+
#ifdef HAVE_RUBY_THREAD_H
96+
if (gvl) {
97+
return ZSTD_compress2(ctx , output_data, output_size, input_data, input_size);
98+
} else {
99+
struct compress_params params = { ctx, output_data, output_size, input_data, input_size };
100+
rb_thread_call_without_gvl(compress_wrapper, &params, NULL, NULL);
101+
return params.ret;
102+
}
103+
#else
104+
return ZSTD_compress2(ctx , output_data, output_size, input_data, input_size);
105+
#endif
106+
}
107+
77108
static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs)
78109
{
79110
ID kwargs_keys[1];
@@ -92,33 +123,64 @@ static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs)
92123
}
93124
}
94125

95-
struct decompress_params {
126+
struct stream_decompress_params {
96127
ZSTD_DCtx* dctx;
97128
ZSTD_outBuffer* output;
98129
ZSTD_inBuffer* input;
99130
size_t ret;
100131
};
101132

102-
static void* decompress_wrapper(void* args)
133+
static void* stream_decompress_wrapper(void* args)
103134
{
104-
struct decompress_params* params = args;
135+
struct stream_decompress_params* params = args;
105136
params->ret = ZSTD_decompressStream(params->dctx, params->output, params->input);
106137
return NULL;
107138
}
108139

109-
static size_t zstd_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, bool gvl)
140+
static size_t zstd_stream_decompress(ZSTD_DCtx* const dctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, bool gvl)
110141
{
111142
#ifdef HAVE_RUBY_THREAD_H
112143
if (gvl) {
113144
return ZSTD_decompressStream(dctx, output, input);
114145
} else {
115-
struct decompress_params params = { dctx, output, input };
116-
rb_thread_call_without_gvl(decompress_wrapper, &params, NULL, NULL);
146+
struct stream_decompress_params params = { dctx, output, input };
147+
rb_thread_call_without_gvl(stream_decompress_wrapper, &params, NULL, NULL);
117148
return params.ret;
118149
}
119150
#else
120151
return ZSTD_decompressStream(dctx, output, input);
121152
#endif
122153
}
123154

155+
struct decompress_params {
156+
ZSTD_DCtx* dctx;
157+
char* output_data;
158+
size_t output_size;
159+
char* input_data;
160+
size_t input_size;
161+
size_t ret;
162+
};
163+
164+
static void* decompress_wrapper(void* args)
165+
{
166+
struct decompress_params* params = args;
167+
params->ret = ZSTD_decompressDCtx(params->dctx, params->output_data, params->output_size, params->input_data, params->input_size);
168+
return NULL;
169+
}
170+
171+
static size_t zstd_decompress(ZSTD_DCtx* const dctx, char* output_data, size_t output_size, char* input_data, size_t input_size, bool gvl)
172+
{
173+
#ifdef HAVE_RUBY_THREAD_H
174+
if (gvl) {
175+
return ZSTD_decompressDCtx(dctx, output_data, output_size, input_data, input_size);
176+
} else {
177+
struct decompress_params params = { dctx, output_data, output_size, input_data, input_size };
178+
rb_thread_call_without_gvl(decompress_wrapper, &params, NULL, NULL);
179+
return params.ret;
180+
}
181+
#else
182+
return ZSTD_decompressDCtx(dctx, output_data, output_size, input_data, input_size);
183+
#endif
184+
}
185+
124186
#endif /* ZSTD_RUBY_H */

ext/zstdruby/streaming_decompress.c

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src)
104104
VALUE result = rb_str_new(0, 0);
105105
while (input.pos < input.size) {
106106
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
107-
size_t const ret = zstd_decompress(sd->dctx, &output, &input, false);
107+
size_t const ret = zstd_stream_decompress(sd->dctx, &output, &input, false);
108108
if (ZSTD_isError(ret)) {
109109
rb_raise(rb_eRuntimeError, "decompress error error code: %s", ZSTD_getErrorName(ret));
110110
}

ext/zstdruby/zstdruby.c

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ static VALUE rb_compress(int argc, VALUE *argv, VALUE self)
2626
char* input_data = RSTRING_PTR(input_value);
2727
size_t input_size = RSTRING_LEN(input_value);
2828

29-
size_t const max_compressed_size = ZSTD_compressBound(input_size);
29+
size_t max_compressed_size = ZSTD_compressBound(input_size);
3030
VALUE output = rb_str_new(NULL, max_compressed_size);
31-
const char* output_data = RSTRING_PTR(output);
31+
char* output_data = RSTRING_PTR(output);
3232

33-
size_t const ret = ZSTD_compress2(ctx,(void*)output_data, max_compressed_size, (void*)input_data, input_size);
33+
size_t const ret = zstd_compress(ctx, output_data, max_compressed_size, input_data, input_size, false);
3434
if (ZSTD_isError(ret)) {
3535
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
3636
}
@@ -96,7 +96,7 @@ static VALUE decompress_buffered(ZSTD_DCtx* dctx, const char* input_data, size_t
9696
rb_str_resize(output_string, output.size);
9797
output.dst = RSTRING_PTR(output_string);
9898

99-
size_t ret = zstd_decompress(dctx, &output, &input, true);
99+
size_t ret = zstd_stream_decompress(dctx, &output, &input, false);
100100
if (ZSTD_isError(ret)) {
101101
ZSTD_freeDCtx(dctx);
102102
rb_raise(rb_eRuntimeError, "%s: %s", "ZSTD_decompressStream failed", ZSTD_getErrorName(ret));
@@ -134,7 +134,7 @@ static VALUE rb_decompress(int argc, VALUE *argv, VALUE self)
134134
VALUE output = rb_str_new(NULL, uncompressed_size);
135135
char* output_data = RSTRING_PTR(output);
136136

137-
size_t const decompress_size = ZSTD_decompressDCtx(dctx, output_data, uncompressed_size, input_data, input_size);
137+
size_t const decompress_size = zstd_decompress(dctx, output_data, uncompressed_size, input_data, input_size, false);
138138
if (ZSTD_isError(decompress_size)) {
139139
rb_raise(rb_eRuntimeError, "%s: %s", "decompress error", ZSTD_getErrorName(decompress_size));
140140
}

0 commit comments

Comments
 (0)