Skip to content

Commit acadd6e

Browse files
committed
unlock the GVL for compression/decompression operations
This allows to run them in parallel (i.e. multi-threaded)
1 parent 198582d commit acadd6e

File tree

2 files changed

+54
-5
lines changed

2 files changed

+54
-5
lines changed

ext/zstdruby/streaming_compress.c

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
#include <common.h>
22
#include <streaming_compress.h>
3+
#include <ruby/thread.h>
34

45
struct streaming_compress_t {
56
ZSTD_CCtx* ctx;
@@ -76,6 +77,30 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
7677
: (FIX2INT((val))))
7778
#define ARG_CONTINUE(val) FIXNUMARG((val), ZSTD_e_continue)
7879

80+
struct compress_stream_nogvl_t {
81+
ZSTD_CCtx* ctx;
82+
ZSTD_outBuffer* output;
83+
ZSTD_inBuffer* input;
84+
ZSTD_EndDirective endOp;
85+
size_t ret;
86+
};
87+
88+
static void*
89+
compressStream2_nogvl(void* arg)
90+
{
91+
struct compress_stream_nogvl_t* params = arg;
92+
params->ret = ZSTD_compressStream2(params->ctx, params->output, params->input, params->endOp);
93+
return NULL;
94+
}
95+
96+
static size_t
97+
compressStream2(ZSTD_CCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input, ZSTD_EndDirective endOp)
98+
{
99+
struct compress_stream_nogvl_t params = { ctx, output, input, endOp, 0 };
100+
rb_thread_call_without_gvl(compressStream2_nogvl, &params, NULL, NULL);
101+
return params.ret;
102+
}
103+
79104
static VALUE
80105
no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp)
81106
{
@@ -86,7 +111,7 @@ no_compress(struct streaming_compress_t* sc, ZSTD_EndDirective endOp)
86111
do {
87112
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
88113

89-
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, endOp);
114+
size_t const ret = compressStream2(sc->ctx, &output, &input, endOp);
90115
if (ZSTD_isError(ret)) {
91116
rb_raise(rb_eRuntimeError, "flush error error code: %s", ZSTD_getErrorName(ret));
92117
}
@@ -109,7 +134,7 @@ rb_streaming_compress_compress(VALUE obj, VALUE src)
109134
VALUE result = rb_str_new(0, 0);
110135
while (input.pos < input.size) {
111136
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
112-
size_t const ret = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
137+
size_t const ret = compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
113138
if (ZSTD_isError(ret)) {
114139
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
115140
}
@@ -132,7 +157,7 @@ rb_streaming_compress_addstr(VALUE obj, VALUE src)
132157

133158
while (input.pos < input.size) {
134159
ZSTD_outBuffer output = { (void*)output_data, sc->buf_size, 0 };
135-
size_t const result = ZSTD_compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
160+
size_t const result = compressStream2(sc->ctx, &output, &input, ZSTD_e_continue);
136161
if (ZSTD_isError(result)) {
137162
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
138163
}

ext/zstdruby/streaming_decompress.c

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
#include <common.h>
2+
#include <ruby/thread.h>
23

34
struct streaming_decompress_t {
45
ZSTD_DCtx* ctx;
@@ -65,6 +66,29 @@ rb_streaming_decompress_initialize(VALUE obj)
6566
return obj;
6667
}
6768

69+
struct decompress_stream_nogvl_t {
70+
ZSTD_DCtx* ctx;
71+
ZSTD_outBuffer* output;
72+
ZSTD_inBuffer* input;
73+
size_t ret;
74+
};
75+
76+
static void*
77+
decompressStream_nogvl(void* args)
78+
{
79+
struct decompress_stream_nogvl_t* params = args;
80+
params->ret = ZSTD_decompressStream(params->ctx, params->output, params->input);
81+
return NULL;
82+
}
83+
84+
static size_t
85+
decompressStream(ZSTD_DCtx* ctx, ZSTD_outBuffer* output, ZSTD_inBuffer* input)
86+
{
87+
struct decompress_stream_nogvl_t params = { ctx, output, input, 0 };
88+
rb_thread_call_without_gvl(decompressStream_nogvl, &params, NULL, NULL);
89+
return params.ret;
90+
}
91+
6892
static VALUE
6993
rb_streaming_decompress_decompress(VALUE obj, VALUE src)
7094
{
@@ -79,7 +103,7 @@ rb_streaming_decompress_decompress(VALUE obj, VALUE src)
79103
VALUE result = rb_str_new(0, 0);
80104
while (input.pos < input.size) {
81105
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
82-
size_t const ret = ZSTD_decompressStream(sd->ctx, &output, &input);
106+
size_t const ret = decompressStream(sd->ctx, &output, &input);
83107
if (ZSTD_isError(ret)) {
84108
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
85109
}
@@ -102,7 +126,7 @@ rb_streaming_decompress_addstr(VALUE obj, VALUE src)
102126

103127
while (input.pos < input.size) {
104128
ZSTD_outBuffer output = { (void*)output_data, sd->buf_size, 0 };
105-
size_t const result = ZSTD_decompressStream(sd->ctx, &output, &input);
129+
size_t const result = decompressStream(sd->ctx, &output, &input);
106130
if (ZSTD_isError(result)) {
107131
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(result));
108132
}

0 commit comments

Comments
 (0)