diff --git a/docs/sources/configure-server/reference-configuration-parameters/index.md b/docs/sources/configure-server/reference-configuration-parameters/index.md index 81d81ea205..c1e000c269 100644 --- a/docs/sources/configure-server/reference-configuration-parameters/index.md +++ b/docs/sources/configure-server/reference-configuration-parameters/index.md @@ -136,6 +136,14 @@ pyroscopedb: # CLI flag: -pyroscopedb.retention-policy-disable [disable_enforcement: | default = false] + # Compression algorithm for saving to disk, default no compression. Available algorithms: gzip,zstd,lz4,snappy. + # CLI flag: -pyroscopedb.compression-algo + [compression_algo: | default = ""] + + # Compression level, default 0 (0 means default level). See https://github.com/klauspost/compress for the level number. + # CLI flag: -pyroscopedb.compression-level + [compression_level: | default = 0] + tracing: # Set to false to disable tracing. # CLI flag: -tracing.enabled diff --git a/pkg/phlaredb/compression_test.go b/pkg/phlaredb/compression_test.go new file mode 100644 index 0000000000..20ce378275 --- /dev/null +++ b/pkg/phlaredb/compression_test.go @@ -0,0 +1,269 @@ +package phlaredb + +import ( + "fmt" + "os" + "testing" + "time" + + profilev1 "github.com/grafana/pyroscope/api/gen/proto/go/google/v1" + "github.com/parquet-go/parquet-go" + "github.com/parquet-go/parquet-go/compress" + "github.com/parquet-go/parquet-go/compress/gzip" + "github.com/parquet-go/parquet-go/compress/lz4" + "github.com/parquet-go/parquet-go/compress/snappy" + "github.com/parquet-go/parquet-go/compress/zstd" + "github.com/stretchr/testify/require" +) + +var ( + testLocationFile = "testdata/01HHYG6245NWHZWVP27V8WJRT7/symbols/locations.parquet" +) + +// TestLocationCompressionFunctionality tests the correctness of compression functionality +func TestLocationCompressionFunctionality(t *testing.T) { + // Input file path + // Check if input file exists + if _, err := os.Stat(testLocationFile); os.IsNotExist(err) { + t.Skipf("Input file does not exist: %s", testLocationFile) + return + } + + // Read original data + originalRows, err := parquet.ReadFile[profilev1.Location](testLocationFile) + require.NoError(t, err) + originalRowCount := len(originalRows) + t.Logf("Original data row count: %d", originalRowCount) + + // Get original file size + srcInfo, err := os.Stat(testLocationFile) + require.NoError(t, err) + originalSize := srcInfo.Size() + t.Logf("Original file size: %d bytes", originalSize) + + // Test functionality correctness of different compression algorithms + compressionTests := []struct { + name string + codec compress.Codec + }{ + {"GZIP Best Compression", &gzip.Codec{Level: gzip.BestCompression}}, + {"GZIP Default Compression", &gzip.Codec{Level: gzip.DefaultCompression}}, + {"ZSTD Default Compression", &zstd.Codec{Level: zstd.DefaultLevel}}, + {"Snappy Compression", &snappy.Codec{}}, + {"LZ4 Compression", &lz4.Codec{Level: lz4.DefaultLevel}}, + } + + for _, test := range compressionTests { + t.Run(test.name, func(t *testing.T) { + // Create temporary file + tempFile, err := os.CreateTemp("", "locations_func_test_*.parquet") + require.NoError(t, err) + outputFile := tempFile.Name() + tempFile.Close() + defer os.Remove(outputFile) // Clean up file after test completion + + // Create compressed file + destFile, err := os.Create(outputFile) + require.NoError(t, err) + defer destFile.Close() + + // Configure compression options + options := []parquet.WriterOption{ + parquet.Compression(test.codec), + } + writer := 
parquet.NewGenericWriter[profilev1.Location](destFile, options...) + + // Write compressed data + _, err = writer.Write(originalRows) + require.NoError(t, err) + err = writer.Close() + require.NoError(t, err) + destFile.Close() + + // Verify compressed file can be read correctly + compressedRows, err := parquet.ReadFile[profilev1.Location](outputFile) + require.NoError(t, err) + require.Equal(t, originalRowCount, len(compressedRows), "Row count after compression should match original data") + // Verify data content consistency (sample check of first few rows) + sampleSize := 10 + if originalRowCount < sampleSize { + sampleSize = originalRowCount + } + for i := 0; i < sampleSize; i++ { + require.Equal(t, originalRows[i].Id, compressedRows[i].Id, + "ID of row %d should be the same", i) + require.Equal(t, originalRows[i].MappingId, compressedRows[i].MappingId, + "MappingId of row %d should be the same", i) + require.Equal(t, originalRows[i].Address, compressedRows[i].Address, + "Address of row %d should be the same", i) + } + + // Check compression effectiveness + compressedInfo, err := os.Stat(outputFile) + require.NoError(t, err) + compressedSize := compressedInfo.Size() + compressionRatio := float64(compressedSize) / float64(originalSize) + t.Logf("%s: compressed size=%d bytes, compression ratio=%.2f%%", + test.name, compressedSize, compressionRatio*100) + + // Ensure compression actually reduces file size (unless original file is very small) + if originalSize > 1024 { // If original file is larger than 1KB + require.Less(t, compressedSize, originalSize, "Compression should reduce file size") + } + }) + } + t.Log("All compression algorithms functionality tests passed") +} + +// CompressionResult stores the results of compression testing +type CompressionResult struct { + Algorithm string + CompressedSize int64 + CompressionTime time.Duration + DecompressionTime time.Duration + CompressionRatio float64 +} + +// TestLocationCompressionPerformance tests the performance of compression algorithms +func TestLocationCompressionPerformance(t *testing.T) { + // Check if input file exists + if _, err := os.Stat(testLocationFile); os.IsNotExist(err) { + t.Skipf("Input file does not exist: %s", testLocationFile) + return + } + // Read data + rows, err := parquet.ReadFile[profilev1.Location](testLocationFile) + require.NoError(t, err) + numRows := len(rows) + t.Logf("Test data row count: %d", numRows) + + // Get source file size + srcInfo, err := os.Stat(testLocationFile) + require.NoError(t, err) + srcSize := srcInfo.Size() + t.Logf("Source file size: %d bytes", srcSize) + + // Define compression algorithms and levels for performance testing + performanceTests := []struct { + name string + codec compress.Codec + }{ + {"No Compression", nil}, + {"Snappy", &snappy.Codec{}}, + {"LZ4 Fastest", &lz4.Codec{Level: lz4.Fastest}}, + {"LZ4 Default", &lz4.Codec{Level: lz4.DefaultLevel}}, + {"LZ4 Best", &lz4.Codec{Level: lz4.Level9}}, + {"GZIP Fastest", &gzip.Codec{Level: gzip.BestSpeed}}, + {"GZIP Default", &gzip.Codec{Level: gzip.DefaultCompression}}, + {"GZIP Best", &gzip.Codec{Level: gzip.BestCompression}}, + {"ZSTD Fastest", &zstd.Codec{Level: zstd.SpeedFastest}}, + {"ZSTD Default", &zstd.Codec{Level: zstd.DefaultLevel}}, + {"ZSTD Best", &zstd.Codec{Level: zstd.SpeedBestCompression}}, + } + + // Store results + var results []CompressionResult + + // Performance testing parameters + const iterations = 10 // Number of runs for each algorithm + // Performance test for each compression algorithm + for _, test := 
range performanceTests { + t.Run(test.name+"_Performance", func(t *testing.T) { + var totalCompressTime time.Duration + var totalDecompressTime time.Duration + var totalCompressedSize int64 + for i := 0; i < iterations; i++ { + // Create temporary file + tempFile, err := os.CreateTemp("", "locations_perf_test_*.parquet") + require.NoError(t, err) + outputFile := tempFile.Name() + tempFile.Close() + defer os.Remove(outputFile) // Ensure cleanup of temporary files + + // Measure compression time and file size + compressStart := time.Now() + compressedSize, err := compressWithAlgorithm(rows, outputFile, test.codec) + compressTime := time.Since(compressStart) + require.NoError(t, err) + + // Measure decompression time + decompressStart := time.Now() + _, err = parquet.ReadFile[profilev1.Location](outputFile) + decompressTime := time.Since(decompressStart) + require.NoError(t, err) + + // Accumulate results + totalCompressTime += compressTime + totalDecompressTime += decompressTime + totalCompressedSize += compressedSize + } + // Calculate averages + avgCompressTime := totalCompressTime / iterations + avgDecompressTime := totalDecompressTime / iterations + avgCompressedSize := totalCompressedSize / iterations + avgRatio := float64(avgCompressedSize) / float64(srcSize) + + // Record results + result := CompressionResult{ + Algorithm: test.name, + CompressedSize: avgCompressedSize, + CompressionTime: avgCompressTime, + DecompressionTime: avgDecompressTime, + CompressionRatio: avgRatio, + } + results = append(results, result) + + t.Logf("Algorithm: %s, avg compressed size: %d bytes, compression ratio: %.2f%%, avg compression time: %v, avg decompression time: %v", + test.name, avgCompressedSize, avgRatio*100, avgCompressTime, avgDecompressTime) + }) + } + + // Output complete performance comparison table + t.Log("\n=== Compression Algorithm Performance Comparison Results ===") + fmt.Printf("\n%-15s | %-12s | %-10s | %-12s | %-12s\n", + "Algorithm", "Size(KB)", "Ratio(%)", "Compress Time", "Decompress Time") + fmt.Printf("%-15s-+-%-12s-+-%-10s-+-%-12s-+-%-12s\n", + "---------------", "------------", "----------", "------------", "------------") + + for _, result := range results { + fmt.Printf("%-15s | %-12.1f | %-10.2f | %-12v | %-12v\n", + result.Algorithm, + float64(result.CompressedSize)/1024, + result.CompressionRatio*100, + result.CompressionTime, + result.DecompressionTime) + } +} + +// compressWithAlgorithm compresses data using specified algorithm and returns file size +func compressWithAlgorithm(rows []profilev1.Location, outputFile string, codec compress.Codec) (int64, error) { + destFile, err := os.Create(outputFile) + if err != nil { + return 0, err + } + defer destFile.Close() + + // Create writer options + var options []parquet.WriterOption + if codec != nil { + options = append(options, parquet.Compression(codec)) + } + + // Write compressed data + writer := parquet.NewGenericWriter[profilev1.Location](destFile, options...) 
+ _, err = writer.Write(rows) + if err != nil { + return 0, err + } + err = writer.Close() + if err != nil { + return 0, err + } + + // Get file size + info, err := os.Stat(outputFile) + if err != nil { + return 0, err + } + return info.Size(), nil +} diff --git a/pkg/phlaredb/head.go b/pkg/phlaredb/head.go index c861cb85af..47bf6f6d96 100644 --- a/pkg/phlaredb/head.go +++ b/pkg/phlaredb/head.go @@ -115,7 +115,7 @@ func NewHead(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*Hea } // create profile store - h.profiles = newProfileStore(phlarectx) + h.profiles = newProfileStore(phlarectx, cfg) h.delta = newDeltaProfiles() h.tables = []Table{ h.profiles, @@ -135,6 +135,8 @@ func NewHead(phlarectx context.Context, cfg Config, limiter TenantLimiter) (*Hea symdbConfig.Dir = filepath.Join(h.headPath, symdb.DefaultDirName) symdbConfig.Parquet = symdb.ParquetConfig{ MaxBufferRowCount: h.parquetConfig.MaxBufferRowCount, + CompressionAlgo: cfg.CompressionAlgo, + CompressionLevel: cfg.CompressionLevel, } } diff --git a/pkg/phlaredb/phlaredb.go b/pkg/phlaredb/phlaredb.go index 98c457d21b..9d1a8296fb 100644 --- a/pkg/phlaredb/phlaredb.go +++ b/pkg/phlaredb/phlaredb.go @@ -60,6 +60,8 @@ type Config struct { MinDiskAvailablePercentage float64 `yaml:"min_disk_available_percentage"` EnforcementInterval time.Duration `yaml:"enforcement_interval"` DisableEnforcement bool `yaml:"disable_enforcement"` + CompressionAlgo string `yaml:"compression_algo"` + CompressionLevel int `yaml:"compression_level"` } type ParquetConfig struct { @@ -77,6 +79,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.Float64Var(&cfg.MinDiskAvailablePercentage, "pyroscopedb.retention-policy-min-disk-available-percentage", DefaultMinDiskAvailablePercentage, "Which percentage of free disk space to keep") f.DurationVar(&cfg.EnforcementInterval, "pyroscopedb.retention-policy-enforcement-interval", DefaultRetentionPolicyEnforcementInterval, "How often to enforce disk retention") f.BoolVar(&cfg.DisableEnforcement, "pyroscopedb.retention-policy-disable", false, "Disable retention policy enforcement") + f.StringVar(&cfg.CompressionAlgo, "pyroscopedb.compression-algo", "", "Compress algorithm for saving to disk") + f.IntVar(&cfg.CompressionLevel, "pyroscopedb.compression-level", 0, "Compress level") } type TenantLimiter interface { diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go index 06cea6ee3e..583f37b08c 100644 --- a/pkg/phlaredb/profile_store.go +++ b/pkg/phlaredb/profile_store.go @@ -23,6 +23,7 @@ import ( "github.com/grafana/pyroscope/pkg/phlaredb/query" schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" phlarecontext "github.com/grafana/pyroscope/pkg/pyroscope/context" + "github.com/grafana/pyroscope/pkg/util" "github.com/grafana/pyroscope/pkg/util/build" ) @@ -76,7 +77,7 @@ func newParquetProfileWriter(writer io.Writer, options ...parquet.WriterOption) ) } -func newProfileStore(phlarectx context.Context) *profileStore { +func newProfileStore(phlarectx context.Context, cfg Config) *profileStore { s := &profileStore{ logger: phlarecontext.Logger(phlarectx), metrics: contextHeadMetrics(phlarectx), @@ -88,7 +89,11 @@ func newProfileStore(phlarectx context.Context) *profileStore { go s.cutRowGroupLoop() // Initialize writer on /dev/null // TODO: Reuse parquet.Writer beyond life time of the head. 
- s.writer = newParquetProfileWriter(io.Discard) + var opts []parquet.WriterOption + if o, err := util.ParseCompressionOpt(cfg.CompressionAlgo, cfg.CompressionLevel); err == nil { + opts = append(opts, o) + } + s.writer = newParquetProfileWriter(io.Discard, opts...) return s } diff --git a/pkg/phlaredb/profile_store_test.go b/pkg/phlaredb/profile_store_test.go index fb432f5895..c8cca0fae2 100644 --- a/pkg/phlaredb/profile_store_test.go +++ b/pkg/phlaredb/profile_store_test.go @@ -191,7 +191,7 @@ func readFullParquetFile[M any](t *testing.T, path string) ([]M, uint64) { func TestProfileStore_RowGroupSplitting(t *testing.T) { var ( ctx = testContext(t) - store = newProfileStore(ctx) + store = newProfileStore(ctx, Config{}) ) for _, tc := range []struct { @@ -295,7 +295,7 @@ func threeProfileStreams(i int) *testProfile { func TestProfileStore_Ingestion_SeriesIndexes(t *testing.T) { var ( ctx = testContext(t) - store = newProfileStore(ctx) + store = newProfileStore(ctx, Config{}) ) path := t.TempDir() require.NoError(t, store.Init(path, defaultParquetConfig, newHeadMetrics(prometheus.NewRegistry()))) @@ -339,7 +339,7 @@ func BenchmarkFlush(b *testing.B) { for i := 0; i < b.N; i++ { path := b.TempDir() - store := newProfileStore(ctx) + store := newProfileStore(ctx, Config{}) require.NoError(b, store.Init(path, defaultParquetConfig, metrics)) for rg := 0; rg < 10; rg++ { for i := 0; i < 10^6; i++ { @@ -597,7 +597,7 @@ func TestProfileStore_Querying(t *testing.T) { } func TestRemoveFailedSegment(t *testing.T) { - store := newProfileStore(testContext(t)) + store := newProfileStore(testContext(t), Config{}) dir := t.TempDir() require.NoError(t, store.Init(dir, defaultParquetConfig, contextHeadMetrics(context.Background()))) // fake a failed segment diff --git a/pkg/phlaredb/symdb/block_writer_v2.go b/pkg/phlaredb/symdb/block_writer_v2.go index d3d7bb167e..4489b001cc 100644 --- a/pkg/phlaredb/symdb/block_writer_v2.go +++ b/pkg/phlaredb/symdb/block_writer_v2.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/pyroscope/pkg/phlaredb/block" schemav1 "github.com/grafana/pyroscope/pkg/phlaredb/schemas/v1" + "github.com/grafana/pyroscope/pkg/util" "github.com/grafana/pyroscope/pkg/util/build" ) @@ -211,10 +212,15 @@ func (s *parquetWriter[M, P]) init(dir string, c ParquetConfig) (err error) { } s.rowsBatch = make([]parquet.Row, 0, 128) s.buffer = parquet.NewBuffer(s.persister.Schema()) - s.writer = parquet.NewGenericWriter[P](s.file, s.persister.Schema(), + var opts []parquet.WriterOption + opts = append(opts, s.persister.Schema(), parquet.CreatedBy("github.com/grafana/pyroscope/", build.Version, build.Revision), parquet.PageBufferSize(3*1024*1024), ) + if o, err := util.ParseCompressionOpt(c.CompressionAlgo, c.CompressionLevel); err == nil { + opts = append(opts, o) + } + s.writer = parquet.NewGenericWriter[P](s.file, opts...) return nil } diff --git a/pkg/phlaredb/symdb/symdb.go b/pkg/phlaredb/symdb/symdb.go index 84f276d362..60e3efed1a 100644 --- a/pkg/phlaredb/symdb/symdb.go +++ b/pkg/phlaredb/symdb/symdb.go @@ -120,6 +120,8 @@ type ParquetConfig struct { // DEPRECATED: the parameter is not used and // will be removed in the future versions. 
 	MaxBufferRowCount int
+	CompressionAlgo   string
+	CompressionLevel  int
 }
 
 type MemoryStats struct {
diff --git a/pkg/util/compression.go b/pkg/util/compression.go
new file mode 100644
index 0000000000..1c12000f24
--- /dev/null
+++ b/pkg/util/compression.go
@@ -0,0 +1,36 @@
+package util
+
+import (
+	"fmt"
+
+	"github.com/parquet-go/parquet-go"
+	"github.com/parquet-go/parquet-go/compress/gzip"
+	"github.com/parquet-go/parquet-go/compress/lz4"
+	"github.com/parquet-go/parquet-go/compress/snappy"
+	"github.com/parquet-go/parquet-go/compress/zstd"
+)
+
+// ParseCompressionOpt parses the configured parquet compression algorithm and level into a writer option.
+func ParseCompressionOpt(algo string, level int) (parquet.WriterOption, error) {
+	switch algo {
+	case "gzip":
+		if level == 0 {
+			level = gzip.DefaultCompression
+		}
+		return parquet.Compression(&gzip.Codec{Level: level}), nil
+	case "zstd":
+		if level == 0 {
+			level = int(zstd.DefaultLevel)
+		}
+		return parquet.Compression(&zstd.Codec{Level: zstd.Level(level)}), nil
+	case "lz4":
+		if level == 0 {
+			level = int(lz4.DefaultLevel)
+		}
+		return parquet.Compression(&lz4.Codec{Level: lz4.Level(level)}), nil
+	case "snappy":
+		return parquet.Compression(&snappy.Codec{}), nil
+	default:
+		return nil, fmt.Errorf("unknown compression algorithm: %s", algo)
+	}
+}
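
For reviewers, below is a minimal sketch (not part of the patch) of how the new helper is meant to be wired into a parquet writer, mirroring the pattern used in profile_store.go and block_writer_v2.go. The sample row type and output path are hypothetical; only util.ParseCompressionOpt and the parquet-go calls already appearing in this change are assumed.

package main

import (
	"log"
	"os"

	"github.com/parquet-go/parquet-go"

	"github.com/grafana/pyroscope/pkg/util"
)

// sample is a hypothetical row type used only for this sketch.
type sample struct {
	ID    uint64 `parquet:"id"`
	Value int64  `parquet:"value"`
}

func main() {
	f, err := os.Create("samples.parquet") // hypothetical output path
	if err != nil {
		log.Fatal(err)
	}
	defer f.Close()

	var opts []parquet.WriterOption
	// As in the patch, an unrecognized or empty algorithm yields an error,
	// which callers ignore so the writer falls back to no compression.
	if o, err := util.ParseCompressionOpt("zstd", 0); err == nil {
		opts = append(opts, o)
	}

	w := parquet.NewGenericWriter[sample](f, opts...)
	if _, err := w.Write([]sample{{ID: 1, Value: 42}}); err != nil {
		log.Fatal(err)
	}
	if err := w.Close(); err != nil {
		log.Fatal(err)
	}
}

The CLI equivalent documented above would be -pyroscopedb.compression-algo=zstd, with -pyroscopedb.compression-level left at 0 to take the library's default level.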