
Commit 6854674

fix(profiler): reduce memory usage for compression (#4058)

Authored by nsrip-dd and felixge
Co-authored-by: felixge <[email protected]>
Co-authored-by: nick.ripley <[email protected]>
1 parent abd2718 · commit 6854674

4 files changed: +116 -50 lines

profiler/compression.go

Lines changed: 69 additions & 14 deletions
@@ -137,9 +137,29 @@ func getZstdLevelOrDefault(level int) zstd.EncoderLevel {
 	return zstd.SpeedDefault
 }
 
-// newCompressionPipeline returns a compressor that converts the data written to
-// it from the expected input compression to the given output compression.
-func newCompressionPipeline(in compression, out compression) (compressor, error) {
+type compressionPipelineBuilder struct {
+	zstdEncoders map[zstd.EncoderLevel]*sharedZstdEncoder
+}
+
+func (b *compressionPipelineBuilder) getZstdEncoder(level zstd.EncoderLevel) (*sharedZstdEncoder, error) {
+	if b.zstdEncoders == nil {
+		b.zstdEncoders = make(map[zstd.EncoderLevel]*sharedZstdEncoder)
+	}
+	encoder, ok := b.zstdEncoders[level]
+	if !ok {
+		var err error
+		encoder, err = newSharedZstdEncoder(level)
+		if err != nil {
+			return nil, err
+		}
+		b.zstdEncoders[level] = encoder
+	}
+	return encoder, nil
+}
+
+// Build returns a compressor that converts the data written to it from the
+// expected input compression to the given output compression.
+func (b *compressionPipelineBuilder) Build(in compression, out compression) (compressor, error) {
 	if in == out {
 		return newPassthroughCompressor(), nil
 	}
@@ -149,11 +169,15 @@ func newCompressionPipeline(in compression, out compression) (compressor, error)
 	}
 
 	if in == noCompression && out.algorithm == compressionAlgorithmZstd {
-		return zstd.NewWriter(nil, zstd.WithEncoderLevel(getZstdLevelOrDefault(out.level)))
+		return b.getZstdEncoder(getZstdLevelOrDefault(out.level))
 	}
 
 	if in.algorithm == compressionAlgorithmGzip && out.algorithm == compressionAlgorithmZstd {
-		return newZstdRecompressor(getZstdLevelOrDefault(out.level))
+		encoder, err := b.getZstdEncoder(getZstdLevelOrDefault(out.level))
+		if err != nil {
+			return nil, err
+		}
+		return newZstdRecompressor(encoder), nil
 	}
 
 	return nil, fmt.Errorf("unsupported recompression: %s -> %s", in, out)
@@ -164,8 +188,11 @@ func newCompressionPipeline(in compression, out compression) (compressor, error)
 // the data from one format and then re-compresses it into another format.
 type compressor interface {
 	io.Writer
-	io.Closer
+	// Reset resets the compressor to the given writer. It may also acquire a
+	// shared underlying resource, so callers must always call Close().
 	Reset(w io.Writer)
+	// Close closes the compressor and releases any shared underlying resource.
+	Close() error
 }
 
 // newPassthroughCompressor returns a compressor that simply passes all data
@@ -186,21 +213,16 @@ func (r *passthroughCompressor) Close() error {
 	return nil
 }
 
-func newZstdRecompressor(level zstd.EncoderLevel) (*zstdRecompressor, error) {
-	zstdOut, err := zstd.NewWriter(io.Discard, zstd.WithEncoderLevel(level))
-	if err != nil {
-		return nil, err
-	}
-	return &zstdRecompressor{zstdOut: zstdOut, err: make(chan error)}, nil
+func newZstdRecompressor(encoder *sharedZstdEncoder) *zstdRecompressor {
+	return &zstdRecompressor{zstdOut: encoder, err: make(chan error)}
 }
 
 type zstdRecompressor struct {
 	// err synchronizes finishing writes after closing pw and reports any
 	// error during recompression
 	err chan error
 	pw  io.WriteCloser
-	zstdOut *zstd.Encoder
-	level   zstd.EncoderLevel
+	zstdOut *sharedZstdEncoder
 }
 
 func (r *zstdRecompressor) Reset(w io.Writer) {
@@ -227,3 +249,36 @@ func (r *zstdRecompressor) Close() error {
 	err := <-r.err
 	return cmp.Or(err, r.zstdOut.Close())
 }
+
+// newSharedZstdEncoder creates a new shared Zstd encoder with the given level.
+// It expects the Reset and Close method to be used in an acquire and release
+// fashion.
+func newSharedZstdEncoder(level zstd.EncoderLevel) (*sharedZstdEncoder, error) {
+	encoder, err := zstd.NewWriter(nil, zstd.WithEncoderLevel(level))
+	if err != nil {
+		return nil, err
+	}
+	return &sharedZstdEncoder{encoder: encoder, sema: make(chan struct{}, 1)}, nil
+}
+
+type sharedZstdEncoder struct {
+	encoder *zstd.Encoder
+	sema    chan struct{}
+}
+
+// Reset acquires the semaphore and resets the encoder to the given writer.
+func (s *sharedZstdEncoder) Reset(w io.Writer) {
+	s.sema <- struct{}{}
+	s.encoder.Reset(w)
+}
+
+func (s *sharedZstdEncoder) Write(p []byte) (int, error) {
+	return s.encoder.Write(p)
+}
+
+// Close releases the semaphore and closes the encoder.
+func (s *sharedZstdEncoder) Close() error {
+	err := s.encoder.Close()
+	<-s.sema
+	return err
+}
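The heart of the change is sharedZstdEncoder: instead of every pipeline owning its own zstd.Encoder (each of which keeps sizable internal buffers alive), one encoder per compression level is shared behind a one-slot channel semaphore, with Reset acting as acquire and Close as release. Below is a minimal, self-contained sketch of the same acquire/release pattern; it is not the dd-trace-go code, and it substitutes compress/gzip from the standard library for the third-party zstd encoder so it runs without dependencies (sharedEncoder and newSharedEncoder are invented names).

package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
	"sync"
)

// sharedEncoder guards one reusable encoder with a one-slot channel
// semaphore: Reset acquires the slot, Close flushes and releases it.
type sharedEncoder struct {
	enc  *gzip.Writer
	sema chan struct{}
}

func newSharedEncoder() *sharedEncoder {
	return &sharedEncoder{
		enc:  gzip.NewWriter(io.Discard),
		sema: make(chan struct{}, 1),
	}
}

func (s *sharedEncoder) Reset(w io.Writer) {
	s.sema <- struct{}{} // acquire; blocks while another goroutine holds the encoder
	s.enc.Reset(w)
}

func (s *sharedEncoder) Write(p []byte) (int, error) { return s.enc.Write(p) }

func (s *sharedEncoder) Close() error {
	err := s.enc.Close() // flush; the encoder stays reusable via Reset
	<-s.sema             // release
	return err
}

func main() {
	enc := newSharedEncoder()
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			var buf bytes.Buffer
			enc.Reset(&buf) // waits until the shared encoder is free
			fmt.Fprintf(enc, "profile %d payload", i)
			if err := enc.Close(); err != nil {
				panic(err)
			}
			fmt.Printf("profile %d: %d compressed bytes\n", i, buf.Len())
		}(i)
	}
	wg.Wait()
}

Because the semaphore has capacity one, a Reset issued while another profile holds the encoder simply blocks until that profile calls Close, trading a little latency for not keeping one encoder per profile type alive.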

profiler/compression_test.go

Lines changed: 8 additions & 2 deletions
@@ -43,7 +43,8 @@ func TestNewCompressionPipeline(t *testing.T) {
 
 	for _, test := range tests {
 		t.Run(fmt.Sprintf("%s->%s", test.in, test.out), func(t *testing.T) {
-			pipeline, err := newCompressionPipeline(test.in, test.out)
+			var pipelineBuilder compressionPipelineBuilder
+			pipeline, err := pipelineBuilder.Build(test.in, test.out)
 			require.NoError(t, err)
 			buf := &bytes.Buffer{}
 			pipeline.Reset(buf)
@@ -172,8 +173,13 @@ func BenchmarkRecompression(b *testing.B) {
 		b.Run(fmt.Sprintf("%s-%s", in.inAlg.String(), in.outLevel), func(b *testing.B) {
 			data := compressData(b, inputdata, in.inAlg)
 			b.ResetTimer()
+			var pipelineBuilder compressionPipelineBuilder
 			for i := 0; i < b.N; i++ {
-				z := &zstdRecompressor{level: in.outLevel}
+				encoder, err := pipelineBuilder.getZstdEncoder(in.outLevel)
+				if err != nil {
+					b.Fatal(err)
+				}
+				z := newZstdRecompressor(encoder)
 				z.Reset(io.Discard)
 				if _, err := z.Write(data); err != nil {
 					b.Fatal(err)
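The benchmark change mirrors the API change: a zstdRecompressor can no longer be built from a bare level, so the loop obtains the cached encoder from a builder and wraps it per iteration, paying encoder construction once rather than b.N times. A hypothetical, runnable sketch of that reuse-friendly benchmark shape, with stdlib gzip standing in for the shared zstd encoder and BenchmarkReusedEncoder being an invented name:

package profiler_test

import (
	"compress/gzip"
	"io"
	"testing"
)

// BenchmarkReusedEncoder creates the expensive encoder once, as the
// builder cache does, and only Resets it per iteration, so the loop
// measures compression work rather than encoder construction.
func BenchmarkReusedEncoder(b *testing.B) {
	enc := gzip.NewWriter(io.Discard)
	payload := make([]byte, 1<<20) // 1 MiB of zeros as stand-in input
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		enc.Reset(io.Discard)
		if _, err := enc.Write(payload); err != nil {
			b.Fatal(err)
		}
		if err := enc.Close(); err != nil {
			b.Fatal(err)
		}
	}
}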

profiler/profile.go

Lines changed: 37 additions & 33 deletions
@@ -89,6 +89,7 @@ var profileTypes = map[ProfileType]profileType{
 		Filename: "cpu.pprof",
 		Collect: func(p *profiler) ([]byte, error) {
 			var buf bytes.Buffer
+			var outBuf bytes.Buffer
 			// Start the CPU profiler at the end of the profiling
 			// period so that we're sure to capture the CPU usage of
 			// this library, which mostly happens at the end
@@ -101,9 +102,7 @@ var profileTypes = map[ProfileType]profileType{
 				runtime.SetCPUProfileRate(p.cfg.cpuProfileRate)
 			}
 
-			compressor := p.compressors[CPUProfile]
-			compressor.Reset(&buf)
-			if err := p.startCPUProfile(compressor); err != nil {
+			if err := p.startCPUProfile(&outBuf); err != nil {
 				return nil, err
 			}
 			p.interruptibleSleep(p.cfg.cpuDuration)
@@ -113,10 +112,12 @@ var profileTypes = map[ProfileType]profileType{
 			// the other profile types
 			p.pendingProfiles.Wait()
 			p.stopCPUProfile()
-			if err := compressor.Close(); err != nil {
-				return nil, err
-			}
-			return buf.Bytes(), nil
+
+			c := p.compressors[CPUProfile]
+			c.Reset(&buf)
+			_, writeErr := outBuf.WriteTo(c)
+			closeErr := c.Close()
+			return buf.Bytes(), cmp.Or(writeErr, closeErr)
 		},
 	},
 	// HeapProfile is complex due to how the Go runtime exposes it. It contains 4
@@ -175,10 +176,10 @@ var profileTypes = map[ProfileType]profileType{
 				return nil, err
 			}
 
-			compressor := p.compressors[expGoroutineWaitProfile]
-			compressor.Reset(pprof)
-			err := goroutineDebug2ToPprof(text, compressor, now)
-			err = cmp.Or(err, compressor.Close())
+			c := p.compressors[expGoroutineWaitProfile]
+			c.Reset(pprof)
+			err := goroutineDebug2ToPprof(text, c, now)
+			err = cmp.Or(err, c.Close())
 			return pprof.Bytes(), err
 		},
 	},
@@ -187,11 +188,11 @@ var profileTypes = map[ProfileType]profileType{
 		Filename: "metrics.json",
 		Collect: func(p *profiler) ([]byte, error) {
 			var buf bytes.Buffer
-			compressor := p.compressors[MetricsProfile]
-			compressor.Reset(&buf)
+			c := p.compressors[MetricsProfile]
+			c.Reset(&buf)
 			interrupted := p.interruptibleSleep(p.cfg.period)
-			err := p.met.report(now(), compressor)
-			err = cmp.Or(err, compressor.Close())
+			err := p.met.report(now(), c)
+			err = cmp.Or(err, c.Close())
 			if err != nil && interrupted {
 				err = errProfilerStopped
 			}
@@ -204,9 +205,8 @@ var profileTypes = map[ProfileType]profileType{
 		Collect: func(p *profiler) ([]byte, error) {
 			p.lastTrace = time.Now()
 			buf := new(bytes.Buffer)
-			compressor := p.compressors[executionTrace]
-			compressor.Reset(buf)
-			lt := newLimitedTraceCollector(compressor, int64(p.cfg.traceConfig.Limit))
+			outBuf := new(bytes.Buffer)
+			lt := newLimitedTraceCollector(outBuf, int64(p.cfg.traceConfig.Limit))
 			if err := trace.Start(lt); err != nil {
 				return nil, err
 			}
@@ -217,10 +217,12 @@ var profileTypes = map[ProfileType]profileType{
 			case <-lt.done: // The trace size limit was exceeded
 			}
 			trace.Stop()
-			if err := compressor.Close(); err != nil {
-				return nil, err
-			}
-			return buf.Bytes(), nil
+
+			c := p.compressors[executionTrace]
+			c.Reset(buf)
+			_, writeErr := outBuf.WriteTo(c)
+			closeErr := c.Close()
+			return buf.Bytes(), cmp.Or(writeErr, closeErr)
 		},
 	},
 }
@@ -284,10 +286,10 @@ func collectGenericProfile(name string, pt ProfileType) func(p *profiler) ([]byte, error) {
 	var buf bytes.Buffer
 	dp, ok := p.deltas[pt]
 	if !ok || !p.cfg.deltaProfiles {
-		compressor := p.compressors[pt]
-		compressor.Reset(&buf)
-		err := p.lookupProfile(name, compressor, 0)
-		err = cmp.Or(err, compressor.Close())
+		c := p.compressors[pt]
+		c.Reset(&buf)
+		err := p.lookupProfile(name, c, 0)
+		err = cmp.Or(err, c.Close())
 		return buf.Bytes(), err
 	}
 
@@ -435,13 +437,15 @@ func (fdp *fastDeltaProfiler) Delta(data []byte) (b []byte, err error) {
 	}
 
 	fdp.buf.Reset()
-	fdp.compressor.Reset(&fdp.buf)
-
-	if err = fdp.dc.Delta(data, fdp.compressor); err != nil {
-		return nil, fmt.Errorf("error computing delta: %s", err.Error())
-	}
-	if err = fdp.compressor.Close(); err != nil {
-		return nil, fmt.Errorf("error flushing gzip writer: %s", err.Error())
+	c := fdp.compressor
+	c.Reset(&fdp.buf)
+
+	deltaErr := fdp.dc.Delta(data, c)
+	closeErr := c.Close()
+	if deltaErr != nil {
+		return nil, fmt.Errorf("error computing delta: %w", deltaErr)
+	} else if closeErr != nil {
+		return nil, fmt.Errorf("error flushing compressor: %w", closeErr)
 	}
 	// The returned slice will be retained in case the profile upload fails,
 	// so we need to return a copy of the buffer's bytes to avoid a data
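Note the new shape of the CPU-profile and execution-trace collectors: the raw profile is first captured into a plain bytes.Buffer (outBuf) for the whole profiling period, and the shared compressor is only acquired afterwards, for the brief WriteTo/Close window. That matters because Reset on a shared encoder blocks other users, so buffering first keeps the encoder from being held for the entire cpuDuration or trace period. A rough, dependency-free sketch of that buffer-then-compress sequence (collect is a hypothetical stand-in for the Collect callbacks above, and gzip replaces the shared zstd encoder):

package main

import (
	"bytes"
	"cmp"
	"compress/gzip"
	"fmt"
)

// collect buffers the raw profile bytes first, then holds the shared
// compressor only long enough to copy and flush them.
func collect(raw []byte, enc *gzip.Writer) ([]byte, error) {
	var outBuf bytes.Buffer
	outBuf.Write(raw) // stands in for the long-running profiling phase

	var buf bytes.Buffer
	enc.Reset(&buf) // acquire the shared compressor only now
	_, writeErr := outBuf.WriteTo(enc)
	closeErr := enc.Close() // flush and release
	return buf.Bytes(), cmp.Or(writeErr, closeErr)
}

func main() {
	enc := gzip.NewWriter(nil) // safe: Reset installs a real writer before any write
	out, err := collect([]byte("fake pprof bytes"), enc)
	fmt.Println(len(out), err)
}

The cost is one extra uncompressed copy of the profile held briefly in memory; the benefit is that a single shared encoder can serve every profile type without stalling collectors that run concurrently.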

profiler/profiler.go

Lines changed: 2 additions & 1 deletion
@@ -259,10 +259,11 @@ func newProfiler(opts ...Option) (*profiler, error) {
 	if p.cfg.traceConfig.Enabled {
 		types = append(types, executionTrace)
 	}
+	var pipelineBuilder compressionPipelineBuilder
 	for _, pt := range types {
 		isDelta := p.cfg.deltaProfiles && len(profileTypes[pt].DeltaValues) > 0
 		in, out := compressionStrategy(pt, isDelta, p.cfg.compressionConfig)
-		compressor, err := newCompressionPipeline(in, out)
+		compressor, err := pipelineBuilder.Build(in, out)
 		if err != nil {
 			return nil, err
 		}
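newProfiler now creates a single compressionPipelineBuilder and reuses it for every profile type, so all pipelines that target the same zstd level share one encoder through the builder's zstdEncoders map. A small, runnable sketch of that per-level memoization under the same assumptions as above (builder and get are invented names, and stdlib flate stands in for zstd):

package main

import (
	"compress/flate"
	"fmt"
	"io"
)

// builder memoizes one writer per compression level: the first request
// constructs it, later requests return the cached instance.
type builder struct {
	encoders map[int]*flate.Writer
}

func (b *builder) get(level int) (*flate.Writer, error) {
	if b.encoders == nil {
		b.encoders = make(map[int]*flate.Writer)
	}
	if w, ok := b.encoders[level]; ok {
		return w, nil
	}
	w, err := flate.NewWriter(io.Discard, level)
	if err != nil {
		return nil, err
	}
	b.encoders[level] = w
	return w, nil
}

func main() {
	var b builder
	w1, _ := b.get(flate.BestSpeed)
	w2, _ := b.get(flate.BestSpeed)
	fmt.Println(w1 == w2) // true: one writer per level, shared by all callers
}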
