Skip to content

Commit f25d76e

Browse files
committed
Merge pull request #114 from cosmo0920/add-pending-handler-to-concat-128kb-or-more-data
Add pending handler to concat 128kb or more data
1 parent 5894b22 commit f25d76e

File tree

2 files changed

+170
-5
lines changed

2 files changed

+170
-5
lines changed

ext/zstdruby/streaming_compress.c

Lines changed: 20 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ struct streaming_compress_t {
44
ZSTD_CCtx* ctx;
55
VALUE buf;
66
size_t buf_size;
7+
VALUE pending; /* accumulate compressed bytes produced by write() */
78
};
89

910
static void
@@ -12,8 +13,10 @@ streaming_compress_mark(void *p)
1213
struct streaming_compress_t *sc = p;
1314
#ifdef HAVE_RB_GC_MARK_MOVABLE
1415
rb_gc_mark_movable(sc->buf);
16+
rb_gc_mark_movable(sc->pending);
1517
#else
1618
rb_gc_mark(sc->buf);
19+
rb_gc_mark(sc->pending);
1720
#endif
1821
}
1922

@@ -40,6 +43,7 @@ streaming_compress_compact(void *p)
4043
{
4144
struct streaming_compress_t *sc = p;
4245
sc->buf = rb_gc_location(sc->buf);
46+
sc->pending = rb_gc_location(sc->pending);
4347
}
4448
#endif
4549

@@ -64,6 +68,7 @@ rb_streaming_compress_allocate(VALUE klass)
6468
sc->ctx = NULL;
6569
sc->buf = Qnil;
6670
sc->buf_size = 0;
71+
sc->pending = Qnil;
6772
return obj;
6873
}
6974

@@ -87,6 +92,7 @@ rb_streaming_compress_initialize(int argc, VALUE *argv, VALUE obj)
8792
sc->ctx = ctx;
8893
sc->buf = rb_str_new(NULL, buffOutSize);
8994
sc->buf_size = buffOutSize;
95+
sc->pending = rb_str_new(0, 0);
9096

9197
return obj;
9298
}
@@ -143,7 +149,6 @@ static VALUE
143149
rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
144150
{
145151
size_t total = 0;
146-
VALUE result = rb_str_new(0, 0);
147152
struct streaming_compress_t* sc;
148153
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
149154
const char* output_data = RSTRING_PTR(sc->buf);
@@ -161,6 +166,10 @@ rb_streaming_compress_write(int argc, VALUE *argv, VALUE obj)
161166
if (ZSTD_isError(ret)) {
162167
rb_raise(rb_eRuntimeError, "compress error error code: %s", ZSTD_getErrorName(ret));
163168
}
169+
/* collect produced bytes */
170+
if (output.pos > 0) {
171+
rb_str_cat(sc->pending, output.dst, output.pos);
172+
}
164173
total += RSTRING_LEN(str);
165174
}
166175
}
@@ -193,17 +202,23 @@ rb_streaming_compress_flush(VALUE obj)
193202
{
194203
struct streaming_compress_t* sc;
195204
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
196-
VALUE result = no_compress(sc, ZSTD_e_flush);
197-
return result;
205+
VALUE drained = no_compress(sc, ZSTD_e_flush);
206+
rb_str_cat(sc->pending, RSTRING_PTR(drained), RSTRING_LEN(drained));
207+
VALUE out = sc->pending;
208+
sc->pending = rb_str_new(0, 0);
209+
return out;
198210
}
199211

200212
static VALUE
201213
rb_streaming_compress_finish(VALUE obj)
202214
{
203215
struct streaming_compress_t* sc;
204216
TypedData_Get_Struct(obj, struct streaming_compress_t, &streaming_compress_type, sc);
205-
VALUE result = no_compress(sc, ZSTD_e_end);
206-
return result;
217+
VALUE drained = no_compress(sc, ZSTD_e_end);
218+
rb_str_cat(sc->pending, RSTRING_PTR(drained), RSTRING_LEN(drained));
219+
VALUE out = sc->pending;
220+
sc->pending = rb_str_new(0, 0);
221+
return out;
207222
}
208223

209224
extern VALUE rb_mZstd, cStreamingCompress;

spec/streaming_128kb_cliff_spec.rb

Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
# spec/streaming_128kb_cliff_spec.rb
2+
require "spec_helper"
3+
require "stringio"
4+
require "zstd-ruby"
5+
6+
RSpec.describe "Zstd streaming 128KB cliff" do
7+
MAGIC = [0x28, 0xB5, 0x2F, 0xFD].freeze
8+
9+
def hex(str)
10+
str.unpack1("H*")
11+
end
12+
13+
def has_magic?(bin)
14+
bytes = bin.bytes
15+
bytes[0, 4] == MAGIC
16+
end
17+
18+
shared_examples "round-trip streaming compress" do |size|
19+
it "round-trips #{size} bytes and starts with zstd magic" do
20+
# Produce data
21+
src = "a" * size
22+
23+
sc = Zstd::StreamingCompress.new
24+
sc << src
25+
compressed = sc.finish
26+
27+
expect(has_magic?(compressed)).to be(true), "missing magic: #{hex(compressed)[0, 16]}"
28+
expect(Zstd.decompress(compressed)).to eq(src)
29+
end
30+
end
31+
32+
context "exactly around 128 KiB" do
33+
include_examples "round-trip streaming compress", 131_071
34+
include_examples "round-trip streaming compress", 131_072 # the cliff
35+
include_examples "round-trip streaming compress", 131_073
36+
end
37+
38+
context "multiple writes crossing the boundary" do
39+
it "64KiB + 64KiB (exact boundary) works" do
40+
part = "x" * 65_536
41+
sc = Zstd::StreamingCompress.new
42+
sc << part
43+
sc << part
44+
compressed = sc.finish
45+
expect(has_magic?(compressed)).to be(true)
46+
expect(Zstd.decompress(compressed)).to eq(part * 2)
47+
end
48+
49+
it "64KiB + 64KiB + 1 works" do
50+
a = "a" * 65_536
51+
b = "b" * 65_536
52+
c = "c"
53+
sc = Zstd::StreamingCompress.new
54+
sc << a << b << c
55+
compressed = sc.finish
56+
expect(has_magic?(compressed)).to be(true)
57+
expect(Zstd.decompress(compressed)).to eq(a + b + c)
58+
end
59+
end
60+
61+
context "flush/end draining" do
62+
it "returns all produced bytes across flush and finish" do
63+
sc = Zstd::StreamingCompress.new
64+
sc << ("a" * 70_000)
65+
out = sc.flush
66+
expect(out).not_to be_empty
67+
sc << ("b" * 70_000)
68+
out << sc.finish
69+
expect(has_magic?(out)).to be(true)
70+
expect(Zstd.decompress(out)).to eq(("a" * 70_000) + ("b" * 70_000))
71+
end
72+
end
73+
74+
context "GC.compact interaction" do
75+
it "survives compaction around the boundary" do
76+
sc = Zstd::StreamingCompress.new
77+
sc << ("a" * 80_000)
78+
GC.compact if GC.respond_to?(:compact)
79+
sc << ("b" * 60_000) # total now > 128KiB
80+
compressed = sc.finish
81+
expect(has_magic?(compressed)).to be(true)
82+
expect(Zstd.decompress(compressed)).to eq(("a" * 80_000) + ("b" * 60_000))
83+
end
84+
end
85+
86+
context "larger payload sanity" do
87+
it "round-trips ~1 MiB" do
88+
src = "z" * 1_048_576
89+
sc = Zstd::StreamingCompress.new(level: 3)
90+
sc << src
91+
compressed = sc.finish
92+
expect(has_magic?(compressed)).to be(true)
93+
expect(Zstd.decompress(compressed)).to eq(src)
94+
end
95+
end
96+
97+
describe Zstd::StreamWriter do
98+
it "produces a valid frame and round-trips at exactly 128KiB" do
99+
io = StringIO.new
100+
sw = Zstd::StreamWriter.new(io)
101+
sw.write("a" * 131_072)
102+
sw.finish
103+
104+
io.rewind
105+
bin = io.read
106+
expect(has_magic?(bin)).to be(true), "missing magic: #{hex(bin)[0, 16]}"
107+
108+
io.rewind
109+
sr = Zstd::StreamReader.new(io)
110+
out = sr.read(2_000_000)
111+
expect(out.size).to eq(131_072)
112+
expect(out).to eq("a" * 131_072)
113+
io.close
114+
end
115+
116+
it "handles boundary-crossing writes with flush in between" do
117+
io = StringIO.new
118+
sw = Zstd::StreamWriter.new(io)
119+
sw.write("a" * 70_000)
120+
sw.write("b" * 70_000) # crosses 128KiB internally
121+
sw.finish
122+
123+
io.rewind
124+
bin = io.read
125+
expect(has_magic?(bin)).to be(true)
126+
io.rewind
127+
sr = Zstd::StreamReader.new(io)
128+
out = sr.read(1_000_000)
129+
expect(out).to eq("a" * 70_000 + "b" * 70_000)
130+
io.close
131+
end
132+
133+
it "survives GC.compact mid-stream" do
134+
io = StringIO.new
135+
sw = Zstd::StreamWriter.new(io)
136+
sw.write("x" * 90_000)
137+
GC.compact if GC.respond_to?(:compact)
138+
sw.write("y" * 50_000)
139+
sw.finish
140+
141+
io.rewind
142+
bin = io.read
143+
expect(has_magic?(bin)).to be(true)
144+
io.rewind
145+
sr = Zstd::StreamReader.new(io)
146+
out = sr.read(200_000)
147+
expect(out).to eq("x" * 90_000 + "y" * 50_000)
148+
end
149+
end
150+
end

0 commit comments

Comments
 (0)