Commit 019b816

broker: multiple members for gzip compression
Each active gzip writer carries a small but significant amount of memory overhead, on the order of several hundred KB. When there are many active journals being written, this adds up to a large amount of memory usage.

This change adds a threshold so that incremental compression occurs only once at least 1 MB of uncompressed data has accumulated, and creates a new gzip writing mechanism that can close and open gzip members which are concatenated into the same output file. The spool logic uses this mechanism to emit a new gzip member for every batch of incremental compression, eliminating the need to hold a gzip writer in memory for the entire lifetime of the fragment file.

This change applies only to standard gzip compression with client-side decompression. If decompression offloading is used, gzip files continue to be written as a single stream, as some object stores truncate multi-member gzip content after the first member.
1 parent 0f34a43 commit 019b816
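The change relies on a property of RFC 1952: a gzip file may consist of any number of concatenated members, and a compliant decompressor presents them as a single stream. Go's compress/gzip reader handles such multistream input by default. A minimal standalone sketch of that property (illustrative only, not code from this commit):

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

func main() {
	var buf bytes.Buffer

	// Write two independent gzip members into the same buffer.
	// Each Close finalizes one member; the next writer appends another.
	for _, s := range []string{"first member ", "second member"} {
		var gz = gzip.NewWriter(&buf)
		gz.Write([]byte(s))
		gz.Close()
	}

	// A standard reader decodes the concatenation as one logical stream.
	var zr, err = gzip.NewReader(&buf)
	if err != nil {
		panic(err)
	}
	var out, _ = io.ReadAll(zr)
	fmt.Printf("%q\n", out) // "first member second member"
}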

3 files changed: 223 additions & 5 deletions

broker/codecs/codecs.go

Lines changed: 31 additions & 1 deletion
@@ -40,7 +40,9 @@ func NewCodecWriter(w io.Writer, codec pb.CompressionCodec) (Compressor, error)
 	switch codec {
 	case pb.CompressionCodec_NONE:
 		return nopWriteCloser{w}, nil
-	case pb.CompressionCodec_GZIP, pb.CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION:
+	case pb.CompressionCodec_GZIP:
+		return &GzipMultiMemberWriter{w: w}, nil
+	case pb.CompressionCodec_GZIP_OFFLOAD_DECOMPRESSION:
 		return gzip.NewWriter(w), nil
 	case pb.CompressionCodec_SNAPPY:
 		return snappy.NewBufferedWriter(w), nil
@@ -51,6 +53,34 @@ func NewCodecWriter(w io.Writer, codec pb.CompressionCodec) (Compressor, error)
 	}
 }
 
+// GzipMultiMemberWriter allows for batching multiple writes into a single gzip
+// member, which are concatenated into a single file per RFC 1952. Members are
+// terminated by calling Close, with a new gzip writer initialized on the next
+// Write.
+type GzipMultiMemberWriter struct {
+	w  io.Writer
+	gz *gzip.Writer
+}
+
+func (gzb *GzipMultiMemberWriter) Write(p []byte) (n int, err error) {
+	if gzb.gz == nil {
+		gzb.gz = gzip.NewWriter(gzb.w)
+	}
+
+	return gzb.gz.Write(p)
+}
+
+func (gzb *GzipMultiMemberWriter) Close() error {
+	if gzb.gz == nil {
+		return nil
+	} else if err := gzb.gz.Close(); err != nil {
+		return err
+	}
+	gzb.gz = nil
+
+	return nil
+}
+
 type nopWriteCloser struct{ io.Writer }
 
 func (nopWriteCloser) Close() error { return nil }
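A sketch of how the new writer behaves (hypothetical usage, assuming the Compressor returned by NewCodecWriter for GZIP is the *GzipMultiMemberWriter added above, with the Write and Close methods shown): each Close terminates the current member, and the next Write lazily opens a new one, so alternating Write and Close emits one member per compression batch while holding a live gzip.Writer only while a batch is in flight.

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"

	"go.gazette.dev/core/broker/codecs"
	pb "go.gazette.dev/core/broker/protocol"
)

func main() {
	var buf bytes.Buffer

	var cw, err = codecs.NewCodecWriter(&buf, pb.CompressionCodec_GZIP)
	if err != nil {
		panic(err)
	}

	// Each Write/Close cycle emits one self-contained gzip member.
	cw.Write([]byte("batch one "))
	cw.Close() // Terminates the first member.
	cw.Write([]byte("batch two"))
	cw.Close() // Terminates the second member.

	// Both members decode transparently as a single stream.
	var zr, _ = gzip.NewReader(&buf)
	var out, _ = io.ReadAll(zr)
	fmt.Printf("%q\n", out) // "batch one batch two"
}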

broker/fragment/spool.go

Lines changed: 31 additions & 4 deletions
@@ -14,6 +14,11 @@ import (
 	pb "go.gazette.dev/core/broker/protocol"
 )
 
+// Minimum size of accumulated uncompressed data before performing incremental
+// compression. Useful for GZIP, which creates a new member per compression
+// invocation.
+var compressionBatchSize = 1024 * 1024
+
 // Spool is a Fragment which is in the process of being created, backed by a
 // local *os.File. As commits occur and the file extent is updated, the Spool
 // Fragment is also updated to reflect the new committed extent. At all
@@ -31,6 +36,8 @@ type Spool struct {
 	// Length of compressed content written to |compressedFile|. Set only after
 	// the compressor is finalized.
 	compressedLength int64
+	// Spool file offset through which content has been compressed.
+	compressedTo int64
 	// Compressor of |compressedFile|.
 	compressor codecs.Compressor
 
@@ -119,7 +126,7 @@ func (s *Spool) applyCommit(r *pb.ReplicateRequest, primary bool) pb.ReplicateRe
 	if r.Proposal.End > s.Fragment.End+s.delta ||
 		(r.Proposal.End == s.Fragment.End && r.Proposal.ContentLength() == 0) {
 
-		if s.compressor != nil {
+		if s.compressor != nil || primary && s.CompressionCodec != pb.CompressionCodec_NONE {
 			s.finishCompression()
 		}
 		if s.ContentLength() != 0 {
@@ -166,7 +173,8 @@ func (s *Spool) applyCommit(r *pb.ReplicateRequest, primary bool) pb.ReplicateRe
 	spoolCommitsTotal.Inc()
 	spoolCommitBytesTotal.Add(float64(s.delta))
 
-	if primary && s.CompressionCodec != pb.CompressionCodec_NONE {
+	var uncompressedBytes = (r.Proposal.End - s.Fragment.Begin) - s.compressedTo
+	if primary && s.CompressionCodec != pb.CompressionCodec_NONE && int(uncompressedBytes) >= compressionBatchSize {
 		s.compressThrough(r.Proposal.End)
 	}
 	s.Fragment.Fragment = *r.Proposal
@@ -233,6 +241,7 @@ func (s *Spool) compressThrough(end int64) {
 	if s.CompressionCodec == pb.CompressionCodec_NONE {
 		panic("expected CompressionCodec != NONE")
 	}
+
 	var err error
 
 	var buf = bufferPool.Get().([]byte)
@@ -241,9 +250,15 @@ func (s *Spool) compressThrough(end int64) {
 	// Garden path: we've already compressed all content of the current Fragment,
 	// and now incrementally compress through |end|.
 	if s.compressor != nil {
-		var offset, delta = s.Fragment.ContentLength(), end - s.Fragment.End
+		var offset, delta = s.compressedTo, (end - s.Fragment.Begin) - s.compressedTo
 
 		if _, err = io.CopyBuffer(s.compressor, io.NewSectionReader(s.File, offset, delta), buf); err == nil {
+			if s.CompressionCodec == pb.CompressionCodec_GZIP {
+				err = s.compressor.Close()
+			}
+		}
+		if err == nil {
+			s.compressedTo = end - s.Fragment.Begin
 			return // Done.
 		}
 		err = fmt.Errorf("while incrementally compressing: %s", err)
@@ -281,7 +296,15 @@ func (s *Spool) compressThrough(end int64) {
 			s.compressor = nil
 			continue
 		}
+		if s.CompressionCodec == pb.CompressionCodec_GZIP {
+			if err = s.compressor.Close(); err != nil {
+				err = fmt.Errorf("flushing gzip batch compressor: %s", err)
+				s.compressor = nil
+				continue
+			}
+		}
 
+		s.compressedTo = end - s.Fragment.Begin
 		break // Success.
 	}
 }
@@ -294,9 +317,13 @@ func (s *Spool) finishCompression() {
 	}
 	var err error
 
-	if s.compressor == nil {
+	if s.compressedTo < s.Fragment.End-s.Fragment.Begin {
 		s.compressThrough(s.Fragment.End)
+	} else if s.compressor == nil {
+		// Empty fragment.
+		return
 	}
+
 	for {
 		if err != nil {
 			log.WithField("err", err).Error("failed to finishCompression (will retry)")
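The gating arithmetic in applyCommit can be read in isolation: (r.Proposal.End - s.Fragment.Begin) is the committed extent of the spool file, compressedTo is the spool-file offset already compressed, and a batch runs only once the uncompressed remainder reaches compressionBatchSize. A tiny illustrative helper (hypothetical names, mirroring the check rather than quoting it):

// shouldCompress reports whether an incremental compression batch should
// run. begin and end are journal offsets of the fragment and proposal;
// compressedTo is an offset relative to begin.
func shouldCompress(begin, end, compressedTo int64, batchSize int) bool {
	var uncompressed = (end - begin) - compressedTo
	return int(uncompressed) >= batchSize
}

For example, with begin=0, compressedTo=0, and the default 1 MB batch size, a proposal ending at 512 KiB is merely buffered, while one ending at 1 MiB triggers compression of the full extent and advances compressedTo to 1<<20.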

broker/fragment/spool_test.go

Lines changed: 161 additions & 0 deletions
@@ -1,6 +1,7 @@
 package fragment
 
 import (
+	"bytes"
 	"errors"
 	"io"
 	"testing"
@@ -117,6 +118,166 @@ func TestCompressionNotPrimary(t *testing.T) {
 		contentString(t, obv.completes[0], pb.CompressionCodec_GZIP))
 }
 
+func TestGzipBatcherMultipleMembers(t *testing.T) {
+	var origBatchSize = compressionBatchSize
+	compressionBatchSize = 10
+	defer func() { compressionBatchSize = origBatchSize }()
+
+	var obv testSpoolObserver
+	var spool = NewSpool("a/journal", &obv)
+
+	var resp, err = spool.Apply(newProposal(pb.Fragment{
+		Journal:          "a/journal",
+		Begin:            0,
+		End:              0,
+		CompressionCodec: pb.CompressionCodec_GZIP,
+	}, regEmpty), true)
+	require.NoError(t, err)
+	require.Equal(t, pb.Status_OK, resp.Status)
+
+	// Commit some data. The compression batch size has been artificially
+	// lowered so the first commit is compressed, the second commit is buffered,
+	// the third commit triggers a second member, the fourth commit triggers
+	// a third member, and the fifth commit is smaller than compressionBatchSize
+	// and will not trigger compression until spool completion.
+	for _, req := range []pb.ReplicateRequest{
+		{Content: []byte("first write ")},
+		{Content: []byte("second ")},
+		{Content: []byte("third ")},
+		{Content: []byte("fourth write ")},
+		{Content: []byte("final")},
+	} {
+		var resp, err = spool.Apply(&req, true)
+		require.NoError(t, err)
+		require.Equal(t, pb.ReplicateResponse{Status: pb.Status_OK}, resp)
+
+		resp, err = spool.Apply(newProposal(spool.Next(), regEmpty), true)
+		require.NoError(t, err)
+		require.Equal(t, pb.ReplicateResponse{Status: pb.Status_OK}, resp)
+	}
+
+	// Complete the spool.
+	resp, err = spool.Apply(newProposal(pb.Fragment{
+		Journal:          "a/journal",
+		Begin:            spool.Fragment.End,
+		End:              spool.Fragment.End,
+		CompressionCodec: pb.CompressionCodec_GZIP,
+	}, regEmpty), true)
+	require.NoError(t, err)
+	require.Equal(t, pb.ReplicateResponse{Status: pb.Status_OK}, resp)
+
+	require.Len(t, obv.commits, 5)
+	require.Len(t, obv.completes, 1)
+	require.NotNil(t, obv.completes[0].compressedFile)
+	require.NotEqual(t, int64(0), obv.completes[0].compressedLength)
+
+	var expected = "first write second third fourth write final"
+	var actual = contentString(t, obv.completes[0], pb.CompressionCodec_GZIP)
+	require.Equal(t, expected, actual)
+
+	// Decompress and verify each member.
+	var parts []string
+	var compressedData = make([]byte, obv.completes[0].compressedLength)
+	_, err = obv.completes[0].compressedFile.ReadAt(compressedData, 0)
+	require.NoError(t, err)
+	var gzipHeader = []byte{0x1f, 0x8b}
+	for start := bytes.Index(compressedData, gzipHeader); start != -1; {
+		var next = bytes.Index(compressedData[start+len(gzipHeader):], gzipHeader)
+		var end int
+		if next == -1 {
+			end = len(compressedData)
+		} else {
+			end = start + len(gzipHeader) + next
+		}
+
+		var memberData = compressedData[start:end]
+		var reader, readerErr = codecs.NewCodecReader(bytes.NewReader(memberData), pb.CompressionCodec_GZIP)
+		require.NoError(t, readerErr)
+		var decompressed, readErr = io.ReadAll(reader)
+		require.NoError(t, readErr)
+		require.NoError(t, reader.Close())
+
+		parts = append(parts, string(decompressed))
+		if next == -1 {
+			break
+		}
+		start = end
+	}
+
+	require.Len(t, parts, 4)
+	require.Equal(t, "first write ", parts[0])
+	require.Equal(t, "second third ", parts[1])
+	require.Equal(t, "fourth write ", parts[2])
+	require.Equal(t, "final", parts[3])
+
+	// Now test that compression offset tracking works correctly across spool
+	// rolls by verifying that multiple commits written to the new spool can be
+	// read.
+	for _, req := range []pb.ReplicateRequest{
+		{Content: []byte("abc")},
+		{Content: []byte("xyz")},
+	} {
+		var resp, err = spool.Apply(&req, true)
+		require.NoError(t, err)
+		require.Equal(t, pb.ReplicateResponse{Status: pb.Status_OK}, resp)
+
+		resp, err = spool.Apply(newProposal(spool.Next(), regEmpty), true)
+		require.NoError(t, err)
+		require.Equal(t, pb.ReplicateResponse{Status: pb.Status_OK}, resp)
+	}
+
+	// Complete the second spool.
+	resp, err = spool.Apply(newProposal(pb.Fragment{
+		Journal:          "a/journal",
+		Begin:            spool.Fragment.End,
+		End:              spool.Fragment.End,
+		CompressionCodec: pb.CompressionCodec_GZIP,
+	}, regEmpty), true)
+	require.NoError(t, err)
+	require.Equal(t, pb.Status_OK, resp.Status)
+
+	require.Len(t, obv.completes, 2)
+	require.Equal(t, "abcxyz", contentString(t, obv.completes[1], pb.CompressionCodec_GZIP))
+
+	// Test case where the first and only write is below the compression
+	// threshold, which will require initializing compression of the spool in
+	// finishCompression.
+	resp, err = spool.Apply(&pb.ReplicateRequest{Content: []byte("small")}, true)
+	require.NoError(t, err)
+	require.Equal(t, pb.ReplicateResponse{Status: pb.Status_OK}, resp)
+
+	resp, err = spool.Apply(newProposal(spool.Next(), regEmpty), true)
+	require.NoError(t, err)
+	require.Equal(t, pb.ReplicateResponse{Status: pb.Status_OK}, resp)
+
+	// Complete the third spool.
+	resp, err = spool.Apply(newProposal(pb.Fragment{
+		Journal:          "a/journal",
+		Begin:            spool.Fragment.End,
+		End:              spool.Fragment.End,
+		CompressionCodec: pb.CompressionCodec_GZIP,
+	}, regEmpty), true)
+	require.NoError(t, err)
+	require.Equal(t, pb.Status_OK, resp.Status)
+
+	require.Len(t, obv.completes, 3)
+	require.Equal(t, "small", contentString(t, obv.completes[2], pb.CompressionCodec_GZIP))
+
+	// Test case where an empty fragment with compression is completed.
+	// This should not panic due to nil compressor. Empty fragments don't
+	// trigger completion events (no ContentLength), so completes stays at 3.
+	resp, err = spool.Apply(newProposal(pb.Fragment{
+		Journal:          "a/journal",
+		Begin:            spool.Fragment.End,
+		End:              spool.Fragment.End,
+		CompressionCodec: pb.CompressionCodec_GZIP,
+	}, regEmpty), true)
+	require.NoError(t, err)
+	require.Equal(t, pb.Status_OK, resp.Status)
+
+	require.Len(t, obv.completes, 3)
+}
+
 func TestRejectRollBeforeCurrentEnd(t *testing.T) {
 	var obv testSpoolObserver
 	var spool = NewSpool("a/journal", &obv)
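The test locates member boundaries by scanning for the two-byte gzip magic (0x1f, 0x8b), which works for these tiny fixed inputs but could false-positive if those bytes appeared inside a compressed payload. A more robust split (a sketch using the standard library's Multistream mode; not code from this commit) reads one member at a time:

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// readMembers decodes each gzip member of r individually. Multistream(false)
// makes the reader stop at a member boundary instead of transparently
// concatenating members; Reset advances to the next member.
func readMembers(r io.Reader) ([]string, error) {
	var zr, err = gzip.NewReader(r)
	if err != nil {
		return nil, err
	}
	var parts []string
	for {
		zr.Multistream(false)
		var member, readErr = io.ReadAll(zr) // Exactly one member.
		if readErr != nil {
			return nil, readErr
		}
		parts = append(parts, string(member))

		if err = zr.Reset(r); err == io.EOF {
			return parts, nil // No further members.
		} else if err != nil {
			return nil, err
		}
	}
}

func main() {
	var buf bytes.Buffer
	for _, s := range []string{"first ", "second"} {
		var gz = gzip.NewWriter(&buf)
		gz.Write([]byte(s))
		gz.Close()
	}
	var parts, _ = readMembers(bytes.NewReader(buf.Bytes()))
	fmt.Printf("%q\n", parts) // ["first " "second"]
}

Passing a *bytes.Reader (or any io.ByteReader) matters here: it keeps the underlying flate decoder from over-reading past the member boundary, so Reset picks up exactly at the start of the next member.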
