Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions pkg/profilestore/profilecolumnstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,20 @@
import (
"bytes"
"context"
"encoding/base64"
"fmt"
"strings"
"sync"
"time"

"github.com/apache/arrow-go/v18/arrow"
"github.com/apache/arrow-go/v18/arrow/ipc"
"github.com/apache/arrow-go/v18/arrow/memory"
"github.com/go-kit/log"
"github.com/gogo/status"
"github.com/polarsignals/frostdb/dynparquet"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/promql/parser"
"go.opentelemetry.io/otel/trace"
otelgrpcprofilingpb "go.opentelemetry.io/proto/otlp/collector/profiles/v1development"
"google.golang.org/grpc/codes"
Expand All @@ -37,6 +40,7 @@
profilestorepb "github.com/parca-dev/parca/gen/proto/go/parca/profilestore/v1alpha1"
"github.com/parca-dev/parca/pkg/ingester"
"github.com/parca-dev/parca/pkg/normalizer"
"github.com/parca-dev/parca/pkg/profile"
)

type agent struct {
Expand Down Expand Up @@ -109,9 +113,77 @@
return nil
}

for rowIdx := 0; rowIdx < int(r.NumRows()); rowIdx++ {
profileType, err := getProfileTypeString(r, rowIdx)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This function searches for the producer column on every iteration, let’s only find the column once and then on every iteration do this validation.

if err != nil {
return fmt.Errorf("failed to get profile type at row %d: %v", rowIdx, err)
}

// Validate profile type by trying to parse it as a PromQL selector.
queryStr := fmt.Sprintf("%s{}", profileType)
if _, parseErr := parser.ParseMetricSelector(queryStr); parseErr != nil {
return fmt.Errorf("invalid profile type at row %d (%s): %v", rowIdx, profileType, parseErr)
}
}

return s.ingester.Ingest(ctx, r)
}

func getProfileTypeString(r arrow.Record, rowIdx int) (string, error) {

Check failure on line 132 in pkg/profilestore/profilecolumnstore.go

View workflow job for this annotation

GitHub Actions / Go Lint

SA1019: arrow.Record is deprecated: This is deprecated to avoid the confusion of the terminology where Record refers to a single row, use [RecordBatch] instead. (staticcheck)
schema := r.Schema()

nameIdx := schema.FieldIndices(profile.ColumnName)
sampleTypeIdx := schema.FieldIndices(profile.ColumnSampleType)
sampleUnitIdx := schema.FieldIndices(profile.ColumnSampleUnit)
periodTypeIdx := schema.FieldIndices(profile.ColumnPeriodType)
periodUnitIdx := schema.FieldIndices(profile.ColumnPeriodUnit)
durationIdx := schema.FieldIndices(profile.ColumnDuration)

nameEncoded := r.Column(nameIdx[0]).ValueStr(rowIdx)
sampleTypeEncoded := r.Column(sampleTypeIdx[0]).ValueStr(rowIdx)
sampleUnitEncoded := r.Column(sampleUnitIdx[0]).ValueStr(rowIdx)
periodTypeEncoded := r.Column(periodTypeIdx[0]).ValueStr(rowIdx)
periodUnitEncoded := r.Column(periodUnitIdx[0]).ValueStr(rowIdx)
duration := r.Column(durationIdx[0]).ValueStr(rowIdx)

nameBytes, err := base64.StdEncoding.DecodeString(nameEncoded)
if err != nil {
return "", fmt.Errorf("failed to decode name: %v", err)
}
sampleTypeBytes, err := base64.StdEncoding.DecodeString(sampleTypeEncoded)
if err != nil {
return "", fmt.Errorf("failed to decode sample_type: %v", err)
}
sampleUnitBytes, err := base64.StdEncoding.DecodeString(sampleUnitEncoded)
if err != nil {
return "", fmt.Errorf("failed to decode sample_unit: %v", err)
}
periodTypeBytes, err := base64.StdEncoding.DecodeString(periodTypeEncoded)
if err != nil {
return "", fmt.Errorf("failed to decode period_type: %v", err)
}
periodUnitBytes, err := base64.StdEncoding.DecodeString(periodUnitEncoded)
if err != nil {
return "", fmt.Errorf("failed to decode period_unit: %v", err)
}

name := string(nameBytes)
sampleType := string(sampleTypeBytes)
sampleUnit := string(sampleUnitBytes)
periodType := string(periodTypeBytes)
periodUnit := string(periodUnitBytes)

// Construct profile type string: name:sample_type:sample_unit:period_type:period_unit(:delta)
profileType := fmt.Sprintf("%s:%s:%s:%s:%s", name, sampleType, sampleUnit, periodType, periodUnit)

// Add delta suffix if duration is not 0
if duration != "0" {
profileType += ":delta"
}

return profileType, nil
}

func (s *ProfileColumnStore) updateAgents(nodeNameAndIP string, ag agent) {
s.mtx.Lock()
defer s.mtx.Unlock()
Expand Down
Loading