Skip to content

Commit d128f75

Browse files
committed
feat: unlock GVL for compress
1 parent 788f4f5 commit d128f75

File tree

8 files changed

+52
-11
lines changed

8 files changed

+52
-11
lines changed

benchmarks/multi_thread_comporess.rb

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,12 +14,19 @@
1414
json_string = File.read("./samples/#{sample_file_name}")
1515

1616
queue = Queue.new
17+
# queue = []
1718
GUESSES.times { queue << json_string }
19+
# stream = Zstd::StreamingCompress.new(thread_num: THREADS)
1820
THREADS.times { queue << nil }
1921
THREADS.times.map {
2022
Thread.new {
2123
while str = queue.pop
22-
Zstd.compress(str)
24+
# stream = Zstd::StreamingCompress.new(thread_num: THREADS)
25+
#stream << str
26+
#stream << str
27+
#stream << str
28+
#stream.flush
29+
Zstd.compress(str, thread_num: 1)
2330
end
2431
}
2532
}.each(&:join)

benchmarks/zstd_compress_memory.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
sample_file_name = ARGV[0]
1212

13-
json_data = JSON.parse(IO.read("./samples/#{sample_file_name}"), symbolize_names: true)
13+
json_data = JSON.parse(File.read("./samples/#{sample_file_name}"), symbolize_names: true)
1414
json_string = json_data.to_json
1515

1616
i = 0

benchmarks/zstd_decompress_memory.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@
99
p "#{ObjectSpace.memsize_of_all/1000} #{ObjectSpace.count_objects} #{`ps -o rss= -p #{Process.pid}`.to_i}"
1010

1111
sample_file_name = ARGV[0]
12-
json_data = JSON.parse(IO.read("./samples/#{sample_file_name}"), symbolize_names: true)
12+
json_data = JSON.parse(File.read("./samples/#{sample_file_name}"), symbolize_names: true)
1313
json_string = json_data.to_json
1414

1515
i = 0

benchmarks/zstd_streaming_compress_memory.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
sample_file_name = ARGV[0]
1212

13-
json_string = IO.read("./samples/#{sample_file_name}")
13+
json_string = File.read("./samples/#{sample_file_name}")
1414

1515
i = 0
1616
start_time = Time.now

benchmarks/zstd_streaming_decompress_memory.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010

1111
sample_file_name = ARGV[0]
1212

13-
cstr = IO.read("./results/#{sample_file_name}.zstd")
13+
cstr = File.read("./results/#{sample_file_name}.zstd")
1414
i = 0
1515
start_time = Time.now
1616
while true do

examples/sinatra/Gemfile.lock

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
PATH
22
remote: ../..
33
specs:
4-
zstd-ruby (1.5.6.1)
4+
zstd-ruby (1.5.6.2)
55

66
GEM
77
remote: https://rubygems.org/

ext/zstdruby/common.h

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,9 @@
22
#define ZSTD_RUBY_H 1
33

44
#include <ruby.h>
5+
#ifdef HAVE_RUBY_THREAD_H
6+
#include <ruby/thread.h>
7+
#endif
58
#include "./libzstd/zstd.h"
69

710
static int convert_compression_level(VALUE compression_level_value)
@@ -12,18 +15,40 @@ static int convert_compression_level(VALUE compression_level_value)
1215
return NUM2INT(compression_level_value);
1316
}
1417

18+
struct compress_params {
19+
ZSTD_CCtx* ctx;
20+
ZSTD_outBuffer* output;
21+
ZSTD_inBuffer* input;
22+
ZSTD_EndDirective endOp;
23+
size_t ret;
24+
};
25+
26+
static void* compress_wrapper(void* args)
27+
{
28+
struct compress_params* params = args;
29+
params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp);
30+
return NULL;
31+
}
32+
1533
static size_t zstd_compress(ZSTD_CCtx* const ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp)
1634
{
17-
return ZSTD_compressStream2(ctx, output, input, endOp);
35+
#ifdef HAVE_RUBY_THREAD_H
36+
struct compress_params params = { ctx, output, input, endOp };
37+
rb_thread_call_without_gvl(compress_wrapper, &params, RUBY_UBF_IO, NULL);
38+
return params.ret;
39+
#else
40+
return ZSTD_compressStream2(ctx, output, input, endOp);
41+
#endif
1842
}
1943

2044
static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VALUE kwargs)
2145
{
22-
ID kwargs_keys[2];
46+
ID kwargs_keys[3];
2347
kwargs_keys[0] = rb_intern("level");
2448
kwargs_keys[1] = rb_intern("dict");
25-
VALUE kwargs_values[2];
26-
rb_get_kwargs(kwargs, kwargs_keys, 0, 2, kwargs_values);
49+
kwargs_keys[2] = rb_intern("thread_num");
50+
VALUE kwargs_values[3];
51+
rb_get_kwargs(kwargs, kwargs_keys, 0, 3, kwargs_values);
2752

2853
int compression_level = ZSTD_CLEVEL_DEFAULT;
2954
if (kwargs_values[0] != Qundef && kwargs_values[0] != Qnil) {
@@ -43,6 +68,15 @@ static void set_compress_params(ZSTD_CCtx* const ctx, VALUE level_from_args, VAL
4368
rb_raise(rb_eRuntimeError, "%s", "ZSTD_CCtx_loadDictionary failed");
4469
}
4570
}
71+
72+
if (kwargs_values[2] != Qundef && kwargs_values[2] != Qnil) {
73+
int thread_num = NUM2INT(kwargs_values[2]);
74+
size_t const r = ZSTD_CCtx_setParameter(ctx, ZSTD_c_nbWorkers, thread_num);
75+
if (ZSTD_isError(r)) {
76+
rb_warn("Note: the linked libzstd library doesn't support multithreading.Reverting to single-thread mode. \n");
77+
}
78+
// ZSTD_CCtx_setParameter(ctx, ZSTD_c_jobSize, thread_num);
79+
}
4680
}
4781

4882
static void set_decompress_params(ZSTD_DCtx* const dctx, VALUE kwargs)

ext/zstdruby/extconf.rb

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
have_func('rb_gc_mark_movable')
44

5-
$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY'
5+
$CFLAGS = '-I. -O3 -std=c99 -DZSTD_STATIC_LINKING_ONLY -DZSTD_MULTITHREAD -pthread'
66
$CPPFLAGS += " -fdeclspec" if CONFIG['CXX'] =~ /clang/
77

88
Dir.chdir File.expand_path('..', __FILE__) do

0 commit comments

Comments
 (0)