Skip to content

Commit 0e6f97f

Browse files
committed
Add pending mechanism to handle 128kb or more data for zstd frames
Signed-off-by: Hiroshi Hatake <[email protected]>
1 parent ed6c63e commit 0e6f97f

File tree

1 file changed

+20
-5
lines changed

1 file changed

+20
-5
lines changed

ext/zstdruby/streaming_compress.c

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ struct streaming_compress_t {
44
ZSTD_CCtx* ctx;
55
VALUE buf;
66
size_t buf_size;
7+
VALUE pending; /* accumulate compressed bytes produced by write() */
78
};
89

910
static void
@@ -12,8 +13,10 @@ streaming_compress_mark(void *p)
1213
struct streaming_compress_t *sc = p;
1314
#ifdef HAVE_RB_GC_MARK_MOVABLE
1415
rb_gc_mark_movable(sc->buf);
16+
rb_gc_mark_movable(sc->pending);
1517
#else
1618
rb_gc_mark(sc->buf);
19+
rb_gc_mark(sc->pending);
1720
#endif
1821
}
1922

@@ -40,6 +43,7 @@ streaming_compress_compact(void *p)
4043
{
4144
struct streaming_compress_t *sc = p;
4245
sc->buf = rb_gc_location(sc->buf);
46+
sc->pending = rb_gc_location(sc->pending);
4347
}
4448
#endif
4549

@@ -64,6 +68,7 @@ rb_streaming_compress_allocate(VALUE klass)
6468
sc->ctx = NULL;
6569
sc->buf = Qnil;
6670
sc->buf_size = 0;
71+
sc->pending = Qnil;
6772
return obj;
6873
}
6974

@@ -86,6 +91,7 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
8691
sc->ctx = ctx;
8792
sc->buf = rb_str_new(NULL, buffOutSize);
8893
sc->buf_size = buffOutSize;
94+
sc->pending = rb_str_new(0, 0);
8995

9096
return obj;
9197
}
@@ -142,7 +148,6 @@ static VALUE
142148
rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
143149
{
144150
size_t total = 0;
145-
VALUE result = rb_str_new(0, 0);
146151
struct streaming_compress_t* sc;
147152
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
148153
const char* output_data = RSTRING_PTR(sc->buf);
@@ -160,6 +165,10 @@ rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
160165
if (ZSTD_isError(ret)) {
161166
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
162167
}
168+
/* collect produced bytes */
169+
if (output.pos > 0) {
170+
rb_str_cat(sc->pending, output.dst, output.pos);
171+
}
163172
total += RSTRING_LEN(str);
164173
}
165174
}
@@ -192,17 +201,23 @@ rb_streaming_compress_flush(VALUE obj)
192201
{
193202
struct streaming_compress_t* sc;
194203
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
195-
VALUE result = no_compress(sc, ZSTD_e_flush);
196-
return result;
204+
VALUE drained = no_compress(sc, ZSTD_e_flush);
205+
rb_str_cat(sc->pending, RSTRING_PTR(drained), RSTRING_LEN(drained));
206+
VALUE out = sc->pending;
207+
sc->pending = rb_str_new(0, 0);
208+
return out;
197209
}
198210

199211
static VALUE
200212
rb_streaming_compress_finish(VALUE obj)
201213
{
202214
struct streaming_compress_t* sc;
203215
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
204-
VALUE result = no_compress(sc, ZSTD_e_end);
205-
return result;
216+
VALUE drained = no_compress(sc, ZSTD_e_end);
217+
rb_str_cat(sc->pending, RSTRING_PTR(drained), RSTRING_LEN(drained));
218+
VALUE out = sc->pending;
219+
sc->pending = rb_str_new(0, 0);
220+
return out;
206221
}
207222

208223
extern VALUE rb_mZstd, cStreamingCompress;

0 commit comments

Comments
 (0)