Skip to content

Commit 1928bb1

Browse files
symdb: add WritePartitionV3 function (#3503)
* symdb: add WritePartitionV3
* Update block_writer_v3.go (Co-authored-by: Anton Kolesnikov <[email protected]>)
* add test
* fmt

Co-authored-by: Anton Kolesnikov <[email protected]>
1 parent 5d10c2a commit 1928bb1

File tree

3 files changed

+118
-34
lines changed

3 files changed

+118
-34
lines changed

pkg/phlaredb/symdb/block_writer_v3.go

Lines changed: 65 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,10 @@ type writerV3 struct {
1919
files []block.File
2020
footer Footer
2121

22+
encodersV3
23+
}
24+
25+
type encodersV3 struct {
2226
stringsEncoder *symbolsEncoder[string]
2327
mappingsEncoder *symbolsEncoder[v1.InMemoryMapping]
2428
functionsEncoder *symbolsEncoder[v1.InMemoryFunction]
@@ -27,22 +31,10 @@ type writerV3 struct {
2731

2832
func newWriterV3(c *Config) *writerV3 {
2933
return &writerV3{
30-
config: c,
31-
index: IndexFile{
32-
Header: IndexHeader{
33-
Magic: symdbMagic,
34-
Version: FormatV3,
35-
},
36-
},
37-
footer: Footer{
38-
Magic: symdbMagic,
39-
Version: FormatV3,
40-
},
41-
42-
stringsEncoder: newStringsEncoder(),
43-
mappingsEncoder: newMappingsEncoder(),
44-
functionsEncoder: newFunctionsEncoder(),
45-
locationsEncoder: newLocationsEncoder(),
34+
config: c,
35+
index: newIndexFileV3(),
36+
footer: newFooterV3(),
37+
encodersV3: newEncodersV3(),
4638
}
4739
}
4840

@@ -58,7 +50,7 @@ func (w *writerV3) writePartitions(partitions []*PartitionWriter) (err error) {
5850
w.files = []block.File{w.dataFile.meta()}
5951
}()
6052
for _, p := range partitions {
61-
if err = writePartitionV3(w, p); err != nil {
53+
if err = writePartitionV3(w.dataFile.w, &w.encodersV3, p); err != nil {
6254
return fmt.Errorf("failed to write partition: %w", err)
6355
}
6456
w.index.PartitionHeaders = append(w.index.PartitionHeaders, &p.header)
@@ -83,17 +75,17 @@ func (w *writerV3) newFile(path string) (f *fileWriter, err error) {
8375
return f, err
8476
}
8577

86-
func writePartitionV3(w *writerV3, p *PartitionWriter) (err error) {
87-
if p.header.V3.Strings, err = writeSymbolsBlock(w.dataFile, p.strings.slice, w.stringsEncoder); err != nil {
78+
func writePartitionV3(w *writerOffset, e *encodersV3, p *PartitionWriter) (err error) {
79+
if p.header.V3.Strings, err = writeSymbolsBlock(w, p.strings.slice, e.stringsEncoder); err != nil {
8880
return err
8981
}
90-
if p.header.V3.Mappings, err = writeSymbolsBlock(w.dataFile, p.mappings.slice, w.mappingsEncoder); err != nil {
82+
if p.header.V3.Mappings, err = writeSymbolsBlock(w, p.mappings.slice, e.mappingsEncoder); err != nil {
9183
return err
9284
}
93-
if p.header.V3.Functions, err = writeSymbolsBlock(w.dataFile, p.functions.slice, w.functionsEncoder); err != nil {
85+
if p.header.V3.Functions, err = writeSymbolsBlock(w, p.functions.slice, e.functionsEncoder); err != nil {
9486
return err
9587
}
96-
if p.header.V3.Locations, err = writeSymbolsBlock(w.dataFile, p.locations.slice, w.locationsEncoder); err != nil {
88+
if p.header.V3.Locations, err = writeSymbolsBlock(w, p.locations.slice, e.locationsEncoder); err != nil {
9789
return err
9890
}
9991
for ci, c := range p.stacktraces.chunks {
@@ -102,7 +94,7 @@ func writePartitionV3(w *writerV3, p *PartitionWriter) (err error) {
10294
stacks = uint32(len(p.stacktraces.hashToIdx))
10395
}
10496
h := StacktraceBlockHeader{
105-
Offset: w.dataFile.w.offset,
97+
Offset: w.offset,
10698
Partition: p.header.Partition,
10799
BlockIndex: uint16(ci),
108100
Encoding: StacktraceEncodingGroupVarint,
@@ -111,7 +103,7 @@ func writePartitionV3(w *writerV3, p *PartitionWriter) (err error) {
111103
StacktraceMaxNodes: c.partition.maxNodesPerChunk,
112104
}
113105
crc := crc32.New(castagnoli)
114-
if h.Size, err = c.WriteTo(io.MultiWriter(crc, w.dataFile)); err != nil {
106+
if h.Size, err = c.WriteTo(io.MultiWriter(crc, w)); err != nil {
115107
return fmt.Errorf("writing stacktrace chunk data: %w", err)
116108
}
117109
h.CRC = crc.Sum32()
@@ -120,18 +112,63 @@ func writePartitionV3(w *writerV3, p *PartitionWriter) (err error) {
120112
return nil
121113
}
122114

123-
func writeSymbolsBlock[T any](w *fileWriter, s []T, e *symbolsEncoder[T]) (h SymbolsBlockHeader, err error) {
124-
h.Offset = uint64(w.w.offset)
115+
func writeSymbolsBlock[T any](w *writerOffset, s []T, e *symbolsEncoder[T]) (h SymbolsBlockHeader, err error) {
116+
h.Offset = uint64(w.offset)
125117
crc := crc32.New(castagnoli)
126-
mw := io.MultiWriter(crc, w.w)
118+
mw := io.MultiWriter(crc, w)
127119
if err = e.encode(mw, s); err != nil {
128120
return h, err
129121
}
130-
h.Size = uint32(w.w.offset) - uint32(h.Offset)
122+
h.Size = uint32(w.offset) - uint32(h.Offset)
131123
h.CRC = crc.Sum32()
132124
h.Length = uint32(len(s))
133125
h.BlockSize = uint32(e.blockSize)
134126
h.BlockHeaderSize = uint16(e.blockEncoder.headerSize())
135127
h.Format = e.blockEncoder.format()
136128
return h, nil
137129
}
130+
131+
func WritePartition(p *PartitionWriter, dst io.Writer) error {
132+
index := newIndexFileV3()
133+
footer := newFooterV3()
134+
encoders := newEncodersV3()
135+
w := withWriterOffset(dst)
136+
137+
if err := writePartitionV3(w, &encoders, p); err != nil {
138+
return fmt.Errorf("failed to write partition: %w", err)
139+
}
140+
index.PartitionHeaders = append(index.PartitionHeaders, &p.header)
141+
footer.IndexOffset = uint64(w.offset)
142+
if _, err := index.WriteTo(w); err != nil {
143+
return fmt.Errorf("failed to write index: %w", err)
144+
}
145+
if _, err := w.Write(footer.MarshalBinary()); err != nil {
146+
return fmt.Errorf("failed to write footer: %w", err)
147+
}
148+
return nil
149+
}
150+
151+
func newEncodersV3() encodersV3 {
152+
return encodersV3{
153+
stringsEncoder: newStringsEncoder(),
154+
mappingsEncoder: newMappingsEncoder(),
155+
functionsEncoder: newFunctionsEncoder(),
156+
locationsEncoder: newLocationsEncoder(),
157+
}
158+
}
159+
160+
func newFooterV3() Footer {
161+
return Footer{
162+
Magic: symdbMagic,
163+
Version: FormatV3,
164+
}
165+
}
166+
167+
func newIndexFileV3() IndexFile {
168+
return IndexFile{
169+
Header: IndexHeader{
170+
Magic: symdbMagic,
171+
Version: FormatV3,
172+
},
173+
}
174+
}

pkg/phlaredb/symdb/symdb.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -188,18 +188,18 @@ func (s *SymDB) PartitionWriter(partition uint64) *PartitionWriter {
188188
s.m.Unlock()
189189
return p
190190
}
191-
p = s.newPartition(partition)
191+
p = NewPartitionWriter(partition, &s.config)
192192
s.partitions[partition] = p
193193
s.m.Unlock()
194194
return p
195195
}
196196

197-
func (s *SymDB) newPartition(partition uint64) *PartitionWriter {
197+
func NewPartitionWriter(partition uint64, config *Config) *PartitionWriter {
198198
p := PartitionWriter{
199199
header: PartitionHeader{Partition: partition},
200-
stacktraces: newStacktracesPartition(s.config.Stacktraces.MaxNodesPerChunk),
200+
stacktraces: newStacktracesPartition(config.Stacktraces.MaxNodesPerChunk),
201201
}
202-
switch s.config.Version {
202+
switch config.Version {
203203
case FormatV2:
204204
p.header.V2 = new(PartitionHeaderV2)
205205
case FormatV3:

pkg/phlaredb/symdb/symdb_test.go

Lines changed: 49 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,17 @@
11
package symdb
22

33
import (
4+
"bytes"
45
"context"
56
"io"
67
"sort"
78
"sync/atomic"
89
"testing"
10+
"time"
11+
12+
phlareobj "github.com/grafana/pyroscope/pkg/objstore"
13+
"github.com/grafana/pyroscope/pkg/objstore/providers/memory"
14+
pprofth "github.com/grafana/pyroscope/pkg/pprof/testhelper"
915

1016
"github.com/cespare/xxhash/v2"
1117
"github.com/stretchr/testify/require"
@@ -101,8 +107,8 @@ func (b *testBucket) GetRange(ctx context.Context, name string, off, length int6
101107
return b.Bucket.GetRange(ctx, name, off, length)
102108
}
103109

104-
func newTestFileWriter(w io.Writer) *fileWriter {
105-
return &fileWriter{w: &writerOffset{Writer: w}}
110+
func newTestFileWriter(w io.Writer) *writerOffset {
111+
return &writerOffset{Writer: w}
106112
}
107113

108114
//nolint:unparam
@@ -180,3 +186,44 @@ func Test_Stats(t *testing.T) {
180186
}
181187
require.Equal(t, expected, actual)
182188
}
189+
190+
func TestWritePartition(t *testing.T) {
191+
p := NewPartitionWriter(0, &Config{
192+
Version: FormatV3,
193+
Stacktraces: StacktracesConfig{
194+
MaxNodesPerChunk: 4 << 20,
195+
},
196+
Parquet: ParquetConfig{
197+
MaxBufferRowCount: 100 << 10,
198+
},
199+
})
200+
profile := pprofth.NewProfileBuilder(time.Now().UnixNano()).
201+
CPUProfile().
202+
WithLabels(phlaremodel.LabelNameServiceName, "svc").
203+
ForStacktraceString("foo", "bar").
204+
AddSamples(1).
205+
ForStacktraceString("qwe", "foo", "bar").
206+
AddSamples(2)
207+
208+
profiles := p.WriteProfileSymbols(profile.Profile)
209+
symdbBlob := bytes.NewBuffer(nil)
210+
err := WritePartition(p, symdbBlob)
211+
require.NoError(t, err)
212+
213+
bucket := phlareobj.NewBucket(memory.NewInMemBucket())
214+
require.NoError(t, bucket.Upload(context.Background(), DefaultFileName, bytes.NewReader(symdbBlob.Bytes())))
215+
reader, err := Open(context.Background(), bucket, testBlockMeta)
216+
require.NoError(t, err)
217+
218+
r := NewResolver(context.Background(), reader)
219+
defer r.Release()
220+
r.AddSamples(0, profiles[0].Samples)
221+
resolved, err := r.Tree()
222+
require.NoError(t, err)
223+
expected := `.
224+
└── bar: self 0 total 3
225+
└── foo: self 1 total 3
226+
└── qwe: self 2 total 2
227+
`
228+
require.Equal(t, expected, resolved.String())
229+
}

0 commit comments

Comments (0)